確率的データ構造 ── Bloom Filter・HyperLogLog・Count-Min Sketch
扱う概念:確率的判定、False Positive / False Negative、ビット配列、カーディナリティ推定、頻度推定

この章で何ができるようになるか:「100%正確でなくていい」場面を見抜き、メモリを劇的に削減する確率的データ構造を選択できるようになる。
なぜ確率的データ構造が必要か
問題: 「このURLは既にクロール済みか?」(10億URL)
HashSet で厳密に管理: 10億 × 50バイト/URL ≈ 50GB
Bloom Filter で近似判定: 10億件 × 10bit ≈ 1.2GB(40倍削減)
問題: 「今日のユニークビジター数は?」(数十億アクセス)
HashSet: 数十GB
HyperLogLog: 12KB(!)
トレードオフ: メモリ削減の代わりに、わずかな誤差を許容する
Bloom Filter:「確実にない」か「たぶんある」
特性:
False Positive: あり得る(「ある」と言ったが実はなかった)
False Negative: 絶対にない(「ない」と言えば確実にない)
→ 「ない」と判定された場合は 100% 信頼できる
→ 「ある」と判定された場合は確認が必要(偽陽性の可能性)
import hashlib
class BloomFilter:
def __init__(self, expected_items: int, fp_rate: float = 0.01):
"""
expected_items: 想定される要素数
fp_rate: 許容する False Positive 率
"""
import math
# 最適なビット数とハッシュ関数数を計算
self.size = int(-expected_items * math.log(fp_rate) / (math.log(2) ** 2))
self.hash_count = int((self.size / expected_items) * math.log(2))
self.bit_array = bytearray(self.size // 8 + 1)
def _hashes(self, item: str) -> list[int]:
"""k 個の独立したハッシュ値を生成"""
result = []
for i in range(self.hash_count):
h = int(hashlib.sha256(f"{i}:{item}".encode()).hexdigest(), 16)
result.append(h % self.size)
return result
def _get_bit(self, pos: int) -> bool:
return bool(self.bit_array[pos // 8] & (1 << (pos % 8)))
def _set_bit(self, pos: int):
self.bit_array[pos // 8] |= (1 << (pos % 8))
def add(self, item: str):
for pos in self._hashes(item):
self._set_bit(pos)
def might_contain(self, item: str) -> bool:
"""True → たぶんある (FPの可能性)、False → 確実にない"""
return all(self._get_bit(pos) for pos in self._hashes(item))
# 使用例
bf = BloomFilter(expected_items=1_000_000, fp_rate=0.01)
bf.add("https://example.com/page1")
bf.add("https://example.com/page2")
bf.might_contain("https://example.com/page1") # True(正しい)
bf.might_contain("https://example.com/page3") # False(確実にない)
bf.might_contain("https://example.com/pageX") # True の可能性(偽陽性)
False Positive 率の制御
FP 率 ≈ (1 - e^(-kn/m))^k
m = ビット数
n = 要素数
k = ハッシュ関数数
例: 100万件、FP率1%
m = 9,585,058 bits ≈ 1.14 MB
k = 7 ハッシュ関数
例: 100万件、FP率0.1%
m = 14,377,588 bits ≈ 1.71 MB
k = 10 ハッシュ関数
実システムでの使用例
| システム | 用途 |
|---|---|
| Google Chrome | 悪意あるURLのブラックリスト判定 |
| Cassandra / HBase | SSTable にキーが存在するかの事前判定 |
| Medium | 「既に読んだ記事」の判定 |
| Bitcoin | SPV ノードのトランザクションフィルタリング |
HyperLogLog:ユニーク数を 12KB で推定する
問題: 「今日何種類のIPアドレスからアクセスがあったか」
HashSet: 各IP を保存 → 数十億件 × 16バイト = 数十GB
HyperLogLog: 12KB で誤差 0.81% の推定値
12KB で数十億のカーディナリティを推定できる
アイデア(直感)
ランダムなハッシュ値のビット列を見る:
先頭に0が1個連続する確率: 1/2
先頭に0が2個連続する確率: 1/4
先頭に0が k個連続する確率: 1/2^k
もし最大 k=10 が観測されたら、
少なくとも 2^10 = 1024 個のユニーク要素がある推定できる
精度を上げる:
入力を 2^14 = 16384 個のバケットに分け、各バケットで最大 k を記録
各バケットの推定値の調和平均を取る → 誤差 0.81%
import hashlib
import math
class HyperLogLog:
def __init__(self, precision: int = 14):
"""precision: バケット数 = 2^precision(14 → 16384バケット)"""
self.p = precision
self.m = 1 << precision # バケット数
self.registers = [0] * self.m
# 補正定数
self.alpha = 0.7213 / (1 + 1.079 / self.m)
def add(self, item: str):
h = int(hashlib.sha256(item.encode()).hexdigest(), 16)
# 上位 p ビットでバケットを決定
bucket = h >> (256 - self.p)
# 残りのビット列で先頭のゼロ数 + 1 を計算
remaining = h & ((1 << (256 - self.p)) - 1)
leading_zeros = (256 - self.p) - remaining.bit_length() + 1
self.registers[bucket] = max(self.registers[bucket], leading_zeros)
def count(self) -> int:
"""カーディナリティの推定値を返す"""
# 調和平均
indicator = sum(2.0 ** (-r) for r in self.registers)
estimate = self.alpha * self.m * self.m / indicator
# 小さい値の補正
if estimate <= 2.5 * self.m:
zeros = self.registers.count(0)
if zeros > 0:
estimate = self.m * math.log(self.m / zeros)
return int(estimate)
# 使用例
hll = HyperLogLog()
for i in range(1_000_000):
hll.add(f"user_{i}")
print(hll.count()) # ≈ 1,000,000(誤差 ±0.81%)
# メモリ: 16384 × 6bit ≈ 12KB
Redis での HyperLogLog
# Redis の HyperLogLog(12KB per key)
redis.pfadd("daily_visitors", "user:123")
redis.pfadd("daily_visitors", "user:456")
redis.pfadd("daily_visitors", "user:123") # 重複は自動で無視
count = redis.pfcount("daily_visitors") # ≈ 2
# 複数の HyperLogLog をマージ
redis.pfmerge("weekly", "mon", "tue", "wed", "thu", "fri")
weekly_unique = redis.pfcount("weekly")
Count-Min Sketch:頻度の近似カウント
「この単語は何回出現したか?」を省メモリで推定する。
import hashlib
class CountMinSketch:
def __init__(self, width: int = 10000, depth: int = 7):
"""
width × depth のカウンター行列
メモリ: width × depth × 4bytes = 10000 × 7 × 4 = 280KB
→ 数億件のクエリを 280KB で扱える
"""
self.width = width
self.depth = depth
self.table = [[0] * width for _ in range(depth)]
def _hash(self, item: str, seed: int) -> int:
return int(hashlib.md5(f"{seed}:{item}".encode()).hexdigest(), 16) % self.width
def add(self, item: str, count: int = 1):
for i in range(self.depth):
self.table[i][self._hash(item, i)] += count
def estimate(self, item: str) -> int:
"""最小値を取る(衝突による過大カウントを最小化)"""
return min(self.table[i][self._hash(item, i)] for i in range(self.depth))
# 特性:
# - 推定値 ≥ 真の値(過小評価しない)
# - 推定値 ≤ 真の値 + ε × N(ε は width 依存の誤差率、N は全カウント)
3つの確率的データ構造の比較
| Bloom Filter | HyperLogLog | Count-Min Sketch | |
|---|---|---|---|
| 問う質問 | 「存在するか?」 | 「何種類あるか?」 | 「何回出たか?」 |
| 誤差の方向 | 偽陽性あり | ±0.81% | 過大推定のみ |
| メモリ | ~1MB / 100万件 | 12KB(件数無関係) | ~280KB |
| 追加操作 | ✅ | ✅ | ✅ |
| 削除操作 | ❌ | ❌ | ❌(Counting BF は可) |
| マージ | ✅ (OR) | ✅ (union) | ✅ (行列加算) |
まとめ
確率的データ構造は「100% 正確でなくていい場面を見抜く設計力」を求める。
見抜くべきサイン:
「大量のデータに対して存在チェックがしたい → Bloom Filter」
「ユニーク数を大雑把に知りたい → HyperLogLog」
「アクセス頻度のランキングが欲しい → Count-Min Sketch」
「厳密な答えが必要 → HashSet / HashMap(メモリを覚悟)」