Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 72 additions & 20 deletions src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from typing import override
from __future__ import annotations

import asyncio
from typing import Optional, override

import scale_gp_beta.lib.tracing as tracing
from scale_gp_beta import SGPClient, AsyncSGPClient
Expand Down Expand Up @@ -90,27 +93,34 @@ def shutdown(self) -> None:


class SGPAsyncTracingProcessor(AsyncTracingProcessor):
"""Async tracing processor.

The HTTP client is lazy-initialized on the running event loop on first
use, and re-created when the loop changes. This avoids the "bound to a
different event loop" errors that the previous implementation worked
around by disabling HTTP keepalive (`max_keepalive_connections=0`),
which paid a TCP+TLS handshake on every span event. With per-loop
construction, the underlying httpx client can keep its default
connection pool, so repeated calls on the same loop reuse connections.

Note: span events are still upserted per-event in this version. Batching
is intentionally not part of this change so it can be reviewed
independently.
"""

def __init__(self, config: SGPTracingProcessorConfig):
    """Store the configuration and defer HTTP client creation to first use.

    No ``AsyncSGPClient`` is built here: constructing it eagerly would tie
    it to whatever context created the processor. The client is created by
    ``_ensure_client()`` from inside an async method instead.

    Args:
        config: SGP tracing settings; ``sgp_api_key``, ``sgp_account_id``
            and ``sgp_base_url`` are read from it.
    """
    self._config = config
    # An empty API key or account id turns the processor into a no-op for
    # upserts (see the ``disabled`` checks in the span callbacks).
    self.disabled = config.sgp_api_key == "" or config.sgp_account_id == ""
    # Span-id -> SGP span mapping for spans tracked by this processor.
    self._spans: dict[str, SGPSpan] = {}
    self.env_vars = EnvironmentVariables.refresh()

    # Lazy-initialized on the running loop on first use.
    self.sgp_async_client: Optional[AsyncSGPClient] = None
    # Loop the *processor-owned* client was constructed on. Stays None
    # when the client was injected externally (e.g. by a test); in that
    # case we never replace it.
    self._client_owned_at_loop: Optional[asyncio.AbstractEventLoop] = None

def _add_source_to_span(self, span: Span) -> None:
if span.data is None:
span.data = {}
Expand All @@ -123,6 +133,31 @@ def _add_source_to_span(self, span: Span) -> None:
if self.env_vars.AGENT_ID is not None:
span.data["__agent_id__"] = self.env_vars.AGENT_ID

def _ensure_client(self) -> None:
    """Initialize or recreate the AsyncSGPClient on the running loop.

    Must be called from inside an async method so `get_running_loop()`
    is safe (it raises ``RuntimeError`` when no loop is running).

    Behavior:
      * disabled processor: nothing happens;
      * no client yet: a client is built and the current loop is recorded
        as its owner;
      * processor-owned client recorded on a different loop: the client is
        replaced and the current loop becomes the owner;
      * externally injected client (``_client_owned_at_loop`` is None while
        a client is set): left untouched.
    """
    if self.disabled:
        return

    loop = asyncio.get_running_loop()
    owner_loop = self._client_owned_at_loop

    missing = self.sgp_async_client is None
    # Only a client this processor built itself (owner_loop recorded) can
    # be considered stale; injected clients never have an owner loop.
    stale = owner_loop is not None and owner_loop is not loop
    if not (missing or stale):
        return

    # NOTE(review): when replacing a stale client, the previous instance is
    # dropped without being closed here — confirm whether it needs explicit
    # cleanup, given that its original loop may no longer be running.
    self.sgp_async_client = AsyncSGPClient(
        api_key=self._config.sgp_api_key,
        account_id=self._config.sgp_account_id,
        base_url=self._config.sgp_base_url,
    )
    self._client_owned_at_loop = loop

@override
async def on_span_start(self, span: Span) -> None:
self._add_source_to_span(span)
Expand All @@ -141,6 +176,8 @@ async def on_span_start(self, span: Span) -> None:
if self.disabled:
logger.warning("SGP is disabled, skipping span upsert")
return

self._ensure_client()
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
items=[sgp_span.to_request_params()]
)
Expand All @@ -161,13 +198,28 @@ async def on_span_end(self, span: Span) -> None:

if self.disabled:
return

self._ensure_client()
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
items=[sgp_span.to_request_params()]
)

@override
async def shutdown(self) -> None:
    """Flush any still-tracked spans in one batch and forget them.

    Best-effort: failures while building a span payload or while sending
    the batch are logged and never raised, so shutdown cannot fail because
    of tracing. When the processor is disabled, or no client was ever
    created, tracked spans are simply discarded.
    """
    if self.disabled or self.sgp_async_client is None:
        self._spans.clear()
        return

    items: list[dict] = []
    for sgp_span in self._spans.values():
        try:
            items.append(sgp_span.to_request_params())
        except Exception:
            logger.exception("Failed to build span params during shutdown; dropping span")
    # Clear before the network call so spans are released even if it fails.
    self._spans.clear()

    if not items:
        return

    try:
        # Same pre-flight as on_span_start/on_span_end: make sure a
        # processor-owned client belongs to the loop we are running on.
        self._ensure_client()
        await self.sgp_async_client.spans.upsert_batch(items=items)  # type: ignore[arg-type]
    except Exception:
        logger.exception("Final span flush failed during shutdown")
111 changes: 100 additions & 11 deletions tests/lib/core/tracing/processors/test_sgp_tracing_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,9 @@ def _make_processor():
mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None)
mock_create_span = MagicMock(side_effect=lambda **kwargs: _make_mock_sgp_span())

with patch(f"{MODULE}.EnvironmentVariables", mock_env), \
patch(f"{MODULE}.SGPClient"), \
patch(f"{MODULE}.tracing"), \
patch(f"{MODULE}.flush_queue"), \
patch(f"{MODULE}.create_span", mock_create_span):
with patch(f"{MODULE}.EnvironmentVariables", mock_env), patch(f"{MODULE}.SGPClient"), patch(
f"{MODULE}.tracing"
), patch(f"{MODULE}.flush_queue"), patch(f"{MODULE}.create_span", mock_create_span):
from agentex.lib.core.tracing.processors.sgp_tracing_processor import (
SGPSyncTracingProcessor,
)
Expand Down Expand Up @@ -113,19 +111,18 @@ def _make_processor():
mock_async_client = MagicMock()
mock_async_client.spans.upsert_batch = AsyncMock()

with patch(f"{MODULE}.EnvironmentVariables", mock_env), \
patch(f"{MODULE}.create_span", mock_create_span), \
patch(f"{MODULE}.AsyncSGPClient", return_value=mock_async_client):
with patch(f"{MODULE}.EnvironmentVariables", mock_env), patch(f"{MODULE}.create_span", mock_create_span), patch(
f"{MODULE}.AsyncSGPClient", return_value=mock_async_client
):
from agentex.lib.core.tracing.processors.sgp_tracing_processor import (
SGPAsyncTracingProcessor,
)

processor = SGPAsyncTracingProcessor(_make_config())

# Wire up the mock client after construction (constructor stores it)
# Wire up the mock client after construction. The processor lazy-inits
# its own client on first async call; pre-setting it bypasses that path.
processor.sgp_async_client = mock_async_client

# Keep create_span mock active for on_span_start calls
return processor, mock_create_span

async def test_spans_not_leaked_after_completed_lifecycle(self):
Expand Down Expand Up @@ -162,3 +159,95 @@ async def test_span_end_for_unknown_span_is_noop(self):
await processor.on_span_end(span)

assert len(processor._spans) == 0


# ---------------------------------------------------------------------------
# Async client lifecycle tests
#
# These cover the lazy / per-loop client construction and the dropped
# `max_keepalive_connections=0` workaround (PR A of the OVE-2 split).
# ---------------------------------------------------------------------------


class TestSGPAsyncTracingProcessorClientLifecycle:
    async def test_client_constructed_without_disabling_keepalive(self):
        """The processor-built client must not get a custom ``http_client``.

        Earlier code passed `httpx.Limits(max_keepalive_connections=0)` to
        sidestep cross-loop errors; with lazy per-loop construction the
        client is built from credentials and base URL only.
        """
        env_vars = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None)
        env_mock = MagicMock(refresh=MagicMock(return_value=env_vars))

        with (
            patch(f"{MODULE}.EnvironmentVariables", env_mock),
            patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()),
            patch(f"{MODULE}.AsyncSGPClient") as mock_client_cls,
        ):
            mock_client_cls.return_value = MagicMock(spans=MagicMock(upsert_batch=AsyncMock()))

            from agentex.lib.core.tracing.processors.sgp_tracing_processor import (
                SGPAsyncTracingProcessor,
            )

            processor = SGPAsyncTracingProcessor(_make_config())
            await processor.on_span_start(_make_span())

        # The first span event triggers exactly one client construction.
        mock_client_cls.assert_called_once()
        kwargs = mock_client_cls.call_args.kwargs
        assert "http_client" not in kwargs, (
            "AsyncSGPClient must not receive a custom http_client that disables keepalive"
        )

    async def test_owned_client_recreated_after_loop_swap(self):
        """A processor-owned client is rebuilt once its recorded loop differs.

        This models sync-ACP / per-request loops: the client created on an
        earlier loop must not be reused on the current one.
        """
        env_vars = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None)
        env_mock = MagicMock(refresh=MagicMock(return_value=env_vars))

        with (
            patch(f"{MODULE}.EnvironmentVariables", env_mock),
            patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()),
            patch(f"{MODULE}.AsyncSGPClient") as mock_client_cls,
        ):
            first = MagicMock(spans=MagicMock(upsert_batch=AsyncMock()))
            second = MagicMock(spans=MagicMock(upsert_batch=AsyncMock()))
            mock_client_cls.side_effect = [first, second]

            from agentex.lib.core.tracing.processors.sgp_tracing_processor import (
                SGPAsyncTracingProcessor,
            )

            processor = SGPAsyncTracingProcessor(_make_config())

            await processor.on_span_start(_make_span())
            assert processor.sgp_async_client is first
            assert mock_client_cls.call_count == 1

            # Pretend the client was built on some other loop: any object
            # that is not the running loop makes the owned client stale.
            processor._client_owned_at_loop = MagicMock()

            await processor.on_span_start(_make_span())
            assert processor.sgp_async_client is second, "Owned client must be recreated after loop swap"
            assert mock_client_cls.call_count == 2

    async def test_injected_client_preserved(self):
        """An externally assigned client is never swapped out.

        With `_client_owned_at_loop=None` and a client already set, neither
        the create-on-None branch nor the replace-on-loop-change branch
        applies, so the injected object survives repeated span events.
        """
        env_vars = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None)
        env_mock = MagicMock(refresh=MagicMock(return_value=env_vars))
        injected = MagicMock(spans=MagicMock(upsert_batch=AsyncMock()))

        with (
            patch(f"{MODULE}.EnvironmentVariables", env_mock),
            patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()),
            patch(f"{MODULE}.AsyncSGPClient") as mock_client_cls,
        ):
            from agentex.lib.core.tracing.processors.sgp_tracing_processor import (
                SGPAsyncTracingProcessor,
            )

            processor = SGPAsyncTracingProcessor(_make_config())
            processor.sgp_async_client = injected

            for _ in range(3):
                await processor.on_span_start(_make_span())

            assert processor.sgp_async_client is injected
            assert mock_client_cls.call_count == 0, (
                "Injected client must not be replaced (no AsyncSGPClient construction)"
            )