Streaming と Tool Use を組み込む ── チャット UI の基礎を作る
ch06 のままでは「チャット」になっていない
前章で helpdesk-ai の最小版は動きました。ConverseCommand に質問を放り込み、戻ってきたテキストを console.log する。確かに会話は成立しています。が、これをそのまま Slack 風の UI に貼り付けると、3 秒間まったく無音のあと、突然全文がドンと表示される ── という、いかにも「裏で何かが詰まっている」見た目になります。
しかも helpdesk-ai には、もう 1 つやらないといけないことがあります。社員の質問は FAQ で答えられるものばかりではありません。
「私の有給、残り何日ある?」
この瞬間、AI は モデルの中の知識ではなく、社内の人事 API に答えを取りに行かないといけません。Foundation Model はその社員の有給残数を知らない、当たり前です。LLM が外の世界と話す道具、それが Tool Use(Function Calling) です。
この章では、ch06 の最小版に次の 2 つを追加します。
- Streaming:
ConverseStreamCommandでトークンを逐次受け取り、UI で typing 表示にする - Tool Use:
get_holiday_balance(employee_id)を関数として宣言し、モデルから呼ばせる
読み終わると、helpdesk-ai は「FAQ も社内 API も組み合わせて答えるチャットボット」の体裁になります。同時に、ch08 以降で重ねていく RAG・Agents・AgentCore のための足場 ができます。
| 項目 | 内容 |
|---|---|
| 対象読者 | ch06 を読み終え、TypeScript / AWS SDK v3 で Bedrock を 1 度叩いた人 |
| 難易度 | ★★★☆☆(中級) |
| 読了時間 | 約 30 分 |
| 対象バージョン | @aws-sdk/client-bedrock-runtime v3 系 / Node.js 22+ / TypeScript 5+ |
それでは ch06 の bedrock クライアントをそのまま使い回す前提で、最初に Streaming から入りましょう。
なぜ Streaming が必要か ── TimeToFirstToken の話
ch06 で使った ConverseCommand は 同期型です。await client.send(...) が返ってきた瞬間、レスポンス本文は 完成している。これはバッチ処理やバックエンド間連携には向いていますが、チャット UI には致命的に向きません。
理由は単純で、人間は「すぐ何かが返ってくる」ことを期待するからです。3 秒間まったく文字が出ない画面と、0.3 秒で 1 文字目が出てそのあとぱらぱらと続く画面では、体感速度がまったく違う。ここで効くのが TimeToFirstToken(TTFT)、つまり「リクエスト送信から最初の 1 トークンが返ってくるまでの時間」という指標です。Streaming は 総生成時間を短縮するわけではないけれど、TTFT を短くすることで「待たされてる感」を消します。この観点は ch13 の観測章で再登場します。
Bedrock では Streaming 用に ConverseStreamCommand が用意されています。同期版の ConverseCommand と「ほぼ同じパラメータ」を受け取り、戻り値だけが イベントの async iterable になっています。これを for await で回しながら、来たそばから UI に書き出す。それだけです。
sequenceDiagram
participant U as ユーザー
participant L as Lambda / Next.js Route
participant B as Bedrock Runtime
participant M as Foundation Model
U->>L: 「経費精算の上限額は?」
L->>B: ConverseStreamCommand
B->>M: 推論開始
M-->>B: messageStart
B-->>L: messageStart
M-->>B: contentBlockDelta("経")
B-->>L: contentBlockDelta("経")
L-->>U: SSE chunk("経")
M-->>B: contentBlockDelta("費")
B-->>L: contentBlockDelta("費")
L-->>U: SSE chunk("費")
Note over M,L: ...デルタが続く...
M-->>B: contentBlockStop / messageStop
B-->>L: metadata(usage / metrics)
L-->>U: SSE done
ConverseStream のイベント一覧
for await で受け取れるイベントは、ざっくり 5 種類です。
| イベント | いつ来るか | 何が入っているか |
|---|---|---|
messageStart | アシスタントメッセージの開始 | role(基本 "assistant") |
contentBlockStart | 1 つの content block の開始 | block index、Tool Use のときは toolUse.toolUseId 等 |
contentBlockDelta | テキストや Tool 引数の追記 | delta.text または delta.toolUse.input(JSON 文字列の断片) |
contentBlockStop | content block の終了 | block index |
messageStop | メッセージ全体の終了 | stopReason("end_turn" / "tool_use" / "max_tokens" 等) |
metadata | 最後の最後 | usage(input/output tokens)と metrics(latency) |
ここで覚えておきたいのは、テキストも Tool 引数も同じ contentBlockDelta イベントで流れてくる点です。テキストなら delta.text が、Tool Use なら delta.toolUse.input に JSON の 文字列断片({"emplo" → "yee_id": → "E001"} のように刻まれて来る)が入ります。後でつなげるのは受け側の責任です。
Streaming の最小実装
src/handlers/stream.ts に新しいハンドラを置いてみましょう。ch06 の src/client.ts の bedrock / MODEL_ID をそのまま使います。
// src/handlers/stream.ts
import {
ConverseStreamCommand,
type Message,
} from "@aws-sdk/client-bedrock-runtime";
import { bedrock, MODEL_ID } from "../client.js";
/**
* 1 回のターンを streaming で実行し、テキストを少しずつ yield する。
* UI 側は for await でこの async generator を回せばよい。
*/
export async function* streamAnswer(
messages: Message[],
): AsyncGenerator<string, void, void> {
const res = await bedrock.send(
new ConverseStreamCommand({
modelId: MODEL_ID,
messages,
system: [
{ text: "You are helpdesk-ai, an internal IT/HR helpdesk assistant." },
],
// Streaming でも max_tokens は必ず明示する(ch06 と同じ理由)
inferenceConfig: { maxTokens: 1024, temperature: 0.3, topP: 0.9 },
}),
);
for await (const event of res.stream ?? []) {
// テキストの逐次出力
if (event.contentBlockDelta?.delta?.text) {
yield event.contentBlockDelta.delta.text;
}
// 最終メタデータはログに残す(ch13 観測章で再利用する形)
if (event.metadata) {
console.error("[usage]", event.metadata.usage);
console.error("[metrics]", event.metadata.metrics);
}
}
}
呼び出し側は、たとえば Next.js の Route Handler から SSE で流すならこうです。
// app/api/chat/route.ts(抜粋)
import { streamAnswer } from "@/src/handlers/stream.js";
export async function POST(req: Request) {
const { question } = await req.json();
const stream = new ReadableStream<Uint8Array>({
async start(controller) {
const enc = new TextEncoder();
try {
for await (const chunk of streamAnswer([
{ role: "user", content: [{ text: question }] },
])) {
controller.enqueue(enc.encode(`data: ${JSON.stringify({ chunk })}\n\n`));
}
controller.enqueue(enc.encode(`data: [DONE]\n\n`));
} catch (e) {
controller.enqueue(
enc.encode(`data: ${JSON.stringify({ error: String(e) })}\n\n`),
);
} finally {
controller.close();
}
},
});
return new Response(stream, {
headers: { "Content-Type": "text/event-stream", "Cache-Control": "no-cache" },
});
}
ここで効いてくるのが タイムアウト設計です。同期版 Converse であれば、SDK デフォルトの requestTimeout でだいたい困りません。一方 Streaming は「最初の 1 トークン」と「最後のトークン」の間が長く空くこともあるため、SDK のデフォルトに任せきりにすると、長いレスポンス生成中に切断されるという厄介な事故が起きます。
// ❌ 悪い例:streaming なのに timeout を意識していない
const bedrock = new BedrockRuntimeClient({ region });
// ✅ 良い例:HTTP handler に長めの socketTimeout / requestTimeout を渡す
import { NodeHttpHandler } from "@smithy/node-http-handler";
export const bedrock = new BedrockRuntimeClient({
region,
maxAttempts: 5,
retryMode: "adaptive",
requestHandler: new NodeHttpHandler({
// 接続自体は数秒で諦めてリトライ
connectionTimeout: 3_000,
// 一方、socket は streaming のために長めに取る(例:3 分)
socketTimeout: 180_000,
}),
});
Lambda 側でも同じです。同期版のつもりで timeout: 30s のままにしておくと、長文応答の Streaming が途中で切れます。Streaming を入れる関数は最低 1 分以上、できれば 3 分は欲しい。
Tool Use(Function Calling)の設計
Streaming で「typing 表示」は手に入りました。次は モデルから外部関数を呼ばせる 仕掛けです。
helpdesk-ai では get_holiday_balance(employee_id) を最初のツールとして定義します。「先月のリモートワーク勤怠を集計して」のような重い処理は ch09 の Bedrock Agents まで取っておきます。今回は 「関数 1 個・1 ターンで決着する」最小ケースで Tool Use の流れを掴むのが目的です。
ツールを宣言する
ツール宣言は JSON Schema で書きます。Anthropic Claude も Nova も Llama も、Bedrock では同じ toolSpec でラップされるため、モデルを差し替えてもこの宣言は使い回せます。
// src/tools/hr.ts
import type { Tool, ToolConfiguration } from "@aws-sdk/client-bedrock-runtime";
export const getHolidayBalance: Tool = {
toolSpec: {
name: "get_holiday_balance",
description:
"指定した社員 ID の有給休暇残数(日数)を取得する。質問者が「自分の」と言った場合は、呼び出し側が解決した employee_id を渡すこと。",
inputSchema: {
json: {
type: "object",
properties: {
employee_id: {
type: "string",
description: "社員 ID(例:E001)。形式は /^E\\d{3,}$/",
},
},
required: ["employee_id"],
},
},
},
};
export const helpdeskToolConfig: ToolConfiguration = {
tools: [getHolidayBalance],
// ツールを使うかどうかはモデルに任せる(後述)
toolChoice: { auto: {} },
};
// 実際の社内 API 呼び出し。ここでは fetch のラッパとして示す
export async function executeHrTool(
name: string,
input: Record<string, unknown>,
): Promise<unknown> {
if (name === "get_holiday_balance") {
const { employee_id } = input as { employee_id: string };
if (!/^E\d{3,}$/.test(employee_id)) {
throw new Error(`invalid employee_id: ${employee_id}`);
}
const res = await fetch(
`${process.env.HR_API_BASE}/employees/${employee_id}/holidays`,
{ headers: { Authorization: `Bearer ${process.env.HR_API_TOKEN}` } },
);
if (!res.ok) throw new Error(`HR API ${res.status}`);
return (await res.json()) as { remaining_days: number; fiscal_year: number };
}
throw new Error(`unknown tool: ${name}`);
}
description は モデルが「このツールをいつ呼ぶか」を判断する唯一の手がかりです。雑に書くと余計なときに呼ばれ、書きすぎると呼ばれるべき場面でも呼ばれない。helpdesk-ai で言うなら、「有給」「休暇」「残日数」「いつまで」など、社員が使う言葉を意識して書き、employee_id の 由来(呼び出し側が解決する)まで明示するのがコツです。
Tool Use のループ
Bedrock の Tool Use は「1 リクエストで 1 つ進む」設計です。Converse を呼ぶ → 戻り値の stopReason が "tool_use" ならツール実行 → 結果を toolResult ブロックとして次の messages に積んで再度 Converse を呼ぶ → 最終的に stopReason が "end_turn" になるまで繰り返す。
flowchart TD
A([user query]) --> B[Converse / ConverseStream]
B --> C{stopReason}
C -->|"end_turn"| Z([final answer])
C -->|"tool_use"| D[ツールを実行]
D --> E[toolResult を messages に積む]
E --> F{iteration < max?}
F -->|yes| B
F -->|no| G([フォールバック応答])
ここで 最大再帰回数(maxIterations) を必ず置きます。tool_use の途中で description が曖昧だと、モデルが同じツールを何度も呼ぼうとして無限ループに近い挙動になることがあります。AWS の SDK もモデルも、暴走を止める仕組みは入れてくれないので、呼び出し側が止める のがルールです。
// ❌ 悪い例:再帰回数の上限を置いていない
while (true) {
const res = await bedrock.send(new ConverseCommand({ ... }));
if (res.stopReason !== "tool_use") return res;
// ...ツール実行して messages に積んで while を回す...
}
// ✅ 良い例:maxIterations を明示し、超えたらフォールバック
const MAX_ITERATIONS = 5;
for (let i = 0; i < MAX_ITERATIONS; i++) {
const res = await bedrock.send(new ConverseCommand({ ... }));
if (res.stopReason !== "tool_use") return res;
// ...ツール実行して messages に積む...
}
throw new ToolLoopExceeded("max iterations reached"); // 上位で人間向けメッセージへ
5 回という数字は経験則です。helpdesk-ai のように 1 ターンで 1〜2 個のツールしか叩かない想定なら 5 で十分すぎる余裕。Bedrock Agents のように複数ツールをチェーンする世界に入ると、もう少し緩める必要が出てきます(その話は ch09 で)。
Streaming と Tool Use を組み合わせる
実用版では「Streaming の中で Tool Use を捌く」形になります。ポイントは次の 3 つです。
- Streaming 中も
stopReasonは最後のmessageStopで確定する。途中でツール呼び出しを実行してはいけない(モデル側ではまだ続きを生成する気でいる可能性がある) - **ツール実行中は streaming を「一時停止」**して、結果が返ってから次の
ConverseStreamCommandを投げる - ツール引数は
contentBlockDelta.delta.toolUse.inputに JSON 文字列の断片で来るので、contentBlockStopの時点でつなげてJSON.parseする
// src/handlers/stream.ts に追記
import {
ConverseStreamCommand,
type Message,
type ContentBlock,
} from "@aws-sdk/client-bedrock-runtime";
import { bedrock, MODEL_ID } from "../client.js";
import { helpdeskToolConfig, executeHrTool } from "../tools/hr.js";
const MAX_ITERATIONS = 5;
export async function* streamHelpdesk(
initialMessages: Message[],
): AsyncGenerator<string, void, void> {
const messages: Message[] = [...initialMessages];
for (let iter = 0; iter < MAX_ITERATIONS; iter++) {
// 1 ターン分のアシスタント応答をここに組み立てる
const assistantBlocks: ContentBlock[] = [];
// 現在組み立て中の Tool Use 情報(index → 状態)
const pendingTools = new Map<
number,
{ toolUseId: string; name: string; inputJson: string }
>();
let stopReason: string | undefined;
const res = await bedrock.send(
new ConverseStreamCommand({
modelId: MODEL_ID,
messages,
system: [
{ text: "You are helpdesk-ai. Call tools when you need fresh data." },
],
toolConfig: helpdeskToolConfig,
inferenceConfig: { maxTokens: 1024, temperature: 0.3, topP: 0.9 },
}),
);
for await (const ev of res.stream ?? []) {
// ツール呼び出しの開始
if (ev.contentBlockStart?.start?.toolUse) {
const idx = ev.contentBlockStart.contentBlockIndex ?? 0;
const tu = ev.contentBlockStart.start.toolUse;
pendingTools.set(idx, {
toolUseId: tu.toolUseId!,
name: tu.name!,
inputJson: "",
});
}
// テキスト or Tool 引数のデルタ
if (ev.contentBlockDelta) {
const idx = ev.contentBlockDelta.contentBlockIndex ?? 0;
const delta = ev.contentBlockDelta.delta;
if (delta?.text) {
// TTFT を意識:UI には即座に流す。バッファしない
yield delta.text;
// 後でアシスタントメッセージを組み立てるための保存
const last = assistantBlocks.at(-1);
if (last && "text" in last) last.text += delta.text;
else assistantBlocks.push({ text: delta.text });
}
if (delta?.toolUse?.input) {
// input は JSON 文字列の断片で届く。連結のみして parse は後段で
const slot = pendingTools.get(idx);
if (slot) slot.inputJson += delta.toolUse.input;
}
}
// 1 つの block が閉じたタイミングで Tool Use を確定
if (ev.contentBlockStop) {
const idx = ev.contentBlockStop.contentBlockIndex ?? 0;
const slot = pendingTools.get(idx);
if (slot) {
assistantBlocks.push({
toolUse: {
toolUseId: slot.toolUseId,
name: slot.name,
input: slot.inputJson ? JSON.parse(slot.inputJson) : {},
},
});
pendingTools.delete(idx);
}
}
if (ev.messageStop) stopReason = ev.messageStop.stopReason;
}
// ターンの組み立てが終わった。アシスタントメッセージを履歴に積む
messages.push({ role: "assistant", content: assistantBlocks });
if (stopReason !== "tool_use") return; // 終了
// Tool Use を 1 ターン分まとめて実行し、結果を user ロールで返す
const toolResults: ContentBlock[] = [];
for (const block of assistantBlocks) {
if (!("toolUse" in block) || !block.toolUse) continue;
try {
const out = await executeHrTool(
block.toolUse.name!,
block.toolUse.input as Record<string, unknown>,
);
toolResults.push({
toolResult: {
toolUseId: block.toolUse.toolUseId!,
content: [{ json: out as Record<string, unknown> }],
status: "success",
},
});
} catch (e) {
// ツール実行失敗は status: "error" でモデルに伝える(次ターンで自己リカバリさせる)
toolResults.push({
toolResult: {
toolUseId: block.toolUse.toolUseId!,
content: [{ text: `tool failed: ${(e as Error).message}` }],
status: "error",
},
});
}
}
messages.push({ role: "user", content: toolResults });
// 次の iteration で再び Converse を投げる
}
// ここに来たということは MAX_ITERATIONS を超えた
yield "\n\n(複数回のツール呼び出しが必要でした。条件を絞って再質問してください)";
}
実装のポイントを 3 つ。
第一に、テキスト用と Tool Use 用で同じ assistantBlocks 配列を使うこと。Claude が最終応答の中で「先に簡単な前置きテキスト → 続いて Tool Use」を 1 つのメッセージとして返してくることがあり、それを messages に積み直すときは順序を保ったまま渡さないと整合性が崩れます。
第二に、ツール実行が失敗したときに status: "error" を立てて投げ返すこと。Tool Use の世界では失敗もモデルに伝えてやると、「別の方法で答える」「ユーザーに条件を再確認する」といった自己リカバリをしてくれます。try の中で例外を握り潰して空応答を返すと、モデルは沈黙の理由が分からず、ループの次のターンで同じツールを同じ引数でまた呼ぶ ── という最悪の流れになります。
第三に、messageStop の stopReason 確認は必ず for-await ループの中で受けること。Streaming の場合、res.stopReason のような外側のフィールドはありません。messageStop イベントだけが正解を持っています。
toolChoice ── ツール選択の制御
toolConfig には toolChoice という地味だが重要なパラメータがあります。
| 値 | 意味 | 使いどころ |
|---|---|---|
{ auto: {} } | 使うかどうかモデルに任せる | デフォルト。チャット用途はこれ |
{ any: {} } | 必ずどれか 1 つは使う | 「必ず関数経由で答えさせたい」場面 |
{ tool: { name: "get_holiday_balance" } } | 特定ツールを強制 | テスト時、または「この質問はこの関数で確定」な前段ルーティング |
helpdesk-ai の通常会話では auto で十分です。一方、/holidays のような社内 Slack コマンドから入ってきた質問なら tool で強制してしまうのも手です。「強制すべきか LLM 判断に任せるか」は UX 設計の話 で、ここを雑にすると「FAQ で答えられるのに毎回 API を叩いて遅い/高い」または「API に行くべきなのに勝手に憶測で答える」のどちらかに振れます。
エラーハンドリング ── Streaming と Tool Use の合わせ技
ch06 で扱った ThrottlingException / ModelTimeoutException 系のエラーは、Streaming でも同じく出ます。ただし どこで出るか が違います。
client.send(new ConverseStreamCommand(...))自体は 接続確立で投げる例外しか返さない- 本体のエラー(throttling 等)は
for awaitの中で例外として浮上してくる
つまり try/catch を for await を含めた範囲にかける必要があります。client.send だけ try で囲んでも 99% のエラーは捕まりません。
try {
for await (const ev of res.stream ?? []) {
// ...
}
} catch (e) {
// ストリーム中断時の処理
// - 既に送った chunk は UI に出ているので、追記で「(中断されました)」を流す
// - Tool Use が走っている途中なら、その toolResult は捨てる(次回再生成)
yield "\n\n(応答中にエラーが発生しました。もう一度お試しください)";
console.error("stream aborted", e);
}
ストリーム中断は チャット UI の体験を一番壊しやすい場所です。中断時に「無音で終わる」とユーザーは何が起きたか分からない。少なくとも 1 行は何か返す、というのを徹底するだけで体感品質が上がります。
次章への接続
ここまでで helpdesk-ai は次のことができるようになりました。
ConverseStreamでトークンを 1 つずつ画面に流す(TTFT を意識した typing 表示)get_holiday_balanceを Tool Use 経由でモデルに呼ばせ、結果を取り込んで最終応答に変換する- 再帰回数を
MAX_ITERATIONS = 5で抑え、暴走を止める - ツール失敗を
status: "error"で正直に伝え、モデルに自己リカバリさせる
ただ、これでは 「経費精算の上限は?」「在宅勤務手当の申請方法は?」のような社内文書の問い合わせ にはまだ答えられません。社内規程の PDF をどう Tool Use に乗せる ── と考え始めた瞬間に、「ツールにすべきは検索か、それとも検索 + 生成か」「JSON で返すか、引用を埋め込んだ自然文で返すか」という問いに突き当たります。これは Tool Use の延長線では設計しきれません。
LLM が「自然言語で書かれた社内文書を検索して、引用付きで答える」を構造的に解いた答えが RAG(Retrieval-Augmented Generation) であり、AWS のマネージド版が Knowledge Bases for Bedrock です。次章ではここに進みます。get_holiday_balance は人事 API という構造化データ向きの道具でした。社内規程の自然言語検索には別の道具が要る、というのが ch08 の入り口です。
そしてもう 1 段先まで予告しておくと、Tool Use を「1 ターン 1 関数」の枠から外して、「モデルが自分で計画を立てて複数のツールを連鎖」させる世界に踏み込むのが ch09 の Bedrock Agents です。今章の MAX_ITERATIONS = 5 は、Agents の世界では 30 や 50 になります。Tool Use はその上位概念(Agents)の最小構成要素だ、という見方で次章以降を読んでもらえると、設計の見通しが効きます。
最後に、本章で console.error に流した usage と metrics、特に TimeToFirstToken ── これは ch13 の観測章で CloudWatch メトリクスとして実装し直します。Streaming は「速く見せる」最強の武器ですが、速く見せたことを計測しなければ Production には出せない。これも伏線として置いておきます。
章末まとめ
- チャット UI には
ConverseStreamCommandが必須。TimeToFirstToken を縮めるために最初の chunk は即座に UI へ流す- Streaming のイベントは
messageStart/contentBlockStart/contentBlockDelta/contentBlockStop/messageStop/metadataの 6 種類。stopReasonはmessageStopで確定する- Tool Use は
stopReason === "tool_use"の間ループ。MAX_ITERATIONSを必ず設定し、超えたらフォールバック応答に逃がすtoolChoiceはauto/any/ 特定ツール強制 の 3 つ。チャット会話はauto、Slack コマンド経由のような確定パスはtool強制が向く- ツール実行失敗は
status: "error"でモデルに返すと自己リカバリしてくれる。握り潰すと最悪のループに陥る- Streaming のエラーは
for awaitの中で例外として浮上するため、try/catch はfor awaitを含む範囲にかける- 次章では社内規程の自然言語検索という「Tool Use の延長では解けない問題」を、Knowledge Bases for Bedrock で解く