目次を表示する

システム設計とCS概念

通知システム(FCM/APNs)── 配信保証と優先度制御

通知システム(FCM/APNs)── 配信保証と優先度制御

扱うCS概念:プッシュ通知のプロトコルスタック、Fan-out、配信保証(At-Least-Once)、重複排除、優先度キュー、バックオフ戦略


通知システム — 優先度キュー・重複排除・集約

この章で何ができるようになるか:プッシュ通知・メール・SMS を統合的に扱う通知システムの設計ができるようになる。「通知の重複」「通知の欠落」「通知疲れ」をそれぞれどう防ぐかを説明できる。


問題設定

EC サイトの通知基盤を設計する。

通知チャネル:
  - プッシュ通知(iOS: APNs, Android: FCM)
  - メール(SendGrid / SES)
  - SMS(Twilio)
  - アプリ内通知(WebSocket)

通知種別と要件:
  注文確認:即時・プッシュ+メール・絶対に欠落不可
  配送ステータス:即時・プッシュ・重要
  おすすめ商品:バッチ・メール・欠落許容
  セキュリティアラート:即時・全チャネル・最優先

規模:
  1日5億件の通知リクエスト
  プッシュ通知のレイテンシ:5秒以内

アーキテクチャ全体

graph TD
    Service1[注文サービス] --> API[通知 API]
    Service2[配送サービス] --> API
    Service3[マーケティング] --> API

    API --> Validate[バリデーション<br/>・ユーザー設定確認<br/>・重複排除<br/>・レート制限]
    Validate --> Priority[優先度キュー]

    Priority --> |HIGH| WorkerH[ワーカー群<br/>高優先度]
    Priority --> |MEDIUM| WorkerM[ワーカー群<br/>中優先度]
    Priority --> |LOW| WorkerL[ワーカー群<br/>低優先度]

    WorkerH & WorkerM & WorkerL --> Router{チャネルルーター}
    Router --> Push[プッシュ通知<br/>FCM / APNs]
    Router --> Email[メール<br/>SendGrid / SES]
    Router --> SMS[SMS<br/>Twilio]
    Router --> InApp[アプリ内<br/>WebSocket]

重複排除(Deduplication)

「同じ通知が2回届く」はユーザー体験を著しく損なう。

class NotificationService:
    def send(self, notification: dict) -> bool:
        # 冪等性キーで重複チェック
        dedup_key = f"notif_dedup:{notification['idempotency_key']}"

        if redis.get(dedup_key):
            return False  # 既に処理済み

        # 処理中フラグ(TTL=60秒)
        if not redis.set(dedup_key, "processing", nx=True, ex=60):
            return False  # 並行リクエスト

        try:
            self._deliver(notification)
            # 成功 → 7日間重複排除を維持
            redis.setex(dedup_key, 604800, "sent")
            return True
        except Exception:
            redis.delete(dedup_key)
            raise

重複が起きる原因

  • プロデューサーのリトライ(ACK がタイムアウトしたが実は成功していた)
  • Kafka の At-Least-Once セマンティクス
  • ユーザーのダブルタップ(購入ボタン2回押し)

優先度キュー

セキュリティアラートと「おすすめ商品」を同じキューで処理すると、大量のマーケティング通知がセキュリティアラートの配信を遅延させる。

from enum import IntEnum

class Priority(IntEnum):
    CRITICAL = 0  # セキュリティアラート、2FA
    HIGH = 1      # 注文確認、配送通知
    MEDIUM = 2    # コメント返信、いいね
    LOW = 3       # マーケティング、おすすめ

class PriorityNotificationQueue:
    """優先度別に Kafka トピックを分離"""

    TOPIC_MAP = {
        Priority.CRITICAL: "notifications.critical",
        Priority.HIGH:     "notifications.high",
        Priority.MEDIUM:   "notifications.medium",
        Priority.LOW:      "notifications.low",
    }

    def enqueue(self, notification: dict, priority: Priority):
        topic = self.TOPIC_MAP[priority]
        kafka_producer.send(topic, value=json.dumps(notification))

    # ワーカーの割り当て:
    # CRITICAL: 専用ワーカー 10台(常に即座に処理)
    # HIGH:     ワーカー 20台
    # MEDIUM:   ワーカー 10台
    # LOW:      ワーカー 5台(遅延許容)

プッシュ通知の配信フロー

iOS(APNs)

import jwt
import httpx
import time

class APNsSender:
    def __init__(self, key_id: str, team_id: str, private_key: str, bundle_id: str):
        self.key_id = key_id
        self.team_id = team_id
        self.private_key = private_key
        self.bundle_id = bundle_id

    def _generate_token(self) -> str:
        headers = {"alg": "ES256", "kid": self.key_id}
        payload = {"iss": self.team_id, "iat": int(time.time())}
        return jwt.encode(payload, self.private_key, algorithm="ES256", headers=headers)

    async def send(self, device_token: str, title: str, body: str,
                   data: dict = None) -> bool:
        url = f"https://api.push.apple.com/3/device/{device_token}"
        payload = {
            "aps": {
                "alert": {"title": title, "body": body},
                "sound": "default",
                "badge": 1,
            },
            "data": data or {},
        }

        async with httpx.AsyncClient(http2=True) as client:
            resp = await client.post(
                url,
                json=payload,
                headers={
                    "authorization": f"bearer {self._generate_token()}",
                    "apns-topic": self.bundle_id,
                    "apns-priority": "10",  # 即時配信
                    "apns-push-type": "alert",
                },
            )

            if resp.status_code == 410:
                # デバイストークンが無効(アプリ削除)→ DBから削除
                await self._invalidate_token(device_token)
                return False

            return resp.status_code == 200

Android(FCM)

class FCMSender:
    async def send(self, device_token: str, title: str, body: str,
                   data: dict = None) -> bool:
        url = "https://fcm.googleapis.com/v1/projects/my-project/messages:send"
        message = {
            "message": {
                "token": device_token,
                "notification": {"title": title, "body": body},
                "data": data or {},
                "android": {
                    "priority": "high",
                    "notification": {"channel_id": "default"},
                },
            }
        }

        async with httpx.AsyncClient() as client:
            resp = await client.post(
                url, json=message,
                headers={"Authorization": f"Bearer {self._get_access_token()}"},
            )
            return resp.status_code == 200

通知疲れ(Notification Fatigue)対策

「通知が多すぎてユーザーが通知を無効化する」のは本末転倒だ。

class NotificationThrottler:
    """ユーザーごとの通知レート制限"""

    LIMITS = {
        Priority.CRITICAL: None,       # 制限なし
        Priority.HIGH:     (20, 3600), # 1時間に20件まで
        Priority.MEDIUM:   (10, 3600), # 1時間に10件まで
        Priority.LOW:      (3, 86400), # 1日3件まで
    }

    def should_send(self, user_id: str, priority: Priority) -> bool:
        limit = self.LIMITS.get(priority)
        if limit is None:
            return True

        max_count, window = limit
        key = f"notif_throttle:{user_id}:{priority}:{int(time.time()) // window}"
        current = redis.incr(key)

        if current == 1:
            redis.expire(key, window)

        return current <= max_count

class NotificationAggregator:
    """同種の通知をまとめる(「5件の新しいいいね」)"""

    async def aggregate(self, user_id: str, notification: dict):
        agg_key = f"notif_agg:{user_id}:{notification['type']}"
        redis.rpush(agg_key, json.dumps(notification))
        redis.expire(agg_key, 300)  # 5分間集約

        count = redis.llen(agg_key)
        if count == 1:
            # 5分後にまとめて送信
            await schedule_delayed_send(user_id, notification["type"], delay=300)

    async def flush(self, user_id: str, notification_type: str):
        agg_key = f"notif_agg:{user_id}:{notification_type}"
        items = redis.lrange(agg_key, 0, -1)
        redis.delete(agg_key)

        if len(items) == 1:
            await send_single(json.loads(items[0]))
        elif len(items) > 1:
            # 集約通知を送信
            await send_aggregated(user_id, notification_type, len(items))
            # 「田中さん他4人があなたの投稿にいいねしました」

リトライ戦略(指数バックオフ)

外部サービス(FCM/SendGrid/Twilio)への送信失敗時のリトライ。

async def send_with_retry(send_fn, notification: dict,
                           max_retries: int = 5, base_delay: float = 1.0):
    for attempt in range(max_retries):
        try:
            return await send_fn(notification)
        except (ConnectionError, TimeoutError) as e:
            if attempt == max_retries - 1:
                # 最終リトライも失敗 → Dead Letter Queue へ
                await dead_letter_queue.push(notification)
                raise

            # 指数バックオフ + ジッター
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            # 1s, 2s, 4s, 8s, 16s + ランダム
            await asyncio.sleep(delay)

なぜジッターを入れるか:全ワーカーが同じタイミングでリトライすると、外部サービスに再びスパイクが起きる(Thundering Herd)。ランダムなジッターで分散させる。


まとめ

課題解決策設計原則
通知の重複冪等性キー + Redis 重複排除At-Most-Once 配信
通知の欠落Kafka + リトライ + DLQAt-Least-Once 配信
優先度の管理トピック分離 + ワーカー配分優先度キュー
通知疲れレート制限 + 集約ユーザー体験の保護
外部サービス障害指数バックオフ + ジッター復元力(Resilience)
デバイストークン管理410応答で自動無効化キャッシュの鮮度管理