コントロールとデータを分ける —— TypedStreamSession で型付け多重化する
前章の弱点 —— accept した stream の「中身を見るまで何か分からない」問題
第 4 章で muxado を導入し、第 5 章で Host ヘッダーを使った HTTP ルーティングを足した。ここまでで、mintunnel は「1 本の TLS 上に複数 stream を流せる、ホスト名で振り分けられる、x-forwarded-* を付ける」という、本物の ngrok の外枠に近いものになっている。
だが、コードをよく見ると 1 箇所だけ筋の悪いところが残っている。第 5 章の agent 側の処理を思い出してほしい:
// ch05 時点の agent ループ(簡略)
for {
stream, err := sess.Accept() // ←ここで受け取った stream は「何用」?
if err != nil {
return err
}
go a.handleStream(stream) // とりあえずローカルにプロキシ
}
sess.Accept() で得られた stream が 何のために開かれたのか、コードからは分からない。
- これは「agent の認証要求への応答」か?
- それとも「edge から agent への新規 HTTP リクエスト通知」か?
- あるいは「実際の HTTP リクエスト本体を流すプロキシストリーム」か?
ch05 までは全部「HTTP リクエスト本体」として扱ってきたので問題なく動いていた。だが現実の ngrok は 認証・メタデータ通知・実データ転送 を同じ stream に混ぜたりはしない。混ぜると、stream の冒頭で「これは何の stream か」を表すマジックバイトを読まないと処理が分岐できず、コードが汚れる。
こんな疑問をこの章で自力で解決できるようになる。
- muxado の
TypedStreamSessionは何のためにあるのか?- 「コントロールプレーン」と「データプレーン」を分けると、コードはどう変わるのか?
- ngrok agent のハートビートは、muxado のどの API でできているのか?
この章を読み終えると、stream に 型 ID(type identifier) を付けて開けるようになる。Accept() した瞬間に「これは制御 stream」「これはプロキシ stream」と判別でき、switch 1 つで処理を分岐できる構造になる。これで自作 ngrok は、本物の ngrok と同じ「型付き多重化プロトコル」の系統に到達する。
TypedStreamSession という同梱ラッパー
muxado の公式 doc.go には、興味深い記述がある(第 4 章の冒頭で引いた doc.go の続きの部分だ)。
muxado ships with two wrappers that add commonly used functionality. The first is a TypedStreamSession which allows a client application to open streams with a type identifier so that the remote peer can identify the protocol that will be communicated on that stream. The second wrapper is a simple Heartbeat which issues a callback to the application informing it of round-trip latency and heartbeat failure.
要点は 2 つ。
TypedStreamSession── stream に「type identifier」を付けて開ける。受け取った側は中身を読む前に「この stream は何用か」が分かる。Heartbeat── 一定間隔で ping を打ち、RTT(往復遅延)と障害を検出する。ngrok agent のドキュメントに登場する「ハートビート」と同じ系統。
muxado の本体(フレーム 4 種だけのストイックなプロトコル)は変えず、上位レイヤのラッパーとして被せる構造になっている。muxado を作った Alan Shreve の設計判断が透けて見える:「コアは小さく、よく使う機能はラッパーで」。
pkg.go.dev で API を確認する
golang.ngrok.com/muxado/v2 の API を見ると、こうなっている(pkg.go.dev v2.0.1 ベース)。
// 型 ID は uint32
type StreamType uint32
// TypedStreamSession は Session を被せたインターフェース
type TypedStreamSession interface {
Session
OpenTypedStream(stype StreamType) (Stream, error)
AcceptTypedStream() (TypedStream, error)
}
// AcceptTypedStream の戻り値は型 ID を引ける
type TypedStream interface {
Stream
StreamType() StreamType
}
// ラップする関数
func NewTypedStreamSession(s Session) TypedStreamSession
muxado.Session で生のセッションを作り、それを NewTypedStreamSession でラップする。これだけで、stream を開く側は OpenTypedStream(0x01) のように型を指定でき、受け取る側は AcceptTypedStream() の戻り値から StreamType() でその型を取り出せる。
コントロールプレーンとデータプレーンの分離
自作 mintunnel で使う型 ID を 2 種類だけ定義する。spec.md §3.1 の定数だ。
// internal/proto/proto.go
package proto
import "github.com/golang.ngrok.com/muxado/v2"
// StreamType は muxado.TypedStreamSession で使うストリーム種別
type StreamType = muxado.StreamType
const (
StreamTypeControl StreamType = 0x01 // 制御チャネル:認証・公開URL払い出し・新規リクエスト通知
StreamTypeProxy StreamType = 0x02 // データチャネル:HTTPリクエスト本体
)
StreamTypeControl(0x01)と StreamTypeProxy(0x02)の役割は、図にするとこうなる:
graph LR
subgraph Agent
A[agent プロセス]
end
subgraph Edge[Edge Server]
E[server プロセス]
end
A -- "stream type=0x01<br/>(永続・1本だけ)<br/>AuthRequest/Response<br/>NewRequest 通知" --> E
E -. "stream type=0x02<br/>(リクエスト毎に新規)<br/>HTTP リクエスト本体" .-> A
classDef ctrl fill:#fef3c7,stroke:#d97706
classDef data fill:#dbeafe,stroke:#2563eb
class A,E ctrl
- コントロールプレーン(0x01)は agent 起動時に 1 本だけ 張る永続 stream。短いメッセージ(認証・通知)が双方向に流れる。
- データプレーン(0x02)は HTTP リクエストが来るたびに 1 本ずつ 張る、寿命の短い stream。HTTP リクエスト本体とレスポンスが流れる。
この分離は、本物の ngrok の agent ↔ edge プロトコルが採用しているのと同じパターンだ。コントロール stream の数が agent あたり 1 本に固定されることで、edge 側は「どの agent が生きているか」をその 1 本の生死で管理できる(後述するハートビートでさらに精度が上がる)。
コントロールメッセージの定義
control stream に流すメッセージを 3 種類定義する。spec.md §3.1 で決めた型だ。シリアライズには encoding/gob を使う。標準ライブラリで完結し、構造体の登録だけで扱えて教育用に最も簡潔だ(本番では Protobuf 等が選択肢になるが、ここでは muxado の話に集中したい)。
// internal/proto/proto.go の続き
// AuthRequest は agent が最初に送る認証メッセージ
type AuthRequest struct {
Token string
Version string
}
// AuthResponse は server が返す応答
type AuthResponse struct {
OK bool
PublicURL string // 払い出された公開URL(例: https://abc123.example.test)
Message string
}
// NewRequest は server → agent への「新規HTTPリクエスト来たよ」通知
type NewRequest struct {
StreamID uint64
Host string
ClientIP string
}
3 つのメッセージはすべて control stream(0x01)の上を gob でシリアライズして流れる。gob の Encoder/Decoder を control stream に紐付けておけば、enc.Encode(&msg) / dec.Decode(&msg) で送受信できる。
通信フロー(完成版)
ch06 完成時点の通信フローを sequence で見ておこう。spec.md §4 をそのまま図にしたものだ。
sequenceDiagram
participant B as Browser
participant E as Edge :443
participant A as Agent
participant L as localhost:3000
Note over A,E: 起動時:control stream を 1 本だけ張る
A->>E: TCP/TLS Dial + muxado Session 確立
A->>E: OpenTypedStream(0x01) → control stream
A->>E: gob: AuthRequest{Token, Version}
E-->>A: gob: AuthResponse{OK, PublicURL}
Note over B,L: リクエスト到来時
B->>E: GET https://abc123.example.test/foo
E->>A: gob over control: NewRequest{Host, ClientIP}
E->>A: OpenTypedStream(0x02) → proxy stream
E->>A: HTTPリクエスト本体(バイト列)を proxy に書く
A->>L: 通常 HTTP で localhost:3000 へ転送
L-->>A: HTTP レスポンス
A-->>E: proxy stream にレスポンスを書き戻す
E-->>B: HTTP レスポンス
ポイントは、プロキシ stream を Open するのは edge 側 だということ。muxado は「双方向どちらからでも Open できる」設計なので、NAT 内側の agent に向かって edge から能動的に stream を張れる。これが第 2 章で見た NAT 非対称性に対する「最後の一押し」になる ── outbound だけ通る非対称性を逆手にとって、いったん張った 1 本の TLS の中で、edge から agent へ無数の stream を開けてしまう。
実装ハンズオン:agent 側
第 5 章の agent から差分だけ示す(spec.md §3.3 の Agent.Run を更新する)。
// internal/agent/agent.go(抜粋・差分のみ)
func (a *Agent) Run(ctx context.Context) error {
// 1) TLS 接続を張って muxado Session を立てる
conn, err := tls.Dial("tcp", a.ServerAddr, &tls.Config{ /* ... */ })
if err != nil {
return fmt.Errorf("dial edge: %w", err)
}
sess := muxado.Client(conn, nil)
// 2) TypedStreamSession でラップ(ここが ch06 の核心)
tsess := muxado.NewTypedStreamSession(sess)
// 3) control stream を 1 本だけ開く(agent → edge)
ctrl, err := tsess.OpenTypedStream(proto.StreamTypeControl)
if err != nil {
return fmt.Errorf("open control stream: %w", err)
}
enc := gob.NewEncoder(ctrl)
dec := gob.NewDecoder(ctrl)
// 4) AuthRequest 送信 → AuthResponse 受信
if err := enc.Encode(&proto.AuthRequest{Token: a.Token, Version: "0.6.0"}); err != nil {
return fmt.Errorf("send auth: %w", err)
}
var resp proto.AuthResponse
if err := dec.Decode(&resp); err != nil {
return fmt.Errorf("recv auth: %w", err)
}
if !resp.OK {
return fmt.Errorf("auth failed: %s", resp.Message)
}
slog.Info("authenticated", "publicURL", resp.PublicURL)
// 5) 並行ループ:control stream は NewRequest 通知の購読、
// 別に AcceptTypedStream() で proxy stream を受け取って localhost に流す
go a.readControl(ctx, dec)
return a.acceptProxyStreams(ctx, tsess)
}
// acceptProxyStreams は edge 側から Open された proxy stream を受け取り続ける
func (a *Agent) acceptProxyStreams(ctx context.Context, tsess muxado.TypedStreamSession) error {
for {
ts, err := tsess.AcceptTypedStream()
if err != nil {
return err
}
// ここで型を見て分岐できる! ch05 にはなかった芸当
switch ts.StreamType() {
case proto.StreamTypeProxy:
go a.proxyToLocal(ts) // HTTPリクエスト本体を localhost:3000 に投げる
default:
slog.Warn("unknown stream type", "type", ts.StreamType())
ts.Close()
}
}
}
注目してほしいのは switch ts.StreamType() の部分。第 5 章までは「stream の中身を読んでみないと何の stream か分からない」という弱さがあった。それが、フレームヘッダレベルの型 ID で解決される。これが TypedStreamSession の効能だ。
実装ハンズオン:edge 側
edge 側は対称的に、agent が Open してきた control stream を Accept し、HTTP リクエストが来るたびに proxy stream を能動的に Open する。
// internal/server/server.go(抜粋・差分のみ)
func (s *Server) handleAgent(conn net.Conn) {
sess := muxado.Server(conn, nil)
tsess := muxado.NewTypedStreamSession(sess)
// 1) agent が最初に開く control stream を受け取る
ts, err := tsess.AcceptTypedStream()
if err != nil || ts.StreamType() != proto.StreamTypeControl {
slog.Error("expected control stream", "got", ts.StreamType())
return
}
dec := gob.NewDecoder(ts)
enc := gob.NewEncoder(ts)
// 2) AuthRequest を読み、AuthResponse を返す
var req proto.AuthRequest
if err := dec.Decode(&req); err != nil {
return
}
publicURL := s.allocateURL(req.Token)
enc.Encode(&proto.AuthResponse{OK: true, PublicURL: publicURL})
slog.Info("agent registered", "publicURL", publicURL)
// 3) この agent 用のルーティングテーブルにエントリを刻む
s.router.bind(publicURL, &agentHandle{tsess: tsess, ctrlEnc: enc})
}
// HTTPリクエストが来たとき、router がこれを呼ぶ
func (h *agentHandle) forward(r *http.Request) (*http.Response, error) {
// a) control 上で NewRequest を agent に通知
h.ctrlEnc.Encode(&proto.NewRequest{
Host: r.Host,
ClientIP: r.RemoteAddr,
})
// b) proxy stream を edge から能動的に開く
proxy, err := h.tsess.OpenTypedStream(proto.StreamTypeProxy)
if err != nil {
return nil, fmt.Errorf("open proxy stream: %w", err)
}
defer proxy.Close()
// c) HTTP リクエストを proxy stream にそのまま書き、レスポンスを読む
if err := r.Write(proxy); err != nil {
return nil, err
}
return http.ReadResponse(bufio.NewReader(proxy), r)
}
spec.md §7 の指針通り、ch06 で増える新規コードは 約 30 行。累積で 約 200 行。これで「200 行台で本物の ngrok の核心を再現する」という当初の約束に到達する。
ハートビートを足す(おまけ)
TypedStreamSession を導入したついでに、もう一つの同梱ラッパー Heartbeat も足しておこう。ngrok agent のドキュメントに登場する「ハートビートで RTT を測り、応答が止まったらコネクションを張り直す」挙動と同じ系統が、これだけで手に入る。
// agent 側で control stream を開いた直後に追加する
hb := muxado.NewHeartbeat(tsess, func(rtt time.Duration, timeout bool) {
if timeout {
slog.Warn("heartbeat timeout", "rtt", rtt)
} else {
slog.Debug("heartbeat", "rtt", rtt)
}
}, muxado.NewHeartbeatConfig())
hb.Start()
NewHeartbeat のシグネチャは func(sess TypedStreamSession, cb func(time.Duration, bool), config *HeartbeatConfig) *Heartbeat(pkg.go.dev v2.0.1 で確認)。コールバックは (rtt, timeout) を受け取る ── まさに「RTT と障害検出」だ。HeartbeatConfig.Type で独自の StreamType を割り当てられるが、デフォルトのままで構わない。これで agent と edge の間の「生きているか」が秒オーダーで分かるようになる。
動かしてみよう
第 5 章までと同じ手順で mintunnel-server と mintunnel-agent を立ち上げ、curl を打つ:
# ターミナル 1
$ go run ./cmd/mintunnel-server
time=... level=INFO msg="public listener ready" addr=:8080
time=... level=INFO msg="agent listener ready" addr=:7000
# ターミナル 2
$ go run ./cmd/mintunnel-agent
time=... level=INFO msg="authenticated" publicURL=https://abc123.example.test
time=... level=DEBUG msg="heartbeat" rtt=2.1ms
# ターミナル 3
$ curl -H "Host: abc123.example.test" http://localhost:8080/
Hello from localhost:3000!
server 側のログを見ると、control stream と proxy stream の開閉が slog に出ているはずだ:
level=INFO msg="agent registered" publicURL=https://abc123.example.test
level=INFO msg="stream opened" type=StreamTypeProxy id=3
level=INFO msg="stream closed" type=StreamTypeProxy id=3 duration=12ms
type=StreamTypeProxy が stream を読む前に 出ていることに注目してほしい。これが ch05 までのコードとの決定的な違いだ。
ここまでで自作 ngrok の核心はだいたい揃った
第 3 章で ssh -R を再発明し、第 4 章で 1 本の TCP に多重化を入れ、第 5 章で HTTP ヘッダーを書き換えて公開 URL を払い出し、この第 6 章でストリームに型を付けた。
| 章 | 何が増えたか | コード行数(累積) |
|---|---|---|
| ch03 | 単純な TCP forward | 80 |
| ch04 | muxado 多重化 | 130 |
| ch05 | Host ベースルーティング | 170 |
| ch06 | TypedStreamSession + Heartbeat | 約 200 |
200 行台の Go コードで、本物の ngrok agent と edge の間で喋られているプロトコルと同じ系統まで到達した。残っているのは、本番では当然必要になる要素 ── TLS 終端、ACME 自動化、マルチリージョン分散、Traffic Policy のような宣言的ルーティング ── だが、これらは「核心」というよりは「商用品質のための周辺」だ。
次章では、ここで作った最小版を 本物の ngrok と並べて答え合わせ する。GSLB(地理的負荷分散)、Traffic Policy、Agent Endpoint といった本物の構成要素が、ここで作った何に対応していて、自作版では何を省略したのか ── を一つひとつ突き合わせていく。本物の ngrok のドキュメントを読み返したとき、「あ、これは ch06 の TypedStreamSession のことだ」「これは ch05 の Host ヘッダー書き換えと同じだ」と読めるようになる。
そして、付録 B では「もし muxado ではなく hashicorp/yamux や golang.org/x/net/http2 を使ったら、ch06 のコードはどう変わるのか」を実コードで比較する。フレーム数 4 種だけの muxado と、フル機能の HTTP/2 で「同じ多重化トンネル」を書くと、どこが太り、どこが痩せるのか ── これも一つの楽しみとして取っておこう。
章末まとめ
muxado.TypedStreamSessionは stream に 型 ID(StreamType uint32) を持たせるラッパー。OpenTypedStream(stype)/AcceptTypedStream()で型付きの stream を扱えるmintunnelではStreamTypeControl = 0x01とStreamTypeProxy = 0x02の 2 種類を定義し、認証・通知と HTTP 本体転送を別 stream に分離した- control stream は agent あたり 1 本の 永続 stream、proxy stream は リクエストごとに edge 側から
Openする短命 streammuxado.NewHeartbeatで RTT と障害検出が数行で手に入る。本物の ngrok agent のハートビートと同じ系統- 第 6 章まで進めて約 200 行。本物の ngrok のプロトコル系統に到達した。次章では本物の ngrok(GSLB / Traffic Policy / Agent Endpoint)と並べて答え合わせをする