
Think Layer B: re-attach to AI Gateway runs (cf-aig-run-id) on DO eviction instead of regenerating #1763

Description

@threepointone

Summary

Add Layer B stream recovery to @cloudflare/think: when a Durable Object is evicted mid-turn, re-attach to the same upstream LLM run via Cloudflare AI Gateway's resumable streaming (cf-aig-run-id) and replay the exact output, instead of doing a fresh model call that re-spends tokens and regenerates a (possibly different) answer.

This is the payoff of the AI Gateway resume work. The SDK primitives it needs are now released in workers-ai-provider@3.2.0 (cloudflare/ai#573), so this is unblocked. The end-to-end recipe is already proven in experimental/gateway-resume-think/ (real ctx.abort() eviction mid-turn → recovered message byte-identical to the completed run, zero regenerated tokens). This issue tracks productizing it in packages/think.

Not starting yet — filing for pickup later. Design reference: design/rfc-workers-ai-gateway-merge.md §9 (Think/Agents SDK integration) and §7.1 (resume mechanics).

Background & motivation

Think has two recovery layers:

  • Layer A — DO ↔ client (ResumableStream): buffers serialized UIMessageChunks in SQLite and replays them over WebSocket when a client reconnects. Always on. Fully handles "client refreshed/dropped, DO still alive." Not affected by this issue.
  • Layer B — DO ↔ upstream LLM (does not exist today): on DO eviction mid-turn, the chatRecovery fiber survives and onChatRecovery defaults to continueLastTurn() — a fresh model call that re-spends tokens and regenerates. This is exactly where gateway cf-aig-run-id resume belongs.

The gap: today a mid-turn eviction means paying for the tokens twice and risking a divergent continuation. With Layer B it becomes a zero-token, byte-exact re-attach.

What's now available (workers-ai-provider@3.2.0)

  • createWorkersAI({ binding, providers, gateway? }) — routes "<provider>/<model>" catalog slugs through AI Gateway; gateway defaults to the account's "default" gateway.
  • createResumableStream({ binding, gateway, runId, fromEvent }) — re-attach to an existing run with no initial body; replays from an SSE event index.
  • onDispatch(info) → info.runId (the cf-aig-run-id) and onProgress(eventOffset) → the live SSE event index. The delegate owns header capture, so there's no framework header-reading gap.
  • onResumeExpired: "error" | "accept-partial" for buffer expiry (~5.5 min TTL; expiry signals as a clean 404).
  • Typed errors (WorkersAIGatewayError, WorkersAIFallbackError, GatewayDelegateError) re-exported from the package root.
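
As a rough illustration of how the two callbacks above could feed a single piece of per-turn state, here is a minimal sketch. Only info.runId and the numeric eventOffset argument come from this issue; the DispatchInfo and TrackedRun types and the createRunTracker helper are hypothetical names, not part of workers-ai-provider.

```typescript
// Hypothetical shapes: only `runId` and the numeric `eventOffset`
// argument are taken from the issue text.
interface DispatchInfo {
  runId: string; // the cf-aig-run-id
}

interface TrackedRun {
  aigRunId: string | null;
  eventOffset: number;
}

// Collects the run id and the highest SSE event index seen so far.
function createRunTracker() {
  const state: TrackedRun = { aigRunId: null, eventOffset: 0 };
  return {
    onDispatch(info: DispatchInfo): void {
      state.aigRunId = info.runId;
      state.eventOffset = 0; // a new run starts counting from zero
    },
    onProgress(eventOffset: number): void {
      // Keep the offset monotonic even if callbacks arrive out of order.
      if (eventOffset > state.eventOffset) state.eventOffset = eventOffset;
    },
    snapshot(): TrackedRun {
      return { ...state }; // copy, so callers cannot mutate the tracker
    },
  };
}
```

The snapshot is the { aigRunId, eventOffset } pair that the chat turn would later stash.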

Current state in this repo

  • The recovery seam already exists in packages/think/src/think.ts:
    • onChatRecovery(ctx) hook; ChatRecoveryContext carries recoveryKind: "retry" | "continue", partialText, partialParts, recoveryData, snapshot.
    • this.stash(...) (persists into cf_agents_runs, survives eviction); the chat fiber is CHAT_FIBER_NAME = "__cf_internal_chat_turn".
    • continueLastTurn() (the current default recovery — the regenerate path we want to replace when a run id is present).
    • _bumpChatRecoveryProgress() — the existing debounced progress flush to mirror for throttling.
  • Proof of concept: experimental/gateway-resume-think/ (src/resume.ts, src/gateway-model.ts, src/plan.ts, src/server.ts, src/layer-b.test.ts, scripts/driver.mjs). It currently vendors its own resume/model logic and does not depend on workers-ai-provider — see pre-work below.

Proposed implementation (packages/think)

  1. Capture. Thread the delegate's onDispatch/onProgress so the chat turn sees { aigRunId, eventOffset } as it streams.
  2. Stash. Debounced this.stash({ aigRunId, eventOffset }) inside the chat fiber, mirroring _bumpChatRecoveryProgress. Use a delta-based throttle (e.g. stash when eventOffset has advanced by at least N since the last stash), not eventOffset % N. SSE offsets jump (one chunk can carry several events), so a modulo check can skip every multiple of N and never re-stash the offset. This was observed during the experiment: only the initial offset-0 stash survived until the throttle was fixed.
  3. Recover. In onChatRecovery, when recoveryData carries an aigRunId:
    • Build the re-attach stream with createResumableStream({ binding, gateway, runId, fromEvent: 0 }) (no initial body), feed it through the same @ai-sdk model → UIMessageChunks → existing reply loop.
    • Because the gateway run is detached (keeps generating after the originating disconnect — verified), fromEvent: 0 replays the complete run; continueLastTurn's replace semantics overwrite the partial leaf, so the recovered message is byte-identical to the full run with zero regenerated tokens.
    • Fall back to continueLastTurn() on expiry/miss (onResumeExpired or a 404 from re-attach).
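
The throttle choice in step 2 can be sketched as follows. This is a synchronous illustration only: the stash callback stands in for this.stash(...), the debounce timer is omitted, and the helper names, the offset sequence, and N = 10 are all made up for the example.

```typescript
// Hypothetical shapes: `stash` stands in for Think's this.stash(...).
interface StashedProgress {
  aigRunId: string;
  eventOffset: number;
}
type Stash = (data: StashedProgress) => void;

// Delta-based throttle: stash the first offset, then again whenever the
// offset has advanced by at least `minDelta` since the last stash.
function createDeltaStasher(aigRunId: string, stash: Stash, minDelta: number) {
  let lastStashed: number | null = null;
  return (eventOffset: number): void => {
    if (lastStashed === null || eventOffset - lastStashed >= minDelta) {
      stash({ aigRunId, eventOffset });
      lastStashed = eventOffset;
    }
  };
}

// Modulo throttle, shown only for comparison with the approach above.
function createModuloStasher(aigRunId: string, stash: Stash, n: number) {
  return (eventOffset: number): void => {
    if (eventOffset % n === 0) stash({ aigRunId, eventOffset });
  };
}

// Offsets as they might arrive when one chunk carries several SSE events.
const observedOffsets = [0, 3, 7, 13, 21, 26, 34];

function collectStashedOffsets(
  make: (id: string, stash: Stash, n: number) => (offset: number) => void
): number[] {
  const stashed: number[] = [];
  const onProgress = make("run-1", (d) => stashed.push(d.eventOffset), 10);
  observedOffsets.forEach((offset) => onProgress(offset));
  return stashed;
}

const deltaStashed = collectStashedOffsets(createDeltaStasher); // [0, 13, 26]
const moduloStashed = collectStashedOffsets(createModuloStasher); // [0]
```

With this sequence the modulo variant never sees a multiple of 10 after offset 0, which matches the failure described in step 2, while the delta variant keeps re-stashing as the offset advances.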

Key design decisions (carried from the RFC, already validated)

  • from=0, not a tail re-attach (from=eventOffset). A tail re-attach is not zero-loss with continueLastTurn's replace semantics (it would drop the already-streamed prefix), and the Layer-A↔SSE offset-space mismatch misaligns the seam. from=0 needs only the run id, costs zero tokens (replays the gateway buffer, not the model), and is provably whole. Still stash the event offset for observability + an opt-in tail path later.
  • Offset-space mismatch. Layer A counts post-parse UIMessageChunks; gateway from=N counts provider-native SSE events. Stash the SSE event index from onProgress, not the chunk count.
  • recoveryKind mapping. Gateway resume is the preferred "continue" strategy (byte-exact, free). When unavailable, partialText/partialParts feed a model-agnostic user-message continuation (not assistant prefill — deprecated on Anthropic 4.6+); else cold "retry".
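
The ordering described in the recoveryKind mapping can be written as a small pure function. This is a sketch under assumptions: the RecoveryInput shape, the resumeExpired flag, and the strategy labels are hypothetical, and only the "continue" / "retry" kinds come from ChatRecoveryContext as described in this issue.

```typescript
type RecoveryKind = "continue" | "retry"; // the two kinds named in this issue

// Hypothetical labels for the three paths described above.
type RecoveryStrategy =
  | "gateway-resume"
  | "user-message-continuation"
  | "cold-retry";

interface RecoveryInput {
  aigRunId?: string; // present when a run id was stashed before eviction
  resumeExpired: boolean; // set after a 404 / onResumeExpired from re-attach
  partialText: string; // text streamed before the eviction, possibly empty
}

interface RecoveryPlan {
  kind: RecoveryKind;
  strategy: RecoveryStrategy;
}

function planRecovery(input: RecoveryInput): RecoveryPlan {
  // Preferred: byte-exact replay of the gateway buffer, no new tokens.
  if (input.aigRunId && !input.resumeExpired) {
    return { kind: "continue", strategy: "gateway-resume" };
  }
  // Next: continue from the partial output via a user-message continuation.
  if (input.partialText.length > 0) {
    return { kind: "continue", strategy: "user-message-continuation" };
  }
  // Last resort: nothing usable survived, so start the turn over.
  return { kind: "retry", strategy: "cold-retry" };
}
```

A caller would plan once with resumeExpired: false, attempt the re-attach, and plan again with resumeExpired: true if the gateway reports that the buffer has expired.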

Open questions / decisions needed

  • Where does Layer B live — packages/think or packages/ai-chat? Both carry the onChatRecovery/continueLastTurn surface; @cloudflare/think is the opinionated stream-resumption chat agent and is what the experiment targets. Confirm before building.
  • Opt-in API. How does a user enable Layer B and supply the gateway-delegated model? (e.g. a Think option that wires the workers-ai-provider delegate + its onDispatch/onProgress callbacks automatically vs. the user passing a pre-built model and Think detecting resumability.)
  • Gateway/runId persistence shape in cf_agents_runs and how it composes with existing recovery snapshot data.
  • Is the resume-buffer TTL configurable per gateway? Drives how aggressively we persist partials as a backstop.

Pre-work (recommended first, small)

  • Dogfood 3.2.0. Rebase experimental/gateway-resume-think off its vendored resume.ts/gateway-model.ts onto the published workers-ai-provider@3.2.0 (createResumableStream + createWorkersAI). This is the highest-signal check that the released API supports the recipe end-to-end, and it shrinks the experiment to the real surface before productizing.

Testing

  • Port experimental/gateway-resume-think/src/layer-b.test.ts into packages/think tests: simulate ctx.abort() eviction mid-turn, assert recovered message === full run and zero regenerated tokens.
  • Cover the expiry path: 404/onResumeExpired → graceful fall back to continueLastTurn().
  • Cover the throttle: delta-based stash re-stashes the advancing offset (regression guard for the modulo bug).
  • E2E against a live gateway (gated on creds), mirroring experimental/gateway-resume-think/scripts/driver.mjs.
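
The byte-identity assertion in the first test can be illustrated with an in-memory stand-in for the gateway buffer. Everything here is hypothetical (FakeRunBuffer, recoverWithReplace, the sample events); it only models the replace semantics described for continueLastTurn and does not call the real SDK.

```typescript
// In-memory stand-in for the gateway's resume buffer (hypothetical).
class FakeRunBuffer {
  private readonly events: string[];

  constructor(events: string[]) {
    this.events = events;
  }

  // Replays buffered events starting at the given event index.
  replay(fromEvent: number): string[] {
    return this.events.slice(fromEvent);
  }
}

// Replace semantics: the recovered text overwrites the partial leaf
// instead of being appended to it, so the partial is intentionally unused.
function recoverWithReplace(_partial: string, replayed: string[]): string {
  return replayed.join("");
}

const fullRunEvents = ["Hello", ", ", "world", "!"];
const runBuffer = new FakeRunBuffer(fullRunEvents);

// Simulate eviction after two events reached the Durable Object.
const partialBeforeEviction = fullRunEvents.slice(0, 2).join("");

// Replay from event 0: the recovered message equals the full run.
const recoveredFromZero = recoverWithReplace(
  partialBeforeEviction,
  runBuffer.replay(0)
);

// Tail re-attach with the same replace semantics drops the streamed prefix.
const recoveredFromTail = recoverWithReplace(
  partialBeforeEviction,
  runBuffer.replay(2)
);
```

In this model recoveredFromZero is "Hello, world!" while recoveredFromTail is "world!", which is the reason a tail re-attach is not zero-loss under replace semantics.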

Out of scope (follow-ups)

  • OpenAI Responses API resume (its own sequence_number mechanism) — RFC §10a; future matrix row, gated on a passthrough check.
  • OpenAI Predicted Outputs — passthrough body field, no special resume handling (RFC §10b).
  • Continuation-after-expiry wording reconciliation in the Layer-A buffer (RFC §10c).

References

  • Design: design/rfc-workers-ai-gateway-merge.md §9 (Think integration), §7.1 (resume mechanics), §8a (attempt tree).
  • Proof of concept: experimental/gateway-resume-think/ (+ its README.md).
  • SDK: workers-ai-provider@3.2.0, cloudflare/ai#573, per-provider run-path wire format cloudflare/ai#554.

Acceptance criteria

  • DO eviction mid-turn re-attaches to the upstream run and produces a message byte-identical to the completed run, with zero regenerated tokens, when the run id is valid and within TTL.
  • Graceful, automatic fallback to continueLastTurn() on expiry/miss.
  • Opt-in, additive API — default Think behavior unchanged when Layer B isn't enabled.
  • Tests + a guide/example demonstrating resumable third-party streaming with Think.
