目次を表示する

CS基礎:計算量とデータ構造

確率的データ構造 ── Bloom Filter・HyperLogLog・Count-Min Sketch

確率的データ構造 ── Bloom Filter・HyperLogLog・Count-Min Sketch

扱う概念:確率的判定、False Positive / False Negative、ビット配列、カーディナリティ推定、頻度推定


確率的データ構造 — Bloom Filter・HyperLogLog・Count-Min Sketch

この章で何ができるようになるか:「100%正確でなくていい」場面を見抜き、メモリを劇的に削減する確率的データ構造を選択できるようになる。


なぜ確率的データ構造が必要か

問題: 「このURLは既にクロール済みか?」(10億URL)
  HashSet で厳密に管理: 10億 × 50バイト/URL ≈ 50GB
  Bloom Filter で近似判定: 10億件 × 10bit ≈ 1.2GB(40倍削減)

問題: 「今日のユニークビジター数は?」(数十億アクセス)
  HashSet: 数十GB
  HyperLogLog: 12KB(!)

トレードオフ: メモリ削減の代わりに、わずかな誤差を許容する

Bloom Filter:「確実にない」か「たぶんある」

特性:
  False Positive: あり得る(「ある」と言ったが実はなかった)
  False Negative: 絶対にない(「ない」と言えば確実にない)

→ 「ない」と判定された場合は 100% 信頼できる
→ 「ある」と判定された場合は確認が必要(偽陽性の可能性)
import hashlib

class BloomFilter:
    def __init__(self, expected_items: int, fp_rate: float = 0.01):
        """
        expected_items: 想定される要素数
        fp_rate: 許容する False Positive 率
        """
        import math
        # 最適なビット数とハッシュ関数数を計算
        self.size = int(-expected_items * math.log(fp_rate) / (math.log(2) ** 2))
        self.hash_count = int((self.size / expected_items) * math.log(2))
        self.bit_array = bytearray(self.size // 8 + 1)

    def _hashes(self, item: str) -> list[int]:
        """k 個の独立したハッシュ値を生成"""
        result = []
        for i in range(self.hash_count):
            h = int(hashlib.sha256(f"{i}:{item}".encode()).hexdigest(), 16)
            result.append(h % self.size)
        return result

    def _get_bit(self, pos: int) -> bool:
        return bool(self.bit_array[pos // 8] & (1 << (pos % 8)))

    def _set_bit(self, pos: int):
        self.bit_array[pos // 8] |= (1 << (pos % 8))

    def add(self, item: str):
        for pos in self._hashes(item):
            self._set_bit(pos)

    def might_contain(self, item: str) -> bool:
        """True → たぶんある (FPの可能性)、False → 確実にない"""
        return all(self._get_bit(pos) for pos in self._hashes(item))

# 使用例
bf = BloomFilter(expected_items=1_000_000, fp_rate=0.01)
bf.add("https://example.com/page1")
bf.add("https://example.com/page2")

bf.might_contain("https://example.com/page1")  # True(正しい)
bf.might_contain("https://example.com/page3")  # False(確実にない)
bf.might_contain("https://example.com/pageX")  # True の可能性(偽陽性)

False Positive 率の制御

FP 率 ≈ (1 - e^(-kn/m))^k

m = ビット数
n = 要素数
k = ハッシュ関数数

例: 100万件、FP率1%
  m = 9,585,058 bits ≈ 1.14 MB
  k = 7 ハッシュ関数

例: 100万件、FP率0.1%
  m = 14,377,588 bits ≈ 1.71 MB
  k = 10 ハッシュ関数

実システムでの使用例

システム用途
Google Chrome悪意あるURLのブラックリスト判定
Cassandra / HBaseSSTable にキーが存在するかの事前判定
Medium「既に読んだ記事」の判定
BitcoinSPV ノードのトランザクションフィルタリング

HyperLogLog:ユニーク数を 12KB で推定する

問題: 「今日何種類のIPアドレスからアクセスがあったか」

HashSet: 各IP を保存 → 数十億件 × 16バイト = 数十GB
HyperLogLog: 12KB で誤差 0.81% の推定値

12KB で数十億のカーディナリティを推定できる

アイデア(直感)

ランダムなハッシュ値のビット列を見る:
  先頭に0が1個連続する確率: 1/2
  先頭に0が2個連続する確率: 1/4
  先頭に0が k個連続する確率: 1/2^k

もし最大 k=10 が観測されたら、
少なくとも 2^10 = 1024 個のユニーク要素がある推定できる

精度を上げる:
  入力を 2^14 = 16384 個のバケットに分け、各バケットで最大 k を記録
  各バケットの推定値の調和平均を取る → 誤差 0.81%
import hashlib
import math

class HyperLogLog:
    def __init__(self, precision: int = 14):
        """precision: バケット数 = 2^precision(14 → 16384バケット)"""
        self.p = precision
        self.m = 1 << precision  # バケット数
        self.registers = [0] * self.m
        # 補正定数
        self.alpha = 0.7213 / (1 + 1.079 / self.m)

    def add(self, item: str):
        h = int(hashlib.sha256(item.encode()).hexdigest(), 16)
        # 上位 p ビットでバケットを決定
        bucket = h >> (256 - self.p)
        # 残りのビット列で先頭のゼロ数 + 1 を計算
        remaining = h & ((1 << (256 - self.p)) - 1)
        leading_zeros = (256 - self.p) - remaining.bit_length() + 1
        self.registers[bucket] = max(self.registers[bucket], leading_zeros)

    def count(self) -> int:
        """カーディナリティの推定値を返す"""
        # 調和平均
        indicator = sum(2.0 ** (-r) for r in self.registers)
        estimate = self.alpha * self.m * self.m / indicator

        # 小さい値の補正
        if estimate <= 2.5 * self.m:
            zeros = self.registers.count(0)
            if zeros > 0:
                estimate = self.m * math.log(self.m / zeros)

        return int(estimate)

# 使用例
hll = HyperLogLog()
for i in range(1_000_000):
    hll.add(f"user_{i}")
print(hll.count())  # ≈ 1,000,000(誤差 ±0.81%)
# メモリ: 16384 × 6bit ≈ 12KB

Redis での HyperLogLog

# Redis の HyperLogLog(12KB per key)
redis.pfadd("daily_visitors", "user:123")
redis.pfadd("daily_visitors", "user:456")
redis.pfadd("daily_visitors", "user:123")  # 重複は自動で無視

count = redis.pfcount("daily_visitors")  # ≈ 2

# 複数の HyperLogLog をマージ
redis.pfmerge("weekly", "mon", "tue", "wed", "thu", "fri")
weekly_unique = redis.pfcount("weekly")

Count-Min Sketch:頻度の近似カウント

「この単語は何回出現したか?」を省メモリで推定する。

import hashlib

class CountMinSketch:
    def __init__(self, width: int = 10000, depth: int = 7):
        """
        width × depth のカウンター行列
        メモリ: width × depth × 4bytes = 10000 × 7 × 4 = 280KB
        → 数億件のクエリを 280KB で扱える
        """
        self.width = width
        self.depth = depth
        self.table = [[0] * width for _ in range(depth)]

    def _hash(self, item: str, seed: int) -> int:
        return int(hashlib.md5(f"{seed}:{item}".encode()).hexdigest(), 16) % self.width

    def add(self, item: str, count: int = 1):
        for i in range(self.depth):
            self.table[i][self._hash(item, i)] += count

    def estimate(self, item: str) -> int:
        """最小値を取る(衝突による過大カウントを最小化)"""
        return min(self.table[i][self._hash(item, i)] for i in range(self.depth))

# 特性:
#   - 推定値 ≥ 真の値(過小評価しない)
#   - 推定値 ≤ 真の値 + ε × N(ε は width 依存の誤差率、N は全カウント)

3つの確率的データ構造の比較

Bloom FilterHyperLogLogCount-Min Sketch
問う質問「存在するか?」「何種類あるか?」「何回出たか?」
誤差の方向偽陽性あり±0.81%過大推定のみ
メモリ~1MB / 100万件12KB(件数無関係)~280KB
追加操作
削除操作❌(Counting BF は可)
マージ✅ (OR)✅ (union)✅ (行列加算)

まとめ

確率的データ構造は「100% 正確でなくていい場面を見抜く設計力」を求める。

見抜くべきサイン:
  「大量のデータに対して存在チェックがしたい → Bloom Filter」
  「ユニーク数を大雑把に知りたい → HyperLogLog」
  「アクセス頻度のランキングが欲しい → Count-Min Sketch」
  「厳密な答えが必要 → HashSet / HashMap(メモリを覚悟)」