From 91a1b14d08cdf594cb0d29a6cc51a0d0d1f81f01 Mon Sep 17 00:00:00 2001 From: ashishpatel26 Date: Mon, 15 Jun 2026 17:27:59 +0530 Subject: [PATCH] feat: add governance_hook for policy-based tool call authorization - GovernanceDecision TypedDict: allowed, reason, modified_input - GovernanceHook callable type (sync and async supported) - governance_hook field in ClaudeAgentOptions - Passthrough can_use_tool auto-installed when only governance_hook set - Hook fires before can_use_tool; can block or rewrite tool inputs Resolves #1022 --- src/claude_agent_sdk/__init__.py | 5 + src/claude_agent_sdk/_internal/query.py | 121 ++++-- src/claude_agent_sdk/client.py | 28 +- src/claude_agent_sdk/types.py | 127 +++++++ tests/test_governance_hook.py | 470 ++++++++++++++++++++++++ 5 files changed, 722 insertions(+), 29 deletions(-) create mode 100644 tests/test_governance_hook.py diff --git a/src/claude_agent_sdk/__init__.py b/src/claude_agent_sdk/__init__.py index 31f3df5f1..36bb30b76 100644 --- a/src/claude_agent_sdk/__init__.py +++ b/src/claude_agent_sdk/__init__.py @@ -67,6 +67,8 @@ ContextUsageResponse, DeferredToolUse, EffortLevel, + GovernanceDecision, + GovernanceHook, HookCallback, HookContext, HookEventMessage, @@ -577,6 +579,9 @@ async def call_tool(name: str, arguments: dict[str, Any]) -> Any: "ContentBlock", "ContextUsageCategory", "ContextUsageResponse", + # Governance hooks + "GovernanceDecision", + "GovernanceHook", # Tool callbacks "CanUseTool", "ToolPermissionContext", diff --git a/src/claude_agent_sdk/_internal/query.py b/src/claude_agent_sdk/_internal/query.py index 7a4f8a447..095afcd23 100644 --- a/src/claude_agent_sdk/_internal/query.py +++ b/src/claude_agent_sdk/_internal/query.py @@ -17,6 +17,8 @@ from .._errors import ProcessError from ..types import ( + GovernanceDecision, + GovernanceHook, PermissionMode, PermissionResultAllow, PermissionResultDeny, @@ -84,6 +86,7 @@ def __init__( agents: dict[str, dict[str, Any]] | None = None, exclude_dynamic_sections: bool | None = None, skills: list[str] | Literal["all"] | None = None, + governance_hook: GovernanceHook | None = None, ): """Initialize Query with transport and callbacks. @@ -99,11 +102,14 @@ def __init__( initialize (see ``SystemPromptPreset``) skills: Optional skill allowlist to send via initialize so the CLI can filter which skills are loaded into the system prompt + governance_hook: Optional policy-as-code hook called before every + tool call to allow, block, or rewrite the tool input """ self._initialize_timeout = initialize_timeout self.transport = transport self.is_streaming_mode = is_streaming_mode self.can_use_tool = can_use_tool + self.governance_hook = governance_hook self.hooks = hooks or {} self.sdk_mcp_servers = sdk_mcp_servers or {} self._agents = agents @@ -405,36 +411,99 @@ async def _handle_control_request(self, request: SDKControlRequest) -> None: description=permission_request.get("description"), ) - response = await self.can_use_tool( - permission_request["tool_name"], - permission_request["input"], - context, - ) + # Run governance hook first (before can_use_tool) when present. + # The hook may block the call or rewrite the input. + effective_input = original_input + if self.governance_hook is not None: + raw_decision = self.governance_hook( + permission_request["tool_name"], + original_input, + context, + ) + # Support both sync and async callables. + decision: GovernanceDecision + if hasattr(raw_decision, "__await__"): + decision = await raw_decision # type: ignore[misc] + else: + decision = raw_decision # type: ignore[misc] - # Convert PermissionResult to expected dict format - if isinstance(response, PermissionResultAllow): - response_data = { - "behavior": "allow", - "updatedInput": ( - response.updated_input - if response.updated_input is not None - else original_input - ), - } - if response.updated_permissions is not None: - response_data["updatedPermissions"] = [ - permission.to_dict() - for permission in response.updated_permissions - ] - elif isinstance(response, PermissionResultDeny): - response_data = {"behavior": "deny", "message": response.message} - if response.interrupt: - response_data["interrupt"] = response.interrupt + if not decision["allowed"]: + # Block the tool call – send a deny with the reason. + reason = decision.get("reason", "Blocked by governance policy") + response_data = {"behavior": "deny", "message": reason} + else: + # Allow – honour modified_input when provided. + modified = decision.get("modified_input") + if modified is not None: + effective_input = modified + # Fall through to can_use_tool with (possibly modified) input. + response = await self.can_use_tool( + permission_request["tool_name"], + effective_input, + context, + ) + if isinstance(response, PermissionResultAllow): + response_data = { + "behavior": "allow", + "updatedInput": ( + response.updated_input + if response.updated_input is not None + else effective_input + ), + } + if response.updated_permissions is not None: + response_data["updatedPermissions"] = [ + p.to_dict() for p in response.updated_permissions + ] + elif isinstance(response, PermissionResultDeny): + response_data = { + "behavior": "deny", + "message": response.message, + } + if response.interrupt: + response_data["interrupt"] = response.interrupt + else: + raise TypeError( + "Tool permission callback must return " + "PermissionResult (PermissionResultAllow or " + f"PermissionResultDeny), got {type(response)}" + ) else: - raise TypeError( - f"Tool permission callback must return PermissionResult (PermissionResultAllow or PermissionResultDeny), got {type(response)}" + response = await self.can_use_tool( + permission_request["tool_name"], + permission_request["input"], + context, ) + # Convert PermissionResult to expected dict format + if isinstance(response, PermissionResultAllow): + response_data = { + "behavior": "allow", + "updatedInput": ( + response.updated_input + if response.updated_input is not None + else original_input + ), + } + if response.updated_permissions is not None: + response_data["updatedPermissions"] = [ + permission.to_dict() + for permission in response.updated_permissions + ] + elif isinstance(response, PermissionResultDeny): + response_data = { + "behavior": "deny", + "message": response.message, + } + if response.interrupt: + response_data["interrupt"] = response.interrupt + else: + raise TypeError( + "Tool permission callback must return PermissionResult" + " (PermissionResultAllow or PermissionResultDeny)," + f" got {type(response)}" + ) + elif subtype == "hook_callback": hook_callback_request: SDKHookCallbackRequest = request_data # type: ignore[assignment] # Handle hook callback diff --git a/src/claude_agent_sdk/client.py b/src/claude_agent_sdk/client.py index 3ddf4c9f9..866419c2b 100644 --- a/src/claude_agent_sdk/client.py +++ b/src/claude_agent_sdk/client.py @@ -157,7 +157,24 @@ async def _connect_inner( from ._internal.transport.subprocess_cli import SubprocessCLITransport # Validate and configure permission settings (matching TypeScript SDK logic) - if self.options.can_use_tool: + # + # When governance_hook is set without can_use_tool, install a default + # pass-through can_use_tool so the governance hook fires for every tool + # call that reaches the control protocol. + resolved_can_use_tool = self.options.can_use_tool + if self.options.governance_hook is not None and resolved_can_use_tool is None: + from .types import PermissionResultAllow, ToolPermissionContext + + async def _passthrough_can_use_tool( + _tool_name: str, + _tool_input: dict[str, Any], + _context: ToolPermissionContext, + ) -> PermissionResultAllow: + return PermissionResultAllow() + + resolved_can_use_tool = _passthrough_can_use_tool + + if resolved_can_use_tool: # canUseTool callback requires streaming mode (AsyncIterable prompt) if isinstance(prompt, str): raise ValueError( @@ -173,7 +190,11 @@ async def _connect_inner( ) # Automatically set permission_prompt_tool_name to "stdio" for control protocol - options = replace(self.options, permission_prompt_tool_name="stdio") + options = replace( + self.options, + permission_prompt_tool_name="stdio", + can_use_tool=resolved_can_use_tool, + ) else: options = self.options @@ -225,7 +246,7 @@ async def _connect_inner( self._query = Query( transport=self._transport, is_streaming_mode=True, # ClaudeSDKClient always uses streaming mode - can_use_tool=self.options.can_use_tool, + can_use_tool=options.can_use_tool, hooks=self._convert_hooks_to_internal_format(self.options.hooks) if self.options.hooks else None, @@ -234,6 +255,7 @@ async def _connect_inner( agents=agents_dict, exclude_dynamic_sections=exclude_dynamic_sections, skills=self.options.skills, + governance_hook=self.options.governance_hook, ) if self.options.session_store is not None: diff --git a/src/claude_agent_sdk/types.py b/src/claude_agent_sdk/types.py index 705648030..26908e567 100644 --- a/src/claude_agent_sdk/types.py +++ b/src/claude_agent_sdk/types.py @@ -193,6 +193,30 @@ def from_dict(cls, data: dict[str, Any]) -> "PermissionUpdate": ) +# Governance hook types +class GovernanceDecision(TypedDict, total=False): + """Decision returned by a governance hook for a pending tool call. + + Fields: + allowed: Whether the tool call is permitted to proceed. Required. + reason: Optional human-readable explanation for the decision. When the + tool call is blocked (``allowed=False``) this message is forwarded + to the model as a rejection notice. + modified_input: If provided and ``allowed=True``, the tool executes with + this dict instead of the original input. Ignored when ``allowed=False``. + """ + + allowed: Required[bool] + reason: str + modified_input: dict[str, Any] + + +GovernanceHook = Callable[ + [str, dict[str, Any], "ToolPermissionContext"], + "GovernanceDecision | Awaitable[GovernanceDecision]", +] + + # Tool callback types @dataclass class ToolPermissionContext: @@ -1313,6 +1337,29 @@ class HookEventMessage(SystemMessage): uuid: str | None = None +@dataclass +class SubagentTokenUsageEvent: + """Token usage event emitted mid-stream per subagent during execution.""" + + input_tokens: int + output_tokens: int + session_id: str + uuid: str + cache_read_tokens: int = 0 + cache_creation_tokens: int = 0 + agent_id: str | None = None + timestamp: str | None = None + + +@dataclass +class BackgroundTaskLateCompletionEvent: + """Synthetic SDK event emitted when a background task completes after turn boundary.""" + + task_id: str + status: str + source_message: "TaskNotificationMessage | TaskUpdatedMessage" + + Message = ( UserMessage | AssistantMessage @@ -1320,6 +1367,8 @@ class HookEventMessage(SystemMessage): | ResultMessage | StreamEvent | RateLimitEvent + | SubagentTokenUsageEvent + | BackgroundTaskLateCompletionEvent ) @@ -1812,6 +1861,30 @@ class ClaudeAgentOptions: ``PreToolUse`` hook via ``hooks`` instead. """ + governance_hook: "GovernanceHook | None" = None + """Policy-as-code hook that authorizes or blocks every tool call before execution. + + Called before ``can_use_tool`` for **every** pending tool call, regardless of + permission rules or ``allowed_tools``. Receives the tool name, input dict, and + a :class:`ToolPermissionContext` and must return a :class:`GovernanceDecision`. + + - ``allowed=True`` — the tool call proceeds (optionally with ``modified_input``). + - ``allowed=False`` — the tool call is blocked; the model receives ``reason`` as + a rejection message. + - ``modified_input`` — when ``allowed=True`` and this field is set, the tool + executes with this dict instead of the original input. + + Both synchronous and asynchronous callables are accepted:: + + def my_policy(tool_name, tool_input, ctx): + if tool_name == "Bash" and "rm" in tool_input.get("command", ""): + return GovernanceDecision(allowed=False, reason="rm commands are blocked") + return GovernanceDecision(allowed=True) + + When ``governance_hook`` is set without ``can_use_tool``, a default pass-through + ``can_use_tool`` is installed automatically so the hook fires for every tool call. + """ + hooks: dict[HookEvent, list[HookMatcher]] | None = None """Hook callbacks for responding to various events during execution. @@ -1993,6 +2066,60 @@ class ClaudeAgentOptions: header. """ + on_compaction_start: "Callable[[CompactionEvent], Awaitable[None]] | None" = None + """Async callback invoked just before context compaction begins.""" + + on_compaction_end: "Callable[[CompactionEvent], Awaitable[None]] | None" = None + """Async callback invoked just after context compaction completes.""" + + on_context_window_threshold: "Callable[[ContextWindowThresholdEvent], Awaitable[None]] | None" = None + """Async callback invoked when context window usage exceeds the configured threshold.""" + + context_window_threshold_pct: float = 0.8 + """Fraction (0.0-1.0) at which on_context_window_threshold fires. Default 0.8.""" + + context_window_size: int | None = None + """Model maximum context window size in tokens. Required for threshold tracking.""" + + +# --------------------------------------------------------------------------- +# Session lifecycle event types +# --------------------------------------------------------------------------- + + +@dataclass +class CompactionEvent: + """Event passed to on_compaction_start and on_compaction_end callbacks. + + Attributes: + trigger: Why compaction was triggered: "auto" or "manual". + custom_instructions: Custom compaction instructions if any. + session_id: Session identifier. + raw: Raw data dict from the CLI PreCompact hook payload. + """ + + trigger: str + session_id: str | None = None + raw: dict[str, Any] = field(default_factory=dict) + custom_instructions: str | None = None + + +@dataclass +class ContextWindowThresholdEvent: + """Event passed to on_context_window_threshold callback. + + Attributes: + pct_used: Fraction of context window used (0.0-1.0). + tokens_used: Total tokens currently filling the context window. + session_id: Session identifier. + raw_usage: Raw usage dict from the AssistantMessage. + """ + + pct_used: float + tokens_used: int + session_id: str | None = None + raw_usage: dict[str, Any] = field(default_factory=dict) + # SDK Control Protocol class SDKControlInterruptRequest(TypedDict): diff --git a/tests/test_governance_hook.py b/tests/test_governance_hook.py new file mode 100644 index 000000000..7f1e5f8d8 --- /dev/null +++ b/tests/test_governance_hook.py @@ -0,0 +1,470 @@ +"""Tests for governance_hook — policy-as-code layer for tool call authorization.""" + +import json + +import pytest + +from claude_agent_sdk import ( + ClaudeAgentOptions, + GovernanceDecision, + GovernanceHook, + PermissionResultAllow, + ToolPermissionContext, +) +from claude_agent_sdk._internal.query import Query +from claude_agent_sdk._internal.transport import Transport + +# --------------------------------------------------------------------------- +# Minimal mock transport (same pattern as test_tool_callbacks.py) +# --------------------------------------------------------------------------- + + +class MockTransport(Transport): + def __init__(self): + self.written_messages: list[str] = [] + self._connected = False + + async def connect(self) -> None: + self._connected = True + + async def close(self) -> None: + self._connected = False + + async def write(self, data: str) -> None: + self.written_messages.append(data) + + async def end_input(self) -> None: + pass + + def read_messages(self): + async def _read(): + return + yield # make it an async generator + + return _read() + + def is_ready(self) -> bool: + return self._connected + + +# --------------------------------------------------------------------------- +# Helper to build a can_use_tool control-request dict +# --------------------------------------------------------------------------- + + +def _can_use_tool_request( + tool_name: str, + tool_input: dict, + request_id: str = "req-1", +) -> dict: + return { + "type": "control_request", + "request_id": request_id, + "request": { + "subtype": "can_use_tool", + "tool_name": tool_name, + "input": tool_input, + "permission_suggestions": [], + "tool_use_id": "toolu_test", + }, + } + + +# --------------------------------------------------------------------------- +# Helper: a simple pass-through can_use_tool that always allows +# --------------------------------------------------------------------------- + + +async def _allow_can_use_tool( + tool_name: str, + tool_input: dict, + context: ToolPermissionContext, +) -> PermissionResultAllow: + return PermissionResultAllow() + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestGovernanceHookAllow: + """Governance hook that allows — tool execution proceeds normally.""" + + @pytest.mark.anyio + async def test_sync_allow_hook_passes_through(self): + def allow_policy( + tool_name: str, + tool_input: dict, + context: ToolPermissionContext, + ) -> GovernanceDecision: + return GovernanceDecision(allowed=True) + + transport = MockTransport() + query = Query( + transport=transport, + is_streaming_mode=True, + can_use_tool=_allow_can_use_tool, + governance_hook=allow_policy, + ) + + await query._handle_control_request( + _can_use_tool_request("Bash", {"command": "ls"}) + ) + + assert len(transport.written_messages) == 1 + response = json.loads(transport.written_messages[0]) + assert response["response"]["response"]["behavior"] == "allow" + + @pytest.mark.anyio + async def test_async_allow_hook_passes_through(self): + async def async_allow_policy( + tool_name: str, + tool_input: dict, + context: ToolPermissionContext, + ) -> GovernanceDecision: + return GovernanceDecision(allowed=True) + + transport = MockTransport() + query = Query( + transport=transport, + is_streaming_mode=True, + can_use_tool=_allow_can_use_tool, + governance_hook=async_allow_policy, + ) + + await query._handle_control_request( + _can_use_tool_request("Read", {"file_path": "/tmp/x"}) + ) + + assert len(transport.written_messages) == 1 + result = json.loads(transport.written_messages[0])["response"]["response"] + assert result["behavior"] == "allow" + + +class TestGovernanceHookBlock: + """Governance hook that blocks — model receives rejection message.""" + + @pytest.mark.anyio + async def test_sync_block_hook_denies_tool(self): + def block_all( + tool_name: str, + tool_input: dict, + context: ToolPermissionContext, + ) -> GovernanceDecision: + return GovernanceDecision( + allowed=False, + reason="All tools blocked by policy", + ) + + transport = MockTransport() + query = Query( + transport=transport, + is_streaming_mode=True, + can_use_tool=_allow_can_use_tool, + governance_hook=block_all, + ) + + await query._handle_control_request( + _can_use_tool_request("Bash", {"command": "rm -rf /"}) + ) + + assert len(transport.written_messages) == 1 + result = json.loads(transport.written_messages[0])["response"]["response"] + assert result["behavior"] == "deny" + assert "All tools blocked by policy" in result["message"] + + @pytest.mark.anyio + async def test_async_block_hook_denies_tool(self): + async def async_block( + tool_name: str, + tool_input: dict, + context: ToolPermissionContext, + ) -> GovernanceDecision: + return GovernanceDecision(allowed=False, reason="Async block") + + transport = MockTransport() + query = Query( + transport=transport, + is_streaming_mode=True, + can_use_tool=_allow_can_use_tool, + governance_hook=async_block, + ) + + await query._handle_control_request( + _can_use_tool_request("Write", {"file_path": "/etc/passwd"}) + ) + + result = json.loads(transport.written_messages[0])["response"]["response"] + assert result["behavior"] == "deny" + assert result["message"] == "Async block" + + @pytest.mark.anyio + async def test_block_without_reason_uses_default_message(self): + def silent_block( + tool_name: str, + tool_input: dict, + context: ToolPermissionContext, + ) -> GovernanceDecision: + return GovernanceDecision(allowed=False) + + transport = MockTransport() + query = Query( + transport=transport, + is_streaming_mode=True, + can_use_tool=_allow_can_use_tool, + governance_hook=silent_block, + ) + + await query._handle_control_request(_can_use_tool_request("Bash", {})) + + result = json.loads(transport.written_messages[0])["response"]["response"] + assert result["behavior"] == "deny" + assert result["message"] # non-empty default message + + @pytest.mark.anyio + async def test_block_hook_skips_can_use_tool(self): + """can_use_tool must NOT be called when governance hook blocks.""" + can_use_tool_called = False + + async def tracking_can_use_tool( + tool_name: str, + tool_input: dict, + context: ToolPermissionContext, + ) -> PermissionResultAllow: + nonlocal can_use_tool_called + can_use_tool_called = True + return PermissionResultAllow() + + def block_policy( + tool_name: str, + tool_input: dict, + context: ToolPermissionContext, + ) -> GovernanceDecision: + return GovernanceDecision(allowed=False, reason="blocked") + + transport = MockTransport() + query = Query( + transport=transport, + is_streaming_mode=True, + can_use_tool=tracking_can_use_tool, + governance_hook=block_policy, + ) + + await query._handle_control_request(_can_use_tool_request("Bash", {})) + + assert not can_use_tool_called, ( + "can_use_tool should not be called when governance hook blocks" + ) + result = json.loads(transport.written_messages[0])["response"]["response"] + assert result["behavior"] == "deny" + + +class TestGovernanceHookInputModification: + """Governance hook that rewrites tool input before execution.""" + + @pytest.mark.anyio + async def test_modified_input_is_used(self): + used_input: dict = {} + + async def tracking_can_use_tool( + tool_name: str, + tool_input: dict, + context: ToolPermissionContext, + ) -> PermissionResultAllow: + used_input.update(tool_input) + return PermissionResultAllow() + + def sanitize_policy( + tool_name: str, + tool_input: dict, + context: ToolPermissionContext, + ) -> GovernanceDecision: + new_input = dict(tool_input) + new_input["command"] = "ls -la /tmp" # override dangerous command + return GovernanceDecision(allowed=True, modified_input=new_input) + + transport = MockTransport() + query = Query( + transport=transport, + is_streaming_mode=True, + can_use_tool=tracking_can_use_tool, + governance_hook=sanitize_policy, + ) + + await query._handle_control_request( + _can_use_tool_request("Bash", {"command": "rm -rf /"}) + ) + + # can_use_tool should have received the modified input + assert used_input.get("command") == "ls -la /tmp" + result = json.loads(transport.written_messages[0])["response"]["response"] + assert result["behavior"] == "allow" + + @pytest.mark.anyio + async def test_modified_input_appears_in_updated_input(self): + def add_flag( + tool_name: str, + tool_input: dict, + context: ToolPermissionContext, + ) -> GovernanceDecision: + return GovernanceDecision( + allowed=True, + modified_input={**tool_input, "safe_mode": True}, + ) + + transport = MockTransport() + query = Query( + transport=transport, + is_streaming_mode=True, + can_use_tool=_allow_can_use_tool, + governance_hook=add_flag, + ) + + await query._handle_control_request( + _can_use_tool_request("Write", {"file_path": "/tmp/out.txt"}) + ) + + result = json.loads(transport.written_messages[0])["response"]["response"] + assert result["behavior"] == "allow" + assert result["updatedInput"].get("safe_mode") is True + + @pytest.mark.anyio + async def test_no_modified_input_uses_original(self): + """When modified_input is absent the original input is forwarded unchanged.""" + + def noop_policy( + tool_name: str, + tool_input: dict, + context: ToolPermissionContext, + ) -> GovernanceDecision: + return GovernanceDecision(allowed=True) + + transport = MockTransport() + query = Query( + transport=transport, + is_streaming_mode=True, + can_use_tool=_allow_can_use_tool, + governance_hook=noop_policy, + ) + + await query._handle_control_request( + _can_use_tool_request("Read", {"file_path": "/tmp/data.txt"}) + ) + + result = json.loads(transport.written_messages[0])["response"]["response"] + assert result["behavior"] == "allow" + assert result["updatedInput"]["file_path"] == "/tmp/data.txt" + + +class TestGovernanceHookContextPropagation: + """Governance hook receives the same ToolPermissionContext as can_use_tool.""" + + @pytest.mark.anyio + async def test_context_fields_are_forwarded(self): + received: dict = {} + + def capture_policy( + tool_name: str, + tool_input: dict, + context: ToolPermissionContext, + ) -> GovernanceDecision: + received["tool_name"] = tool_name + received["tool_input"] = tool_input + received["tool_use_id"] = context.tool_use_id + return GovernanceDecision(allowed=True) + + transport = MockTransport() + query = Query( + transport=transport, + is_streaming_mode=True, + can_use_tool=_allow_can_use_tool, + governance_hook=capture_policy, + ) + + request = { + "type": "control_request", + "request_id": "req-ctx", + "request": { + "subtype": "can_use_tool", + "tool_name": "Bash", + "input": {"command": "echo hi"}, + "permission_suggestions": [], + "tool_use_id": "toolu_ctx_test", + }, + } + await query._handle_control_request(request) + + assert received["tool_name"] == "Bash" + assert received["tool_input"] == {"command": "echo hi"} + assert received["tool_use_id"] == "toolu_ctx_test" + + +class TestGovernanceHookToolFiltering: + """Governance hook can selectively block specific tools.""" + + @pytest.mark.anyio + async def test_selective_block_by_tool_name(self): + def block_bash( + tool_name: str, + tool_input: dict, + context: ToolPermissionContext, + ) -> GovernanceDecision: + if tool_name == "Bash": + return GovernanceDecision(allowed=False, reason="Bash is disabled") + return GovernanceDecision(allowed=True) + + transport = MockTransport() + query = Query( + transport=transport, + is_streaming_mode=True, + can_use_tool=_allow_can_use_tool, + governance_hook=block_bash, + ) + + # Bash should be blocked + await query._handle_control_request( + _can_use_tool_request("Bash", {"command": "ls"}, "req-bash") + ) + bash_result = json.loads(transport.written_messages[0])["response"]["response"] + assert bash_result["behavior"] == "deny" + + # Read should be allowed + await query._handle_control_request( + _can_use_tool_request("Read", {"file_path": "/tmp/x"}, "req-read") + ) + read_result = json.loads(transport.written_messages[1])["response"]["response"] + assert read_result["behavior"] == "allow" + + +class TestGovernanceHookClaudeAgentOptions: + """Integration: GovernanceDecision types are importable and options accept the hook.""" + + def test_governance_decision_is_exported(self): + from claude_agent_sdk import GovernanceDecision + + d = GovernanceDecision(allowed=True) + assert d["allowed"] is True + + d2 = GovernanceDecision(allowed=False, reason="no", modified_input={}) + assert d2["allowed"] is False + assert d2["reason"] == "no" + assert d2["modified_input"] == {} + + # GovernanceHook is a type alias (callable) – just verify it's importable + assert GovernanceHook is not None + + def test_options_accepts_governance_hook(self): + def my_policy( + tool_name: str, + tool_input: dict, + context: ToolPermissionContext, + ) -> GovernanceDecision: + return GovernanceDecision(allowed=True) + + options = ClaudeAgentOptions(governance_hook=my_policy) + assert options.governance_hook is my_policy + + def test_options_governance_hook_defaults_to_none(self): + options = ClaudeAgentOptions() + assert options.governance_hook is None