Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/claude_agent_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
ContextUsageResponse,
DeferredToolUse,
EffortLevel,
GovernanceDecision,
GovernanceHook,
HookCallback,
HookContext,
HookEventMessage,
Expand Down Expand Up @@ -577,6 +579,9 @@ async def call_tool(name: str, arguments: dict[str, Any]) -> Any:
"ContentBlock",
"ContextUsageCategory",
"ContextUsageResponse",
# Governance hooks
"GovernanceDecision",
"GovernanceHook",
# Tool callbacks
"CanUseTool",
"ToolPermissionContext",
Expand Down
121 changes: 95 additions & 26 deletions src/claude_agent_sdk/_internal/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

from .._errors import ProcessError
from ..types import (
GovernanceDecision,
GovernanceHook,
PermissionMode,
PermissionResultAllow,
PermissionResultDeny,
Expand Down Expand Up @@ -84,6 +86,7 @@ def __init__(
agents: dict[str, dict[str, Any]] | None = None,
exclude_dynamic_sections: bool | None = None,
skills: list[str] | Literal["all"] | None = None,
governance_hook: GovernanceHook | None = None,
):
"""Initialize Query with transport and callbacks.

Expand All @@ -99,11 +102,14 @@ def __init__(
initialize (see ``SystemPromptPreset``)
skills: Optional skill allowlist to send via initialize so the CLI
can filter which skills are loaded into the system prompt
governance_hook: Optional policy-as-code hook called before every
tool call to allow, block, or rewrite the tool input
"""
self._initialize_timeout = initialize_timeout
self.transport = transport
self.is_streaming_mode = is_streaming_mode
self.can_use_tool = can_use_tool
self.governance_hook = governance_hook
self.hooks = hooks or {}
self.sdk_mcp_servers = sdk_mcp_servers or {}
self._agents = agents
Expand Down Expand Up @@ -405,36 +411,99 @@ async def _handle_control_request(self, request: SDKControlRequest) -> None:
description=permission_request.get("description"),
)

response = await self.can_use_tool(
permission_request["tool_name"],
permission_request["input"],
context,
)
# Run governance hook first (before can_use_tool) when present.
# The hook may block the call or rewrite the input.
effective_input = original_input
if self.governance_hook is not None:
raw_decision = self.governance_hook(
permission_request["tool_name"],
original_input,
context,
)
# Support both sync and async callables.
decision: GovernanceDecision
if hasattr(raw_decision, "__await__"):
decision = await raw_decision # type: ignore[misc]
else:
decision = raw_decision # type: ignore[misc]

# Convert PermissionResult to expected dict format
if isinstance(response, PermissionResultAllow):
response_data = {
"behavior": "allow",
"updatedInput": (
response.updated_input
if response.updated_input is not None
else original_input
),
}
if response.updated_permissions is not None:
response_data["updatedPermissions"] = [
permission.to_dict()
for permission in response.updated_permissions
]
elif isinstance(response, PermissionResultDeny):
response_data = {"behavior": "deny", "message": response.message}
if response.interrupt:
response_data["interrupt"] = response.interrupt
if not decision["allowed"]:
# Block the tool call – send a deny with the reason.
reason = decision.get("reason", "Blocked by governance policy")
response_data = {"behavior": "deny", "message": reason}
else:
# Allow – honour modified_input when provided.
modified = decision.get("modified_input")
if modified is not None:
effective_input = modified
# Fall through to can_use_tool with (possibly modified) input.
response = await self.can_use_tool(
permission_request["tool_name"],
effective_input,
context,
)
if isinstance(response, PermissionResultAllow):
response_data = {
"behavior": "allow",
"updatedInput": (
response.updated_input
if response.updated_input is not None
else effective_input
),
}
if response.updated_permissions is not None:
response_data["updatedPermissions"] = [
p.to_dict() for p in response.updated_permissions
]
elif isinstance(response, PermissionResultDeny):
response_data = {
"behavior": "deny",
"message": response.message,
}
if response.interrupt:
response_data["interrupt"] = response.interrupt
else:
raise TypeError(
"Tool permission callback must return "
"PermissionResult (PermissionResultAllow or "
f"PermissionResultDeny), got {type(response)}"
)
else:
raise TypeError(
f"Tool permission callback must return PermissionResult (PermissionResultAllow or PermissionResultDeny), got {type(response)}"
response = await self.can_use_tool(
permission_request["tool_name"],
permission_request["input"],
context,
)

# Convert PermissionResult to expected dict format
if isinstance(response, PermissionResultAllow):
response_data = {
"behavior": "allow",
"updatedInput": (
response.updated_input
if response.updated_input is not None
else original_input
),
}
if response.updated_permissions is not None:
response_data["updatedPermissions"] = [
permission.to_dict()
for permission in response.updated_permissions
]
elif isinstance(response, PermissionResultDeny):
response_data = {
"behavior": "deny",
"message": response.message,
}
if response.interrupt:
response_data["interrupt"] = response.interrupt
else:
raise TypeError(
"Tool permission callback must return PermissionResult"
" (PermissionResultAllow or PermissionResultDeny),"
f" got {type(response)}"
)

elif subtype == "hook_callback":
hook_callback_request: SDKHookCallbackRequest = request_data # type: ignore[assignment]
# Handle hook callback
Expand Down
28 changes: 25 additions & 3 deletions src/claude_agent_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,24 @@ async def _connect_inner(
from ._internal.transport.subprocess_cli import SubprocessCLITransport

# Validate and configure permission settings (matching TypeScript SDK logic)
if self.options.can_use_tool:
#
# When governance_hook is set without can_use_tool, install a default
# pass-through can_use_tool so the governance hook fires for every tool
# call that reaches the control protocol.
resolved_can_use_tool = self.options.can_use_tool
if self.options.governance_hook is not None and resolved_can_use_tool is None:
from .types import PermissionResultAllow, ToolPermissionContext

async def _passthrough_can_use_tool(
_tool_name: str,
_tool_input: dict[str, Any],
_context: ToolPermissionContext,
) -> PermissionResultAllow:
return PermissionResultAllow()

resolved_can_use_tool = _passthrough_can_use_tool

if resolved_can_use_tool:
# canUseTool callback requires streaming mode (AsyncIterable prompt)
if isinstance(prompt, str):
raise ValueError(
Expand All @@ -173,7 +190,11 @@ async def _connect_inner(
)

# Automatically set permission_prompt_tool_name to "stdio" for control protocol
options = replace(self.options, permission_prompt_tool_name="stdio")
options = replace(
self.options,
permission_prompt_tool_name="stdio",
can_use_tool=resolved_can_use_tool,
)
else:
options = self.options

Expand Down Expand Up @@ -225,7 +246,7 @@ async def _connect_inner(
self._query = Query(
transport=self._transport,
is_streaming_mode=True, # ClaudeSDKClient always uses streaming mode
can_use_tool=self.options.can_use_tool,
can_use_tool=options.can_use_tool,
hooks=self._convert_hooks_to_internal_format(self.options.hooks)
if self.options.hooks
else None,
Expand All @@ -234,6 +255,7 @@ async def _connect_inner(
agents=agents_dict,
exclude_dynamic_sections=exclude_dynamic_sections,
skills=self.options.skills,
governance_hook=self.options.governance_hook,
)

if self.options.session_store is not None:
Expand Down
127 changes: 127 additions & 0 deletions src/claude_agent_sdk/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,30 @@ def from_dict(cls, data: dict[str, Any]) -> "PermissionUpdate":
)


# Governance hook types
class GovernanceDecision(TypedDict, total=False):
"""Decision returned by a governance hook for a pending tool call.

Fields:
allowed: Whether the tool call is permitted to proceed. Required.
reason: Optional human-readable explanation for the decision. When the
tool call is blocked (``allowed=False``) this message is forwarded
to the model as a rejection notice.
modified_input: If provided and ``allowed=True``, the tool executes with
this dict instead of the original input. Ignored when ``allowed=False``.
"""

allowed: Required[bool]
reason: str
modified_input: dict[str, Any]


GovernanceHook = Callable[
[str, dict[str, Any], "ToolPermissionContext"],
"GovernanceDecision | Awaitable[GovernanceDecision]",
]


# Tool callback types
@dataclass
class ToolPermissionContext:
Expand Down Expand Up @@ -1313,13 +1337,38 @@ class HookEventMessage(SystemMessage):
uuid: str | None = None


@dataclass
class SubagentTokenUsageEvent:
"""Token usage event emitted mid-stream per subagent during execution."""

input_tokens: int
output_tokens: int
session_id: str
uuid: str
cache_read_tokens: int = 0
cache_creation_tokens: int = 0
agent_id: str | None = None
timestamp: str | None = None


@dataclass
class BackgroundTaskLateCompletionEvent:
"""Synthetic SDK event emitted when a background task completes after turn boundary."""

task_id: str
status: str
source_message: "TaskNotificationMessage | TaskUpdatedMessage"


Message = (
UserMessage
| AssistantMessage
| SystemMessage
| ResultMessage
| StreamEvent
| RateLimitEvent
| SubagentTokenUsageEvent
| BackgroundTaskLateCompletionEvent
)


Expand Down Expand Up @@ -1812,6 +1861,30 @@ class ClaudeAgentOptions:
``PreToolUse`` hook via ``hooks`` instead.
"""

governance_hook: "GovernanceHook | None" = None
"""Policy-as-code hook that authorizes or blocks every tool call before execution.

Called before ``can_use_tool`` for **every** pending tool call, regardless of
permission rules or ``allowed_tools``. Receives the tool name, input dict, and
a :class:`ToolPermissionContext` and must return a :class:`GovernanceDecision`.

- ``allowed=True`` — the tool call proceeds (optionally with ``modified_input``).
- ``allowed=False`` — the tool call is blocked; the model receives ``reason`` as
a rejection message.
- ``modified_input`` — when ``allowed=True`` and this field is set, the tool
executes with this dict instead of the original input.

Both synchronous and asynchronous callables are accepted::

def my_policy(tool_name, tool_input, ctx):
if tool_name == "Bash" and "rm" in tool_input.get("command", ""):
return GovernanceDecision(allowed=False, reason="rm commands are blocked")
return GovernanceDecision(allowed=True)

When ``governance_hook`` is set without ``can_use_tool``, a default pass-through
``can_use_tool`` is installed automatically so the hook fires for every tool call.
"""

hooks: dict[HookEvent, list[HookMatcher]] | None = None
"""Hook callbacks for responding to various events during execution.

Expand Down Expand Up @@ -1993,6 +2066,60 @@ class ClaudeAgentOptions:
header.
"""

on_compaction_start: "Callable[[CompactionEvent], Awaitable[None]] | None" = None
"""Async callback invoked just before context compaction begins."""

on_compaction_end: "Callable[[CompactionEvent], Awaitable[None]] | None" = None
"""Async callback invoked just after context compaction completes."""

on_context_window_threshold: "Callable[[ContextWindowThresholdEvent], Awaitable[None]] | None" = None
"""Async callback invoked when context window usage exceeds the configured threshold."""

context_window_threshold_pct: float = 0.8
"""Fraction (0.0-1.0) at which on_context_window_threshold fires. Default 0.8."""

context_window_size: int | None = None
"""Model maximum context window size in tokens. Required for threshold tracking."""


# ---------------------------------------------------------------------------
# Session lifecycle event types
# ---------------------------------------------------------------------------


@dataclass
class CompactionEvent:
"""Event passed to on_compaction_start and on_compaction_end callbacks.

Attributes:
trigger: Why compaction was triggered: "auto" or "manual".
custom_instructions: Custom compaction instructions if any.
session_id: Session identifier.
raw: Raw data dict from the CLI PreCompact hook payload.
"""

trigger: str
session_id: str | None = None
raw: dict[str, Any] = field(default_factory=dict)
custom_instructions: str | None = None


@dataclass
class ContextWindowThresholdEvent:
"""Event passed to on_context_window_threshold callback.

Attributes:
pct_used: Fraction of context window used (0.0-1.0).
tokens_used: Total tokens currently filling the context window.
session_id: Session identifier.
raw_usage: Raw usage dict from the AssistantMessage.
"""

pct_used: float
tokens_used: int
session_id: str | None = None
raw_usage: dict[str, Any] = field(default_factory=dict)


# SDK Control Protocol
class SDKControlInterruptRequest(TypedDict):
Expand Down
Loading