目次を表示する

システム設計とCS概念

オートコンプリート ── Trie・Count-Min Sketch・HyperLogLog

オートコンプリート ── Trie・Count-Min Sketch・HyperLogLog

扱うCS概念:Trie(プレフィックスツリー)、Count-Min Sketch、HyperLogLog、Top-K 問題、近似アルゴリズム


オートコンプリート — Trie・Count-Min Sketch・HyperLogLog

この章で何ができるようになるか:検索窓の「候補表示」がどんなデータ構造で高速に実現されているかを説明できるようになる。「数億のクエリから人気順 Top-K を出す」という問題に対する近似アルゴリズムの使い方を理解できる。


問題設定

「go」と入力したとき、「google」「golang」「go to」「gopher」のような候補をミリ秒以内に返す。

要件:
  - 1日1億件の検索クエリ
  - プレフィックスから候補を最大10件
  - 候補は「人気順」(検索頻度が高いものを上位に)
  - レスポンスタイム:100ms 以内
  - 候補は「そのプレフィックスを含む検索語」のうちスコア上位10個
  
非機能要件:
  - リアルタイム更新(1時間以内に最新のトレンドが反映される)
  - メモリ効率(1億の検索語を低メモリで扱う)

データ構造:Trie(プレフィックスツリー)

Trie はプレフィックス検索に特化した木構造だ。「全ての文字列を文字単位のノードで表現する」。

"go", "google", "golang", "gopher", "get" を Trie に挿入:

root
└── g
    ├── o (終端: "go", freq=100)
    │   ├── o
    │   │   └── g
    │   │       └── l
    │   │           └── e (終端: "google", freq=500)
    │   └── l
    │       └── a
    │           └── n
    │               └── g (終端: "golang", freq=200)
    └── e
        └── t (終端: "get", freq=150)
class TrieNode:
    def __init__(self):
        self.children: dict[str, 'TrieNode'] = {}
        self.is_end: bool = False
        self.frequency: int = 0
        # パフォーマンス最適化:このノード配下のTop-10候補をキャッシュ
        self.top_suggestions: list[tuple[int, str]] = []  # (freq, word)

class Trie:
    def __init__(self):
        self.root = TrieNode()

    def insert(self, word: str, frequency: int):
        node = self.root
        for char in word:
            if char not in node.children:
                node.children[char] = TrieNode()
            node = node.children[char]
        node.is_end = True
        node.frequency = frequency

    def search_prefix(self, prefix: str) -> TrieNode | None:
        node = self.root
        for char in prefix:
            if char not in node.children:
                return None
            node = node.children[char]
        return node

    def get_suggestions(self, prefix: str, top_k: int = 10) -> list[str]:
        node = self.search_prefix(prefix)
        if not node:
            return []
        
        # ノードにキャッシュがあればそれを使う(最適化)
        if node.top_suggestions:
            return [word for _, word in node.top_suggestions[:top_k]]
        
        # DFS で候補を収集
        results = []
        self._dfs(node, prefix, results)
        results.sort(reverse=True)  # 頻度降順
        
        # キャッシュに保存
        node.top_suggestions = results[:top_k]
        return [word for _, word in results[:top_k]]

    def _dfs(self, node: TrieNode, current: str, results: list):
        if node.is_end:
            results.append((node.frequency, current))
        for char, child in node.children.items():
            self._dfs(child, current + char, results)

Trie のメモリ問題と最適化

1億の検索語を Trie に入れると、メモリが大量に必要になる。

最適化1:圧縮 Trie(Radix Tree)

1本道の連続した文字を1ノードにまとめる。

通常の Trie:
  g → o → o → g → l → e("google")
  
Radix Tree:
  g → o → "ogle"(1ノードで表現)
class RadixTree:
    """一本道の連続ノードを圧縮する"""
    def __init__(self):
        self.children: dict[str, tuple['RadixTree', str]] = {}
        # key: 最初の文字, value: (子ノード, そのエッジのラベル全体)
        self.is_end = False
        self.frequency = 0

最適化2:各ノードに Top-K を事前計算してキャッシュ

DFS で毎回全候補を探索するのは遅い。各ノードに「自分の配下の Top-10」を事前計算して持っておく。

def precompute_top_k(node: TrieNode, k: int = 10):
    """ボトムアップに Top-K をキャッシュする"""
    if not node.children:
        if node.is_end:
            node.top_suggestions = [(node.frequency, "")]
        return
    
    all_suggestions = []
    for char, child in node.children.items():
        precompute_top_k(child, k)
        for freq, suffix in child.top_suggestions:
            all_suggestions.append((freq, char + suffix))
    
    if node.is_end:
        all_suggestions.append((node.frequency, ""))
    
    all_suggestions.sort(reverse=True)
    node.top_suggestions = all_suggestions[:k]

これにより検索は O(prefix_len) で完了する。


Top-K の集計問題:1日1億クエリからランキングを作る

Trie に入れる「各単語の頻度」はどうやって求めるか。

素朴な解法:全クエリを DB に保存して COUNT + ORDER BY。

SELECT query, COUNT(*) as freq
FROM search_logs
WHERE created_at > NOW() - INTERVAL '24 hours'
GROUP BY query
ORDER BY freq DESC
LIMIT 100;

1億行の集計クエリを毎時間実行するのは現実的でない。


Count-Min Sketch:メモリ効率の良い頻度カウンター

Count-Min Sketch は「正確な頻度」でなく「誤差が小さい頻度の近似値」をメモリ効率よく計算するデータ構造だ。

アイデア:
  k 個の異なるハッシュ関数 × w 個のカウンター配列を用意する

  クエリが来るたびに:
    各ハッシュ関数でインデックスを計算し、対応カウンターをインクリメント

  頻度を知りたいとき:
    各ハッシュ関数でインデックスを計算し、対応カウンターの最小値を取る
    (ハッシュ衝突で大きくなることはあっても、小さくなることはない)
import hashlib

class CountMinSketch:
    def __init__(self, width: int = 1000, depth: int = 5):
        """
        width: カウンター配列の幅(大きいほど精度が高いが、メモリも増える)
        depth: ハッシュ関数の数(大きいほど精度が高いが、計算コストも増える)
        メモリ:width × depth × 4 bytes = 1000 × 5 × 4 = 20KB
        (1億クエリを 20KB で扱える!)
        """
        self.width = width
        self.depth = depth
        self.table = [[0] * width for _ in range(depth)]

    def _hash(self, item: str, seed: int) -> int:
        h = hashlib.md5(f"{seed}:{item}".encode()).hexdigest()
        return int(h, 16) % self.width

    def increment(self, item: str, count: int = 1):
        for i in range(self.depth):
            idx = self._hash(item, i)
            self.table[i][idx] += count

    def estimate(self, item: str) -> int:
        return min(self.table[i][self._hash(item, i)] for i in range(self.depth))

# 使用例
sketch = CountMinSketch(width=10000, depth=5)  # ~200KB

for query in search_queries:  # 1億クエリを処理
    sketch.increment(query)

print(sketch.estimate("golang"))  # 頻度の近似値(誤差は小さい)

誤差の保証:幅 w = e/ε(e は自然対数の底、ε は許容誤差率)、深さ d = ln(1/δ)(δ は失敗確率)と設定すると、推定値は真の値を最大 ε×N(N は総カウント数)だけ超えることが確率 1-δ で保証される。


HyperLogLog:ユニークな検索語の数を推定する

「今日の検索で、何種類のユニークな単語が使われたか」を正確に求めるには、全クエリのセットを保持する必要がある(最大100億バイト)。

HyperLogLog12KB 程度で、数十億のユニーク数を 0.81% の誤差で推定できる。

アイデア(直感的な説明):
  ハッシュ値のビット列の「先頭に連続する 0 が何個あるか」を記録する。
  
  ランダムなハッシュなら、先頭に k 個の 0 が連続する確率は 1/2^k。
  もし最大 k = 10 が観測されたなら、少なくとも 2^10 = 1024 個のユニーク要素がある推定できる。

実際は:
  複数のサブセット(通常 2^14 = 16384 個)に分けて平均化することで精度を上げる。
# Redis の HyperLogLog(内部実装は Redis が提供)
redis.pfadd("daily_queries", "golang")
redis.pfadd("daily_queries", "python")
redis.pfadd("daily_queries", "golang")  # 重複は無視される

unique_count = redis.pfcount("daily_queries")
# → 約 2(誤差率 0.81%)

# 複数日のユニーク数を合計
redis.pfmerge("weekly_queries", "daily_queries_mon", "daily_queries_tue", ...)
weekly_unique = redis.pfcount("weekly_queries")

なぜ近似で十分か:「今日の検索で500万ユニーク語か510万ユニーク語か」を厳密に知る必要はない。トレンド分析やキャパシティプランニングには近似値で十分だ。


Top-K の分散集計:ヒープ + スケッチの組み合わせ

graph TD
    Queries[全検索クエリ<br/>1日1億件] --> Kafka[Kafka ストリーム]
    Kafka --> Workers[集計ワーカー群<br/>Count-Min Sketch で集計]
    Workers --> Aggregator[アグリゲーター<br/>Top-K 候補を抽出]
    Aggregator --> TopK[(Top-K ストア<br/>Redis Sorted Set)]
    TopK --> Trie[Trie に反映<br/>頻度更新]
    Trie --> API[オートコンプリート API]
# Redis Sorted Set を使った Top-K 管理
class TopKTracker:
    def __init__(self, redis_client, k: int = 1000):
        self.redis = redis_client
        self.k = k
        self.key = "top_k_queries"

    def record_query(self, query: str, count: int = 1):
        # ZINCRBY:スコアを加算(なければ追加)
        self.redis.zincrby(self.key, count, query)
        
        # Top-K 件より多くなったら最下位を削除
        size = self.redis.zcard(self.key)
        if size > self.k * 1.1:  # 10%バッファを超えたら削除
            # スコアが低い順に削除
            self.redis.zremrangebyrank(self.key, 0, size - self.k - 1)

    def get_top_k(self, prefix: str, k: int = 10) -> list[tuple[str, float]]:
        # ZRANGEBYSCORE で prefix にマッチするものを取得
        # (Redis の Sorted Set は辞書順でも取得できる)
        candidates = self.redis.zrangebylex(
            self.key,
            f"[{prefix}",       # prefix 以上
            f"[{prefix}\xff",   # prefix + 最大文字(\xff)以下
            0, 50               # 最大50件の候補を取得
        )
        
        # 各候補のスコアを取得してソート
        scored = [(c, self.redis.zscore(self.key, c)) for c in candidates]
        scored.sort(key=lambda x: -x[1])
        return scored[:k]

リアルタイムトレンドへの対応

「今まさに話題のキーワード」を候補に反映するには、過去の累積スコアより直近の頻度を重視する必要がある。

# 指数移動平均(EMA)を使ったスコア計算
def update_score_with_ema(current_score: float, new_count: int,
                           alpha: float = 0.1) -> float:
    """
    alpha: 新しいデータへの重み(0に近いほど過去を重視)
    alpha = 0.1:直近10時間分を強く反映
    """
    return alpha * new_count + (1 - alpha) * current_score

# 時間ウィンドウ付きカウント(直近1時間のみ集計)
def get_trending_queries(window_minutes: int = 60) -> list[str]:
    now = time.time()
    window_start = now - window_minutes * 60
    
    # Redis Sorted Set に (タイムスタンプ, クエリ) を保存
    recent_queries = redis.zrangebyscore("query_log", window_start, now)
    counter = Counter(recent_queries)
    return [query for query, _ in counter.most_common(10)]

まとめ

課題データ構造CS概念メモリ効率
プレフィックス検索Trie(Radix Tree)プレフィックスツリー○ 共通プレフィックスを共有
頻度の近似カウントCount-Min Sketch確率的データ構造◎ 1億クエリ → 数百KB
ユニーク数の推定HyperLogLog確率的データ構造◎ 数十億 → 12KB
Top-K のリアルタイム管理Redis Sorted Setスコア付き集合○ K 件のみ保持
トレンドの反映EMA(指数移動平均)時系列スムージング◎ 定数メモリ

近似アルゴリズムの哲学:厳密な正確さが不要な問題(トレンド分析、ユニーク数の推定、頻度ランキング)では、メモリや計算量を劇的に削減できる近似アルゴリズムが強力な武器になる。「100%正確でなくていい場面を見抜く」のが設計力の一つだ。