Skip to content

Commit e83e080

Browse files
fix(tracing): re-enable HTTP keepalive in SGPAsyncTracingProcessor via lazy per-loop client
The previous implementation built AsyncSGPClient with `httpx.Limits(max_keepalive_connections=0)` to dodge "bound to a different event loop" errors that surfaced under sync-ACP / per-request event loops. Disabling keepalive paid a fresh TCP+TLS handshake on every span event. This change moves client construction out of __init__ and lazy-inits it on the running event loop on first use, with automatic recreation when the loop changes (tracked via `_client_owned_at_loop`). With the client always bound to the loop using it, httpx defaults can be left alone, so connection pooling and keepalive work normally. Behavior preserved: - Per-event upsert_batch(items=[one]) is unchanged in this PR. Batching is intentionally left for a follow-up so it can be reviewed independently. - Externally injected clients (`processor.sgp_async_client = X`) are always respected and never replaced; `_client_owned_at_loop` stays None for those. - Tests for memory-leak / span-lifecycle behavior pass unchanged. New tests: - test_client_constructed_without_disabling_keepalive — verifies AsyncSGPClient is no longer constructed with a custom http_client that disables keepalive. - test_owned_client_recreated_after_loop_swap — verifies that an owned client is replaced when the running loop changes. - test_injected_client_preserved — verifies that an externally assigned client is never auto-replaced. This is part 1 of the OVE-2 split. Part 2 (batching) builds on this.
1 parent 6509be1 commit e83e080

2 files changed

Lines changed: 172 additions & 31 deletions

File tree

src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py

Lines changed: 72 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
from typing import override
1+
from __future__ import annotations
2+
3+
import asyncio
4+
from typing import Optional, override
25

36
import scale_gp_beta.lib.tracing as tracing
47
from scale_gp_beta import SGPClient, AsyncSGPClient
@@ -90,27 +93,34 @@ def shutdown(self) -> None:
9093

9194

9295
class SGPAsyncTracingProcessor(AsyncTracingProcessor):
96+
"""Async tracing processor.
97+
98+
The HTTP client is lazy-initialized on the running event loop on first
99+
use, and re-created when the loop changes. This avoids the "bound to a
100+
different event loop" errors that the previous implementation worked
101+
around by disabling HTTP keepalive (`max_keepalive_connections=0`),
102+
which paid a TCP+TLS handshake on every span event. With per-loop
103+
construction, the underlying httpx client can keep its default
104+
connection pool, so repeated calls on the same loop reuse connections.
105+
106+
Note: span events are still upserted per-event in this version. Batching
107+
is intentionally not part of this change so it can be reviewed
108+
independently.
109+
"""
110+
93111
def __init__(self, config: SGPTracingProcessorConfig):
112+
self._config = config
94113
self.disabled = config.sgp_api_key == "" or config.sgp_account_id == ""
95114
self._spans: dict[str, SGPSpan] = {}
96-
import httpx
97-
98-
# Disable keepalive so each HTTP call gets a fresh TCP connection,
99-
# avoiding "bound to a different event loop" errors in sync-ACP.
100-
self.sgp_async_client = (
101-
AsyncSGPClient(
102-
api_key=config.sgp_api_key,
103-
account_id=config.sgp_account_id,
104-
base_url=config.sgp_base_url,
105-
http_client=httpx.AsyncClient(
106-
limits=httpx.Limits(max_keepalive_connections=0),
107-
),
108-
)
109-
if not self.disabled
110-
else None
111-
)
112115
self.env_vars = EnvironmentVariables.refresh()
113116

117+
# Lazy-initialized on the running loop on first use.
118+
self.sgp_async_client: Optional[AsyncSGPClient] = None
119+
# Loop the *processor-owned* client was constructed on. Stays None
120+
# when the client was injected externally (e.g. by a test); in that
121+
# case we never replace it.
122+
self._client_owned_at_loop: Optional[asyncio.AbstractEventLoop] = None
123+
114124
def _add_source_to_span(self, span: Span) -> None:
115125
if span.data is None:
116126
span.data = {}
@@ -123,6 +133,31 @@ def _add_source_to_span(self, span: Span) -> None:
123133
if self.env_vars.AGENT_ID is not None:
124134
span.data["__agent_id__"] = self.env_vars.AGENT_ID
125135

136+
def _ensure_client(self) -> None:
137+
"""Initialize or recreate the AsyncSGPClient on the running loop.
138+
139+
Must be called from inside an async method so `get_running_loop()`
140+
is safe.
141+
"""
142+
if self.disabled:
143+
return
144+
loop = asyncio.get_running_loop()
145+
if self.sgp_async_client is None:
146+
self.sgp_async_client = AsyncSGPClient(
147+
api_key=self._config.sgp_api_key,
148+
account_id=self._config.sgp_account_id,
149+
base_url=self._config.sgp_base_url,
150+
)
151+
self._client_owned_at_loop = loop
152+
elif self._client_owned_at_loop is not None and self._client_owned_at_loop is not loop:
153+
# Owned client was bound to a now-stale loop. Replace it.
154+
self.sgp_async_client = AsyncSGPClient(
155+
api_key=self._config.sgp_api_key,
156+
account_id=self._config.sgp_account_id,
157+
base_url=self._config.sgp_base_url,
158+
)
159+
self._client_owned_at_loop = loop
160+
126161
@override
127162
async def on_span_start(self, span: Span) -> None:
128163
self._add_source_to_span(span)
@@ -141,6 +176,8 @@ async def on_span_start(self, span: Span) -> None:
141176
if self.disabled:
142177
logger.warning("SGP is disabled, skipping span upsert")
143178
return
179+
180+
self._ensure_client()
144181
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
145182
items=[sgp_span.to_request_params()]
146183
)
@@ -161,13 +198,28 @@ async def on_span_end(self, span: Span) -> None:
161198

162199
if self.disabled:
163200
return
201+
202+
self._ensure_client()
164203
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
165204
items=[sgp_span.to_request_params()]
166205
)
167206

168207
@override
169208
async def shutdown(self) -> None:
170-
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
171-
items=[sgp_span.to_request_params() for sgp_span in self._spans.values()]
172-
)
209+
if self.disabled or self.sgp_async_client is None:
210+
self._spans.clear()
211+
return
212+
213+
items: list[dict] = []
214+
for sgp_span in self._spans.values():
215+
try:
216+
items.append(sgp_span.to_request_params())
217+
except Exception:
218+
logger.exception("Failed to build span params during shutdown; dropping span")
173219
self._spans.clear()
220+
221+
if items:
222+
try:
223+
await self.sgp_async_client.spans.upsert_batch(items=items) # type: ignore[arg-type]
224+
except Exception:
225+
logger.exception("Final span flush failed during shutdown")

tests/lib/core/tracing/processors/test_sgp_tracing_processor.py

Lines changed: 100 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,9 @@ def _make_processor():
4848
mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None)
4949
mock_create_span = MagicMock(side_effect=lambda **kwargs: _make_mock_sgp_span())
5050

51-
with patch(f"{MODULE}.EnvironmentVariables", mock_env), \
52-
patch(f"{MODULE}.SGPClient"), \
53-
patch(f"{MODULE}.tracing"), \
54-
patch(f"{MODULE}.flush_queue"), \
55-
patch(f"{MODULE}.create_span", mock_create_span):
51+
with patch(f"{MODULE}.EnvironmentVariables", mock_env), patch(f"{MODULE}.SGPClient"), patch(
52+
f"{MODULE}.tracing"
53+
), patch(f"{MODULE}.flush_queue"), patch(f"{MODULE}.create_span", mock_create_span):
5654
from agentex.lib.core.tracing.processors.sgp_tracing_processor import (
5755
SGPSyncTracingProcessor,
5856
)
@@ -113,19 +111,18 @@ def _make_processor():
113111
mock_async_client = MagicMock()
114112
mock_async_client.spans.upsert_batch = AsyncMock()
115113

116-
with patch(f"{MODULE}.EnvironmentVariables", mock_env), \
117-
patch(f"{MODULE}.create_span", mock_create_span), \
118-
patch(f"{MODULE}.AsyncSGPClient", return_value=mock_async_client):
114+
with patch(f"{MODULE}.EnvironmentVariables", mock_env), patch(f"{MODULE}.create_span", mock_create_span), patch(
115+
f"{MODULE}.AsyncSGPClient", return_value=mock_async_client
116+
):
119117
from agentex.lib.core.tracing.processors.sgp_tracing_processor import (
120118
SGPAsyncTracingProcessor,
121119
)
122120

123121
processor = SGPAsyncTracingProcessor(_make_config())
124122

125-
# Wire up the mock client after construction (constructor stores it)
123+
# Wire up the mock client after construction. The processor lazy-inits
124+
# its own client on first async call; pre-setting it bypasses that path.
126125
processor.sgp_async_client = mock_async_client
127-
128-
# Keep create_span mock active for on_span_start calls
129126
return processor, mock_create_span
130127

131128
async def test_spans_not_leaked_after_completed_lifecycle(self):
@@ -162,3 +159,95 @@ async def test_span_end_for_unknown_span_is_noop(self):
162159
await processor.on_span_end(span)
163160

164161
assert len(processor._spans) == 0
162+
163+
164+
# ---------------------------------------------------------------------------
165+
# Async client lifecycle tests
166+
#
167+
# These cover the lazy / per-loop client construction and the dropped
168+
# `max_keepalive_connections=0` workaround (PR A of the OVE-2 split).
169+
# ---------------------------------------------------------------------------
170+
171+
172+
class TestSGPAsyncTracingProcessorClientLifecycle:
173+
async def test_client_constructed_without_disabling_keepalive(self):
174+
"""The previous implementation built `AsyncSGPClient` with
175+
`httpx.Limits(max_keepalive_connections=0)` to dodge cross-loop
176+
errors, paying a TCP+TLS handshake on every span event. The lazy
177+
per-loop pattern lets keepalive stay on."""
178+
env_mock = MagicMock(refresh=MagicMock(return_value=MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None)))
179+
with patch(f"{MODULE}.EnvironmentVariables", env_mock), patch(
180+
f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()
181+
), patch(f"{MODULE}.AsyncSGPClient") as mock_client_cls:
182+
mock_client_cls.return_value = MagicMock(spans=MagicMock(upsert_batch=AsyncMock()))
183+
184+
from agentex.lib.core.tracing.processors.sgp_tracing_processor import (
185+
SGPAsyncTracingProcessor,
186+
)
187+
188+
processor = SGPAsyncTracingProcessor(_make_config())
189+
await processor.on_span_start(_make_span())
190+
191+
mock_client_cls.assert_called_once()
192+
kwargs = mock_client_cls.call_args.kwargs
193+
assert "http_client" not in kwargs, (
194+
"AsyncSGPClient must not receive a custom http_client that disables keepalive"
195+
)
196+
197+
async def test_owned_client_recreated_after_loop_swap(self):
198+
"""When the running loop changes (sync-ACP / per-request loops),
199+
the processor's owned client must be recreated so it isn't bound to
200+
a dead loop. This is what lets us drop the keepalive workaround."""
201+
env_mock = MagicMock(refresh=MagicMock(return_value=MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None)))
202+
with patch(f"{MODULE}.EnvironmentVariables", env_mock), patch(
203+
f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()
204+
), patch(f"{MODULE}.AsyncSGPClient") as mock_client_cls:
205+
first = MagicMock(spans=MagicMock(upsert_batch=AsyncMock()))
206+
second = MagicMock(spans=MagicMock(upsert_batch=AsyncMock()))
207+
mock_client_cls.side_effect = [first, second]
208+
209+
from agentex.lib.core.tracing.processors.sgp_tracing_processor import (
210+
SGPAsyncTracingProcessor,
211+
)
212+
213+
processor = SGPAsyncTracingProcessor(_make_config())
214+
215+
await processor.on_span_start(_make_span())
216+
assert processor.sgp_async_client is first
217+
assert mock_client_cls.call_count == 1
218+
219+
# Simulate a loop swap: the processor's tracked loop is stale.
220+
# The next call must recreate the client.
221+
processor._client_owned_at_loop = MagicMock()
222+
223+
await processor.on_span_start(_make_span())
224+
assert processor.sgp_async_client is second, "Owned client must be recreated after loop swap"
225+
assert mock_client_cls.call_count == 2
226+
227+
async def test_injected_client_preserved(self):
228+
"""A client assigned externally (test mock or caller-built) must
229+
never be replaced by the processor. Contract:
230+
`_client_owned_at_loop=None` marks externally-managed clients and
231+
skips both the create-on-None branch and the replace-on-loop-change
232+
branch, so the injected client is preserved across any number of
233+
calls."""
234+
env_mock = MagicMock(refresh=MagicMock(return_value=MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None)))
235+
injected = MagicMock(spans=MagicMock(upsert_batch=AsyncMock()))
236+
237+
with patch(f"{MODULE}.EnvironmentVariables", env_mock), patch(
238+
f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()
239+
), patch(f"{MODULE}.AsyncSGPClient") as mock_client_cls:
240+
from agentex.lib.core.tracing.processors.sgp_tracing_processor import (
241+
SGPAsyncTracingProcessor,
242+
)
243+
244+
processor = SGPAsyncTracingProcessor(_make_config())
245+
processor.sgp_async_client = injected
246+
247+
for _ in range(3):
248+
await processor.on_span_start(_make_span())
249+
250+
assert processor.sgp_async_client is injected
251+
assert mock_client_cls.call_count == 0, (
252+
"Injected client must not be replaced (no AsyncSGPClient construction)"
253+
)

0 commit comments

Comments
 (0)