Skip to content

fix(tracing): batch span events and re-enable HTTP keepalive in SGPAsyncTracingProcessor#340

Closed
mohammadatallah-scale wants to merge 2 commits into
mainfrom
mohammad/ove-2-async-tracing-batching
Closed

fix(tracing): batch span events and re-enable HTTP keepalive in SGPAsyncTracingProcessor#340
mohammadatallah-scale wants to merge 2 commits into
mainfrom
mohammad/ove-2-async-tracing-batching

Conversation

@mohammadatallah-scale
Copy link
Copy Markdown

Closes OVE-2 (https://linear.app/scale-epd/issue/OVE-2).

Bug

SGPAsyncTracingProcessor previously called await client.spans.upsert_batch(items=[one_span]) on every on_span_start and every on_span_end, awaited synchronously on the agent's hot path, with httpx.Limits(max_keepalive_connections=0) to dodge "bound to a different event loop" errors. Per span you got 2 HTTP requests, each with a fresh TCP+TLS handshake, each going through identity-service auth, each landing on the batch endpoint with a single item. Surfaces as start-span timeouts under load.

SGPSyncTracingProcessor is unaffected — it inherits the SDK's TraceQueueManager, which does this correctly.

Fix

Mirror the SDK's queue manager in asyncio:

  • Events are enqueued in an asyncio.Queue (max 4000) on on_span_start / on_span_end.
  • A background asyncio.Task drains the queue into batches of up to 50, every 4s or when 200+ events are queued.
  • Retries match the SDK: 4 attempts with exponential backoff (0.4s → 20s capped).
  • The HTTP client, queue, and worker task are lazy-initialized on the running event loop on first use and re-created if the loop changes. That binds the client to the same loop that consumes it — which is what the original max_keepalive_connections=0 workaround was avoiding — and lets keepalive stay on so we don't pay a TLS handshake on every span event.
  • shutdown() signals the worker, drains the queue, and joins with a 10s timeout.

Constants and retry policy are mirrored from scale_gp_beta/lib/tracing/trace_queue_manager.py so the two processors batch the same way on the wire.

Behavior preserved

  • on_span_start / on_span_end / shutdown signatures unchanged.
  • _spans dict still tracks SGP span objects between start and end (for output / metadata / end_time mutation).
  • A pre-set processor.sgp_async_client is respected (test-injection support).

Tests

  • All 6 existing async/sync processor tests pass unchanged.
  • 3 new tests in TestSGPAsyncTracingProcessorBatching:
    • test_span_event_does_not_trigger_immediate_upsert — regression: a single event must not hit the network.
    • test_shutdown_flushes_queued_spans_in_one_batch — 5 lifecycles (10 events) coalesce into a single upsert_batch call.
    • test_async_client_constructed_without_disabling_keepaliveAsyncSGPClient is no longer built with a custom http_client that disables keepalive.

Test plan

  • pytest tests/lib/core/tracing/processors/test_sgp_tracing_processor.py (9/9 pass)
  • pytest tests/lib/core/tracing/ (16/16 pass)
  • ruff check clean
  • ruff format --check clean
  • python -c "import agentex" succeeds
  • CI passes
  • Manual verification under burst load that start-span latency drops materially (handoff to whoever has the agent / Datadog access)

Related

  • Issue: OVE-2
  • Temporal-side guidance (separate, already published in sgp-docs): scaleapi/sgp-docs#162

…yncTracingProcessor

The async tracing processor previously called
`await self.sgp_async_client.spans.upsert_batch(items=[one_span])` on
every `on_span_start` and `on_span_end`, awaited synchronously on the
agent's hot path, with `httpx.Limits(max_keepalive_connections=0)` to
dodge "bound to a different event loop" errors. That meant 2 HTTP
requests per span, each with a fresh TCP+TLS handshake, surfacing as
start-span timeouts under load.

This change mirrors the SDK's `TraceQueueManager` model in asyncio:
events are enqueued in an `asyncio.Queue` and a background task drains
the queue into batches of up to 50 every 4s or when 200+ events are
queued, with the same retry policy (4 attempts, 0.4s -> 20s backoff).

The HTTP client, queue, and worker are lazy-initialized on the running
event loop on first use and re-created if the loop changes (sync-ACP /
per-request loops). That removes the need to disable keepalive — the
client is bound to the same loop that uses it — so every span event
no longer pays a TLS handshake.

Behavior preserved:
- `on_span_start` / `on_span_end` signatures unchanged
- `_spans` dict still tracks SGP span objects between start and end
- Pre-set `processor.sgp_async_client` is respected (test injection)
- `shutdown()` drains the queue, then stops the worker

Tests:
- All 6 existing tests pass
- 3 new tests cover (a) no upsert on a single event, (b) shutdown
  coalesces 5 lifecycles into one batch of 10 items, (c) AsyncSGPClient
  is no longer constructed with a custom http_client that disables
  keepalive

Closes OVE-2.
Found four real issues in the initial implementation while doing a
correctness audit. All fixed in this commit.

1. Loop-change client preservation bug. Previously, on a loop swap the
   processor kept the old AsyncSGPClient even though it was bound to a
   dead loop, which would resurrect the original "bound to a different
   event loop" failure. Track the loop the *processor-owned* client was
   constructed on (`_client_owned_at_loop`); recreate the client when
   that loop is no longer the running loop. An externally injected
   client (e.g. test mock) is left alone because its
   `_client_owned_at_loop` stays None.

2. Worker-iteration error handling. A single bad batch (e.g. an
   unexpected exception in `_drain` or `_upsert_with_retry`) would
   propagate out of the while loop and kill the worker, leaving queued
   items unflushed until `shutdown()`. Wrap each iteration in a try /
   except so one bad batch does not stop the worker.

3. `_upsert_with_retry` only caught `APIError`. Any unexpected
   exception (TypeError, programming bug, etc.) would propagate up. Add
   a broad except that logs and drops the batch — we deliberately do
   not retry because we cannot tell whether the request reached the
   server, and the SDK already wraps transport failures as APIError.

4. `shutdown()` called `_ensure_started()` even when the processor
   had never been started, spinning up a worker just to tear it down.
   Early-return when `_worker is None`.

Tests added:
- test_owned_client_recreated_after_loop_swap — verifies issue (1).
- test_injected_client_preserved_across_reinit — verifies that an
  injected client is not replaced even on simulated loop swap.

All 18 tracing tests pass.
@mohammadatallah-scale
Copy link
Copy Markdown
Author

Splitting this into two stacked PRs to make review easier:

Part 1 is reviewable on its own. Part 2 is the actual behavior change, scoped to the buffer + worker logic, and its diff against #341 is small.

Closing this PR; the combined branch mohammad/ove-2-async-tracing-batching is left intact as a backup until both parts merge.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant