通知システム(FCM/APNs)── 配信保証と優先度制御
扱うCS概念:プッシュ通知のプロトコルスタック、Fan-out、配信保証(At-Least-Once)、重複排除、優先度キュー、バックオフ戦略

この章で何ができるようになるか:プッシュ通知・メール・SMS を統合的に扱う通知システムの設計ができるようになる。「通知の重複」「通知の欠落」「通知疲れ」をそれぞれどう防ぐかを説明できる。
問題設定
EC サイトの通知基盤を設計する。
通知チャネル:
- プッシュ通知(iOS: APNs, Android: FCM)
- メール(SendGrid / SES)
- SMS(Twilio)
- アプリ内通知(WebSocket)
通知種別と要件:
注文確認:即時・プッシュ+メール・絶対に欠落不可
配送ステータス:即時・プッシュ・重要
おすすめ商品:バッチ・メール・欠落許容
セキュリティアラート:即時・全チャネル・最優先
規模:
1日5億件の通知リクエスト
プッシュ通知のレイテンシ:5秒以内
アーキテクチャ全体
graph TD
Service1[注文サービス] --> API[通知 API]
Service2[配送サービス] --> API
Service3[マーケティング] --> API
API --> Validate[バリデーション<br/>・ユーザー設定確認<br/>・重複排除<br/>・レート制限]
Validate --> Priority[優先度キュー]
Priority --> |HIGH| WorkerH[ワーカー群<br/>高優先度]
Priority --> |MEDIUM| WorkerM[ワーカー群<br/>中優先度]
Priority --> |LOW| WorkerL[ワーカー群<br/>低優先度]
WorkerH & WorkerM & WorkerL --> Router{チャネルルーター}
Router --> Push[プッシュ通知<br/>FCM / APNs]
Router --> Email[メール<br/>SendGrid / SES]
Router --> SMS[SMS<br/>Twilio]
Router --> InApp[アプリ内<br/>WebSocket]
重複排除(Deduplication)
「同じ通知が2回届く」はユーザー体験を著しく損なう。
class NotificationService:
def send(self, notification: dict) -> bool:
# 冪等性キーで重複チェック
dedup_key = f"notif_dedup:{notification['idempotency_key']}"
if redis.get(dedup_key):
return False # 既に処理済み
# 処理中フラグ(TTL=60秒)
if not redis.set(dedup_key, "processing", nx=True, ex=60):
return False # 並行リクエスト
try:
self._deliver(notification)
# 成功 → 7日間重複排除を維持
redis.setex(dedup_key, 604800, "sent")
return True
except Exception:
redis.delete(dedup_key)
raise
重複が起きる原因:
- プロデューサーのリトライ(ACK がタイムアウトしたが実は成功していた)
- Kafka の At-Least-Once セマンティクス
- ユーザーのダブルタップ(購入ボタン2回押し)
優先度キュー
セキュリティアラートと「おすすめ商品」を同じキューで処理すると、大量のマーケティング通知がセキュリティアラートの配信を遅延させる。
from enum import IntEnum
class Priority(IntEnum):
CRITICAL = 0 # セキュリティアラート、2FA
HIGH = 1 # 注文確認、配送通知
MEDIUM = 2 # コメント返信、いいね
LOW = 3 # マーケティング、おすすめ
class PriorityNotificationQueue:
"""優先度別に Kafka トピックを分離"""
TOPIC_MAP = {
Priority.CRITICAL: "notifications.critical",
Priority.HIGH: "notifications.high",
Priority.MEDIUM: "notifications.medium",
Priority.LOW: "notifications.low",
}
def enqueue(self, notification: dict, priority: Priority):
topic = self.TOPIC_MAP[priority]
kafka_producer.send(topic, value=json.dumps(notification))
# ワーカーの割り当て:
# CRITICAL: 専用ワーカー 10台(常に即座に処理)
# HIGH: ワーカー 20台
# MEDIUM: ワーカー 10台
# LOW: ワーカー 5台(遅延許容)
プッシュ通知の配信フロー
iOS(APNs)
import jwt
import httpx
import time
class APNsSender:
def __init__(self, key_id: str, team_id: str, private_key: str, bundle_id: str):
self.key_id = key_id
self.team_id = team_id
self.private_key = private_key
self.bundle_id = bundle_id
def _generate_token(self) -> str:
headers = {"alg": "ES256", "kid": self.key_id}
payload = {"iss": self.team_id, "iat": int(time.time())}
return jwt.encode(payload, self.private_key, algorithm="ES256", headers=headers)
async def send(self, device_token: str, title: str, body: str,
data: dict = None) -> bool:
url = f"https://api.push.apple.com/3/device/{device_token}"
payload = {
"aps": {
"alert": {"title": title, "body": body},
"sound": "default",
"badge": 1,
},
"data": data or {},
}
async with httpx.AsyncClient(http2=True) as client:
resp = await client.post(
url,
json=payload,
headers={
"authorization": f"bearer {self._generate_token()}",
"apns-topic": self.bundle_id,
"apns-priority": "10", # 即時配信
"apns-push-type": "alert",
},
)
if resp.status_code == 410:
# デバイストークンが無効(アプリ削除)→ DBから削除
await self._invalidate_token(device_token)
return False
return resp.status_code == 200
Android(FCM)
class FCMSender:
async def send(self, device_token: str, title: str, body: str,
data: dict = None) -> bool:
url = "https://fcm.googleapis.com/v1/projects/my-project/messages:send"
message = {
"message": {
"token": device_token,
"notification": {"title": title, "body": body},
"data": data or {},
"android": {
"priority": "high",
"notification": {"channel_id": "default"},
},
}
}
async with httpx.AsyncClient() as client:
resp = await client.post(
url, json=message,
headers={"Authorization": f"Bearer {self._get_access_token()}"},
)
return resp.status_code == 200
通知疲れ(Notification Fatigue)対策
「通知が多すぎてユーザーが通知を無効化する」のは本末転倒だ。
class NotificationThrottler:
"""ユーザーごとの通知レート制限"""
LIMITS = {
Priority.CRITICAL: None, # 制限なし
Priority.HIGH: (20, 3600), # 1時間に20件まで
Priority.MEDIUM: (10, 3600), # 1時間に10件まで
Priority.LOW: (3, 86400), # 1日3件まで
}
def should_send(self, user_id: str, priority: Priority) -> bool:
limit = self.LIMITS.get(priority)
if limit is None:
return True
max_count, window = limit
key = f"notif_throttle:{user_id}:{priority}:{int(time.time()) // window}"
current = redis.incr(key)
if current == 1:
redis.expire(key, window)
return current <= max_count
class NotificationAggregator:
"""同種の通知をまとめる(「5件の新しいいいね」)"""
async def aggregate(self, user_id: str, notification: dict):
agg_key = f"notif_agg:{user_id}:{notification['type']}"
redis.rpush(agg_key, json.dumps(notification))
redis.expire(agg_key, 300) # 5分間集約
count = redis.llen(agg_key)
if count == 1:
# 5分後にまとめて送信
await schedule_delayed_send(user_id, notification["type"], delay=300)
async def flush(self, user_id: str, notification_type: str):
agg_key = f"notif_agg:{user_id}:{notification_type}"
items = redis.lrange(agg_key, 0, -1)
redis.delete(agg_key)
if len(items) == 1:
await send_single(json.loads(items[0]))
elif len(items) > 1:
# 集約通知を送信
await send_aggregated(user_id, notification_type, len(items))
# 「田中さん他4人があなたの投稿にいいねしました」
リトライ戦略(指数バックオフ)
外部サービス(FCM/SendGrid/Twilio)への送信失敗時のリトライ。
async def send_with_retry(send_fn, notification: dict,
max_retries: int = 5, base_delay: float = 1.0):
for attempt in range(max_retries):
try:
return await send_fn(notification)
except (ConnectionError, TimeoutError) as e:
if attempt == max_retries - 1:
# 最終リトライも失敗 → Dead Letter Queue へ
await dead_letter_queue.push(notification)
raise
# 指数バックオフ + ジッター
delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
# 1s, 2s, 4s, 8s, 16s + ランダム
await asyncio.sleep(delay)
なぜジッターを入れるか:全ワーカーが同じタイミングでリトライすると、外部サービスに再びスパイクが起きる(Thundering Herd)。ランダムなジッターで分散させる。
まとめ
| 課題 | 解決策 | 設計原則 |
|---|---|---|
| 通知の重複 | 冪等性キー + Redis 重複排除 | At-Most-Once 配信 |
| 通知の欠落 | Kafka + リトライ + DLQ | At-Least-Once 配信 |
| 優先度の管理 | トピック分離 + ワーカー配分 | 優先度キュー |
| 通知疲れ | レート制限 + 集約 | ユーザー体験の保護 |
| 外部サービス障害 | 指数バックオフ + ジッター | 復元力(Resilience) |
| デバイストークン管理 | 410応答で自動無効化 | キャッシュの鮮度管理 |