Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ddev/hatch.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ python = "3.13"
e2e-env = false
dependencies = [
"pyyaml",
"pytest-asyncio",
"vcrpy",
]
# TODO: remove this when the old CLI is gone
Expand Down
5 changes: 4 additions & 1 deletion ddev/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ classifiers = [
"Programming Language :: Python :: 3.13",
]
dependencies = [
"anthropic>=0.18.0",
"anthropic>=0.86.0",
"click~=8.1.6",
"coverage",
"datadog-api-client==2.20.0",
Expand Down Expand Up @@ -136,3 +136,6 @@ ban-relative-imports = "parents"
[tool.ruff.lint.per-file-ignores]
#Tests can use assertions and relative imports
"**/tests/**/*" = ["I252"]

[tool.pytest.ini_options]
asyncio_mode = "auto"
3 changes: 3 additions & 0 deletions ddev/src/ddev/ai/agent/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
219 changes: 219 additions & 0 deletions ddev/src/ddev/ai/agent/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)

from copy import deepcopy
from dataclasses import dataclass
from enum import StrEnum
from typing import Any, Final

import anthropic
from anthropic.types import MessageParam, ToolParam, ToolResultBlockParam

from ddev.ai.tools.core.registry import ToolRegistry

from .exceptions import (
AgentAPIError,
AgentConnectionError,
AgentError,
AgentRateLimitError,
)

# Model used when the caller does not specify one.
DEFAULT_MODEL: Final[str] = "claude-sonnet-4-6"
DEFAULT_MAX_TOKENS: Final[int] = 8192  # max tokens per response
# Caller identifiers permitted to invoke tools when programmatic tool calling is enabled.
ALLOWED_TOOL_CALLERS: Final = ["code_execution_20260120"]


class StopReason(StrEnum):
    """Maps Anthropic API stop_reason strings to a typed enum.

    Values mirror the raw strings returned by the Messages API; constructing
    the enum from an unrecognized string raises ValueError.
    """

    END_TURN = "end_turn"  # model finished its reply naturally
    MAX_TOKENS = "max_tokens"  # reply truncated at the max_tokens limit
    STOP_SEQUENCE = "stop_sequence"  # a configured stop sequence was hit
    TOOL_USE = "tool_use"  # model paused to request tool invocations
    PAUSE_TURN = "pause_turn"  # turn paused mid-generation (long-running turn)
    REFUSAL = "refusal"  # model declined to answer


@dataclass(frozen=True)
class ToolCall:
    """A single tool invocation requested by the model."""

    # Unique tool-use block id assigned by the API; echoed back when returning tool results.
    id: str
    # Name of the tool the model wants to invoke.
    name: str
    # JSON-decoded arguments supplied by the model for the tool.
    input: dict[str, Any]


@dataclass(frozen=True)
class ContextUsage:
    """Context window accounting for a single API call."""

    window_size: int
    used_tokens: int

    @property
    def context_pct(self) -> float:
        """Fraction of the context window consumed, expressed as a percentage."""
        fraction = self.used_tokens / self.window_size
        return fraction * 100

    @property
    def remaining_tokens(self) -> int:
        """Number of tokens of the window still unused."""
        unused = self.window_size - self.used_tokens
        return unused


@dataclass(frozen=True)
class TokenUsage:
    """Token accounting from a single API call."""

    input_tokens: int  # tokens sent to the model (system_prompt + history)
    output_tokens: int  # tokens the model generated
    cache_read_input_tokens: int  # tokens read from prompt cache
    cache_creation_input_tokens: int  # tokens written to prompt cache
    # Context-window accounting derived from the token counts above.
    context: ContextUsage


@dataclass(frozen=True)
class AgentResponse:
    """The complete response from a single AnthropicAgent.send() call.
    Adds useful metadata to the response of the Anthropic API."""

    # Why the model stopped generating (end_turn, tool_use, max_tokens, ...).
    stop_reason: StopReason
    # All text blocks of the reply joined with newlines.
    text: str
    # Tool invocations the model requested this turn (empty if none).
    tool_calls: list[ToolCall]
    # Token and context-window accounting for this call.
    usage: TokenUsage


class AnthropicAgent:
    """A wrapper around the Anthropic API that provides a simple interface for interacting with agents."""

    def __init__(
        self,
        client: anthropic.AsyncAnthropic,
        tools: ToolRegistry,
        system_prompt: str,
        name: str,
        model: str = DEFAULT_MODEL,
        max_tokens: int = DEFAULT_MAX_TOKENS,
        programmatic_tool_calling: bool = False,
    ) -> None:
        """Initialize an AnthropicAgent.

        Args:
            client: The Anthropic client to use.
            tools: The ToolRegistry to use (might not be used in every call if allowed_tools in send() is provided)
            system_prompt: The system prompt to use.
            name: The name of the agent.
            model: The model to use.
            max_tokens: The max tokens per response.
            programmatic_tool_calling: Whether to allow programmatic tool calling.
        """

        self._client = client
        self._tools = tools
        self._system_prompt = system_prompt
        self.name = name
        self._model = model
        self._max_tokens = max_tokens
        self._programmatic_tool_calling = programmatic_tool_calling
        self._history: list[MessageParam] = []
        # Lazily fetched and cached max input tokens for self._model.
        self._context_window: int | None = None

    @property
    def history(self) -> list[MessageParam]:
        """Read-only snapshot of the conversation history."""
        return deepcopy(self._history)

    def reset(self) -> None:
        """Clear conversation history to start a new conversation."""
        self._history = []

    async def _get_context_window(self) -> int:
        """Return the model's context window size, fetching and caching it on first use."""
        if self._context_window is None:
            info = await self._client.models.retrieve(self._model)
            self._context_window = info.max_input_tokens
        return self._context_window

    def _get_tool_definitions(self, allowed_tools: list[str] | None) -> list[ToolParam]:
        """Filter tool definitions by allowlist. None means all tools.

        When programmatic tool calling is enabled, each definition is restricted to
        code-execution callers via ``allowed_callers``; when disabled, definitions are
        returned untouched so the model can invoke tools directly.
        """
        definitions = self._tools.definitions
        if allowed_tools is not None:
            allowed = set(allowed_tools)
            definitions = [d for d in definitions if d["name"] in allowed]
        if self._programmatic_tool_calling:
            # Omitting allowed_callers keeps tools directly model-invokable; restricting
            # to the code-execution caller must only happen when PTC is actually on.
            definitions = [{**d, "allowed_callers": ALLOWED_TOOL_CALLERS} for d in definitions]
        return definitions

    async def send(
        self,
        content: str | list[ToolResultBlockParam],
        allowed_tools: list[str] | None = None,
    ) -> AgentResponse:
        """Send a message to the agent and return the response.

        Args:
            content: The content to send to the agent.
            allowed_tools: The tools in the ToolRegistry to allow the agent to use.

        Returns:
            An AgentResponse object containing the response from the agent.

        Raises:
            AgentConnectionError: The API was unreachable.
            AgentRateLimitError: The rate limit was exceeded.
            AgentAPIError: The API returned an error status code.
            AgentError: Null/unknown stop_reason or response validation failure.
        """
        tool_defs = self._get_tool_definitions(allowed_tools)

        user_msg: MessageParam = {"role": "user", "content": content}
        messages = [*self._history, user_msg]

        try:
            # Resolve (and cache) the context window BEFORE generating so a transient
            # metadata-lookup failure cannot discard an already-completed, paid-for reply.
            window_size = await self._get_context_window()
            response = await self._client.messages.create(
                model=self._model,
                max_tokens=self._max_tokens,
                system=self._system_prompt,
                messages=messages,
                tools=tool_defs if tool_defs else anthropic.NOT_GIVEN,
            )
        except anthropic.APIConnectionError as e:
            raise AgentConnectionError(f"Connection failed: {e}") from e
        # RateLimitError subclasses APIStatusError, so it must be caught first.
        except anthropic.RateLimitError as e:
            raise AgentRateLimitError(f"Rate limit exceeded: {e}") from e
        except anthropic.APIStatusError as e:
            raise AgentAPIError(e.status_code, e.message) from e
        except anthropic.APIResponseValidationError as e:
            raise AgentError(f"Response validation failed: {e}") from e

        # stop_reason is None only in streaming responses; we use non-streaming, so None is unexpected
        if response.stop_reason is None:
            raise AgentError("Received null stop_reason from API")

        try:
            stop_reason = StopReason(response.stop_reason)
        except ValueError as e:
            raise AgentError(f"Unknown stop_reason: {response.stop_reason!r}") from e

        text_parts: list[str] = []
        tool_calls: list[ToolCall] = []

        for block in response.content:
            if isinstance(block, anthropic.types.TextBlock):
                text_parts.append(block.text)
            elif isinstance(block, anthropic.types.ToolUseBlock):
                tool_calls.append(ToolCall(id=block.id, name=block.name, input=dict(block.input)))
            # ThinkingBlock and RedactedThinkingBlock are intentionally ignored.
            # Extended thinking support can add a `thinking: str` field to AgentResponse later.

        cache_read = response.usage.cache_read_input_tokens or 0
        cache_creation = response.usage.cache_creation_input_tokens or 0
        # Total context consumption includes cached reads/writes, not just fresh input.
        used_tokens = response.usage.input_tokens + cache_read + cache_creation
        usage = TokenUsage(
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            cache_read_input_tokens=cache_read,
            cache_creation_input_tokens=cache_creation,
            context=ContextUsage(window_size=window_size, used_tokens=used_tokens),
        )

        agent_response = AgentResponse(
            stop_reason=stop_reason,
            text="\n".join(text_parts),
            tool_calls=tool_calls,
            usage=usage,
        )

        # Save to history only after a successful response.
        self._history.extend([user_msg, {"role": "assistant", "content": response.content}])

        return agent_response
29 changes: 29 additions & 0 deletions ddev/src/ddev/ai/agent/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)


class AgentError(Exception):
    """Base class for all errors raised by AnthropicAgent."""


class AgentConnectionError(AgentError):
    """Network failure — the API was unreachable."""


class AgentRateLimitError(AgentError):
    """Rate limit hit — the request may be retried after a delay."""


class AgentAPIError(AgentError):
    """The API returned an error status code."""

    def __init__(self, status_code: int, message: str) -> None:
        """Record the HTTP status code alongside the standard exception message."""
        self.status_code = status_code
        super().__init__(message)
9 changes: 2 additions & 7 deletions ddev/src/ddev/ai/tools/core/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,11 @@
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)

from typing import Final

from anthropic.types import ToolParam

from .protocol import ToolProtocol
from .types import ToolResult

ALLOWED_TOOL_CALLERS: Final = ["code_execution_20260120"]


class ToolRegistry:
"""Registry holding all available tools."""
Expand All @@ -20,9 +16,8 @@ def __init__(self, tools: list[ToolProtocol]) -> None:

@property
def definitions(self) -> list[ToolParam]:
    """Return Anthropic SDK tool definitions for all registered tools."""
    collected: list[ToolParam] = []
    for registered_tool in self._tools.values():
        collected.append(registered_tool.definition)
    return collected

async def run(self, name: str, raw: dict[str, object]) -> ToolResult:
"""Execute a tool by name, returning an error result if not found."""
Expand Down
3 changes: 3 additions & 0 deletions ddev/tests/ai/agent/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
Loading
Loading