目次を表示する

CS基礎:計算量とデータ構造

ヒープと優先度付きキュー ── 「最大/最小を O(1) で取る」

ヒープと優先度付きキュー ── 「最大/最小を O(1) で取る」

扱う概念:二分ヒープ(Min-Heap / Max-Heap)、配列上のヒープ表現、heapify、Top-K 問題


ヒープと優先度付きキュー — Min-Heap・Top-K・メディアン

この章で何ができるようになるか:ヒープの内部構造を図解でき、「ソートせずに Top-K を O(n log k) で求める」テクニックを使えるようになる。


なぜヒープが必要か

「常に最小値(または最大値)を高速に取り出したい」。

ソート済み配列で実現すると:
  最小値の取り出し → O(1)(先頭)
  新しい要素の挿入 → O(n)(正しい位置に挿入してシフト)

ヒープで実現すると:
  最小値の取り出し → O(1)(ルート)+ O(log n)(再構築)
  新しい要素の挿入 → O(log n)

→ 挿入が頻繁な場面ではヒープが圧勝

二分ヒープの構造

Min-Heap の条件:全ノードが子より小さい(または等しい)。

      1
     / \
    3   2
   / \ / \
  7  6 5  4

配列表現:[1, 3, 2, 7, 6, 5, 4]
           0  1  2  3  4  5  6

親子関係:
  親:     (i - 1) // 2
  左の子: 2 * i + 1
  右の子: 2 * i + 2
class MinHeap:
    def __init__(self):
        self._data = []

    def push(self, val):
        """要素を追加 — O(log n)"""
        self._data.append(val)
        self._sift_up(len(self._data) - 1)

    def pop(self) -> int:
        """最小値を取り出し — O(log n)"""
        if not self._data:
            raise IndexError("Heap is empty")
        # ルートと末尾を交換 → 末尾を削除 → ルートから下にsift
        self._data[0], self._data[-1] = self._data[-1], self._data[0]
        min_val = self._data.pop()
        if self._data:
            self._sift_down(0)
        return min_val

    def peek(self) -> int:
        """最小値を参照 — O(1)"""
        return self._data[0]

    def _sift_up(self, i):
        """子から親に向かって修正"""
        while i > 0:
            parent = (i - 1) // 2
            if self._data[i] < self._data[parent]:
                self._data[i], self._data[parent] = self._data[parent], self._data[i]
                i = parent
            else:
                break

    def _sift_down(self, i):
        """親から子に向かって修正"""
        n = len(self._data)
        while True:
            smallest = i
            left = 2 * i + 1
            right = 2 * i + 2

            if left < n and self._data[left] < self._data[smallest]:
                smallest = left
            if right < n and self._data[right] < self._data[smallest]:
                smallest = right

            if smallest != i:
                self._data[i], self._data[smallest] = self._data[smallest], self._data[i]
                i = smallest
            else:
                break

    def __len__(self):
        return len(self._data)

heapify:配列をヒープに変換

n 個の要素をヒープに入れる方法:

方法1: 1つずつ push → O(n log n)
方法2: heapify(ボトムアップ構築)→ O(n)
def heapify(arr: list):
    """配列をその場で Min-Heap に変換 — O(n)"""
    n = len(arr)
    # 最後の非葉ノードから逆順に sift_down
    for i in range(n // 2 - 1, -1, -1):
        _sift_down(arr, i, n)

def _sift_down(arr, i, n):
    while True:
        smallest = i
        left, right = 2 * i + 1, 2 * i + 2
        if left < n and arr[left] < arr[smallest]:
            smallest = left
        if right < n and arr[right] < arr[smallest]:
            smallest = right
        if smallest != i:
            arr[i], arr[smallest] = arr[smallest], arr[i]
            i = smallest
        else:
            break

なぜ O(n) なのか:葉ノード(全体の半分)は sift_down が不要。高さ h のノードの sift_down は O(h) だが、高さが高いノードは少ない。合計は O(n)。


Python の heapq

import heapq

# Min-Heap 操作
nums = [5, 3, 1, 4, 2]
heapq.heapify(nums)           # [1, 2, 5, 4, 3] — O(n)
heapq.heappush(nums, 0)       # [0, 2, 1, 4, 3, 5] — O(log n)
smallest = heapq.heappop(nums) # 0 — O(log n)

# Max-Heap(符号を反転して代用)
max_heap = []
for x in [5, 3, 1, 4, 2]:
    heapq.heappush(max_heap, -x)  # -5, -3, -1, -4, -2
largest = -heapq.heappop(max_heap)  # 5

# Top-K: n 個から上位 k 個を取得
top_3 = heapq.nlargest(3, [5, 3, 1, 4, 2])  # [5, 4, 3]

面接頻出:Top-K 問題

「100万件のデータから上位10件を効率的に取得する」

def top_k_largest(nums: list[int], k: int) -> list[int]:
    """
    Min-Heap をサイズ k で維持する
    → ヒープの中には常に「今まで見た中で最大の k 個」が入っている
    → ヒープのルート(最小値)= k 番目に大きい値
    """
    heap = []
    for num in nums:
        if len(heap) < k:
            heapq.heappush(heap, num)
        elif num > heap[0]:  # ルート(現在の k 番目)より大きければ
            heapq.heapreplace(heap, num)  # ルートを追い出して挿入
    return sorted(heap, reverse=True)

# O(n log k) — n 件全てをソート O(n log n) より高速(k << n のとき)

K番目に大きい要素(Kth Largest)

def find_kth_largest(nums: list[int], k: int) -> int:
    """サイズ k の Min-Heap を使う"""
    heap = nums[:k]
    heapq.heapify(heap)
    for num in nums[k:]:
        if num > heap[0]:
            heapq.heapreplace(heap, num)
    return heap[0]  # k 番目に大きい値 = Min-Heap のルート

メディアンの動的計算(2つのヒープ)

class MedianFinder:
    """ストリームデータのメディアンをリアルタイムに求める"""
    def __init__(self):
        self.lo = []  # Max-Heap(小さい半分)
        self.hi = []  # Min-Heap(大きい半分)

    def add_num(self, num: int):
        heapq.heappush(self.lo, -num)  # Max-Heap に追加
        # lo の最大値を hi へ移動(lo の最大 ≤ hi の最小を保証)
        heapq.heappush(self.hi, -heapq.heappop(self.lo))
        # サイズ調整(lo のサイズ ≥ hi のサイズ)
        if len(self.hi) > len(self.lo):
            heapq.heappush(self.lo, -heapq.heappop(self.hi))

    def find_median(self) -> float:
        if len(self.lo) > len(self.hi):
            return -self.lo[0]
        return (-self.lo[0] + self.hi[0]) / 2

ヒープソート

ヒープを使ったソート。O(n log n)・インプレース(追加メモリ O(1))。

def heap_sort(arr: list):
    n = len(arr)
    # Max-Heap を構築
    for i in range(n // 2 - 1, -1, -1):
        _sift_down_max(arr, i, n)

    # 最大値を末尾に移動し、ヒープサイズを縮小
    for i in range(n - 1, 0, -1):
        arr[0], arr[i] = arr[i], arr[0]
        _sift_down_max(arr, 0, i)

def _sift_down_max(arr, i, n):
    while True:
        largest = i
        left, right = 2 * i + 1, 2 * i + 2
        if left < n and arr[left] > arr[largest]:
            largest = left
        if right < n and arr[right] > arr[largest]:
            largest = right
        if largest != i:
            arr[i], arr[largest] = arr[largest], arr[i]
            i = largest
        else:
            break

まとめ

操作計算量説明
peek (min/max)O(1)ルートを参照するだけ
pushO(log n)末尾に追加 → sift up
popO(log n)ルートと末尾を交換 → sift down
heapifyO(n)ボトムアップ構築
Top-KO(n log k)サイズ k の Min-Heap を維持

ヒープは「ソートほど厳密な順序は要らないが、最大/最小だけは常に知りたい」場面で最適。Dijkstra の優先度付きキュー、Kafka のメッセージスケジューリング、OS のプロセススケジューラなど、実務での登場頻度は非常に高い。