diff --git a/sdk/agentserver/PREVIEW-SHARE.md b/sdk/agentserver/PREVIEW-SHARE.md new file mode 100644 index 000000000000..8a022dd72f14 --- /dev/null +++ b/sdk/agentserver/PREVIEW-SHARE.md @@ -0,0 +1,87 @@ +# Agentserver durable preview — share bundle + +This branch is a **self-contained preview distribution** of the +`azure-ai-agentserver-*` durable + Responses/Invocations primitives, +assembled for internal teams to experiment with. It bundles +pre-built wheels, runnable **durable** samples, developer guides, and +copy-into-your-project Copilot skills — no PyPI publish or source build +required. + +> Built off `main`. The package **source** under +> `azure-ai-agentserver-*/azure/...` is `main`'s — consume the +> **wheels** below, not the in-tree source. + +## What's here + +| Path | Contents | +|------|----------| +| [`wheels/`](wheels/) | Pre-built `core` / `invocations` / `responses` wheels. Install these. | +| [`skills/`](skills/) | 4 standalone Copilot skills (durable-task, streaming, invocations, responses). Drop next to your code. | +| [`azure-ai-agentserver-core/docs/`](azure-ai-agentserver-core/docs/) | Durable-task + streaming developer guides + the `task-and-streaming-spec.md` source-of-truth spec. | +| [`azure-ai-agentserver-responses/docs/`](azure-ai-agentserver-responses/docs/) | Responses durability + handler-implementation guides + the `responses-durability-spec.md` SOT spec and `durability-contract.md` contract matrix. | +| `azure-ai-agentserver-responses/samples/` | Durable Responses samples + the `durable-responses-agent-demo`. | +| `azure-ai-agentserver-invocations/samples/` | Durable Invocations samples + the `durable-agent-demo`. | + +Only **durable** samples are included. + +## Latest refresh + +The bundled wheels carry the current durable + responses fixes: + +- **core** — steering fixes: `ctx.pending_input_count` now reflects the live + queued-input count (was always `0`); the steering drain transitions the + record `suspended→in_progress` so the steered turn runs on hosted (was + failing with "lease renewal is only supported for in_progress tasks"); + plus write-serialization hardening (read-inside-lock, lock-held update + primitive, no blind writes). Validated end-to-end on a hosted deployment. +- **responses** — durable stored streams are created under SSE keep-alive + (hosted responses no longer hang `in_progress`). +- **invocations sample** — `durable-agent-demo` uses `gpt-4o`; `demo-client.sh` + auto-resolves the endpoint from your azd env after `azd deploy`. + +## Install + +```bash +pip install wheels/*.whl +``` + +## Run the crash → recover demo locally + +The durable demos run end-to-end against a **hosted** Foundry deployment +(`azd deploy` the sample, then drive it with `demo-client.sh` — the client +auto-resolves the endpoint from your azd env). An equivalent **local**, +file-backed kit (task store + response store on disk, no hosted dependency) +is also provided for offline experimentation. A ready-to-run, verified kit +lives at +**[`azure-ai-agentserver-responses/samples/durable-responses-agent-demo/local/`](azure-ai-agentserver-responses/samples/durable-responses-agent-demo/local/README.md)**: + +```bash +cd azure-ai-agentserver-responses/samples/durable-responses-agent-demo/local +./setup.sh # venv from ../../../../wheels +az login +export FOUNDRY_PROJECT_ENDPOINT=https://.services.ai.azure.com/api/projects/ +export AZURE_AI_MODEL_DEPLOYMENT_NAME=gpt-4o +./run.sh # stream -> crash -> recover -> verify +``` + +The local switch is two env vars (the kit sets them for you): + +```bash +export AGENTSERVER_TASKS_BACKEND=local +export AGENTSERVER_DURABLE_ROOT=/tmp/durable # task + response store +``` + +There is an equivalent verified kit for the **invocations** durable demo at +[`azure-ai-agentserver-invocations/samples/durable-agent-demo/local/`](azure-ai-agentserver-invocations/samples/durable-agent-demo/local/README.md) +(same `./setup.sh` → `./run.sh` flow). + +## Versions + +| Wheel | Version | +|-------|---------| +| `azure-ai-agentserver-core` | `2.0.0b7` | +| `azure-ai-agentserver-invocations` | `1.0.0b6` | +| `azure-ai-agentserver-responses` | `1.0.0b8` | + +These are unreleased preview (`bN`) builds. To rebuild the wheels from +updated source, see [`wheels/build-wheels.sh`](wheels/build-wheels.sh). diff --git a/sdk/agentserver/azure-ai-agentserver-core/docs/durable-task-guide.md b/sdk/agentserver/azure-ai-agentserver-core/docs/durable-task-guide.md new file mode 100644 index 000000000000..485a3911e084 --- /dev/null +++ b/sdk/agentserver/azure-ai-agentserver-core/docs/durable-task-guide.md @@ -0,0 +1,868 @@ +# Durable Tasks — Developer Guide + +This is the developer guide for `azure.ai.agentserver.core.durable` — +the durable-task primitive that turns an `async def` function into a +crash-resilient unit of agent work. + +If your agent needs to survive container crashes, OOM kills, or +redeployments without losing its place, you want this. If your turn +of work could plausibly outlive the request that started it (long +LLM calls, multi-step tool chains, multi-message conversations), you +want this. + +--- + +## 1. Why + +There is **one primitive in two flavours**: + +- **`@task`** — *one-shot*. A single durable run of a function. + Returns its `Output`, then the record is gone. Use for "do this + one thing durably". + +- **`@multi_turn_task`** — *chain*. A series of turns sharing a + conversation identity (a `task_id`). Each `return X` is one turn; + the chain stays alive in between turns and can accept more inputs. + Use for chat sessions, agents that work across multiple user + messages, durable orchestrations. + +Both run the same way under the hood: lease-based crash recovery, a +single typed input per turn, a `TaskContext` handle, optional retry, +optional steering (for `multi_turn_task`). + +What this primitive solves: + +- **Crash survival.** If the process dies mid-call, the next + process picks up the same task with the same input and runs the + handler again (or, for a chain in `suspended`, the next caller + resumes the chain). +- **Identity.** A `task_id` is the durable name of the work. Two + callers naming the same `task_id` don't double-execute — they + attach to the same run. +- **Typed inputs and outputs.** Generic in `Input` and `Output`; + the framework persists the input and surfaces the output through + a typed handle. +- **Cooperative cancellation.** The caller can ask the handler to + stop; the handler decides how to wind down. +- **Lightweight, small surface.** A few decorators, a few classes, + a handful of exceptions. + +What this primitive deliberately does **not** do: + +- Deterministic replay. The handler is re-invoked from the top on + recovery; effects are your responsibility (use `ctx.metadata` + watermarks for at-most-once patterns — see §6). +- Workflow orchestration (fan-out / fan-in / child workflows). If + you want Temporal-style orchestration, use Temporal; you can + still wrap durable tasks inside it. +- A bulk data store. `ctx.metadata` is small and JSON-only; + conversation history and big blobs belong in your own storage. +- A queue. One `task_id` is one logical job — not a competing-consumer + pull queue. + +--- + +## 2. Mental model + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Your code │ +│ │ +│ @task @multi_turn_task │ +│ async def summarize(ctx): async def chat(ctx): │ +│ return work(ctx.input) return reply(ctx.input) │ +│ │ +│ await summarize.run(input=X) await chat.run( │ +│ task_id="c1", input=X) │ +└─────────────────────────────────────────────────────────────────┘ + ▲ + │ (your async caller) + │ +┌─────────────────────────────────────────────────────────────────┐ +│ Durable task framework │ +│ │ +│ - persists input + metadata + lease │ +│ - invokes your handler with TaskContext │ +│ - watches for crashes, reclaims abandoned leases │ +│ - delivers output via TaskRun.result() / await run │ +└─────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ Task store (hosted or local file-backed) │ +│ │ +│ PATCH-with-ETag store of task records: │ +│ id, status, lease_owner, payload, attachments, etag │ +└─────────────────────────────────────────────────────────────────┘ +``` + +### One-shot vs multi-turn — at a glance + +| | `@task` (one-shot) | `@multi_turn_task` (chain) | +|--------------------------|--------------------|-----------------------------| +| Lifetime | One run | Multiple turns, chain stays alive between turns | +| `task_id` on `.start` | Optional (auto-gen GUID) | Mandatory | +| `input_id` | Defaults to `task_id` (1:1) | Per turn (auto-gen GUID per turn) | +| Terminal status | `completed` / `failed` / `cancelled` → record deleted | `suspended` between turns; deleted only via `.delete(task_id)` | +| `.delete(task_id)` | Not available (auto-cleans on terminal) | Available — chain-level delete | +| Handler `return X` | Finishes the run; `await run.result()` resolves to `X` | Finishes the **turn**; chain goes to `suspended`; caller receives `X` | +| Steering queue | n/a | `steerable=True` opt-in | +| Concurrent `.start` on same `task_id` while in-flight | `TaskConflictError` | If `steerable=True`: queued; else `TaskConflictError` | + +--- + +## 3. Hello world + +### One-shot + +```python +import asyncio +from azure.ai.agentserver.core.durable import task, TaskContext + +@task(name="summarize") +async def summarize(ctx: TaskContext[str]) -> str: + # ctx.input is typed as str; the framework persisted it before invoking us. + return ctx.input.upper() + +async def main(): + # Lifecycle-aware: creates fresh, attaches to in-flight, recovers a + # crashed prior lifetime — all automatic. task_id is optional. + output: str = await summarize.run(input="hello") + print(output) # 'HELLO' + +asyncio.run(main()) +``` + +### Multi-turn chain + +```python +import asyncio +from azure.ai.agentserver.core.durable import multi_turn_task, TaskContext + +@multi_turn_task(name="chat") +async def chat(ctx: TaskContext[dict]) -> dict: + return {"reply": f"Echo: {ctx.input['msg']}", + "input_id": ctx.input_id} + +async def main(): + # Turn 1 — fresh chain. + r1 = await chat.run(task_id="conv-7", input={"msg": "hi"}) + print(r1) # {"reply": "Echo: hi", "input_id": ""} + + # Turn 2 — same task_id resumes the persisted chain; same handler + # is invoked with the new ctx.input. + r2 = await chat.run(task_id="conv-7", input={"msg": "what's up?"}) + print(r2) # {"reply": "Echo: what's up?", "input_id": ""} + +asyncio.run(main()) +``` + +--- + +## 4. Concepts + +### 4.1 Identifiers + +- **`task_id`** — the durable name of the work. + - One-shot: optional; the framework generates a GUID when omitted. + Two callers passing the same `task_id` for a one-shot **converge** + (the second caller either attaches to the first's in-flight run + or sees `TaskConflictError` if it has already terminated). + - Multi-turn: mandatory; identifies the chain. + +- **`input_id`** — the durable name of one input within the chain. + - One-shot: defaults to `task_id` (one run, one input — the 1:1 + invariant). + - Multi-turn: per turn; the framework generates a GUID per turn + unless the caller supplies one (callers managing their own per- + message ids — e.g. chat clients — pass them through). + +- **`if_last_input_id=""`** — an optional precondition on + `.start` / `.run`. The framework verifies that the chain's + currently-stored last-accepted `input_id` equals `` before + accepting the new input. If a concurrent caller advanced the + chain first, the call raises `LastInputIdPreconditionFailed`. + Use this when your caller is reasoning about message ordering + (HTTP `If-Match`-style optimistic concurrency on the input + queue). + +### 4.2 Entry mode + +The handler can branch on `ctx.entry_mode`: + +| Value | Means | +|---------------|------------------------------------------------------------| +| `"fresh"` | First invocation for this `(task_id, input_id)` | +| `"resumed"` | This is a subsequent turn of an existing chain (multi-turn)| +| `"recovered"` | A previous lifetime ran this same `(task_id, input_id)` and didn't finish (lease was abandoned); the framework is re-invoking with the persisted input | + +```python +@multi_turn_task(name="checkpointer") +async def step(ctx: TaskContext[dict]) -> dict: + if ctx.entry_mode == "recovered": + # Skip any work we already wrote to ctx.metadata; pick up where we left off. + last_done = ctx.metadata.get("last_done_step") + ... +``` + +### 4.3 Inputs and outputs + +The handler signature is `async def fn(ctx: TaskContext[Input]) -> Output`. +The framework infers `Input` and `Output` from the annotation; the +typing flows through `task_id.run(input=X) -> Output`. + +- **Inputs are persisted before the handler runs.** That is the + guarantee crash recovery rests on: a recovered handler is invoked + with the same `ctx.input` it would have seen in the lost lifetime. +- **Outputs are not persisted.** When the handler returns, the + value resolves the caller's `await run.result()` — that is the + only place it appears. There is no `payload["output"]` and no + output attachment to inspect later. If you want to keep a + per-turn artifact across crashes, write it through your handler + (LangGraph checkpoint, your own DB, etc.) before you return. +- **Per-input size limit** ≈ 2 MB (after JSON serialization). + Larger inputs raise `InputTooLarge` at the caller before any + network round-trip. Externalize (blob store + reference) for + bigger payloads. + +### 4.4 The handler's context (`TaskContext`) + +```python +class TaskContext: + input: Input # the value the caller passed + task_id: str + input_id: str # per-turn id + entry_mode: Literal["fresh", "resumed", "recovered"] + metadata: TaskMetadata # callable namespace facade (see §4.5) + retry_attempt: int # 0 on the first try + is_steered_turn: bool # True iff this turn was promoted from the queue + pending_input_count: int # how many newer turns are queued + + # Cancellation signals — all cooperative. + cancel: asyncio.Event # any-cause cancel + cancel_requested: bool # cause: TaskRun.cancel() was called + timeout_exceeded: bool # cause: per-task timeout fired + shutdown: asyncio.Event # container is shutting down + + async def exit_for_recovery(self) -> None: ... +``` + +The first parameter MUST be named `ctx`. The framework binds +positionally, but it validates the name at decoration time so the +guide examples and your code stay consistent. + +### 4.5 Metadata + +`ctx.metadata` is a **callable namespace facade**: small key-value +state that survives crashes and is visible across turns of a chain. +Values must be JSON-serializable (the framework exposes the +`JSONValue` type alias). + +```python +@multi_turn_task(name="agent") +async def agent(ctx: TaskContext[dict]) -> dict: + # Default namespace. + ctx.metadata["score"] = 42 + # Named namespace — auto-vivified. + ctx.metadata("billing")["tokens_in"] = 130 + return {"ok": True} +``` + +Names starting with `_` are reserved for the framework and raise +`ValueError` at write time. Use `ctx.metadata.flush()` if you need +an explicit at-most-once fence before a side effect. + +### 4.6 The result handle (`TaskRun`) + +`.start(...)` returns a `TaskRun[Output]`: + +```python +class TaskRun(Generic[Output]): + task_id: str + input_id: str + metadata: TaskMetadata # live ref while the run is in-flight + + async def result(self) -> Output: ... + async def cancel(self) -> None: ... + def __await__(self) -> Output: ... # so `output = await run` works +``` + +That is the entire `TaskRun` surface. The framework intentionally +does **not** expose `.delete`, `.refresh`, `.status`, or +`.lease_expiry_count` on the handle — for chain-level deletion use +`MultiTurnTask.delete(task_id)`, and for status inspection consult +the store directly via the task manager. + +### 4.7 Steering (multi-turn only) + +Pass `steerable=True` to `@multi_turn_task` to opt into the steering +queue. With steering on, a `.start` against an in-flight chain +**queues** the new input rather than raising — the framework +delivers it as the next turn after the current turn ends. + +```python +@multi_turn_task(name="conv", steerable=True) +async def conv(ctx: TaskContext[dict]) -> dict: + return await llm(ctx.input) + +# Mid-conversation steering: user changes their mind 50 ms into turn 1. +r1 = asyncio.create_task(conv.start(task_id="c1", input={"msg": "Plan a trip to Rome"})) +await asyncio.sleep(0.05) +r2 = asyncio.create_task(conv.start(task_id="c1", input={"msg": "Actually, Paris"})) +# r1 resolves with turn 1's outcome; r2 resolves with turn 2's outcome. +``` + +The handler observes `ctx.cancel.is_set()` during turn 1 if there's +something queued — it can wind down early and let the queued turn +take over (see §6 "interruptible turns"). + +### 4.8 Retry + +Per-turn (multi-turn) or per-run (one-shot). Configure via the +decorator: + +```python +from datetime import timedelta +from azure.ai.agentserver.core.durable import RetryPolicy + +@task( + name="fetch", + retry=RetryPolicy( + max_attempts=3, + initial_delay=timedelta(seconds=1), + max_delay=timedelta(seconds=10), + backoff_coefficient=2.0, + jitter=True, + ), +) +async def fetch(ctx: TaskContext[str]) -> bytes: ... +``` + +`ctx.retry_attempt` (0-based) is exposed if your handler wants to +branch. The retry counter resets at every new turn boundary +(multi-turn) so a new turn starts with a fresh budget. + +When the budget is exhausted, the caller sees +`TaskFailed(error=TaskExhaustedRetriesErrorDict(...))` (vs the +normal `TaskFailed(error=TaskErrorDict(...))` for a non-retryable +raise). + +`ctx.retry_attempt` is persisted: **crash recovery does NOT consume +retry budget**. If attempt 2 of 3 crashes mid-flight, the recovered +handler sees `ctx.retry_attempt == 2` and still has its third +attempt available — the recovery is not counted as an extra retry. + +### 4.9 Cancellation + +Cancellation is **cooperative**. The framework never force-stops a +running handler. The handler observes `ctx.cancel` (an +`asyncio.Event`) and chooses how to wind down: + +- Raise `asyncio.CancelledError` → caller sees `TaskCancelled`. +- `return X` → caller sees `X` (treated as a normal completion; + for multi-turn that's an implicit suspend of the chain). +- Call `await ctx.exit_for_recovery()` (only valid when + `ctx.shutdown` is set) → caller sees `TaskDeferred`; the task + stays `in_progress`; the recovery scanner re-invokes the + handler in a future process lifetime. + +When the handler sees `ctx.cancel.is_set()`, it can branch on +the cause via the cause-discriminator booleans: + +| Trigger | `ctx.cancel_requested` | `ctx.timeout_exceeded` | `ctx.shutdown.is_set()` | +|--------------------------------------|------------------------|------------------------|-------------------------| +| `await run.cancel()` (caller-cancel) | `True` | `False` | `False` | +| Per-turn `timeout=` watchdog fires | `False` | `True` | `False` | +| Container graceful shutdown | `False` | `False` | `True` | + +`ctx.is_steered_turn` and `ctx.pending_input_count` round out the +steering-observability surface: a steerable handler that sees +`ctx.cancel.is_set()` AND `ctx.pending_input_count > 0` knows the +cancel was triggered by a newer turn being queued behind it and +can choose to wind down early so the next turn gets the lane. + +### 4.10 Timeout + +Each task can specify a `timeout` on its decorator. The watchdog +is **per-turn**, **wall-clock**, and **durable**: + +- **Per-turn** — the budget resets at every turn boundary + (multi-turn) or at the start of each fresh run (one-shot). It is + NOT a per-invocation budget; if a recovered handler is re-invoked + with the same `ctx.input` after a crash, the timeout starts from + the persisted turn-start timestamp — not from the new lifetime's + re-invocation. +- **Wall-clock** — the watchdog uses the persisted turn-start + timestamp (UTC) and "now" wall-clock. It survives crashes: a + recovered handler that started its turn one minute before a + process death and has a 90-second budget gets ~30 seconds before + the watchdog fires. +- **Durable** — the persisted turn-start timestamp means the + watchdog's view of "time elapsed" is the same across crashes, + so a long-running turn cannot game the budget by triggering + recovery to reset its clock. + +When the watchdog fires it sets `ctx.cancel` and flips +`ctx.timeout_exceeded`. The handler decides what to do (see §4.9). + +### 4.11 Shutdown + +Container shutdown sets `ctx.shutdown` (an `asyncio.Event`) AND +`ctx.cancel`. The intended handler response is to call +`await ctx.exit_for_recovery()`, which: + +1. Releases the lease without writing a terminal status. +2. Raises `TaskDeferred` to the caller of `.result()`. +3. Leaves the task `in_progress` so the next process lifetime's + recovery scanner picks it up and re-invokes the handler with + the persisted `ctx.input`. + +`exit_for_recovery()` is only meaningful during shutdown; calling +it outside that context is a programming error. + +### 4.12 Multi-turn chain deletion + +```python +await chat.delete("conv-7") +``` + +Force-removes the chain: cancels any in-flight turn, resolves all +queued steerer callers with `TaskCancelled`, and deletes the +record. Idempotent (no-op if the chain is already gone). + +--- + +## 5. Reference + +### 5.1 Decorators + +```python +def task( + *, + name: str, # required — used for registration / recovery + title: str | None = None, # static label for telemetry + timeout: timedelta | None = None, # cooperative watchdog + retry: RetryPolicy | None = None, # None = no retry +) -> Callable[[Handler], Task[Input, Output]]: ... + +def multi_turn_task( + *, + name: str, + title: str | None = None, + timeout: timedelta | None = None, + retry: RetryPolicy | None = None, + steerable: bool = False, +) -> Callable[[Handler], MultiTurnTask[Input, Output]]: ... +``` + +Each decorator produces a **distinct class** (`Task` vs +`MultiTurnTask`) — the type checker enforces "no `.delete()` on +one-shot" and "multi-turn `get_active_run` takes `(task_id, +input_id)`" statically. + +### 5.2 `Task` (one-shot) + +```python +class Task(Generic[Input, Output]): + name: str + + async def run( + self, *, + input: Input, + task_id: str | None = None, + input_id: str | None = None, + if_last_input_id: str | None = None, + ) -> Output: ... + + async def start( + self, *, + input: Input, + task_id: str | None = None, + input_id: str | None = None, + if_last_input_id: str | None = None, + ) -> TaskRun[Output]: ... + + async def get_active_run(self, task_id: str) -> TaskRun[Output] | None: ... +``` + +### 5.3 `MultiTurnTask` + +```python +class MultiTurnTask(Generic[Input, Output]): + name: str + + async def run( + self, *, + task_id: str, + input: Input, + input_id: str | None = None, + if_last_input_id: str | None = None, + ) -> Output: ... + + async def start( + self, *, + task_id: str, + input: Input, + input_id: str | None = None, + if_last_input_id: str | None = None, + ) -> TaskRun[Output]: ... + + async def get_active_run( + self, task_id: str, input_id: str, + ) -> TaskRun[Output] | None: ... + + async def delete(self, task_id: str) -> None: ... +``` + +### 5.4 `TaskRun[Output]` + +```python +class TaskRun(Generic[Output]): + task_id: str + input_id: str + metadata: TaskMetadata + + async def result(self) -> Output: ... + async def cancel(self) -> None: ... + def __await__(self) -> Generator[Any, None, Output]: ... +``` + +### 5.5 `TaskContext[Input]` + +```python +class TaskContext(Generic[Input]): + # Identifiers (read-only). + input: Input + task_id: str + input_id: str + entry_mode: EntryMode # "fresh" | "resumed" | "recovered" + retry_attempt: int # 0 on the first try; survives crash recovery + + # Steering observability (multi-turn). + is_steered_turn: bool # True iff this turn was promoted from the queue + pending_input_count: int # how many newer turns are queued behind this one + + # Cancellation — all cooperative. + cancel: asyncio.Event # any-cause cancel + cancel_requested: bool # cause: TaskRun.cancel() was called + timeout_exceeded: bool # cause: per-turn timeout watchdog fired + shutdown: asyncio.Event # container is shutting down + + # Cross-turn / cross-attempt state. + metadata: TaskMetadata + + # Control. + async def exit_for_recovery(self) -> None: ... +``` + +The handler's first parameter MUST be named `ctx`. The framework +binds positionally, but it validates the name at decoration time +so the guide examples and your handler stay consistent. + +Read-only enumeration: + +- `ctx.input`, `ctx.task_id`, `ctx.input_id`, `ctx.entry_mode`, + `ctx.retry_attempt` +- `ctx.is_steered_turn`, `ctx.pending_input_count` +- `ctx.cancel`, `ctx.cancel_requested`, `ctx.timeout_exceeded`, + `ctx.shutdown` +- `ctx.metadata` +- `ctx.exit_for_recovery()` + +### 5.6 Exceptions + +Public exception taxonomy. Each carries only **new** information the +caller doesn't already have (the caller already has `task_id` / +`input_id` from the call site or `TaskRun`). + +| Exception | Shape | When it is raised | +|-----------|-------|-------------------| +| `TaskFailed` | `error: TaskErrorDict \| TaskExhaustedRetriesErrorDict` | Handler raised; caller of `.result()` / `.run()` sees this. | +| `TaskCancelled` | bare | Cooperative cancel honoured (handler raised `CancelledError`); per-task timeout watchdog honoured; `MultiTurnTask.delete()` invalidating an in-flight run; queued steerer cancelled before promotion. | +| `TaskDeferred` | bare | Handler called `ctx.exit_for_recovery()` — the task continues durably; the recovery scanner re-invokes in a future lifetime. | +| `TaskConflictError` | `current_status: str` | `.start` / `.run` against an in-flight or terminal task that can't accept the call (one-shot in-progress / completed; multi-turn non-steerable in-progress). | +| `LastInputIdPreconditionFailed` | `actual_last_input_id: str \| None` | `if_last_input_id=` precondition didn't match. | +| `SteeringQueueFull` | bare | Steering queue at capacity (multi-turn `steerable=True` only). | +| `InputTooLarge` | bare | Input value exceeds the per-input cap. | + +`TaskFailed.error` is one of two `TypedDict`s: + +```python +class TaskErrorDict(TypedDict): + type: str # exception class name, e.g. "ValueError" + message: str # str(exc) + traceback: str # traceback.format_exc() + +class TaskExhaustedRetriesErrorDict(TypedDict): + type: Literal["exhausted_retries"] + attempts: int # number of attempts made (>= max_attempts) + last_error: str + last_error_type: str + traceback: str +``` + +### 5.7 `RetryPolicy` + +```python +class RetryPolicy: + initial_delay: timedelta + backoff_coefficient: float + max_delay: timedelta + max_attempts: int + retry_on: tuple[type[BaseException], ...] | None + jitter: bool + + def __init__( + self, *, + initial_delay: timedelta = timedelta(seconds=1), + backoff_coefficient: float = 2.0, + max_delay: timedelta = timedelta(seconds=60), + max_attempts: int = 3, + retry_on: tuple[type[BaseException], ...] | None = None, + jitter: bool = True, + ) -> None: ... +``` + +Presets: `exponential_backoff(...)`, `fixed_delay(delay, ...)`, +`linear_backoff(...)`, `no_retry()`. + +### 5.8 `TaskMetadata` and `JSONValue` + +```python +JSONValue = Union[ + str, int, float, bool, None, + list[JSONValue], + dict[str, JSONValue], +] + +class TaskMetadata: + def __getitem__(self, key: str) -> JSONValue: ... + def __setitem__(self, key: str, value: JSONValue) -> None: ... + def __delitem__(self, key: str) -> None: ... + def __contains__(self, key: str) -> bool: ... + def __iter__(self) -> Iterator[str]: ... + def get(self, key: str, default: JSONValue = None) -> JSONValue: ... + def __call__(self, namespace: str) -> TaskMetadata: ... # sibling ns + async def flush(self) -> None: ... # at-most-once fence +``` + +### 5.9 `EntryMode` + +```python +EntryMode = Literal["fresh", "resumed", "recovered"] +``` + +--- + +## 6. Patterns + +### 6.1 Multi-turn agent (the common case) + +```python +@multi_turn_task(name="session_agent") +async def session_agent(ctx: TaskContext[dict]) -> dict: + # ctx.entry_mode is "fresh" on the first turn, "resumed" on + # subsequent turns of this conversation. + history = ctx.metadata.get("history", []) + user_msg = ctx.input["message"] + history.append({"role": "user", "content": user_msg}) + + reply = await llm.chat(history) + + history.append({"role": "assistant", "content": reply}) + ctx.metadata["history"] = history + return {"reply": reply, "turn": ctx.metadata.get("turn", 0) + 1} + +# Turn 1. +r1 = await session_agent.run(task_id="conv-A", input={"message": "hi"}) + +# Turn 2 — same task_id resumes the chain; history is preserved. +r2 = await session_agent.run(task_id="conv-A", input={"message": "what time is it?"}) +``` + +### 6.2 At-most-once side effects across crashes + +```python +@task(name="charge_card") +async def charge_card(ctx: TaskContext[dict]) -> str: + # Survive recovery: if we already charged in a prior lifetime, + # don't double-charge. + if ctx.metadata.get("charge_done"): + return ctx.metadata["charge_receipt"] + + # Reserve a dedup token before the side effect, flush, then act. + ctx.metadata["pending_charge_token"] = generate_uuid() + await ctx.metadata.flush() + + receipt = await payment_gateway.charge( + ctx.input["card"], + ctx.input["amount"], + idempotency_key=ctx.metadata["pending_charge_token"], + ) + + ctx.metadata["charge_done"] = True + ctx.metadata["charge_receipt"] = receipt + return receipt +``` + +### 6.3 Steering — interruptible long turn + +```python +@multi_turn_task(name="thinker", steerable=True) +async def thinker(ctx: TaskContext[dict]) -> dict: + partial = [] + async for chunk in slow_llm_stream(ctx.input): + if ctx.cancel.is_set(): + # User changed their mind — surface what we have and bow out. + return {"interrupted": True, "partial": "".join(partial)} + partial.append(chunk) + return {"reply": "".join(partial)} + +# Turn 1 starts a slow generation. +r1 = asyncio.create_task(thinker.start(task_id="t1", input={"msg": "long question"})) +# 50 ms later the user pivots. +await asyncio.sleep(0.05) +r2 = asyncio.create_task(thinker.start(task_id="t1", input={"msg": "shorter question"})) +# r1.result() resolves with {"interrupted": True, ...}; r2 with the answer. +``` + +### 6.4 Graceful shutdown — `exit_for_recovery` + +```python +@multi_turn_task(name="long_runner") +async def long_runner(ctx: TaskContext[dict]) -> dict: + for step in plan(ctx.input): + if ctx.shutdown.is_set(): + # Container is going down; defer to the next lifetime. + await ctx.exit_for_recovery() # raises TaskDeferred upstream + await do(step) + return {"done": True} +``` + +The caller awaiting `await run.result()` sees `TaskDeferred`. The +task record stays `in_progress`; the next lifetime's recovery +scanner re-invokes the handler with the same `ctx.input` and +`entry_mode="recovered"`. + +### 6.5 Late-join an in-flight run + +```python +# Caller A launched the work… +run_a = await chat.start(task_id="conv-9", input_id="i1", input={"msg": "hi"}) + +# … but caller B (different coroutine / different request) wants to +# attach to the same in-flight turn: +run_b = await chat.get_active_run("conv-9", "i1") +if run_b is not None: + output = await run_b # same Output that A sees +``` + +`get_active_run` returns `None` when the chain isn't in-flight for +that exact `(task_id, input_id)` — no retrospective attach to a +terminated turn. + +### 6.6 Optimistic concurrency on the input queue + +```python +prev_input_id = "msg-7" # what the caller thinks the chain last accepted + +try: + await chat.run( + task_id="conv-2", + input_id="msg-8", + input={"msg": "next"}, + if_last_input_id=prev_input_id, + ) +except LastInputIdPreconditionFailed as exc: + # Concurrent caller advanced the chain to exc.actual_last_input_id; + # re-fetch UI state and try again. + ... +``` + +--- + +## 7. Operational notes + +- **Heartbeats / lease.** The framework holds a lease on the + task record while the handler runs and renews it automatically. + If the process dies, the lease expires and the recovery scanner + reclaims the record on a future process startup. +- **Recovery is from the persisted input.** A recovered handler is + invoked with the same `ctx.input` the lost lifetime saw — not + with any new input the caller may now be passing. (A caller's + new `.start` against an in-flight record with an expired lease + follows the normal lifecycle: rejected for one-shot / + non-steerable, queued for `steerable=True` multi-turn.) +- **Structured failure logs.** Every handler raise emits an + ERROR-level event named `durable_task_handler_failure` with + `task_id`, `input_id`, `error_type`, `error_message` fields — + visible in your observability pipeline whether or not your caller + awaited the failed `.result()`. +- **Storage backends.** The same primitive runs against the hosted + task store and against a local file-backed store for development + and tests. +- **Streaming** is a separate primitive in + `azure.ai.agentserver.core.streaming` — `await streams.get_or_create(invocation_id)` + gives the handler a stream handle. `TaskRun` itself is not + iterable. + +--- + +## 8. What This Is NOT + +- **Not a deterministic-replay framework.** The handler is re-invoked + from the top on recovery; the framework does not record and + replay every effect. Determinism across re-invocations is the + handler's responsibility — use `ctx.metadata` watermarks for + at-most-once patterns (see §6.2). +- **Not a workflow engine.** No fan-out / fan-in, no child-workflow + orchestration, no first-class signals or timers. If you need + those, use Temporal / Durable Functions and wrap durable tasks + inside them. +- **Not a bulk data store.** `ctx.metadata` is intentionally small + and JSON-only. Persist conversation history, LLM outputs, and + big checkpoints through your own storage (LangGraph SqliteSaver, + your own DB). Use metadata only for small watermarks and dedup + tokens. +- **Not a queue.** A `task_id` identifies one logical unit of + work. If you want competing consumers off a shared queue, use a + different primitive. + +--- + +## Quick FAQ + +**Q. How do I do "fire and forget"?** +A. `await task_fn.start(input=...)` — the call returns a `TaskRun` +handle as soon as the work is registered. You can drop the handle +and the task runs durably; the next caller can attach via +`get_active_run(task_id)` if they care about the outcome. + +**Q. Can two callers run the same `task_id` concurrently?** +A. No — `task_id` is the identity. The second caller either attaches +to the first's in-flight run (one-shot via the lifecycle merge), +gets queued (multi-turn `steerable=True`), or sees `TaskConflictError`. + +**Q. Does the framework retry by default?** +A. No. Pass `retry=RetryPolicy(...)` to opt in. + +**Q. Can I store conversation history in `ctx.metadata`?** +A. Small histories fit, but `metadata` is intentionally small and +JSON-only. Use a dedicated checkpointer (LangGraph SqliteSaver, +your own DB, etc.) for large multi-turn state, and keep `metadata` +to small watermarks and dedup tokens. + +**Q. What if my handler ignores `ctx.cancel`?** +A. Cooperative cancel is a request; nothing forces the handler to +stop. If your handler must be interruptible, check +`ctx.cancel.is_set()` in your loop. `MultiTurnTask.delete(task_id)` +is the only call that force-cancels: it sets the cancel event AND +hard-cancels the underlying asyncio task so a non-cooperating +handler still exits. + +**Q. How do I inspect a task's persisted state from outside the handler?** +A. Consult the task manager's provider directly: +`await manager.provider.get(task_id)` returns a `TaskInfo` snapshot. +The decorator's public surface intentionally doesn't expose a +`.get()` method — read paths go through the provider so the public +decorator surface stays small and write-shaped. diff --git a/sdk/agentserver/azure-ai-agentserver-core/docs/streaming-guide.md b/sdk/agentserver/azure-ai-agentserver-core/docs/streaming-guide.md new file mode 100644 index 000000000000..6d8cd0adf99e --- /dev/null +++ b/sdk/agentserver/azure-ai-agentserver-core/docs/streaming-guide.md @@ -0,0 +1,520 @@ +# Streaming guide — `azure.ai.agentserver.core.streaming` + +This package gives you one way to **emit events from one coroutine +and receive them from one or more other coroutines** — typically: +your `@task` handler produces events, and your HTTP layer fans them +out to a Server-Sent-Events / WebSocket / long-poll endpoint. + +You pick a backing once at app startup, then everywhere else you +look streams up by id and call `emit` / `subscribe`. + +--- + +## 5-minute getting started + +```python +from azure.ai.agentserver.core.streaming import streams + +# 1. At app startup — pick a backing. +streams.use_in_memory_replay(cursor_fn=lambda ev: ev["n"], ttl_seconds=600) + +# 2. The producer (e.g. your @task handler): +async def produce(stream_id: str) -> None: + stream = await streams.get_or_create(stream_id) + try: + for n in range(5): + await stream.emit({"n": n, "msg": f"hello {n}"}) + finally: + await stream.close() + +# 3. The subscriber (e.g. your HTTP handler) — attach BEFORE the +# producer starts (see §Subscribing for why): +async def consume(stream_id: str) -> None: + stream = await streams.get_or_create(stream_id) + async for event in stream.subscribe(): + print(event) + # Loop terminates cleanly when the producer calls close(). +``` + +`streams.get_or_create(id)` is idempotent: the producer and the +subscriber both call it with the same id and get the **same** +`EventStream` instance back. + +--- + +## Public surface + +Six exports, total: + +```python +from azure.ai.agentserver.core.streaming import ( + streams, # the process-level registry singleton + EventStream, # @runtime_checkable Protocol + EventStreamError, # base exception (catch-all) + EventStreamClosedError, # emit on a closed stream + EventStreamNotFoundError, # any op on an id that isn't currently a live stream +) +``` + +That's it. Obtain stream instances from the registry and program +against the `EventStream` Protocol. + +--- + +## Choosing a backing + +| Backing | Use when | Reconnect / replay? | Survives process restart? | Notes | +|---|---|---|---|---| +| `use_in_memory_live()` (default) | Single subscriber that attaches before the producer; lowest memory; you don't need late subscribers to catch up. | No — late subscribers miss earlier events. | No. | Constant memory: only the subscriber list, no event buffer. | +| `use_in_memory_replay(...)` | Multiple subscribers that may attach at different times; client may reconnect within `ttl_seconds`. | Yes (within the per-event TTL window). | No. | Each event is retained until its TTL elapses (or `delete` runs). | +| `use_file_backed_replay(...)` | Long-running turns where you need to survive a process crash and a fresh worker resuming the same turn. | Yes. | Yes — events are persisted to `storage_dir / f"{id}.jsonl"` and rehydrated on the next `get_or_create(id)`. | Single-writer-per-file enforced. | + +**Call a configurator before you create any streams** (typically +once at app startup). Later calls only affect streams created +after the call — streams already in the registry keep their original +backing. Switching mid-process is supported but discouraged. + +### Configurator signatures + +```python +streams.use_in_memory_live() -> None + +streams.use_in_memory_replay( + *, + cursor_fn: Callable[[Any], int] | None = None, + ttl_seconds: float | None = None, +) -> None + +streams.use_file_backed_replay( + *, + storage_dir: Path, + cursor_fn: Callable[[Any], int] | None = None, + ttl_seconds: float | None = None, + serializer: Callable[[Any], bytes] | None = None, + deserializer: Callable[[bytes], Any] | None = None, +) -> None +``` + +- **`cursor_fn`** — pass this if you want cursored re-subscription + (`subscribe(after=N)`) and a usable `last_cursor()`. It receives + each payload and returns an `int` you choose as its cursor (a + monotonically increasing sequence number is typical). Without it, + `subscribe(after=...)` is silently ignored and `last_cursor()` + always returns `None`. +- **`ttl_seconds`** — per-event retention. Each emitted event becomes + evictable `ttl_seconds` after its emit time, regardless of whether + the stream is still active. Use this to bound memory / disk usage. + Once the stream is closed AND its last retained event has expired + AND at least one event was ever emitted, the stream itself + transitions to "destroyed" (see §Lifecycle). A stream that was + created and closed without ever emitting stays in CLOSED forever + (or until `streams.delete(id)`). +- **`storage_dir`** (file-backed only) — directory that holds one + `.jsonl` file per stream. Created if it doesn't exist. +- **`serializer` / `deserializer`** (file-backed only) — bring your + own codec for non-JSON-serializable payloads. Defaults assume the + payload is JSON-serializable. + +--- + +## The stream id + +A stream id is the identity of a single producer/consumer +conversation. Pick the per-turn identifier from your framework: + +| Context | Use as id | +|---|---| +| Inside `azure-ai-agentserver-invocations` | `request.state.invocation_id` (HTTP layer); `ctx.input["invocation_id"]` (handler) | +| Inside `azure-ai-agentserver-responses` | `response_id` | +| Bare-Python / custom | Any per-turn `str` you control end-to-end | + +**Do NOT use a durable `task_id` as the stream id.** A durable task +can span multiple turns (steering, recovery). Reusing the id across +turns means the second turn finds the previous turn's already-closed +stream and `emit` raises `EventStreamClosedError`. Always scope the +id to one logical request/turn/invocation. + +**File-backed backing only:** because the file-backed backing maps +the id directly to `/.jsonl`, the id must be safe +for use as a single filename — no path separators, no characters +your filesystem rejects, ideally short. The framework-provided +`invocation_id` / `response_id` values already satisfy this; if you +mint your own id, sanitize it. + +--- + +## The `EventStream` Protocol + +Every stream — regardless of backing — exposes the same four +methods: + +```python +class EventStream(Protocol): + async def emit(self, payload: Any, *, close: bool = False) -> None: ... + async def close(self) -> None: ... + def subscribe(self, *, after: int | None = None) -> AsyncIterator[Any]: ... + async def last_cursor(self) -> int | None: ... +``` + +### `emit(payload, *, close=False)` + +Publishes one event to every currently-attached subscriber. + +- `payload` is yours — pass any value compatible with your + serializer. For file-backed replay the default expects JSON- + serializable values. +- `close=True` is an **atomic emit-and-close**: the payload is + delivered + the stream is closed in one step, with no opportunity + to emit again in between. For replay backings, the payload is + still retained in history and a late subscriber can see it; for + the live backing, late subscribers see neither the payload nor any + earlier events. +- Raises `EventStreamClosedError` if you call `emit` after `close`. + This means a producer bug (you should not be emitting any more); + HTTP layers should treat this as `5xx`, not a client error. +- Raises `EventStreamNotFoundError` if the stream has been destroyed. + +### `close()` + +Marks the stream done. Idempotent — calling it twice (or on a +destroyed stream) is a no-op, never raises. After `close()`: + +- New `emit` calls raise `EventStreamClosedError`. +- Existing subscriber iterators drain any in-flight events, then + exit cleanly with `StopAsyncIteration`. +- New `subscribe` calls still work as long as the stream hasn't yet + been destroyed (for replay backings, they will see the retained + history). + +### `subscribe(*, after=None)` + +Returns an **async iterator** over emitted payloads. **Not** a +coroutine — call it WITHOUT `await`, use directly in `async for`: + +```python +async for event in stream.subscribe(): + handle(event) +``` + +The iterator terminates cleanly with `StopAsyncIteration` when the +stream is closed (after draining any in-flight events) **or** when +the stream is destroyed while you are iterating (whether by +`streams.delete(id)` or by the auto-transition described in +§Lifecycle). `subscribe()` itself raises `EventStreamNotFoundError` +synchronously only if the stream is already destroyed at the time +you call it. + +`after=N` is the **reconnection primitive** — only yield events +whose cursor is strictly greater than `N`. Requires the active +backing to have a `cursor_fn`; silently ignored otherwise. See +§Recovery & resumption. + +Multiple subscribers are supported; each gets its own independent +queue. + +### `last_cursor()` + +Returns the highest cursor value seen so far, or `None` if no +events were emitted, or `None` if the active backing has no +`cursor_fn`. After the stream is closed, this is the last cursor +the backing saw — even if that event has since expired from +replay. Raises `EventStreamNotFoundError` if the stream is destroyed. + +`last_cursor()` is the producer's recovery primitive: a recovering +handler reads it to learn "what cursor should I assign to my next +emit?". + +--- + +## Lifecycle: ACTIVE → CLOSED → (destroyed) + +Each stream is **ACTIVE** or **CLOSED**. After CLOSED, the id may +be destroyed; once destroyed, every operation against it raises +`EventStreamNotFoundError`. + +| State | What it means | How you reach it | +|---|---|---| +| **ACTIVE** | Open to `emit`. Subscribable. | Construction (first `get_or_create(id)`). | +| **CLOSED** | No new emits (`emit` raises `EventStreamClosedError`). Existing subscribers drain. New subscribers can still attach (replay backings) but no new events arrive. | `close()` from ACTIVE. | + +Three independent paths into destroyed: + +- the id was **never registered** (no `get_or_create(id)` for it ever ran); +- the id was **explicitly `streams.delete(id)`**d; +- the id's stream was **Closed** and its close-clock TTL + (`close_time + ttl_seconds`) **elapsed** — only applies to replay + backings constructed with `ttl_seconds`. + +A few practical implications: + +- The live backing (`use_in_memory_live`) never auto-destroys — it + has no TTL machinery. Call `streams.delete(id)` explicitly if you + need to release the id. +- After `close_time + ttl_seconds`, the id is destroyed — regardless + of whether anyone is still subscribed or any retained events are + still in the buffer. +- `last_cursor()` is safe to call during the close window — a + recovering handler can always read the last cursor it had seen + before close. + +--- + +## The registry + +```python +streams.get(id) -> EventStream # raises NotFound for any id that is not currently live +streams.get_or_create(id) -> EventStream # idempotent +streams.delete(id) -> None # idempotent +``` + +- `get(id)` returns the registered stream, or raises + `EventStreamNotFoundError`. Treat any `NotFound` uniformly: + "this id is not a live stream; subscribe to a new id or treat as + missing". +- `get_or_create(id)` is idempotent — every caller using the same + id gets the same `EventStream` instance, even from concurrent + coroutines. If the id was previously destroyed, a fresh stream is + created. +- `delete(id)` removes the stream and any backing resources (including + the on-disk log for file-backed replay). Idempotent — safe to call + on an unknown or already-deleted id. + +You typically do not need to call `delete(id)` for replay backings +with `ttl_seconds` configured — the close-clock auto-destroy +cleans up for you. Call `delete(id)` explicitly when you want +immediate cleanup (end-of-request hook, test teardown) or for +backings without `ttl_seconds`. + +--- + +## Exceptions → wire mapping + +```text +EventStreamError (base — catch-all) +├── EventStreamClosedError producer bug — wire-map to HTTP 5xx +└── EventStreamNotFoundError id is not currently a live stream — HTTP 404 +``` + +Every "this id is not currently a live stream" condition raises +`EventStreamNotFoundError` (HTTP 404). Treat it uniformly: +subscribe to a new id, or render the id as missing. + +--- + +## Subscribing — the subscribe-before-start rule + +For the **default live backing** (`use_in_memory_live`), subscribers +only see events emitted after they attach. With the live backing +"attach" means **`async for` over the iterator has begun (i.e. +`__aiter__` has run)** — not merely that you've called +`get_or_create` or `subscribe`. So just calling +`asyncio.create_task(_serve_sse(stream))` does not guarantee the SSE +task has actually begun iterating before your producer starts +emitting — there is a race. + +Safe options: + +1. **Use a replay backing** (`use_in_memory_replay` or + `use_file_backed_replay`). Late subscribers catch up via the + retained history, so the race doesn't matter. This is the + recommended default for HTTP layers. +2. **Drive iteration before starting the producer.** Spawn the SSE + task, then `await asyncio.sleep(0)` (or any explicit signal from + the SSE task that it has started its `async for`) before calling + `task.start(...)`. This is harder to get right than option 1; we + recommend option 1 unless you have a strong reason to avoid + buffering. + +Once you've picked your strategy, the canonical pattern is: + +1. HTTP layer reads the per-turn id from the request. +2. HTTP layer calls `await streams.get_or_create(id)` and arranges + for a subscriber to be attached (per the strategy above). +3. HTTP layer starts the producer (e.g. `await task.start(...)`) + with the id propagated via input. +4. Producer also calls `await streams.get_or_create(id)` and gets + the same instance. + +```python +# At startup (option 1 — recommended): +streams.use_in_memory_replay(cursor_fn=lambda ev: ev["n"], ttl_seconds=600) + +# HTTP layer +async def handle_request(request): + inv_id = request.state.invocation_id + + stream = await streams.get_or_create(inv_id) # 1 + 2 + sse = asyncio.create_task(_serve_sse(stream)) # safe: replay backing + + await my_task.start( + task_id=..., + input={"invocation_id": inv_id, ...}, # 3 + ) + return StreamingResponse(...) + +# Handler +@task +async def my_task(ctx): + inv_id = ctx.input["invocation_id"] + stream = await streams.get_or_create(inv_id) # 4 — same instance + await stream.emit({"event": "hello"}) +``` + +--- + +## Recovery & resumption + +### Cursored reconnect (client side) + +If your subscriber drops (network blip, client refresh) and your +backing has a `cursor_fn`, the client reconnects with the last +cursor it saw and the SDK only re-delivers later events: + +```python +# Client reconnects with Last-Event-ID: 42 +stream = await streams.get_or_create(stream_id) +async for event in stream.subscribe(after=42): + push_to_client(event) +``` + +Events with cursor ≤ 42 are skipped from the retained history; +delivery resumes at 43. + +### Crash-recoverable producer (file-backed) + +With `use_file_backed_replay`, a fresh process resuming the same +turn rehydrates the stream automatically: + +```python +from azure.ai.agentserver.core.streaming import ( + streams, EventStreamNotFoundError, +) + +streams.use_file_backed_replay( + storage_dir=Path("/var/lib/myapp/streams"), + cursor_fn=lambda ev: ev["n"], + ttl_seconds=3600, +) + +@task +async def producer(ctx): + inv_id = ctx.input["invocation_id"] + stream = await streams.get_or_create(inv_id) + try: + # On crash recovery this is the highest n that made it to disk. + last = await stream.last_cursor() + except EventStreamNotFoundError: + # The previous run closed the stream AND every persisted event + # has since expired. The on-disk log is stale; drop it and start + # fresh. delete() removes the file and records the deletion; + # the next get_or_create() then mints a brand-new stream. + await streams.delete(inv_id) + stream = await streams.get_or_create(inv_id) + last = None + + next_n = (last + 1) if last is not None else 0 + for n in range(next_n, total): + await stream.emit({"n": n, "msg": ...}) + await stream.close() +``` + +The typical recovery scenario — process crashed mid-stream, no +terminal marker on disk — is handled by the first branch: +rehydration loads the persisted events, `last_cursor()` returns the +highest cursor, and the handler resumes emitting from the next +cursor. + +The `EventStreamNotFoundError` branch handles the edge case where the +previous run completed cleanly (wrote a close marker to disk) AND +every persisted event has since expired AND your application policy +is "start over with a fresh stream". Without the explicit +`delete(id)`, the next `get_or_create(id)` would re-hand-back the +same expired stream. `delete(id)` lets you mint a fresh one. + +### Don't double-track in `@task` metadata + +Anti-pattern: + +```python +# Don't do this. +await stream.emit({"n": n, ...}) +ctx.metadata.set("last_event_n", n) +await ctx.metadata.flush() +``` + +The stream already persisted the event; `last_cursor()` will return +`n` for you. `ctx.metadata` is for **workflow** watermarks — which +units of side-effecting work (LLM calls, tool invocations) you've +already completed — not for mirroring stream state. + +--- + +## HTTP / SSE bridging pattern + +Typical helper for serving a stream over Server-Sent-Events: + +```python +import json + +from azure.ai.agentserver.core.streaming import EventStreamNotFoundError + +async def _serve_sse(stream): + """Bridge an EventStream to an SSE wire format.""" + last_seen: int | None = None + try: + async for event in stream.subscribe(): + cursor = event.get("n") + yield f"id: {cursor}\ndata: {json.dumps(event)}\n\n".encode() + last_seen = cursor + except EventStreamNotFoundError: + # Server-side cleanup ran while we were attached; tell the + # client we're done. + yield b"event: gone\ndata: {}\n\n" +``` + +If your client sends `Last-Event-ID`, pass it through to +`stream.subscribe(after=int(last_event_id))` to skip already-delivered +events. + +--- + +## Bringing your own `EventStream` implementation + +You can write your own `EventStream` Protocol impl (e.g. a Redis- +backed stream). It will be accepted anywhere the Protocol is — the +`@runtime_checkable` decorator on the Protocol means +`isinstance(s, EventStream)` works. + +**But** don't register your custom impl with the SDK `streams` +registry — its cleanup is wired to the bundled backings only. Ship +your own peer registry instead, and let consumers pick which one +to call: + +```python +class _MyRedisStreams: + """Peer namespace to the SDK ``streams`` registry.""" + def __init__(self, *, redis_url, **opts): ... + async def get(self, id: str) -> EventStream: ... + async def get_or_create(self, id: str) -> EventStream: ... + async def delete(self, id: str) -> None: ... + +my_redis_streams = _MyRedisStreams(redis_url="...") +``` + +Consumers explicitly choose which registry they want: +`await my_redis_streams.get_or_create(id)` vs +`await streams.get_or_create(id)`. The shared interface is the +`EventStream` Protocol; lifecycle is each registry's own concern. + +--- + +## See also + +- [`durable-task-guide.md`](./durable-task-guide.md) — `@task` developer + guide; Pattern E shows the streaming integration end-to-end. +- `samples/durable_streaming/durable_streaming.py` (in this package) + — minimal standalone sample. +- `azure-ai-agentserver-invocations/samples/durable_research/`, + `durable_langgraph/`, `durable_copilot/` — HTTP-server samples + exercising the registry + per-turn `invocation_id` + + subscribe-before-start pattern end-to-end. diff --git a/sdk/agentserver/azure-ai-agentserver-core/docs/task-and-streaming-spec.md b/sdk/agentserver/azure-ai-agentserver-core/docs/task-and-streaming-spec.md new file mode 100644 index 000000000000..6314e2124ab3 --- /dev/null +++ b/sdk/agentserver/azure-ai-agentserver-core/docs/task-and-streaming-spec.md @@ -0,0 +1,4355 @@ +# Durable Task & Streaming Primitives — Design Specification + +**Status:** Authoritative, source-of-truth specification. +**Scope:** The **`@task` durable-task primitive** and the **`streams` +streaming primitive** in `azure-ai-agentserver-core` — i.e. +everything that ships under `azure.ai.agentserver.core.durable.*` +and `azure.ai.agentserver.core.streaming.*`. NOT a spec for the +rest of the core package (the hosting foundation, middleware, +logging, tracing, server-side ASGI plumbing, etc. are outside +this document's scope). +**Audience:** Implementers building or maintaining these two +primitives in any language (Python, .NET, …), and contributors +modifying the canonical Python implementation. Treat this document +as the only doc a re-implementer needs. +**Out of scope:** Everything else in `azure-ai-agentserver-core` +beyond the two named primitives. The `azure-ai-agentserver-responses` +and `azure-ai-agentserver-invocations` packages. Response-event-stream +wire shapes. HTTP route plumbing for response APIs. The platform +itself. + +This document is the authoritative single source of truth for the +two primitives in scope. + +It **references** the *Foundry Task Storage Protocol Specification* +as the authoritative description of the hosted task store's HTTP +contract (routes, request/response envelopes, server-side merge +rules, authentication, activation, ETag/CAS, error codes). Where +this spec talks about wire shape, the framework MUST conform to +that protocol spec; this spec only describes **how the framework +uses** the store, plus the framework-reserved keys / conventions +it layers on top. + +--- + +## Table of contents + +### Part I — Orientation +- §1. Purpose and design goals +- §2. Non-goals +- §3. Architecture overview +- §4. Glossary (forward-reference) + +### Part II — Programming model (developer-facing concepts) +- §5. The durable task primitive +- §6. Lifecycle and entry mode +- §7. Identity (`task_id`, `agent_name`, `session_id`, lease owner) +- §8. Inputs, outputs, and per-input size limit +- §9. Persistence ownership (framework vs developer) +- §10. Crash recovery +- §11. Suspend, resume, and multi-turn +- §12. Steering primitive +- §13. Cancellation and cause booleans +- §14. Timeout (per-turn, cooperative) +- §15. Retry +- §16. Shutdown and `exit_for_recovery` +- §17. Metadata namespaces + +### Part III — Storage contract (wire-level) +- §18. Reference to the Foundry Task Storage Protocol +- §19. The framework's view of the task record +- §20. Framework-reserved payload keys +- §21. Framework-reserved tag and source values +- §22. Lease structure and ownership semantics (+ §22.1 lease write rules) +- §23. Attachments and input promotion (+ §23.9 key validation, §23.10 clear-all) +- §24. Status state machine (+ §24.1 transition matrix, §24.2 terminal immutability, §24.3 delete force semantics) +- §25. ETag (optimistic concurrency) usage +- §26. Recovery — internal lifecycle (no public HTTP endpoint) + +### Part IV — Provider abstraction (storage backends) +- §27. `TaskProvider` interface +- §28. Hosted provider (HTTP) +- §28a. Field validation (shared between providers) +- §29. Local provider (file-backed) +- §30. Provider auto-selection +- §31. Background loops +- §31a. List filter parity (internal `list()`) + +### Part V — Public API surface (language-agnostic) +- §32. `task` and `multi_turn_task` decorators +- §33. `Task` (one-shot) and `MultiTurnTask` (multi-turn) handles +- §34. `TaskContext` +- §35. `TaskRun` +- §35a. Read-only inspection (internal — via the task manager's provider) +- §36. `TaskRun.result()` returns `Output` directly +- §37. `TaskMetadata` +- §38. `RetryPolicy` +- §39. Error taxonomy + +### Part VI — Streaming primitive (peer subpackage) +- §40. Why streaming is decoupled from `@task` +- §41. `EventStream` protocol +- §42. The `streams` registry +- §43. Stream lifecycle states (Active ↔ Closed; registry tombstones) +- §44. Concrete backings (live, replay, file-backed) +- §45. Cursor and `subscribe(after=...)` +- §46. TTL eviction and the close-clock (replay backings) +- §47. Streaming error taxonomy +- §48. Third-party stream-impl pattern + +### Part VII — Implementation guidance (algorithms) +- §49. Cold-start sequence +- §50. `.start()` lifecycle resolution +- §51. Steering append (atomic) +- §52. Steering drain (two-phase) +- §53. Suspend write +- §54. Recovery + reclaim +- §55. Periodic recovery loop +- §56. Lease renewal loop +- §57. Per-turn watchdog +- §58. Orphan attachment cleanup + +### Part VIII — Conformance items +- §59. Conformance items (C-1 … C-N) + +### Part IX — References +- §60. References + +### Part X — Appendices (informative) +- §A. Language-mapping cheat sheet +- §B. Representative full task record +- §C. Steering sequence (append → cancel → drain → result) +- §D. Cold-start recovery sequence + +--- +## Part I — Orientation + +### §1. Purpose and design goals + +The durable-task primitive turns a single async agent function into a +**crash-resilient, steerable, long-running** unit of work backed by a +durable task store. It exists to close the gap between: + +- **What the platform sees.** A unit of work it can place, restart, + liveness-check, and reclaim. +- **What the application owns.** A plain function the developer writes + once, that survives container crashes, OOM kills, redeployments, and + cooperative cancellation without hand-rolling lease, heartbeat, + checkpoint, recovery, or steering plumbing. + +The streaming primitive (`azure.ai.agentserver.core.streaming`) is a +**peer** to the durable primitive — it does *not* nest under +`@task`. It exists to give every async producer/consumer pair in the +agentserver family a single Protocol to program against (in-memory live +fan-out, in-memory replay with cursor, file-backed crash-recoverable +replay), independent of whether the producer happens to be a `@task`. + +Five design goals constrain every decision in this document: + +1. **Single invariant for the durable primitive.** For any given + `task_id`, at most one handler runs at a time. Every other behavior + falls out of this invariant. +2. **Crash-recovery is first-class, not a feature.** Every API + decision is evaluated against the question "what does this look + like after a crash?" A primitive that disappears at the crash + boundary (a per-call kwarg, an in-memory listener, a closure-only + state) is not acceptable; it must be reified into the durable + record or it must be on the developer. +3. **Cooperative everywhere.** The framework signals; it does not + preempt. Cancellation, timeout, and steering all reduce to "set + `ctx.cancel`; let the handler decide the terminal shape." Forced + teardown belongs to the platform layer, not the primitive. +4. **Storage shape is the public contract.** The framework writes a + structured task record. The shape of that record (which + payload keys are reserved, what attachments look like, what tags + are stamped) is part of the spec — implementers in other languages + MUST produce byte-compatible records so a recovery scan from one + process can pick up a task created by another. +5. **Pay only for what you use.** Streaming is decoupled because + handlers that do not stream pay nothing. Attachments are + thresholded because small inputs pay only the inline cost. + Steering is opt-in because non-steerable tasks pay no queue + overhead. + +### §2. Non-goals + +The primitive is intentionally narrow. The following are explicit +non-goals — they will NOT be added to the spec without explicit +re-scoping: + +1. **Not deterministic replay.** No record-and-replay of effects. + After a crash the handler is re-invoked from the top; only + durable state (`ctx.input`, `ctx.metadata`, framework counters) + survives. Determinism inside the handler is the developer's + responsibility — the standard at-most-once side-effect pattern in + §10 covers the common case. +2. **Not a workflow engine.** No fan-out/fan-in, no child workflows, + no signals or timers as first-class primitives. Use Temporal / + Durable Functions / Orleans for that — `@task` can live inside + such an engine but does not replace it. +3. **Not a bulk-data store.** `ctx.metadata` is small (tens of KB + per namespace; the whole task payload caps at 1 MB). It is a + watermark / dedup-token store, not a chat-log store. Per-input + payloads up to 2 MB are accepted via the attachments mechanism + (§23) but anything larger MUST be externalized by the caller. +4. **Not a competing-consumer queue.** A `task_id` identifies one + logical unit of work owned by one current lifetime. N workers + pulling jobs off a shared queue is the wrong fit; use a queue. +5. **Not multi-process streaming.** The streaming primitive's bundled + backings are single-process. A future remote-backed implementation + could plug into the same protocol but is out of scope here. +6. **No exactly-once side-effect guarantee.** The framework provides + at-most-once via a developer-issued dedup token (the at-most-once + pattern). Anything stronger requires external transactionality. +7. **Single wire shape.** The framework reads and writes exactly + the shapes documented in this spec. The primitive is in private + preview; there is no version-skew compatibility to maintain. + +### §3. Architecture overview + +The framework's runtime decomposes into the following components. +Boxes are types/objects; arrows show the dominant call direction. + +``` + ┌──────────────────────────────┐ + │ application code │ + │ (user-written @task funcs) │ + └──────────────┬───────────────┘ + │ decorator registration + ▼ + ┌─────────────┐ .start / ┌─────────────────┐ create / get / + │ caller │ ─ .run ────▶ │ Task (handle) │ ─ update / list ──▶ ┌──────────────┐ + │ (HTTP,etc.) │ ◀─ TaskRun ─ │ │ │ TaskProvider │ + └─────────────┘ Output └─────────┬───────┘ └──────┬───────┘ + │ │ + invokes user fn ┌──────┴──────┐ + │ │ Hosted via │ + ▼ │ HTTP + │ + ┌─────────────────┐ │ classifier │ + │ TaskContext │ └──────┬──────┘ + │ (ctx.input, │ │ + │ ctx.metadata, │ │ + │ ctx.cancel,…) │ ▼ + └────────┬────────┘ ┌──────────────────┐ + │ flush / suspend / │ Foundry Task │ + │ exit_for_recovery │ Storage (HTTP) │ + ▼ └──────────────────┘ + ┌─────────────────┐ ▲ + │ TaskManager │ ──── lease_renewal_loop ──────┤ + │ (singleton) │ ──── periodic_recovery_loop ─┤ + │ │ ──── timeout_watchdog ───────┤ + └─────────────────┘ │ + │ + ┌────────────────────────────────────────┐ │ + │ Local file provider (dev/test only) │ ◀──────┘ + │ (~/.durable-tasks///…) │ + └────────────────────────────────────────┘ + + ┌──────────────────────────────────────────────────────────────────┐ + │ Streaming subpackage (PEER — not nested under @task) │ + │ │ + │ ┌───────────────────┐ get_or_create(id) ┌──────────────┐ │ + │ │ streams registry │ ──────────────────────▶│ EventStream │ │ + │ │ (process-level) │ ◀───────────────────── │ (3 backings)│ │ + │ └───────────────────┘ delete(id) └──────┬───────┘ │ + │ │ │ │ + │ │ emit / subscribe │ + │ ▼ ▼ │ + │ use_in_memory_live() / producers / │ + │ use_in_memory_replay() / consumers │ + │ use_file_backed_replay() │ + └──────────────────────────────────────────────────────────────────┘ +``` + +**Key relationships:** + +- The `Task` handle is the developer-facing object created by the + `@task` decorator; the singleton `TaskManager` is the *runtime* + that owns the active-task table, the periodic recovery loop, and + the provider. +- The `TaskProvider` is an abstraction over the durable store. Two + concrete providers ship: `HostedTaskProvider` (HTTP-backed, used + when the platform is detected) and `LocalFileTaskProvider` + (JSON-on-disk under `~/.durable-tasks///.json` + by default; used otherwise). The framework auto-selects. +- The `TaskContext` is what the handler receives; it is wired by the + manager and exposes both inputs (`input`, `metadata`, `entry_mode`) + and signals (`cancel`, `shutdown`, cause booleans). +- Three background loops run while the manager is up: the periodic + recovery scan (default 300s), one lease-renewal loop per active + task (half the lease duration), and one timeout watchdog per + active execution (when the task declares a timeout). +- The streaming subpackage is independent. Handlers that want to + stream do `await streams.get_or_create(id)` and `emit` / `close` + on the returned object; the HTTP layer attaches `subscribe(after=…)` + consumers. The framework never touches a stream from the durable + path. + +### §4. Glossary (forward-referenced) + +| Term | Meaning | +|---|---| +| **Task** | A unit of durable work, identified by `task_id`, persisted in the task store. | +| **Lifetime** | One contiguous in-memory execution of a task by a particular process. A task can have multiple lifetimes over its life (each crash starts a new lifetime). | +| **Turn** | One handler invocation. A fresh task with no resume/recover is one turn. A suspend/resume cycle is two turns. A steering-driven re-entry is the next turn. | +| **Generation / sequence number** | Monotonic counter inside the steering queue used to derive attachment keys; never reused (see §23). | +| **Lease** | The fenced ownership record on the task. While a process holds the lease, no other lifetime is allowed to run the task. | +| **Entry mode** | The framework's signal to the handler about WHY this turn started: `fresh` (first), `resumed` (after suspend or steering drain), `recovered` (previous lifetime crashed). | +| **Steering** | A new caller `.start()` against an already-running steerable task: the new input is queued, the current turn is cancelled cooperatively, and on the next turn the queued input is consumed. | +| **Attachment** | Per-task secondary storage slot for values larger than a payload-friendly inline threshold (§23). | +| **Ref / attachment ref** | The sentinel value the framework writes into `payload` to indicate "this slot has been promoted to `attachments[]`" (§23.3). | +| **Cause boolean** | A read-only field on `TaskContext` (`timeout_exceeded`, `cancel_requested`) or counter (`pending_input_count`) that explains why `ctx.cancel` was set. | +| **Promotion** | The framework's act of moving an oversized input from inline `payload` into `attachments`, replacing the inline value with a ref (§23). | +| **Drain** | Popping a single steering input off the queue and re-entering the handler with it (§52). | +| **Reclaim** | A different lifetime taking over a task whose lease has expired (§54). | + +--- + + +## Part II — Programming model + +This part is the developer-facing mental model. It is normative for +behavior visible to handler code, but the *wire-level realization* of +each concept lives in Part III. + +### §5. The durable task primitive + +A durable task is created by decorating a single async function: + +``` +@task(name="my_task") # decorator +async def my_task(ctx) -> Out: # exactly one parameter: TaskContext[Input] + return ... +``` + +The decoration registers the function with the process-wide +descriptor table (consulted at recovery time). The returned object — +the *task handle* — is what callers invoke (`.run()` / `.start()`). + +The framework guarantees one invariant: **for a given `task_id`, at +most one handler runs at a time in any process owning the active +lease.** Every higher-level behavior in this spec is derived from +that invariant. + +### §6. Lifecycle and entry mode + +The task store records each task in one of four statuses: + +| Status | Meaning | +|---|---| +| `pending` | Created, not yet picked up by a handler. (Rarely observed by handler code — the framework moves through it atomically.) | +| `in_progress` | A handler is currently executing this task (or claims to be — a stale lease may need to be reclaimed). | +| `suspended` | (Multi-turn only.) Handler's turn ended with `return X`; the chain is parked between turns awaiting the next `.run()` / `.start()` to drive the next turn. | +| `completed` | Terminal. The handler is finished (success, raise, cancel) and will not run again. The *outcome* (success / failure / cancelled) is communicated via the typed exceptions (§39) — **NOT encoded in the status field**. | + +Every time the framework invokes the handler, it computes an entry +mode from the persisted state and exposes it as `ctx.entry_mode`: + +| Persisted state at entry | `entry_mode` | What it means | +|---|---|---| +| No task / status `pending` | `"fresh"` | First invocation. No prior state. | +| `suspended` | `"resumed"` | Caller provided new input; resume from where we suspended. | +| `in_progress` (previous lifetime died) | `"recovered"` | We are the new lifetime; check your watermark. | +| `in_progress` (steerable, mid-flight, steering drain) | `"resumed"` (with `ctx.is_steered_turn = True`) | Another input was queued; we are the next-turn re-entry. | + +The handler is REQUIRED to be safe to enter in any of these modes. +Branching on `ctx.entry_mode` at the top is the canonical pattern. + +`entry_mode` and `is_steered_turn` are orthogonal. The combination +`(entry_mode="recovered", is_steered_turn=True)` is legal: a previous +process crashed mid-drain and the recovered handler is taking over. + +### §7. Identity + +A task is identified by three independent strings: + +| Field | Source | Lifetime | Purpose | +|---|---|---|---| +| `task_id` | Caller-supplied at `.start()` / `.run()`. | Identical across resume / recovery / steering. | The conversation / unit-of-work key. | +| `agent_name` | Platform-supplied (env `FOUNDRY_AGENT_NAME`); fallback `"unknown-agent"`. | Fixed per process. | Scoping; multiple agents share a store. | +| `session_id` | Platform-supplied (env `FOUNDRY_AGENT_SESSION_ID`). | Fixed per process. | Scoping; multiple sessions share an agent. | + +The framework derives the **lease owner** string from both +`agent_name` AND `session_id`: + +``` +lease_owner = "|session:" +``` + +Deriving the owner from BOTH components (not session alone) prevents +silent cross-agent ownership collisions in topologies where two +different agents happen to share a session identifier. + +Each *process* generates a fresh **instance id** at startup: + +``` +lease_instance_id = "worker---" +``` + +The `(owner, instance_id)` pair lets recovery distinguish: + +- **Same-owner same-instance** = my own running task (renew, do not reclaim). +- **Same-owner different-instance** = a previous lifetime of mine that + is gone (reclaim immediately on cold start; no expiry wait). +- **Different-owner** = someone else's task; do not touch. + +#### `task_id` validation + +Implementers MUST reject `task_id` values that: + +- Are empty. +- Exceed 256 characters. +- Contain characters outside `[a-zA-Z0-9\-_.:]`. + +Rejection is at the call site (`.start()` / `.run()` raise) before +any network is touched. + +### §8. Inputs, outputs, and the per-input size limit + +A task carries exactly one **input** value at any time — the value +passed to `.start(input=...)` or `.run(input=...)`. The input is JSON- +serialized for persistence and is re-hydrated into `ctx.input` on +every handler entry (fresh, resumed, recovered). + +The handler's return value (or the value passed to +(the handler's `return X`) is the **output**, also JSON-serialized. + +| Bound | Limit | Raised as | +|---|---|---| +| Per-input maximum size | **2 MB** after JSON serialization, for the function input AND each individual queued steering input. | `InputTooLarge` from `.start()` / `.run()` — pre-network, at the call site. | +| Concurrent queued steering inputs | **9** | `SteeringQueueFull` from `.start()` against a steerable task whose queue is full. | + +Inputs and outputs that fit easily in the inline payload budget stay +inline. Inputs whose JSON size exceeds a per-channel threshold are +**promoted** into the task's `attachments` slot transparently — +developers do not configure or opt in. See §23 for the wire +mechanism; the per-input ceiling above is the only developer-visible +limit. + +The framework uses JSON canonicalization rules (`sort_keys=True`, +separators `(",", ":")`) when computing serialized sizes and content +hashes (§23.6). Implementers MUST use the same canonicalization for +both, or hashes will not match across implementations. + +If the handler's input or output cannot be JSON-serialized (e.g. it +contains non-JSON-native types), the framework raises before the +HTTP call. Implementations using a richer model (Pydantic-style) +SHOULD attempt model-aware serialization (`model_dump`) first. + +### §9. Persistence ownership + +The framework persists: + +- The current `ctx.input` value (inline or as an attachment ref). +- A snapshot of every touched `ctx.metadata` namespace at every + terminal-of-turn boundary (suspend, complete, cancel, raise, + steering drain, `exit_for_recovery`) and at every explicit + `metadata.flush()` call. +- Lifecycle counters: `retry_attempt`, `recovery_count` (the + `expiry_count` of the lease record), `_last_input_id` (the + optional caller-provided chain head — see §11). +- A per-turn `_turn_started_at` ISO-8601 UTC timestamp used by the + watchdog (§14) to compute remaining budget across crashes. +- Steering state (`pending_inputs` queue, `cancel_requested`, + `drain_in_progress`, `active_input`, `next_input_seq`) for + steerable tasks (§12). +- The handler's terminal outcome: a structured `error` dict on + failure (when persisted by the layer above the primitive), + `suspension_reason` on suspend. The handler's `return X` value + is NOT persisted in the record — it resolves the in-process + caller's `TaskRun.result()` future and is then no longer + reachable from the persisted record. + +The framework does NOT persist: + +- Handler-local variables. +- In-memory closures over the handler's body. +- Caller-provided callbacks or futures (those are bound to a single + lifetime; a crash discards them). +- Streaming events (those live in the streaming subpackage, which has + its own backings; see Part VI). +- Any bulk data the developer chooses to compute. The developer is + responsible for that — typically through a sibling framework + (LangGraph checkpoint, custom DB, blob storage) with only a small + reference token in `ctx.metadata`. + +The dividing line is "what does the framework need to decide +`entry_mode` and reproduce `ctx`?" — that is what it persists; nothing +more. + +### §10. Crash recovery + +Recovery is **framework-managed**. There is no developer-tunable +threshold and no opt-in. + +**When recovery happens:** + +1. **Cold start** of a new process. The manager's `startup()` scans + the task store for tasks owned by `(agent_name, session_id)` + whose lease has expired OR whose lease is owned by a different + instance of the same owner (a previous dead lifetime). Each is + reclaimed inline. +2. **Periodic scan.** While the manager is up, a background loop + re-runs the same scan every 300 seconds (default; see §31). This + catches tasks that became reclaimable AFTER cold start — typically + leases that expired during this process's lifetime because a sibling + process died. +3. **Inline reclaim.** When a caller `.start()`s a `task_id` whose + current record shows an `in_progress` status with an expired or + foreign-instance lease, the lifecycle resolver reclaims it inline + (no waiting for the periodic scan). + +**What recovery does:** + +The reclaiming process: + +1. Issues a PATCH that re-takes the lease atomically: new + `lease_owner` (always self), new `lease_instance_id` (always + self), new `lease_expires_at`, bumps the lease's `expiry_count` IF the + previous lease had actually expired (not bumped for same-owner + dead-instance handoff). This PATCH MUST be guarded by the read + `etag` for CAS safety. +2. Reads the (now self-owned) record, looks up the registered + resume callback by `source.name` (§21), invokes the handler + with `ctx.entry_mode="recovered"` and the persisted `ctx.input` + re-hydrated. +3. From the handler's perspective, the recovery looks identical to + a fresh entry except that `entry_mode == "recovered"` and any + `ctx.metadata` writes from the previous lifetime are already + present. + +**Crash-recovery does NOT consume the retry budget** (§15). A +lifetime that died before the handler raised does not advance +`retry_attempt`. + +**Pattern — at-most-once side effect across recovery:** + +```python +if ctx.metadata.get("dedup_token") is None: + token = uuid4().hex + ctx.metadata["dedup_token"] = token + await ctx.metadata.flush() # fence + await do_side_effect(idempotency_key=token) +# crash-recovered lifetimes re-issue the call with the SAME token, +# letting the downstream system de-dupe. +``` + +This pattern is the standard answer to "I crashed mid-effect; how +do I avoid duplicate effects?" The framework does NOT provide +exactly-once semantics — the developer issues the dedup token and +fences it before the effect. + +### §11. Suspend, resume, and multi-turn + +Multi-turn chains end every turn with a bare `return X` from the +handler. The framework treats this **return-is-implicit-suspend**: + +1. Transitions the stored status from `in_progress` to `suspended` + with `suspension_reason="run_completion"`. +2. Persists a snapshot of every touched metadata namespace. +3. Does NOT persist `X` anywhere in the task record. `X` resolves + the caller's `await run.result()` in-process and is then gone. +4. Clears `payload["input"]` (and the corresponding attachment if + the input was promoted) — the consumed input is no longer needed + and would inflate the next payload write. +5. Clears `_steering["active_input"]` (mechanism state lives, but + the consumed input value goes). +6. Clears `payload["_retry_attempt"]` so the next turn starts with + a fresh retry budget. +7. Preserves `payload["_last_input_id"]` so the next + `if_last_input_id` precondition can be evaluated. + +The caller's `await run.result()` resolves to `X` directly (typed +as the handler's `Output`). No wrapper class. + +The next `.run(task_id=same, input=new)` or +`.start(task_id=same, input=new)` transitions the status back to +`in_progress` and re-invokes the handler with +`ctx.entry_mode="resumed"`, `ctx.input=new`, and `ctx.metadata` +re-hydrated. + +The same machinery is what multi-turn conversations and +human-in-the-loop approval flows ride. + +One-shot tasks do NOT use this mechanism. A one-shot `@task` +handler's `return X` is a terminal completion: the framework +resolves the caller's `.result()` with `X` and then deletes the +record (one-shot is always ephemeral). + +#### Multi-turn raise semantics + +If a multi-turn handler RAISES (an unhandled exception other than +`asyncio.CancelledError`), the chain still transitions to +`suspended` (NOT `completed` / `failed`) so subsequent turns can +continue: + +1. Transitions to `suspended` with + `suspension_reason="run_completion"`. +2. NO `payload["error"]` is written — the chain record does not + carry the per-turn failure diagnostic. +3. The framework emits a structured ERROR log named + `durable_task_handler_failure` with `task_id`, `input_id`, + `error_type`, `error_message`. +4. The caller's `await run.result()` raises + `TaskFailed(error=TaskErrorDict(...))`. +5. Queued steerers (multi-turn `steerable=True`) promote per §12: + the next queued input becomes the next turn's input, and the + handler re-invokes with `ctx.entry_mode="resumed"`, + `ctx.is_steered_turn=True`. + +#### Chain identity: `input_id` and `if_last_input_id` + +Both `.run()` and `.start()` accept two optional keyword arguments +that thread caller-supplied chain identity through the persisted +record: + +- **`input_id`** — record-only. The framework writes + `payload["_last_input_id"] = input_id` after accepting the input; + no precondition is checked. +- **`if_last_input_id`** — precondition. The framework requires the + stored `_last_input_id` to equal `if_last_input_id` (the + predecessor the caller claims to be extending). Mismatch raises + `LastInputIdPreconditionFailed(actual_last_input_id=)`. + +For multi-turn, `input_id` is the per-turn identity. For one-shot, +`input_id` defaults to `task_id` (the 1:1 invariant `task_id == +input_id`). + +Implementations MUST reject `if_last_input_id` provided without +`input_id` (`TypeError` at the call site). The pair is orthogonal: +`input_id` alone is idempotency / chain-head tracking; +`(input_id, if_last_input_id)` together is HTTP-`If-Match`-style +chain extension. + +### §12. Steering primitive + +`@multi_turn_task(steerable=True)` upgrades a multi-turn chain from +"one turn at a time" to "callers can queue a new input while a turn +is mid-flight." + +Steering is exclusive to multi-turn chains. One-shot `@task` does +not support steering (the one-shot lifecycle is one input one run); +`@multi_turn_task` without `steerable=True` accepts concurrent +`.start` calls only as `TaskConflictError`. + +#### What `.start()` does on an in-flight steerable chain + +`.start(task_id=, input=NEW)` against an in-flight +steerable chain: + +1. The new input is **queued** at the tail of an internal + pending-inputs FIFO. +2. The cancel signal is raised on the currently-executing turn — + `ctx.cancel.is_set()` becomes True for the handler that is + running right now. `ctx.pending_input_count` flips from 0 to + the live backlog size. +3. A new `TaskRun` handle is returned to the caller. Its + `.result()` resolves with **whatever the next turn emits** — + the caller is the *steerer* of the next turn. + +If the steering queue is at its cap (9), `.start()` raises +`SteeringQueueFull`. + +#### What the first turn's caller sees + +The first turn's caller observes the natural multi-turn outcome of +the in-flight turn: + +| Handler ends turn 1 with... | First caller's `await run.result()` | +|---|---| +| `return X` (clean return) | Resolves with `X` (typed as `Output`). The chain transitions to `suspended` (return-is-implicit-suspend). The framework then promotes the queued steering input as the next turn. | +| `raise SomeError` (non-CancelledError) | Raises `TaskFailed(error=...)`. The chain stays alive in `suspended` with no `payload["error"]` written; the queued steerer is promoted as the next turn. | +| `raise asyncio.CancelledError()` | Raises `TaskCancelled()`. The chain stays alive in `suspended`; the queued steerer is promoted as the next turn. | +| Handler calls `ctx.exit_for_recovery()` (shutdown only) | Raises `TaskDeferred()`. The chain stays `in_progress`; the recovery scanner re-invokes the handler in a future lifetime. The queued steerer remains queued. | + +The handler's `return X` value is delivered **unconditionally** to +the first caller; it is never replaced by what a later turn +produces. + +#### Cooperative cancellation in steering + +`ctx.cancel` is advisory. The framework sets it when a steering +input arrives (alongside the cause counter +`ctx.pending_input_count`), but does not preempt the handler. The +handler decides: + +- **A — Yield immediately.** Check `ctx.cancel.is_set()` (or + `ctx.pending_input_count > 0`) at the next boundary and `return` + with whatever you have. +- **B — Wind down to a safe checkpoint.** Finish the current tool + call / token batch, persist a clean checkpoint, then `return` + with the final value. +- **C — Ignore cancel and finish.** Do not read `ctx.cancel`; let + the handler complete. The chain still transitions to + `suspended` and the queued steerer is promoted as the next + turn. + +#### Steering observability fields + +On a steering-driven re-entry, `TaskContext` exposes: + +- `ctx.is_steered_turn: bool` — `True` iff this turn was + constructed by the steering-drain code path. False for every + other entry path. Orthogonal to `entry_mode`: + `(entry_mode="recovered", is_steered_turn=True)` is legal. +- `ctx.pending_input_count: int` — live count of currently queued + steering inputs. Reads as 0 for non-steerable chains. Useful for + "I am three turns behind, I should short-circuit even harder" + decisions. It is derived from the **in-process observed** steering + state (the property is synchronous — it does NOT issue a store read + per access), and is **failure-tolerant** (any compute failure reads + as 0). It is recorded *before* `ctx.cancel` is set (see §13 ordering + invariant) by both the same-process enqueue and the cross-process + steering poll, and is decremented as the drain consumes inputs, so a + handler that observes `ctx.cancel.is_set()` for a steering cause + already sees `pending_input_count >= 1`. It must be backed by a + settable runtime field (historically it was read from an attribute + that was never storable, so it was stuck at 0). + +#### Force delete + +`MultiTurnTask.delete(task_id)` is the only API that force-removes +a chain. It cancels the in-flight turn (active caller's +`.result()` resolves with `TaskCancelled`), resolves all queued +steerer callers' `.result()` futures with `TaskCancelled`, and +force-deletes the record. Idempotent (no-op on a missing chain). + +### §13. Cancellation and cause booleans + +`ctx.cancel` is a bare event (e.g. `asyncio.Event` in Python). The +framework sets it from multiple causes; a handler observing the bare +event does NOT know *why* it was set. Three independent **cause +booleans** answer the why: + +| Cause | Set when | Reset? | +|---|---|---| +| `ctx.timeout_exceeded: bool` | Per-turn timeout watchdog has fired for this turn. | Never within a turn. | +| `ctx.cancel_requested: bool` | `TaskRun.cancel()` was invoked against this run from external caller code. | Never within a turn. | +| `ctx.pending_input_count: int` (read as a count, not boolean) | Live count of queued steering inputs >= 1. | Decrements as drains consume inputs. | + +**Causes accumulate.** Multiple cause booleans can be `True` +simultaneously (e.g., timeout AND external cancel AND steering). + +**Ordering invariant.** Each cause is set BEFORE the framework sets +`ctx.cancel`. A handler observing `ctx.cancel.is_set() == True` is +guaranteed to see at least one cause already set (cause booleans +or pending_input_count > 0). + +Canonical reaction pattern: + +```python +while not ctx.cancel.is_set(): + await do_a_unit_of_work() +# Branch on cause: +if ctx.timeout_exceeded: + return "(timed out — partial result)" +if ctx.cancel_requested: + raise asyncio.CancelledError() # caller observes TaskCancelled +if ctx.pending_input_count > 0: + return "(pre-empted by queued steering input)" +raise RuntimeError("ctx.cancel set with no recognised cause") +``` + +The handler's choice of terminal shape (`return X` / `raise`) +controls what the caller observes. The framework does NOT pick +the terminal shape on the handler's behalf. For multi-turn, +`return X` is the implicit-suspend boundary (chain stays alive, +caller's `.result()` resolves to `X`); for one-shot, `return X` +ends the run (record is deleted). + +### §14. Timeout (per-turn, cooperative) + +`@task(timeout=...)` is **cooperative-only**. When the budget elapses, +the framework: + +1. Sets `ctx.timeout_exceeded = True`. +2. Sets `ctx.cancel`. +3. Exits the watchdog. + +It does **NOT** force-stop the handler, end the task, or cancel +the lease renewal. An ignoring handler runs until process exit or +external `TaskRun.cancel()`. + +The budget is **per-turn** and **wall-clock**: + +- Each handler turn (fresh entry, suspended-to-resume) gets a + fresh budget. +- A process crash mid-turn does NOT reset the budget. When the + recovered handler enters, the watchdog computes + `remaining = max(0, timeout - (now - turn_started_at))` from the + persisted `_turn_started_at` and fires immediately if elapsed. +- Clock skew is clamped to `[0, timeout]` in both directions. +- **Known gap on steering drain re-entry:** the canonical Python + implementation spawns the watchdog ONCE per `_execute_task` + invocation; steering drain re-enters in-place inside + `_execute_task_loop` without spawning a fresh watchdog. The + steered turn inherits whatever budget remained on the original + watchdog. The persisted `_turn_started_at` IS stamped per drain + (§52 Phase 1), so a CRASH-then-recover from a drained turn + correctly honors the new turn's budget; the in-process drain + path itself does not. Other-language implementers SHOULD spawn + a fresh watchdog per drain to honor the design intent. + +The framework MUST persist `payload["_turn_started_at"]` (ISO-8601 +UTC) at every turn-start boundary: fresh entry, suspended -> in_progress +resume, steering drain re-entry. It is NOT re-stamped on crash +recovery — that is precisely what allows the watchdog to honor the +original budget across crashes. + +### §15. Retry + +`@task(retry=RetryPolicy(...))` and +`@multi_turn_task(retry=RetryPolicy(...))` configure the framework's +retry behavior for handler-raised exceptions. + +`RetryPolicy` parameters: + +| Field | Default | Meaning | +|---|---|---| +| `max_attempts` | `3` | Total failure-retry budget across all lifetimes. Counts the original try. | +| `initial_delay` | `1 second` | Delay before the first retry. | +| `backoff_coefficient` | `2.0` | Multiplier for exponential backoff. | +| `max_delay` | `60 seconds` | Cap on per-retry delay. | +| `jitter` | `True` | Add randomized jitter to delays. | +| `retry_on` | `None` (all exceptions) | Tuple of exception types to retry; others propagate. A bare exception class is accepted as a single-element tuple. | + +Presets: `exponential_backoff()`, `fixed_delay(delay)`, +`linear_backoff()`, `no_retry()`. + +Semantics: + +- **`retry_attempt` is the cross-lifetime counter.** Persisted as + `payload["_retry_attempt"]`. Re-hydrated on every handler entry + via `ctx.retry_attempt`. Increments only when the handler raises + (not on crash). Cleared on every turn-start boundary so each new + turn (multi-turn) or each new run (one-shot) gets a fresh budget. +- **Crash recovery does NOT consume the budget.** A lifetime that + is gone before the handler raised does not advance + `retry_attempt`. The recovered handler sees the same + `ctx.retry_attempt` value the crashed lifetime saw. +- **`return X` bypasses retry.** A handler that returns + (multi-turn = implicit suspend; one-shot = terminal completion) + is not a failure; the retry counter is unaffected. +- When `retry_attempt >= max_attempts`, the framework gives up: + it stops re-invoking, and the awaiting caller observes + `TaskFailed(error=TaskExhaustedRetriesErrorDict(...))` carrying + `attempts`, `last_error`, `last_error_type`, `traceback`. + +#### Interim retry persistence + +Between every failed attempt and the next retry the framework +PATCHes only `payload["_retry_attempt"] = `. NO +`payload["error"]` is written between attempts — the per-turn +failure diagnostic is not projected onto the record. The status +stays `in_progress` throughout. + +When the budget is exhausted (or the exception is non-retryable), +the failure handler runs: + +- **One-shot (`@task`)**: the record is DELETED entirely; nothing + survives on disk. The caller observes `TaskFailed` raised from + `.result()`. +- **Multi-turn (`@multi_turn_task`)**: the chain transitions to + `suspended` with `suspension_reason="run_completion"`; NO + `payload["error"]` is written; queued steerers promote per §12. + The caller of the failing turn observes `TaskFailed` raised + from `.result()`. The chain stays alive — a future + `.run()`/`.start()` against the same `task_id` resumes the + chain with a fresh retry budget. + +The framework emits a structured ERROR log named +`durable_task_handler_failure` on every handler raise (including +non-final attempts). Observers learn "what just failed, which +attempt am I on" from logs, NOT from a persisted `error` field on +the record. + +`TaskFailed.error` is one of two `TypedDict` shapes: + +```python +class TaskErrorDict(TypedDict): + type: str # exception class name, e.g. "ValueError" + message: str # str(exc) + traceback: str # traceback.format_exc() + +class TaskExhaustedRetriesErrorDict(TypedDict): + type: Literal["exhausted_retries"] + attempts: int + last_error: str + last_error_type: str + traceback: str +``` + +Type-checkers can discriminate on the `type` literal. + +### §16. Shutdown and `exit_for_recovery` + +The container can be shut down at any time (deployment, rolling +restart, eviction). The framework sets `ctx.shutdown` when it +receives the shutdown signal. The handler has three legitimate +responses: + +| Shape | When to use | Stored outcome | Caller observes | +|---|---|---|---| +| `await ctx.exit_for_recovery()` | Container shutting down AND you want this turn re-entered later. | `in_progress` (preserved across shutdown). | `TaskDeferred`. | +| `return X` (multi-turn) | Handler reached a clean checkpoint AND wants to expose `X` to the caller. | `suspended` (caller can `.run()` again to drive the next turn). | `X` (typed as `Output`). | +| `raise asyncio.CancelledError()` | Handler decided to abort. | One-shot: record deleted. Multi-turn: chain transitions to `suspended` (stays alive). | `TaskCancelled()`. | + +`ctx.exit_for_recovery()` is the durable-deferral primitive. The +method: + +1. Flushes all touched metadata namespaces. +2. **Releases ownership** of the persisted record so the next + process can take over (force-expires the lease). +3. Leaves status as `in_progress` (NOT `suspended`). +4. Raises `TaskDeferred()` upward — the caller of `.result()` + sees this. Semantically distinct from `TaskCancelled`: the + task is not cancelled; this lifetime is just deferring to the + next. +5. Preserves any queued steering inputs — they are NOT drained + during shutdown; on recovery they remain queued. + +When the recovery scanner re-acquires the deferred task, the +handler re-enters with `ctx.entry_mode="recovered"` and the +persisted `payload["input"]` — exactly as if the lifetime had +crashed. + +Misuse: calling `ctx.exit_for_recovery()` when +`ctx.shutdown.is_set() == False` MUST raise `RuntimeError` at the +call site. This makes misuse loudly visible to operators (the task +ends in error, not silently `in_progress`). + +### §17. Metadata namespaces + +`ctx.metadata` is a **callable namespace facade** for the small, +durable, per-task state the handler owns: + +- `ctx.metadata["key"] = value` — read/write the **default** + namespace, persisted at `payload["metadata"]`. +- `ctx.metadata("session")["upstream_id"] = sid` — read/write a + **named** sibling namespace, persisted at + `payload["metadata:session"]`. + +Each namespace is independent: a write to one does not dirty the +other; `flush()` on one persists only that namespace's data. + +`metadata.flush()` is the fence the developer uses to make +at-most-once side-effect patterns work across a crash. The framework +**auto-flushes** all touched namespaces at every terminal-of-turn +boundary, so writes the developer forgets to flush are still durable +across a graceful boundary. Explicit `flush()` is for mid-handler +fence semantics. + +**Naming convention:** namespaces and top-level metadata keys +starting with `_` are RESERVED for the framework. The primitive +treats this as a convention at the API surface; layers built on top +(e.g. the responses framework's `_responses` namespace) MAY enforce +it more strictly. + +`TaskMetadata` MUST expose dict-like semantics +(`__getitem__`/`__setitem__`/`__contains__`/`__iter__`/`.get()`/`.to_dict()`) +plus: + +- `flush()` — persist this namespace only. +- `increment(key)` — in-memory atomic numeric increment **on the + metadata namespace object** (read/modify/write under an in- + memory lock). The change is NOT pushed to the store until the + next `flush()` / auto-flush. This is NOT a store-level + compare-and-swap; concurrent processes incrementing the same + key would race at the store level. Use for handler-local + counters that get flushed at clean boundaries; for cross- + process atomic counters, use the store's CAS protocol directly + via the provider. +- `append(key, value)` — append to a list-valued key. Same + in-memory semantics as `increment`: atomic within the namespace + object, NOT atomic against the durable record. + +Flush failures are logged, not raised — a failed flush should not +crash a handler. The framework retries on the next flush call or +auto-flush boundary. + +--- + + +## Part III — Storage contract (wire-level) + +This part documents how the framework projects the programming model +onto the durable task record. The HTTP routes, request/response +envelopes, and server-side merge rules themselves are defined by the +*Foundry Task Storage Protocol* specification; this section names which +fields the framework reads/writes and what the framework-reserved +keys mean. + +### §18. Reference to the Foundry Task Storage Protocol + +The hosted task store's transport-level contract — routes +(`POST /tasks`, `GET /tasks`, `GET /tasks/{id}`, `PATCH /tasks/{id}`, +`DELETE /tasks/{id}`), authentication, activation, payload PATCH merge +semantics, attachment PATCH merge semantics, ETag/CAS rules, +classification of 409/412 responses — is specified by +`foundrysdk_specs/specs/hosted-agents/container-spec/docs/foundry-task-storage-protocol-spec.md`. + +This document does **not** restate that contract. Implementers MUST +conform to the protocol spec for any hosted-provider implementation. +The conformance items in §59 reference both this document and the +protocol spec. + +Where this spec uses terms like "PATCH" or "etag", it does so under +the protocol spec's definitions. + +### §19. The framework's view of the task record + +The framework writes/reads the following fields on every task record. +Field meanings beyond this table are defined in the protocol spec. + +| Field | Type | Owned by | Set on | +|---|---|---|---| +| `id` | string | caller | `create`. | +| `agent_name` | string | framework | `create`. | +| `session_id` | string | framework | `create`. | +| `status` | `pending` / `in_progress` / `suspended` / `completed` | framework | `create`, status transitions (§24). | +| `title` | string \| null | caller | `create` (optional). | +| `description` | string \| null | caller | `create` (optional). | +| `lease` | LeaseInfo (§22) | framework | `create`, every renewal, every reclaim. | +| `payload` | object | framework + developer | almost every transition (§20). | +| `tags` | map of string -> string | framework + caller | `create` (framework stamps `_task_name`); caller-set tags allowed. | +| `error` | object \| null | framework | on handler raise. | +| `suspension_reason` | string \| null | framework | on suspend. | +| `source` | object | framework | `create` (§21). | +| `attachments` | object \| null | framework + developer | on input promotion / drain / suspend / orphan cleanup (§23). | +| `etag` | string | server | every server-issued response. | +| `created_at` | ISO-8601 string | server | `create`. | +| `updated_at` | ISO-8601 string | server | every PATCH. | +| `started_at` | ISO-8601 string \| null | server | **set once on first `in_progress` transition; never updated thereafter** (lease re-acquisition, recovery scanner takeover, and suspend/resume cycles do NOT reset). | +| `completed_at` | ISO-8601 string \| null | server | terminal transition. | + +Caller-controlled fields (`tags` keys NOT starting with `_task_`, +`title`, `description`) are passed through verbatim. Framework-owned +fields MUST NOT be set by caller code. + +### §20. Framework-reserved payload keys + +`payload` is the JSON object that holds both the framework's +runtime state and the developer's metadata. The framework reserves +the following top-level keys, all starting with `_` or named +`input`/`metadata`/`output`: + +| Key | Type | Lifetime | Meaning | +|---|---|---|---| +| `input` | any JSON value, or a ref dict (§23) | Set on every `in_progress` transition; cleared at suspend; cleared by drain after consumption. | The current input value (or a ref to its attachment). | +| `metadata` | object | Persisted at boundaries; auto-flushed. | The DEFAULT user metadata namespace. | +| `metadata:` | object | Same as above. | NAMED user metadata namespace ``. | +| `_last_input_id` | string \| null | Set when caller supplies `input_id`. | Chain-head tracking (§11). | +| `_turn_started_at` | ISO-8601 UTC string | Set at every turn-start boundary; NEVER re-stamped on recovery. | Source of truth for the per-turn watchdog (§14). | +| `_retry_attempt` | integer | Incremented on handler raise; reset to 0 on steering drain. (Not also reset on success in the canonical Python implementation.) | Durable retry counter (§15). | +| `_steering` | object (see below) | Only present on steerable tasks. | Steering mechanism state (§12). | + +The framework does NOT persist the handler's return value in the +task record. There is no `payload["output"]` key and no `_output` +attachment. The handler's return value resolves the in-process +caller's `TaskRun.result()` future and is then no longer reachable +from the persisted record. Per-turn outputs that need to survive +crashes are the handler's responsibility — write them through +your own storage (e.g., LangGraph checkpoint, your own DB) before +returning. + +Likewise, `error` from a handler raise is NOT persisted. The +framework emits a structured ERROR log (named +`durable_task_handler_failure`) on every handler raise, but the +chain record itself does not carry the per-turn diagnostic. + +`_steering` object shape: + +| Sub-key | Type | Meaning | +|---|---|---| +| `pending_inputs` | array of input values OR refs (§23) | FIFO of queued steering inputs. | +| `next_input_seq` | integer | Monotonic counter for promoted-attachment key allocation (NEVER reused). | +| `cancel_requested` | boolean | Durable cancel signal; set on steering append; cleared after drain when pending is empty. | +| `drain_in_progress` | boolean | True between the start of a drain PATCH and the next turn-start; protects against partial drain on crash. | +| `active_input` | any JSON value OR ref | The single input being drained (mirror copy used by the race-recovery contract). Cleared at suspend / terminal. | + +Implementers in other languages MUST use these exact key names. A +process built in language X must be able to recover a task created +by language Y. + +Keys NOT in this table are caller-controlled (e.g. user metadata +namespaces); the framework leaves them alone. + +### §21. Framework-reserved tag keys and `source` shape + +#### Reserved tag keys + +The framework stamps the following `tags` entries on `create`: + +| Tag key | Value | Purpose | +|---|---|---| +| `_task_name` | The decorator's `name` (or `fn.__qualname__` fallback). | Server-side `LIST` filtering by task name. | + +Tag keys starting with `_task_` are RESERVED. Caller-supplied tags +using this prefix are stripped at the call site with a warning; +the framework does not pass them to the server. + +#### `source` shape + +The framework stamps `source` on `create`: + +``` +{ + "type": "agentserver.task", + "name": "", + "server_version": "/ (/)" +} +``` + +`source.name` is the **canonical identity anchor** for recovery +routing — the framework looks up the registered handler callback +by matching `source.name` against the decorator-supplied names. +`source.type` is currently a single fixed string but is reserved +for future namespacing. + +### §22. Lease structure and ownership semantics + +`lease` is a sub-object with the following fields: + +| Field | Type | Meaning | +|---|---|---| +| `owner` | string | `\|session:` (§7). Stable across process lifetimes. | +| `instance_id` | string | `worker---`. Fresh per process. | +| `generation` | integer | Increments each time the lease is re-acquired with a different `instance_id`. Mirrored to `ctx.recovery_count`. The local provider AND the hosted task store both bump this. | +| `expires_at` | ISO-8601 UTC string | When the lease expires (and another process may reclaim). | +| `expiry_count` | integer | Number of times ownership has changed via **actual expiry** (i.e. lease was reclaimed because the prior lease's `expires_at` passed, NOT because the same owner restarted). **Server- / provider-only counter** — the framework never writes this field (it is not on `TaskPatchRequest`). The hosted task store bumps it; the local file provider also bumps it on actual-expiry reclaim for parity (so local-mode tests can assert expiry-counter behavior). Surfaced on the framework's internal `TaskInfo`; NOT projected onto the public `TaskRun` handle (lease bookkeeping is framework-internal). | +| `heartbeat_at` | ISO-8601 UTC string | Wall time of the most recent lease write (acquisition, renewal, or force-expire). Stamped by the provider on every lease-touching PATCH. **Provider-only field** — the framework never writes this; consumers and observability tooling read it to distinguish "fresh lease" from "lease that hasn't expired yet". NOT projected onto the public `TaskRun` handle — it's a framework / operator concern, not a developer one. | + +The framework's interaction with the lease: + +- On `create`, the framework sets `lease_owner = self.owner`, + `lease_instance_id = self.instance_id`, and + `lease_duration_seconds = 60` (the framework default). +- The lease renewal loop (§56) renews at half the lease duration + (every 30s by default), but its next tick is computed + DYNAMICALLY from the per-task last-refresh time, NOT a fixed + cadence. So a PATCH within the last `interval` seconds fully + shadows the next heartbeat. +- **Every PATCH the framework issues** (renewal, metadata, + steering, suspend, drain, complete, fail, reclaim) MUST + piggyback (`lease_owner`, `lease_instance_id`, + `lease_duration_seconds`) to refresh the lease as a side effect. + See §25.4. +- On reclaim (§54), the framework PATCHes the lease to itself with + `if_match: ` for CAS. BOTH the inline reclaim + AND the cold-start/periodic scan reclaim use `if_match` (closes + the prior known gap). +- On `ctx.exit_for_recovery()` (§16), the framework force-expires + the lease so the next process can reclaim immediately. + +The framework recognizes three lease states for a foreign-instance +or expired record: + +1. **Live and same-instance** — my own running task; do nothing. +2. **Live and different-instance, same-owner** — a previous lifetime + of mine. RECLAIM immediately (no expiry wait). `expiry_count` is + NOT bumped (the server only bumps on actual-expiry handoff, and + this isn't one). +3. **Expired (any owner)** — RECLAIM. `expiry_count` IS bumped + (server-side, in the hosted store; AND in the local provider + for parity — see the table above). + +**Important: the framework never writes `expiry_count`.** It is not +a field on `TaskPatchRequest` (only `lease_owner`, +`lease_instance_id`, `lease_duration_seconds` are writable). The +hosted task store and the local file provider both increment it +server-side / provider-side on actual-expiry ownership change; the +framework only reads it. + +#### 22.1 Lease write rules (provider-enforced, identical for hosted and local) + +These rules MUST be enforced by **both** providers identically. +Violations raise the internal `_HostedConflict` (§39) which the +framework translates to public exceptions per the translation table +(also §39). Local file provider raises the same logical conditions +directly, with the same internal classification, so the framework +behaves identically against either backing. + +| # | Rule | When violated | +|---|---|---| +| LSE-W-1 | `lease_duration_seconds` MUST be `0` (force-expire) OR in the range `10..3600` (renewal). | Reject as `invalid_request` (400). | +| LSE-W-2 | The triplet `(lease_owner, lease_instance_id, lease_duration_seconds)` is all-or-nothing. Supplying any one without all three is rejected. | Reject as `invalid_request` (400). | +| LSE-W-3 | Lease acquisition / renewal against a record whose lease is currently held by a **different** owner AND not yet expired is rejected. | Raise `_HostedConflict(_code="lease_held_by_another")` → `TaskConflictError(current_status="in_progress")`. | +| LSE-W-4 | When transitioning a task from `in_progress` → `pending`, the supplied `(lease_owner, lease_instance_id)` MUST match the record's current lease. | Raise `_HostedConflict(_code="lease_held_by_another")`. | +| LSE-W-5 | Lease renewal (no status change, `lease_duration_seconds > 0`) is only valid when the current status is `in_progress`. Renewing on `pending` / `suspended` / `completed` is rejected. | Reject as `invalid_request` (400). | +| LSE-W-6 | `lease_duration_seconds = 0` (force-expire) cannot be combined with a status transition in the same PATCH. | Reject as `invalid_request` (400). | +| LSE-W-7 | Force-expire (`lease_duration_seconds = 0`) requires the caller's `(lease_owner, lease_instance_id)` to match the current lease UNLESS the lease is already expired (in which case any caller may force-expire). | Raise `_HostedConflict(_code="lease_held_by_another")` if mismatched and lease is still live. | +| LSE-W-8 | `started_at` is **immutable** after the first `in_progress` transition. Lease re-acquisition (including expired-lease takeover by a different owner OR same-owner restart) MUST NOT update `started_at`. The original wall-clock time of the first turn-start is preserved across recovery, restarts, and suspend/resume cycles. | (Behavioral — observable via the task manager's provider; not on the public `TaskRun` handle.) | +| LSE-W-9 | On lease handoff to a different owner where the prior lease was **expired**, `expiry_count` MUST be incremented. Same-owner different-instance handoff before expiry does NOT bump. | (Behavioral — observable via the task manager's provider; not on the public `TaskRun` handle.) | +| LSE-W-10 | On every successful lease write (acquisition, renewal, force-expire), the provider MUST stamp the lease's `heartbeat_at` field to "now". This field exists on `LeaseInfo` so consumers and observability tooling can distinguish a fresh lease from one that simply hasn't expired yet. | (Behavioral — observable through `LeaseInfo.heartbeat_at` in the internal `TaskInfo`. Not on the public surface.) | + +### §23. Attachments and input promotion + +The hosted task store provides a second per-task storage slot, +`attachments`, alongside `payload`. The two stores have different +budgets: + +| Slot | Per-task cap | Per-value cap | Entry count cap | +|---|---|---|---| +| `payload` | 1 MB | n/a (shared) | unlimited keys | +| `attachments` | n/a (per-entry only) | 2 MB per attachment | 20 attachments max | + +`attachments` lets the framework lift the per-input ceiling from +"however much fits in payload alongside everything else" to +**2 MB per input** without evicting metadata budget. + +#### 23.1 PATCH merge semantics + +The hosted store's merge semantics for `attachments` mirror `tags`: + +- Key present with non-null value -> **upsert** (new) or **replace** (existing). +- Key present with `null` -> **delete** that entry. +- Key absent -> **unchanged**. +- `attachments` field absent entirely -> no attachment changes. + +PATCHes that include BOTH `payload` and `attachments` are atomic +across both stores. This is load-bearing: every promote, drain, +suspend, and orphan-cleanup write co-PATCHes payload + attachments +in a single round trip. + +#### 23.2 Thresholds + always-attachment for output (framework-owned) + +The framework treats different channels differently. Inputs use a +size threshold; output ALWAYS uses an attachment (no threshold, +no inline shape). + +| Channel | Promotion rule | Attachment key | +|---|---|---| +| Function input (`payload["input"]`) | > 200 KiB serialized → ref; otherwise inline. | `_input` | +| Each steering input (entry in `_steering["pending_inputs"]`) | > 20 KiB serialized → ref; otherwise inline. | `_steering_input_` | + +Different rules because: + +- The function input is set once per turn-start. A 200 KiB inline + budget keeps small inputs cheap and only spills clearly-large ones. +- Steering inputs may accumulate (up to 9 queued). A 20 KiB + threshold caps the worst-case inline payload contribution from + steering at ~180 KiB even when the queue is full. + +There is no `_output` channel and no output promotion. The +framework does not persist handler return values; outputs resolve +the in-process caller's `TaskRun.result()` future directly and are +never projected onto the task record. + +Sizes are measured in bytes of canonical JSON +(`sort_keys=True`, separators `(",", ":")`). + +Worst-case framework attachment usage: +`_input` (1) + `_steering_input_*` (up to 9) = +**10 of 20** per-task attachment slots. Leaves 10 slots free for +future use. + +#### 23.3 Wire shapes — two only + +A slot that would hold an input (`payload["input"]`, an entry in +`_steering["pending_inputs"]`) is represented in exactly one of two +shapes: + +**Inline** (size <= threshold): the raw JSON value, verbatim. + +**Ref** (size > threshold): a single-magic-key dict pointing at the +attachment: + +```json +{ + "__attachment_ref__": { + "key": "", + "hash": "sha256:<64 lowercase hex chars>" + } +} +``` + +**Detection rule** (used everywhere the framework reads a slot): +the slot is a ref iff (1) it is a JSON object, (2) it has exactly +one key, (3) that key is `__attachment_ref__`, (4) the value is an +object with both `key` and `hash`. Everything else is inline. + +The inline + ref shapes are **disjoint**: a developer-supplied +inline value cannot accidentally be misread as a ref because the +detection rule's 4-step structure is too specific to occur +incidentally. + +#### 23.4 Single wire shape + +The framework reads and writes exactly the inline + ref shapes +documented in §23.3. The primitive is in private preview; there is +no version-skew compatibility to maintain. + +#### 23.5 Sequence number invariants (steering) + +`payload["_steering"]["next_input_seq"]` is the monotonic counter +the framework uses to derive `_steering_input_` keys. Critical +invariants: + +- **Advances ONLY on promotion.** Inline steering appends do not + bump `next_input_seq`. +- **Never reused.** A drained-and-deleted key is never re-allocated; + the next promoted append always uses the current + `next_input_seq`, then `next_input_seq += 1`. +- **Stable for surviving entries.** A drain pops the head of + `pending_inputs` and (if it was a ref) deletes the corresponding + `_steering_input_` attachment. It does NOT renumber any + other entry. A queue of `[ref_3, ref_4]` becomes `[ref_4]` after + one drain; `ref_4` keeps its key. + +This invariant is what allows the framework to drain without +re-uploading attachments — a property that would be impossible if +keys encoded queue position. + +#### 23.6 Content hash + +Every ref carries `hash: "sha256:"` where the hex is the +SHA-256 of the canonical JSON bytes +(`sort_keys=True`, separators `(",", ":")`) of the attachment +value. The framework writes the hash on promotion. + +**Hash validation (known gap).** The canonical Python +implementation today writes the hash on every promotion but does +NOT validate it on read — `_read_input_value()` resolves the ref +key against `attachments` and returns the value without +recomputing the hash. Other-language implementers SHOULD validate +on read (recompute hash from the attachment value, compare against +the ref's hash, raise on mismatch) to detect store-side +corruption. Cross-implementation byte-compatibility requires using +the SAME canonicalization rules so a write from one language can +be validated by another. + +The hash is sufficient for ref validity once validated (no separate +write-timestamp is needed): SHA-256 birthday-bound collision +probability at fleet trillion/sec × 100 years is < 1 in 10^33. + +#### 23.7 Caps and pre-network enforcement + +Caps: + +- Per-attachment value: **2 MB** serialized. +- Per-task attachment count: **20**. + +The framework enforces (pre-network) and surfaces developer-facing +exceptions based on which channel the violation occurs on: + +| Cap | Where enforced | Developer-facing exception | +|---|---|---| +| Per-value (2 MB) on `_input` | Create + PATCH, both providers | `InputTooLarge` (the framework remaps an internal `_AttachmentTooLarge` based on attachment-key prefix) | +| Per-value (2 MB) on `_steering_input_` | Steering append site (always reads state first to count) | `InputTooLarge` | + +| Per-task count (20) on `create` | Create path | `_AttachmentLimitExceeded` (internal) — reachable only via direct provider use, which is unsupported | +| Per-task count (20) on `patch` | Local provider (cheap count); hosted PATCH relies on server-side check | `_AttachmentLimitExceeded` (internal) | + +Internal exceptions `_AttachmentTooLarge` and +`_AttachmentLimitExceeded` are **provider-internal** — they are +NOT exported from `durable/__init__.py`. The framework catches +`_AttachmentTooLarge` and re-raises the appropriate developer- +facing exception based on the attachment key prefix (`_input` / +`_steering_input_*` → `InputTooLarge`). +`_AttachmentLimitExceeded` is unreachable in normal framework +operation (worst case is 11 of 20 slots; see §23.2) and if it ever +propagates indicates a framework bug — caught at the boundary and +converted to `RuntimeError`. + +#### 23.8 Atomic co-writes + +These transitions MUST be single PATCHes carrying BOTH `payload` and +`attachments`: + +1. **Promote on `.start()` (fresh)**: `attachments["_input"] = ` + + `payload["input"] = {ref}` (CREATE on the hosted store). +2. **Promote on resume**: same fields, but PATCH. +3. **Suspend (multi-turn turn-end via `return X`)**: + - `payload["input"] = null` + - `payload["_steering"]["active_input"] = null` + - `payload["_retry_attempt"] = null` (fresh budget for the next turn) + - `attachments["_input"] = null` (delete) IF the input was a ref +4. **Steering append (promoted)**: `payload["_steering"]["pending_inputs"] + += [{ref}]`, `attachments["_steering_input_"] = `, + `payload["_steering"]["next_input_seq"] += 1`, + `payload["_steering"]["cancel_requested"] = true`. +5. **Steering drain (promoted entry, Phase 1)**: + `payload["_steering"]["pending_inputs"]` without the popped + head, `attachments["_steering_input_"] = null`, + plus the new turn's `_turn_started_at`. +6. **One-shot completion**: the record is deleted (one-shot is + always ephemeral). +7. **Failure**: one-shot → record deleted; multi-turn → status="suspended" + with `suspension_reason="run_completion"`. No `payload["error"]` + is written; the per-turn failure surfaces to the caller via + `TaskFailed(error=...)` and via the structured log + `durable_task_handler_failure`. +8. **Resume (suspended → in_progress)**: status="in_progress", + `_turn_started_at` re-stamped, `_retry_attempt` reset to 0. + New input written (inline or as ref + attachment per §23.2). + +Splitting any of these into multiple PATCHes opens a crash window +where the attachment exists without its ref (or vice versa). The +framework treats this as a single-PATCH invariant. + +#### 23.9 Attachment key validation + +Attachment keys MUST match the regex `^[a-zA-Z0-9_.\-]{1,64}$` and +MUST NOT be empty after trimming whitespace. Both providers enforce +this on every CREATE / PATCH write. The framework's reserved keys (`_input`, `_steering_input_`) all conform. +Developer-supplied attachment keys (none exist today — attachments +are framework-owned per §23.7) would also be validated against this +regex if the surface is ever expanded. + +#### 23.10 Clear-all gesture + +In addition to per-key null-as-delete (§23.1), the provider accepts a +top-level "clear all attachments" gesture: + +- Wire form: `PATCH ... { "attachments": null }`. +- Effect: deletes every attachment on the task, regardless of which + keys currently exist. Per-key entries supplied in the same PATCH + are NOT applied (the clear takes precedence). +- Typed-API form: `TaskPatchRequest.clear_attachments = true`. When + set, the hosted provider serializes `attachments: null`; the local + provider clears the attachments dict directly. Mutually exclusive + with `attachments={...}` (per-key patch) in the same request — the + combination is rejected as `invalid_request`. +- The framework today never emits this gesture; per-key delete + covers all current needs. It is documented for parity with the + service and for future internal callers (e.g. orphan-attachment + cleanup post-recovery). + +DELETE on a task removes all attachments along with the task. The +local provider achieves this trivially (attachments live in the +same JSON file as the task record; unlinking the file removes +both). The hosted provider relies on the service's blob-cleanup +hook. + +### §24. Status state machine + +The framework drives the following transitions: + +``` + create() handler returns + │ or raises + ▼ ┌──────────────┐ + ┌──────────┐ auto-start ┌──────────────│ completed │ + │ pending │ ──────────────▶│ in_progress │ (terminal) │ + └──────────┘ │ │ │ + │ └──────────────┘ + │ return X (multi-turn) + ▼ ▲ + ┌──────────┐ │ + │suspended │ ────────┘ + └──────────┘ .run/.start with new input + ▲ + │ + │ reclaim (same status, + │ new lease) + │ + └─── in_progress (foreign lease) +``` + +Notes: + +- The framework usually creates with `status = in_progress` directly + (the `pending` state is rarely externally observed). +- `in_progress -> in_progress` is the most-traversed transition + (every lease renewal, every reclaim, every steering drain, every + successful retry). +- `completed` is terminal; the *outcome* (success / failure / + cancel) is communicated through the typed exceptions, not via a + separate status value. +- `ctx.exit_for_recovery()` preserves `in_progress` and force-expires + the lease — it is the only way to release ownership without moving + to a different status (§16). + +#### 24.1 Allowed transition matrix (provider-enforced) + +The provider rejects PATCHes whose declared `status` transition is +not in this table. Internal classification `_HostedConflict(_code="invalid_state_transition")`, +translated to a generic framework error at the boundary (this +condition should never escape to developer code — the framework +chooses transitions, not the developer; if it ever does escape it's +a framework bug per Workstream C). + +| From → To | `pending` | `in_progress` | `suspended` | `completed` | +|---|---|---|---|---| +| `pending` | n/a | ✅ | ❌ | ✅ | +| `in_progress` | ✅ (with matching lease) | ✅ (lease renewal) | ✅ | ✅ | +| `suspended` | ✅ | ✅ | ✅ | ✅ | +| `completed` | ❌ (terminal) | ❌ | ❌ | ✅ (no-op only — see §24.2) | + +#### 24.2 Terminal immutability + +A PATCH against a task whose current status is `completed` is +rejected UNLESS the PATCH is a no-op `completed → completed` AND +carries no other field changes (no `payload`, no `tags`, no +`error`, no `suspension_reason`, no lease). The no-op pass-through +returns the existing record without modification — this lets +idempotent retry-loops behave predictably. + +Any other PATCH against a completed task raises +`_HostedConflict(_code="task_immutable")` → translated to +`TaskConflictError(current_status="completed")`. + +#### 24.3 Delete force semantics + +DELETE on a task in any **non-terminal** status (`pending`, +`in_progress`, `suspended`) requires `force=true`. Without it the +provider rejects the delete as `invalid_request` (400) — note this +is **NOT** a conflict (409); the service's PR 2135250 explicitly +moved this from 409 → 400 with code `invalid_request`. + +DELETE on a **terminal** (`completed`) task always succeeds (no +force required). + +DELETE additionally honors `If-Match`: when supplied, the +provider rejects the delete with `_HostedConflict(_code="etag_mismatch")` +→ `EtagConflict` if the supplied etag does not match the current +record. + +### §25. ETag (optimistic concurrency) + in-process write serialization + +The framework uses the hosted store's ETag/CAS protocol per the +Foundry Task Storage Protocol spec. + +#### 25.1 Etag tracking — always-on after the first read/create + +After the first successful read/create on a `task_id`, **every +subsequent PATCH MUST carry `If-Match` with the latest known etag** +for that task. The framework tracks the latest etag in the +in-memory active-task entry, updating it from every PATCH/GET +response. `delete()` is the only operation that MUST NOT carry +`if_match` — deletion is intentionally unconditional and tolerates +a concurrent winner. + +**No blind writes.** This applies to *every* PATCH-issuing site, +including those that hold the per-task write lock and call the +provider directly to avoid re-entrant lock acquisition (e.g. the +queued-steering-cancel path): such sites MUST go through the +lock-held update helper that selects `If-Match` from the tracked +etag, never a bare `provider.update` with no `if_match`. + +The service-returned `etag` value is passed verbatim as `If-Match` +on the next PATCH. The framework does NOT strip surrounding quotes, +normalize whitespace, or otherwise rewrite it. + +#### 25.2 Per-task in-process write queue + +Without coordination, the framework has multiple concurrent +PATCH-issuing code paths against the same task: lease renewal +heartbeats, metadata flushes (handler-issued AND auto-flush at +turn boundaries), steering append, steering drain Phase-1/3, +suspend, complete, fail, output writes, and reclaim. All of these +race in-process for the same etag and can produce avoidable 412 +conflicts in steady state. + +The framework MUST serialize these writes through a **per-task +asyncio lock** held for the read-state + compute-PATCH + apply +cycle. Reads (e.g., `Task.get(task_id)`) do NOT take this lock — +they're snapshot operations that don't move the etag. + +The read MUST happen **inside** the lock for any read-modify-write +sequence (steering drain, queued-steering-cancel, etc.), so the +record read and the PATCH are atomic with respect to other +in-process writers (notably the lease-renewal heartbeat). A site +that reads the record (or pins an etag) *before* acquiring the lock +can have its etag invalidated by the heartbeat between the read and +the write, which under contention starves the retry budget. Because +the per-task lock is a **non-reentrant** `asyncio.Lock`, the +framework provides two helpers: a lock-acquiring update (for callers +that do not hold the lock) and a lock-held update (for callers that +already hold it, e.g. the drain); both select `If-Match` from the +tracked etag and refresh it on success. + +Lock lifecycle: + +- Per-`task_id` `asyncio.Lock` allocated lazily on first write. +- Released after the PATCH response is recorded (etag updated). +- Removed from the in-memory lock table when the local active-task + entry is torn down (no leaked locks). + +In-process contention now serializes; cross-process contention +(another worker reclaimed the lease) still surfaces as 412 because +the queue is in-process only. + +#### 25.3 412 (etag conflict) resolution — per-operation policy + +When a PATCH inside the queue gets a 412, the appropriate response +depends on the operation's INTENT. There is no single retry rule: + +| Operation | On 412, do what | +|---|---| +| Metadata flush | re-read state, overwrite the addressed namespace with local value (last-write-wins), retry (up to 5 attempts). | +| Steering append | re-read `_steering`, append to the NEW state's `pending_inputs`, bump `next_input_seq` from the NEW state, retry (up to 5 attempts). Idempotent when `input_id` is supplied. | +| Steering drain (Phase 1) | re-read `_steering`, drain the NEW head, retry (up to 5 attempts). | +| Steering drain (Phase 3) | re-read, retry (up to 5 attempts). | +| Lease renewal heartbeat | re-read lease; if still ours, retry; otherwise signal eviction. | +| Suspend / complete / fail terminal writes | **RE-READ + decide.** A 412 here means our etag is stale — that's all we know on its own. Re-read the record, then choose: (a) if the lease is **no longer ours** (`lease.owner` differs OR `lease.instance_id` differs OR `lease.expiry_count` bumped past our cached value) → ABANDON and signal awaiters via the eviction path (C-LSE-4 / C-ERR-2); the new owner is authoritative and our terminal would clobber their in-flight recovery. (b) If `status` is already terminal (`completed`) → ABANDON; another actor already wrote the terminal. (c) Otherwise (lease still ours, status still `in_progress`) → retry the terminal PATCH against the new etag, up to 5 attempts. Steering inputs that another process appended between our read and our retry are silently superseded by the terminal write — that is correct behavior because the steerer's `.result()` MUST then raise `TaskConflictError(current_status="completed")` per C-STR-6, which is how cross-process steering-after-terminate is supposed to surface. | +| Output write (part of suspend/complete) | inherits the parent operation's policy. | +| Resume-clear-output (part of resume) | re-read, retry (up to 5 attempts). | +| Recovery reclaim (inline) | ABANDON. The 412 IS the race-detection — another process beat us to the reclaim. Let the next caller / scan re-evaluate. | +| Recovery reclaim (cold-start / periodic) | ABANDON. Same reasoning. | + +Default retry budget is 5 attempts unless noted. Each retry +re-acquires the per-task lock before the re-read + re-merge + re-write +cycle. `LastInputIdPreconditionFailed` (for `if_last_input_id`) and +`EtagConflict` (for low-level callers) propagate as today. + +#### 25.4 Auto-extension piggyback on every PATCH + +Every PATCH the framework issues — renewal, metadata, steering, +suspend, etc. — MUST include the lease-extension trio +(`lease_owner`, `lease_instance_id`, `lease_duration_seconds`) so +the lease is refreshed as a side effect. The renewal loop's next +tick is computed dynamically from the per-task last-refresh time +(NOT a fixed cadence), so a PATCH within the last `interval` +seconds fully shadows the next heartbeat. See §56. + +**Lease renewal requires `in_progress`.** The task store accepts the +lease-extension trio as a *renewal* only when the record is already +`in_progress`, and as a *claim* only when the same PATCH transitions +the record INTO `in_progress` (e.g. reclaim, or the steering-drain +Phase-1 PATCH per §52). A PATCH that carries the lease trio against a +`suspended`/`pending`/terminal record WITHOUT a status flip to +`in_progress` is rejected ("lease renewal is only supported for +in_progress tasks"). Therefore any framework path that writes to a +record left `suspended` by a prior turn (notably the steering drain) +MUST set `status='in_progress'` in the same PATCH. The local provider +enforces this same rule so the conflict is reproducible without a +hosted deployment. + +### §26. Recovery — internal lifecycle, no public HTTP endpoint + +There is no HTTP route for resume. Resume is initiated from +caller code via the normal `Task.start` / `Task.run` (one-shot) +or `MultiTurnTask.start` / `MultiTurnTask.run` (multi-turn) entry +points. The framework's lifecycle state machine transitions a +`suspended` task back to `in_progress` and re-enters the handler +without exposing a server-side endpoint. + +Crash recovery for tasks that died mid-`in_progress` is handled +internally by the periodic recovery scanner described in §55: +the scanner detects abandoned leases and re-invokes the handler +with the persisted `payload["input"]` and +`entry_mode="recovered"`. + +--- + +## Part IV — Provider abstraction (storage backends) + +> **Visibility:** Everything in this part is **framework-internal**. +> The `TaskProvider` interface and the two concrete providers +> (`HostedTaskProvider`, `LocalFileTaskProvider`) are NOT part of +> the public surface defined in Part V — in the canonical Python +> implementation, all of these live in `_`-prefixed modules +> (`_provider.py`, `_client.py`, `_local_provider.py`) and are +> NOT re-exported from `durable/__init__.py`'s `__all__`. The +> abstraction exists to keep the manager testable and to let the +> framework swap hosted vs. local backends — but framework +> consumers are not expected (and not supported) to construct or +> consume providers directly. This part documents the contract a +> re-implementer (in another language) MUST satisfy when writing +> the provider layer. + +### §27. `TaskProvider` interface + +The framework abstracts over the storage backend via a single +async interface. Two providers ship: hosted (HTTP-backed) and local +(file-backed); a third (in-memory) is conceptually possible. + +``` +class TaskProvider: + async def create(request: TaskCreateRequest) -> TaskInfo: ... + async def get(task_id: str) -> TaskInfo | None: ... + async def update(task_id: str, patch: TaskPatchRequest) -> TaskInfo: ... + async def delete(task_id: str, *, force: bool = False, cascade: bool = False) -> None: ... + async def list(*, agent_name: str | None = None, + session_id: str | None = None, + status: TaskStatus | None = None, + tag: dict[str, str] | None = None, + source_type: str | None = None) -> list[TaskInfo]: ... +``` + +Semantic requirements: + +- `get(task_id)` MUST return `None` for missing tasks (not raise). +- `update()` MUST honor the `if_match` field on the patch for CAS. +- `update()` payload MUST shallow-merge. +- `update()` tags MUST null-as-delete merge. +- `update()` attachments MUST null-as-delete merge (§23.1). +- `delete()` MUST be idempotent at the SCHEDULING level (multiple + `.delete()` calls do not error). The provider's lower-level + `provider.delete(task_id)` MAY raise `TaskNotFound` for already- + deleted records; callers of the provider directly MUST handle + this. The canonical Python implementation's hosted provider + raises on 404 and the local provider raises on missing files; + `MultiTurnTask.delete(task_id)` shields user code from these by catching + "not found" substring matches and re-raising as `TaskNotFound` + the first time, and being a no-op only at the user-facing + `Task` surface. +- `list(...)` MUST filter server-side; framework relies on it. + +`TaskCreateRequest` and `TaskPatchRequest` are simple structs +mirroring the writable subset of `TaskInfo` (plus `if_match`, +`lease_owner`, `lease_instance_id`, `lease_duration_seconds`). + +### §28. Hosted provider (HTTP) + +The hosted provider implements `TaskProvider` over HTTP against the +Foundry Task Storage service. Selected when the platform-supplied +environment variable `FOUNDRY_HOSTING_ENVIRONMENT` is set. + +Key implementation notes: + +- **API version:** Pinned at framework build time. The framework + carries one `_API_VERSION` constant (current canonical value: + `"v1"`) and passes it as the `api-version` query parameter on + every request. +- **Authentication:** Bearer token from a `TokenCredential` + resolved at request time. Scope is `https://ai.azure.com/.default`. +- **User-Agent:** Identifies the framework + version + runtime + (`ai-agentserver-core/`). +- **Custom error classification:** The provider classifies every + non-success response into one of four labels and raises a typed + `TransportClassifiedError(classification=