diff --git a/agent-schema.json b/agent-schema.json index ad4bad372..374c82346 100644 --- a/agent-schema.json +++ b/agent-schema.json @@ -1051,6 +1051,20 @@ "$ref": "#/definitions/HookDefinition" } }, + "user_steering_messages_submit": { + "type": "array", + "description": "Hooks that run each time the runtime drains the steering queue and appends the queued user messages to the session \u2014 messages the user submitted while the agent was already working (mid-turn, after the model stopped, or while idle before the first model call). The drained messages are passed in the steering_messages field. Like user_prompt_submit, hooks can block the run (decision=block / continue=false / exit code 2) or contribute additional_context that is spliced into the conversation as a transient system message for the steered turn only \u2014 it is NOT persisted to the session.", + "items": { + "$ref": "#/definitions/HookDefinition" + } + }, + "user_followup_submit": { + "type": "array", + "description": "Hooks that run each time the runtime dequeues a follow-up message at the end of a turn and starts a fresh turn for it. Follow-ups are user messages queued for end-of-turn processing (the FollowUp API / queue), distinct from mid-turn steering \u2014 the model sees them as fresh input. The follow-up text is passed in the prompt field. Like user_prompt_submit, hooks can block the run (decision=block / continue=false / exit code 2) or contribute additional_context that is spliced into the conversation as a transient system message for the follow-up turn only \u2014 it is NOT persisted to the session.", + "items": { + "$ref": "#/definitions/HookDefinition" + } + }, "turn_start": { "type": "array", "description": "Hooks that run at the start of every agent turn (each model call). 
Their AdditionalContext is appended as transient system messages for that turn only \u2014 it is NOT persisted to the session, so per-turn signals (date, prompt files) are recomputed every turn instead of bloating message history on every resume.", diff --git a/docs/configuration/hooks/index.md b/docs/configuration/hooks/index.md index cc635644e..9c9a366b9 100644 --- a/docs/configuration/hooks/index.md +++ b/docs/configuration/hooks/index.md @@ -42,6 +42,8 @@ docker-agent dispatches the following hook events: | `permission_request` | Just before the runtime would prompt the user to approve a tool | Yes | | `session_start` | When a session begins or resumes | No | | `user_prompt_submit` | Once per user message, after submission and before the model runs | Yes | +| `user_steering_messages_submit` | Each time queued steering messages are drained (mid-turn, after stop, or while idle) | Yes | +| `user_followup_submit` | Each time a queued follow-up message starts a fresh turn (end-of-turn) | Yes | | `turn_start` | At the start of every agent turn (each model call) | No | | `turn_end` | At the end of every agent turn — fires no matter why the turn ended | No | | `before_llm_call` | Just before every model call (after `turn_start`) | Yes | @@ -256,6 +258,8 @@ In addition to the common fields, each event ships its own payload: | `permission_request` | `tool_name`, `tool_use_id`, `tool_input` | | `session_start` | `source` — one of `startup`, `resume`, `clear`, `compact` | | `user_prompt_submit` | `prompt` — the text the user just submitted | +| `user_steering_messages_submit` | `steering_messages` — the drained steering messages, in submission order | +| `user_followup_submit` | `prompt` — the text of the dequeued follow-up message | | `turn_start` | _none_ (just the common fields) | | `turn_end` | `agent_name`, `reason` — one of `normal`, `continue`, `steered`, `error`, `canceled`, `hook_blocked`, `loop_detected` | | `before_llm_call` | `iteration` — 1-based run-loop 
iteration counter (the model call this hook is gating), `model_id` | @@ -279,6 +283,8 @@ Notes: - `tool_response` for `post_tool_use` carries the tool's result; `tool_error` is `true` when the tool failed (the failure detail is surfaced inside `tool_response`). -- `prompt` is only populated for `user_prompt_submit`. Sub-sessions (transferred tasks, background agents, skills) do **not** fire this event because their kick-off message is synthesised by the runtime, not authored by the user. +- `prompt` is populated for `user_prompt_submit` and `user_followup_submit`. For `user_prompt_submit` it is the text the user just submitted; sub-sessions (transferred tasks, background agents, skills) do **not** fire this event because their kick-off message is synthesised by the runtime, not authored by the user. +- `steering_messages` is only populated for `user_steering_messages_submit`. It carries the user messages the runtime just drained from the steering queue — messages submitted while the agent was already working (mid-turn, after the model stopped, or while idle before the first model call). +- For `user_followup_submit`, `prompt` carries the text of the dequeued follow-up message (a user message queued for end-of-turn processing via the FollowUp API / queue, as opposed to mid-turn steering). - `stop_response` carries the model's final assistant text for `stop`, `after_llm_call`, and `subagent_stop`. `last_user_message` carries the latest user message at dispatch time. - `model_id` is populated for `after_llm_call` (and `before_llm_call`) in the canonical `<provider>/<model>` form (e.g. `anthropic/claude-sonnet-4-5`). For harness agents, `model_id` is the harness label (e.g. `claude-code`) rather than a canonical model name — see [Coding Harnesses]({{ '/features/harnesses/' | relative_url }}). - `context_limit` is `0` when the model definition is unavailable (treat `0` as "unknown", not as a real limit). @@ -340,7 +346,7 @@ This is the symmetric counterpart of `pre_tool_use`'s `updated_input`, applied t ### Context-Contributing Events -For `session_start`, `user_prompt_submit`, `turn_start`, `post_tool_use`, `pre_compact`, and `stop`, hooks may set `hook_specific_output.additional_context` to inject text into the conversation. 
`turn_start` context is **transient** (recomputed every turn, never persisted); `session_start` context **persists** for the life of the session. (`worktree_create` also surfaces stdout, but to the CLI user rather than the conversation — the session doesn't exist yet.) +For `session_start`, `user_prompt_submit`, `user_steering_messages_submit`, `user_followup_submit`, `turn_start`, `post_tool_use`, `pre_compact`, and `stop`, hooks may set `hook_specific_output.additional_context` to inject text into the conversation. `turn_start` context is **transient** (recomputed every turn, never persisted); `session_start` context **persists** for the life of the session. `user_steering_messages_submit` and `user_followup_submit` context is **transient** like `user_prompt_submit` — it is spliced into the steered/follow-up turn only and never persisted. (`worktree_create` also surfaces stdout, but to the CLI user rather than the conversation — the session doesn't exist yet.) ### Before-Compaction Specific Output @@ -359,7 +365,7 @@ Returning `decision: "block"` (or exit code 2) instead vetoes the compaction ent ### Plain Text Output -For `session_start`, `user_prompt_submit`, `turn_start`, `post_tool_use`, `pre_compact`, and `stop` hooks, plain text written to stdout (i.e., output that is not valid JSON) is captured as additional context for the agent. For `pre_compact` it is appended to the compaction prompt; for the others it is spliced into the conversation as a (transient or persisted) system message depending on the event. +For `session_start`, `user_prompt_submit`, `user_steering_messages_submit`, `user_followup_submit`, `turn_start`, `post_tool_use`, `pre_compact`, and `stop` hooks, plain text written to stdout (i.e., output that is not valid JSON) is captured as additional context for the agent. 
For `pre_compact` it is appended to the compaction prompt; for the others it is spliced into the conversation as a (transient or persisted) system message depending on the event. ## Exit Codes @@ -653,6 +659,50 @@ Return `additional_context` (or plain stdout) to append guidance to the compacti It does **not** fire for sub-sessions (transferred tasks, background agents, skill sub-sessions) because their kick-off message is synthesised by the runtime. +### User-Steering-Messages-Submit: gate or enrich mid-flight steering + +`user_steering_messages_submit` is the steering-queue analogue of `user_prompt_submit`. It fires each time the runtime drains the steering queue — messages the user submitted while the agent was already working: mid-turn (after a batch of tool calls), after the model stopped, or while idle before the first model call. The drained messages arrive as a JSON array in `steering_messages`. Use it to: + +- block a run when steering violates policy (`decision: block` / exit code 2), +- inject context in response to the steering (`additional_context` is spliced as a transient system message for the steered turn — never persisted, exactly like `user_prompt_submit`), +- audit steering messages to a log. + +Unlike `turn_end` with `reason: steered`, which only observes the mid-turn and post-stop drains, this event fires on **every** drain — including steering applied while the agent was idle before its first model call. 
+ +```yaml +hooks: + user_steering_messages_submit: + - type: command + timeout: 5 + command: | + INPUT=$(cat) + COUNT=$(echo "$INPUT" | jq -r '.steering_messages | length') + echo "$INPUT" | jq -r '.steering_messages[]' >> /tmp/agent-steering.log + if [ "$COUNT" -gt 0 ]; then + echo '{"hook_specific_output":{"additional_context":"The user sent new instructions while you were working — re-read the latest user messages and adjust course before continuing."}}' + fi +``` + +### User-Followup-Submit: gate or enrich queued follow-ups + +`user_followup_submit` is the follow-up-queue analogue of `user_prompt_submit`. It fires each time the runtime dequeues a follow-up message at the end of a turn and starts a fresh turn for it. Follow-ups are user messages queued for end-of-turn processing (the FollowUp API / queue) — distinct from mid-turn steering: the model sees a follow-up as fresh input, not an interruption, and each follow-up gets a full undivided turn. The follow-up text is in `prompt`. Use it to: + +- block a queued follow-up that violates policy (`decision: block` / exit code 2), +- inject per-follow-up context (`additional_context` is spliced as a transient system message for the follow-up turn — never persisted, exactly like `user_prompt_submit`), +- audit follow-up messages to a log. + +This closes the gap left by `user_prompt_submit`, which fires only for the message that starts a run and never for follow-ups the runtime dequeues at end-of-turn. + +```yaml +hooks: + user_followup_submit: + - type: command + timeout: 5 + command: | + INPUT=$(cat) + echo "$INPUT" | jq -r '.prompt' >> /tmp/agent-followups.log +``` + ### Subagent-Stop: observe handoff completions `subagent_stop` fires whenever a sub-agent finishes — `transfer_task` returns, a background agent completes, or a skill sub-session ends. It runs against the *parent* agent's hooks executor, so handlers configured on the orchestrator see every child completion in one place. 
The sub-agent's name is in `agent_name`, the parent's session ID in `parent_session_id`, and the child's final assistant message in `stop_response`. diff --git a/examples/hooks.yaml b/examples/hooks.yaml index 5ac38c416..812357084 100644 --- a/examples/hooks.yaml +++ b/examples/hooks.yaml @@ -14,6 +14,13 @@ # prompting the user # session_start - one-time setup; AdditionalContext PERSISTS in the session # user_prompt_submit- runs once per user message, before the first LLM call +# user_steering_messages_submit +# - runs each time queued steering messages are drained +# (mid-turn, after stop, or while idle); AdditionalContext +# is TRANSIENT, like user_prompt_submit +# user_followup_submit +# - runs each time a queued follow-up message starts a fresh +# turn at end-of-turn; AdditionalContext is TRANSIENT # turn_start - per-turn context; AdditionalContext is TRANSIENT # turn_end - per-turn finalizer; fires no matter why the turn ended # before_llm_call - just before each model call (observability, guardrails) @@ -64,6 +71,8 @@ # Logs (tail these in another terminal): # /tmp/agent-session.log (session_start, session_end) # /tmp/agent-prompts.log (user_prompt_submit) +# /tmp/agent-steering.log (user_steering_messages_submit) +# /tmp/agent-followups.log (user_followup_submit) # /tmp/agent-llm-calls.log (before_llm_call, after_llm_call) # /tmp/agent-turns.log (turn_end) # /tmp/agent-tool-results.log (post_tool_use) @@ -209,6 +218,51 @@ agents: echo '{"hook_specific_output":{"additional_context":"Hook hint: agent log files live under /tmp/agent-*.log"}}' fi + # ==================================================================== + # USER-STEERING-MESSAGES-SUBMIT - runs each time the runtime drains + # the steering queue, i.e. messages the user submitted while the + # agent was already working (mid-turn, after the model stopped, or + # while idle before the first LLM call). The drained messages arrive + # as a JSON array in the steering_messages field. 
This is the + # steering-queue analogue of user_prompt_submit: you can block the + # run (decision=block / continue=false / exit code 2) or inject + # additional_context, which becomes a TRANSIENT system message for + # the steered turn only (never persisted). + # ==================================================================== + user_steering_messages_submit: + - type: command + timeout: 5 + command: | + INPUT=$(cat) + SESSION_ID=$(echo "$INPUT" | jq -r '.session_id // "unknown"') + COUNT=$(echo "$INPUT" | jq -r '.steering_messages | length') + echo "$INPUT" | jq -r '.steering_messages[]' \ + | sed "s/^/[$(date)] [$SESSION_ID] steer: /" >> /tmp/agent-steering.log + # Example: nudge the model to acknowledge mid-flight steering. + if [ "$COUNT" -gt 0 ]; then + echo '{"hook_specific_output":{"additional_context":"The user sent new instructions while you were working — re-read the latest user messages and adjust course before continuing."}}' + fi + + # ==================================================================== + # USER-FOLLOWUP-SUBMIT - runs each time the runtime dequeues a + # follow-up message at the end of a turn and starts a fresh turn + # for it. Follow-ups are user messages queued for end-of-turn + # processing (the FollowUp API / queue), distinct from mid-turn + # steering: the model sees them as fresh input. The follow-up text + # is in the prompt field. Like user_prompt_submit, you can block + # the run (decision=block / continue=false / exit code 2) or inject + # additional_context, which becomes a TRANSIENT system message for + # the follow-up turn only (never persisted). 
+ # ==================================================================== + user_followup_submit: + - type: command + timeout: 5 + command: | + INPUT=$(cat) + SESSION_ID=$(echo "$INPUT" | jq -r '.session_id // "unknown"') + echo "$INPUT" | jq -r '.prompt' \ + | sed "s/^/[$(date)] [$SESSION_ID] followup: /" >> /tmp/agent-followups.log + # ==================================================================== # TURN-START - runs at the start of every model call. # Result.AdditionalContext is spliced after the invariant cache diff --git a/pkg/config/latest/types.go b/pkg/config/latest/types.go index ed4ea32fd..f722daf73 100644 --- a/pkg/config/latest/types.go +++ b/pkg/config/latest/types.go @@ -1964,6 +1964,29 @@ type HooksConfig struct { // runtime, not authored by the user. UserPromptSubmit []HookDefinition `json:"user_prompt_submit,omitempty" yaml:"user_prompt_submit,omitempty"` + // UserSteeringMessagesSubmit hooks run once each time the runtime + // drains the steering queue and appends the queued user messages to + // the session — i.e. messages the user submitted while the agent was + // already working (mid-turn, after the model stopped, or while idle + // before the first model call). The drained messages are passed in + // the steering_messages field. Like user_prompt_submit, hooks can + // block the run (decision="block" / continue=false / exit code 2) or + // contribute additional_context that is spliced into the conversation + // as a transient system message for the steered turn only — it is NOT + // persisted to the session. + UserSteeringMessagesSubmit []HookDefinition `json:"user_steering_messages_submit,omitempty" yaml:"user_steering_messages_submit,omitempty"` + + // UserFollowupSubmit hooks run once each time the runtime dequeues a + // follow-up message at the end of a turn and starts a fresh turn for + // it. Follow-ups are user messages queued for end-of-turn processing + // (the FollowUp API / queue), as opposed to mid-turn steering. 
The + // follow-up text is passed in the prompt field. Like + // user_prompt_submit, hooks can block the run (decision="block" / + // continue=false / exit code 2) or contribute additional_context that + // is spliced into the conversation as a transient system message for + // the follow-up turn only — it is NOT persisted to the session. + UserFollowupSubmit []HookDefinition `json:"user_followup_submit,omitempty" yaml:"user_followup_submit,omitempty"` + // TurnStart hooks run at the start of every agent turn (each model // call). Their AdditionalContext is appended as transient system // messages for that turn only — it is NOT persisted to the session, @@ -2097,6 +2120,8 @@ func (h *HooksConfig) IsEmpty() bool { len(h.PermissionRequest) == 0 && len(h.SessionStart) == 0 && len(h.UserPromptSubmit) == 0 && + len(h.UserSteeringMessagesSubmit) == 0 && + len(h.UserFollowupSubmit) == 0 && len(h.TurnStart) == 0 && len(h.TurnEnd) == 0 && len(h.BeforeLLMCall) == 0 && @@ -2249,6 +2274,20 @@ func (h *HooksConfig) Validate() error { } } + // Validate UserSteeringMessagesSubmit hooks + for i, hook := range h.UserSteeringMessagesSubmit { + if err := hook.validate("user_steering_messages_submit", i); err != nil { + return err + } + } + + // Validate UserFollowupSubmit hooks + for i, hook := range h.UserFollowupSubmit { + if err := hook.validate("user_followup_submit", i); err != nil { + return err + } + } + // Validate TurnStart hooks for i, hook := range h.TurnStart { if err := hook.validate("turn_start", i); err != nil { diff --git a/pkg/hooks/contract_widening_test.go b/pkg/hooks/contract_widening_test.go index a122ca75b..f52d52eb0 100644 --- a/pkg/hooks/contract_widening_test.go +++ b/pkg/hooks/contract_widening_test.go @@ -2,6 +2,7 @@ package hooks_test import ( "context" + "strings" "testing" "github.com/stretchr/testify/assert" @@ -153,6 +154,72 @@ func TestUserPromptSubmitBlockProducesDenyResult(t *testing.T) { "hook must see the user prompt via Input.Prompt") } +// 
TestUserSteeringMessagesSubmitBlockProducesDenyResult pins the +// contract for the user_steering_messages_submit event: a hook +// returning decision="block" must produce Result.Allowed=false so the +// runtime can stop the run after draining the steering queue, and the +// drained messages must be visible via Input.SteeringMessages. +func TestUserSteeringMessagesSubmitBlockProducesDenyResult(t *testing.T) { + t.Parallel() + + r := hooks.NewRegistry() + require.NoError(t, r.RegisterBuiltin("steer_guard", func(_ context.Context, in *hooks.Input, _ []string) (*hooks.Output, error) { + return &hooks.Output{ + Decision: hooks.DecisionBlockValue, + Reason: "steering rejected: " + strings.Join(in.SteeringMessages, "|"), + }, nil + })) + + exec := hooks.NewExecutorWithRegistry(&hooks.Config{ + UserSteeringMessagesSubmit: []hooks.Hook{{ + Type: hooks.HookTypeBuiltin, + Command: "steer_guard", + }}, + }, t.TempDir(), nil, r) + + res, err := exec.Dispatch(t.Context(), hooks.EventUserSteeringMessagesSubmit, &hooks.Input{ + SessionID: "s", + SteeringMessages: []string{"wait", "do this instead"}, + }) + require.NoError(t, err) + assert.False(t, res.Allowed) + assert.Contains(t, res.Message, "do this instead", + "hook must see the drained steering messages via Input.SteeringMessages") +} + +// TestUserFollowupSubmitBlockProducesDenyResult pins the contract for +// the user_followup_submit event: a hook returning decision="block" +// must produce Result.Allowed=false so the runtime can stop the run +// after dequeuing a follow-up, and the follow-up text must be visible +// via Input.Prompt. 
+func TestUserFollowupSubmitBlockProducesDenyResult(t *testing.T) { + t.Parallel() + + r := hooks.NewRegistry() + require.NoError(t, r.RegisterBuiltin("followup_guard", func(_ context.Context, in *hooks.Input, _ []string) (*hooks.Output, error) { + return &hooks.Output{ + Decision: hooks.DecisionBlockValue, + Reason: "followup rejected: " + in.Prompt, + }, nil + })) + + exec := hooks.NewExecutorWithRegistry(&hooks.Config{ + UserFollowupSubmit: []hooks.Hook{{ + Type: hooks.HookTypeBuiltin, + Command: "followup_guard", + }}, + }, t.TempDir(), nil, r) + + res, err := exec.Dispatch(t.Context(), hooks.EventUserFollowupSubmit, &hooks.Input{ + SessionID: "s", + Prompt: "do the follow-up thing", + }) + require.NoError(t, err) + assert.False(t, res.Allowed) + assert.Contains(t, res.Message, "do the follow-up thing", + "hook must see the dequeued follow-up text via Input.Prompt") +} + // TestPreCompactBlockProducesDenyResult pins the contract for the // pre_compact event: a hook returning decision="block" must produce // Result.Allowed=false so summarizeWithSource skips compaction. 
diff --git a/pkg/hooks/executor.go b/pkg/hooks/executor.go index d5cee1336..a4f05da8c 100644 --- a/pkg/hooks/executor.go +++ b/pkg/hooks/executor.go @@ -73,30 +73,32 @@ func compileEvents(c *Config) map[EventType][]matcher { return []matcher{{hooks: hooks}} } return map[EventType][]matcher{ - EventPreToolUse: compileMatchers(c.PreToolUse), - EventPostToolUse: compileMatchers(c.PostToolUse), - EventPermissionRequest: compileMatchers(c.PermissionRequest), - EventSessionStart: flat(c.SessionStart), - EventUserPromptSubmit: flat(c.UserPromptSubmit), - EventTurnStart: flat(c.TurnStart), - EventTurnEnd: flat(c.TurnEnd), - EventBeforeLLMCall: flat(c.BeforeLLMCall), - EventAfterLLMCall: flat(c.AfterLLMCall), - EventSessionEnd: flat(c.SessionEnd), - EventPreCompact: flat(c.PreCompact), - EventSubagentStop: flat(c.SubagentStop), - EventOnUserInput: flat(c.OnUserInput), - EventStop: flat(c.Stop), - EventNotification: flat(c.Notification), - EventOnError: flat(c.OnError), - EventOnMaxIterations: flat(c.OnMaxIterations), - EventOnAgentSwitch: flat(c.OnAgentSwitch), - EventOnSessionResume: flat(c.OnSessionResume), - EventOnToolApprovalDecision: flat(c.OnToolApprovalDecision), - EventBeforeCompaction: flat(c.BeforeCompaction), - EventAfterCompaction: flat(c.AfterCompaction), - EventToolResponseTransform: compileMatchers(c.ToolResponseTransform), - EventWorktreeCreate: flat(c.WorktreeCreate), + EventPreToolUse: compileMatchers(c.PreToolUse), + EventPostToolUse: compileMatchers(c.PostToolUse), + EventPermissionRequest: compileMatchers(c.PermissionRequest), + EventSessionStart: flat(c.SessionStart), + EventUserPromptSubmit: flat(c.UserPromptSubmit), + EventUserSteeringMessagesSubmit: flat(c.UserSteeringMessagesSubmit), + EventUserFollowupSubmit: flat(c.UserFollowupSubmit), + EventTurnStart: flat(c.TurnStart), + EventTurnEnd: flat(c.TurnEnd), + EventBeforeLLMCall: flat(c.BeforeLLMCall), + EventAfterLLMCall: flat(c.AfterLLMCall), + EventSessionEnd: flat(c.SessionEnd), + 
EventPreCompact: flat(c.PreCompact), + EventSubagentStop: flat(c.SubagentStop), + EventOnUserInput: flat(c.OnUserInput), + EventStop: flat(c.Stop), + EventNotification: flat(c.Notification), + EventOnError: flat(c.OnError), + EventOnMaxIterations: flat(c.OnMaxIterations), + EventOnAgentSwitch: flat(c.OnAgentSwitch), + EventOnSessionResume: flat(c.OnSessionResume), + EventOnToolApprovalDecision: flat(c.OnToolApprovalDecision), + EventBeforeCompaction: flat(c.BeforeCompaction), + EventAfterCompaction: flat(c.AfterCompaction), + EventToolResponseTransform: compileMatchers(c.ToolResponseTransform), + EventWorktreeCreate: flat(c.WorktreeCreate), } } @@ -293,6 +295,8 @@ func stdoutAsContext(event EventType) bool { case EventPostToolUse, EventSessionStart, EventUserPromptSubmit, + EventUserSteeringMessagesSubmit, + EventUserFollowupSubmit, EventTurnStart, EventPreCompact, EventStop, diff --git a/pkg/hooks/types.go b/pkg/hooks/types.go index 06e16be3f..83d1f2986 100644 --- a/pkg/hooks/types.go +++ b/pkg/hooks/types.go @@ -48,6 +48,28 @@ const ( // AdditionalContext is spliced into the conversation as a transient // system message for that turn only. EventUserPromptSubmit EventType = "user_prompt_submit" + // EventUserSteeringMessagesSubmit fires once each time the runtime + // drains the steering queue and appends the queued user messages to + // the session — messages the user submitted while the agent was + // already busy (mid-turn, after the model stopped, or while idle + // before the first model call). The drained messages are delivered + // in [Input.SteeringMessages]. Returning decision="block" (or + // continue=false / exit code 2) stops the run loop. AdditionalContext + // is spliced into the conversation as a transient system message for + // the steered turn only — the same transient treatment as + // user_prompt_submit, never persisted to the session. 
+ EventUserSteeringMessagesSubmit EventType = "user_steering_messages_submit" + // EventUserFollowupSubmit fires once each time the runtime dequeues a + // follow-up message at the end of a turn and starts a fresh turn for + // it. Follow-ups are user messages queued for end-of-turn processing + // (the FollowUp API / queue), distinct from mid-turn steering: the + // model sees them as fresh input, not an interruption. The follow-up + // text is delivered in [Input.Prompt]. Returning decision="block" (or + // continue=false / exit code 2) stops the run loop. AdditionalContext + // is spliced into the conversation as a transient system message for + // the follow-up turn only — the same transient treatment as + // user_prompt_submit, never persisted to the session. + EventUserFollowupSubmit EventType = "user_followup_submit" // EventTurnStart fires at the start of every agent turn (each model // call). AdditionalContext is injected transiently and never persisted. EventTurnStart EventType = "turn_start" @@ -248,7 +270,11 @@ type Input struct { // Stop / AfterLLMCall / SubagentStop: the model's final response content. StopResponse string `json:"stop_response,omitempty"` // UserPromptSubmit specific: the text the user just submitted. + // UserFollowupSubmit also uses this field for the dequeued follow-up text. Prompt string `json:"prompt,omitempty"` + // UserSteeringMessagesSubmit specific: the user messages the runtime + // just drained from the steering queue, in submission order. + SteeringMessages []string `json:"steering_messages,omitempty"` // SubagentStop populates [Input.AgentName] (above) with the name of // the sub-agent that just finished. // SubagentStop specific: ID of the parent session that spawned the sub-agent. 
diff --git a/pkg/runtime/hooks.go b/pkg/runtime/hooks.go index cca582d90..faf5ad1f2 100644 --- a/pkg/runtime/hooks.go +++ b/pkg/runtime/hooks.go @@ -542,6 +542,51 @@ func (r *LocalRuntime) executeUserPromptSubmitHooks(ctx context.Context, sess *s return false, "", contextMessages(result) } +// executeUserSteeringMessagesSubmitHooks fires +// user_steering_messages_submit each time the runtime drains the +// steering queue and appends the queued user messages to the session. +// It is the steering-queue analogue of user_prompt_submit: the drained +// messages are passed in SteeringMessages, a terminating verdict +// (decision="block" / continue=false / exit 2) stops the run loop, and +// AdditionalContext is returned as a transient system message that the +// caller threads into the steered turn only — never persisted. +func (r *LocalRuntime) executeUserSteeringMessagesSubmitHooks(ctx context.Context, sess *session.Session, a *agent.Agent, steeringMessages []string, events EventSink) (stop bool, message string, contextMsgs []chat.Message) { + result := r.dispatchHook(ctx, a, hooks.EventUserSteeringMessagesSubmit, &hooks.Input{ + SessionID: sess.ID, + SteeringMessages: steeringMessages, + }, events) + if result == nil { + return false, "", nil + } + if !result.Allowed { + return true, result.Message, nil + } + return false, "", contextMessages(result) +} + +// executeUserFollowupSubmitHooks fires user_followup_submit each time +// the runtime dequeues a follow-up message at the end of a turn and +// starts a fresh turn for it. Follow-ups are user messages queued for +// end-of-turn processing (the FollowUp API / queue), distinct from +// mid-turn steering. It mirrors user_prompt_submit: the follow-up text +// is passed in Prompt, a terminating verdict (decision="block" / +// continue=false / exit 2) stops the run loop, and AdditionalContext is +// returned as a transient system message that the caller threads into +// the follow-up turn only — never persisted. 
+func (r *LocalRuntime) executeUserFollowupSubmitHooks(ctx context.Context, sess *session.Session, a *agent.Agent, prompt string, events EventSink) (stop bool, message string, contextMsgs []chat.Message) { + result := r.dispatchHook(ctx, a, hooks.EventUserFollowupSubmit, &hooks.Input{ + SessionID: sess.ID, + Prompt: prompt, + }, events) + if result == nil { + return false, "", nil + } + if !result.Allowed { + return true, result.Message, nil + } + return false, "", contextMessages(result) +} + // executePreCompactHooks fires pre_compact just before compaction. // The trigger reason ("manual", "auto", "overflow", "tool_overflow") // is reported in [hooks.Input.Source]. A terminating verdict skips diff --git a/pkg/runtime/loop.go b/pkg/runtime/loop.go index 84b66129a..d27e7656f 100644 --- a/pkg/runtime/loop.go +++ b/pkg/runtime/loop.go @@ -70,21 +70,50 @@ func (r *LocalRuntime) appendSteerAndEmit(sess *session.Session, sm QueuedMessag // point that doesn't require restructuring. TUI consumers may see a trailing // newline on non-last steered messages in multi-drain batches. // -// Returns (true, messageCountBefore) if any messages were drained and emitted; -// (false, 0) otherwise. -func (r *LocalRuntime) drainAndEmitSteered(ctx context.Context, sess *session.Session, events EventSink) (bool, int) { +// After appending the drained messages it fires the +// user_steering_messages_submit hook (the steering-queue analogue of +// user_prompt_submit), passing the drained message text. The hook may +// block the run (steerResult.stop) or contribute a transient system +// message (steerResult.contextMsgs) that the caller threads into the +// steered turn only — never persisted, exactly like user_prompt_submit. +// +// Returns drained=true with messageCountBefore set when any messages +// were drained and emitted; otherwise drained=false. 
+func (r *LocalRuntime) drainAndEmitSteered(ctx context.Context, sess *session.Session, a *agent.Agent, events EventSink) steerResult { steered := r.steerQueue.Drain(ctx) if len(steered) == 0 { - return false, 0 + return steerResult{} } messageCountBefore := len(sess.GetAllMessages()) + contents := make([]string, 0, len(steered)) for i, sm := range steered { + contents = append(contents, sm.Content) if i < len(steered)-1 { sm = appendNewlineToQueuedMessage(sm) } r.appendSteerAndEmit(sess, sm, events) } - return true, messageCountBefore + stop, stopMsg, ctxMsgs := r.executeUserSteeringMessagesSubmitHooks(ctx, sess, a, contents, events) + return steerResult{ + drained: true, + messageCountBefore: messageCountBefore, + stop: stop, + stopMsg: stopMsg, + contextMsgs: ctxMsgs, + } +} + +// steerResult is the outcome of a drainAndEmitSteered call: whether any +// messages were drained, the pre-drain message count (for +// compactIfNeeded), and the user_steering_messages_submit hook verdict +// (a terminating stop and/or a transient context message to thread into +// the steered turn). +type steerResult struct { + drained bool + messageCountBefore int + stop bool + stopMsg string + contextMsgs []chat.Message } // appendNewlineToQueuedMessage returns sm with "\n" appended to its text @@ -382,8 +411,15 @@ func (r *LocalRuntime) runStreamLoop(ctx context.Context, sess *session.Session, // Drain steer messages queued while idle or before the first model call // (covers idle-window and first-turn-miss races). 
- if drained, messageCountBeforeSteer := r.drainAndEmitSteered(ctx, sess, sink); drained { - r.compactIfNeeded(ctx, sess, a, contextLimit, messageCountBeforeSteer, sink) + if sr := r.drainAndEmitSteered(ctx, sess, a, sink); sr.drained { + if sr.stop { + slog.WarnContext(ctx, "user_steering_messages_submit hook signalled run termination", + "agent", a.Name(), "session_id", sess.ID, "reason", sr.stopMsg) + r.emitHookDrivenShutdown(ctx, a, sess, sr.stopMsg, sink) + return + } + ls.userPromptMsgs = sr.contextMsgs + r.compactIfNeeded(ctx, sess, a, contextLimit, sr.messageCountBefore, sink) } // Everything from turn_start onwards is wrapped in a closure so a @@ -647,7 +683,15 @@ func (r *LocalRuntime) runTurn( ls.toolModelOverride = toolexec.ResolveModelOverride(res.Calls, agentTools) // Drain steer messages that arrived during tool calls. - if drained, _ := r.drainAndEmitSteered(ctx, sess, events); drained { + if sr := r.drainAndEmitSteered(ctx, sess, a, events); sr.drained { + if sr.stop { + slog.WarnContext(ctx, "user_steering_messages_submit hook signalled run termination", + "agent", a.Name(), "session_id", sess.ID, "reason", sr.stopMsg) + r.emitHookDrivenShutdown(ctx, a, sess, sr.stopMsg, events) + endReason = turnEndReasonHookBlocked + return turnExit + } + ls.userPromptMsgs = sr.contextMsgs r.compactIfNeeded(ctx, sess, a, contextLimit, messageCountBeforeTools, events) endReason = turnEndReasonSteered return turnContinue @@ -658,7 +702,15 @@ func (r *LocalRuntime) runTurn( r.executeStopHooks(ctx, sess, a, res.Content, events) // Re-check steer queue: closes the race between the mid-loop drain and this stop. 
- if drained, _ := r.drainAndEmitSteered(ctx, sess, events); drained { + if sr := r.drainAndEmitSteered(ctx, sess, a, events); sr.drained { + if sr.stop { + slog.WarnContext(ctx, "user_steering_messages_submit hook signalled run termination", + "agent", a.Name(), "session_id", sess.ID, "reason", sr.stopMsg) + r.emitHookDrivenShutdown(ctx, a, sess, sr.stopMsg, events) + endReason = turnEndReasonHookBlocked + return turnExit + } + ls.userPromptMsgs = sr.contextMsgs r.compactIfNeeded(ctx, sess, a, contextLimit, messageCountBeforeTools, events) endReason = turnEndReasonSteered return turnContinue @@ -674,6 +726,15 @@ func (r *LocalRuntime) runTurn( userMsg := session.UserMessage(followUp.Content, followUp.MultiContent...) sess.AddMessage(userMsg) events.Emit(UserMessage(followUp.Content, sess.ID, followUp.MultiContent, len(sess.Messages)-1)) + stop, msg, ctxMsgs := r.executeUserFollowupSubmitHooks(ctx, sess, a, followUp.Content, events) + if stop { + slog.WarnContext(ctx, "user_followup_submit hook signalled run termination", + "agent", a.Name(), "session_id", sess.ID, "reason", msg) + r.emitHookDrivenShutdown(ctx, a, sess, msg, events) + endReason = turnEndReasonHookBlocked + return turnExit + } + ls.userPromptMsgs = ctxMsgs r.compactIfNeeded(ctx, sess, a, contextLimit, messageCountBeforeTools, events) endReason = turnEndReasonContinue return turnContinue // re-enter the loop for a new turn diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go index 292b84ee9..86cc3fc62 100644 --- a/pkg/runtime/runtime_test.go +++ b/pkg/runtime/runtime_test.go @@ -3204,10 +3204,10 @@ func TestDrainAndEmitSteered_MultipleMessages(t *testing.T) { sess := session.New() events := make(chan Event, 16) - drained, _ := rt.drainAndEmitSteered(t.Context(), sess, NewChannelSink(events)) + sr := rt.drainAndEmitSteered(t.Context(), sess, root, NewChannelSink(events)) close(events) - assert.True(t, drained, "should report messages were drained") + assert.True(t, sr.drained, 
"should report messages were drained") // Three separate session messages must have been added. var userMsgs []string @@ -3268,10 +3268,10 @@ func TestDrainAndEmitSteered_MultiContent(t *testing.T) { sess := session.New() events := make(chan Event, 16) - drained, _ := rt.drainAndEmitSteered(t.Context(), sess, NewChannelSink(events)) + sr := rt.drainAndEmitSteered(t.Context(), sess, root, NewChannelSink(events)) close(events) - assert.True(t, drained) + assert.True(t, sr.drained) // Two session messages. var items []session.Item diff --git a/pkg/runtime/user_prompt_submit_test.go b/pkg/runtime/user_prompt_submit_test.go index 4a977df50..96c7f7082 100644 --- a/pkg/runtime/user_prompt_submit_test.go +++ b/pkg/runtime/user_prompt_submit_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/require" "github.com/docker/docker-agent/pkg/agent" + "github.com/docker/docker-agent/pkg/chat" "github.com/docker/docker-agent/pkg/config/latest" "github.com/docker/docker-agent/pkg/hooks" "github.com/docker/docker-agent/pkg/session" @@ -58,6 +59,68 @@ func TestUserPromptSubmitSkippedForSubSessions(t *testing.T) { "their kick-off message is synthesised by the runtime, not authored by a human") } +// TestUserSteeringMessagesSubmitFiresOnDrain pins the contract that +// user_steering_messages_submit fires when the runtime drains the +// steering queue, and that the hook receives the drained messages via +// Input.SteeringMessages. Enqueuing before RunStream exercises the +// idle/first-turn drain site, which is the one user_prompt_submit does +// NOT cover. +func TestUserSteeringMessagesSubmitFiresOnDrain(t *testing.T) { + t.Parallel() + + const counterName = "test-user-steering-submit-counter" + var calls atomic.Int32 + var seen atomic.Value + + stream := newStreamBuilder(). + AddContent("ok"). + AddStopWithUsage(3, 2). 
+ Build() + prov := &mockProvider{id: "test/mock-model", stream: stream} + + root := agent.New("root", "test agent", + agent.WithModel(prov), + agent.WithHooks(&latest.HooksConfig{ + UserSteeringMessagesSubmit: []latest.HookDefinition{ + {Type: "builtin", Command: counterName}, + }, + }), + ) + tm := team.New(team.WithAgents(root)) + + rt, err := NewLocalRuntime(tm, + WithSessionCompaction(false), + WithModelStore(mockModelStore{}), + ) + require.NoError(t, err) + + require.NoError(t, rt.hooksRegistry.RegisterBuiltin( + counterName, + func(_ context.Context, in *hooks.Input, _ []string) (*hooks.Output, error) { + calls.Add(1) + seen.Store(append([]string(nil), in.SteeringMessages...)) + return nil, nil + }, + )) + + // Enqueue before RunStream so the messages are drained at the + // idle/first-turn drain site at the top of the run loop. + require.NoError(t, rt.Steer(QueuedMessage{Content: "steer one"})) + require.NoError(t, rt.Steer(QueuedMessage{Content: "steer two"})) + + sess := session.New() + sess.Title = "Unit Test" + + for range rt.RunStream(t.Context(), sess) { + } + + assert.Equal(t, int32(1), calls.Load(), + "user_steering_messages_submit must fire once for the drained batch") + got, _ := seen.Load().([]string) + assert.Equal(t, []string{"steer one", "steer two"}, got, + "hook must receive the drained messages via Input.SteeringMessages, in order") +} + // setupUserPromptSubmitCounter wires up a single-turn mock runtime with // a builtin user_prompt_submit hook that atomically increments the // returned counter on every dispatch. Both tests above share this @@ -104,3 +167,71 @@ func setupUserPromptSubmitCounter(t *testing.T, opts ...session.Opt) (*atomic.In return &calls, rt, sess } + +// TestUserFollowupSubmitFiresOnDequeue pins the contract that +// user_followup_submit fires when the runtime dequeues a follow-up +// message at the end of a turn, and that the hook receives the +// follow-up text via Input.Prompt. 
A follow-up enqueued before +// RunStream is dequeued when the first turn stops, starting a fresh +// turn — the path user_prompt_submit never covers. +func TestUserFollowupSubmitFiresOnDequeue(t *testing.T) { + t.Parallel() + + const counterName = "test-user-followup-submit-counter" + var calls atomic.Int32 + var seen atomic.Value + + // Two turns: the first stops, the runtime dequeues the follow-up and + // runs a second turn which also stops (queue now empty). + newStopStream := func() *mockStream { + return newStreamBuilder(). + AddContent("ok"). + AddStopWithUsage(3, 2). + Build() + } + prov := &queueProvider{ + id: "test/mock-model", + streams: []chat.MessageStream{newStopStream(), newStopStream()}, + } + + root := agent.New("root", "test agent", + agent.WithModel(prov), + agent.WithHooks(&latest.HooksConfig{ + UserFollowupSubmit: []latest.HookDefinition{ + {Type: "builtin", Command: counterName}, + }, + }), + ) + tm := team.New(team.WithAgents(root)) + + rt, err := NewLocalRuntime(tm, + WithSessionCompaction(false), + WithModelStore(mockModelStore{}), + ) + require.NoError(t, err) + + require.NoError(t, rt.hooksRegistry.RegisterBuiltin( + counterName, + func(_ context.Context, in *hooks.Input, _ []string) (*hooks.Output, error) { + calls.Add(1) + seen.Store(in.Prompt) + return nil, nil + }, + )) + + // Enqueue before RunStream so the follow-up is dequeued when the + // first turn stops. + require.NoError(t, rt.FollowUp(QueuedMessage{Content: "please also do this"})) + + sess := session.New(session.WithUserMessage("hi")) + sess.Title = "Unit Test" + + for range rt.RunStream(t.Context(), sess) { + } + + assert.Equal(t, int32(1), calls.Load(), + "user_followup_submit must fire once for the dequeued follow-up") + got, _ := seen.Load().(string) + assert.Equal(t, "please also do this", got, + "hook must receive the follow-up text via Input.Prompt") +}