diff --git a/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py b/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py index 2f94e7f87..1f7450881 100644 --- a/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py +++ b/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py @@ -1,4 +1,7 @@ -from typing import override +from __future__ import annotations + +import asyncio +from typing import Optional, override import scale_gp_beta.lib.tracing as tracing from scale_gp_beta import SGPClient, AsyncSGPClient @@ -90,27 +93,34 @@ def shutdown(self) -> None: class SGPAsyncTracingProcessor(AsyncTracingProcessor): + """Async tracing processor. + + The HTTP client is lazy-initialized on the running event loop on first + use, and re-created when the loop changes. This avoids the "bound to a + different event loop" errors that the previous implementation worked + around by disabling HTTP keepalive (`max_keepalive_connections=0`), + which paid a TCP+TLS handshake on every span event. With per-loop + construction, the underlying httpx client can keep its default + connection pool, so repeated calls on the same loop reuse connections. + + Note: span events are still upserted per-event in this version. Batching + is intentionally not part of this change so it can be reviewed + independently. + """ + def __init__(self, config: SGPTracingProcessorConfig): + self._config = config self.disabled = config.sgp_api_key == "" or config.sgp_account_id == "" self._spans: dict[str, SGPSpan] = {} - import httpx - - # Disable keepalive so each HTTP call gets a fresh TCP connection, - # avoiding "bound to a different event loop" errors in sync-ACP. - self.sgp_async_client = ( - AsyncSGPClient( - api_key=config.sgp_api_key, - account_id=config.sgp_account_id, - base_url=config.sgp_base_url, - http_client=httpx.AsyncClient( - limits=httpx.Limits(max_keepalive_connections=0), - ), - ) - if not self.disabled - else None - ) self.env_vars = EnvironmentVariables.refresh() + # Lazy-initialized on the running loop on first use. + self.sgp_async_client: Optional[AsyncSGPClient] = None + # Loop the *processor-owned* client was constructed on. Stays None + # when the client was injected externally (e.g. by a test); in that + # case we never replace it. + self._client_owned_at_loop: Optional[asyncio.AbstractEventLoop] = None + def _add_source_to_span(self, span: Span) -> None: if span.data is None: span.data = {} @@ -123,6 +133,31 @@ def _add_source_to_span(self, span: Span) -> None: if self.env_vars.AGENT_ID is not None: span.data["__agent_id__"] = self.env_vars.AGENT_ID + def _ensure_client(self) -> None: + """Initialize or recreate the AsyncSGPClient on the running loop. + + Must be called from inside an async method so `get_running_loop()` + is safe. + """ + if self.disabled: + return + loop = asyncio.get_running_loop() + if self.sgp_async_client is None: + self.sgp_async_client = AsyncSGPClient( + api_key=self._config.sgp_api_key, + account_id=self._config.sgp_account_id, + base_url=self._config.sgp_base_url, + ) + self._client_owned_at_loop = loop + elif self._client_owned_at_loop is not None and self._client_owned_at_loop is not loop: + # Owned client was bound to a now-stale loop. Replace it. + self.sgp_async_client = AsyncSGPClient( + api_key=self._config.sgp_api_key, + account_id=self._config.sgp_account_id, + base_url=self._config.sgp_base_url, + ) + self._client_owned_at_loop = loop + @override async def on_span_start(self, span: Span) -> None: self._add_source_to_span(span) @@ -141,6 +176,8 @@ async def on_span_start(self, span: Span) -> None: if self.disabled: logger.warning("SGP is disabled, skipping span upsert") return + + self._ensure_client() await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr] items=[sgp_span.to_request_params()] ) @@ -161,13 +198,28 @@ async def on_span_end(self, span: Span) -> None: if self.disabled: return + + self._ensure_client() await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr] items=[sgp_span.to_request_params()] ) @override async def shutdown(self) -> None: - await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr] - items=[sgp_span.to_request_params() for sgp_span in self._spans.values()] - ) + if self.disabled or self.sgp_async_client is None: + self._spans.clear() + return + + items: list[dict] = [] + for sgp_span in self._spans.values(): + try: + items.append(sgp_span.to_request_params()) + except Exception: + logger.exception("Failed to build span params during shutdown; dropping span") self._spans.clear() + + if items: + try: + await self.sgp_async_client.spans.upsert_batch(items=items) # type: ignore[arg-type] + except Exception: + logger.exception("Final span flush failed during shutdown") diff --git a/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py b/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py index 1acafa527..6b738c349 100644 --- a/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py +++ b/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py @@ -48,11 +48,9 @@ def _make_processor(): mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None) mock_create_span = MagicMock(side_effect=lambda **kwargs: _make_mock_sgp_span()) - with patch(f"{MODULE}.EnvironmentVariables", mock_env), \ - patch(f"{MODULE}.SGPClient"), \ - patch(f"{MODULE}.tracing"), \ - patch(f"{MODULE}.flush_queue"), \ - patch(f"{MODULE}.create_span", mock_create_span): + with patch(f"{MODULE}.EnvironmentVariables", mock_env), patch(f"{MODULE}.SGPClient"), patch( + f"{MODULE}.tracing" + ), patch(f"{MODULE}.flush_queue"), patch(f"{MODULE}.create_span", mock_create_span): from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( SGPSyncTracingProcessor, ) @@ -113,19 +111,18 @@ def _make_processor(): mock_async_client = MagicMock() mock_async_client.spans.upsert_batch = AsyncMock() - with patch(f"{MODULE}.EnvironmentVariables", mock_env), \ - patch(f"{MODULE}.create_span", mock_create_span), \ - patch(f"{MODULE}.AsyncSGPClient", return_value=mock_async_client): + with patch(f"{MODULE}.EnvironmentVariables", mock_env), patch(f"{MODULE}.create_span", mock_create_span), patch( + f"{MODULE}.AsyncSGPClient", return_value=mock_async_client + ): from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( SGPAsyncTracingProcessor, ) processor = SGPAsyncTracingProcessor(_make_config()) - # Wire up the mock client after construction (constructor stores it) + # Wire up the mock client after construction. The processor lazy-inits + # its own client on first async call; pre-setting it bypasses that path. processor.sgp_async_client = mock_async_client - - # Keep create_span mock active for on_span_start calls return processor, mock_create_span async def test_spans_not_leaked_after_completed_lifecycle(self): @@ -162,3 +159,95 @@ async def test_span_end_for_unknown_span_is_noop(self): await processor.on_span_end(span) assert len(processor._spans) == 0 + + +# --------------------------------------------------------------------------- +# Async client lifecycle tests +# +# These cover the lazy / per-loop client construction and the dropped +# `max_keepalive_connections=0` workaround (PR A of the OVE-2 split). +# --------------------------------------------------------------------------- + + +class TestSGPAsyncTracingProcessorClientLifecycle: + async def test_client_constructed_without_disabling_keepalive(self): + """The previous implementation built `AsyncSGPClient` with + `httpx.Limits(max_keepalive_connections=0)` to dodge cross-loop + errors, paying a TCP+TLS handshake on every span event. The lazy + per-loop pattern lets keepalive stay on.""" + env_mock = MagicMock(refresh=MagicMock(return_value=MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None))) + with patch(f"{MODULE}.EnvironmentVariables", env_mock), patch( + f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span() + ), patch(f"{MODULE}.AsyncSGPClient") as mock_client_cls: + mock_client_cls.return_value = MagicMock(spans=MagicMock(upsert_batch=AsyncMock())) + + from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( + SGPAsyncTracingProcessor, + ) + + processor = SGPAsyncTracingProcessor(_make_config()) + await processor.on_span_start(_make_span()) + + mock_client_cls.assert_called_once() + kwargs = mock_client_cls.call_args.kwargs + assert "http_client" not in kwargs, ( + "AsyncSGPClient must not receive a custom http_client that disables keepalive" + ) + + async def test_owned_client_recreated_after_loop_swap(self): + """When the running loop changes (sync-ACP / per-request loops), + the processor's owned client must be recreated so it isn't bound to + a dead loop. This is what lets us drop the keepalive workaround.""" + env_mock = MagicMock(refresh=MagicMock(return_value=MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None))) + with patch(f"{MODULE}.EnvironmentVariables", env_mock), patch( + f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span() + ), patch(f"{MODULE}.AsyncSGPClient") as mock_client_cls: + first = MagicMock(spans=MagicMock(upsert_batch=AsyncMock())) + second = MagicMock(spans=MagicMock(upsert_batch=AsyncMock())) + mock_client_cls.side_effect = [first, second] + + from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( + SGPAsyncTracingProcessor, + ) + + processor = SGPAsyncTracingProcessor(_make_config()) + + await processor.on_span_start(_make_span()) + assert processor.sgp_async_client is first + assert mock_client_cls.call_count == 1 + + # Simulate a loop swap: the processor's tracked loop is stale. + # The next call must recreate the client. + processor._client_owned_at_loop = MagicMock() + + await processor.on_span_start(_make_span()) + assert processor.sgp_async_client is second, "Owned client must be recreated after loop swap" + assert mock_client_cls.call_count == 2 + + async def test_injected_client_preserved(self): + """A client assigned externally (test mock or caller-built) must + never be replaced by the processor. Contract: + `_client_owned_at_loop=None` marks externally-managed clients and + skips both the create-on-None branch and the replace-on-loop-change + branch, so the injected client is preserved across any number of + calls.""" + env_mock = MagicMock(refresh=MagicMock(return_value=MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None))) + injected = MagicMock(spans=MagicMock(upsert_batch=AsyncMock())) + + with patch(f"{MODULE}.EnvironmentVariables", env_mock), patch( + f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span() + ), patch(f"{MODULE}.AsyncSGPClient") as mock_client_cls: + from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( + SGPAsyncTracingProcessor, + ) + + processor = SGPAsyncTracingProcessor(_make_config()) + processor.sgp_async_client = injected + + for _ in range(3): + await processor.on_span_start(_make_span()) + + assert processor.sgp_async_client is injected + assert mock_client_cls.call_count == 0, ( + "Injected client must not be replaced (no AsyncSGPClient construction)" + )