目次を表示する

システム設計とCS概念

チャット / リアルタイムメッセージング(WhatsApp/Slack)── 順序保証とE2E暗号化

チャット / リアルタイムメッセージング(WhatsApp/Slack)── 順序保証とE2E暗号化

扱うCS概念:WebSocket / Long Polling、メッセージ順序保証(Lamport Clock / Vector Clock)、E2E暗号化(Signal Protocol)、オフライン同期、プレゼンス管理


チャットシステム — WebSocket・シーケンス順序・E2E暗号化

この章で何ができるようになるか:1対1チャットとグループチャットの設計の違いを説明でき、メッセージの順序保証・配信保証・E2E暗号化がどう実現されているかを理解できる。


問題設定

WhatsApp 規模のチャットアプリを設計する。

規模感:
  - ユーザー数:30億(MAU、2025年時点)
  - 同時接続:数億
  - メッセージ数:1日1000億件以上
  - グループ:最大1024人

機能要件:
  - 1対1チャット
  - グループチャット
  - メッセージのステータス(送信済み / 配信済み / 既読)
  - メディア送信(画像・動画)
  - オフライン時のメッセージ保持

非機能要件:
  - リアルタイム配信(数百ms 以内)
  - メッセージの順序保証
  - E2E暗号化(サーバーもメッセージを読めない)
  - 高可用性(99.99%)

接続管理:WebSocket が必要な理由

HTTP のリクエスト-レスポンスモデルでは「サーバーからクライアントにプッシュ」ができない。

HTTP Polling(短期ポーリング):
  クライアント →(1秒ごとに)→ サーバー: 「新しいメッセージある?」
  サーバー: 「ない」「ない」「ない」「1件あり」
  → リクエストの90%以上が空振り。大量のユーザーで非現実的。

Long Polling:
  クライアント → サーバー: 「新しいメッセージがあるまで待って」
  サーバー: (メッセージが来るまでレスポンスを保留)
  → 空振りは減るが、接続の張り直しが頻繁。タイムアウト管理が複雑。

WebSocket:
  クライアント ↔ サーバー: 双方向の永続接続
  → サーバーからいつでもプッシュできる。最も効率的。
# WebSocket 接続管理
class ConnectionManager:
    def __init__(self):
        # {user_id: WebSocket}
        self.active: dict[str, WebSocket] = {}
        # ユーザーがどのサーバーに接続しているかを Redis で管理
        self.redis = redis.Redis()

    async def connect(self, user_id: str, ws: WebSocket):
        self.active[user_id] = ws
        # どのサーバーに接続しているかを記録
        self.redis.hset("user:connections", user_id, self.server_id)

    async def disconnect(self, user_id: str):
        self.active.pop(user_id, None)
        self.redis.hdel("user:connections", user_id)

    async def send_to_user(self, user_id: str, message: dict):
        ws = self.active.get(user_id)
        if ws:
            await ws.send_json(message)
        else:
            # 別サーバーに接続中 or オフライン
            server_id = self.redis.hget("user:connections", user_id)
            if server_id:
                # Pub/Sub で該当サーバーに転送
                self.redis.publish(f"server:{server_id}", json.dumps({
                    "to": user_id,
                    "message": message
                }))
            else:
                # オフライン → キューに保存
                self.redis.rpush(f"offline:{user_id}", json.dumps(message))

メッセージフロー

sequenceDiagram
    participant Alice
    participant ChatSrvA as Chat Server A
    participant MQ as メッセージキュー
    participant ChatSrvB as Chat Server B
    participant Bob

    Alice->>ChatSrvA: WebSocket: {to: "bob", text: "hi"}
    ChatSrvA->>ChatSrvA: メッセージID生成(Snowflake)
    ChatSrvA->>MQ: メッセージをキューに投入
    ChatSrvA-->>Alice: ACK: sent ✓
    MQ->>ChatSrvB: Bob の接続先サーバーへ配信
    ChatSrvB->>Bob: WebSocket push: {from: "alice", text: "hi"}
    Bob-->>ChatSrvB: ACK: delivered ✓✓
    ChatSrvB->>MQ: 配信完了イベント
    MQ->>ChatSrvA: Alice に配信ステータスを通知
    ChatSrvA->>Alice: delivered ✓✓

    Note over Alice,Bob: Bob がメッセージを開いたとき
    Bob->>ChatSrvB: read receipt
    ChatSrvB->>MQ: 既読イベント
    MQ->>ChatSrvA: Alice に既読通知
    ChatSrvA->>Alice: read ✓✓(青チェック)

メッセージの順序保証

「Hello」→「World」の順に送ったのに「World」→「Hello」の順に表示される──これは許されない。

問題:分散環境では「時刻」が信頼できない

Alice のスマホ時計:14:00:01 に「Hello」送信
Alice のスマホ時計:14:00:02 に「World」送信
→ Alice 側では順序が保証される

しかし Bob のサーバーに届く順序はネットワーク遅延で逆転しうる
→ サーバー側のタイムスタンプも NTP ずれで信頼できない

解決策:会話単位のシーケンス番号

class Conversation:
    def __init__(self, conversation_id: str):
        self.conversation_id = conversation_id
        # Redis INCR でアトミックにシーケンス番号を発行
        self.seq_key = f"conv:{conversation_id}:seq"

    def create_message(self, sender_id: str, content: str) -> dict:
        # 会話ごとの単調増加シーケンス番号
        seq = redis.incr(self.seq_key)
        return {
            "message_id": generate_snowflake_id(),
            "conversation_id": self.conversation_id,
            "sender_id": sender_id,
            "content": content,
            "sequence": seq,  # これで順序を保証
            "server_timestamp": time.time(),
        }

クライアントはシーケンス番号で並び替えて表示する。サーバーのタイムスタンプはあくまで参考情報。


グループチャットの配信

1対1なら相手の接続先サーバーに送るだけだが、グループチャットでは全メンバーに配信する必要がある。

async def send_group_message(group_id: str, sender_id: str, content: str):
    message = create_message(sender_id, content)

    # グループメンバー一覧を取得
    members = await db.get_group_members(group_id)

    # メッセージを永続化
    await db.insert_message(group_id, message)

    # 各メンバーへ配信
    for member_id in members:
        if member_id == sender_id:
            continue
        await connection_manager.send_to_user(member_id, message)
        # オフラインメンバーには未読カウンターを増加
        if not connection_manager.is_online(member_id):
            await redis.incr(f"unread:{member_id}:{group_id}")

Fan-out の問題(Ch.10 と同じ構造):1024人グループでメッセージが来るたびに1024件の配信が走る。高頻度なグループではメッセージキューを介して非同期配信する。


E2E 暗号化(End-to-End Encryption)

WhatsApp は Signal Protocol を使い、サーバーがメッセージの内容を一切読めない設計になっている。

鍵交換の流れ(簡略版):
  1. Alice と Bob が鍵ペア(公開鍵・秘密鍵)を生成
  2. 公開鍵をサーバーに登録
  3. Alice が Bob に送信するとき:
     a. Bob の公開鍵をサーバーから取得
     b. Diffie-Hellman 鍵交換で共有秘密鍵を導出
     c. 共有秘密鍵で AES-256-CBC 暗号化 + HMAC-SHA256 認証
     d. 暗号文をサーバー経由で Bob に送信
  4. Bob は自分の秘密鍵で共有秘密鍵を導出し、復号

サーバーは暗号文だけを中継。平文を見ることができない。
# Signal Protocol の Double Ratchet(簡略化した概念コード)
from cryptography.hazmat.primitives.asymmetric import x25519
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import os

class E2ESession:
    """メッセージごとに鍵が更新される(Forward Secrecy)"""

    def __init__(self, shared_secret: bytes):
        self.chain_key = shared_secret
        self.message_number = 0

    def _derive_keys(self) -> tuple[bytes, bytes]:
        """チェーンキーからメッセージキーと次のチェーンキーを導出"""
        import hashlib, hmac
        mk = hmac.new(self.chain_key, b"\x01", hashlib.sha256).digest()
        next_ck = hmac.new(self.chain_key, b"\x02", hashlib.sha256).digest()
        self.chain_key = next_ck
        return mk, next_ck

    def encrypt(self, plaintext: bytes) -> bytes:
        message_key, _ = self._derive_keys()
        nonce = os.urandom(12)
        aesgcm = AESGCM(message_key)
        ciphertext = aesgcm.encrypt(nonce, plaintext, None)
        self.message_number += 1
        return nonce + ciphertext

    def decrypt(self, data: bytes) -> bytes:
        nonce, ciphertext = data[:12], data[12:]
        message_key, _ = self._derive_keys()
        aesgcm = AESGCM(message_key)
        self.message_number += 1
        return aesgcm.decrypt(nonce, ciphertext, None)

Forward Secrecy:メッセージごとに鍵が変わるため、仮に1つの鍵が漏洩しても過去のメッセージは復号できない。


オフライン同期

Alice がオフラインの間に Bob がメッセージを送った場合:

1. サーバーは Alice の未配信キューにメッセージを保存
2. Alice がオンラインに復帰
3. WebSocket 接続確立後、未配信メッセージを一括配信
4. Alice のクライアントがシーケンス番号でソートして表示
async def on_user_reconnect(user_id: str, ws: WebSocket):
    """ユーザーが再接続したとき、未配信メッセージを送信"""
    # 最後に受信したシーケンス番号をクライアントから受け取る
    client_state = await ws.receive_json()
    last_seen_seq = client_state.get("last_sequence", 0)

    # 各会話の未配信メッセージを取得
    conversations = await db.get_user_conversations(user_id)
    for conv_id in conversations:
        messages = await db.get_messages_after(conv_id, last_seen_seq)
        for msg in messages:
            await ws.send_json(msg)

    # オフラインキューも配信
    while True:
        msg = redis.lpop(f"offline:{user_id}")
        if not msg:
            break
        await ws.send_json(json.loads(msg))

まとめ

課題解決策CS概念
サーバー → クライアントのプッシュWebSocket(双方向永続接続)全二重通信
複数サーバー間の配信Redis Pub/Sub + 接続レジストリメッセージブローカー
メッセージ順序の保証会話単位のシーケンス番号単調増加カウンター
グループチャットの配信Fan-out + 非同期キューCh.10 と同じ構造
サーバーに内容を見せないE2E暗号化(Signal Protocol)DH鍵交換 + AES-CBC + HMAC
鍵漏洩時の過去メッセージ保護Double Ratchet(メッセージごとに鍵更新)Forward Secrecy
オフライン復帰時の同期未配信キュー + シーケンスベース差分同期イベントソーシング