必要不可欠な5つの要素 ── Event / Stream / Store / Projection / Snapshot
Event Sourcing を構成する要素は意外と少ない。5つだけ だ。
1. Event ── 起きた事実を表す不変オブジェクト
2. Stream ── 1つの集約に紐づくイベントの時系列
3. Event Store ── ストリームを永続化する追記専用ログ
4. Projection ── ストリームから読み取り用ビューを派生させる仕組み
5. Snapshot ── 長いストリームの再生コストを下げる最適化
これらの責務を正しく分けることが、Event Sourcing 実装の品質を決める。本章では、採用管理システムの ScreeningProcess(選考プロセス)を題材に、各要素の責務と相互作用を見ていく。
全体図:5要素の関係
先に全体像を示しておく。
graph LR
subgraph 書き込み側
A["コマンド\n(AdvanceScreening)"] --> B["集約\n(ScreeningProcess)"]
B -->|発行| C["Event\n(FirstInterviewCompleted)"]
C -->|追記| D[("Event Store\n(append-only)")]
end
D -->|"特定の集約のStream"| B
D -->|"全Stream / 特定型のEvent"| E["Projection\n(投影プロセス)"]
E --> F[("Read Model\n(非正規化ビュー)")]
F --> G["クエリ"]
D -.->|"再生コスト削減"| H["Snapshot Store"]
H -.-> B
style D fill:#fff3e0,stroke:#ff9800
style F fill:#f0fff4,stroke:#99ddaa
style H fill:#f0e6ff,stroke:#9966cc
書き込みは「集約 → イベント → ストア」の一方通行。読み取りは「ストア → プロジェクション → リードモデル」の派生経路。集約の復元時には「ストア → 集約」(必要なら Snapshot 経由)。この 3つのパス を区別することが Event Sourcing の核だ。
要素1:Event ── 起きた事実を表す不変オブジェクト
定義
イベントとは、「ドメインで過去に起きた事実を、不変のオブジェクトとして表したもの」 だ。3つの要件がある。
1. 過去形で命名する
❌ ScheduleFirstInterview(命令形:これは Command の名前)
✅ FirstInterviewScheduled(過去形:起きたことを表す)
2. 不変である
一度発行されたイベントの内容は決して変更しない
3. ドメイン語彙で表現される
技術用語("Updated"・"Modified")ではなく、業務上の意味のある名前
採用管理での具体例
// イベントの基底型
interface DomainEvent {
readonly eventId: string // イベント自体の一意ID
readonly eventType: string // イベントの型名("FirstInterviewCompleted" 等)
readonly aggregateId: string // どの集約のイベントか
readonly aggregateVersion: number // 集約内でのシーケンス番号
readonly occurredAt: Date // いつ起きたか
readonly metadata: EventMetadata // 操作者ID、リクエストID等
}
// 具体的なイベント
class FirstInterviewScheduled implements DomainEvent {
readonly eventType = 'FirstInterviewScheduled'
constructor(
readonly eventId: string,
readonly aggregateId: string,
readonly aggregateVersion: number,
readonly occurredAt: Date,
readonly metadata: EventMetadata,
// ── ドメイン固有のペイロード ──
readonly scheduledFor: Date,
readonly interviewerId: string,
readonly venue: 'online' | 'onsite',
) {}
}
class FirstInterviewCompleted implements DomainEvent { /* ... */ }
class FirstInterviewCancelled implements DomainEvent { /* ... */ }
class CandidateWithdrew implements DomainEvent { /* ... */ }
ペイロードに含めるべき情報は 「そのイベントが起きた瞬間に確定していた事実」 だ。「面接官は誰だったか」はそのイベント時点で確定している事実なので含める。一方で「現在の選考ステージ」は派生情報なので含めない(イベント列から計算される)。
イベントとコマンドの違い
混同しがちなので明示しておく。
Command(命令):
- 命令形で命名("AdvanceScreening", "ScheduleFirstInterview")
- 「これから何をしたい」という意図
- 拒否される可能性がある(バリデーションを通過しない場合)
Event(事実):
- 過去形で命名("FirstInterviewScheduled")
- 「すでに起きた事実」
- 拒否されることはない(既に起きたことだから)
集約は「コマンドを受けて、バリデーションを通したあとに、イベントを発行する」という流れになる。
要素2:Stream ── 1つの集約のイベント時系列
定義
Stream とは、「ある1つの集約に紐づくイベントの時系列」 だ。
Stream ID イベント列
──────────────────────────────────────────────────────────────────────
screening-process-sp-001 v1: ScreeningProcessStarted
v2: DocumentScreeningPassed
v3: FirstInterviewScheduled
v4: FirstInterviewCompleted
v5: SecondInterviewScheduled
...
screening-process-sp-002 v1: ScreeningProcessStarted
v2: DocumentScreeningRejected
...
各イベントは Stream 内で 連続したバージョン番号(aggregateVersion) を持つ。これが後述する楽観的並行性制御の鍵になる。
Stream ID の設計
Stream ID は集約の識別子と一対一で対応する。一般的な命名規則は <集約タイプ>-<集約ID> だ。
✅ 推奨:
screening-process-sp-001
candidate-c-1234
job-posting-j-5678
❌ 避けるべき:
events_2026_05 (日付や月で区切るのは Stream の本来の使い方ではない)
all_events (複数集約を混ぜるな。Stream は集約と一対一)
Stream 単位の読み書きが基本
Event Sourcing で最も多い操作は 「特定の Stream を全て読む」 だ。
// 集約を復元する(ここでは説明のため戻り値を配列としているが、
// 実装によっては AsyncIterable を返す。KurrentDB の TS クライアントは for-await で読む)
const events = await eventStore.readStream('screening-process-sp-001')
const process = ScreeningProcess.reconstitute(events)
この操作が高速で安定して動くことが、Event Store の設計上の最重要要件になる。
要素3:Event Store ── 追記専用ログ
責務
Event Store は次の責務を持つ。
1. Stream 単位での追記(append)
2. Stream 単位での読み取り(read by stream ID)
3. 全 Stream を横断した読み取り(read all / read by event type)
4. 楽観的並行性制御(expected version)
5. (任意)サブスクリプション機構
楽観的並行性制御 ── Event Sourcing の心臓部
複数のリクエストが同時に同じ集約に対してコマンドを実行したとき、どうやって整合性を保つか? Event Sourcing では 楽観的並行性制御(Optimistic Concurrency Control) を使う。
sequenceDiagram
participant U1 as ユーザーA
participant U2 as ユーザーB
participant ES as Event Store
Note over ES: Stream は v5 まで存在
U1->>ES: readStream(streamId)
ES-->>U1: events [v1..v5]
U2->>ES: readStream(streamId)
ES-->>U2: events [v1..v5]
Note over U1,U2: 両者ともv5を最新として認識
U1->>U1: コマンド実行<br/>イベント生成
U2->>U2: コマンド実行<br/>イベント生成
U1->>ES: append(expectedVersion=5)
ES-->>U1: ✅ v6として追記
Note over ES: Stream は v6 まで
U2->>ES: append(expectedVersion=5)
ES-->>U2: ❌ ConcurrencyException<br/>(現在は v6)
Note over U2: 読み直して再試行
// 楽観的並行性制御の擬似コード(説明用)
class EventStore {
async append(
streamId: string,
expectedVersion: number,
events: DomainEvent[],
): Promise<void> {
const currentVersion = await this.getCurrentVersion(streamId)
if (currentVersion !== expectedVersion) {
throw new ConcurrencyException(
`Expected v${expectedVersion} but stream is at v${currentVersion}`
)
}
await this.appendEvents(streamId, events, currentVersion + 1)
}
}
上記は説明用の擬似コードで、
getCurrentVersionとappendEventsの間に他のリクエストが書き込めば破綻する。実際の実装では 単一 SQL(条件付き INSERT)または DB の UNIQUE 制約 で原子的に行う。例えば PostgreSQL ならINSERT ... WHERE NOT EXISTSか(stream_id, stream_version)の UNIQUE 制約、DynamoDB ならConditionExpression(後述)、KurrentDB ならexpectedRevisionパラメータがそれに当たる。
なぜロックではなく楽観的制御なのか
ロック(悲観的):
書き込む前に Stream をロックする
❌ Stream が長い場合、待ち時間が大きい
❌ デッドロックのリスク
❌ 分散環境で実装が難しい
楽観的並行性制御:
書き込み時にバージョン不一致なら失敗させる
✅ ロックがないので待機不要、デッドロックなし
✅ 分散環境でも追記の原子性さえ保てれば動く
✅ 失敗時は「読み直して再試行」で済む
集約の境界が業務上「同時並行更新が滅多に起きない」ように設計されていれば、楽観的制御の競合発生率は低く抑えられる。これが「集約は小さく保つ」設計原則の根拠の1つだ。
Append-only という制約
Event Store では UPDATE / DELETE が原則として存在しない。
許される操作:
- append (末尾に追記)
- read (読み取り)
許されない操作:
- update (イベントの内容を書き換える)
- delete (個別イベントを消す)
これは「不便な制約」ではなく、Event Sourcing の整合性保証の基盤 だ。書き込みが追記のみなら、Stream は不変な過去のスナップショットになる。複数のプロジェクションが同じ Stream を読んでも、必ず同じ結果に至る。
ただし、現実には「個人情報の削除権(GDPR)」のような要求から、限定的な削除メカニズムが必要になる。これは ch06 で Cryptographic Erasure(暗号鍵削除) として扱う。
要素4:Projection ── 派生する読み取りモデル
定義
Projection とは、「イベントストリームを入力として、読み取り側のニーズに合わせたビューを構築する仕組み」 だ。
Event Store Projection(プロセス) Read Model
───────────────── ─────────────────── ──────────────────
v1: ScreeningProcessStarted ──→ on(...) ──→ INSERT screening_views
v2: DocumentScreeningPassed ──→ on(...) ──→ UPDATE stage='document_passed'
v3: FirstInterviewScheduled ──→ on(...) ──→ UPDATE stage='first_interview_scheduled'
...
Projection はイベントを購読し、各イベントに対する処理を定義する。結果を蓄積する Read Model は、業務 DB とは独立した、読み取り用に最適化されたデータストア だ。
TypeScript での実装例
// 採用ダッシュボード用の Projection
class ScreeningDashboardProjection {
constructor(private readonly readDb: ReadDatabase) {}
async on(event: DomainEvent): Promise<void> {
// discriminated union として絞り込み(型ガードで安全に各フィールドへアクセス)
if (isScreeningProcessStarted(event)) {
await this.readDb.execute(
`INSERT INTO screening_views
(process_id, candidate_id, job_id, stage, started_at)
VALUES (?, ?, ?, 'document', ?)`,
[event.aggregateId, event.candidateId, event.jobId, event.occurredAt]
)
} else if (isFirstInterviewScheduled(event)) {
await this.readDb.execute(
`UPDATE screening_views
SET stage = 'first_interview',
scheduled_at = ?,
last_updated_at = ?
WHERE process_id = ?`,
[event.scheduledFor, event.occurredAt, event.aggregateId]
)
}
// ... 他のイベント型
}
}
// 型ガード(実装は eventType の比較)
const isScreeningProcessStarted = (e: DomainEvent): e is ScreeningProcessStarted =>
e.eventType === 'ScreeningProcessStarted'
const isFirstInterviewScheduled = (e: DomainEvent): e is FirstInterviewScheduled =>
e.eventType === 'FirstInterviewScheduled'
Projection の3つの重要特性
特性1:冪等であるべき
同じイベントを2回処理しても、結果が変わらないことを保証する。
// 悪い例:冪等でない
await readDb.execute(
`UPDATE screening_views SET interview_count = interview_count + 1 WHERE ...`
)
// 良い例:イベントID で重複チェックする、または絶対値で更新する
await readDb.execute(
`INSERT INTO processed_events (event_id) VALUES (?)
ON CONFLICT DO NOTHING`,
[event.eventId]
)
// 影響行数が0なら、すでに処理済み → スキップ
なぜ冪等性が重要か。Event Store のサブスクリプションは at-least-once delivery(少なくとも1回配送)が一般的で、ネットワーク障害時にイベントが2回届くことがある。冪等でないと Read Model が壊れる。
特性2:再構築可能であるべき
Read Model は いつでもイベントストリームから再構築できる ように作る。
Read Model の運用パターン:
1. 通常時:新しいイベントを購読して逐次更新
2. スキーマ変更時:Read Model を全削除し、ストリームを最初から再生
3. バグ修正時:影響範囲のイベント型を再処理して上書き
「Read Model が壊れたらストリームから再生し直せる」ことは、Event Sourcing の保険として強力だ。逆に言うと、Read Model に書いてしまうと、Event Store からは再生できないデータ を入れてはいけない(外部からの読み取り専用 API データなど)。
特性3:結果整合性であることを前提に設計する
Projection は通常、書き込みと非同期に動く。書き込み直後にクエリしても、Read Model にはまだ反映されていない可能性がある。
書き込み時系列:
T+0ms : コマンド実行、イベント追記成功
T+1ms : クライアントに「成功」レスポンス
T+5ms : Projection がイベントを処理
T+10ms : Read Model 更新完了
→ T+1ms の時点で読み取るとまだ古い状態
UI 側では 「書き込んだ直後にその結果が読めない可能性がある」 ことを前提にする必要がある。これは Event Sourcing 特有の制約ではなく、結果整合性(eventual consistency)を採用する全システム共通 の制約だ。
対処法は ch06 で議論する(Outbox パターン、expectedVersion の利用、UI 側での楽観的更新など)。
要素5:Snapshot ── 再生コストを下げる最適化
なぜ必要か
理論的には、集約の状態は毎回 Stream の最初から再生して復元できる。だが現実には、Stream が長くなると再生コストが無視できなくなる。
イベント数 vs 復元時間(典型的な値):
100 events → 5ms
1,000 events → 50ms
10,000 events → 500ms ← この辺から無視できなくなる
選考プロセスは数十イベントで完了するので Snapshot 不要だが、「永続化される顧客口座」 や 「数年運用される契約管理」 のような長寿命の集約では、数千〜数万イベントになることがある。
Snapshot の仕組み
graph LR
subgraph nosnap["Snapshot なし:O(N)"]
E1[v1] --> E2[v2] --> E3[...] --> EN[v5237]
EN --> S1["集約状態<br/>復元完了"]
E1 -.全件再生.-> S1
end
subgraph withsnap["Snapshot あり:O(K)"]
SN[("📸 Snapshot<br/>v5000")] --> SE1[v5001]
SE1 --> SE2[v5002] --> SE3[...] --> SEN[v5237]
SEN --> S2["集約状態<br/>復元完了"]
SN -.差分のみ再生.-> S2
end
style nosnap fill:#ffe8e8,stroke:#cc6666
style withsnap fill:#e8f5e9,stroke:#43a047
style SN fill:#fff3e0,stroke:#fb8c00
実装例
class ScreeningProcessRepository {
async load(processId: string): Promise<ScreeningProcess> {
// 1. 最新の Snapshot をロード(あれば)
const snapshot = await this.snapshotStore.loadLatest(processId)
// 2. Snapshot から状態を復元(または空の状態から始める)
const process = snapshot
? ScreeningProcess.fromSnapshot(snapshot)
: ScreeningProcess.empty(processId)
// 3. Snapshot 以降のイベントを再生
const events = await this.eventStore.readStream(
`screening-process-${processId}`,
{ fromVersion: snapshot ? snapshot.version + 1 : 1 }
)
for (const event of events) {
process.apply(event)
}
return process
}
async save(process: ScreeningProcess): Promise<void> {
const events = process.getUncommittedEvents()
await this.eventStore.append(
`screening-process-${process.id}`,
process.expectedVersion,
events,
)
process.clearUncommittedEvents()
// Snapshot 戦略:N イベントごとにスナップショットを取る
if (process.version % 100 === 0) {
await this.snapshotStore.save(process.toSnapshot())
}
}
}
Snapshot 戦略の選び方
戦略A:N イベントごとに取る
単純で実装しやすい
大半の用途でこれで十分
戦略B:時間ベースで取る(24時間ごと等)
書き込み頻度が低い集約に向く
コールドスタート時の再生コストを上限化できる
戦略C:特定イベント発生時に取る
ビジネス上の節目(年度締め等)で取る
業務上の意味を持つ Snapshot になる
採用管理システムでは、選考完了時(内定確定 or 不採用確定)に最終 Snapshot を取る 戦略が自然だ。完了後の Stream は読まれる頻度が低く、最終状態が分かれば十分なケースが多い。
重要:Snapshot は「最適化」であって「真実」ではない
これが Snapshot の最も重要な性質だ。
graph TB
ES[("Event Store<br/>━━━━━━━<br/>🏛 真実<br/>(Single Source of Truth)")]
SS[("Snapshot Store<br/>━━━━━━━<br/>⚡ 最適化<br/>(派生物・捨てて再生成可)")]
RM[("Read Model<br/>━━━━━━━<br/>👁 派生<br/>(派生物・捨てて再生成可)")]
ES -->|派生| SS
ES -->|投影| RM
SS -. スキーマ変更時は捨てる .-> X1["✅ 再生成すればよい"]
RM -. スキーマ変更時は捨てる .-> X2["✅ 再構築すればよい"]
ES -. 失うと .-> X3["💀 致命的"]
style ES fill:#fff3e0,stroke:#e65100,stroke-width:3px
style SS fill:#f3e5f5
style RM fill:#e8f5e9
style X3 fill:#ffcdd2
Snapshot のスキーマが変わったら? 全 Snapshot を捨ててストリームから再生すればいい。バグで Snapshot が壊れたら? 同じく捨てて再生すればいい。真実は Event Store にしかない という原則を曲げてはいけない。
5要素の責務マトリクス
整理として、5要素の責務を表にしておく。
| 要素 | 真実か派生か | 主な責務 | 失敗時の影響 |
|---|---|---|---|
| Event | 真実 | 起きた事実の不変な記録 | 誤ったイベントが書かれた場合は 補償イベント(複式簿記の逆仕訳と同じ発想)で打ち消す |
| Stream | 真実の構造 | 集約単位のイベント順序 | 順序が壊れると整合性破綻 |
| Event Store | 真実の永続化 | Append-only / 楽観的並行性 | 致命的(バックアップ必須) |
| Projection | 派生 | 読み取り用ビューの構築 | 再構築で復旧可能 |
| Snapshot | 派生 | 復元の高速化 | 捨てて再生成可能 |
「真実は Event Store だけで、それ以外は派生」という構造が見える。これは Event Sourcing の アーキテクチャ的な重心 だ。
まとめ:5要素の最小実装
最小限の Event Sourcing は、これだけで動く。
// 1. Event:不変の事実
interface DomainEvent { /* eventType, aggregateId, version, occurredAt, ... */ }
// 2. Stream:集約 ID 単位のイベント列
type StreamId = string // "screening-process-sp-001"
// 3. Event Store:追記専用+楽観的並行性制御
interface EventStore {
append(streamId: StreamId, expectedVersion: number, events: DomainEvent[]): Promise<void>
readStream(streamId: StreamId, options?: { fromVersion?: number }): Promise<DomainEvent[]>
subscribe(handler: (event: DomainEvent) => Promise<void>): Subscription
}
// 4. Projection:イベントから Read Model を派生
interface Projection {
on(event: DomainEvent): Promise<void>
}
// 5. Snapshot:再生コスト最適化(任意)
interface SnapshotStore {
save(snapshot: Snapshot): Promise<void>
loadLatest(aggregateId: string): Promise<Snapshot | null>
}
実装の細部はツールによって違うが、責務の分担はどのツールでも変わらない。次章では、これらの責務を実際に提供する デファクトツール/フレームワーク を比較する。「KurrentDB と Axon Framework と Marten のどれを選ぶべきか」── その判断軸を整理する。