From f4303d1e7211783d19beca6554e44eb73bb29c42 Mon Sep 17 00:00:00 2001 From: Max Parke Date: Tue, 26 May 2026 17:48:23 -0400 Subject: [PATCH 1/9] chore(deps): drop unused runtime deps and exclude tests from wheel (#367) Co-authored-by: Claude Opus 4.7 (1M context) --- pyproject.toml | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index bc210cace..662f72ea9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,6 @@ dependencies = [ "rich>=13.9.2,<14", "fastapi>=0.115.0", "starlette>=0.49.1", - "tornado>=6.5.5", "uvicorn>=0.31.1", "watchfiles>=0.24.0,<1.0", "python-on-whales>=0.73.0,<0.74", @@ -33,24 +32,17 @@ dependencies = [ "litellm>=1.83.7,<2", "kubernetes>=25.0.0,<36.0.0", "jinja2>=3.1.3,<4", - "mcp[cli]>=1.4.1", + "mcp>=1.4.1", "scale-gp>=0.1.0a59", "openai-agents==0.14.1", "pydantic-ai-slim>=1.0,<2", - "tzlocal>=5.3.1", - "tzdata>=2025.2", - "pytest>=8.4.0", "json_log_formatter>=1.1.1", - "pytest-asyncio>=1.0.0", "scale-gp-beta>=0.2.0", - "ipykernel>=6.29.5", "openai>=2.2,<3", # Required by openai-agents; litellm now supports openai 2.x (issue #13711 resolved: https://github.com/BerriAI/litellm/issues/13711) "cloudpickle>=3.1.1", - "datadog>=0.52.1", "ddtrace>=3.13.0", "yaspin>=3.1.0", "claude-agent-sdk>=0.1.0", - "anthropic>=0.40.0", "langgraph-checkpoint>=2.0.0", "opentelemetry-sdk>=1.20.0", "opentelemetry-api>=1.20.0", @@ -152,6 +144,14 @@ include = [ [tool.hatch.build.targets.wheel] packages = ["src/agentex"] +# Don't ship internal test files in the wheel. `lib/cli/templates/**/test_agent.py.j2` +# is intentionally kept — those render into user projects. 
+exclude = [ + "src/agentex/lib/**/tests/**", + "src/agentex/lib/**/test_*.py", + "src/agentex/lib/**/conftest.py", + "src/agentex/lib/**/pytest.ini", +] [tool.hatch.build.targets.sdist] # Basically everything except hidden files/directories (such as .github, .devcontainers, .python-version, etc) From 38ed3384094f7f07f6b2482489f457fd1dc4f76d Mon Sep 17 00:00:00 2001 From: "stainless-app[bot]" <142633134+stainless-app[bot]@users.noreply.github.com> Date: Tue, 26 May 2026 23:16:11 +0000 Subject: [PATCH 2/9] feat(api): add cleaned_at field to task response types --- .stats.yml | 4 ++-- src/agentex/types/task.py | 2 ++ src/agentex/types/task_list_response.py | 2 ++ src/agentex/types/task_retrieve_by_name_response.py | 2 ++ src/agentex/types/task_retrieve_response.py | 2 ++ 5 files changed, 10 insertions(+), 2 deletions(-) diff --git a/.stats.yml b/.stats.yml index 1de9197e7..5e5e990c0 100644 --- a/.stats.yml +++ b/.stats.yml @@ -1,4 +1,4 @@ configured_endpoints: 63 -openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/sgp/agentex-sdk-77ce1b851a8b44f0e67ce2c3ebc20cd92d5af16ab9d3a9224344cd4c83ffb564.yml -openapi_spec_hash: d31d828c46635cbc20165177c7187a70 +openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/sgp/agentex-sdk-4d4bf80af19e6a2ef6b890d5d978316b86e9939d8d5116e94b90117525c61325.yml +openapi_spec_hash: 133afeacb42000ed4f9e076abf4b50fd config_hash: ba5183ca635940fd202d05a2a9fcb630 diff --git a/src/agentex/types/task.py b/src/agentex/types/task.py index 8b9cde47f..3769d5128 100644 --- a/src/agentex/types/task.py +++ b/src/agentex/types/task.py @@ -12,6 +12,8 @@ class Task(BaseModel): id: str + cleaned_at: Optional[datetime] = None + created_at: Optional[datetime] = None name: Optional[str] = None diff --git a/src/agentex/types/task_list_response.py b/src/agentex/types/task_list_response.py index 85f87c0e5..b8d0ab73d 100644 --- a/src/agentex/types/task_list_response.py +++ b/src/agentex/types/task_list_response.py @@ 
-17,6 +17,8 @@ class TaskListResponseItem(BaseModel): agents: Optional[List[Agent]] = None + cleaned_at: Optional[datetime] = None + created_at: Optional[datetime] = None name: Optional[str] = None diff --git a/src/agentex/types/task_retrieve_by_name_response.py b/src/agentex/types/task_retrieve_by_name_response.py index 1ffc6819a..557764d04 100644 --- a/src/agentex/types/task_retrieve_by_name_response.py +++ b/src/agentex/types/task_retrieve_by_name_response.py @@ -17,6 +17,8 @@ class TaskRetrieveByNameResponse(BaseModel): agents: Optional[List[Agent]] = None + cleaned_at: Optional[datetime] = None + created_at: Optional[datetime] = None name: Optional[str] = None diff --git a/src/agentex/types/task_retrieve_response.py b/src/agentex/types/task_retrieve_response.py index 5f2629468..bb1398d67 100644 --- a/src/agentex/types/task_retrieve_response.py +++ b/src/agentex/types/task_retrieve_response.py @@ -17,6 +17,8 @@ class TaskRetrieveResponse(BaseModel): agents: Optional[List[Agent]] = None + cleaned_at: Optional[datetime] = None + created_at: Optional[datetime] = None name: Optional[str] = None From 6f1c14fd61077da52038361642a9fbc4a0a56c8b Mon Sep 17 00:00:00 2001 From: Max Parke Date: Wed, 27 May 2026 11:35:59 -0400 Subject: [PATCH 3/9] refactor(types): promote protocol types to agentex.protocol.* (#371) Co-authored-by: Claude Opus 4.7 (1M context) --- CLAUDE.md | 16 +++ .../default-langgraph/project/acp.py.j2 | 2 +- .../default-pydantic-ai/project/acp.py.j2 | 2 +- .../cli/templates/default/project/acp.py.j2 | 2 +- .../sync-langgraph/project/acp.py.j2 | 2 +- .../sync-openai-agents/project/acp.py.j2 | 2 +- .../sync-pydantic-ai/project/acp.py.j2 | 2 +- .../lib/cli/templates/sync/project/acp.py.j2 | 2 +- .../project/workflow.py.j2 | 2 +- .../project/workflow.py.j2 | 2 +- .../templates/temporal/project/workflow.py.j2 | 2 +- .../services/temporal_task_service.py | 2 +- .../lib/core/temporal/workflows/workflow.py | 2 +- .../lib/sdk/fastacp/base/base_acp_server.py | 4 +- 
.../lib/sdk/fastacp/impl/async_base_acp.py | 2 +- src/agentex/lib/sdk/fastacp/impl/sync_acp.py | 2 +- .../lib/sdk/fastacp/impl/temporal_acp.py | 2 +- src/agentex/lib/sdk/fastacp/tests/conftest.py | 4 +- .../sdk/fastacp/tests/test_base_acp_server.py | 2 +- .../lib/sdk/fastacp/tests/test_integration.py | 2 +- src/agentex/lib/types/acp.py | 131 ++---------------- src/agentex/lib/types/json_rpc.py | 58 ++------ src/agentex/protocol/__init__.py | 16 +++ src/agentex/protocol/acp.py | 116 ++++++++++++++++ src/agentex/protocol/json_rpc.py | 63 +++++++++ tests/test_header_forwarding.py | 2 +- tests/test_protocol_shims.py | 98 +++++++++++++ 27 files changed, 355 insertions(+), 187 deletions(-) create mode 100644 src/agentex/protocol/__init__.py create mode 100644 src/agentex/protocol/acp.py create mode 100644 src/agentex/protocol/json_rpc.py create mode 100644 tests/test_protocol_shims.py diff --git a/CLAUDE.md b/CLAUDE.md index 7e0df1779..7f62a42fd 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -55,11 +55,27 @@ The package provides the `agentex` CLI with these main commands: ### Code Structure - `/src/agentex/` - Core SDK and generated API client code +- `/src/agentex/protocol/` - **Canonical** location for wire-protocol shapes + (JSON-RPC envelopes, ACP method-param types). Depends only on `pydantic` + and the Stainless-generated `agentex.types.*` surface, so it is safe to + import from a future slim REST-only install. 
+ - `acp.py` - `RPCMethod`, `CreateTaskParams`, `SendMessageParams`, + `SendEventParams`, `CancelTaskParams`, `RPC_SYNC_METHODS`, + `PARAMS_MODEL_BY_METHOD` + - `json_rpc.py` - `JSONRPCRequest`, `JSONRPCResponse`, `JSONRPCError` - `/src/agentex/lib/` - Custom library code (not modified by code generator) - `/cli/` - Command-line interface implementation - `/core/` - Core services, adapters, and temporal workflows - `/sdk/` - SDK utilities and FastACP implementation - `/types/` - Custom type definitions + - `acp.py`, `json_rpc.py` - **back-compat shims** re-exporting from + `agentex.protocol.*`. Existing `from agentex.lib.types.{acp,json_rpc} + import ...` keeps working; new code should import from the canonical + `agentex.protocol.*` paths. + - Other modules (`tracing`, `agent_card`, `credentials`, `fastacp`, + `llm_messages`, `converters`, etc.) stay here — they have heavier + transitive deps (temporal, openai-agents, model_utils/yaml) and + aren't slim-safe. - `/utils/` - Utility functions - `/examples/` - Example implementations and tutorials - `/tests/` - Test suites diff --git a/src/agentex/lib/cli/templates/default-langgraph/project/acp.py.j2 b/src/agentex/lib/cli/templates/default-langgraph/project/acp.py.j2 index 6099f5c3e..3309dc07e 100644 --- a/src/agentex/lib/cli/templates/default-langgraph/project/acp.py.j2 +++ b/src/agentex/lib/cli/templates/default-langgraph/project/acp.py.j2 @@ -18,7 +18,7 @@ import agentex.lib.adk as adk from agentex.lib.adk import create_langgraph_tracing_handler, stream_langgraph_events from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config from agentex.lib.sdk.fastacp.fastacp import FastACP -from agentex.lib.types.acp import SendEventParams, CancelTaskParams, CreateTaskParams +from agentex.protocol.acp import SendEventParams, CancelTaskParams, CreateTaskParams from agentex.lib.types.fastacp import AsyncACPConfig from agentex.lib.types.tracing import SGPTracingProcessorConfig from 
agentex.lib.utils.logging import make_logger diff --git a/src/agentex/lib/cli/templates/default-pydantic-ai/project/acp.py.j2 b/src/agentex/lib/cli/templates/default-pydantic-ai/project/acp.py.j2 index b63683da1..5692396b2 100644 --- a/src/agentex/lib/cli/templates/default-pydantic-ai/project/acp.py.j2 +++ b/src/agentex/lib/cli/templates/default-pydantic-ai/project/acp.py.j2 @@ -28,7 +28,7 @@ from agentex.lib.adk import ( stream_pydantic_ai_events, create_pydantic_ai_tracing_handler, ) -from agentex.lib.types.acp import SendEventParams, CancelTaskParams, CreateTaskParams +from agentex.protocol.acp import SendEventParams, CancelTaskParams, CreateTaskParams from agentex.lib.types.fastacp import AsyncACPConfig from agentex.lib.types.tracing import SGPTracingProcessorConfig from agentex.lib.utils.logging import make_logger diff --git a/src/agentex/lib/cli/templates/default/project/acp.py.j2 b/src/agentex/lib/cli/templates/default/project/acp.py.j2 index 5478b51b5..b0da14a5c 100644 --- a/src/agentex/lib/cli/templates/default/project/acp.py.j2 +++ b/src/agentex/lib/cli/templates/default/project/acp.py.j2 @@ -1,6 +1,6 @@ from agentex.lib.sdk.fastacp.fastacp import FastACP from agentex.lib.types.fastacp import AsyncACPConfig -from agentex.lib.types.acp import SendEventParams, CancelTaskParams, CreateTaskParams +from agentex.protocol.acp import SendEventParams, CancelTaskParams, CreateTaskParams from agentex.lib.utils.logging import make_logger from agentex.types.text_content import TextContent from agentex.lib import adk diff --git a/src/agentex/lib/cli/templates/sync-langgraph/project/acp.py.j2 b/src/agentex/lib/cli/templates/sync-langgraph/project/acp.py.j2 index 2edd20fcb..54538d0c9 100644 --- a/src/agentex/lib/cli/templates/sync-langgraph/project/acp.py.j2 +++ b/src/agentex/lib/cli/templates/sync-langgraph/project/acp.py.j2 @@ -11,7 +11,7 @@ import agentex.lib.adk as adk from agentex.lib.adk import create_langgraph_tracing_handler, convert_langgraph_to_agentex_events 
from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config from agentex.lib.sdk.fastacp.fastacp import FastACP -from agentex.lib.types.acp import SendMessageParams +from agentex.protocol.acp import SendMessageParams from agentex.lib.types.tracing import SGPTracingProcessorConfig from agentex.lib.utils.logging import make_logger from agentex.types.task_message_content import TaskMessageContent diff --git a/src/agentex/lib/cli/templates/sync-openai-agents/project/acp.py.j2 b/src/agentex/lib/cli/templates/sync-openai-agents/project/acp.py.j2 index 0b3b482fe..4e2517838 100644 --- a/src/agentex/lib/cli/templates/sync-openai-agents/project/acp.py.j2 +++ b/src/agentex/lib/cli/templates/sync-openai-agents/project/acp.py.j2 @@ -4,7 +4,7 @@ from typing import AsyncGenerator, List from agentex.lib import adk from agentex.lib.adk.providers._modules.sync_provider import SyncStreamingProvider, convert_openai_to_agentex_events from agentex.lib.sdk.fastacp.fastacp import FastACP -from agentex.lib.types.acp import SendMessageParams +from agentex.protocol.acp import SendMessageParams from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config from agentex.lib.types.tracing import SGPTracingProcessorConfig from agentex.lib.utils.model_utils import BaseModel diff --git a/src/agentex/lib/cli/templates/sync-pydantic-ai/project/acp.py.j2 b/src/agentex/lib/cli/templates/sync-pydantic-ai/project/acp.py.j2 index e07f57a1a..4925e847f 100644 --- a/src/agentex/lib/cli/templates/sync-pydantic-ai/project/acp.py.j2 +++ b/src/agentex/lib/cli/templates/sync-pydantic-ai/project/acp.py.j2 @@ -22,7 +22,7 @@ from agentex.lib.adk import ( create_pydantic_ai_tracing_handler, convert_pydantic_ai_to_agentex_events, ) -from agentex.lib.types.acp import SendMessageParams +from agentex.protocol.acp import SendMessageParams from agentex.lib.types.tracing import SGPTracingProcessorConfig from agentex.lib.utils.logging import make_logger from 
agentex.lib.sdk.fastacp.fastacp import FastACP diff --git a/src/agentex/lib/cli/templates/sync/project/acp.py.j2 b/src/agentex/lib/cli/templates/sync/project/acp.py.j2 index 7184d26aa..ce5069a4c 100644 --- a/src/agentex/lib/cli/templates/sync/project/acp.py.j2 +++ b/src/agentex/lib/cli/templates/sync/project/acp.py.j2 @@ -1,6 +1,6 @@ from typing import AsyncGenerator, Union from agentex.lib.sdk.fastacp.fastacp import FastACP -from agentex.lib.types.acp import SendMessageParams +from agentex.protocol.acp import SendMessageParams from agentex.types.task_message_update import TaskMessageUpdate from agentex.types.task_message_content import TaskMessageContent diff --git a/src/agentex/lib/cli/templates/temporal-openai-agents/project/workflow.py.j2 b/src/agentex/lib/cli/templates/temporal-openai-agents/project/workflow.py.j2 index 5b95d4479..2b81bb335 100644 --- a/src/agentex/lib/cli/templates/temporal-openai-agents/project/workflow.py.j2 +++ b/src/agentex/lib/cli/templates/temporal-openai-agents/project/workflow.py.j2 @@ -4,7 +4,7 @@ import os from temporalio import workflow from agentex.lib import adk -from agentex.lib.types.acp import CreateTaskParams, SendEventParams +from agentex.protocol.acp import CreateTaskParams, SendEventParams from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow from agentex.lib.core.temporal.types.workflow import SignalName from agentex.lib.utils.logging import make_logger diff --git a/src/agentex/lib/cli/templates/temporal-pydantic-ai/project/workflow.py.j2 b/src/agentex/lib/cli/templates/temporal-pydantic-ai/project/workflow.py.j2 index 23e5156f1..66a91d7a8 100644 --- a/src/agentex/lib/cli/templates/temporal-pydantic-ai/project/workflow.py.j2 +++ b/src/agentex/lib/cli/templates/temporal-pydantic-ai/project/workflow.py.j2 @@ -23,7 +23,7 @@ from temporalio import workflow from project.agent import TaskDeps, temporal_agent from agentex.lib import adk -from agentex.lib.types.acp import SendEventParams, CreateTaskParams +from 
agentex.protocol.acp import SendEventParams, CreateTaskParams from agentex.lib.types.tracing import SGPTracingProcessorConfig from agentex.lib.utils.logging import make_logger from agentex.types.text_content import TextContent diff --git a/src/agentex/lib/cli/templates/temporal/project/workflow.py.j2 b/src/agentex/lib/cli/templates/temporal/project/workflow.py.j2 index ad756eb15..56db5abf3 100644 --- a/src/agentex/lib/cli/templates/temporal/project/workflow.py.j2 +++ b/src/agentex/lib/cli/templates/temporal/project/workflow.py.j2 @@ -3,7 +3,7 @@ import json from temporalio import workflow from agentex.lib import adk -from agentex.lib.types.acp import CreateTaskParams, SendEventParams +from agentex.protocol.acp import CreateTaskParams, SendEventParams from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow from agentex.lib.core.temporal.types.workflow import SignalName from agentex.lib.utils.logging import make_logger diff --git a/src/agentex/lib/core/temporal/services/temporal_task_service.py b/src/agentex/lib/core/temporal/services/temporal_task_service.py index 81eb22389..9d66c0c1f 100644 --- a/src/agentex/lib/core/temporal/services/temporal_task_service.py +++ b/src/agentex/lib/core/temporal/services/temporal_task_service.py @@ -6,7 +6,7 @@ from agentex.types.task import Task from agentex.types.agent import Agent from agentex.types.event import Event -from agentex.lib.types.acp import SendEventParams, CreateTaskParams +from agentex.protocol.acp import SendEventParams, CreateTaskParams from agentex.lib.environment_variables import EnvironmentVariables from agentex.lib.core.clients.temporal.types import WorkflowState from agentex.lib.core.temporal.types.workflow import SignalName diff --git a/src/agentex/lib/core/temporal/workflows/workflow.py b/src/agentex/lib/core/temporal/workflows/workflow.py index 727f3ac85..3e4498162 100644 --- a/src/agentex/lib/core/temporal/workflows/workflow.py +++ b/src/agentex/lib/core/temporal/workflows/workflow.py @@ 
-2,7 +2,7 @@ from temporalio import workflow -from agentex.lib.types.acp import SendEventParams, CreateTaskParams +from agentex.protocol.acp import SendEventParams, CreateTaskParams from agentex.lib.utils.logging import make_logger from agentex.lib.core.temporal.types.workflow import SignalName diff --git a/src/agentex/lib/sdk/fastacp/base/base_acp_server.py b/src/agentex/lib/sdk/fastacp/base/base_acp_server.py index c48816020..b0b1c3685 100644 --- a/src/agentex/lib/sdk/fastacp/base/base_acp_server.py +++ b/src/agentex/lib/sdk/fastacp/base/base_acp_server.py @@ -14,7 +14,7 @@ from starlette.types import Send, Scope, ASGIApp, Receive from fastapi.responses import StreamingResponse -from agentex.lib.types.acp import ( +from agentex.protocol.acp import ( RPC_SYNC_METHODS, PARAMS_MODEL_BY_METHOD, RPCMethod, @@ -24,7 +24,7 @@ SendMessageParams, ) from agentex.lib.utils.logging import make_logger, ctx_var_request_id -from agentex.lib.types.json_rpc import JSONRPCError, JSONRPCRequest, JSONRPCResponse +from agentex.protocol.json_rpc import JSONRPCError, JSONRPCRequest, JSONRPCResponse from agentex.lib.utils.model_utils import BaseModel from agentex.lib.utils.registration import register_agent diff --git a/src/agentex/lib/sdk/fastacp/impl/async_base_acp.py b/src/agentex/lib/sdk/fastacp/impl/async_base_acp.py index d2b8bac92..e9d20f150 100644 --- a/src/agentex/lib/sdk/fastacp/impl/async_base_acp.py +++ b/src/agentex/lib/sdk/fastacp/impl/async_base_acp.py @@ -1,7 +1,7 @@ from typing import Any from typing_extensions import override -from agentex.lib.types.acp import ( +from agentex.protocol.acp import ( SendEventParams, CancelTaskParams, CreateTaskParams, diff --git a/src/agentex/lib/sdk/fastacp/impl/sync_acp.py b/src/agentex/lib/sdk/fastacp/impl/sync_acp.py index 4898a9637..5ecad073e 100644 --- a/src/agentex/lib/sdk/fastacp/impl/sync_acp.py +++ b/src/agentex/lib/sdk/fastacp/impl/sync_acp.py @@ -3,7 +3,7 @@ from typing import Any, override from collections.abc import 
AsyncGenerator -from agentex.lib.types.acp import SendMessageParams +from agentex.protocol.acp import SendMessageParams from agentex.lib.utils.logging import make_logger from agentex.types.task_message_delta import TextDelta from agentex.types.task_message_update import ( diff --git a/src/agentex/lib/sdk/fastacp/impl/temporal_acp.py b/src/agentex/lib/sdk/fastacp/impl/temporal_acp.py index f64e16d72..54fe72e6c 100644 --- a/src/agentex/lib/sdk/fastacp/impl/temporal_acp.py +++ b/src/agentex/lib/sdk/fastacp/impl/temporal_acp.py @@ -6,7 +6,7 @@ from fastapi import FastAPI from temporalio.converter import PayloadCodec -from agentex.lib.types.acp import ( +from agentex.protocol.acp import ( SendEventParams, CancelTaskParams, CreateTaskParams, diff --git a/src/agentex/lib/sdk/fastacp/tests/conftest.py b/src/agentex/lib/sdk/fastacp/tests/conftest.py index 8941f16eb..59ecbfee3 100644 --- a/src/agentex/lib/sdk/fastacp/tests/conftest.py +++ b/src/agentex/lib/sdk/fastacp/tests/conftest.py @@ -13,12 +13,12 @@ from agentex.types.task import Task from agentex.types.agent import Agent -from agentex.lib.types.acp import ( +from agentex.protocol.acp import ( CancelTaskParams, CreateTaskParams, SendMessageParams, ) -from agentex.lib.types.json_rpc import JSONRPCRequest +from agentex.protocol.json_rpc import JSONRPCRequest from agentex.types.task_message import TaskMessageContent from agentex.types.task_message_content import TextContent from agentex.lib.sdk.fastacp.impl.sync_acp import SyncACP diff --git a/src/agentex/lib/sdk/fastacp/tests/test_base_acp_server.py b/src/agentex/lib/sdk/fastacp/tests/test_base_acp_server.py index 0816ac436..8a218187e 100644 --- a/src/agentex/lib/sdk/fastacp/tests/test_base_acp_server.py +++ b/src/agentex/lib/sdk/fastacp/tests/test_base_acp_server.py @@ -5,7 +5,7 @@ import pytest from fastapi.testclient import TestClient -from agentex.lib.types.acp import ( +from agentex.protocol.acp import ( RPCMethod, SendEventParams, CancelTaskParams, diff --git 
a/src/agentex/lib/sdk/fastacp/tests/test_integration.py b/src/agentex/lib/sdk/fastacp/tests/test_integration.py index c6f310af1..c72d336e3 100644 --- a/src/agentex/lib/sdk/fastacp/tests/test_integration.py +++ b/src/agentex/lib/sdk/fastacp/tests/test_integration.py @@ -5,7 +5,7 @@ import httpx import pytest -from agentex.lib.types.acp import ( +from agentex.protocol.acp import ( RPCMethod, SendEventParams, CancelTaskParams, diff --git a/src/agentex/lib/types/acp.py b/src/agentex/lib/types/acp.py index d719b4fd5..74295ef88 100644 --- a/src/agentex/lib/types/acp.py +++ b/src/agentex/lib/types/acp.py @@ -1,116 +1,15 @@ -from __future__ import annotations - -from enum import Enum -from typing import Any - -from pydantic import Field, BaseModel - -from agentex.types.task import Task -from agentex.types.agent import Agent -from agentex.types.event import Event -from agentex.types.task_message_content import TaskMessageContent - - -class RPCMethod(str, Enum): - """Available JSON-RPC methods for agent communication.""" - - EVENT_SEND = "event/send" - MESSAGE_SEND = "message/send" - TASK_CANCEL = "task/cancel" - TASK_CREATE = "task/create" - - -class CreateTaskParams(BaseModel): - """Parameters for task/create method. - - Attributes: - agent: The agent that the task was sent to. - task: The task to be created. - params: The parameters for the task as inputted by the user. - request: Additional request context including headers forwarded to this agent. - """ - - agent: Agent = Field(..., description="The agent that the task was sent to") - task: Task = Field(..., description="The task to be created") - params: dict[str, Any] | None = Field( - None, - description="The parameters for the task as inputted by the user", - ) - request: dict[str, Any] | None = Field( - default=None, - description="Additional request context including headers forwarded to this agent", - ) - - -class SendMessageParams(BaseModel): - """Parameters for message/send method. 
- - Attributes: - agent: The agent that the message was sent to. - task: The task that the message was sent to. - content: The message that was sent to the agent. - stream: Whether to stream the message back to the agentex server from the agent. - request: Additional request context including headers forwarded to this agent. - """ - - agent: Agent = Field(..., description="The agent that the message was sent to") - task: Task = Field(..., description="The task that the message was sent to") - content: TaskMessageContent = Field( - ..., description="The message that was sent to the agent" - ) - stream: bool = Field( - False, - description="Whether to stream the message back to the agentex server from the agent", - ) - request: dict[str, Any] | None = Field( - default=None, - description="Additional request context including headers forwarded to this agent", - ) - - -class SendEventParams(BaseModel): - """Parameters for event/send method. - - Attributes: - agent: The agent that the event was sent to. - task: The task that the message was sent to. - event: The event that was sent to the agent. - request: Additional request context including headers forwarded to this agent. - """ - - agent: Agent = Field(..., description="The agent that the event was sent to") - task: Task = Field(..., description="The task that the message was sent to") - event: Event = Field(..., description="The event that was sent to the agent") - request: dict[str, Any] | None = Field( - default=None, - description="Additional request context including headers forwarded to this agent", - ) - - -class CancelTaskParams(BaseModel): - """Parameters for task/cancel method. - - Attributes: - agent: The agent that the task was sent to. - task: The task that was cancelled. - request: Additional request context including headers forwarded to this agent. 
- """ - - agent: Agent = Field(..., description="The agent that the task was sent to") - task: Task = Field(..., description="The task that was cancelled") - request: dict[str, Any] | None = Field( - default=None, - description="Additional request context including headers forwarded to this agent", - ) - - -RPC_SYNC_METHODS = [ - RPCMethod.MESSAGE_SEND, -] - -PARAMS_MODEL_BY_METHOD: dict[RPCMethod, type[BaseModel]] = { - RPCMethod.EVENT_SEND: SendEventParams, - RPCMethod.TASK_CANCEL: CancelTaskParams, - RPCMethod.MESSAGE_SEND: SendMessageParams, - RPCMethod.TASK_CREATE: CreateTaskParams, -} +"""Back-compat shim. The canonical location is :mod:`agentex.protocol.acp`. + +Kept here so existing ``from agentex.lib.types.acp import ...`` imports +continue to work. New code should import from the canonical path. +""" + +from agentex.protocol.acp import ( # noqa: F401 + RPC_SYNC_METHODS, + PARAMS_MODEL_BY_METHOD, + RPCMethod, + SendEventParams, + CancelTaskParams, + CreateTaskParams, + SendMessageParams, +) diff --git a/src/agentex/lib/types/json_rpc.py b/src/agentex/lib/types/json_rpc.py index b89e9d6b2..b010f93f7 100644 --- a/src/agentex/lib/types/json_rpc.py +++ b/src/agentex/lib/types/json_rpc.py @@ -1,51 +1,11 @@ -from __future__ import annotations +"""Back-compat shim. The canonical location is :mod:`agentex.protocol.json_rpc`. -from typing import Any, Literal +Kept here so existing ``from agentex.lib.types.json_rpc import ...`` imports +continue to work. New code should import from the canonical path. 
+""" -from agentex.lib.utils.model_utils import BaseModel - - -class JSONRPCError(BaseModel): - """JSON-RPC 2.0 Error - - Attributes: - code: The error code - message: The error message - data: The error data - """ - - code: int - message: str - data: Any | None = None - - -class JSONRPCRequest(BaseModel): - """JSON-RPC 2.0 Request - - Attributes: - jsonrpc: The JSON-RPC version - method: The method to call - params: The parameters for the request - id: The ID of the request - """ - - jsonrpc: Literal["2.0"] = "2.0" - method: str - params: dict[str, Any] - id: int | str | None = None - - -class JSONRPCResponse(BaseModel): - """JSON-RPC 2.0 Response - - Attributes: - jsonrpc: The JSON-RPC version - result: The result of the request - error: The error of the request - id: The ID of the request - """ - - jsonrpc: Literal["2.0"] = "2.0" - result: dict[str, Any] | None = None - error: JSONRPCError | None = None - id: int | str | None = None +from agentex.protocol.json_rpc import ( # noqa: F401 + JSONRPCError, + JSONRPCRequest, + JSONRPCResponse, +) diff --git a/src/agentex/protocol/__init__.py b/src/agentex/protocol/__init__.py new file mode 100644 index 000000000..be9db981a --- /dev/null +++ b/src/agentex/protocol/__init__.py @@ -0,0 +1,16 @@ +"""Wire-protocol shapes for Agentex. + +The modules under `agentex.protocol.*` are the typed shapes for talking to +an Agentex agent over JSON-RPC (the ACP / Agent Communication Protocol) +without pulling in the heavy ADK runtime. They depend only on pydantic and +the Stainless-generated `agentex.types.*` surface, so they are safe to +import from a slim REST-only install. + +Hand-rolled JSON-RPC clients (e.g. the one in `egp-api-backend`) can switch +from constructing `{"jsonrpc": "2.0", "method": "...", "params": {...}}` +dicts by hand to constructing `JSONRPCRequest(method=RPCMethod.TASK_CREATE, +params=CreateTaskParams(...).model_dump())`. 
+ +For back-compat, the same classes are re-exported from +`agentex.lib.types.{acp,json_rpc}` (the historical locations). +""" diff --git a/src/agentex/protocol/acp.py b/src/agentex/protocol/acp.py new file mode 100644 index 000000000..d719b4fd5 --- /dev/null +++ b/src/agentex/protocol/acp.py @@ -0,0 +1,116 @@ +from __future__ import annotations + +from enum import Enum +from typing import Any + +from pydantic import Field, BaseModel + +from agentex.types.task import Task +from agentex.types.agent import Agent +from agentex.types.event import Event +from agentex.types.task_message_content import TaskMessageContent + + +class RPCMethod(str, Enum): + """Available JSON-RPC methods for agent communication.""" + + EVENT_SEND = "event/send" + MESSAGE_SEND = "message/send" + TASK_CANCEL = "task/cancel" + TASK_CREATE = "task/create" + + +class CreateTaskParams(BaseModel): + """Parameters for task/create method. + + Attributes: + agent: The agent that the task was sent to. + task: The task to be created. + params: The parameters for the task as inputted by the user. + request: Additional request context including headers forwarded to this agent. + """ + + agent: Agent = Field(..., description="The agent that the task was sent to") + task: Task = Field(..., description="The task to be created") + params: dict[str, Any] | None = Field( + None, + description="The parameters for the task as inputted by the user", + ) + request: dict[str, Any] | None = Field( + default=None, + description="Additional request context including headers forwarded to this agent", + ) + + +class SendMessageParams(BaseModel): + """Parameters for message/send method. + + Attributes: + agent: The agent that the message was sent to. + task: The task that the message was sent to. + content: The message that was sent to the agent. + stream: Whether to stream the message back to the agentex server from the agent. + request: Additional request context including headers forwarded to this agent. 
+ """ + + agent: Agent = Field(..., description="The agent that the message was sent to") + task: Task = Field(..., description="The task that the message was sent to") + content: TaskMessageContent = Field( + ..., description="The message that was sent to the agent" + ) + stream: bool = Field( + False, + description="Whether to stream the message back to the agentex server from the agent", + ) + request: dict[str, Any] | None = Field( + default=None, + description="Additional request context including headers forwarded to this agent", + ) + + +class SendEventParams(BaseModel): + """Parameters for event/send method. + + Attributes: + agent: The agent that the event was sent to. + task: The task that the message was sent to. + event: The event that was sent to the agent. + request: Additional request context including headers forwarded to this agent. + """ + + agent: Agent = Field(..., description="The agent that the event was sent to") + task: Task = Field(..., description="The task that the message was sent to") + event: Event = Field(..., description="The event that was sent to the agent") + request: dict[str, Any] | None = Field( + default=None, + description="Additional request context including headers forwarded to this agent", + ) + + +class CancelTaskParams(BaseModel): + """Parameters for task/cancel method. + + Attributes: + agent: The agent that the task was sent to. + task: The task that was cancelled. + request: Additional request context including headers forwarded to this agent. 
+ """ + + agent: Agent = Field(..., description="The agent that the task was sent to") + task: Task = Field(..., description="The task that was cancelled") + request: dict[str, Any] | None = Field( + default=None, + description="Additional request context including headers forwarded to this agent", + ) + + +RPC_SYNC_METHODS = [ + RPCMethod.MESSAGE_SEND, +] + +PARAMS_MODEL_BY_METHOD: dict[RPCMethod, type[BaseModel]] = { + RPCMethod.EVENT_SEND: SendEventParams, + RPCMethod.TASK_CANCEL: CancelTaskParams, + RPCMethod.MESSAGE_SEND: SendMessageParams, + RPCMethod.TASK_CREATE: CreateTaskParams, +} diff --git a/src/agentex/protocol/json_rpc.py b/src/agentex/protocol/json_rpc.py new file mode 100644 index 000000000..be03a4936 --- /dev/null +++ b/src/agentex/protocol/json_rpc.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +from typing import Any, Literal + +from pydantic import BaseModel, ConfigDict + +# Preserve the config the previous `agentex.lib.utils.model_utils.BaseModel` +# applied — `from_attributes=True` lets callers `model_validate` from +# attribute-bearing objects (not just dicts); `populate_by_name=True` is a +# harmless default future-proofing for any field aliases. 
+_PROTOCOL_MODEL_CONFIG = ConfigDict(from_attributes=True, populate_by_name=True) + + +class JSONRPCError(BaseModel): + """JSON-RPC 2.0 Error + + Attributes: + code: The error code + message: The error message + data: The error data + """ + + model_config = _PROTOCOL_MODEL_CONFIG + + code: int + message: str + data: Any | None = None + + +class JSONRPCRequest(BaseModel): + """JSON-RPC 2.0 Request + + Attributes: + jsonrpc: The JSON-RPC version + method: The method to call + params: The parameters for the request + id: The ID of the request + """ + + model_config = _PROTOCOL_MODEL_CONFIG + + jsonrpc: Literal["2.0"] = "2.0" + method: str + params: dict[str, Any] + id: int | str | None = None + + +class JSONRPCResponse(BaseModel): + """JSON-RPC 2.0 Response + + Attributes: + jsonrpc: The JSON-RPC version + result: The result of the request + error: The error of the request + id: The ID of the request + """ + + model_config = _PROTOCOL_MODEL_CONFIG + + jsonrpc: Literal["2.0"] = "2.0" + result: dict[str, Any] | None = None + error: JSONRPCError | None = None + id: int | str | None = None diff --git a/tests/test_header_forwarding.py b/tests/test_header_forwarding.py index 51c3a685f..596c6729c 100644 --- a/tests/test_header_forwarding.py +++ b/tests/test_header_forwarding.py @@ -46,7 +46,7 @@ class _StubTracer(_StubAsyncTracer): from agentex.lib.core.services.adk.acp.acp import ACPService from agentex.lib.sdk.fastacp.base.base_acp_server import BaseACPServer -from agentex.lib.types.acp import RPCMethod, SendMessageParams, SendEventParams +from agentex.protocol.acp import RPCMethod, SendMessageParams, SendEventParams from agentex.types.task_message_content import TextContent from agentex.lib.sdk.fastacp.impl.temporal_acp import TemporalACP from agentex.lib.core.temporal.services.temporal_task_service import TemporalTaskService diff --git a/tests/test_protocol_shims.py b/tests/test_protocol_shims.py new file mode 100644 index 000000000..e5e651b68 --- /dev/null +++ 
b/tests/test_protocol_shims.py @@ -0,0 +1,98 @@ +"""Tests that pin the back-compat contract for protocol-type shims. + +The canonical location for wire-protocol shapes is :mod:`agentex.protocol` +(see PR scaleapi/scale-agentex-python#371). The historical locations +:mod:`agentex.lib.types.acp` and :mod:`agentex.lib.types.json_rpc` are +preserved as re-export shims so external consumers' existing imports +continue to work. + +These tests enforce two invariants: + +1. **Symbol parity** — every public name the original modules exported + is still importable from the old path. Greptile flagged + ``RPC_SYNC_METHODS`` and ``PARAMS_MODEL_BY_METHOD`` as missing in an + earlier pass; this test prevents that regression. +2. **Identity** — the class objects at the shim path are the *same* + objects as the canonical path. Without this, type-narrowing via + ``isinstance`` or pattern matching would silently misbehave for code + that mixes import styles. + +Also asserts the :class:`pydantic.ConfigDict` settings on the JSON-RPC +classes survived the move from :mod:`agentex.lib.utils.model_utils` to +plain :mod:`pydantic` — Greptile flagged the silent loss of +``from_attributes=True`` / ``populate_by_name=True``. +""" + +from __future__ import annotations + + +def test_acp_shim_re_exports_all_original_symbols() -> None: + """Every name historically exported from agentex.lib.types.acp must + still be importable from that path via the back-compat shim.""" + # Importing each symbol; ImportError here means the shim regressed. 
+ from agentex.lib.types.acp import ( # noqa: F401 + RPC_SYNC_METHODS, + PARAMS_MODEL_BY_METHOD, + RPCMethod, + SendEventParams, + CancelTaskParams, + CreateTaskParams, + SendMessageParams, + ) + + +def test_json_rpc_shim_re_exports_all_original_symbols() -> None: + """Every name historically exported from agentex.lib.types.json_rpc + must still be importable from that path via the back-compat shim.""" + from agentex.lib.types.json_rpc import ( # noqa: F401 + JSONRPCError, + JSONRPCRequest, + JSONRPCResponse, + ) + + +def test_acp_shim_classes_are_identical_to_canonical() -> None: + """Shim re-exports must be the *same* class objects as the canonical + path. Different objects would break ``isinstance`` for code that + mixes import styles.""" + from agentex.protocol import acp as canon + from agentex.lib.types import acp as shim + + assert shim.RPCMethod is canon.RPCMethod + assert shim.CreateTaskParams is canon.CreateTaskParams + assert shim.SendMessageParams is canon.SendMessageParams + assert shim.SendEventParams is canon.SendEventParams + assert shim.CancelTaskParams is canon.CancelTaskParams + assert shim.RPC_SYNC_METHODS is canon.RPC_SYNC_METHODS + assert shim.PARAMS_MODEL_BY_METHOD is canon.PARAMS_MODEL_BY_METHOD + + +def test_json_rpc_shim_classes_are_identical_to_canonical() -> None: + """Same identity check for the JSON-RPC envelope types.""" + from agentex.protocol import json_rpc as canon + from agentex.lib.types import json_rpc as shim + + assert shim.JSONRPCError is canon.JSONRPCError + assert shim.JSONRPCRequest is canon.JSONRPCRequest + assert shim.JSONRPCResponse is canon.JSONRPCResponse + + +def test_json_rpc_classes_preserve_legacy_model_config() -> None: + """Pre-refactor, JSON-RPC classes inherited + ``from_attributes=True`` / ``populate_by_name=True`` from + ``agentex.lib.utils.model_utils.BaseModel``. The refactor swapped + to plain ``pydantic.BaseModel`` and set ``model_config`` explicitly + to preserve both flags. 
Catch any future drop.""" + from agentex.protocol.json_rpc import ( + JSONRPCError, + JSONRPCRequest, + JSONRPCResponse, + ) + + for cls in (JSONRPCError, JSONRPCRequest, JSONRPCResponse): + assert cls.model_config.get("from_attributes") is True, ( + f"{cls.__name__}.model_config dropped from_attributes=True" + ) + assert cls.model_config.get("populate_by_name") is True, ( + f"{cls.__name__}.model_config dropped populate_by_name=True" + ) From feec8426f79e9f02533451d44997717655fd33f2 Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Thu, 28 May 2026 18:02:59 -0700 Subject: [PATCH 4/9] perf(tracing): span queue linger + per-loop httpx keepalive (#362) Co-authored-by: stainless-app[bot] <142633134+stainless-app[bot]@users.noreply.github.com> Co-authored-by: Declan Brady Co-authored-by: Michael Chou --- .../.ipynb_checkpoints/dev-checkpoint.ipynb | 166 +++++++++++++ .../tutorials/00_sync/010_multiturn/dev.ipynb | 4 +- src/agentex/lib/adk/_modules/tracing.py | 11 +- .../processors/agentex_tracing_processor.py | 42 +++- .../processors/sgp_tracing_processor.py | 79 ++++-- src/agentex/lib/core/tracing/span_queue.py | 221 ++++++++++++++-- .../test_agentex_tracing_processor.py | 132 ++++++++++ .../processors/test_sgp_tracing_processor.py | 169 +++++++++++-- tests/lib/core/tracing/test_span_queue.py | 235 +++++++++++++++++- 9 files changed, 967 insertions(+), 92 deletions(-) create mode 100644 examples/tutorials/00_sync/010_multiturn/.ipynb_checkpoints/dev-checkpoint.ipynb create mode 100644 tests/lib/core/tracing/processors/test_agentex_tracing_processor.py diff --git a/examples/tutorials/00_sync/010_multiturn/.ipynb_checkpoints/dev-checkpoint.ipynb b/examples/tutorials/00_sync/010_multiturn/.ipynb_checkpoints/dev-checkpoint.ipynb new file mode 100644 index 000000000..d82cf5775 --- /dev/null +++ b/examples/tutorials/00_sync/010_multiturn/.ipynb_checkpoints/dev-checkpoint.ipynb @@ -0,0 +1,166 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": 
"0", + "metadata": {}, + "outputs": [], + "source": [ + "from agentex import Agentex\n", + "\n", + "client = Agentex(base_url=\"http://localhost:5003\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1", + "metadata": {}, + "outputs": [], + "source": [ + "AGENT_NAME = \"s010-multiturn\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2", + "metadata": {}, + "outputs": [], + "source": [ + "# # (Optional) Create a new task. If you don't create a new task, each message will be sent to a new task. The server will create the task for you.\n", + "\n", + "# import uuid\n", + "\n", + "# TASK_ID = str(uuid.uuid4())[:8]\n", + "\n", + "# rpc_response = client.agents.rpc_by_name(\n", + "# agent_name=AGENT_NAME,\n", + "# method=\"task/create\",\n", + "# params={\n", + "# \"name\": f\"{TASK_ID}-task\",\n", + "# \"params\": {}\n", + "# }\n", + "# )\n", + "\n", + "# task = rpc_response.result\n", + "# print(task)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3", + "metadata": {}, + "outputs": [], + "source": [ + "# Test non streaming response\n", + "from agentex.types import TextContent\n", + "\n", + "# The response is expected to be a list of TaskMessage objects, which is a union of the following types:\n", + "# - TextContent: A message with just text content \n", + "# - DataContent: A message with JSON-serializable data content\n", + "# - ToolRequestContent: A message with a tool request, which contains a JSON-serializable request to call a tool\n", + "# - ToolResponseContent: A message with a tool response, which contains response object from a tool call in its content\n", + "\n", + "# When processing the message/send response, if you are expecting more than TextContent, such as DataContent, ToolRequestContent, or ToolResponseContent, you can process them as well\n", + "\n", + "rpc_response = client.agents.send_message(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"content\": {\"type\": 
\"text\", \"author\": \"user\", \"content\": \"Hello what can you do?\"},\n", + " \"stream\": False\n", + " }\n", + ")\n", + "\n", + "if not rpc_response or not rpc_response.result:\n", + " raise ValueError(\"No result in response\")\n", + "\n", + "# Extract and print just the text content from the response\n", + "for task_message in rpc_response.result:\n", + " content = task_message.content\n", + " if isinstance(content, TextContent):\n", + " text = content.content\n", + " print(text)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4", + "metadata": {}, + "outputs": [], + "source": [ + "# Test streaming response\n", + "from agentex.types.text_delta import TextDelta\n", + "from agentex.types.task_message_update import StreamTaskMessageFull, StreamTaskMessageDelta\n", + "\n", + "# The result object of message/send will be a TaskMessageUpdate which is a union of the following types:\n", + "# - StreamTaskMessageStart: \n", + "# - An indicator that a streaming message was started, doesn't contain any useful content\n", + "# - StreamTaskMessageDelta: \n", + "# - A delta of a streaming message, contains the text delta to aggregate\n", + "# - StreamTaskMessageDone: \n", + "# - An indicator that a streaming message was done, doesn't contain any useful content\n", + "# - StreamTaskMessageFull: \n", + "# - A non-streaming message, there is nothing to aggregate, since this contains the full message, not deltas\n", + "\n", + "# When processing StreamTaskMessageDelta, if you are expecting more than TextDeltas, such as DataDelta, ToolRequestDelta, or ToolResponseDelta, you can process them as well\n", + "# When processing StreamTaskMessageFull, if you are expecting more than TextContent, such as DataContent, ToolRequestContent, or ToolResponseContent, you can process them as well\n", + "\n", + "for agent_rpc_response_chunk in client.agents.send_message_stream(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"content\": {\"type\": \"text\", 
\"author\": \"user\", \"content\": \"Hello what can you do?\"},\n", + " \"stream\": True\n", + " }\n", + "):\n", + " # We know that the result of the message/send when stream is set to True will be a TaskMessageUpdate\n", + " task_message_update = agent_rpc_response_chunk.result\n", + " # Print only the text deltas as they arrive or any full messages\n", + " if isinstance(task_message_update, StreamTaskMessageDelta):\n", + " delta = task_message_update.delta\n", + " if isinstance(delta, TextDelta):\n", + " print(delta.text_delta, end=\"\", flush=True)\n", + " else:\n", + " print(f\"Found non-text {type(task_message)} object in streaming message.\")\n", + " elif isinstance(task_message_update, StreamTaskMessageFull):\n", + " content = task_message_update.content\n", + " if isinstance(content, TextContent):\n", + " print(content.content)\n", + " else:\n", + " print(f\"Found non-text {type(task_message)} object in full message.\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/tutorials/00_sync/010_multiturn/dev.ipynb b/examples/tutorials/00_sync/010_multiturn/dev.ipynb index d82cf5775..c7c50532f 100644 --- a/examples/tutorials/00_sync/010_multiturn/dev.ipynb +++ b/examples/tutorials/00_sync/010_multiturn/dev.ipynb @@ -144,7 +144,7 @@ ], "metadata": { "kernelspec": { - "display_name": ".venv", + "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, @@ -158,7 +158,7 @@ "name": "python", "nbconvert_exporter": "python", 
"pygments_lexer": "ipython3", - "version": "3.12.9" + "version": "3.14.2" } }, "nbformat": 4, diff --git a/src/agentex/lib/adk/_modules/tracing.py b/src/agentex/lib/adk/_modules/tracing.py index 67150f01d..8694c2078 100644 --- a/src/agentex/lib/adk/_modules/tracing.py +++ b/src/agentex/lib/adk/_modules/tracing.py @@ -67,14 +67,13 @@ def _tracing_service(self) -> TracingService: if self._tracing_service_lazy is None or (loop_id is not None and loop_id != self._bound_loop_id): import httpx - # Disable keepalive so each span HTTP call gets a fresh TCP - # connection. Reused connections carry asyncio primitives bound - # to the event loop that created them; in sync-ACP / streaming - # contexts the loop context can shift between calls, causing - # "bound to a different event loop" RuntimeErrors. + # Keepalive ON: connections are reused within a single event + # loop, eliminating the TLS-handshake-per-span penalty under + # load. Cross-loop safety is preserved by rebuilding the + # client whenever loop_id changes (the conditional above). 
agentex_client = create_async_agentex_client( http_client=httpx.AsyncClient( - limits=httpx.Limits(max_keepalive_connections=0), + limits=httpx.Limits(max_keepalive_connections=20), ), ) tracer = AsyncTracer(agentex_client) diff --git a/src/agentex/lib/core/tracing/processors/agentex_tracing_processor.py b/src/agentex/lib/core/tracing/processors/agentex_tracing_processor.py index f91634b64..98d50546b 100644 --- a/src/agentex/lib/core/tracing/processors/agentex_tracing_processor.py +++ b/src/agentex/lib/core/tracing/processors/agentex_tracing_processor.py @@ -1,4 +1,6 @@ -from typing import Any, Dict, override +import asyncio +import weakref +from typing import TYPE_CHECKING, Any, Dict, override from agentex import Agentex from agentex.types.span import Span @@ -9,6 +11,9 @@ AsyncTracingProcessor, ) +if TYPE_CHECKING: + from agentex import AsyncAgentex + class AgentexSyncTracingProcessor(SyncTracingProcessor): def __init__(self, config: AgentexTracingProcessorConfig): # noqa: ARG002 @@ -67,19 +72,40 @@ def shutdown(self) -> None: class AgentexAsyncTracingProcessor(AsyncTracingProcessor): def __init__(self, config: AgentexTracingProcessorConfig): # noqa: ARG002 + # Per-event-loop client cache. httpx.AsyncClient is bound to the + # loop that created it, so in sync-ACP / streaming contexts (where + # the active loop can change between requests) we keep one client + # per loop instead of disabling keepalive entirely. The cache is a + # WeakKeyDictionary so a GC'd loop and its client are evicted + # automatically — using id() as a key would reuse entries when + # CPython recycles a freed loop's memory address. + self._clients_by_loop: weakref.WeakKeyDictionary[ + asyncio.AbstractEventLoop, "AsyncAgentex" + ] = weakref.WeakKeyDictionary() + + def _build_client(self) -> "AsyncAgentex": import httpx - # Disable keepalive so each span HTTP call gets a fresh TCP connection. 
- # Reused connections carry asyncio primitives bound to the event loop - # that created them; in sync-ACP / streaming contexts the loop context - # can shift between calls, causing "bound to a different event loop" - # RuntimeErrors. - self.client = create_async_agentex_client( + # Keepalive ON: connections are reused within a single event loop, + # eliminating the TLS-handshake-per-span penalty under load. + return create_async_agentex_client( http_client=httpx.AsyncClient( - limits=httpx.Limits(max_keepalive_connections=0), + limits=httpx.Limits(max_keepalive_connections=20), ), ) + @property + def client(self) -> "AsyncAgentex": + try: + loop = asyncio.get_running_loop() + except RuntimeError: + return self._build_client() + client = self._clients_by_loop.get(loop) + if client is None: + client = self._build_client() + self._clients_by_loop[loop] = client + return client + # TODO(AGX1-199): Add batch create/update endpoints to Agentex API and use # them here instead of one HTTP call per span. # https://linear.app/scale-epd/issue/AGX1-199/add-agentex-batch-endpoint-for-traces diff --git a/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py b/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py index 3a1c96c1b..a21eff7d3 100644 --- a/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py +++ b/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py @@ -1,5 +1,7 @@ from __future__ import annotations +import asyncio +import weakref from typing import cast, override import scale_gp_beta.lib.tracing as tracing @@ -92,23 +94,50 @@ def shutdown(self) -> None: class SGPAsyncTracingProcessor(AsyncTracingProcessor): def __init__(self, config: SGPTracingProcessorConfig): self.disabled = config.sgp_api_key == "" or config.sgp_account_id == "" + self._config = config + # Per-event-loop client cache. 
httpx.AsyncClient ties its connection + # pool to the loop it was created on; in sync-ACP / streaming contexts + # the active loop can change between requests. Caching per loop lets + # us keep keepalive on within each loop while staying safe across + # loops. The cache is a WeakKeyDictionary so a GC'd loop and its + # client are evicted automatically — using id() as a key would reuse + # entries when CPython recycles a freed loop's memory address. + self._clients_by_loop: weakref.WeakKeyDictionary[ + asyncio.AbstractEventLoop, AsyncSGPClient + ] = weakref.WeakKeyDictionary() + self.env_vars = EnvironmentVariables.refresh() + + def _build_client(self) -> AsyncSGPClient: import httpx - # Disable keepalive so each HTTP call gets a fresh TCP connection, - # avoiding "bound to a different event loop" errors in sync-ACP. - self.sgp_async_client = ( - AsyncSGPClient( - api_key=config.sgp_api_key, - account_id=config.sgp_account_id, - base_url=config.sgp_base_url, - http_client=httpx.AsyncClient( - limits=httpx.Limits(max_keepalive_connections=0), - ), - ) - if not self.disabled - else None + return AsyncSGPClient( + api_key=self._config.sgp_api_key, + account_id=self._config.sgp_account_id, + base_url=self._config.sgp_base_url, + # Keepalive ON: connections are reused within a single event loop, + # which removes the TLS-handshake-per-span penalty observed under + # load. Cross-loop safety is preserved by the per-loop cache. + http_client=httpx.AsyncClient( + limits=httpx.Limits(max_keepalive_connections=20), + ), ) - self.env_vars = EnvironmentVariables.refresh() + + def _get_client(self) -> AsyncSGPClient | None: + """Return the AsyncSGPClient bound to the current event loop, creating + one on first use. 
Returns None when the processor is disabled.""" + if self.disabled: + return None + try: + loop = asyncio.get_running_loop() + except RuntimeError: + # Called from outside an event loop — should not happen on the + # hot path, but build a one-off client rather than crashing. + return self._build_client() + client = self._clients_by_loop.get(loop) + if client is None: + client = self._build_client() + self._clients_by_loop[loop] = client + return client @override async def on_span_start(self, span: Span) -> None: @@ -123,31 +152,29 @@ async def on_spans_start(self, spans: list[Span]) -> None: if not spans: return - sgp_spans = [_build_sgp_span(span, self.env_vars) for span in spans] - - if self.disabled: + client = self._get_client() + if client is None: logger.warning("SGP is disabled, skipping span upsert") return - await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr] - items=[s.to_request_params() for s in sgp_spans] - ) + + sgp_spans = [_build_sgp_span(span, self.env_vars) for span in spans] + await client.spans.upsert_batch(items=[s.to_request_params() for s in sgp_spans]) @override async def on_spans_end(self, spans: list[Span]) -> None: if not spans: return + client = self._get_client() + if client is None: + return + sgp_spans: list[SGPSpan] = [] for span in spans: sgp_span = _build_sgp_span(span, self.env_vars) sgp_span.end_time = span.end_time.isoformat() # type: ignore[union-attr] sgp_spans.append(sgp_span) - - if self.disabled: - return - await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr] - items=[s.to_request_params() for s in sgp_spans] - ) + await client.spans.upsert_batch(items=[s.to_request_params() for s in sgp_spans]) @override async def shutdown(self) -> None: diff --git a/src/agentex/lib/core/tracing/span_queue.py b/src/agentex/lib/core/tracing/span_queue.py index d0d92669e..f9105718b 100644 --- a/src/agentex/lib/core/tracing/span_queue.py +++ b/src/agentex/lib/core/tracing/span_queue.py @@ -1,5 +1,6 
@@ from __future__ import annotations +import os import asyncio from enum import Enum from dataclasses import dataclass @@ -13,6 +14,47 @@ logger = make_logger(__name__) _DEFAULT_BATCH_SIZE = 50 +_DEFAULT_LINGER_MS = 100 +# 0 == unbounded (preserves prior behavior). A bound makes backpressure +# visible (dropped spans are counted) and caps worst-case memory. +_DEFAULT_MAX_SIZE = 0 +# Total attempts per batch for a *transient* failure (1 == no retry). +_DEFAULT_MAX_RETRIES = 1 +# HTTP statuses worth retrying at the queue level. These are explicit +# backpressure / transient signals; everything else (esp. 401/403/4xx auth and +# validation errors) is a permanent failure that re-enqueuing cannot fix. Note +# the underlying SGP client already retries these internally, so queue-level +# retry only helps when its budget is exhausted by a longer blip. +_RETRYABLE_STATUS_CODES = frozenset({429, 500, 502, 503, 504}) + + +def _read_int_env(name: str, default: int, *, minimum: int = 0) -> int: + """Read a non-negative int from the environment, clamping to ``minimum`` + and falling back to ``default`` when unset or unparseable.""" + raw = os.environ.get(name) + if raw is None: + return default + try: + return max(minimum, int(raw)) + except ValueError: + logger.warning("Ignoring invalid %s=%r; using default %d", name, raw, default) + return default + + +def _read_linger_ms_env() -> int: + """Read AGENTEX_SPAN_QUEUE_LINGER_MS from the environment, falling back to + _DEFAULT_LINGER_MS when unset or unparseable. Negative values are clamped + to 0 (i.e. "drain immediately, no linger").""" + return _read_int_env("AGENTEX_SPAN_QUEUE_LINGER_MS", _DEFAULT_LINGER_MS) + + +def _is_retryable_exc(exc: BaseException) -> bool: + """A failure is retryable only when it carries an HTTP ``status_code`` in + the retryable set. 
Connection/timeout errors (no status_code) have already + been retried by the SGP client, and bare exceptions (programming bugs) must + never be retried — re-enqueuing them would spin forever.""" + status_code = getattr(exc, "status_code", None) + return isinstance(status_code, int) and status_code in _RETRYABLE_STATUS_CODES class SpanEventType(str, Enum): @@ -25,6 +67,9 @@ class _SpanQueueItem: event_type: SpanEventType span: Span processors: list[AsyncTracingProcessor] + # Number of times this item has already been dispatched. Used to bound + # re-enqueue on transient failures. + attempts: int = 0 class AsyncSpanQueue: @@ -35,13 +80,69 @@ class AsyncSpanQueue: batch are flushed concurrently, then all END events, so that per-span start-before-end ordering is preserved while HTTP calls for independent spans execute in parallel. + + Once the drain loop picks up the first item, it lingers up to + ``linger_ms`` waiting for more items to coalesce into the same batch. + Without the linger the drain almost always returned size-1 batches under + real agent workloads, because spans typically arrive a few ms apart. + + Reliability: + - ``max_size`` bounds the queue. When full, new events are dropped and + counted (see ``dropped_spans``) rather than growing memory without limit. + ``0`` keeps the queue unbounded. + - A batch that fails with a *transient* HTTP status (429/5xx) is + re-enqueued up to ``max_retries`` total attempts. Permanent failures + (auth/validation/bugs) are dropped and counted immediately. 
""" - def __init__(self, batch_size: int = _DEFAULT_BATCH_SIZE) -> None: - self._queue: asyncio.Queue[_SpanQueueItem] = asyncio.Queue() + def __init__( + self, + batch_size: int = _DEFAULT_BATCH_SIZE, + linger_ms: int | None = None, + max_size: int | None = None, + max_retries: int | None = None, + ) -> None: + resolved_max_size = ( + _read_int_env("AGENTEX_SPAN_QUEUE_MAX_SIZE", _DEFAULT_MAX_SIZE) if max_size is None else max(0, max_size) + ) + self._queue: asyncio.Queue[_SpanQueueItem] = asyncio.Queue(maxsize=resolved_max_size) self._drain_task: asyncio.Task[None] | None = None self._stopping = False self._batch_size = batch_size + self._linger_ms = _read_linger_ms_env() if linger_ms is None else max(0, linger_ms) + self._max_retries = ( + _read_int_env("AGENTEX_SPAN_QUEUE_MAX_RETRIES", _DEFAULT_MAX_RETRIES, minimum=1) + if max_retries is None + else max(1, max_retries) + ) + # Total spans dropped for any reason (full queue, shutdown, permanent + # failure, exhausted retries). Surfaced for metrics/observability so + # span loss stops being silent. + self._dropped_spans = 0 + + @property + def dropped_spans(self) -> int: + """Cumulative count of spans dropped (never delivered).""" + return self._dropped_spans + + @property + def depth(self) -> int: + """Current number of items waiting in the queue.""" + return self._queue.qsize() + + def _record_drop(self, count: int, reason: str) -> None: + if count <= 0: + return + self._dropped_spans += count + # Warn on the first drop and then sparsely, so a drop storm is visible + # without flooding the log. 
+ if self._dropped_spans == count or self._dropped_spans % 100 < count: + logger.warning( + "Span queue dropped %d span(s) (%s); %d dropped in total", + count, + reason, + self._dropped_spans, + ) def enqueue( self, @@ -50,10 +151,13 @@ def enqueue( processors: list[AsyncTracingProcessor], ) -> None: if self._stopping: - logger.warning("Span queue is shutting down, dropping %s event for span %s", event_type.value, span.id) + self._record_drop(1, "queue shutting down") return self._ensure_drain_running() - self._queue.put_nowait(_SpanQueueItem(event_type=event_type, span=span, processors=processors)) + try: + self._queue.put_nowait(_SpanQueueItem(event_type=event_type, span=span, processors=processors)) + except asyncio.QueueFull: + self._record_drop(1, "queue full") def _ensure_drain_running(self) -> None: if self._drain_task is None or self._drain_task.done(): @@ -69,12 +173,28 @@ async def _drain_loop(self) -> None: first = await self._queue.get() batch: list[_SpanQueueItem] = [first] - # Opportunistically grab more ready items (non-blocking). - while len(batch) < self._batch_size: - try: - batch.append(self._queue.get_nowait()) - except asyncio.QueueEmpty: - break + # Linger briefly so spans emitted within the window coalesce into + # one batch. Stop early when the batch fills, when the linger + # window elapses, or as soon as the queue is briefly empty *after* + # the deadline. + if self._linger_ms > 0 and not self._stopping: + loop = asyncio.get_running_loop() + deadline = loop.time() + (self._linger_ms / 1000.0) + while len(batch) < self._batch_size: + remaining = deadline - loop.time() + if remaining <= 0: + break + try: + batch.append(await asyncio.wait_for(self._queue.get(), timeout=remaining)) + except asyncio.TimeoutError: + break + else: + # No linger — drain whatever is already queued and stop. 
+ while len(batch) < self._batch_size: + try: + batch.append(self._queue.get_nowait()) + except asyncio.QueueEmpty: + break try: # Separate START and END events. Processing all STARTs before @@ -93,8 +213,7 @@ async def _drain_loop(self) -> None: # Release span data for GC. batch.clear() - @staticmethod - async def _process_items(items: list[_SpanQueueItem]) -> None: + async def _process_items(self, items: list[_SpanQueueItem]) -> None: """Dispatch a batch of same-event-type items to each processor in one call. Groups spans by processor so each processor sees its full slice of the @@ -109,26 +228,78 @@ async def _process_items(items: list[_SpanQueueItem]) -> None: "_process_items requires all items to share the same event_type; " "callers must split START and END batches before dispatching." ) - by_processor: dict[AsyncTracingProcessor, list[Span]] = {} + by_processor: dict[AsyncTracingProcessor, list[_SpanQueueItem]] = {} for item in items: for p in item.processors: - by_processor.setdefault(p, []).append(item.span) + by_processor.setdefault(p, []).append(item) - async def _handle(p: AsyncTracingProcessor, spans: list[Span]) -> None: - try: - if event_type == SpanEventType.START: - await p.on_spans_start(spans) - else: - await p.on_spans_end(spans) - except Exception: - logger.exception( - "Tracing processor %s failed handling %d spans during %s", + await asyncio.gather(*[self._handle(p, batch, event_type) for p, batch in by_processor.items()]) + + async def _handle( + self, + p: AsyncTracingProcessor, + items: list[_SpanQueueItem], + event_type: SpanEventType, + ) -> None: + spans = [item.span for item in items] + try: + if event_type == SpanEventType.START: + await p.on_spans_start(spans) + else: + await p.on_spans_end(spans) + except Exception as exc: + self._handle_failure(p, items, event_type, exc) + + def _handle_failure( + self, + p: AsyncTracingProcessor, + items: list[_SpanQueueItem], + event_type: SpanEventType, + exc: Exception, + ) -> None: + # 
Re-enqueue transient failures, drop everything else. Re-enqueue is + # bounded by max_retries, so even during shutdown the queue's join() + # still terminates after a finite number of passes. + if _is_retryable_exc(exc): + retriable = [item for item in items if item.attempts + 1 < self._max_retries] + exhausted = len(items) - len(retriable) + if exhausted: + self._record_drop(exhausted, f"{type(p).__name__} retries exhausted during {event_type.value}") + for item in retriable: + self._reenqueue(item, p) + if retriable: + logger.warning( + "Tracing processor %s failed handling %d spans during %s (%s); re-enqueued %d for retry", type(p).__name__, - len(spans), + len(items), event_type.value, + type(exc).__name__, + len(retriable), ) + return + + self._record_drop(len(items), f"{type(p).__name__} permanent failure during {event_type.value}") + logger.exception( + "Tracing processor %s failed handling %d spans during %s", + type(p).__name__, + len(items), + event_type.value, + ) - await asyncio.gather(*[_handle(p, spans) for p, spans in by_processor.items()]) + def _reenqueue(self, item: _SpanQueueItem, p: AsyncTracingProcessor) -> None: + """Put a single failed item back on the queue, scoped to the processor + that failed, with an incremented attempt count.""" + try: + self._queue.put_nowait( + _SpanQueueItem( + event_type=item.event_type, + span=item.span, + processors=[p], + attempts=item.attempts + 1, + ) + ) + except asyncio.QueueFull: + self._record_drop(1, "queue full on retry") # ------------------------------------------------------------------ # Shutdown diff --git a/tests/lib/core/tracing/processors/test_agentex_tracing_processor.py b/tests/lib/core/tracing/processors/test_agentex_tracing_processor.py new file mode 100644 index 000000000..ec1ed5e88 --- /dev/null +++ b/tests/lib/core/tracing/processors/test_agentex_tracing_processor.py @@ -0,0 +1,132 @@ +from __future__ import annotations + +import asyncio +import weakref +from unittest.mock import MagicMock, 
patch + +import pytest + +# AgentexAsyncTracingProcessor pulls in agentex.lib.adk via +# create_async_agentex_client, which in turn imports pydantic_ai at package +# init. Skip these tests cleanly when pydantic_ai isn't installed (the SDK +# dev venv state) so collection doesn't error out. +pytest.importorskip( + "pydantic_ai", + reason="agentex.lib.adk import chain requires pydantic_ai", +) + +# Import the processor module up front so unittest.mock.patch() can resolve +# attributes by string path. The tracing_processor_manager only loads this +# module lazily, so without this explicit import the patches below would fail +# with AttributeError at __enter__ time. +import agentex.lib.core.tracing.processors.agentex_tracing_processor # noqa: E402, F401 + +MODULE = "agentex.lib.core.tracing.processors.agentex_tracing_processor" + + +def _make_config() -> MagicMock: + """Empty config — AgentexTracingProcessorConfig is unused by __init__.""" + return MagicMock() + + +class TestAgentexAsyncTracingProcessor: + """Coverage for the per-event-loop client cache. The SGP processor has + matching tests; mirror them here so a regression in the Agentex side + (e.g. an accidental refactor that switches back to a plain dict, or + drops the lazy lookup) does not slip through unnoticed. + """ + + async def test_client_caches_per_event_loop(self): + """First access builds the client; subsequent accesses in the same + running loop must return the cached instance. + """ + with patch(f"{MODULE}.create_async_agentex_client") as mock_factory: + mock_factory.side_effect = lambda **kwargs: MagicMock() + + from agentex.lib.core.tracing.processors.agentex_tracing_processor import ( + AgentexAsyncTracingProcessor, + ) + + processor = AgentexAsyncTracingProcessor(_make_config()) + + # Construction must not eagerly build the client (no running loop + # guarantee at module import time). 
+ assert mock_factory.call_count == 0 + + c1 = processor.client + c2 = processor.client + c3 = processor.client + + assert mock_factory.call_count == 1, ( + f"Expected client to be built once per loop, but " + f"create_async_agentex_client was called {mock_factory.call_count} times" + ) + assert c1 is c2 is c3 + + async def test_client_keepalive_is_enabled(self): + """Regression guard: the per-loop client must use keepalive — the + whole reason for the per-loop cache. Verify max_keepalive_connections > 0. + """ + import httpx as _httpx + + captured_limits: list[_httpx.Limits] = [] + original_async_client = _httpx.AsyncClient + + def capture_limits(*args, **kwargs): + limits = kwargs.get("limits") + if limits is not None: + captured_limits.append(limits) + return original_async_client(*args, **kwargs) + + with patch(f"{MODULE}.create_async_agentex_client") as mock_factory, patch( + "httpx.AsyncClient", side_effect=capture_limits + ): + mock_factory.side_effect = lambda **kwargs: MagicMock() + + from agentex.lib.core.tracing.processors.agentex_tracing_processor import ( + AgentexAsyncTracingProcessor, + ) + + processor = AgentexAsyncTracingProcessor(_make_config()) + _ = processor.client + + assert len(captured_limits) == 1 + max_keepalive = captured_limits[0].max_keepalive_connections + assert max_keepalive is not None and max_keepalive > 0, ( + f"Agentex async client should have keepalive enabled, got " + f"max_keepalive_connections={max_keepalive}" + ) + + def test_cache_is_weakkeydict_and_evicts_dead_loops(self): + """Regression guard for the id()-reuse bug: the per-loop cache must + be a WeakKeyDictionary so a GC'd loop's entry is evicted. Otherwise + a new loop landing at the same memory address would reuse the dead + loop's client, reintroducing the "bound to a different event loop" + error the per-loop cache was built to prevent. 
+ """ + import gc + + with patch(f"{MODULE}.create_async_agentex_client"): + from agentex.lib.core.tracing.processors.agentex_tracing_processor import ( + AgentexAsyncTracingProcessor, + ) + + processor = AgentexAsyncTracingProcessor(_make_config()) + + # Storage type itself: WeakKeyDictionary, not plain dict. + assert isinstance(processor._clients_by_loop, weakref.WeakKeyDictionary) + + # End-to-end check: insert under a loop, drop the loop, the entry + # must vanish after GC. + loop = asyncio.new_event_loop() + try: + processor._clients_by_loop[loop] = MagicMock() + assert len(processor._clients_by_loop) == 1 + finally: + loop.close() + del loop + gc.collect() + assert len(processor._clients_by_loop) == 0, ( + "WeakKeyDictionary should have evicted the dead loop's entry; " + "remaining keys would cause stale-client reuse on id() recycling." + ) diff --git a/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py b/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py index 4614fe540..090cd9a09 100644 --- a/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py +++ b/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py @@ -1,6 +1,7 @@ from __future__ import annotations import uuid +import asyncio from datetime import UTC, datetime from unittest.mock import AsyncMock, MagicMock, patch @@ -129,19 +130,20 @@ def _make_processor(): processor = SGPAsyncTracingProcessor(_make_config()) - # Wire up the mock client after construction (constructor stores it) - processor.sgp_async_client = mock_async_client + # Force the per-loop cache to return the mock for whatever loop the + # test runs on, by stubbing _get_client directly. 
+ processor._get_client = lambda: mock_async_client # type: ignore[method-assign] - return processor, mock_create_span + return processor, mock_create_span, mock_async_client def test_processor_holds_no_per_span_state(self): """Stateless processor must not retain any per-span dict between lifecycle events.""" - processor, _ = self._make_processor() + processor, _, _ = self._make_processor() assert not hasattr(processor, "_spans") async def test_span_lifecycle_produces_two_upserts(self): """Each span produces one upsert_batch call on start and one on end.""" - processor, _ = self._make_processor() + processor, _, mock_client = self._make_processor() with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): span = _make_span() @@ -149,7 +151,7 @@ async def test_span_lifecycle_produces_two_upserts(self): span.end_time = datetime.now(UTC) await processor.on_span_end(span) - assert processor.sgp_async_client.spans.upsert_batch.call_count == 2 + assert mock_client.spans.upsert_batch.call_count == 2 async def test_span_end_without_prior_start_still_upserts(self): """Cross-pod Temporal case: END activity lands on a pod that never saw START. @@ -157,7 +159,7 @@ async def test_span_end_without_prior_start_still_upserts(self): Today this used to be a silent no-op. After the stateless refactor it must still upsert a complete span via upsert_batch. """ - processor, _ = self._make_processor() + processor, _, mock_client = self._make_processor() with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): span = _make_span() @@ -165,13 +167,13 @@ async def test_span_end_without_prior_start_still_upserts(self): # No on_span_start — END lands here for the first time. 
await processor.on_span_end(span) - assert processor.sgp_async_client.spans.upsert_batch.call_count == 1 - items = processor.sgp_async_client.spans.upsert_batch.call_args.kwargs["items"] + assert mock_client.spans.upsert_batch.call_count == 1 + items = mock_client.spans.upsert_batch.call_args.kwargs["items"] assert len(items) == 1 async def test_sgp_span_input_and_output_propagated_on_end(self): """on_span_end should send the span's current input and output via upsert_batch.""" - processor, _ = self._make_processor() + processor, _, mock_client = self._make_processor() captured: list[MagicMock] = [] @@ -196,7 +198,7 @@ def capture_create_span(**kwargs): span.end_time = datetime.now(UTC) await processor.on_span_end(span) - assert processor.sgp_async_client.spans.upsert_batch.call_count == 2 # start + end + assert mock_client.spans.upsert_batch.call_count == 2 # start + end # The end-time SGPSpan should have end_time populated. end_span = captured[-1] assert end_span.end_time is not None @@ -207,36 +209,167 @@ def capture_create_span(**kwargs): async def test_on_spans_start_sends_single_upsert_for_batch(self): """Given N spans at once, on_spans_start should make ONE upsert_batch HTTP call.""" - processor, _ = self._make_processor() + processor, _, mock_client = self._make_processor() n = 10 spans = [_make_span() for _ in range(n)] with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): await processor.on_spans_start(spans) - assert processor.sgp_async_client.spans.upsert_batch.call_count == 1, ( + assert mock_client.spans.upsert_batch.call_count == 1, ( "Batched on_spans_start must make exactly one upsert_batch HTTP call" ) - items = processor.sgp_async_client.spans.upsert_batch.call_args.kwargs["items"] + items = mock_client.spans.upsert_batch.call_args.kwargs["items"] assert len(items) == n + async def test_get_client_caches_per_event_loop(self): + """The processor must keep one client per event loop, and reuse it + across calls within 
the same loop. This is what enables connection + keepalive instead of paying a TLS handshake per span. + """ + mock_env = MagicMock() + mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None) + + with patch(f"{MODULE}.EnvironmentVariables", mock_env), patch( + f"{MODULE}.AsyncSGPClient" + ) as mock_sgp_cls: + mock_sgp_cls.side_effect = lambda **kwargs: MagicMock() + + from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( + SGPAsyncTracingProcessor, + ) + + processor = SGPAsyncTracingProcessor(_make_config()) + + # Construction should NOT eagerly build the client (no running + # loop guarantee at import time). + assert mock_sgp_cls.call_count == 0 + + c1 = processor._get_client() + c2 = processor._get_client() + c3 = processor._get_client() + + # First call builds the client; subsequent calls in the same + # loop return the cached one. + assert mock_sgp_cls.call_count == 1, ( + f"Expected client to be built once per loop, but AsyncSGPClient " + f"was called {mock_sgp_cls.call_count} times" + ) + assert c1 is c2 is c3 + + async def test_get_client_keepalive_is_enabled(self): + """Regression guard: the per-loop client must use keepalive (the whole + point of the per-loop cache). Verify max_keepalive_connections > 0. 
+ """ + import httpx as _httpx + + captured_limits: list[_httpx.Limits] = [] + + original_async_client = _httpx.AsyncClient + + def capture_limits(*args, **kwargs): + limits = kwargs.get("limits") + if limits is not None: + captured_limits.append(limits) + return original_async_client(*args, **kwargs) + + mock_env = MagicMock() + mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None) + + with patch(f"{MODULE}.EnvironmentVariables", mock_env), patch( + f"{MODULE}.AsyncSGPClient" + ), patch("httpx.AsyncClient", side_effect=capture_limits): + from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( + SGPAsyncTracingProcessor, + ) + + processor = SGPAsyncTracingProcessor(_make_config()) + processor._get_client() + + assert len(captured_limits) == 1 + max_keepalive = captured_limits[0].max_keepalive_connections + assert max_keepalive is not None and max_keepalive > 0, ( + f"SGP async client should have keepalive enabled, got " + f"max_keepalive_connections={max_keepalive}" + ) + + def test_cache_is_weakkeydict_and_evicts_dead_loops(self): + """Regression guard for the id()-reuse bug: the per-loop cache must + be a WeakKeyDictionary so a GC'd loop's entry is evicted. Otherwise + a new loop landing at the same memory address would reuse the dead + loop's client, reintroducing the "bound to a different event loop" + error the per-loop cache was built to prevent. + """ + import gc + import weakref + + mock_env = MagicMock() + mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None) + + with patch(f"{MODULE}.EnvironmentVariables", mock_env), patch(f"{MODULE}.AsyncSGPClient"): + from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( + SGPAsyncTracingProcessor, + ) + + processor = SGPAsyncTracingProcessor(_make_config()) + + # Storage type itself: WeakKeyDictionary, not plain dict. 
+ assert isinstance(processor._clients_by_loop, weakref.WeakKeyDictionary) + + # End-to-end check: insert under a loop, drop the loop, the entry + # must vanish after GC. + loop = asyncio.new_event_loop() + try: + processor._clients_by_loop[loop] = MagicMock() + assert len(processor._clients_by_loop) == 1 + finally: + loop.close() + del loop + gc.collect() + assert len(processor._clients_by_loop) == 0, ( + "WeakKeyDictionary should have evicted the dead loop's entry; " + "remaining keys would cause stale-client reuse on id() recycling." + ) + + async def test_disabled_processor_returns_none_client(self): + """When config is missing api_key/account_id, _get_client must return + None and no HTTP client must be constructed.""" + from agentex.lib.types.tracing import SGPTracingProcessorConfig + + mock_env = MagicMock() + mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None) + + with patch(f"{MODULE}.EnvironmentVariables", mock_env), patch( + f"{MODULE}.AsyncSGPClient" + ) as mock_sgp_cls: + from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( + SGPAsyncTracingProcessor, + ) + + processor = SGPAsyncTracingProcessor( + SGPTracingProcessorConfig(sgp_api_key="", sgp_account_id="") + ) + + assert processor._get_client() is None + assert mock_sgp_cls.call_count == 0 + async def test_on_spans_end_sends_single_upsert_for_batch(self): """Given N spans at once, on_spans_end should make ONE upsert_batch HTTP call.""" - processor, _ = self._make_processor() + processor, _, mock_client = self._make_processor() n = 10 spans = [_make_span() for _ in range(n)] with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): await processor.on_spans_start(spans) - processor.sgp_async_client.spans.upsert_batch.reset_mock() + mock_client.spans.upsert_batch.reset_mock() for span in spans: span.end_time = datetime.now(UTC) await processor.on_spans_end(spans) - assert 
processor.sgp_async_client.spans.upsert_batch.call_count == 1, ( + assert mock_client.spans.upsert_batch.call_count == 1, ( "Batched on_spans_end must make exactly one upsert_batch HTTP call" ) - items = processor.sgp_async_client.spans.upsert_batch.call_args.kwargs["items"] + items = mock_client.spans.upsert_batch.call_args.kwargs["items"] assert len(items) == n diff --git a/tests/lib/core/tracing/test_span_queue.py b/tests/lib/core/tracing/test_span_queue.py index 0d6b2fd0a..2e68cf88d 100644 --- a/tests/lib/core/tracing/test_span_queue.py +++ b/tests/lib/core/tracing/test_span_queue.py @@ -205,9 +205,7 @@ async def slow_start(span: Span) -> None: await queue.shutdown() - assert max_concurrency > 1, ( - f"Expected concurrent processing, but max concurrency was {max_concurrency}" - ) + assert max_concurrency > 1, f"Expected concurrent processing, but max concurrency was {max_concurrency}" async def test_batch_faster_than_serial(self): """Batched drain should be significantly faster than serial for slow processors.""" @@ -253,7 +251,7 @@ async def test_mixed_event_types_raise_assertion(self): ] try: - await AsyncSpanQueue._process_items(mixed) + await AsyncSpanQueue()._process_items(mixed) except AssertionError: return else: @@ -288,9 +286,7 @@ async def capture_starts(spans: list[Span]) -> None: await queue.shutdown() # on_spans_start must have been called exactly once with all 5 spans. 
- assert proc.on_spans_start.call_count == 1, ( - f"Expected one batched call, got {proc.on_spans_start.call_count}" - ) + assert proc.on_spans_start.call_count == 1, f"Expected one batched call, got {proc.on_spans_start.call_count}" assert received == [ids] async def test_batched_end_dispatch_single_call_per_drain(self): @@ -315,6 +311,231 @@ async def capture_ends(spans: list[Span]) -> None: assert received == [ids] +class TestAsyncSpanQueueLinger: + """The drain loop should linger briefly after the first item arrives so + that concurrently-emitted spans coalesce into one batch, instead of each + span producing its own size-1 drain cycle. + """ + + async def test_linger_coalesces_staggered_enqueues_into_one_batch(self): + """Spans enqueued a few ms apart should land in the SAME drain batch + when the linger window is wider than the gap between them. + """ + received: list[list[str]] = [] + + async def capture_starts(spans: list[Span]) -> None: + received.append([s.id for s in spans]) + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=capture_starts) + proc.on_spans_end = AsyncMock() + + # Linger of 100ms; we enqueue 3 items 20ms apart, well inside the window. + queue = AsyncSpanQueue(linger_ms=100) + + for i in range(3): + queue.enqueue(SpanEventType.START, _make_span(f"span-{i}"), [proc]) + await asyncio.sleep(0.02) + + await queue.shutdown() + + # All three should arrive in one batched call thanks to the linger. + assert proc.on_spans_start.call_count == 1, ( + f"Expected one batch from linger-coalesced enqueues, got " + f"{proc.on_spans_start.call_count} batches: {received}" + ) + assert received == [["span-0", "span-1", "span-2"]] + + async def test_linger_zero_drains_immediately(self): + """With linger_ms=0, the drain loop should NOT wait — staggered + enqueues produce separate batches (back-compat with prior behavior). 
+ """ + received: list[list[str]] = [] + + async def capture_starts(spans: list[Span]) -> None: + received.append([s.id for s in spans]) + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=capture_starts) + proc.on_spans_end = AsyncMock() + + queue = AsyncSpanQueue(linger_ms=0) + + for i in range(3): + queue.enqueue(SpanEventType.START, _make_span(f"span-{i}"), [proc]) + # Give the drain loop time to pick up and process each one. + await asyncio.sleep(0.05) + + await queue.shutdown() + + # With no linger, each staggered enqueue produces its own batch. + assert proc.on_spans_start.call_count == 3, ( + f"Expected three size-1 batches without linger, got {proc.on_spans_start.call_count}: {received}" + ) + + async def test_linger_respects_batch_size_cap(self): + """The linger must not push batches over batch_size.""" + received: list[list[str]] = [] + + async def capture_starts(spans: list[Span]) -> None: + received.append([s.id for s in spans]) + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=capture_starts) + proc.on_spans_end = AsyncMock() + + # Tight batch cap, linger wide enough to coalesce but not so large + # that the tail singleton stalls the test for hundreds of ms. + queue = AsyncSpanQueue(batch_size=3, linger_ms=50) + + ids = [f"span-{i}" for i in range(7)] + for i in ids: + queue.enqueue(SpanEventType.START, _make_span(i), [proc]) + + await queue.shutdown() + + # 7 spans / batch_size=3 ⇒ at least 3 batches (3, 3, 1). None should + # exceed the cap. 
+ for batch in received: + assert len(batch) <= 3, f"Batch exceeded cap: {batch}" + assert sum(len(b) for b in received) == 7 + + +class _FakeHTTPError(Exception): + """Mimics an SGP/httpx status error: carries a ``status_code`` attribute.""" + + def __init__(self, status_code: int) -> None: + self.status_code = status_code + super().__init__(f"HTTP {status_code}") + + +class TestAsyncSpanQueueDropObservability: + """Silent span loss should be counted so it is measurable, and a bounded + queue should shed load deterministically instead of growing without limit. + """ + + async def test_full_queue_drops_are_counted(self): + release = asyncio.Event() + + async def block_first(spans: list[Span]) -> None: + # Block the drain on its first batch so the queue can fill behind it. + await release.wait() + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=block_first) + proc.on_spans_end = AsyncMock() + + # max_size=1, no linger: the drain pulls item-0 and blocks; item-1 fills + # the queue; items 2 and 3 are dropped. + queue = AsyncSpanQueue(max_size=1, linger_ms=0) + + queue.enqueue(SpanEventType.START, _make_span("s0"), [proc]) + await asyncio.sleep(0.02) # let the drain pick up s0 and block + queue.enqueue(SpanEventType.START, _make_span("s1"), [proc]) + queue.enqueue(SpanEventType.START, _make_span("s2"), [proc]) + queue.enqueue(SpanEventType.START, _make_span("s3"), [proc]) + + assert queue.dropped_spans == 2, f"expected 2 dropped, got {queue.dropped_spans}" + + release.set() + await queue.shutdown() + + async def test_no_drops_under_normal_load(self): + proc = _make_processor() + queue = AsyncSpanQueue() + for i in range(5): + queue.enqueue(SpanEventType.START, _make_span(f"s{i}"), [proc]) + await queue.shutdown() + assert queue.dropped_spans == 0 + + +class TestAsyncSpanQueueRetry: + """Transient HTTP failures (429/5xx) should be re-enqueued up to a bounded + number of attempts; auth/other errors must be dropped (and counted), never + retried. 
+ """ + + async def test_retryable_status_is_reenqueued_and_eventually_succeeds(self): + attempts = 0 + + async def fail_then_succeed(spans: list[Span]) -> None: + nonlocal attempts + attempts += 1 + if attempts == 1: + raise _FakeHTTPError(503) + # second attempt succeeds + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=fail_then_succeed) + proc.on_spans_end = AsyncMock() + + queue = AsyncSpanQueue(max_retries=3, linger_ms=0) + queue.enqueue(SpanEventType.START, _make_span("s0"), [proc]) + await queue.shutdown() + + assert attempts == 2, "503 should be retried once, then succeed" + assert queue.dropped_spans == 0, "successful retry must not count as a drop" + + async def test_non_retryable_status_is_dropped_not_retried(self): + attempts = 0 + + async def always_401(spans: list[Span]) -> None: + nonlocal attempts + attempts += 1 + raise _FakeHTTPError(401) + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=always_401) + proc.on_spans_end = AsyncMock() + + queue = AsyncSpanQueue(max_retries=3, linger_ms=0) + queue.enqueue(SpanEventType.START, _make_span("s0"), [proc]) + await queue.shutdown() + + assert attempts == 1, "401 is non-retryable — must be tried exactly once" + assert queue.dropped_spans == 1 + + async def test_non_http_exception_is_not_retried(self): + """A plain bug (no status_code) must not be retried into an infinite + loop — preserves the original drain-continues-on-error contract.""" + attempts = 0 + + async def boom(spans: list[Span]) -> None: + nonlocal attempts + attempts += 1 + raise RuntimeError("bug, not transient") + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=boom) + proc.on_spans_end = AsyncMock() + + queue = AsyncSpanQueue(max_retries=3, linger_ms=0) + queue.enqueue(SpanEventType.START, _make_span("s0"), [proc]) + await queue.shutdown() + + assert attempts == 1 + assert queue.dropped_spans == 1 + + async def test_retryable_exhausts_attempts_then_drops(self): + attempts = 0 + + 
async def always_503(spans: list[Span]) -> None: + nonlocal attempts + attempts += 1 + raise _FakeHTTPError(503) + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=always_503) + proc.on_spans_end = AsyncMock() + + queue = AsyncSpanQueue(max_retries=3, linger_ms=0) + queue.enqueue(SpanEventType.START, _make_span("s0"), [proc]) + await queue.shutdown() + + assert attempts == 3, "should try up to max_retries times" + assert queue.dropped_spans == 1 + + class TestAsyncSpanQueueIntegration: async def test_integration_with_async_trace(self): call_log: list[tuple[str, str]] = [] From e1b31d91abadec572989b805592b788500d61994 Mon Sep 17 00:00:00 2001 From: Daniel Miller Date: Fri, 29 May 2026 00:16:21 -0400 Subject: [PATCH 5/9] feat(deps): bump openai-agents to >=0.14.3 for scale-sandbox oai_agents adapter (#375) Co-authored-by: Claude Opus 4.8 (1M context) --- .../tests/test_agent.py | 1 + pyproject.toml | 2 +- requirements-dev.lock | 2 +- requirements.lock | 2 +- uv.lock | 8 ++++---- 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/tests/test_agent.py index 73cf60db9..437a8f16c 100644 --- a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/tests/test_agent.py +++ b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/tests/test_agent.py @@ -1,3 +1,4 @@ +# ci: touch to re-run tutorial integration tests for the openai-agents>=0.14.3 bump """ Sample tests for AgentEx ACP agent. 
diff --git a/pyproject.toml b/pyproject.toml index 662f72ea9..c2411ffbd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ dependencies = [ "jinja2>=3.1.3,<4", "mcp>=1.4.1", "scale-gp>=0.1.0a59", - "openai-agents==0.14.1", + "openai-agents>=0.14.3,<0.15", "pydantic-ai-slim>=1.0,<2", "json_log_formatter>=1.1.1", "scale-gp-beta>=0.2.0", diff --git a/requirements-dev.lock b/requirements-dev.lock index b2263af59..dc7ee59b8 100644 --- a/requirements-dev.lock +++ b/requirements-dev.lock @@ -237,7 +237,7 @@ openai==2.30.0 # via agentex-sdk # via litellm # via openai-agents -openai-agents==0.14.1 +openai-agents==0.14.8 # via agentex-sdk opentelemetry-api==1.40.0 # via agentex-sdk diff --git a/requirements.lock b/requirements.lock index 986bff99b..daea78837 100644 --- a/requirements.lock +++ b/requirements.lock @@ -215,7 +215,7 @@ openai==2.30.0 # via agentex-sdk # via litellm # via openai-agents -openai-agents==0.14.1 +openai-agents==0.14.8 # via agentex-sdk opentelemetry-api==1.40.0 # via agentex-sdk diff --git a/uv.lock b/uv.lock index c87d4cc48..9615e7f9e 100644 --- a/uv.lock +++ b/uv.lock @@ -97,7 +97,7 @@ requires-dist = [ { name = "litellm", specifier = ">=1.83.7,<2" }, { name = "mcp", extras = ["cli"], specifier = ">=1.4.1" }, { name = "openai", specifier = ">=2.2,<3" }, - { name = "openai-agents", specifier = "==0.14.1" }, + { name = "openai-agents", specifier = ">=0.14.3,<0.15" }, { name = "opentelemetry-api", specifier = ">=1.20.0" }, { name = "opentelemetry-sdk", specifier = ">=1.20.0" }, { name = "pydantic", specifier = ">=2.0.0,<3" }, @@ -1561,7 +1561,7 @@ wheels = [ [[package]] name = "openai-agents" -version = "0.14.1" +version = "0.14.8" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "griffelib" }, @@ -1573,9 +1573,9 @@ dependencies = [ { name = "typing-extensions" }, { name = "websockets" }, ] -sdist = { url = 
"https://files.pythonhosted.org/packages/16/ae/8af117a3a4a06ad72b4a60759fbab98a7158f0eb36c47d90d5d883610781/openai_agents-0.14.1.tar.gz", hash = "sha256:7baac1b4c0a171d32c82eb6e9c207fd010e0d08dd39a5e8cd762ee27969457dc", size = 5256058, upload-time = "2026-04-15T19:26:52.42Z" } +sdist = { url = "https://files.pythonhosted.org/packages/d5/8a/d36ab647f05e790ec97dda9e4c0eb39d8840269d6a5194887b5dec92bd0d/openai_agents-0.14.8.tar.gz", hash = "sha256:fe1cb58b4150a07292a94f15d8fd5217ee9195bd6bcd8a6a46fdb1d9b08a70b7", size = 5314520, upload-time = "2026-04-29T03:40:07.6Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/ac/1c/98efddc1e10b23a397ca901b6f6d251908be2e6a30a92e0c25fb5dff6348/openai_agents-0.14.1-py3-none-any.whl", hash = "sha256:09ea6e9c49c785dc490e071e17ea3a0b0cce9c5f97daeb50ee6dc8fb490dc817", size = 797518, upload-time = "2026-04-15T19:26:50.106Z" }, + { url = "https://files.pythonhosted.org/packages/af/6e/1e9adcedcde7b163579b88a68f765a4915be4ead0713270386d9432cfd2f/openai_agents-0.14.8-py3-none-any.whl", hash = "sha256:2937ef582ccaa45d59e89839ed8948cb2a6d808bc9940f0881793c21f37f7776", size = 817332, upload-time = "2026-04-29T03:40:05.68Z" }, ] [[package]] From 0a2418cc9f9b06e3bdc46099106e50d226412fa0 Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Fri, 29 May 2026 10:30:08 -0700 Subject: [PATCH 6/9] fix(tutorials): restore tutorial CI deps after agentex-sdk 0.11.5 (pytest + debugpy) (#379) --- examples/tutorials/run_agent_test.sh | 13 ++++++++++--- src/agentex/lib/utils/debug.py | 9 +++++++-- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/examples/tutorials/run_agent_test.sh b/examples/tutorials/run_agent_test.sh index d785ab088..c6fd17960 100755 --- a/examples/tutorials/run_agent_test.sh +++ b/examples/tutorials/run_agent_test.sh @@ -259,8 +259,15 @@ run_test() { cd "$tutorial_path" || return 1 - # Run the tests with retry mechanism - local -a pytest_cmd=("uv" "run" "pytest") + # Run the tests with retry mechanism. 
+ # + # pytest is brought in explicitly via --with: the tutorials only list it + # under an optional `dev` extra (which `uv run` does not install), and it + # used to be pulled in transitively by agentex-sdk's runtime deps. Once + # agentex-sdk 0.11.5 dropped pytest as a runtime dep, `uv run pytest` could + # no longer find it ("Failed to spawn: pytest"). Requesting it directly is + # robust across all tutorials regardless of how each declares test deps. + local -a pytest_cmd=("uv" "run" "--with" "pytest" "--with" "pytest-asyncio" "pytest") if [ "$BUILD_CLI" = true ]; then local wheel_file wheel_file=$(ls /home/runner/work/*/*/dist/agentex_sdk-*.whl 2>/dev/null | head -n1) @@ -268,7 +275,7 @@ run_test() { wheel_file=$(ls "${SCRIPT_DIR}/../../dist/agentex_sdk-*.whl" 2>/dev/null | head -n1) fi if [[ -n "$wheel_file" ]]; then - pytest_cmd=("uv" "run" "--with" "$wheel_file" "pytest") + pytest_cmd=("uv" "run" "--with" "$wheel_file" "--with" "pytest" "--with" "pytest-asyncio" "pytest") fi fi diff --git a/src/agentex/lib/utils/debug.py b/src/agentex/lib/utils/debug.py index 69cbf6b19..831199f9a 100644 --- a/src/agentex/lib/utils/debug.py +++ b/src/agentex/lib/utils/debug.py @@ -6,8 +6,6 @@ import os -import debugpy # type: ignore - from agentex.lib.utils.logging import make_logger logger = make_logger(__name__) @@ -30,6 +28,13 @@ def setup_debug_if_enabled() -> None: Any exception from debugpy setup (will bubble up naturally) """ if os.getenv("AGENTEX_DEBUG_ENABLED") == "true": + # Imported lazily: debugpy is a development-only tool, so a normal + # worker startup must not require it to be installed. Importing it at + # module scope forced it onto every worker (it used to be satisfied + # transitively via ipykernel; that dep was dropped in agentex-sdk + # 0.11.5, surfacing this as "No module named 'debugpy'"). 
+ import debugpy # type: ignore + debug_port = int(os.getenv("AGENTEX_DEBUG_PORT", "5678")) debug_type = os.getenv("AGENTEX_DEBUG_TYPE", "worker") wait_for_attach = os.getenv("AGENTEX_DEBUG_WAIT_FOR_ATTACH", "false").lower() == "true" From d04624e6899e43a0429ef2deeb84509265b9f636 Mon Sep 17 00:00:00 2001 From: Matteo Librizzi Date: Fri, 29 May 2026 18:36:33 +0100 Subject: [PATCH 7/9] feat(lib): expose data_converter kwarg on AgentexWorker and Temporal client APIs (#372) --- .../core/clients/temporal/temporal_client.py | 33 +++- .../lib/core/clients/temporal/utils.py | 43 +++-- .../lib/core/temporal/workers/worker.py | 37 ++-- src/agentex/lib/sdk/fastacp/fastacp.py | 2 + .../lib/sdk/fastacp/impl/temporal_acp.py | 16 +- src/agentex/lib/types/fastacp.py | 24 ++- tests/lib/test_payload_codec.py | 171 +++++++++++++++++- 7 files changed, 288 insertions(+), 38 deletions(-) diff --git a/src/agentex/lib/core/clients/temporal/temporal_client.py b/src/agentex/lib/core/clients/temporal/temporal_client.py index f44648da2..ca17d30ff 100644 --- a/src/agentex/lib/core/clients/temporal/temporal_client.py +++ b/src/agentex/lib/core/clients/temporal/temporal_client.py @@ -7,7 +7,7 @@ from temporalio.client import Client, WorkflowExecutionStatus from temporalio.common import RetryPolicy as TemporalRetryPolicy, WorkflowIDReusePolicy from temporalio.service import RPCError, RPCStatusCode -from temporalio.converter import PayloadCodec +from temporalio.converter import PayloadCodec, DataConverter from agentex.lib.utils.logging import make_logger from agentex.lib.utils.model_utils import BaseModel @@ -78,11 +78,16 @@ class TemporalClient: def __init__( - self, temporal_client: Client | None = None, plugins: list[Any] = [], payload_codec: PayloadCodec | None = None + self, + temporal_client: Client | None = None, + plugins: list[Any] = [], + payload_codec: PayloadCodec | None = None, + data_converter: DataConverter | None = None, ): self._client: Client | None = temporal_client 
self._plugins = plugins self._payload_codec = payload_codec + self._data_converter = data_converter @property def client(self) -> Client: @@ -92,7 +97,13 @@ def client(self) -> Client: return self._client @classmethod - async def create(cls, temporal_address: str, plugins: list[Any] = [], payload_codec: PayloadCodec | None = None): + async def create( + cls, + temporal_address: str, + plugins: list[Any] = [], + payload_codec: PayloadCodec | None = None, + data_converter: DataConverter | None = None, + ): if temporal_address in [ "false", "False", @@ -105,8 +116,13 @@ async def create(cls, temporal_address: str, plugins: list[Any] = [], payload_co ]: _client = None else: - _client = await get_temporal_client(temporal_address, plugins=plugins, payload_codec=payload_codec) - return cls(_client, plugins, payload_codec) + _client = await get_temporal_client( + temporal_address, + plugins=plugins, + payload_codec=payload_codec, + data_converter=data_converter, + ) + return cls(_client, plugins, payload_codec, data_converter) async def setup(self, temporal_address: str): self._client = await self._get_temporal_client(temporal_address=temporal_address) @@ -124,7 +140,12 @@ async def _get_temporal_client(self, temporal_address: str) -> Client | None: ]: return None else: - return await get_temporal_client(temporal_address, plugins=self._plugins, payload_codec=self._payload_codec) + return await get_temporal_client( + temporal_address, + plugins=self._plugins, + payload_codec=self._payload_codec, + data_converter=self._data_converter, + ) async def start_workflow( self, diff --git a/src/agentex/lib/core/clients/temporal/utils.py b/src/agentex/lib/core/clients/temporal/utils.py index 8c2241c62..95319720a 100644 --- a/src/agentex/lib/core/clients/temporal/utils.py +++ b/src/agentex/lib/core/clients/temporal/utils.py @@ -6,7 +6,7 @@ from temporalio.client import Client, Plugin as ClientPlugin from temporalio.worker import Interceptor from temporalio.runtime import Runtime, 
TelemetryConfig, OpenTelemetryConfig -from temporalio.converter import PayloadCodec +from temporalio.converter import PayloadCodec, DataConverter from temporalio.contrib.pydantic import pydantic_data_converter # class DateTimeJSONEncoder(AdvancedJSONEncoder): @@ -86,6 +86,7 @@ async def get_temporal_client( metrics_url: str | None = None, plugins: list[Any] = [], payload_codec: PayloadCodec | None = None, + data_converter: DataConverter | None = None, ) -> Client: """ Create a Temporal client with plugin integration. @@ -94,7 +95,14 @@ async def get_temporal_client( temporal_address: Temporal server address metrics_url: Optional metrics endpoint URL plugins: List of Temporal plugins to include - payload_codec: Optional payload codec for encoding/decoding payloads (e.g. encryption, compression) + payload_codec: Optional payload codec for encoding/decoding payloads + (e.g. encryption, compression). Cannot be combined with the + OpenAIAgentsPlugin via this kwarg — see ``data_converter``. + data_converter: Optional pre-built ``DataConverter``. Use this when + composing the OpenAIAgentsPlugin with a payload codec: build a + ``DataConverter(payload_converter_class=OpenAIPayloadConverter, + payload_codec=...)`` and pass it here. Mutually exclusive with + ``payload_codec``. Returns: Configured Temporal client @@ -103,29 +111,40 @@ async def get_temporal_client( if plugins: validate_client_plugins(plugins) - # Check if OpenAI plugin is present - it needs to configure its own data converter + if payload_codec is not None and data_converter is not None: + raise ValueError( + "Pass payload_codec inside `data_converter` " + "(DataConverter(..., payload_codec=...)) instead of as a separate " + "kwarg. Specifying both is ambiguous." 
+ ) + # Lazy import to avoid pulling in opentelemetry.sdk for non-Temporal agents from temporalio.contrib.openai_agents import OpenAIAgentsPlugin has_openai_plugin = any(isinstance(p, OpenAIAgentsPlugin) for p in (plugins or [])) - if has_openai_plugin and payload_codec is not None: + if has_openai_plugin and payload_codec is not None and data_converter is None: raise ValueError( - "payload_codec is not supported alongside OpenAIAgentsPlugin: the plugin " - "installs its own data converter and the codec would be silently ignored, " - "leaving payloads unencoded. Remove one or the other." + "payload_codec passed as a kwarg alongside OpenAIAgentsPlugin would " + "be silently dropped by the plugin's data-converter transformer. " + "Build a DataConverter explicitly with " + "`payload_converter_class=OpenAIPayloadConverter` (or a subclass) " + "and `payload_codec=...`, then pass it via the `data_converter` " + "kwarg instead." ) - connect_kwargs = { + connect_kwargs: dict[str, Any] = { "target_host": temporal_address, "plugins": plugins, } - if not has_openai_plugin: - data_converter = pydantic_data_converter - if payload_codec: - data_converter = dataclasses.replace(data_converter, payload_codec=payload_codec) + if data_converter is not None: connect_kwargs["data_converter"] = data_converter + elif not has_openai_plugin: + dc = pydantic_data_converter + if payload_codec: + dc = dataclasses.replace(dc, payload_codec=payload_codec) + connect_kwargs["data_converter"] = dc if not metrics_url: client = await Client.connect(**connect_kwargs) diff --git a/src/agentex/lib/core/temporal/workers/worker.py b/src/agentex/lib/core/temporal/workers/worker.py index 2e8591242..253b6759f 100644 --- a/src/agentex/lib/core/temporal/workers/worker.py +++ b/src/agentex/lib/core/temporal/workers/worker.py @@ -95,35 +95,45 @@ async def get_temporal_client( metrics_url: str | None = None, plugins: list = [], payload_codec: PayloadCodec | None = None, + data_converter: DataConverter | None = 
None, ) -> Client: if plugins != []: # We don't need to validate the plugins if they are empty _validate_plugins(plugins) - # Check if OpenAI plugin is present - it needs to configure its own data converter + if payload_codec is not None and data_converter is not None: + raise ValueError( + "Pass payload_codec inside `data_converter` " + "(DataConverter(..., payload_codec=...)) instead of as a separate " + "kwarg. Specifying both is ambiguous." + ) + # Lazy import to avoid pulling in opentelemetry.sdk for non-Temporal agents from temporalio.contrib.openai_agents import OpenAIAgentsPlugin has_openai_plugin = any(isinstance(p, OpenAIAgentsPlugin) for p in (plugins or [])) - if has_openai_plugin and payload_codec is not None: + if has_openai_plugin and payload_codec is not None and data_converter is None: raise ValueError( - "payload_codec is not supported alongside OpenAIAgentsPlugin: the plugin " - "installs its own data converter and the codec would be silently ignored, " - "leaving payloads unencoded. Remove one or the other." + "payload_codec passed as a kwarg alongside OpenAIAgentsPlugin would " + "be silently dropped by the plugin's data-converter transformer. " + "Build a DataConverter explicitly with " + "`payload_converter_class=OpenAIPayloadConverter` (or a subclass) " + "and `payload_codec=...`, then pass it via the `data_converter` " + "kwarg instead." 
) - # Build connection kwargs - connect_kwargs = { + connect_kwargs: dict[str, Any] = { "target_host": temporal_address, "plugins": plugins, } - # Only set data_converter if OpenAI plugin is not present - if not has_openai_plugin: - data_converter = custom_data_converter - if payload_codec: - data_converter = dataclasses.replace(data_converter, payload_codec=payload_codec) + if data_converter is not None: connect_kwargs["data_converter"] = data_converter + elif not has_openai_plugin: + dc = custom_data_converter + if payload_codec: + dc = dataclasses.replace(dc, payload_codec=payload_codec) + connect_kwargs["data_converter"] = dc if not metrics_url: client = await Client.connect(**connect_kwargs) @@ -145,6 +155,7 @@ def __init__( interceptors: list = [], metrics_url: str | None = None, payload_codec: PayloadCodec | None = None, + data_converter: DataConverter | None = None, ): self.task_queue = task_queue self.activity_handles = [] @@ -159,6 +170,7 @@ def __init__( self.interceptors = interceptors self.metrics_url = metrics_url self.payload_codec = payload_codec + self.data_converter = data_converter @overload async def run( @@ -195,6 +207,7 @@ async def run( plugins=self.plugins, metrics_url=self.metrics_url, payload_codec=self.payload_codec, + data_converter=self.data_converter, ) # Enable debug mode if AgentEx debug is enabled (disables deadlock detection) diff --git a/src/agentex/lib/sdk/fastacp/fastacp.py b/src/agentex/lib/sdk/fastacp/fastacp.py index fbd4f0511..42859793d 100644 --- a/src/agentex/lib/sdk/fastacp/fastacp.py +++ b/src/agentex/lib/sdk/fastacp/fastacp.py @@ -65,6 +65,8 @@ def create_async_acp(config: AsyncACPConfig, **kwargs) -> BaseACPServer: temporal_config["interceptors"] = config.interceptors # type: ignore[attr-defined] if hasattr(config, "payload_codec"): temporal_config["payload_codec"] = config.payload_codec # type: ignore[attr-defined] + if hasattr(config, "data_converter"): + temporal_config["data_converter"] = config.data_converter # 
type: ignore[attr-defined] return implementation_class.create(**temporal_config) else: return implementation_class.create(**kwargs) diff --git a/src/agentex/lib/sdk/fastacp/impl/temporal_acp.py b/src/agentex/lib/sdk/fastacp/impl/temporal_acp.py index 54fe72e6c..69d843720 100644 --- a/src/agentex/lib/sdk/fastacp/impl/temporal_acp.py +++ b/src/agentex/lib/sdk/fastacp/impl/temporal_acp.py @@ -4,7 +4,7 @@ from contextlib import asynccontextmanager from fastapi import FastAPI -from temporalio.converter import PayloadCodec +from temporalio.converter import PayloadCodec, DataConverter from agentex.protocol.acp import ( SendEventParams, @@ -33,6 +33,7 @@ def __init__( plugins: list[Any] | None = None, interceptors: list[Any] | None = None, payload_codec: PayloadCodec | None = None, + data_converter: DataConverter | None = None, ): super().__init__() self._temporal_task_service = temporal_task_service @@ -40,6 +41,7 @@ def __init__( self._plugins = plugins or [] self._interceptors = interceptors or [] self._payload_codec = payload_codec + self._data_converter = data_converter @classmethod @override @@ -49,12 +51,17 @@ def create( plugins: list[Any] | None = None, interceptors: list[Any] | None = None, payload_codec: PayloadCodec | None = None, + data_converter: DataConverter | None = None, ) -> "TemporalACP": logger.info("Initializing TemporalACP instance") # Create instance without temporal client initially temporal_acp = cls( - temporal_address=temporal_address, plugins=plugins, interceptors=interceptors, payload_codec=payload_codec + temporal_address=temporal_address, + plugins=plugins, + interceptors=interceptors, + payload_codec=payload_codec, + data_converter=data_converter, ) temporal_acp._setup_handlers() logger.info("TemporalACP instance initialized now") @@ -71,7 +78,10 @@ async def lifespan(app: FastAPI): if self._temporal_task_service is None: env_vars = EnvironmentVariables.refresh() temporal_client = await TemporalClient.create( - 
temporal_address=self._temporal_address, plugins=self._plugins, payload_codec=self._payload_codec + temporal_address=self._temporal_address, + plugins=self._plugins, + payload_codec=self._payload_codec, + data_converter=self._data_converter, ) self._temporal_task_service = TemporalTaskService( temporal_client=temporal_client, diff --git a/src/agentex/lib/types/fastacp.py b/src/agentex/lib/types/fastacp.py index e11091e93..493ca5f11 100644 --- a/src/agentex/lib/types/fastacp.py +++ b/src/agentex/lib/types/fastacp.py @@ -2,7 +2,7 @@ from typing import Any, Literal -from pydantic import Field, BaseModel, field_validator +from pydantic import Field, BaseModel, field_validator, model_validator from agentex.lib.core.clients.temporal.utils import validate_client_plugins, validate_worker_interceptors @@ -56,7 +56,16 @@ class TemporalACPConfig(AsyncACPConfig): encoding/decoding payloads (e.g. encryption, compression). NOTE: this only configures the ACP (client) side. The worker side must be configured separately via ``AgentexWorker(payload_codec=...)`` - with the SAME codec, or decode will fail at runtime. + with the SAME codec, or decode will fail at runtime. Cannot be + combined with ``OpenAIAgentsPlugin``; use ``data_converter`` + instead in that case. + data_converter: Optional pre-built ``temporalio.converter.DataConverter``. + Use this when composing the ``OpenAIAgentsPlugin`` with a payload + codec: build a ``DataConverter(payload_converter_class= + OpenAIPayloadConverter, payload_codec=...)`` and pass it here. + Mutually exclusive with ``payload_codec``. The worker side must + be configured separately via ``AgentexWorker(data_converter=...)`` + with the SAME converter, or decode will fail at runtime. 
""" type: Literal["temporal"] = Field(default="temporal", frozen=True) @@ -64,6 +73,7 @@ class TemporalACPConfig(AsyncACPConfig): plugins: list[Any] = Field(default=[], frozen=True) interceptors: list[Any] = Field(default=[], frozen=True) payload_codec: Any = Field(default=None, frozen=True) + data_converter: Any = Field(default=None, frozen=True) @field_validator("plugins") @classmethod @@ -79,6 +89,16 @@ def validate_interceptors(cls, v: list[Any]) -> list[Any]: validate_worker_interceptors(v) return v + @model_validator(mode="after") + def _validate_codec_and_data_converter_mutually_exclusive(self) -> "TemporalACPConfig": + if self.payload_codec is not None and self.data_converter is not None: + raise ValueError( + "Pass payload_codec inside `data_converter` " + "(DataConverter(..., payload_codec=...)) instead of as a separate " + "field. Specifying both is ambiguous." + ) + return self + class AsyncBaseACPConfig(AsyncACPConfig): """Configuration for AsyncBaseACP implementation diff --git a/tests/lib/test_payload_codec.py b/tests/lib/test_payload_codec.py index bb2b24228..59736dc21 100644 --- a/tests/lib/test_payload_codec.py +++ b/tests/lib/test_payload_codec.py @@ -5,7 +5,11 @@ import pytest from temporalio.client import Client, Plugin as ClientPlugin -from temporalio.converter import PayloadCodec +from temporalio.converter import ( + PayloadCodec, + DataConverter, + DefaultPayloadConverter, +) from temporalio.contrib.pydantic import pydantic_data_converter @@ -68,6 +72,28 @@ async def test_create_propagates_codec_to_get_temporal_client(self): mock_get.assert_awaited_once() assert mock_get.await_args.kwargs["payload_codec"] is codec + def test_init_stores_data_converter(self): + from agentex.lib.core.clients.temporal.temporal_client import TemporalClient + + dc = DataConverter(payload_codec=_NoopCodec()) + client = TemporalClient(data_converter=dc) + assert client._data_converter is dc + + def test_init_default_data_converter_is_none(self): + from 
agentex.lib.core.clients.temporal.temporal_client import TemporalClient + + assert TemporalClient()._data_converter is None + + async def test_create_propagates_data_converter_to_get_temporal_client(self): + import agentex.lib.core.clients.temporal.temporal_client as module + + dc = DataConverter(payload_codec=_NoopCodec()) + with patch.object(module, "get_temporal_client", new=AsyncMock(return_value=object())) as mock_get: + await module.TemporalClient.create(temporal_address="localhost:7233", plugins=[], data_converter=dc) + + mock_get.assert_awaited_once() + assert mock_get.await_args.kwargs["data_converter"] is dc + class TestGetTemporalClientUtils: async def test_no_codec_uses_pydantic_data_converter_unchanged(self): @@ -96,7 +122,7 @@ async def test_codec_with_openai_plugin_raises(self): codec = _NoopCodec() with _patch_openai_plugin(), _mock_connect() as mock_connect: - with pytest.raises(ValueError, match="payload_codec is not supported alongside OpenAIAgentsPlugin"): + with pytest.raises(ValueError, match="silently dropped by the plugin's data-converter transformer"): await get_temporal_client( temporal_address="localhost:7233", plugins=[_FakeOpenAIPlugin()], @@ -112,6 +138,42 @@ async def test_openai_plugin_without_codec_omits_data_converter(self): assert "data_converter" not in mock_connect.await_args.kwargs + async def test_data_converter_passthrough_with_openai_plugin(self): + from agentex.lib.core.clients.temporal.utils import get_temporal_client + + dc = DataConverter(payload_codec=_NoopCodec()) + with _patch_openai_plugin(), _mock_connect() as mock_connect: + await get_temporal_client( + temporal_address="localhost:7233", + plugins=[_FakeOpenAIPlugin()], + data_converter=dc, + ) + + assert mock_connect.await_args.kwargs["data_converter"] is dc + + async def test_data_converter_passthrough_without_openai_plugin(self): + from agentex.lib.core.clients.temporal.utils import get_temporal_client + + dc = 
DataConverter(payload_converter_class=DefaultPayloadConverter) + with _mock_connect() as mock_connect: + await get_temporal_client(temporal_address="localhost:7233", data_converter=dc) + + assert mock_connect.await_args.kwargs["data_converter"] is dc + + async def test_codec_and_data_converter_together_raises(self): + from agentex.lib.core.clients.temporal.utils import get_temporal_client + + codec = _NoopCodec() + dc = DataConverter(payload_codec=codec) + with _mock_connect() as mock_connect: + with pytest.raises(ValueError, match="Pass payload_codec inside `data_converter`"): + await get_temporal_client( + temporal_address="localhost:7233", + payload_codec=codec, + data_converter=dc, + ) + mock_connect.assert_not_awaited() + class TestGetTemporalClientWorker: async def test_no_codec_uses_custom_data_converter_unchanged(self): @@ -140,7 +202,7 @@ async def test_codec_with_openai_plugin_raises(self): codec = _NoopCodec() with _patch_openai_plugin(), _mock_connect() as mock_connect: - with pytest.raises(ValueError, match="payload_codec is not supported alongside OpenAIAgentsPlugin"): + with pytest.raises(ValueError, match="silently dropped by the plugin's data-converter transformer"): await get_temporal_client( temporal_address="localhost:7233", plugins=[_FakeOpenAIPlugin()], @@ -156,6 +218,42 @@ async def test_openai_plugin_without_codec_omits_data_converter(self): assert "data_converter" not in mock_connect.await_args.kwargs + async def test_data_converter_passthrough_with_openai_plugin(self): + from agentex.lib.core.temporal.workers.worker import get_temporal_client + + dc = DataConverter(payload_codec=_NoopCodec()) + with _patch_openai_plugin(), _mock_connect() as mock_connect: + await get_temporal_client( + temporal_address="localhost:7233", + plugins=[_FakeOpenAIPlugin()], + data_converter=dc, + ) + + assert mock_connect.await_args.kwargs["data_converter"] is dc + + async def test_data_converter_passthrough_without_openai_plugin(self): + from 
agentex.lib.core.temporal.workers.worker import get_temporal_client + + dc = DataConverter(payload_converter_class=DefaultPayloadConverter) + with _mock_connect() as mock_connect: + await get_temporal_client(temporal_address="localhost:7233", data_converter=dc) + + assert mock_connect.await_args.kwargs["data_converter"] is dc + + async def test_codec_and_data_converter_together_raises(self): + from agentex.lib.core.temporal.workers.worker import get_temporal_client + + codec = _NoopCodec() + dc = DataConverter(payload_codec=codec) + with _mock_connect() as mock_connect: + with pytest.raises(ValueError, match="Pass payload_codec inside `data_converter`"): + await get_temporal_client( + temporal_address="localhost:7233", + payload_codec=codec, + data_converter=dc, + ) + mock_connect.assert_not_awaited() + class TestAgentexWorkerCodec: def test_worker_stores_payload_codec(self): @@ -171,6 +269,19 @@ def test_worker_default_payload_codec_is_none(self): worker = AgentexWorker(task_queue="test-queue", health_check_port=80) assert worker.payload_codec is None + def test_worker_stores_data_converter(self): + from agentex.lib.core.temporal.workers.worker import AgentexWorker + + dc = DataConverter(payload_codec=_NoopCodec()) + worker = AgentexWorker(task_queue="test-queue", health_check_port=80, data_converter=dc) + assert worker.data_converter is dc + + def test_worker_default_data_converter_is_none(self): + from agentex.lib.core.temporal.workers.worker import AgentexWorker + + worker = AgentexWorker(task_queue="test-queue", health_check_port=80) + assert worker.data_converter is None + class TestTemporalACPCodec: def test_create_stores_payload_codec(self): @@ -186,6 +297,19 @@ def test_create_default_payload_codec_is_none(self): acp = TemporalACP.create(temporal_address="localhost:7233") assert acp._payload_codec is None + def test_create_stores_data_converter(self): + from agentex.lib.sdk.fastacp.impl.temporal_acp import TemporalACP + + dc = 
DataConverter(payload_codec=_NoopCodec()) + acp = TemporalACP.create(temporal_address="localhost:7233", data_converter=dc) + assert acp._data_converter is dc + + def test_create_default_data_converter_is_none(self): + from agentex.lib.sdk.fastacp.impl.temporal_acp import TemporalACP + + acp = TemporalACP.create(temporal_address="localhost:7233") + assert acp._data_converter is None + class TestFastACPConfigCodec: def test_config_default_codec_is_none(self): @@ -218,3 +342,44 @@ def fake_create(**kwargs): FastACP.create("async", config=config) assert captured.get("payload_codec") is codec + + def test_config_default_data_converter_is_none(self): + from agentex.lib.types.fastacp import TemporalACPConfig + + assert TemporalACPConfig().data_converter is None + + def test_config_accepts_data_converter(self): + from agentex.lib.types.fastacp import TemporalACPConfig + + dc = DataConverter(payload_codec=_NoopCodec()) + assert TemporalACPConfig(data_converter=dc).data_converter is dc + + def test_config_rejects_codec_and_data_converter_together(self): + from pydantic import ValidationError + + from agentex.lib.types.fastacp import TemporalACPConfig + + codec = _NoopCodec() + dc = DataConverter(payload_codec=codec) + with pytest.raises(ValidationError, match="Pass payload_codec inside `data_converter`"): + TemporalACPConfig(payload_codec=codec, data_converter=dc) + + def test_fastacp_forwards_data_converter_from_config(self): + from agentex.lib.types.fastacp import TemporalACPConfig + from agentex.lib.sdk.fastacp.fastacp import FastACP + + dc = DataConverter(payload_codec=_NoopCodec()) + config = TemporalACPConfig(data_converter=dc) + captured: dict[str, Any] = {} + + def fake_create(**kwargs): + captured.update(kwargs) + return object() + + with patch( + "agentex.lib.sdk.fastacp.impl.temporal_acp.TemporalACP.create", + side_effect=fake_create, + ): + FastACP.create("async", config=config) + + assert captured.get("data_converter") is dc From 
ab5a7d9732a56d47efad469675c7630046106ef6 Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Fri, 29 May 2026 14:57:05 -0400 Subject: [PATCH 8/9] chore: back-merge release 0.11.5 into next (#381) --- .release-please-manifest.json | 2 +- CHANGELOG.md | 24 ++++++++++++++++++++++++ pyproject.toml | 2 +- src/agentex/_version.py | 2 +- 4 files changed, 27 insertions(+), 3 deletions(-) diff --git a/.release-please-manifest.json b/.release-please-manifest.json index d1eff5521..5a36a4131 100644 --- a/.release-please-manifest.json +++ b/.release-please-manifest.json @@ -1,3 +1,3 @@ { - ".": "0.11.4" + ".": "0.11.5" } diff --git a/CHANGELOG.md b/CHANGELOG.md index 35cf6735e..4088d956f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,29 @@ # Changelog +## 0.11.5 (2026-05-29) + +Full Changelog: [v0.11.4...v0.11.5](https://github.com/scaleapi/scale-agentex-python/compare/v0.11.4...v0.11.5) + +### Features + +* **api:** add cleaned_at field to task response types ([38ed338](https://github.com/scaleapi/scale-agentex-python/commit/38ed3384094f7f07f6b2482489f457fd1dc4f76d)) +* **deps:** bump openai-agents to >=0.14.3 for scale-sandbox oai_agents adapter ([#375](https://github.com/scaleapi/scale-agentex-python/issues/375)) ([e1b31d9](https://github.com/scaleapi/scale-agentex-python/commit/e1b31d91abadec572989b805592b788500d61994)) + + +### Performance Improvements + +* **tracing:** span queue linger + per-loop httpx keepalive ([#362](https://github.com/scaleapi/scale-agentex-python/issues/362)) ([feec842](https://github.com/scaleapi/scale-agentex-python/commit/feec8426f79e9f02533451d44997717655fd33f2)) + + +### Chores + +* **deps:** drop unused runtime deps and exclude tests from wheel ([#367](https://github.com/scaleapi/scale-agentex-python/issues/367)) ([f4303d1](https://github.com/scaleapi/scale-agentex-python/commit/f4303d1e7211783d19beca6554e44eb73bb29c42)) + + +### Refactors + +* **types:** promote protocol types to agentex.protocol.* 
([#371](https://github.com/scaleapi/scale-agentex-python/issues/371)) ([6f1c14f](https://github.com/scaleapi/scale-agentex-python/commit/6f1c14fd61077da52038361642a9fbc4a0a56c8b)) + ## 0.11.4 (2026-05-26) Full Changelog: [v0.11.3...v0.11.4](https://github.com/scaleapi/scale-agentex-python/compare/v0.11.3...v0.11.4) diff --git a/pyproject.toml b/pyproject.toml index c2411ffbd..7cf27ff4e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "agentex-sdk" -version = "0.11.4" +version = "0.11.5" description = "The official Python library for the agentex API" dynamic = ["readme"] license = "Apache-2.0" diff --git a/src/agentex/_version.py b/src/agentex/_version.py index 4efc70f7f..6902d0b63 100644 --- a/src/agentex/_version.py +++ b/src/agentex/_version.py @@ -1,4 +1,4 @@ # File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details. __title__ = "agentex" -__version__ = "0.11.4" # x-release-please-version +__version__ = "0.11.5" # x-release-please-version From a759611adb26d181b440c7cbb71262feec70b346 Mon Sep 17 00:00:00 2001 From: "stainless-app[bot]" <142633134+stainless-app[bot]@users.noreply.github.com> Date: Fri, 29 May 2026 18:57:29 +0000 Subject: [PATCH 9/9] release: 0.11.6 --- .release-please-manifest.json | 2 +- CHANGELOG.md | 31 +++++++++++++++++++++++++++++++ pyproject.toml | 2 +- src/agentex/_version.py | 2 +- 4 files changed, 34 insertions(+), 3 deletions(-) diff --git a/.release-please-manifest.json b/.release-please-manifest.json index 5a36a4131..2045b8264 100644 --- a/.release-please-manifest.json +++ b/.release-please-manifest.json @@ -1,3 +1,3 @@ { - ".": "0.11.5" + ".": "0.11.6" } diff --git a/CHANGELOG.md b/CHANGELOG.md index 4088d956f..75bc4756f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,36 @@ # Changelog +## 0.11.6 (2026-05-29) + +Full Changelog: [v0.11.5...v0.11.6](https://github.com/scaleapi/scale-agentex-python/compare/v0.11.5...v0.11.6) + +### Features + +* **api:** add cleaned_at 
field to task response types ([38ed338](https://github.com/scaleapi/scale-agentex-python/commit/38ed3384094f7f07f6b2482489f457fd1dc4f76d)) +* **deps:** bump openai-agents to >=0.14.3 for scale-sandbox oai_agents adapter ([#375](https://github.com/scaleapi/scale-agentex-python/issues/375)) ([e1b31d9](https://github.com/scaleapi/scale-agentex-python/commit/e1b31d91abadec572989b805592b788500d61994)) +* **lib:** expose data_converter kwarg on AgentexWorker and Temporal client APIs ([#372](https://github.com/scaleapi/scale-agentex-python/issues/372)) ([d04624e](https://github.com/scaleapi/scale-agentex-python/commit/d04624e6899e43a0429ef2deeb84509265b9f636)) + + +### Bug Fixes + +* **tutorials:** restore tutorial CI deps after agentex-sdk 0.11.5 (pytest + debugpy) ([#379](https://github.com/scaleapi/scale-agentex-python/issues/379)) ([0a2418c](https://github.com/scaleapi/scale-agentex-python/commit/0a2418cc9f9b06e3bdc46099106e50d226412fa0)) + + +### Performance Improvements + +* **tracing:** span queue linger + per-loop httpx keepalive ([#362](https://github.com/scaleapi/scale-agentex-python/issues/362)) ([feec842](https://github.com/scaleapi/scale-agentex-python/commit/feec8426f79e9f02533451d44997717655fd33f2)) + + +### Chores + +* back-merge release 0.11.5 into next ([#381](https://github.com/scaleapi/scale-agentex-python/issues/381)) ([ab5a7d9](https://github.com/scaleapi/scale-agentex-python/commit/ab5a7d9732a56d47efad469675c7630046106ef6)) +* **deps:** drop unused runtime deps and exclude tests from wheel ([#367](https://github.com/scaleapi/scale-agentex-python/issues/367)) ([f4303d1](https://github.com/scaleapi/scale-agentex-python/commit/f4303d1e7211783d19beca6554e44eb73bb29c42)) + + +### Refactors + +* **types:** promote protocol types to agentex.protocol.* ([#371](https://github.com/scaleapi/scale-agentex-python/issues/371)) ([6f1c14f](https://github.com/scaleapi/scale-agentex-python/commit/6f1c14fd61077da52038361642a9fbc4a0a56c8b)) + ## 0.11.5 (2026-05-29) 
Full Changelog: [v0.11.4...v0.11.5](https://github.com/scaleapi/scale-agentex-python/compare/v0.11.4...v0.11.5) diff --git a/pyproject.toml b/pyproject.toml index 7cf27ff4e..3d669f0dd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "agentex-sdk" -version = "0.11.5" +version = "0.11.6" description = "The official Python library for the agentex API" dynamic = ["readme"] license = "Apache-2.0" diff --git a/src/agentex/_version.py b/src/agentex/_version.py index 6902d0b63..22dd4399f 100644 --- a/src/agentex/_version.py +++ b/src/agentex/_version.py @@ -1,4 +1,4 @@ # File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details. __title__ = "agentex" -__version__ = "0.11.5" # x-release-please-version +__version__ = "0.11.6" # x-release-please-version