diff --git a/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py b/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py index 2f94e7f87..1aefadc2d 100644 --- a/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py +++ b/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py @@ -1,7 +1,11 @@ -from typing import override +from __future__ import annotations + +import asyncio +from typing import Optional, override import scale_gp_beta.lib.tracing as tracing from scale_gp_beta import SGPClient, AsyncSGPClient +from scale_gp_beta._exceptions import APIError from scale_gp_beta.lib.tracing import create_span, flush_queue from scale_gp_beta.lib.tracing.span import Span as SGPSpan @@ -17,6 +21,19 @@ logger = make_logger(__name__) +# Mirrored from scale_gp_beta.lib.tracing.trace_queue_manager defaults so the +# async processor batches and retries the same way the sync (daemon-thread) +# path does. +DEFAULT_MAX_QUEUE_SIZE = 4_000 +DEFAULT_TRIGGER_QUEUE_SIZE = 200 +DEFAULT_TRIGGER_CADENCE = 4.0 +DEFAULT_MAX_BATCH_SIZE = 50 +DEFAULT_RETRIES = 4 +INITIAL_BACKOFF = 0.4 +MAX_BACKOFF = 20.0 +SHUTDOWN_DRAIN_TIMEOUT = 10.0 + + def _get_span_type(span: Span) -> str: """Read span_type from span.data['__span_type__'], defaulting to STANDALONE.""" if isinstance(span.data, dict): @@ -90,27 +107,42 @@ def shutdown(self) -> None: class SGPAsyncTracingProcessor(AsyncTracingProcessor): + """Async tracing processor that buffers spans and flushes them in batches. + + Mirrors the buffer-plus-flush behavior of the SDK's synchronous + `TraceQueueManager`, but uses asyncio primitives so it works inside an + asyncio event loop without blocking it. + + The HTTP client, queue, and worker task are lazy-initialized on the + running event loop the first time a span event is recorded. This avoids + the "bound to a different event loop" errors that occur when the + processor is constructed on one loop but used on another (e.g. a worker + that creates a fresh loop per request) and lets us re-enable HTTP + keepalive on the underlying httpx client without paying a TCP+TLS + handshake on every span event. + """ + def __init__(self, config: SGPTracingProcessorConfig): + self._config = config self.disabled = config.sgp_api_key == "" or config.sgp_account_id == "" self._spans: dict[str, SGPSpan] = {} - import httpx - - # Disable keepalive so each HTTP call gets a fresh TCP connection, - # avoiding "bound to a different event loop" errors in sync-ACP. - self.sgp_async_client = ( - AsyncSGPClient( - api_key=config.sgp_api_key, - account_id=config.sgp_account_id, - base_url=config.sgp_base_url, - http_client=httpx.AsyncClient( - limits=httpx.Limits(max_keepalive_connections=0), - ), - ) - if not self.disabled - else None - ) self.env_vars = EnvironmentVariables.refresh() + # Lazy-initialized on the running loop on first use. Holding these + # as attributes (rather than constructing eagerly in __init__) is + # what lets the processor survive the loop on which it was created + # being replaced — a common pattern in sync-ACP / per-request loops. + self._loop: Optional[asyncio.AbstractEventLoop] = None + self.sgp_async_client: Optional[AsyncSGPClient] = None + # Loop the *processor-owned* client was constructed on. Remains + # None when the client was injected externally (e.g. by a test); + # in that case we never replace it. + self._client_owned_at_loop: Optional[asyncio.AbstractEventLoop] = None + self._queue: Optional[asyncio.Queue[SGPSpan]] = None + self._worker: Optional[asyncio.Task[None]] = None + self._shutdown_event: Optional[asyncio.Event] = None + self._flush_event: Optional[asyncio.Event] = None + def _add_source_to_span(self, span: Span) -> None: if span.data is None: span.data = {} @@ -123,9 +155,49 @@ def _add_source_to_span(self, span: Span) -> None: if self.env_vars.AGENT_ID is not None: span.data["__agent_id__"] = self.env_vars.AGENT_ID + def _ensure_started(self) -> None: + """Initialize per-loop state on first use, or after a loop swap. + + Must be called from inside an async method so `get_running_loop()` + is safe. + """ + if self.disabled: + return + loop = asyncio.get_running_loop() + if self._loop is loop and self._worker is not None and not self._worker.done(): + return + + self._loop = loop + # We construct an httpx-backed client lazily on the running loop so + # connection pooling and keepalive can be left at httpx defaults + # without hitting "bound to a different event loop" errors when the + # processor outlives its original loop. An externally injected + # client (e.g. a test mock) is left alone — _client_owned_at_loop + # stays None for those. + if self.sgp_async_client is None: + self.sgp_async_client = AsyncSGPClient( + api_key=self._config.sgp_api_key, + account_id=self._config.sgp_account_id, + base_url=self._config.sgp_base_url, + ) + self._client_owned_at_loop = loop + elif self._client_owned_at_loop is not None and self._client_owned_at_loop is not loop: + # We previously created a client on a now-stale loop. Replace it. + self.sgp_async_client = AsyncSGPClient( + api_key=self._config.sgp_api_key, + account_id=self._config.sgp_account_id, + base_url=self._config.sgp_base_url, + ) + self._client_owned_at_loop = loop + self._queue = asyncio.Queue(maxsize=DEFAULT_MAX_QUEUE_SIZE) + self._shutdown_event = asyncio.Event() + self._flush_event = asyncio.Event() + self._worker = loop.create_task(self._run()) + @override async def on_span_start(self, span: Span) -> None: self._add_source_to_span(span) + sgp_span = create_span( name=span.name, span_type=_get_span_type(span), @@ -137,15 +209,13 @@ async def on_span_start(self, span: Span) -> None: metadata=span.data, ) sgp_span.start_time = span.start_time.isoformat() # type: ignore[union-attr] + self._spans[span.id] = sgp_span if self.disabled: - logger.warning("SGP is disabled, skipping span upsert") return - await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr] - items=[sgp_span.to_request_params()] - ) - self._spans[span.id] = sgp_span + self._ensure_started() + self._enqueue(sgp_span) @override async def on_span_end(self, span: Span) -> None: @@ -161,13 +231,118 @@ async def on_span_end(self, span: Span) -> None: if self.disabled: return - await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr] - items=[sgp_span.to_request_params()] - ) + + self._ensure_started() + self._enqueue(sgp_span) @override async def shutdown(self) -> None: - await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr] - items=[sgp_span.to_request_params() for sgp_span in self._spans.values()] - ) + # Fast path when the processor was never started (disabled, or + # shutdown called before any span event). Avoid spinning up a + # worker just to tear it down. + if self._worker is None: + self._spans.clear() + return + + # Re-enqueue any spans whose end was never recorded so they aren't + # silently lost. They were already enqueued at start, but on_span_end + # is what mutates output / metadata / end_timestamp; without a + # second enqueue, the server only sees the start payload for them. + for sgp_span in list(self._spans.values()): + self._enqueue(sgp_span) self._spans.clear() + + assert self._shutdown_event is not None + self._shutdown_event.set() + if self._flush_event is not None: + self._flush_event.set() + + try: + await asyncio.wait_for(self._worker, timeout=SHUTDOWN_DRAIN_TIMEOUT) + except asyncio.TimeoutError: + logger.warning(f"Async tracing worker did not exit within {SHUTDOWN_DRAIN_TIMEOUT}s; cancelling") + self._worker.cancel() + + def _enqueue(self, sgp_span: SGPSpan) -> None: + if self._queue is None: + return + try: + self._queue.put_nowait(sgp_span) + except asyncio.QueueFull: + logger.warning(f"Tracing queue full; dropping span {sgp_span.span_id}") + return + if self._flush_event is not None and self._queue.qsize() >= DEFAULT_TRIGGER_QUEUE_SIZE: + self._flush_event.set() + + async def _run(self) -> None: + try: + while not (self._shutdown_event and self._shutdown_event.is_set()): + # Wake on either an early-flush signal or the cadence timer. + assert self._flush_event is not None + try: + await asyncio.wait_for(self._flush_event.wait(), timeout=DEFAULT_TRIGGER_CADENCE) + except asyncio.TimeoutError: + pass + self._flush_event.clear() + # Per-iteration guard: an unexpected error during one drain + # must not kill the worker, otherwise queued items stay + # unflushed until shutdown. + try: + await self._drain() + except asyncio.CancelledError: + raise + except Exception: + logger.exception("Tracing worker iteration failed; continuing") + + # Final drain on shutdown. + try: + await self._drain() + except Exception: + logger.exception("Final tracing drain failed; some spans may be lost") + except asyncio.CancelledError: + raise + except Exception: + logger.exception("Async tracing worker crashed") + + async def _drain(self) -> None: + if self._queue is None or self.sgp_async_client is None: + return + while not self._queue.empty(): + batch: list[dict] = [] + while len(batch) < DEFAULT_MAX_BATCH_SIZE and not self._queue.empty(): + try: + sgp_span = self._queue.get_nowait() + except asyncio.QueueEmpty: + break + try: + batch.append(sgp_span.to_request_params()) + except Exception: + logger.exception("Failed to build span params; dropping span") + if not batch: + continue + await self._upsert_with_retry(batch) + + async def _upsert_with_retry(self, batch: list[dict]) -> None: + if self.sgp_async_client is None: + return + backoff = INITIAL_BACKOFF + for attempt in range(DEFAULT_RETRIES): + try: + await self.sgp_async_client.spans.upsert_batch(items=batch) # type: ignore[arg-type] + return + except APIError as exc: + if attempt == DEFAULT_RETRIES - 1: + logger.error(f"Failed to export {len(batch)} spans after {DEFAULT_RETRIES} attempts: {exc.message}") + return + logger.warning(f"Span export failed ({exc.message}); retrying in {backoff:.1f}s") + await asyncio.sleep(backoff) + backoff = min(backoff * 2, MAX_BACKOFF) + except asyncio.CancelledError: + raise + except Exception: + # Unexpected error (not APIError, not cancellation): log and + # drop the batch. We deliberately do not retry because we + # don't know whether the request reached the server, and + # the SDK already surfaces transport failures as APIError. + logger.exception(f"Unexpected error exporting {len(batch)} spans; dropping batch") + return diff --git a/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py b/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py index 1acafa527..cc10e865b 100644 --- a/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py +++ b/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py @@ -48,11 +48,9 @@ def _make_processor(): mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None) mock_create_span = MagicMock(side_effect=lambda **kwargs: _make_mock_sgp_span()) - with patch(f"{MODULE}.EnvironmentVariables", mock_env), \ - patch(f"{MODULE}.SGPClient"), \ - patch(f"{MODULE}.tracing"), \ - patch(f"{MODULE}.flush_queue"), \ - patch(f"{MODULE}.create_span", mock_create_span): + with patch(f"{MODULE}.EnvironmentVariables", mock_env), patch(f"{MODULE}.SGPClient"), patch( + f"{MODULE}.tracing" + ), patch(f"{MODULE}.flush_queue"), patch(f"{MODULE}.create_span", mock_create_span): from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( SGPSyncTracingProcessor, ) @@ -113,9 +111,9 @@ def _make_processor(): mock_async_client = MagicMock() mock_async_client.spans.upsert_batch = AsyncMock() - with patch(f"{MODULE}.EnvironmentVariables", mock_env), \ - patch(f"{MODULE}.create_span", mock_create_span), \ - patch(f"{MODULE}.AsyncSGPClient", return_value=mock_async_client): + with patch(f"{MODULE}.EnvironmentVariables", mock_env), patch(f"{MODULE}.create_span", mock_create_span), patch( + f"{MODULE}.AsyncSGPClient", return_value=mock_async_client + ): from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( SGPAsyncTracingProcessor, ) @@ -162,3 +160,155 @@ async def test_span_end_for_unknown_span_is_noop(self): await processor.on_span_end(span) assert len(processor._spans) == 0 + + +# --------------------------------------------------------------------------- +# Async processor batching tests (regression for OVE-2) +# +# Previously, on_span_start and on_span_end each issued an awaited +# upsert_batch(items=[one]) call on the agent's hot path with HTTP keepalive +# disabled. The processor now buffers events and flushes them in batches +# from a background asyncio.Task, mirroring the SDK's TraceQueueManager. +# --------------------------------------------------------------------------- + + +class TestSGPAsyncTracingProcessorBatching: + @staticmethod + def _make_processor(): + mock_env = MagicMock() + mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None) + + mock_async_client = MagicMock() + mock_async_client.spans.upsert_batch = AsyncMock() + + with patch(f"{MODULE}.EnvironmentVariables", mock_env), patch( + f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span() + ), patch(f"{MODULE}.AsyncSGPClient", return_value=mock_async_client): + from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( + SGPAsyncTracingProcessor, + ) + + processor = SGPAsyncTracingProcessor(_make_config()) + + processor.sgp_async_client = mock_async_client + return processor, mock_async_client + + async def test_span_event_does_not_trigger_immediate_upsert(self): + """Regression: a single span event must not result in an upsert call + on the hot path. Events must be enqueued and flushed by the worker.""" + processor, client = self._make_processor() + + with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): + span = _make_span() + await processor.on_span_start(span) + + assert client.spans.upsert_batch.call_count == 0, "on_span_start should enqueue, not trigger a network call" + + async def test_shutdown_flushes_queued_spans_in_one_batch(self): + """Many span events should be coalesced into a single upsert_batch + call when the buffer fits under MAX_BATCH_SIZE (50).""" + processor, client = self._make_processor() + + with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): + for _ in range(5): + span = _make_span() + await processor.on_span_start(span) + span.end_time = datetime.now(UTC) + await processor.on_span_end(span) + + await processor.shutdown() + + assert client.spans.upsert_batch.call_count == 1, ( + f"Expected a single batched upsert, got {client.spans.upsert_batch.call_count}" + ) + items = client.spans.upsert_batch.call_args.kwargs["items"] + # 5 starts + 5 ends = 10 enqueued items, well under MAX_BATCH_SIZE. + assert len(items) == 10, f"Expected 10 items in the batch, got {len(items)}" + + async def test_owned_client_recreated_after_loop_swap(self): + """When the running loop changes (sync-ACP / per-request loops), + the processor's owned client must be recreated so it isn't bound to + a dead loop. This is the reason the original implementation disabled + keepalive — re-creating the client on the new loop lets us keep + keepalive on instead. + """ + env_mock = MagicMock(refresh=MagicMock(return_value=MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None))) + with patch(f"{MODULE}.EnvironmentVariables", env_mock), patch( + f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span() + ), patch(f"{MODULE}.AsyncSGPClient") as mock_client_cls: + first = MagicMock(spans=MagicMock(upsert_batch=AsyncMock())) + second = MagicMock(spans=MagicMock(upsert_batch=AsyncMock())) + mock_client_cls.side_effect = [first, second] + + from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( + SGPAsyncTracingProcessor, + ) + + processor = SGPAsyncTracingProcessor(_make_config()) + + await processor.on_span_start(_make_span()) + assert processor.sgp_async_client is first + assert mock_client_cls.call_count == 1 + + # Simulate a loop swap: processor's tracked loop is stale and + # the worker is gone. Re-initialization must recreate the + # owned client (since `_client_owned_at_loop` no longer matches + # the running loop). + stale_loop = MagicMock() + processor._loop = stale_loop + processor._client_owned_at_loop = stale_loop + processor._worker = None + + await processor.on_span_start(_make_span()) + assert processor.sgp_async_client is second, "Owned client must be recreated after loop swap" + assert mock_client_cls.call_count == 2 + + await processor.shutdown() + + async def test_injected_client_preserved_across_reinit(self): + """A client assigned externally (e.g. a test mock or a caller-built + client) must not be replaced by the processor, even on simulated + loop swaps.""" + processor, original_client = self._make_processor() + + with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): + await processor.on_span_start(_make_span()) + assert processor.sgp_async_client is original_client + + # Simulate a loop swap. Because the client was injected + # (`_client_owned_at_loop` stays None), it must be preserved. + processor._loop = MagicMock() + processor._worker = None + + with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): + await processor.on_span_start(_make_span()) + assert processor.sgp_async_client is original_client, "Injected client must not be replaced" + + await processor.shutdown() + + async def test_async_client_constructed_without_disabling_keepalive(self): + """Regression: the previous implementation built AsyncSGPClient with + httpx.Limits(max_keepalive_connections=0) to dodge cross-loop errors, + paying a TCP+TLS handshake on every span event. The lazy-init pattern + binds the client to the running loop, so keepalive can stay on.""" + with patch( + f"{MODULE}.EnvironmentVariables", + MagicMock(refresh=MagicMock(return_value=MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None))), + ), patch(f"{MODULE}.AsyncSGPClient") as mock_client_cls: + from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( + SGPAsyncTracingProcessor, + ) + + processor = SGPAsyncTracingProcessor(_make_config()) + + with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): + await processor.on_span_start(_make_span()) + + mock_client_cls.assert_called_once() + kwargs = mock_client_cls.call_args.kwargs + # The fix: do not pass an http_client overriding keepalive. + assert "http_client" not in kwargs, ( + "AsyncSGPClient should not receive a custom http_client that disables keepalive" + ) + # Cleanup: cancel the worker so we don't leak a task across tests. + await processor.shutdown()