Skip to content

Commit dc129ef

Browse files
fix(tracing): batch span events and re-enable HTTP keepalive in SGPAsyncTracingProcessor
The async processor previously called `await client.spans.upsert_batch(items=[one_span])` on every `on_span_start` and every `on_span_end`, awaited synchronously on the agent's hot path, with `httpx.Limits(max_keepalive_connections=0)` to dodge "bound to a different event loop" errors. Per span you got 2 HTTP requests, each with full TCP+TLS handshake. Surfaces as start-span timeouts under load. This change mirrors the SDK's TraceQueueManager in asyncio: - asyncio.Queue (max 4000) plus an asyncio.Task worker that drains in batches of up to 50, every 4s or when 200+ events queued. - Retry policy mirrors the SDK: 4 attempts with exponential backoff (0.4s -> 20s capped). Unexpected exceptions are dropped, not retried. - Per-iteration try/except in the worker so one bad batch doesn't break subsequent flushes. - HTTP client, queue, and worker are lazy-initialized on the running event loop on first use, with automatic recreation when the loop changes (tracked via _client_owned_at_loop). With the client always bound to the loop using it, httpx defaults can stay, so connection pooling and keepalive work normally; max_keepalive_connections=0 workaround is removed. - shutdown() drains the queue, signals the worker, joins with a 10s timeout. Spans whose end was never recorded are re-enqueued so they aren't silently lost. Behavior preserved: - on_span_start / on_span_end / shutdown signatures unchanged. - _spans dict still tracks SGPSpan objects between start and end. - Externally injected sgp_async_client is still respected. Closes OVE-3 and OVE-4.
1 parent ed6fd5e commit dc129ef

2 files changed

Lines changed: 364 additions & 61 deletions

File tree

src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py

Lines changed: 203 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1-
from typing import override
1+
from __future__ import annotations
2+
3+
import asyncio
4+
from typing import Optional, override
25

36
import scale_gp_beta.lib.tracing as tracing
47
from scale_gp_beta import SGPClient, AsyncSGPClient
8+
from scale_gp_beta._exceptions import APIError
59
from scale_gp_beta.lib.tracing import create_span, flush_queue
610
from scale_gp_beta.lib.tracing.span import Span as SGPSpan
711

@@ -17,6 +21,19 @@
1721
logger = make_logger(__name__)
1822

1923

24+
# Mirrored from scale_gp_beta.lib.tracing.trace_queue_manager defaults so the
25+
# async processor batches and retries the same way the sync (daemon-thread)
26+
# path does.
27+
DEFAULT_MAX_QUEUE_SIZE = 4_000
28+
DEFAULT_TRIGGER_QUEUE_SIZE = 200
29+
DEFAULT_TRIGGER_CADENCE = 4.0
30+
DEFAULT_MAX_BATCH_SIZE = 50
31+
DEFAULT_RETRIES = 4
32+
INITIAL_BACKOFF = 0.4
33+
MAX_BACKOFF = 20.0
34+
SHUTDOWN_DRAIN_TIMEOUT = 10.0
35+
36+
2037
def _get_span_type(span: Span) -> str:
2138
"""Read span_type from span.data['__span_type__'], defaulting to STANDALONE."""
2239
if isinstance(span.data, dict):
@@ -90,27 +107,42 @@ def shutdown(self) -> None:
90107

91108

92109
class SGPAsyncTracingProcessor(AsyncTracingProcessor):
110+
"""Async tracing processor that buffers spans and flushes them in batches.
111+
112+
Mirrors the buffer-plus-flush behavior of the SDK's synchronous
113+
`TraceQueueManager`, but uses asyncio primitives so it works inside an
114+
asyncio event loop without blocking it.
115+
116+
The HTTP client, queue, and worker task are lazy-initialized on the
117+
running event loop the first time a span event is recorded. This avoids
118+
the "bound to a different event loop" errors that occur when the
119+
processor is constructed on one loop but used on another (e.g. a worker
120+
that creates a fresh loop per request) and lets us re-enable HTTP
121+
keepalive on the underlying httpx client without paying a TCP+TLS
122+
handshake on every span event.
123+
"""
124+
93125
def __init__(self, config: SGPTracingProcessorConfig):
126+
self._config = config
94127
self.disabled = config.sgp_api_key == "" or config.sgp_account_id == ""
95128
self._spans: dict[str, SGPSpan] = {}
96-
import httpx
97-
98-
# Disable keepalive so each HTTP call gets a fresh TCP connection,
99-
# avoiding "bound to a different event loop" errors in sync-ACP.
100-
self.sgp_async_client = (
101-
AsyncSGPClient(
102-
api_key=config.sgp_api_key,
103-
account_id=config.sgp_account_id,
104-
base_url=config.sgp_base_url,
105-
http_client=httpx.AsyncClient(
106-
limits=httpx.Limits(max_keepalive_connections=0),
107-
),
108-
)
109-
if not self.disabled
110-
else None
111-
)
112129
self.env_vars = EnvironmentVariables.refresh()
113130

131+
# Lazy-initialized on the running loop on first use. Holding these
132+
# as attributes (rather than constructing eagerly in __init__) is
133+
# what lets the processor survive the loop on which it was created
134+
# being replaced — a common pattern in sync-ACP / per-request loops.
135+
self._loop: Optional[asyncio.AbstractEventLoop] = None
136+
self.sgp_async_client: Optional[AsyncSGPClient] = None
137+
# Loop the *processor-owned* client was constructed on. Remains
138+
# None when the client was injected externally (e.g. by a test);
139+
# in that case we never replace it.
140+
self._client_owned_at_loop: Optional[asyncio.AbstractEventLoop] = None
141+
self._queue: Optional[asyncio.Queue[SGPSpan]] = None
142+
self._worker: Optional[asyncio.Task[None]] = None
143+
self._shutdown_event: Optional[asyncio.Event] = None
144+
self._flush_event: Optional[asyncio.Event] = None
145+
114146
def _add_source_to_span(self, span: Span) -> None:
115147
if span.data is None:
116148
span.data = {}
@@ -123,9 +155,49 @@ def _add_source_to_span(self, span: Span) -> None:
123155
if self.env_vars.AGENT_ID is not None:
124156
span.data["__agent_id__"] = self.env_vars.AGENT_ID
125157

158+
def _ensure_started(self) -> None:
159+
"""Initialize per-loop state on first use, or after a loop swap.
160+
161+
Must be called from inside an async method so `get_running_loop()`
162+
is safe.
163+
"""
164+
if self.disabled:
165+
return
166+
loop = asyncio.get_running_loop()
167+
if self._loop is loop and self._worker is not None and not self._worker.done():
168+
return
169+
170+
self._loop = loop
171+
# We construct an httpx-backed client lazily on the running loop so
172+
# connection pooling and keepalive can be left at httpx defaults
173+
# without hitting "bound to a different event loop" errors when the
174+
# processor outlives its original loop. An externally injected
175+
# client (e.g. a test mock) is left alone — _client_owned_at_loop
176+
# stays None for those.
177+
if self.sgp_async_client is None:
178+
self.sgp_async_client = AsyncSGPClient(
179+
api_key=self._config.sgp_api_key,
180+
account_id=self._config.sgp_account_id,
181+
base_url=self._config.sgp_base_url,
182+
)
183+
self._client_owned_at_loop = loop
184+
elif self._client_owned_at_loop is not None and self._client_owned_at_loop is not loop:
185+
# We previously created a client on a now-stale loop. Replace it.
186+
self.sgp_async_client = AsyncSGPClient(
187+
api_key=self._config.sgp_api_key,
188+
account_id=self._config.sgp_account_id,
189+
base_url=self._config.sgp_base_url,
190+
)
191+
self._client_owned_at_loop = loop
192+
self._queue = asyncio.Queue(maxsize=DEFAULT_MAX_QUEUE_SIZE)
193+
self._shutdown_event = asyncio.Event()
194+
self._flush_event = asyncio.Event()
195+
self._worker = loop.create_task(self._run())
196+
126197
@override
127198
async def on_span_start(self, span: Span) -> None:
128199
self._add_source_to_span(span)
200+
129201
sgp_span = create_span(
130202
name=span.name,
131203
span_type=_get_span_type(span),
@@ -137,18 +209,13 @@ async def on_span_start(self, span: Span) -> None:
137209
metadata=span.data,
138210
)
139211
sgp_span.start_time = span.start_time.isoformat() # type: ignore[union-attr]
212+
self._spans[span.id] = sgp_span
140213

141214
if self.disabled:
142-
logger.warning("SGP is disabled, skipping span upsert")
143215
return
144-
# TODO(AGX1-198): Batch multiple spans into a single upsert_batch call
145-
# instead of one span per HTTP request.
146-
# https://linear.app/scale-epd/issue/AGX1-198/actually-use-sgp-batching-for-spans
147-
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
148-
items=[sgp_span.to_request_params()]
149-
)
150216

151-
self._spans[span.id] = sgp_span
217+
self._ensure_started()
218+
self._enqueue(sgp_span)
152219

153220
@override
154221
async def on_span_end(self, span: Span) -> None:
@@ -158,20 +225,124 @@ async def on_span_end(self, span: Span) -> None:
158225
return
159226

160227
self._add_source_to_span(span)
161-
sgp_span.input = span.input # type: ignore[assignment]
162228
sgp_span.output = span.output # type: ignore[assignment]
163229
sgp_span.metadata = span.data # type: ignore[assignment]
164230
sgp_span.end_time = span.end_time.isoformat() # type: ignore[union-attr]
165231

166232
if self.disabled:
167233
return
168-
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
169-
items=[sgp_span.to_request_params()]
170-
)
234+
235+
self._ensure_started()
236+
self._enqueue(sgp_span)
171237

172238
@override
173239
async def shutdown(self) -> None:
174-
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
175-
items=[sgp_span.to_request_params() for sgp_span in self._spans.values()]
176-
)
240+
# Fast path when the processor was never started (disabled, or
241+
# shutdown called before any span event). Avoid spinning up a
242+
# worker just to tear it down.
243+
if self._worker is None:
244+
self._spans.clear()
245+
return
246+
247+
# Re-enqueue any spans whose end was never recorded so they aren't
248+
# silently lost. They were already enqueued at start, but on_span_end
249+
# is what mutates output / metadata / end_timestamp; without a
250+
# second enqueue, the server only sees the start payload for them.
251+
for sgp_span in list(self._spans.values()):
252+
self._enqueue(sgp_span)
177253
self._spans.clear()
254+
255+
assert self._shutdown_event is not None
256+
self._shutdown_event.set()
257+
if self._flush_event is not None:
258+
self._flush_event.set()
259+
260+
try:
261+
await asyncio.wait_for(self._worker, timeout=SHUTDOWN_DRAIN_TIMEOUT)
262+
except asyncio.TimeoutError:
263+
logger.warning(f"Async tracing worker did not exit within {SHUTDOWN_DRAIN_TIMEOUT}s; cancelling")
264+
self._worker.cancel()
265+
266+
def _enqueue(self, sgp_span: SGPSpan) -> None:
267+
if self._queue is None:
268+
return
269+
try:
270+
self._queue.put_nowait(sgp_span)
271+
except asyncio.QueueFull:
272+
logger.warning(f"Tracing queue full; dropping span {sgp_span.span_id}")
273+
return
274+
if self._flush_event is not None and self._queue.qsize() >= DEFAULT_TRIGGER_QUEUE_SIZE:
275+
self._flush_event.set()
276+
277+
async def _run(self) -> None:
278+
try:
279+
while not (self._shutdown_event and self._shutdown_event.is_set()):
280+
# Wake on either an early-flush signal or the cadence timer.
281+
assert self._flush_event is not None
282+
try:
283+
await asyncio.wait_for(self._flush_event.wait(), timeout=DEFAULT_TRIGGER_CADENCE)
284+
except asyncio.TimeoutError:
285+
pass
286+
self._flush_event.clear()
287+
# Per-iteration guard: an unexpected error during one drain
288+
# must not kill the worker, otherwise queued items stay
289+
# unflushed until shutdown.
290+
try:
291+
await self._drain()
292+
except asyncio.CancelledError:
293+
raise
294+
except Exception:
295+
logger.exception("Tracing worker iteration failed; continuing")
296+
297+
# Final drain on shutdown.
298+
try:
299+
await self._drain()
300+
except Exception:
301+
logger.exception("Final tracing drain failed; some spans may be lost")
302+
except asyncio.CancelledError:
303+
raise
304+
except Exception:
305+
logger.exception("Async tracing worker crashed")
306+
307+
async def _drain(self) -> None:
308+
if self._queue is None or self.sgp_async_client is None:
309+
return
310+
while not self._queue.empty():
311+
batch: list[dict] = []
312+
while len(batch) < DEFAULT_MAX_BATCH_SIZE and not self._queue.empty():
313+
try:
314+
sgp_span = self._queue.get_nowait()
315+
except asyncio.QueueEmpty:
316+
break
317+
try:
318+
batch.append(sgp_span.to_request_params())
319+
except Exception:
320+
logger.exception("Failed to build span params; dropping span")
321+
if not batch:
322+
continue
323+
await self._upsert_with_retry(batch)
324+
325+
async def _upsert_with_retry(self, batch: list[dict]) -> None:
326+
if self.sgp_async_client is None:
327+
return
328+
backoff = INITIAL_BACKOFF
329+
for attempt in range(DEFAULT_RETRIES):
330+
try:
331+
await self.sgp_async_client.spans.upsert_batch(items=batch) # type: ignore[arg-type]
332+
return
333+
except APIError as exc:
334+
if attempt == DEFAULT_RETRIES - 1:
335+
logger.error(f"Failed to export {len(batch)} spans after {DEFAULT_RETRIES} attempts: {exc.message}")
336+
return
337+
logger.warning(f"Span export failed ({exc.message}); retrying in {backoff:.1f}s")
338+
await asyncio.sleep(backoff)
339+
backoff = min(backoff * 2, MAX_BACKOFF)
340+
except asyncio.CancelledError:
341+
raise
342+
except Exception:
343+
# Unexpected error (not APIError, not cancellation): log and
344+
# drop the batch. We deliberately do not retry because we
345+
# don't know whether the request reached the server, and
346+
# the SDK already surfaces transport failures as APIError.
347+
logger.exception(f"Unexpected error exporting {len(batch)} spans; dropping batch")
348+
return

0 commit comments

Comments
 (0)