分散トランザクション・決済(Stripe)── Saga パターンと冪等性
扱うCS概念:2PC(二相コミット)、Sagaパターン、冪等性(Idempotency)、補償トランザクション、Outbox パターン

この章で何ができるようになるか:マイクロサービス間でまたがるトランザクションをどう設計するかを説明できるようになる。決済システムの「二重請求しない・取りこぼさない」を実現するための設計パターンを理解できる。
問題設定
EC サイトの「注文確定」を設計する。
1. 在庫サービス:商品の在庫を1減らす
2. 決済サービス:クレジットカードに課金する
3. 注文サービス:注文レコードを作成する
4. 通知サービス:メール/プッシュ通知を送る
これら4つは別々のマイクロサービスで、別々のデータベースを持つ。
全て成功したら「注文完了」。どれか失敗したら「全てロールバック」したい。
なぜ難しいか:単一のRDBMSなら BEGIN TRANSACTION ... COMMIT で解決できる。しかし4つのサービスが別々のDBを持つ分散環境では、ACID トランザクションが使えない。
素朴な解法:2PC(Two-Phase Commit、二相コミット)
sequenceDiagram
participant C as コーディネーター
participant I as 在庫サービス
participant P as 決済サービス
participant O as 注文サービス
Note over C,O: フェーズ1:Prepare
C->>I: PREPARE(在庫を仮押さえせよ)
C->>P: PREPARE(決済を仮押さえせよ)
C->>O: PREPARE(注文を仮作成せよ)
I-->>C: READY
P-->>C: READY
O-->>C: READY
Note over C,O: フェーズ2:Commit
C->>I: COMMIT
C->>P: COMMIT
C->>O: COMMIT
2PC の問題点:
-
ブロッキング:フェーズ1で READY を返した参加者は、フェーズ2のコミットが来るまでリソースをロックし続ける。コーディネーターがクラッシュするとデッドロック。
-
可用性の低下:全参加者が同時に生きていないとコミットできない。「A は OK、B がタイムアウト」で全体が止まる。
-
マイクロサービスには不向き:各サービスが独立して開発・デプロイされる設計思想と、2PC の「全員が協調する」設計は相性が悪い。
実際の大規模分散システム(Stripe、Amazon、Netflix)では 2PC を使わない。代わりに Saga パターンを使う。
Saga パターン:補償トランザクションで整合性を保つ
Saga はトランザクションを「一連のローカルトランザクション」に分解し、失敗時は「補償トランザクション(undo 操作)」で元に戻す。
注文フロー(Saga):
成功パス:
1. 在庫サービス:在庫を1減らす
2. 決済サービス:課金する
3. 注文サービス:注文レコードを作成する
4. 通知サービス:メールを送る → 完了
失敗パス(決済サービスが失敗した場合):
1. 在庫サービス:在庫を1減らす ✅
2. 決済サービス:課金する ❌ → 失敗!
→ 補償トランザクション:在庫サービスの在庫を1戻す(逆方向に実行)
# Saga の実装(コレオグラフィー方式:イベントドリブン)
# 在庫サービス
async def on_order_created(event: OrderCreatedEvent):
try:
# ローカルトランザクション
await db.execute("UPDATE stock SET quantity = quantity - 1 WHERE product_id = ?",
event.product_id)
# 成功イベントを発行 → 次のサービスへ
await kafka.publish("stock.reserved", StockReservedEvent(
order_id=event.order_id,
product_id=event.product_id
))
except InsufficientStockError:
# 失敗イベントを発行 → 補償が必要
await kafka.publish("order.failed", OrderFailedEvent(
order_id=event.order_id,
reason="insufficient_stock"
))
# 決済サービス
async def on_stock_reserved(event: StockReservedEvent):
try:
charge = await stripe.create_charge(
amount=event.amount,
customer_id=event.customer_id
)
await kafka.publish("payment.completed", PaymentCompletedEvent(...))
except StripeError as e:
await kafka.publish("payment.failed", PaymentFailedEvent(
order_id=event.order_id,
reason=str(e)
))
# 補償ハンドラー:決済失敗 → 在庫を戻す
async def on_payment_failed(event: PaymentFailedEvent):
# 補償トランザクション:在庫を元に戻す
await db.execute("UPDATE stock SET quantity = quantity + 1 WHERE product_id = ?",
event.product_id)
await kafka.publish("stock.released", StockReleasedEvent(...))
graph LR
OrderCreated[注文作成] --> ReserveStock[在庫確保]
ReserveStock -->|成功| ProcessPayment[決済処理]
ReserveStock -->|失敗| CompensateOrder[注文キャンセル]
ProcessPayment -->|成功| CreateOrder[注文確定]
ProcessPayment -->|失敗| ReleaseStock[在庫解放(補償)]
ReleaseStock --> CompensateOrder
CreateOrder --> SendNotification[通知送信]
冪等性(Idempotency):同じ操作を何度実行しても結果が同じ
ネットワーク障害でリクエストがタイムアウトしたとき、「決済が完了したかどうかわからない」状態になる。再送すると二重請求になる可能性がある。
**冪等性キー(Idempotency Key)**でこれを解決する。
# クライアント側:冪等性キーを生成して送る
import uuid
idempotency_key = str(uuid.uuid4())
response = requests.post("/api/payments",
headers={"Idempotency-Key": idempotency_key},
json={"amount": 5000, "currency": "JPY", "customer_id": "cus_123"}
)
# タイムアウトした場合、同じキーで再送しても安全
response = requests.post("/api/payments",
headers={"Idempotency-Key": idempotency_key}, # 同じキー
json={"amount": 5000, "currency": "JPY", "customer_id": "cus_123"}
)
# → 最初のリクエストの結果を返す(再実行しない)
# サーバー側:冪等性の保証
async def create_payment(request: PaymentRequest, idempotency_key: str):
# 1. このキーで既に処理済みか確認
cached = await redis.get(f"idempotency:{idempotency_key}")
if cached:
return json.loads(cached) # 前回の結果をそのまま返す
# 2. 処理中フラグを立てる(他のリクエストが同じキーで来ても待機させる)
acquired = await redis.set(
f"idempotency_lock:{idempotency_key}", "processing",
nx=True, ex=30 # 30秒ロック
)
if not acquired:
# 同じキーの別リクエストが処理中 → 少し待ってリトライ
raise RetryAfterError()
try:
# 3. 実際の決済処理
result = await stripe.charge(request.amount, request.customer_id)
response = {"payment_id": result.id, "status": "succeeded"}
# 4. 結果をキャッシュ(24時間)
await redis.setex(
f"idempotency:{idempotency_key}",
86400,
json.dumps(response)
)
return response
finally:
await redis.delete(f"idempotency_lock:{idempotency_key}")
Stripe の実装:Stripe API は全てのリクエストで Idempotency-Key ヘッダーをサポートする。同じキーのリクエストは24時間、同一の結果を返す(24時間経過後にキーは自動削除される)。
Outbox パターン:「DBへの書き込み」と「イベント発行」を原子的に行う
Saga の中で難しいのは「DBに書き込んだ後、Kafka にイベントを発行する」という操作だ。
# ❌ 問題のある実装
async def reserve_stock(order_id: str, product_id: str):
await db.execute("UPDATE stock SET quantity = quantity - 1 ...") # 成功
# ↑ ここでクラッシュしたら?
await kafka.publish("stock.reserved", event) # 実行されない!
# → DB は更新されたが、次のサービスには何も届かない
Outbox パターンは、イベントを「まず同じDBに書く」ことで原子性を保証する。
# ✅ Outbox パターン
async def reserve_stock(order_id: str, product_id: str):
async with db.transaction(): # 1つのトランザクション内で
# ドメインの変更
await db.execute("UPDATE stock SET quantity = quantity - 1 ...")
# イベントを outbox テーブルに書く(Kafka にはまだ送らない)
await db.execute("""
INSERT INTO outbox (event_type, payload, status)
VALUES ('stock.reserved', ?, 'pending')
""", json.dumps({"order_id": order_id, "product_id": product_id}))
# トランザクションここまで。どちらかが失敗したら両方ロールバック
# 別プロセス:Outbox ポーラーが pending イベントを Kafka に送る
async def outbox_poller():
while True:
events = await db.query(
"SELECT * FROM outbox WHERE status = 'pending' ORDER BY created_at LIMIT 100"
)
for event in events:
await kafka.publish(event.event_type, json.loads(event.payload))
await db.execute("UPDATE outbox SET status = 'sent' WHERE id = ?", event.id)
await asyncio.sleep(0.1) # 100ms ポーリング
sequenceDiagram
participant App
participant DB as DB(ドメイン + Outbox)
participant Poller as Outboxポーラー
participant Kafka
App->>DB: BEGIN TRANSACTION
App->>DB: UPDATE stock(在庫変更)
App->>DB: INSERT outbox(イベント書き込み)
App->>DB: COMMIT
Note over App,DB: 原子的に完了(両方か、なしか)
Poller->>DB: SELECT pending events
DB-->>Poller: [stock.reserved イベント]
Poller->>Kafka: publish("stock.reserved")
Poller->>DB: UPDATE outbox SET status='sent'
Outbox ポーラー vs CDC(Change Data Capture):
ポーリングより高効率な代替として、PostgreSQL の WAL を CDC(Debezium 等)で監視し、outbox テーブルの変更を直接 Kafka に流す方式もある。これだとポーリング間隔なしにほぼリアルタイムで配信できる。
決済システムの状態機械
決済は「作成→処理中→成功/失敗」という状態遷移を持つ。状態機械として設計することで不正な遷移を防げる。
from enum import Enum
class PaymentStatus(Enum):
CREATED = "created"
PROCESSING = "processing"
SUCCEEDED = "succeeded"
FAILED = "failed"
REFUNDED = "refunded"
VALID_TRANSITIONS = {
PaymentStatus.CREATED: [PaymentStatus.PROCESSING],
PaymentStatus.PROCESSING: [PaymentStatus.SUCCEEDED, PaymentStatus.FAILED],
PaymentStatus.SUCCEEDED: [PaymentStatus.REFUNDED],
PaymentStatus.FAILED: [], # 終端状態
PaymentStatus.REFUNDED: [], # 終端状態
}
async def transition_payment(payment_id: str, new_status: PaymentStatus):
payment = await db.get_payment(payment_id)
current = PaymentStatus(payment.status)
if new_status not in VALID_TRANSITIONS[current]:
raise InvalidStateTransitionError(
f"Cannot transition from {current} to {new_status}"
)
# 楽観的ロック:他のプロセスが同時に状態を変えていないか確認
updated = await db.execute("""
UPDATE payments
SET status = ?, version = version + 1
WHERE id = ? AND version = ? AND status = ?
""", new_status.value, payment_id, payment.version, current.value)
if updated == 0:
raise ConcurrentModificationError("Payment was modified by another process")
まとめ
| 課題 | 解決策 | CS概念 |
|---|---|---|
| 分散サービス間のトランザクション | Saga パターン(補償TX) | 結果整合性 |
| ネットワーク障害による二重実行 | 冪等性キー | Idempotency |
| DB書き込みとイベント発行の原子性 | Outbox パターン | WAL + CDC |
| 不正な状態遷移の防止 | 状態機械 + 楽観的ロック | 有限状態機械 |
| 全参加者の同期的協調 | 2PC → 使わない(可用性低下) | CAP定理 |
決済システムの設計は「正確性」と「可用性」を両立するための工夫の塊だ。「絶対に二重請求しない」「絶対に取りこぼさない」を同時に実現するには、冪等性・補償トランザクション・状態機械の組み合わせが必要になる。