目次を表示する

システム設計とCS概念

位置情報・ライドシェア(Uber)── GeoHash と近傍探索

位置情報・ライドシェア(Uber)── GeoHash と近傍探索

扱うCS概念:GeoHash、Quadtree、空間インデックス(R-Tree)、WebSocket、リアルタイムマッチング


位置情報サービス — GeoHash・Quadtree・WebSocket

この章で何ができるようになるか:「近くのドライバーを探す」という操作がどんなデータ構造で実現されているかを説明できる。位置情報の更新が秒単位で届く高更新頻度システムの設計パターンを理解できる。


問題設定

Uber のコア機能を設計する。

要件:
  - ドライバー数:100万人(同時アクティブ)
  - ライダー数:1000万人
  - ドライバーの位置情報更新:4秒ごと(GPS)
  - 位置情報更新レート:100万 ÷ 4秒 = 25万 updates/秒
  - ライダーが配車リクエスト → 周囲1km以内のドライバーを即座に返す
  - マッチング後のリアルタイム追跡:1秒ごとに地図上でドライバーの位置が動く

核心的な技術問題

  1. 「半径 N km 以内のドライバー」をどう高速に検索するか
  2. 1秒に25万件の位置更新をどう捌くか
  3. リアルタイムな地図上の動きをどう実現するか

近傍探索の素朴な解法とその限界

-- ❌ 緯度経度の範囲検索(フルスキャン)
SELECT driver_id, latitude, longitude
FROM drivers
WHERE latitude BETWEEN 35.65 AND 35.70
  AND longitude BETWEEN 139.68 AND 139.74
  AND status = 'available';

2次元の範囲検索は B-Tree インデックスで効率化できない(1次元のレンジスキャンには使えるが、緯度と経度の組み合わせは難しい)。100万ドライバーに対してフルスキャンは非現実的だ。


GeoHash:2次元座標を1次元文字列に変換する

GeoHash は地球を再帰的に格子状に分割し、各格子に文字列を割り当てる仕組みだ。

精度の例(東京駅付近):
  精度1(約5000km × 5000km): "x"
  精度2(約1250km × 625km):  "xn"
  精度4(約39km × 20km):     "xn76"
  精度6(約1.2km × 0.6km):   "xn76gu"
  精度8(約38m × 19m):       "xn76guf5"
  ※ GeoHash のセルは正方形ではなく、経度方向と緯度方向で幅が異なる

重要な性質:プレフィックスが共通 → 地理的に近い。

# geohash ライブラリを使った実装
import geohash2

lat, lng = 35.6812, 139.7671  # 東京駅

# エンコード(精度6 = 約1.2km四方)
gh = geohash2.encode(lat, lng, precision=6)
# → "xn76gu"

# 周辺8セルを取得(探索範囲を広げるために使う)
neighbors = geohash2.neighbors(gh)
search_cells = [gh] + neighbors  # 自身 + 周囲8セル = 9セル

# Redis の Sorted Set でドライバーを GeoHash でインデックス
def update_driver_location(driver_id: int, lat: float, lng: float):
    gh = geohash2.encode(lat, lng, precision=6)
    # ZADD: スコアを GeoHash の数値表現として保存
    redis.hset(f"driver:location", driver_id, f"{lat},{lng}")
    redis.hset(f"driver:geohash", driver_id, gh)
    
    # セル内のドライバーリストを更新
    old_gh = redis.hget(f"driver:geohash_prev", driver_id)
    if old_gh and old_gh != gh:
        redis.srem(f"geohash:{old_gh}:drivers", driver_id)
    redis.sadd(f"geohash:{gh}:drivers", driver_id)
    redis.hset(f"driver:geohash_prev", driver_id, gh)

def find_nearby_drivers(lat: float, lng: float, radius_km: float = 1.0) -> list[int]:
    gh = geohash2.encode(lat, lng, precision=6)
    search_cells = [gh] + list(geohash2.neighbors(gh))
    
    candidate_driver_ids = []
    for cell in search_cells:
        drivers = redis.smembers(f"geohash:{cell}:drivers")
        candidate_driver_ids.extend(drivers)
    
    # 候補ドライバーの実際の距離を計算して絞り込む
    result = []
    for driver_id in candidate_driver_ids:
        driver_loc = redis.hget("driver:location", driver_id)
        d_lat, d_lng = map(float, driver_loc.split(','))
        dist = haversine(lat, lng, d_lat, d_lng)
        if dist <= radius_km:
            result.append((driver_id, dist))
    
    return sorted(result, key=lambda x: x[1])  # 距離でソート

def haversine(lat1, lng1, lat2, lng2) -> float:
    """2点間の距離をkm で返す(Haversine公式)"""
    R = 6371  # 地球の半径(km)
    dlat = math.radians(lat2 - lat1)
    dlng = math.radians(lng2 - lng1)
    a = (math.sin(dlat/2)**2 +
         math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlng/2)**2)
    return R * 2 * math.asin(math.sqrt(a))

GeoHash の弱点:格子の境界付近では、地理的に近いのに GeoHash が全く異なることがある(境界問題)。周囲8セルを検索対象に含めることでこれを軽減する。


Redis の GeoSpatial コマンド

Redis 3.2 以降、GeoHash を内部で使った地理空間コマンドが組み込まれている。

import redis

r = redis.Redis()

# ドライバーの位置を登録(内部で GeoHash として Sorted Set に保存)
r.geoadd("drivers", [
    (139.7671, 35.6812, "driver:001"),  # (lng, lat, member)
    (139.7580, 35.6762, "driver:002"),
    (139.7800, 35.6900, "driver:003"),
])

# 東京駅から1km以内のドライバーを検索
nearby = r.geosearch(
    "drivers",
    longitude=139.7671,
    latitude=35.6812,
    radius=1,
    unit="km",
    withdist=True,   # 距離も返す
    withcoord=True,  # 座標も返す
    sort="ASC"       # 距離が近い順
)
# → [("driver:001", 0.0, (139.7671, 35.6812)), ...]

Quadtree:動的な密度対応

GeoHash は静的な格子だが、都市部(ドライバーが密集)と郊外(まばら)で格子の精度が不均一だと問題が起きる。

Quadtree は空間を動的に分割する木構造だ。エリアにドライバーが多すぎると4分割し、少なければ分割しない。

class QuadTreeNode:
    MAX_CAPACITY = 10  # 1ノードの最大ドライバー数

    def __init__(self, min_lat, max_lat, min_lng, max_lng):
        self.boundary = (min_lat, max_lat, min_lng, max_lng)
        self.drivers: list[tuple] = []  # (driver_id, lat, lng)
        self.children: list['QuadTreeNode'] | None = None

    def insert(self, driver_id: int, lat: float, lng: float) -> bool:
        if not self._in_boundary(lat, lng):
            return False

        if self.children is None:
            self.drivers.append((driver_id, lat, lng))
            if len(self.drivers) > self.MAX_CAPACITY:
                self._subdivide()  # 4分割
            return True
        else:
            return any(child.insert(driver_id, lat, lng) for child in self.children)

    def _subdivide(self):
        min_lat, max_lat, min_lng, max_lng = self.boundary
        mid_lat = (min_lat + max_lat) / 2
        mid_lng = (min_lng + max_lng) / 2
        
        self.children = [
            QuadTreeNode(min_lat, mid_lat, min_lng, mid_lng),  # SW
            QuadTreeNode(min_lat, mid_lat, mid_lng, max_lng),  # SE
            QuadTreeNode(mid_lat, max_lat, min_lng, mid_lng),  # NW
            QuadTreeNode(mid_lat, max_lat, mid_lng, max_lng),  # NE
        ]
        # 既存のドライバーを子ノードに再分配
        for driver in self.drivers:
            for child in self.children:
                child.insert(*driver)
        self.drivers = []

    def query_range(self, lat, lng, radius_km) -> list[tuple]:
        """半径 radius_km 以内のドライバーを返す"""
        result = []
        if not self._intersects_circle(lat, lng, radius_km):
            return result

        if self.children is None:
            for d_id, d_lat, d_lng in self.drivers:
                if haversine(lat, lng, d_lat, d_lng) <= radius_km:
                    result.append((d_id, d_lat, d_lng))
        else:
            for child in self.children:
                result.extend(child.query_range(lat, lng, radius_km))
        return result
graph TD
    Root[世界全体] --> NW[北西:過疎地域<br/>分割なし]
    Root --> NE[北東]
    Root --> SW[南西]
    Root --> SE[南東:都市部<br/>ドライバー密集 → 再分割]
    SE --> SE_NW[SE の北西]
    SE --> SE_NE[SE の北東]
    SE --> SE_SW[SE の南西]
    SE --> SE_SE[SE の南東<br/>さらに密集 → 再分割]

リアルタイム追跡:WebSocket の設計

配車後のドライバー追跡は「1秒ごとに地図上でドライバーが動く」体験を提供する。HTTP のポーリングでは不十分(レイテンシ・負荷)なため、WebSocket を使う。

# FastAPI + WebSocket でのドライバー追跡
from fastapi import FastAPI, WebSocket
import asyncio
import json

app = FastAPI()
# 接続中のライダー: {rider_id → WebSocket}
connections: dict[int, WebSocket] = {}

@app.websocket("/ws/track/{rider_id}")
async def track_driver(websocket: WebSocket, rider_id: int):
    await websocket.accept()
    connections[rider_id] = websocket
    
    try:
        while True:
            # ライダーの担当ドライバーを DB から取得
            trip = db.get_active_trip(rider_id)
            if trip:
                loc = redis.hgetall(f"driver:location:{trip.driver_id}")
                await websocket.send_json({
                    "lat": float(loc["lat"]),
                    "lng": float(loc["lng"]),
                    "eta_seconds": calculate_eta(loc, trip.destination)
                })
            await asyncio.sleep(1)
    except Exception:
        del connections[rider_id]

# ドライバー側:位置情報を送信
@app.websocket("/ws/driver/{driver_id}")
async def driver_location(websocket: WebSocket, driver_id: int):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            lat, lng = data["lat"], data["lng"]
            
            # Redis に位置情報を保存
            redis.hset(f"driver:location:{driver_id}", mapping={"lat": lat, "lng": lng})
            
            # GeoHash を更新
            update_driver_location(driver_id, lat, lng)
    except Exception:
        pass

スケールの問題:WebSocket はステートフルだ(接続が特定サーバーに張られる)。100万の同時接続を1台で処理できないため、接続を分散する必要がある。

解決策:接続サーバーの分離 + Pub/Sub

graph LR
    Driver --> ConnServer1[接続サーバー1]
    Rider --> ConnServer2[接続サーバー2]
    
    ConnServer1 -->|位置情報を publish| PubSub[(Redis Pub/Sub)]
    PubSub -->|trip:123 の更新| ConnServer2
    ConnServer2 -->|WebSocket push| Rider

マッチングアルゴリズム

「近くにいるドライバーを配車する」は単純に見えて奥深い。

def find_best_driver(rider_lat, rider_lng, rider_destination) -> int | None:
    # 候補ドライバーを近傍探索
    candidates = find_nearby_drivers(rider_lat, rider_lng, radius_km=3.0)
    
    if not candidates:
        return None
    
    # スコアリング(単純な距離だけでなく、多要素評価)
    def score_driver(driver_id, distance_km):
        driver = db.get_driver(driver_id)
        eta = distance_km / 30 * 60  # 平均時速30kmで推定ETA(分)
        
        # 目的地方向ボーナス(ドライバーの次の移動と一致すると良い)
        heading_bonus = compute_heading_match(
            driver["last_heading"],
            bearing(rider_lat, rider_lng, *rider_destination)
        )
        
        # レーティングボーナス
        rating_bonus = (driver["rating"] - 4.0) * 0.5
        
        return -(eta - heading_bonus - rating_bonus)  # スコアが高いほど優先

    scored = [(score_driver(d_id, dist), d_id) for d_id, dist in candidates[:20]]
    scored.sort(reverse=True)
    
    return scored[0][1] if scored else None

まとめ

課題解決策CS概念
2次元の近傍探索GeoHash / Quadtree空間インデックス
動的な密度分布への対応Quadtree(動的分割)再帰的空間分割
秒間25万件の位置更新Redis GeoSpatial + Shardingインメモリ KV
リアルタイム地図追跡WebSocket + Pub/Subプッシュ型通信
国境・都市の境界問題周囲8セルを探索GeoHash の境界補正

GeoHash と Quadtree はどちらも「2次元の問題を1次元に落とし込む」発想だ。GeoHash は文字列プレフィックスで、Quadtree はパスで位置を表現する。アクセスパターンと密度の均一性によって使い分ける。