fix(tracing): batch span events and re-enable HTTP keepalive in SGPAsyncTracingProcessor#340
Closed
mohammadatallah-scale wants to merge 2 commits into
…yncTracingProcessor

The async tracing processor previously called `await self.sgp_async_client.spans.upsert_batch(items=[one_span])` on every `on_span_start` and `on_span_end`, awaited synchronously on the agent's hot path, with `httpx.Limits(max_keepalive_connections=0)` to dodge "bound to a different event loop" errors. That meant 2 HTTP requests per span, each with a fresh TCP+TLS handshake, surfacing as start-span timeouts under load.

This change mirrors the SDK's `TraceQueueManager` model in asyncio: events are enqueued in an `asyncio.Queue` and a background task drains the queue into batches of up to 50 every 4s or when 200+ events are queued, with the same retry policy (4 attempts, 0.4s -> 20s backoff).

The HTTP client, queue, and worker are lazy-initialized on the running event loop on first use and re-created if the loop changes (sync-ACP / per-request loops). That removes the need to disable keepalive: the client is bound to the same loop that uses it, so every span event no longer pays a TLS handshake.

Behavior preserved:
- `on_span_start` / `on_span_end` signatures unchanged
- `_spans` dict still tracks SGP span objects between start and end
- Pre-set `processor.sgp_async_client` is respected (test injection)
- `shutdown()` drains the queue, then stops the worker

Tests:
- All 6 existing tests pass
- 3 new tests cover (a) no upsert on a single event, (b) shutdown coalesces 5 lifecycles into one batch of 10 items, (c) AsyncSGPClient is no longer constructed with a custom http_client that disables keepalive

Closes OVE-2.
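For readers who want the shape of the batching model described in this commit message, here is a minimal sketch. It is not the processor's actual code: `BatchingWorker`, `send_batch`, and the constant names are hypothetical, and only the numbers (batches of up to 50, a 4s flush interval, an early flush at 200+ queued events) come from the message. The queue is left unbounded and the retry policy is omitted to keep the example short.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional

# Thresholds quoted in the commit message; the constant names are assumptions.
MAX_BATCH_SIZE = 50
FLUSH_INTERVAL_S = 4.0
FLUSH_THRESHOLD = 200

SendBatch = Callable[[List[Any]], Awaitable[None]]


class BatchingWorker:
    """Hypothetical stand-in for the processor's queue plus background task."""

    def __init__(self, send_batch: SendBatch) -> None:
        self._send_batch = send_batch
        self._queue: Optional[asyncio.Queue] = None
        self._wake: Optional[asyncio.Event] = None
        self._task: Optional[asyncio.Task] = None
        self._stopping = False

    def enqueue(self, item: Any) -> None:
        """Called from the hot path: no network I/O, just a queue put."""
        if self._task is None:
            # Lazy init on the loop that is actually running.
            self._queue = asyncio.Queue()
            self._wake = asyncio.Event()
            self._task = asyncio.get_running_loop().create_task(self._run())
        self._queue.put_nowait(item)
        if self._queue.qsize() >= FLUSH_THRESHOLD:
            self._wake.set()  # flush early instead of waiting for the timer

    async def _run(self) -> None:
        while not self._stopping:
            try:
                await asyncio.wait_for(self._wake.wait(), timeout=FLUSH_INTERVAL_S)
            except asyncio.TimeoutError:
                pass  # timer expired: periodic flush
            self._wake.clear()
            await self._drain()
        await self._drain()  # final flush for anything queued before shutdown

    async def _drain(self) -> None:
        # Send everything currently queued, in chunks of at most MAX_BATCH_SIZE.
        while not self._queue.empty():
            batch: List[Any] = []
            while len(batch) < MAX_BATCH_SIZE and not self._queue.empty():
                batch.append(self._queue.get_nowait())
            await self._send_batch(batch)

    async def shutdown(self) -> None:
        if self._task is None:
            return  # never started, nothing to flush
        self._stopping = True
        self._wake.set()
        await self._task


async def demo() -> List[int]:
    sent: List[List[Any]] = []

    async def record(batch: List[Any]) -> None:
        sent.append(batch)

    worker = BatchingWorker(record)
    for i in range(10):  # e.g. 5 span lifecycles = 10 events
        worker.enqueue(i)
    await worker.shutdown()
    return [len(b) for b in sent]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch `enqueue` never awaits, so the caller's hot path only pays for a queue put; the network call happens in the background task or during `shutdown()`.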
Found four real issues in the initial implementation while doing a correctness audit. All fixed in this commit.

1. Loop-change client preservation bug. Previously, on a loop swap the processor kept the old AsyncSGPClient even though it was bound to a dead loop, which would resurrect the original "bound to a different event loop" failure. Track the loop the *processor-owned* client was constructed on (`_client_owned_at_loop`); recreate the client when that loop is no longer the running loop. An externally injected client (e.g. test mock) is left alone because its `_client_owned_at_loop` stays None.
2. Worker-iteration error handling. A single bad batch (e.g. an unexpected exception in `_drain` or `_upsert_with_retry`) would propagate out of the while loop and kill the worker, leaving queued items unflushed until `shutdown()`. Wrap each iteration in a try / except so one bad batch does not stop the worker.
3. `_upsert_with_retry` only caught `APIError`. Any unexpected exception (TypeError, programming bug, etc.) would propagate up. Add a broad except that logs and drops the batch; we deliberately do not retry because we cannot tell whether the request reached the server, and the SDK already wraps transport failures as APIError.
4. `shutdown()` called `_ensure_started()` even when the processor had never been started, spinning up a worker just to tear it down. Early-return when `_worker is None`.

Tests added:
- test_owned_client_recreated_after_loop_swap: verifies issue (1).
- test_injected_client_preserved_across_reinit: verifies that an injected client is not replaced even on simulated loop swap.

All 18 tracing tests pass.
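The ownership rule from issue (1) can be illustrated with a small sketch. It only assumes what the commit message states: a `_client_owned_at_loop` marker that is set for processor-owned clients and stays `None` for injected ones. The `LoopBoundClient` class, its `factory` argument, and the `ensure()` method are hypothetical names for illustration, not the processor's real API.

```python
import asyncio
from typing import Any, Callable, Optional


class LoopBoundClient:
    """Hypothetical holder that rebuilds an owned client when the loop changes."""

    def __init__(self, factory: Callable[[], Any], injected: Optional[Any] = None) -> None:
        self._factory = factory
        self.client = injected
        # Set only when this holder builds the client itself; an injected
        # client (e.g. a test mock) leaves it as None.
        self._client_owned_at_loop: Optional[asyncio.AbstractEventLoop] = None

    def ensure(self) -> Any:
        loop = asyncio.get_running_loop()
        if self.client is not None and self._client_owned_at_loop is None:
            return self.client  # injected client: never replaced
        if self.client is None or self._client_owned_at_loop is not loop:
            # Either first use, or the owned client was built on a loop
            # that is no longer the running one: build a fresh client.
            self.client = self._factory()
            self._client_owned_at_loop = loop
        return self.client


if __name__ == "__main__":
    holder = LoopBoundClient(factory=object)

    async def use() -> Any:
        return holder.ensure()

    first = asyncio.run(use())   # loop A
    second = asyncio.run(use())  # loop B: owned client is recreated
    print(first is second)
```

Each `asyncio.run` call creates a new event loop, which stands in here for the sync-ACP / per-request loop swap mentioned in the first commit.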
This was referenced May 4, 2026
Author
Splitting this into two stacked PRs to make review easier:
Part 1 is reviewable on its own. Part 2 is the actual behavior change, scoped to the buffer + worker logic, and its diff against #341 is small. Closing this PR; the combined branch |
Closes OVE-2 (https://linear.app/scale-epd/issue/OVE-2).
Bug
`SGPAsyncTracingProcessor` previously called `await client.spans.upsert_batch(items=[one_span])` on every `on_span_start` and every `on_span_end`, awaited synchronously on the agent's hot path, with `httpx.Limits(max_keepalive_connections=0)` to dodge "bound to a different event loop" errors. Per span you got 2 HTTP requests, each with a fresh TCP+TLS handshake, each going through identity-service auth, each landing on the batch endpoint with a single item. Surfaces as start-span timeouts under load.

`SGPSyncTracingProcessor` is unaffected: it inherits the SDK's `TraceQueueManager`, which does this correctly.

Fix
Mirror the SDK's queue manager in asyncio:
- Events are enqueued in an `asyncio.Queue` (max 4000) on `on_span_start` / `on_span_end`.
- A background `asyncio.Task` drains the queue into batches of up to 50, every 4s or when 200+ events are queued.
- The HTTP client, queue, and worker are lazy-initialized on the running event loop and re-created if the loop changes. That removes the "bound to a different event loop" failure the `max_keepalive_connections=0` workaround was avoiding, and lets keepalive stay on so we don't pay a TLS handshake on every span event.
- `shutdown()` signals the worker, drains the queue, and joins with a 10s timeout.

Constants and retry policy are mirrored from `scale_gp_beta/lib/tracing/trace_queue_manager.py` so the two processors batch the same way on the wire.

Behavior preserved
- `on_span_start` / `on_span_end` / `shutdown` signatures unchanged.
- `_spans` dict still tracks SGP span objects between start and end (for output / metadata / end_time mutation).
- Pre-set `processor.sgp_async_client` is respected (test-injection support).

Tests
`TestSGPAsyncTracingProcessorBatching`:
- `test_span_event_does_not_trigger_immediate_upsert`: regression; a single event must not hit the network.
- `test_shutdown_flushes_queued_spans_in_one_batch`: 5 lifecycles (10 events) coalesce into a single `upsert_batch` call.
- `test_async_client_constructed_without_disabling_keepalive`: `AsyncSGPClient` is no longer built with a custom `http_client` that disables keepalive.

Test plan
- `pytest tests/lib/core/tracing/processors/test_sgp_tracing_processor.py` (9/9 pass)
- `pytest tests/lib/core/tracing/` (16/16 pass)
- `ruff check` clean
- `ruff format --check` clean
- `python -c "import agentex"` succeeds

Related