8.堆

1.堆

  • (heap)是一种满足特定条件的完全二叉树
    • 小顶堆 (min heap):任意节点的值 \leq 其子节点的值。
    • 大顶堆 (max heap):任意节点的值 \geq 其子节点的值。
flowchart TB
	subgraph 大顶堆
	direction TB
	9'[9] --> 8'[8]
	9' --> 6'''[6]
	8' --> 6''''[6]
	6'''' --> 1'[1]
	6'''' --> 4'[4]
	8' --> 7'[7]
	7' --> 3'[3]
	7' --> 6'''''[6]
	6''' --> 5'[5]
    6''' --> 2''[2]
    5' --> 2'''[2]
	end

	subgraph 小顶堆
	direction TB
	1[1] --> 3[3]
	1 --> 2[2]
	3 --> 6[6]
	6 --> 8[8]
	6 --> 7[7]
	3 --> 4[4]
	4 --> 9[9]
	4 --> 6''[6]
	2 --> 2'[2]
	2 --> 6'[6]
	2' --> 5[5]	
	end
  • 堆是完全二叉树的一个特例
    • 最底层节点靠左填充其他层 的节点都被填满
    • 我们将二叉树的根节点 称为堆顶 ,将底层最靠右 的节点称为堆底
    • 对于大顶堆(小顶堆),堆顶元素(根节点)的值是最大(最小)的。

1.堆的常用操作

  • 许多编程语言提供的是优先队列 (priority queue),这是一种抽象的数据结构,定义为具有优先级排序 的队列。

  • 堆通常用于实现优先队列,大顶堆相当于元素按从大到小的顺序出队的优先队列。从使用角度来看,可以将优先队列 看作等价 的数据结构

方法名 描述 时间复杂度
push() 元素入堆 𝑂(log 𝑛)
pop() 堆顶元素出堆 𝑂(log 𝑛)
peek() 访问堆顶元素(对于大 / 小顶堆分别为最大 / 小值) 𝑂(1)
size() 获取堆的元素数量 𝑂(1)
isEmpty() 判断堆是否为空 𝑂(1)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 初始化小顶堆
min_heap, flag = [], 1
# 初始化大顶堆
max_heap, flag = [], -1

# Python 的 heapq 模块默认实现小顶堆
# 考虑将“元素取负”后再入堆,这样就可以将大小关系颠倒,从而实现大顶堆
# 在本示例中,flag = 1 时对应小顶堆,flag = -1 时对应大顶堆

# 元素入堆
heapq.heappush(max_heap, flag * 1)
heapq.heappush(max_heap, flag * 3)
heapq.heappush(max_heap, flag * 2)
heapq.heappush(max_heap, flag * 5)
heapq.heappush(max_heap, flag * 4)

# 获取堆顶元素
peek: int = flag * max_heap[0] # 5

# 堆顶元素出堆
# 出堆元素会形成一个从大到小的序列
val = flag * heapq.heappop(max_heap) # 5
val = flag * heapq.heappop(max_heap) # 4
val = flag * heapq.heappop(max_heap) # 3
val = flag * heapq.heappop(max_heap) # 2
val = flag * heapq.heappop(max_heap) # 1

# 获取堆大小
size: int = len(max_heap)

# 判断堆是否为空
is_empty: bool = not max_heap

# 输入列表并建堆
min_heap: list[int] = [1, 3, 2, 5, 4]
heapq.heapify(min_heap)

2.堆的实现

# 若要将大顶堆转换为小顶堆,只需将大小逻辑判断进行逆转(即将\geq替换为\leq

1.堆的存储与表示
  • 由于完全二叉树适合用数组来表示,所以采用数组来存储堆
  • 当使用数组表示二叉树时,元素代表节点值,索引代表节点在二叉树中的位置。节点指针通过索引映射公式来实现
1
2
3
4
5
6
7
8
9
10
11
def left(self, i: int) -> int:
""" 获取左子节点的索引"""
return 2 * i + 1

def right(self, i: int) -> int:
""" 获取右子节点的索引"""
return 2 * i + 2

def parent(self, i: int) -> int:
""" 获取父节点的索引"""
return (i - 1) // 2 # 向下整除

# 将映射公式封装成函数

2.访问堆顶元素
1
2
3
def peek(self) -> int:
""" 访问堆顶元素"""
return self.max_heap[0]
3.元素入堆
  • 先将元素加入至堆底,但是加入的元素可能会破坏堆成立的条件,所以要修复插入节点到根节点的路径上的各个节点,这个操作被称为堆化

  • 从入堆节点开始。从底部至顶部 执行堆化,比较插入节点与其父节点的值 ,如果插入节点更大 ,则将它们交换 。然后继续执行此操作,从底至顶修复堆中的各个节点,直至越过根节点或遇到无须交换的节点时 结束。

flowchart LR
	subgraph 入堆
	direction TB
	9a[9] --> 8a[8]
	9a --> 6a[6]
	8a --> 6b[6]
	6b --> 1a[1]
	6b --> 4a[4]
	8a --> 7a[7]
	7a --> 3a[3]
	7a --> 6c[6]
	6a --> 5a[5]
    6a --> 2a[2]
    5a --> 2b[2]
    5a --> 7b[7]
	end

	subgraph 原堆
	direction TB
	9[9] --> 8[8]
	9 --> 6[6]
	8 --> 6'[6]
	6' --> 1[1]
	6' --> 4[4]
	8 --> 7[7]
	7 --> 3[3]
	7 --> 6''[6]
	6 --> 5[5]
    6 --> 2[2]
    5 --> 2'[2]
	end
	
	原堆 --> 入堆
flowchart LR
	subgraph 比较
	direction TB
	9a[9] --> 8a[8]
	9a --> 6a[6]
	8a --> 6b[6]
	6b --> 1a[1]
	6b --> 4a[4]
	8a --> 7a[7]
	7a --> 3a[3]
	7a --> 6c[6]
	6a --> 5a[5]
    6a --> 2a[2]
    5a --> 2b[2]
    5a --> 7b["7
    (与父节点比较)"]
	end

	subgraph 交换
	direction TB
	9[9] --> 8[8]
	9 --> 6[6]
	8 --> 6'[6]
	6' --> 1[1]
	6' --> 4[4]
	8 --> 7[7]
	7 --> 3[3]
	7 --> 6''[6]
	6 --> 7'[7]
    6 --> 2[2]
    7' --> 2'[2]
    subgraph 交换节点
    7' --> 5[5]
    end
	end
	
	比较 --> 交换
flowchart LR
	subgraph 比较
	direction TB
	9a[9] --> 8a[8]
	9a --> 6a[6]
	8a --> 6b[6]
	6b --> 1a[1]
	6b --> 4a[4]
	8a --> 7a[7]
	7a --> 3a[3]
	7a --> 6c[6]
	6a --> 7b["7
	(与父节点比较)"]
    6a --> 2a[2]
    7b --> 2b[2]
    7b --> 5a[5]
	end
	
	subgraph 交换
	direction TB
	9[9] --> 8[8]
	9 --> 7'[7]
	8 --> 6'[6]
	6' --> 1[1]
	6' --> 4[4]
	8 --> 7[7]
	7 --> 3[3]
	7 --> 6''[6]
	subgraph 交换节点
	7' --> 6[6]
	end
    6 --> 2[2]
    6 --> 5[5]
    7' --> 2'[2]
	end
	
	比较 --> 交换
flowchart LR
	subgraph 比较
	direction TB
	9a[9] --> 8a[8]
	9a --> 7b["7
	(与父节点比较)"]
	8a --> 6b[6]
	6b --> 1a[1]
	6b --> 4a[4]
	8a --> 7a[7]
	7a --> 3a[3]
	7a --> 6c[6]
	7b --> 6a[6]
    6a --> 2a[2]
    6a --> 5a[5]
    7b --> 2b[2]
	end
	
	subgraph 不执行交换
	direction TB
	9[9] --> 8[8]
	9 --> 7'[7]
	8 --> 6'[6]
	6' --> 1[1]
	6' --> 4[4]
	8 --> 7[7]
	7 --> 3[3]
	7 --> 6''[6]
	7' --> 6[6]
    6 --> 2[2]
    6 --> 5[5]
    7' --> 2'[2]
	end
	
	比较 --> 不执行交换

# 设节点总数为 𝑛 ,则树的高度为 𝑂(log 𝑛) 。由此可知,堆化操作的循环轮数最多为 𝑂(log 𝑛) ,元素入堆操作的时间复杂度为𝑂(log 𝑛)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def push(self, val: int):
""" 元素入堆"""
# 添加节点
self.max_heap.append(val)
# 从底至顶堆化
self.sift_up(self.size() - 1)

def sift_up(self, i: int):
""" 从节点 i 开始,从底至顶堆化"""
while True:
# 获取节点 i 的父节点
p = self.parent(i)
# 当“越过根节点”或“节点无须修复”时,结束堆化
if p < 0 or self.max_heap[i] <= self.max_heap[p]:
break
# 交换两节点
self.swap(i, p)
# 循环向上堆化
i = p
4.堆顶元素出堆
  • 堆顶元素是二叉树的根节点,即列表首元素。如果直接从列表中删除首元素,那么二叉树中所有节点的索引都会发生变化,这将使得后续使用堆化进行修复变得困难。为了尽量减少元素索引的变动,所以采用以下操作步骤。
    • 交换堆顶元素与堆底元素 (交换根节点与最右叶节点)。
    • 交换完成后,将堆底从列表中删除(注意,由于已经交换,因此实际上删除的是原来的堆顶元素)。
    • 从根节点开始,从顶至底执行堆化
flowchart	LR
	subgraph 交换堆顶堆底元素
	5a[5] --> 8a[8]
	5a --> 7b[7]
	8a --> 6b[6]
	6b --> 1a[1]
	6b --> 4a[4]
	8a --> 7a[7]
	7a --> 3a[3]
	7a --> 6c[6]
	7b --> 6a[6]
    6a --> 2a[2]
    6a --> 9a[9]
    7b --> 2b[2]
	end
	style 5a fill:#0ff
	style 9a fill:#0ff
	
	subgraph 原堆
	9[9] --> 8[8]
	9 --> 7'[7]
	8 --> 6'[6]
	6' --> 1[1]
	6' --> 4[4]
	8 --> 7[7]
	7 --> 3[3]
	7 --> 6''[6]
	7' --> 6[6]
    6 --> 2[2]
    6 --> 5[5]
    7' --> 2'[2]
    end
    style 9 fill:#0ff
    
    原堆 --> 交换堆顶堆底元素
flowchart	LR
	subgraph 删除当前堆底元素
	5a[5] --> 8a[8]
	5a --> 7b[7]
	8a --> 6b[6]
	6b --> 1a[1]
	6b --> 4a[4]
	8a --> 7a[7]
	7a --> 3a[3]
	7a --> 6c[6]
	7b --> 6a[6]
    6a --> 2a[2]
    7b --> 2b[2]
	end
	style 5a fill:#0ff
	
	subgraph 堆化
	subgraph A[与子节点进行比较,与较大的节点交换]
	5'[5] --> 8'[8]
	5' --> 7''[7]
	end
	8' --> 6''[6]
	6'' --> 1'[1]
	6'' --> 4[4]
	8' --> 7'[7]
	7' --> 3'[3]
	7' --> 6'''[6]
	7'' --> 6'[6]
    6' --> 2'[2]
    7'' --> 2''[2]
	end
	style 5' fill:#0ff
	
	删除当前堆底元素 --> 堆化
flowchart	LR
	subgraph 交换节点
	subgraph 交换5和8
	8a[8] --> 5a[5]
	end
	8a --> 7b[7]
	5a --> 6b[6]
	6b --> 1a[1]
	6b --> 4a[4]
	5a --> 7a[7]
	7a --> 3a[3]
	7a --> 6c[6]
	7b --> 6a[6]
    6a --> 2a[2]
    7b --> 2b[2]
	end
	style 5a fill:#0ff
	
	subgraph 堆化
	8'[8] --> 5'[5]
	8' --> 7''[7]
	subgraph A[与子节点进行比较,与较大的节点交换]
	5' --> 6''[6]
	5' --> 7'[7]
	end
	6'' --> 1'[1]
	6'' --> 4'[4]
	7' --> 3'[3]
	7' --> 6'''[6]
	7'' --> 6'[6]
	7'' --> 2''[2]
    6' --> 2'[2]
    
	end
	style 5' fill:#0ff
	
	交换节点 --> 堆化
flowchart LR	
	subgraph 交换节点
	8'[8] --> 7'[7]
	8' --> 7''[7]
	7' --> 6''[6]
	subgraph 交换5和7
	7' --> 5'[5]
	end
	6'' --> 1'[1]
	6'' --> 4'[4]
	5' --> 3'[3]
	5' --> 6'''[6]
	7'' --> 6'[6]
	7'' --> 2''[2]
    6' --> 2'[2]
	end
	style 5' fill:#0ff
	
	subgraph 堆化
	8a[8] --> 7a[7]
	8a --> 7b[7]
	7a --> 6b[6]
	7a --> 5a[5]
	6b --> 1a[1]
	6b --> 4a[4]
	subgraph A[与子节点进行比较,与较大的节点交换]
	5a --> 3a[3]
	5a --> 6c[6]
	end
	7b --> 2b[2]
	7b --> 6a[6]
    6a --> 2a[2]
	end
	style 5a fill:#0ff
	
	交换节点 --> 堆化
flowchart LR	
	subgraph 交换节点
	8a[8] --> 7a[7]
	8a --> 7b[7]
	7a --> 6b[6]
	7a --> 6c[6]
	6b --> 1a[1]
	6b --> 4a[4]
	6c --> 3a[3]
	subgraph 交换5和6
	6c --> 5a[5]
	end
	7b --> 6a[6]
	7b --> 2b[2]
    6a --> 2a[2]
	end
	style 5a fill:#0ff
	
	subgraph 完成出堆
	8'[8] --> 7'[7]
	8' --> 7''[7]
	7' --> 6''[6]
	7' --> 6'''[6]
	6'' --> 1'[1]
	6'' --> 4'[4]
	6''' --> 3'[3]
	6''' --> 5'[5]
	7'' --> 6'[6]
	7'' --> 2''[2]
    6' --> 2'[2]
	end
	style 5' fill:#0ff
	
	交换节点 --> 完成出堆
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def pop(self) -> int:
""" 元素出堆"""
# 判空处理
if self.is_empty():
raise IndexError(" 堆为空")
# 交换根节点与最右叶节点(交换首元素与尾元素)
self.swap(0, self.size() - 1)
# 删除节点
val = self.max_heap.pop()
# 从顶至底堆化
self.sift_down(0)
# 返回堆顶元素
return val

def sift_down(self, i: int):
""" 从节点 i 开始,从顶至底堆化"""
while True:
# 判断节点 i, l, r 中值最大的节点,记为 ma
l, r, ma = self.left(i), self.right(i), i
if l < self.size() and self.max_heap[l] > self.max_heap[ma]:
ma = l
if r < self.size() and self.max_heap[r] > self.max_heap[ma]:
ma = r
# 若节点 i 最大或索引 l, r 越界,则无须继续堆化,跳出
if ma == i:
break
# 交换两节点
self.swap(i, ma)
# 循环向下堆化
i = ma

# 与元素入堆操作相似,堆顶元素出堆操作的时间复杂度也为 𝑂(log 𝑛)

3.堆的常见应用

  • 优先队列 :堆通常作为实现优先队列的首选数据结构,其入队和出队操作的时间复杂度均为 𝑂(log 𝑛),而建堆操作为 𝑂(𝑛) ,这些操作都非常高效。
  • 堆排序 :给定一组数据,可以用它们建立一个堆,然后不断地执行元素出堆操作,从而得到有序数据 。然而,通常会使用一种更优雅的方式实现堆排序
  • 获取最大的𝑘个元素 :一个经典的算法问题,同时也是一种典型应用,例如选择热度前 10 的新闻作为微博热搜,选取销量前 10 的商品等。

2.建堆操作

  • 当需要用一个列表中的元素来构建一个堆的时候,这个过程被称为建堆操作

1.借助入堆操作实现

  • 首先创建一个空堆 ,然后遍历列表 ,依次对每个元素执行入堆操作 ,即先将元素添加至堆的尾部 ,再对该元素执行从底至顶堆化

  • 每当一个元素入堆,堆的长度就加一。由于节点是从顶到底依次被添加进二叉树的,因此堆是自上而下 构建的。

  • 设元素数量为 𝑛 ,每个元素的入堆操作使用 𝑂(log 𝑛) 时间,因此该建堆方法的时间复杂度为 𝑂(𝑛 log 𝑛)。

2.通过遍历堆化实现

  • 实际上,可以实现一种更为高效的建堆方法,共分为两步。
    • 将列表所有元素原封不动地添加到堆中,此时堆的性质尚未得到满足。
    • 倒序遍历堆(层序遍历的倒序),依次对每个非叶节点执行从顶至底堆化
  • 每当堆化一个节点后,以该节点为根节点的子树就形成一个合法的子堆。而由于是倒序遍历,因此堆是自下而上 构建的。
  • 之所以选择倒序遍历,是因为这样能够保证当前节点之下的子树已经是合法的子堆,这样堆化当前节点才是有效的。
  • 由于叶节点没有子节点,因此它们天然就是合法的子堆,无须堆化
1
2
3
4
5
6
7
def __init__(self, nums: list[int]):
""" 构造方法,根据输入列表建堆"""
# 将列表元素原封不动添加进堆
self.max_heap = nums
# 堆化除叶节点以外的其他所有节点
for i in range(self.parent(self.size() - 1), -1, -1):
self.sift_down(i)

3.复杂度分析

  • 计算通过遍历堆化实现的建堆操作的时间复杂度

    • 假设完全二叉树的节点数量为 𝑛 ,则叶节点数量为 (𝑛 + 1)/2 ,其中 / 为向下整除。因此需要堆化的节点数量为 (𝑛 − 1)/2 。
    • 在从顶至底堆化的过程中,每个节点最多堆化到叶节点,因此最大迭代次数为二叉树高度 log 𝑛 。
  • 将两者相乘,可得到建堆过程的时间复杂度为 𝑂(𝑛 log 𝑛) 。但这个估算结果并不准确,因为没有考虑到二叉树底层节点数量远多于顶层节点的性质

  • 通过验证完美二叉树来获得其时间复杂度

    • 从顶至底堆化的最大迭代次数等于其节点到叶节点的距离,即节点的高度
    • 可以对各层的节点数量 × 节点高度 求和,得到所有节点的堆化迭代次数的总和
      • T(h)=20h+21(h1)+22(h2)++2(h1)×1T(ℎ) = 2^0ℎ + 2^1 (ℎ − 1) + 2^2 (ℎ − 2) + ⋯ + 2^{(ℎ−1)} × 1
    • 再错位相加法
      • 2TT2T-T𝑇(h)=20h+21+22++2h1+2h𝑇(ℎ) = −2^0ℎ + 2^1 + 2^2 + ⋯ + 2^{ℎ−1} + 2^ℎ
    • 得到的是一个等比数列
    • 直接使用求和公式得到时间复杂度
      • T(h)=212h12h=2h+1h2=O(2h)T(h)=2\frac{1-2^h}{1-2}-h=2^{h+1}-h-2=O(2^h)
    • 高度为 ℎ 的完美二叉树的节点数量为 𝑛=2h+11𝑛 = 2^{ℎ+1} − 1 ,易得复杂度为 𝑂(2h)=𝑂(𝑛)𝑂(2^ℎ ) = 𝑂(𝑛) 。以上推算表明,输入列表并建堆的时间复杂度为 𝑂(𝑛) ,非常高效

3.Top-k问题

  • 当给定一个长度为n的无序数组,要求返回最大的k个元素

1.遍历选择

  • 直接遍历,在每一轮提取出第1,2,3,……,k大的元素
  • 但是这种方法时间复杂度为 𝑂(𝑛𝑘) 。只适用于 𝑘远小于 𝑛 的情况,因为当 𝑘 与 𝑛 比较接近时,其时间复杂度趋向于 𝑂(𝑛2)𝑂(𝑛^2 ) ,非常耗时。

# 当 𝑘 = 𝑛 时,就是要得到完整的有序序列,此时等价于选择排序 算法

2.排序

  • 可以先对数组进行排序,再返回最右边的 𝑘 个元素,时间复杂度为 𝑂(𝑛 log 𝑛) 。

3.堆

  • 初始化一个小顶堆 ,即其堆顶元素最小。

  • 先将数组的前 𝑘 个元素依次入堆。

  • 从第 𝑘 + 1 个元素开始,若当前元素大于堆顶元素,则将堆顶元素出堆,并将当前元素入堆。

  • 遍历完成后,堆中保存的就是最大的 𝑘 个元素。

  • 例:给定数组:1,7,6,3,2

  • k=3

flowchart LR
	subgraph A[将前k个元素入堆]
	1[1]
	end
	
	subgraph B[将前k个元素入堆]
	1a[1] --> 7a[7]
	end
	
	subgraph C[将前k个元素入堆]
	1'[1] --> 7'[7]
	1' --> 6'[6]
	end
	
	A --> B --> C
flowchart LR
	subgraph A[判断当前元素即第四个元素大于堆顶元素]
	1[1] --> 7[7]
	1 --> 6'[6]
	end
	
	subgraph B[将堆顶元素出堆]
	6a[6] --> 7a[7]
	end
	
	subgraph C[将当前元素入堆]
	3'[3] --> 7'[7]
	3' --> 6''[6]
	end
	
	A --> B --> C
flowchart LR
	subgraph A[判断当前元素即第五个元素小于堆顶元素,则跳过]
	3'[3] --> 7'[7]
	3' --> 6''[6]
	end
	
	subgraph B[完成遍历,返回堆这个数组即可]
	3a[3] --> 7[7]
	3a --> 6a[6]
	end
	
	A --> B
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def top_k_heap(nums: list[int], k: int) -> list[int]:
""" 基于堆查找数组中最大的 k 个元素"""
# 初始化小顶堆
heap = []
# 将数组的前 k 个元素入堆
for i in range(k):
heapq.heappush(heap, nums[i])
# 从第 k+1 个元素开始,保持堆的长度为 k
for i in range(k, len(nums)):
# 若当前元素大于堆顶元素,则将堆顶元素出堆、当前元素入堆
if nums[i] > heap[0]:
heapq.heappop(heap)
heapq.heappush(heap, nums[i])
return heap
  • 总共执行了 𝑛 轮入堆和出堆,堆的最大长度为 𝑘 ,因此时间复杂度为 𝑂(𝑛 log 𝑘) 。该方法的效率很高,当𝑘 较小时,时间复杂度趋向 𝑂(𝑛) ;当 𝑘 较大时,时间复杂度不会超过 𝑂(𝑛 log 𝑛) 。
  • 另外,该方法适用于动态数据流的使用场景。在不断加入数据时,可以持续维护堆内的元素,从而实现最大的 𝑘 个元素的动态更新。