目次を表示する

ngrokの魔法を解き直す ── 仕組みを理解し、Goで自作してみる

コントロールとデータを分ける —— `TypedStreamSession` で型付け多重化する

コントロールとデータを分ける —— TypedStreamSession で型付け多重化する

前章の弱点 —— accept した stream の「中身を見るまで何か分からない」問題

第 4 章で muxado を導入し、第 5 章で Host ヘッダーを使った HTTP ルーティングを足した。ここまでで、mintunnel は「1 本の TLS 上に複数 stream を流せる、ホスト名で振り分けられる、x-forwarded-* を付ける」という、本物の ngrok の外枠に近いものになっている。

だが、コードをよく見ると 1 箇所だけ筋の悪いところが残っている。第 5 章の agent 側の処理を思い出してほしい:

// ch05 時点の agent ループ(簡略)
for {
    stream, err := sess.Accept() // ←ここで受け取った stream は「何用」?
    if err != nil {
        return err
    }
    go a.handleStream(stream) // とりあえずローカルにプロキシ
}

sess.Accept() で得られた stream が 何のために開かれたのか、コードからは分からない。

  • これは「agent の認証要求への応答」か?
  • それとも「edge から agent への新規 HTTP リクエスト通知」か?
  • あるいは「実際の HTTP リクエスト本体を流すプロキシストリーム」か?

ch05 までは全部「HTTP リクエスト本体」として扱ってきたので問題なく動いていた。だが現実の ngrok は 認証メタデータ通知実データ転送 を同じ stream に混ぜたりはしない。混ぜると、stream の冒頭で「これは何の stream か」を表すマジックバイトを読まないと処理が分岐できず、コードが汚れる。

こんな疑問をこの章で自力で解決できるようになる。

  • muxado の TypedStreamSession は何のためにあるのか?
  • 「コントロールプレーン」と「データプレーン」を分けると、コードはどう変わるのか?
  • ngrok agent のハートビートは、muxado のどの API でできているのか?

この章を読み終えると、stream に 型 ID(type identifier) を付けて開けるようになる。Accept() した瞬間に「これは制御 stream」「これはプロキシ stream」と判別でき、switch 1 つで処理を分岐できる構造になる。これで自作 ngrok は、本物の ngrok と同じ「型付き多重化プロトコル」の系統に到達する

TypedStreamSession という同梱ラッパー

muxado の公式 doc.go には、興味深い記述がある(第 4 章の冒頭で引いた doc.go の続きの部分だ)。

muxado ships with two wrappers that add commonly used functionality. The first is a TypedStreamSession which allows a client application to open streams with a type identifier so that the remote peer can identify the protocol that will be communicated on that stream. The second wrapper is a simple Heartbeat which issues a callback to the application informing it of round-trip latency and heartbeat failure.

要点は 2 つ。

  1. TypedStreamSession ── stream に「type identifier」を付けて開ける。受け取った側は中身を読む前に「この stream は何用か」が分かる。
  2. Heartbeat ── 一定間隔で ping を打ち、RTT(往復遅延)と障害を検出する。ngrok agent のドキュメントに登場する「ハートビート」と同じ系統。

muxado の本体(フレーム 4 種だけのストイックなプロトコル)は変えず、上位レイヤのラッパーとして被せる構造になっている。muxado を作った Alan Shreve の設計判断が透けて見える:「コアは小さく、よく使う機能はラッパーで」。

pkg.go.dev で API を確認する

golang.ngrok.com/muxado/v2 の API を見ると、こうなっている(pkg.go.dev v2.0.1 ベース)。

// 型 ID は uint32
type StreamType uint32

// TypedStreamSession は Session を被せたインターフェース
type TypedStreamSession interface {
    Session
    OpenTypedStream(stype StreamType) (Stream, error)
    AcceptTypedStream() (TypedStream, error)
}

// AcceptTypedStream の戻り値は型 ID を引ける
type TypedStream interface {
    Stream
    StreamType() StreamType
}

// ラップする関数
func NewTypedStreamSession(s Session) TypedStreamSession

muxado.Session で生のセッションを作り、それを NewTypedStreamSession でラップする。これだけで、stream を開く側は OpenTypedStream(0x01) のように型を指定でき、受け取る側は AcceptTypedStream() の戻り値から StreamType() でその型を取り出せる。

コントロールプレーンとデータプレーンの分離

自作 mintunnel で使う型 ID を 2 種類だけ定義する。spec.md §3.1 の定数だ。

// internal/proto/proto.go
package proto

import "github.com/golang.ngrok.com/muxado/v2"

// StreamType は muxado.TypedStreamSession で使うストリーム種別
type StreamType = muxado.StreamType

const (
    StreamTypeControl StreamType = 0x01 // 制御チャネル:認証・公開URL払い出し・新規リクエスト通知
    StreamTypeProxy   StreamType = 0x02 // データチャネル:HTTPリクエスト本体
)

StreamTypeControl(0x01)と StreamTypeProxy(0x02)の役割は、図にするとこうなる:

graph LR
  subgraph Agent
    A[agent プロセス]
  end
  subgraph Edge[Edge Server]
    E[server プロセス]
  end
  A -- "stream type=0x01<br/>(永続・1本だけ)<br/>AuthRequest/Response<br/>NewRequest 通知" --> E
  E -. "stream type=0x02<br/>(リクエスト毎に新規)<br/>HTTP リクエスト本体" .-> A
  classDef ctrl fill:#fef3c7,stroke:#d97706
  classDef data fill:#dbeafe,stroke:#2563eb
  class A,E ctrl
  • コントロールプレーン(0x01)は agent 起動時に 1 本だけ 張る永続 stream。短いメッセージ(認証・通知)が双方向に流れる。
  • データプレーン(0x02)は HTTP リクエストが来るたびに 1 本ずつ 張る、寿命の短い stream。HTTP リクエスト本体とレスポンスが流れる。

この分離は、本物の ngrok の agent ↔ edge プロトコルが採用しているのと同じパターンだ。コントロール stream の数が agent あたり 1 本に固定されることで、edge 側は「どの agent が生きているか」をその 1 本の生死で管理できる(後述するハートビートでさらに精度が上がる)。

コントロールメッセージの定義

control stream に流すメッセージを 3 種類定義する。spec.md §3.1 で決めた型だ。シリアライズには encoding/gob を使う。標準ライブラリで完結し、構造体の登録だけで扱えて教育用に最も簡潔だ(本番では Protobuf 等が選択肢になるが、ここでは muxado の話に集中したい)。

// internal/proto/proto.go の続き

// AuthRequest は agent が最初に送る認証メッセージ
type AuthRequest struct {
    Token   string
    Version string
}

// AuthResponse は server が返す応答
type AuthResponse struct {
    OK        bool
    PublicURL string // 払い出された公開URL(例: https://abc123.example.test)
    Message   string
}

// NewRequest は server → agent への「新規HTTPリクエスト来たよ」通知
type NewRequest struct {
    StreamID uint64
    Host     string
    ClientIP string
}

3 つのメッセージはすべて control stream(0x01)の上を gob でシリアライズして流れる。gobEncoder/Decoder を control stream に紐付けておけば、enc.Encode(&msg) / dec.Decode(&msg) で送受信できる。

通信フロー(完成版)

ch06 完成時点の通信フローを sequence で見ておこう。spec.md §4 をそのまま図にしたものだ。

sequenceDiagram
  participant B as Browser
  participant E as Edge :443
  participant A as Agent
  participant L as localhost:3000

  Note over A,E: 起動時:control stream を 1 本だけ張る
  A->>E: TCP/TLS Dial + muxado Session 確立
  A->>E: OpenTypedStream(0x01) → control stream
  A->>E: gob: AuthRequest{Token, Version}
  E-->>A: gob: AuthResponse{OK, PublicURL}

  Note over B,L: リクエスト到来時
  B->>E: GET https://abc123.example.test/foo
  E->>A: gob over control: NewRequest{Host, ClientIP}
  E->>A: OpenTypedStream(0x02) → proxy stream
  E->>A: HTTPリクエスト本体(バイト列)を proxy に書く
  A->>L: 通常 HTTP で localhost:3000 へ転送
  L-->>A: HTTP レスポンス
  A-->>E: proxy stream にレスポンスを書き戻す
  E-->>B: HTTP レスポンス

ポイントは、プロキシ stream を Open するのは edge 側 だということ。muxado は「双方向どちらからでも Open できる」設計なので、NAT 内側の agent に向かって edge から能動的に stream を張れる。これが第 2 章で見た NAT 非対称性に対する「最後の一押し」になる ── outbound だけ通る非対称性を逆手にとって、いったん張った 1 本の TLS の中で、edge から agent へ無数の stream を開けてしまう。

実装ハンズオン:agent 側

第 5 章の agent から差分だけ示す(spec.md §3.3 の Agent.Run を更新する)。

// internal/agent/agent.go(抜粋・差分のみ)
func (a *Agent) Run(ctx context.Context) error {
    // 1) TLS 接続を張って muxado Session を立てる
    conn, err := tls.Dial("tcp", a.ServerAddr, &tls.Config{ /* ... */ })
    if err != nil {
        return fmt.Errorf("dial edge: %w", err)
    }
    sess := muxado.Client(conn, nil)

    // 2) TypedStreamSession でラップ(ここが ch06 の核心)
    tsess := muxado.NewTypedStreamSession(sess)

    // 3) control stream を 1 本だけ開く(agent → edge)
    ctrl, err := tsess.OpenTypedStream(proto.StreamTypeControl)
    if err != nil {
        return fmt.Errorf("open control stream: %w", err)
    }
    enc := gob.NewEncoder(ctrl)
    dec := gob.NewDecoder(ctrl)

    // 4) AuthRequest 送信 → AuthResponse 受信
    if err := enc.Encode(&proto.AuthRequest{Token: a.Token, Version: "0.6.0"}); err != nil {
        return fmt.Errorf("send auth: %w", err)
    }
    var resp proto.AuthResponse
    if err := dec.Decode(&resp); err != nil {
        return fmt.Errorf("recv auth: %w", err)
    }
    if !resp.OK {
        return fmt.Errorf("auth failed: %s", resp.Message)
    }
    slog.Info("authenticated", "publicURL", resp.PublicURL)

    // 5) 並行ループ:control stream は NewRequest 通知の購読、
    //    別に AcceptTypedStream() で proxy stream を受け取って localhost に流す
    go a.readControl(ctx, dec)
    return a.acceptProxyStreams(ctx, tsess)
}

// acceptProxyStreams は edge 側から Open された proxy stream を受け取り続ける
func (a *Agent) acceptProxyStreams(ctx context.Context, tsess muxado.TypedStreamSession) error {
    for {
        ts, err := tsess.AcceptTypedStream()
        if err != nil {
            return err
        }
        // ここで型を見て分岐できる! ch05 にはなかった芸当
        switch ts.StreamType() {
        case proto.StreamTypeProxy:
            go a.proxyToLocal(ts) // HTTPリクエスト本体を localhost:3000 に投げる
        default:
            slog.Warn("unknown stream type", "type", ts.StreamType())
            ts.Close()
        }
    }
}

注目してほしいのは switch ts.StreamType() の部分。第 5 章までは「stream の中身を読んでみないと何の stream か分からない」という弱さがあった。それが、フレームヘッダレベルの型 ID で解決される。これが TypedStreamSession の効能だ。

実装ハンズオン:edge 側

edge 側は対称的に、agent が Open してきた control stream を Accept し、HTTP リクエストが来るたびに proxy stream を能動的に Open する。

// internal/server/server.go(抜粋・差分のみ)
func (s *Server) handleAgent(conn net.Conn) {
    sess := muxado.Server(conn, nil)
    tsess := muxado.NewTypedStreamSession(sess)

    // 1) agent が最初に開く control stream を受け取る
    ts, err := tsess.AcceptTypedStream()
    if err != nil || ts.StreamType() != proto.StreamTypeControl {
        slog.Error("expected control stream", "got", ts.StreamType())
        return
    }
    dec := gob.NewDecoder(ts)
    enc := gob.NewEncoder(ts)

    // 2) AuthRequest を読み、AuthResponse を返す
    var req proto.AuthRequest
    if err := dec.Decode(&req); err != nil {
        return
    }
    publicURL := s.allocateURL(req.Token)
    enc.Encode(&proto.AuthResponse{OK: true, PublicURL: publicURL})
    slog.Info("agent registered", "publicURL", publicURL)

    // 3) この agent 用のルーティングテーブルにエントリを刻む
    s.router.bind(publicURL, &agentHandle{tsess: tsess, ctrlEnc: enc})
}

// HTTPリクエストが来たとき、router がこれを呼ぶ
func (h *agentHandle) forward(r *http.Request) (*http.Response, error) {
    // a) control 上で NewRequest を agent に通知
    h.ctrlEnc.Encode(&proto.NewRequest{
        Host:     r.Host,
        ClientIP: r.RemoteAddr,
    })

    // b) proxy stream を edge から能動的に開く
    proxy, err := h.tsess.OpenTypedStream(proto.StreamTypeProxy)
    if err != nil {
        return nil, fmt.Errorf("open proxy stream: %w", err)
    }
    defer proxy.Close()

    // c) HTTP リクエストを proxy stream にそのまま書き、レスポンスを読む
    if err := r.Write(proxy); err != nil {
        return nil, err
    }
    return http.ReadResponse(bufio.NewReader(proxy), r)
}

spec.md §7 の指針通り、ch06 で増える新規コードは 約 30 行。累積で 約 200 行。これで「200 行台で本物の ngrok の核心を再現する」という当初の約束に到達する。

ハートビートを足す(おまけ)

TypedStreamSession を導入したついでに、もう一つの同梱ラッパー Heartbeat も足しておこう。ngrok agent のドキュメントに登場する「ハートビートで RTT を測り、応答が止まったらコネクションを張り直す」挙動と同じ系統が、これだけで手に入る。

// agent 側で control stream を開いた直後に追加する
hb := muxado.NewHeartbeat(tsess, func(rtt time.Duration, timeout bool) {
    if timeout {
        slog.Warn("heartbeat timeout", "rtt", rtt)
    } else {
        slog.Debug("heartbeat", "rtt", rtt)
    }
}, muxado.NewHeartbeatConfig())
hb.Start()

NewHeartbeat のシグネチャは func(sess TypedStreamSession, cb func(time.Duration, bool), config *HeartbeatConfig) *Heartbeat(pkg.go.dev v2.0.1 で確認)。コールバックは (rtt, timeout) を受け取る ── まさに「RTT と障害検出」だ。HeartbeatConfig.Type で独自の StreamType を割り当てられるが、デフォルトのままで構わない。これで agent と edge の間の「生きているか」が秒オーダーで分かるようになる。

動かしてみよう

第 5 章までと同じ手順で mintunnel-servermintunnel-agent を立ち上げ、curl を打つ:

# ターミナル 1
$ go run ./cmd/mintunnel-server
time=... level=INFO msg="public listener ready" addr=:8080
time=... level=INFO msg="agent listener ready" addr=:7000

# ターミナル 2
$ go run ./cmd/mintunnel-agent
time=... level=INFO msg="authenticated" publicURL=https://abc123.example.test
time=... level=DEBUG msg="heartbeat" rtt=2.1ms

# ターミナル 3
$ curl -H "Host: abc123.example.test" http://localhost:8080/
Hello from localhost:3000!

server 側のログを見ると、control stream と proxy stream の開閉が slog に出ているはずだ:

level=INFO msg="agent registered" publicURL=https://abc123.example.test
level=INFO msg="stream opened" type=StreamTypeProxy id=3
level=INFO msg="stream closed" type=StreamTypeProxy id=3 duration=12ms

type=StreamTypeProxystream を読む前に 出ていることに注目してほしい。これが ch05 までのコードとの決定的な違いだ。

ここまでで自作 ngrok の核心はだいたい揃った

第 3 章で ssh -R を再発明し、第 4 章で 1 本の TCP に多重化を入れ、第 5 章で HTTP ヘッダーを書き換えて公開 URL を払い出し、この第 6 章でストリームに型を付けた。

何が増えたかコード行数(累積)
ch03単純な TCP forward80
ch04muxado 多重化130
ch05Host ベースルーティング170
ch06TypedStreamSession + Heartbeat約 200

200 行台の Go コードで、本物の ngrok agent と edge の間で喋られているプロトコルと同じ系統まで到達した。残っているのは、本番では当然必要になる要素 ── TLS 終端、ACME 自動化、マルチリージョン分散、Traffic Policy のような宣言的ルーティング ── だが、これらは「核心」というよりは「商用品質のための周辺」だ。

次章では、ここで作った最小版を 本物の ngrok と並べて答え合わせ する。GSLB(地理的負荷分散)、Traffic Policy、Agent Endpoint といった本物の構成要素が、ここで作った何に対応していて、自作版では何を省略したのか ── を一つひとつ突き合わせていく。本物の ngrok のドキュメントを読み返したとき、「あ、これは ch06 の TypedStreamSession のことだ」「これは ch05 の Host ヘッダー書き換えと同じだ」と読めるようになる。

そして、付録 B では「もし muxado ではなく hashicorp/yamuxgolang.org/x/net/http2 を使ったら、ch06 のコードはどう変わるのか」を実コードで比較する。フレーム数 4 種だけの muxado と、フル機能の HTTP/2 で「同じ多重化トンネル」を書くと、どこが太り、どこが痩せるのか ── これも一つの楽しみとして取っておこう。


章末まとめ

  • muxado.TypedStreamSession は stream に 型 ID(StreamType uint32 を持たせるラッパー。OpenTypedStream(stype) / AcceptTypedStream() で型付きの stream を扱える
  • mintunnel では StreamTypeControl = 0x01StreamTypeProxy = 0x02 の 2 種類を定義し、認証・通知と HTTP 本体転送を別 stream に分離した
  • control stream は agent あたり 1 本の 永続 stream、proxy stream は リクエストごとに edge 側から Open する短命 stream
  • muxado.NewHeartbeat で RTT と障害検出が数行で手に入る。本物の ngrok agent のハートビートと同じ系統
  • 第 6 章まで進めて約 200 行。本物の ngrok のプロトコル系統に到達した。次章では本物の ngrok(GSLB / Traffic Policy / Agent Endpoint)と並べて答え合わせをする