Stream 設計の意思決定
Stream の本質を踏まえて、実際の設計の 5 つの意思決定を扱う。
判断 1:イベント粒度 ─ Fine-grained vs Coarse
「1 event に何を含めるか」が最初の判断。
Fine-grained(細かい)
// 各操作を個別 event に
{ type: 'OrderItemAdded', orderId: 1, itemId: 'A', quantity: 2 }
{ type: 'OrderItemAdded', orderId: 1, itemId: 'B', quantity: 1 }
{ type: 'OrderShippingSet', orderId: 1, address: { ... } }
{ type: 'OrderPaid', orderId: 1, paymentId: 'p1' }
pros: 監査が詳細、event sourcing と相性◎、変更履歴が完全 cons: 数が多い、consumer が多い event を扱う必要
Coarse-grained(粗い)
// 1 つの "完了" を 1 event に
{ type: 'OrderPlaced', orderId: 1, items: [...], shipping: {...}, payment: {...} }
pros: consumer が単純、event 数が少ない cons: 何が変わったか分かりにくい、内部状態が外に漏れやすい
判断軸
| 場面 | 粒度 |
|---|---|
| Event Sourcing で aggregate を再構築 | Fine |
| 他 Bounded Context への “通知” | Coarse |
| 監査ログ用途 | Fine |
| OLAP / 分析向け | Coarse(fact event) |
Adam Bellemare『Building Event-Driven Microservices』 の指針:
public events は coarse、private events は fine。
つまり Bounded Context の境界を超える event は coarse(事実の通知)、内部の event sourcing は fine(詳細な変化)。これは DDD の Aggregate 内外の区別と整合する。
判断 2:Partition Key 設計
第 12 章で「順序保証は partition 内のみ」と書いた。何を同一 partition に集めるかは重要な設計判断。
同一 Aggregate を同 partition に
// Aggregate ID で partition
const partitionKey = orderId; // 同じ Order の event は同 partition
// → Order に対する全 event が順序保証される
// → 異なる Order 間は並列処理可能
これが最も無難な選択。Aggregate の整合性境界がそのまま partition key になる。
Tenant 単位
// Multi-tenant SaaS
const partitionKey = tenantId;
// → 同一 tenant の全 event が順序保証
// → tenant 横断で並列処理
注意: tenant 単位だと大きな tenant が hot partition になる可能性。これは第 16 章 共通基盤の Hot tenant 対策にも関連。
順序保証が要らない場合
const partitionKey = null; // ランダム配分
// → 最大の並列度
// → 順序は保証されない
ログのような「順序が要らない、流量が多い」用途。
Partition 数の決定
- Consumer 並列度の上限 = Partition 数
- 後から増やすのは可能だが、partition 間の hash 再配置を考慮する必要あり
- 最初は将来の並列度の 2-3 倍 を見越して多めに
1 consumer = 1 partition でしか並列化できない
→ 24 partition なら最大 24 並列
判断 3:Schema 進化
5 年前の event を、今のコードで読めるか。event は immutable なので、schema の互換性が極めて重要。
Avro / Protobuf を使う
JSON は “schema-less” に見えるが、実は暗黙の schema を持つ。これが進化を難しくする。Avro / Protobuf は schema を明示 し、互換性を機械的に検証できる。
// proto の例
message OrderPlaced {
string order_id = 1;
int64 user_id = 2;
repeated OrderItem items = 3;
// 後から追加された field
optional string promo_code = 4;
}
互換性の種類
| 種類 | 意味 |
|---|---|
| Backward | 新 schema で書いた event を古い schema で読める |
| Forward | 古い schema で書いた event を新 schema で読める |
| Full | 両方 |
| None | 互換性なし(避ける) |
実用上は Backward + Forward = Full を目指す。
進化のルール
| 操作 | Backward | Forward |
|---|---|---|
| Field 追加(optional) | OK | OK |
| Field 削除(optional) | OK | OK(新 reader が無視) |
| Field 追加(required) | NG | OK |
| Field 削除(required) | OK | NG |
| 型変更 | 多くは NG | 多くは NG |
| Field 名変更 | NG | NG |
ルール:field の追加・削除は optional に、型変更は禁止、名前変更は禁止。
Schema Registry の活用
Confluent Schema Registry のようなツールで、schema を中央管理。互換性を機械的に検証する。
判断 4:Retention 戦略
第 12 章で 5 つの選択肢を出した。何を選ぶかは用途で決まる。
用途別の選択
| 用途 | Retention |
|---|---|
| マイクロサービス間の通知 | 時間(7 日 - 30 日) |
| 監査ログ | Forever or 法定期間 |
| CDC(DB 同期) | Compaction(key 最新) |
| Event Sourcing の Aggregate | Forever(または Snapshot + 部分 retention) |
| メトリクス・テレメトリ | 時間(数日) |
| 設定配信 | Compaction |
Tiered Storage で Forever を実現
Kafka 自身は近年 Tiered Storage をサポート(Confluent Cloud など)。Hot data は SSD、Cold data は S3。これで 無期限 retention が現実的に可能になった。
これは Pat Helland の Outside data の考え方がインフラとして実装された例。
判断 5:Outbox Pattern ─ DB と Stream の atomic な書き込み
最後に最重要のパターン。OLTP DB の更新と Stream への publish を atomic にする仕組み。
❌ アンチパターン:Dual Write
async function placeOrder(order: Order): Promise<void> {
// OLTP に保存
await db.save(order);
// ↓ ここでクラッシュしたら?
// → DB は更新されたが、event は publish されていない
await kafka.publish('orders', new OrderPlacedEvent(order));
}
Dual Write は本質的に壊れる(partial failure が常に起きる)。
✅ Outbox Pattern
async function placeOrder(order: Order): Promise<void> {
await db.transaction(async (tx) => {
// 1. Aggregate を保存
await tx.upsert('orders', orderToRow(order));
// 2. 同じ Trans で outbox にも書く
for (const event of order.events) {
await tx.insert('outbox_events', {
id: uuidv7(),
aggregate_id: order.id,
type: event.type,
payload: JSON.stringify(event),
published_at: null,
});
}
});
// 別プロセス(Outbox Publisher)が outbox を読んで Kafka に publish
}
ポイント:
- DB 更新と event 記録が同一 Trans で atomic
- 別プロセスが
outbox_eventsを読んで Kafka に publish - publish 成功したら
published_atを更新(or 行を削除) - At-least-once 配信(同じ event が複数回流れる可能性 → consumer 側で idempotent に)
sequenceDiagram
participant App as Application
participant DB as DB
participant Pub as Outbox Publisher
participant K as Kafka
App->>DB: TX: orders + outbox_events INSERT
DB-->>App: OK
Note over Pub: ポーリング or CDC
Pub->>DB: SELECT FROM outbox_events WHERE published_at IS NULL
DB-->>Pub: events
Pub->>K: publish
K-->>Pub: ACK
Pub->>DB: UPDATE outbox_events SET published_at = NOW()
CDC を使う変形
Outbox publisher の代わりに CDC (Change Data Capture) ツール(Debezium 等)で、outbox_events テーブルの変更を Kafka に流す。さらに ASE:OLTP テーブル自体の WAL を Kafka に流す ことも可能。
このパターンは「OLTP(Inside data)から Stream(Outside data)への橋渡し」の標準実装になっている。
Stream 設計の意思決定マトリクス
graph TB
Q1[イベント粒度] --> A1[Public は coarse / Private は fine]
Q2[Partition Key] --> A2[Aggregate ID 推奨 / Hot 注意]
Q3[Schema] --> A3[Avro/Protobuf + Registry]
Q4[Retention] --> A4[用途で選択 / Tiered Storage]
Q5[Outbox] --> A5[Dual Write 禁止 / Outbox or CDC]
style Q1 fill:#e1f5ff
style Q2 fill:#e1f5ff
style Q3 fill:#e1f5ff
style Q4 fill:#e1f5ff
style Q5 fill:#e1f5ff
ドメインから見た Stream
Stream は Bounded Context 間の通信プロトコル として理想的。
- 疎結合:publisher は consumer を知らない
- 時間的疎結合:consumer が遅れても publisher は影響を受けない
- 重ね掛け可能:1 つの event を複数の consumer が処理
DDD の Domain Event が、技術的には Kafka topic として実装される。Aggregate の境界 = Stream の partition key という綺麗な対応関係がある。
この章の要点
- イベント粒度: public は coarse、private は fine
- Partition Key は Aggregate ID が無難。Hot partition に注意
- Schema は Avro / Protobuf + Registry で機械的に管理
- Retention は用途別、Tiered Storage で Forever も現実的
- Outbox Pattern が DB と Stream の atomic 書き込みの標準解
- Stream は Bounded Context 間通信プロトコルとして DDD と整合
次章への問いかけ
Stream の append-only と immutability はここまでの章で見てきた。だが「全リージョンで強整合な書き込み」を実現する DB がある。
次章は Geo-distributed / NewSQL の本質 ── TrueTime と Raft、原子時計のあるなしで分かれた 2 つのアプローチ。