ヒープと優先度付きキュー ── 「最大/最小を O(1) で取る」
扱う概念:二分ヒープ(Min-Heap / Max-Heap)、配列上のヒープ表現、heapify、Top-K 問題

この章で何ができるようになるか:ヒープの内部構造を図解でき、「ソートせずに Top-K を O(n log k) で求める」テクニックを使えるようになる。
なぜヒープが必要か
「常に最小値(または最大値)を高速に取り出したい」。
ソート済み配列で実現すると:
最小値の取り出し → O(1)(先頭)
新しい要素の挿入 → O(n)(正しい位置に挿入してシフト)
ヒープで実現すると:
最小値の取り出し → O(1)(ルート)+ O(log n)(再構築)
新しい要素の挿入 → O(log n)
→ 挿入が頻繁な場面ではヒープが圧勝
二分ヒープの構造
Min-Heap の条件:全ノードが子より小さい(または等しい)。
1
/ \
3 2
/ \ / \
7 6 5 4
配列表現:[1, 3, 2, 7, 6, 5, 4]
0 1 2 3 4 5 6
親子関係:
親: (i - 1) // 2
左の子: 2 * i + 1
右の子: 2 * i + 2
class MinHeap:
def __init__(self):
self._data = []
def push(self, val):
"""要素を追加 — O(log n)"""
self._data.append(val)
self._sift_up(len(self._data) - 1)
def pop(self) -> int:
"""最小値を取り出し — O(log n)"""
if not self._data:
raise IndexError("Heap is empty")
# ルートと末尾を交換 → 末尾を削除 → ルートから下にsift
self._data[0], self._data[-1] = self._data[-1], self._data[0]
min_val = self._data.pop()
if self._data:
self._sift_down(0)
return min_val
def peek(self) -> int:
"""最小値を参照 — O(1)"""
return self._data[0]
def _sift_up(self, i):
"""子から親に向かって修正"""
while i > 0:
parent = (i - 1) // 2
if self._data[i] < self._data[parent]:
self._data[i], self._data[parent] = self._data[parent], self._data[i]
i = parent
else:
break
def _sift_down(self, i):
"""親から子に向かって修正"""
n = len(self._data)
while True:
smallest = i
left = 2 * i + 1
right = 2 * i + 2
if left < n and self._data[left] < self._data[smallest]:
smallest = left
if right < n and self._data[right] < self._data[smallest]:
smallest = right
if smallest != i:
self._data[i], self._data[smallest] = self._data[smallest], self._data[i]
i = smallest
else:
break
def __len__(self):
return len(self._data)
heapify:配列をヒープに変換
n 個の要素をヒープに入れる方法:
方法1: 1つずつ push → O(n log n)
方法2: heapify(ボトムアップ構築)→ O(n)
def heapify(arr: list):
"""配列をその場で Min-Heap に変換 — O(n)"""
n = len(arr)
# 最後の非葉ノードから逆順に sift_down
for i in range(n // 2 - 1, -1, -1):
_sift_down(arr, i, n)
def _sift_down(arr, i, n):
while True:
smallest = i
left, right = 2 * i + 1, 2 * i + 2
if left < n and arr[left] < arr[smallest]:
smallest = left
if right < n and arr[right] < arr[smallest]:
smallest = right
if smallest != i:
arr[i], arr[smallest] = arr[smallest], arr[i]
i = smallest
else:
break
なぜ O(n) なのか:葉ノード(全体の半分)は sift_down が不要。高さ h のノードの sift_down は O(h) だが、高さが高いノードは少ない。合計は O(n)。
Python の heapq
import heapq
# Min-Heap 操作
nums = [5, 3, 1, 4, 2]
heapq.heapify(nums) # [1, 2, 5, 4, 3] — O(n)
heapq.heappush(nums, 0) # [0, 2, 1, 4, 3, 5] — O(log n)
smallest = heapq.heappop(nums) # 0 — O(log n)
# Max-Heap(符号を反転して代用)
max_heap = []
for x in [5, 3, 1, 4, 2]:
heapq.heappush(max_heap, -x) # -5, -3, -1, -4, -2
largest = -heapq.heappop(max_heap) # 5
# Top-K: n 個から上位 k 個を取得
top_3 = heapq.nlargest(3, [5, 3, 1, 4, 2]) # [5, 4, 3]
面接頻出:Top-K 問題
「100万件のデータから上位10件を効率的に取得する」
def top_k_largest(nums: list[int], k: int) -> list[int]:
"""
Min-Heap をサイズ k で維持する
→ ヒープの中には常に「今まで見た中で最大の k 個」が入っている
→ ヒープのルート(最小値)= k 番目に大きい値
"""
heap = []
for num in nums:
if len(heap) < k:
heapq.heappush(heap, num)
elif num > heap[0]: # ルート(現在の k 番目)より大きければ
heapq.heapreplace(heap, num) # ルートを追い出して挿入
return sorted(heap, reverse=True)
# O(n log k) — n 件全てをソート O(n log n) より高速(k << n のとき)
K番目に大きい要素(Kth Largest)
def find_kth_largest(nums: list[int], k: int) -> int:
"""サイズ k の Min-Heap を使う"""
heap = nums[:k]
heapq.heapify(heap)
for num in nums[k:]:
if num > heap[0]:
heapq.heapreplace(heap, num)
return heap[0] # k 番目に大きい値 = Min-Heap のルート
メディアンの動的計算(2つのヒープ)
class MedianFinder:
"""ストリームデータのメディアンをリアルタイムに求める"""
def __init__(self):
self.lo = [] # Max-Heap(小さい半分)
self.hi = [] # Min-Heap(大きい半分)
def add_num(self, num: int):
heapq.heappush(self.lo, -num) # Max-Heap に追加
# lo の最大値を hi へ移動(lo の最大 ≤ hi の最小を保証)
heapq.heappush(self.hi, -heapq.heappop(self.lo))
# サイズ調整(lo のサイズ ≥ hi のサイズ)
if len(self.hi) > len(self.lo):
heapq.heappush(self.lo, -heapq.heappop(self.hi))
def find_median(self) -> float:
if len(self.lo) > len(self.hi):
return -self.lo[0]
return (-self.lo[0] + self.hi[0]) / 2
ヒープソート
ヒープを使ったソート。O(n log n)・インプレース(追加メモリ O(1))。
def heap_sort(arr: list):
n = len(arr)
# Max-Heap を構築
for i in range(n // 2 - 1, -1, -1):
_sift_down_max(arr, i, n)
# 最大値を末尾に移動し、ヒープサイズを縮小
for i in range(n - 1, 0, -1):
arr[0], arr[i] = arr[i], arr[0]
_sift_down_max(arr, 0, i)
def _sift_down_max(arr, i, n):
while True:
largest = i
left, right = 2 * i + 1, 2 * i + 2
if left < n and arr[left] > arr[largest]:
largest = left
if right < n and arr[right] > arr[largest]:
largest = right
if largest != i:
arr[i], arr[largest] = arr[largest], arr[i]
i = largest
else:
break
まとめ
| 操作 | 計算量 | 説明 |
|---|---|---|
| peek (min/max) | O(1) | ルートを参照するだけ |
| push | O(log n) | 末尾に追加 → sift up |
| pop | O(log n) | ルートと末尾を交換 → sift down |
| heapify | O(n) | ボトムアップ構築 |
| Top-K | O(n log k) | サイズ k の Min-Heap を維持 |
ヒープは「ソートほど厳密な順序は要らないが、最大/最小だけは常に知りたい」場面で最適。Dijkstra の優先度付きキュー、Kafka のメッセージスケジューリング、OS のプロセススケジューラなど、実務での登場頻度は非常に高い。