Summary
Add Layer B stream recovery to @cloudflare/think: when a Durable Object is evicted mid-turn, re-attach to the same upstream LLM run via Cloudflare AI Gateway's resumable streaming (cf-aig-run-id) and replay the exact output, instead of doing a fresh model call that re-spends tokens and regenerates a (possibly different) answer.
This is the payoff of the AI Gateway resume work. The SDK primitives it needs are now released in workers-ai-provider@3.2.0 (cloudflare/ai#573), so this is unblocked. The end-to-end recipe is already proven in experimental/gateway-resume-think/ (real ctx.abort() eviction mid-turn → recovered message byte-identical to the completed run, zero regenerated tokens). This issue tracks productizing it in packages/think.
Not starting yet — filing for pickup later. Design reference: design/rfc-workers-ai-gateway-merge.md §9 (Think/Agents SDK integration) and §7.1 (resume mechanics).
Background & motivation
Think has two recovery layers:
- Layer A, DO ↔ client (ResumableStream): buffers serialized UIMessageChunks in SQLite and replays them over WebSocket when a client reconnects. Always on. Fully handles "client refreshed/dropped, DO still alive." Not affected by this issue.
- Layer B, DO ↔ upstream LLM (does not exist today): on DO eviction mid-turn, the chatRecovery fiber survives and onChatRecovery defaults to continueLastTurn(), a fresh model call that re-spends tokens and regenerates. This is exactly where gateway cf-aig-run-id resume belongs.
The gap: today a mid-turn eviction means paying for the tokens twice and risking a divergent continuation. With Layer B it becomes a zero-token, byte-exact re-attach.
What's now available (workers-ai-provider@3.2.0)
- createWorkersAI({ binding, providers, gateway? }): routes "<provider>/<model>" catalog slugs through AI Gateway; gateway defaults to the account's "default" gateway.
- createResumableStream({ binding, gateway, runId, fromEvent }): re-attaches to an existing run with no initial body; replays from an SSE event index.
- onDispatch(info) → info.runId (the cf-aig-run-id) and onProgress(eventOffset) → the live SSE event index. The delegate owns header capture, so there's no framework header-reading gap.
- onResumeExpired: "error" | "accept-partial" for buffer expiry (~5.5 min TTL; expiry signals as a clean 404).
- Typed errors (WorkersAIGatewayError, WorkersAIFallbackError, GatewayDelegateError) re-exported from the package root.
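As an illustration of how the onDispatch/onProgress pair could feed a recovery record, here is a minimal sketch. The callback shapes are assumed from the descriptions in the list above. DelegateCallbacks, CapturedRun, createCapture, and simulateStream are hypothetical stand-ins written for this example, not types or functions exported by workers-ai-provider.

```typescript
// Assumed callback shapes, taken from the descriptions above; the real
// workers-ai-provider types may carry more fields.
interface DispatchInfo {
  runId: string; // the cf-aig-run-id
}
interface DelegateCallbacks {
  onDispatch: (info: DispatchInfo) => void;
  onProgress: (eventOffset: number) => void;
}

// What a chat turn would need to remember in order to re-attach later.
interface CapturedRun {
  aigRunId: string | undefined;
  eventOffset: number;
}

// Hypothetical helper: builds callbacks that record the run id and the
// latest SSE event index into a single mutable record.
function createCapture(): { callbacks: DelegateCallbacks; captured: CapturedRun } {
  const captured: CapturedRun = { aigRunId: undefined, eventOffset: 0 };
  const callbacks: DelegateCallbacks = {
    onDispatch: (info) => {
      captured.aigRunId = info.runId;
    },
    onProgress: (eventOffset) => {
      // Keep the highest index seen, in case callbacks arrive out of order.
      captured.eventOffset = Math.max(captured.eventOffset, eventOffset);
    },
  };
  return { callbacks, captured };
}

// Stand-in for a streaming call: dispatches once, then reports progress.
function simulateStream(cb: DelegateCallbacks): void {
  cb.onDispatch({ runId: "run-abc" });
  [1, 4, 9].forEach((offset) => cb.onProgress(offset));
}

const capture = createCapture();
simulateStream(capture.callbacks);
```

After the simulated stream, capture.captured holds the run id and the last reported event index, which is the pair a chat turn would want to persist.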
Current state in this repo
- The recovery seam already exists in packages/think/src/think.ts:
  - onChatRecovery(ctx) hook; ChatRecoveryContext carries recoveryKind: "retry" | "continue", partialText, partialParts, recoveryData, snapshot.
  - this.stash(...) (persists into cf_agents_runs, survives eviction); the chat fiber is CHAT_FIBER_NAME = "__cf_internal_chat_turn".
  - continueLastTurn() (the current default recovery: the regenerate path we want to replace when a run id is present).
  - _bumpChatRecoveryProgress(): the existing debounced progress flush to mirror for throttling.
- Proof of concept: experimental/gateway-resume-think/ (src/resume.ts, src/gateway-model.ts, src/plan.ts, src/server.ts, src/layer-b.test.ts, scripts/driver.mjs). It currently vendors its own resume/model logic and does not depend on workers-ai-provider; see pre-work below.
Proposed implementation (packages/think)
- Capture. Thread the delegate's onDispatch/onProgress so the chat turn sees { aigRunId, eventOffset } as it streams.
- Stash. Debounced this.stash({ aigRunId, eventOffset }) inside the chat fiber, mirroring _bumpChatRecoveryProgress. Use a delta-based throttle (e.g. stash when eventOffset advances by N), not eventOffset % N. SSE offsets jump (one chunk can carry several events), so a modulo check often never lands on a boundary and the offset is never re-stashed (observed during the experiment: only the initial offset-0 stash survived until this was fixed).
- Recover. In onChatRecovery, when recoveryData carries an aigRunId:
  - Build the re-attach stream with createResumableStream({ binding, gateway, runId, fromEvent: 0 }) (no initial body), and feed it through the same @ai-sdk model → UIMessageChunks → existing reply loop.
  - Because the gateway run is detached (it keeps generating after the originating disconnect; verified), fromEvent: 0 replays the complete run; continueLastTurn's replace semantics overwrite the partial leaf, so the recovered message is byte-identical to the full run with zero regenerated tokens.
  - Fall back to continueLastTurn() on expiry/miss (onResumeExpired or a 404 from re-attach).
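The delta-based throttle from the Stash step can be sketched as follows. This is a minimal illustration, not the Think implementation: makeStashThrottle, stashEvery, ResumeStash, and the stash callback are hypothetical names, and real code would call this.stash(...) and would probably also debounce on time. The modulo helper is included only to show why that check misses when offsets jump.

```typescript
// Hypothetical shape of the data persisted for Layer B recovery.
interface ResumeStash {
  aigRunId: string;
  eventOffset: number;
}

// Returns a progress handler that stashes when the offset has advanced by at
// least `stashEvery` events since the last stash (delta-based throttle).
function makeStashThrottle(
  aigRunId: string,
  stashEvery: number,
  stash: (data: ResumeStash) => void,
): (eventOffset: number) => void {
  let lastStashed = -Infinity; // forces a stash on the first progress event
  return (eventOffset) => {
    if (eventOffset - lastStashed >= stashEvery) {
      lastStashed = eventOffset;
      stash({ aigRunId, eventOffset });
    }
  };
}

// For contrast: the modulo check that misses when offsets jump.
function moduloWouldStash(eventOffset: number, n: number): boolean {
  return eventOffset % n === 0;
}

// Offsets jump because one chunk can carry several SSE events.
const observedOffsets = [0, 3, 7, 12, 18, 23, 29];
const stashed: ResumeStash[] = [];
const handleProgress = makeStashThrottle("run-123", 10, (d) => stashed.push(d));
observedOffsets.forEach((offset) => handleProgress(offset));

// Offsets at which the modulo check would have stashed.
const moduloHits = observedOffsets.filter((o) => moduloWouldStash(o, 10));
```

With these sample offsets the delta throttle stashes at 0, 12, and 23, while the modulo check fires only at 0, which matches the failure seen in the experiment.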
Key design decisions (carried from the RFC, already validated)
- from=0, not a tail re-attach (from=eventOffset). A tail re-attach is not zero-loss with continueLastTurn's replace semantics (it would drop the already-streamed prefix), and the Layer-A↔SSE offset-space mismatch misaligns the seam. from=0 needs only the run id, costs zero tokens (it replays the gateway buffer, not the model), and is provably whole. Still stash the event offset for observability and an opt-in tail path later.
- Offset-space mismatch. Layer A counts post-parse UIMessageChunks; gateway from=N counts provider-native SSE events. Stash the SSE event index from onProgress, not the chunk count.
- recoveryKind mapping. Gateway resume is the preferred "continue" strategy (byte-exact, free). When unavailable, partialText/partialParts feed a model-agnostic user-message continuation (not assistant prefill, which is deprecated on Anthropic 4.6+); else cold "retry".
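The strategy order described above (gateway resume from event 0, then user-message continuation, then cold retry) could look roughly like this. It is a sketch under stated assumptions: RecoveryInput, RecoveryDeps, recoverTurn, and fakeDeps are hypothetical names, and the injected functions stand in for createResumableStream, a continuation call, and a fresh model call. Expiry or a 404 is modeled here as reattach resolving to null.

```typescript
// Hypothetical, simplified view of what recovery has to work with.
interface RecoveryInput {
  aigRunId?: string; // present when the turn stashed a gateway run id
  partialText: string; // text streamed before eviction ("" if none)
}

interface RecoveryOutcome {
  strategy: "gateway-resume" | "user-continuation" | "retry";
  text: string;
}

// Injected stand-ins for the real operations (all names are assumptions).
interface RecoveryDeps {
  // Re-attach to the gateway run from an SSE event index; resolves to null
  // when the buffer has expired or the run is unknown (the 404 case).
  reattach: (runId: string, fromEvent: number) => Promise<string | null>;
  // Fresh model call that continues from the partial text.
  continueFromPartial: (partialText: string) => Promise<string>;
  // Cold retry of the whole turn.
  retry: () => Promise<string>;
}

async function recoverTurn(
  input: RecoveryInput,
  deps: RecoveryDeps,
): Promise<RecoveryOutcome> {
  if (input.aigRunId !== undefined) {
    // from=0: replay the whole run and replace the partial message.
    const replayed = await deps.reattach(input.aigRunId, 0);
    if (replayed !== null) {
      return { strategy: "gateway-resume", text: replayed };
    }
    // Expired or missing: fall through to the regenerate paths.
  }
  if (input.partialText.length > 0) {
    const text = await deps.continueFromPartial(input.partialText);
    return { strategy: "user-continuation", text };
  }
  return { strategy: "retry", text: await deps.retry() };
}

// Fake dependencies for illustration: only the run "live" is still buffered.
const fakeDeps: RecoveryDeps = {
  reattach: async (runId) => (runId === "live" ? "full answer" : null),
  continueFromPartial: async (partial) => partial + " (continued)",
  retry: async () => "regenerated answer",
};
```

Keeping the three operations injectable makes the fallback order testable without a gateway, and only the first branch avoids a new model call.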
Open questions / decisions needed
- packages/think or packages/ai-chat? Both carry the onChatRecovery/continueLastTurn surface; @cloudflare/think is the opinionated stream-resumption chat agent and is what the experiment targets. Confirm before building.
- […] workers-ai-provider delegate + its onDispatch/onProgress callbacks automatically vs. the user passing a pre-built model and Think detecting resumability.)
- […] cf_agents_runs and how it composes with existing recovery snapshot data.
Pre-work (recommended first, small)
- […] experimental/gateway-resume-think off its vendored resume.ts/gateway-model.ts onto the published workers-ai-provider@3.2.0 (createResumableStream + createWorkersAI). Highest-signal check that the released API supports the recipe end-to-end and shrinks the experiment to the real surface before productizing.
Testing
- […] experimental/gateway-resume-think/src/layer-b.test.ts into packages/think tests: simulate ctx.abort() eviction mid-turn, assert recovered message === full run and zero regenerated tokens.
- […] 404/onResumeExpired → graceful fall back to continueLastTurn().
- […] experimental/gateway-resume-think/scripts/driver.mjs.
Out of scope (follow-ups)
- OpenAI Responses API resume (its own sequence_number mechanism): RFC §10a; future matrix row, gated on a passthrough check.
- OpenAI Predicted Outputs: passthrough body field, no special resume handling (RFC §10b).
- Continuation-after-expiry wording reconciliation in the Layer-A buffer (RFC §10c).
References
- design/rfc-workers-ai-gateway-merge.md §9 (Think integration), §7.1 (resume mechanics), §8a (attempt tree).
- experimental/gateway-resume-think/ (+ its README.md).
- workers-ai-provider@3.2.0, cloudflare/ai#573, per-provider run-path wire format cloudflare/ai#554.
Acceptance criteria
- […] continueLastTurn() on expiry/miss.