fix(tracing) batch span events in SGPAsyncTracingProcessor#342
fix(tracing) batch span events in SGPAsyncTracingProcessor#342mohammadatallah-scale wants to merge 4 commits into
Conversation
26306bc to
dc129ef
Compare
`SGPAsyncTracingProcessor` previously `await client.spans.upsert_batch(items=[one_span])` on every `on_span_start` and every `on_span_end`, awaited synchronously on the agent's hot path. Under burst load this surfaces as start-span timeouts. The synchronous processor inherits the SDK's `TraceQueueManager` and is fine; this change brings the async one in line. Replace the per-event upsert with a buffer-plus-flush model in asyncio: - asyncio.Queue (max 4000) plus an asyncio.Task worker that drains in batches of up to 50, every 4s or when 200+ events queued. Constants mirror scale_gp_beta.lib.tracing.trace_queue_manager. - Retry policy mirrors the SDK: 4 attempts with exponential backoff (0.4s -> 20s capped). Unexpected exceptions are dropped, not retried. - Per-iteration try / except in the worker so one bad batch doesn't break subsequent flushes. - shutdown() drains the queue, signals the worker, joins with a 10s timeout. Spans whose end was never recorded are re-enqueued so they aren't silently lost. Behavior preserved: - on_span_start / on_span_end / shutdown signatures unchanged. - _spans dict still tracks SGPSpan objects between start and end. - Externally injected sgp_async_client is still respected. - The existing AsyncSGPClient construction in __init__, with the max_keepalive_connections=0 workaround, is kept untouched. Closes OVE-4.
dc129ef to
81e2021
Compare
Code clarity changes (no behavior change): - Split SGPAsyncTracingProcessor._run into _is_shutting_down, _wait_for_flush_signal, and _safe_drain helpers so the loop reads top-to-bottom: "while not shutting down, wait for trigger, drain." - Add docstrings on _enqueue, _ensure_started, _drain, and _upsert_with_retry covering inputs, side effects, and dropped-batch semantics. New tests (regression coverage that was missing): - test_drain_splits_into_multiple_batches_above_max_batch_size: 80 enqueued events split into multiple upsert_batch calls, each batch capped at MAX_BATCH_SIZE (50). - test_worker_continues_after_unexpected_exception_in_one_batch: a RuntimeError on one upsert drops that batch; the worker keeps flushing and a subsequent batch lands. Exercises the per-iteration try/except in _run.
…se tests
Two related changes addressing review feedback:
1. Restore observability for disabled processors. When the rewrite
moved the per-event 'SGP is disabled, skipping span upsert' warning
out of on_span_start it left the disabled state silent. The original
per-event log was spammy (one entry per span event at agent
throughput) and inconsistent (only on_span_start had it, on_span_end
did not). Replace it with a single warning at __init__ time.
2. Edge-case tests in TestSGPAsyncTracingProcessorEdgeCases:
- test_disabled_processor_never_enqueues_or_calls_upsert: confirms
a disabled processor builds no client, no queue, no worker, and
shutdown is a no-op.
- test_shutdown_is_safe_when_called_multiple_times: idempotency
regression. Second shutdown after the worker has already exited
does not re-flush or raise.
- test_shutdown_before_any_event_is_noop: shutdown invoked before
any span event must early-return without spinning up a worker.
- test_apierror_triggers_retry_then_drops_batch_on_exhaustion:
APIError is retried up to DEFAULT_RETRIES, batch is dropped after
exhaustion. asyncio.sleep is patched to keep the test fast.
CI pyright reports reportPrivateImportUsage on `from scale_gp_beta._exceptions import APIError` because the `_exceptions` submodule is private. Re-export of `APIError` is available at the package root, so use that instead.
| self._add_source_to_span(span) | ||
| sgp_span.input = span.input # type: ignore[assignment] | ||
| sgp_span.output = span.output # type: ignore[assignment] | ||
| sgp_span.metadata = span.data # type: ignore[assignment] | ||
| sgp_span.end_time = span.end_time.isoformat() # type: ignore[union-attr] |
There was a problem hiding this comment.
sgp_span.input no longer updated on span end
on_span_end previously contained sgp_span.input = span.input, which allowed callers to augment the input between start and end (the canonical case being LLM conversation messages that grow as the turn completes). That line is absent from this PR; the end-enqueue only carries output, metadata, and end_time. Any caller that sets span.input after on_span_start — exactly the scenario exercised by test_sgp_span_input_updated_on_end — will have the stale start-time input persisted to SGP instead of the final value.
The test was updated to check batching mechanics but the assertion that the updated input reached the upsert was removed, hiding this regression. If the intent is to intentionally align with the sync processor (which also skips input on end), that should be documented in the PR description and the test should be renamed to remove the misleading "input updated on end" framing.
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py
Line: 218-221
Comment:
**`sgp_span.input` no longer updated on span end**
`on_span_end` previously contained `sgp_span.input = span.input`, which allowed callers to augment the input between start and end (the canonical case being LLM conversation messages that grow as the turn completes). That line is absent from this PR; the end-enqueue only carries `output`, `metadata`, and `end_time`. Any caller that sets `span.input` after `on_span_start` — exactly the scenario exercised by `test_sgp_span_input_updated_on_end` — will have the stale start-time input persisted to SGP instead of the final value.
The test was updated to check batching mechanics but the assertion that the updated input reached the upsert was removed, hiding this regression. If the intent is to intentionally align with the sync processor (which also skips `input` on end), that should be documented in the PR description and the test should be renamed to remove the misleading "input updated on end" framing.
How can I resolve this? If you propose a fix, please make it concise.|
curious what the overlap here is with https://github.com/scaleapi/scale-agentex-python/pull/303/changes which added a background queue for processor.on_span_start calls. |
Closes OVE-4 and also a TODO in the code. Should help BP's agent that's running in start_span occasional timeouts as well.
Bug
SGPAsyncTracingProcessor.on_span_startandon_span_endeachawait client.spans.upsert_batch(items=[one_span])on the agent's hot path. The synchronous processor inherits the SDK'sTraceQueueManagerand is fine; the async one needs to match its behavior.Fix
Replace the per-event upsert with a buffer-plus-flush model in asyncio:
asyncio.Queue(max 4000) plus anasyncio.Taskworker that drains in batches of up to 50, every 4s or when 200+ events queued. Constants mirrorscale_gp_beta.lib.tracing.trace_queue_manager.0.4s -> 20scapped). Unexpected exceptions are dropped, not retried.try / exceptin the worker so one bad batch doesn't break subsequent flushes.shutdown()drains the queue, signals the worker, joins with a 10s timeout. Spans whose end was never recorded are re-enqueued so they aren't silently lost.Impact
on_span_startandon_span_endare now in-memory queue inserts (~µs) instead of awaited HTTP roundtrips (~50-200ms). The agent no longer waits on the SGP API per span event.Applies to any agent emitting spans through the async processor.
Code organization
_runis split into_is_shutting_down,_wait_for_flush_signal, and_safe_drainso the worker reads top-to-bottom: "while not shutting down, wait for trigger, drain. Final drain. Done."_enqueue,_ensure_started,_drain,_upsert_with_retry,_wait_for_flush_signal,_safe_drain) have docstrings explaining inputs, side effects, and dropped-batch semantics.Observability
Disabled processors (no
sgp_api_keyorsgp_account_id) now emit a singleWARNINGat__init__instead of one warning per span event. The previous per-event log would flood at agent throughput.Behavior preserved
on_span_start/on_span_end/shutdownsignatures unchanged._spansdict still tracksSGPSpanobjects between start and end.sgp_async_clientis still respected.test_sgp_span_input_updated_on_endupdated: no upsert on the hot path; the upsert is observed aftershutdown()instead.Tests
15 tests in
test_sgp_tracing_processor.py(6 pre-existing lifecycle/mem-leak tests + 9 added here):Batching (
TestSGPAsyncTracingProcessorBatching):test_span_event_does_not_trigger_immediate_upsert— single event must not hit the network.test_shutdown_flushes_queued_spans_in_one_batch— 5 lifecycles (10 events) coalesce into one upsert.test_drain_splits_into_multiple_batches_above_max_batch_size— 80 events split across multiple upserts, each capped at 50.test_worker_continues_after_unexpected_exception_in_one_batch—RuntimeErroron one upsert drops that batch; worker stays alive and processes the next one.Edge cases (
TestSGPAsyncTracingProcessorEdgeCases):test_disabled_processor_never_enqueues_or_calls_upsert— disabled processor builds no client, no queue, no worker.test_shutdown_is_safe_when_called_multiple_times— idempotent shutdown.test_shutdown_before_any_event_is_noop— early-return when worker was never started.test_apierror_triggers_retry_then_drops_batch_on_exhaustion—APIErrorretried up toDEFAULT_RETRIES, batch dropped on exhaustion.Test plan
pytest tests/lib/core/tracing/(31 passed, 2 skipped pre-existing)ruff checkcleanruff format --checkcleanpython -c "import agentex"succeedsRisks
Mitigated by this PR (should not be seen in practice)
try / exceptin_runplus a broad-except in_upsert_with_retrycatch any non-CancelledErrorexception, drop the offending batch, and keep the worker running.to_request_paramsfailures are also caught and drop only the affected span.CancelledErrorduring normal flow. Cancellation only happens viashutdown(), which drains before signalling cancel.Actually possible in production
shutdown()runs.Likelihood ranking, highest to lowest: live-tracing latency → hard kill loss → memory pressure → retry exhaustion → loop-swap → shutdown timeout.
Greptile Summary
This PR replaces the per-event
await upsert_batch(items=[one_span])inSGPAsyncTracingProcessorwith an asyncio buffer-plus-flush model (queue max 4000, worker batches up to 50, flushes every 4s or at 200+ queued), bringing the async processor in line with the SDK's syncTraceQueueManagerand removing awaited HTTP calls from the agent hot path.sgp_span.input = span.inputwas silently removed fromon_span_end. Callers that updatespan.inputbetween start and end (e.g., growing conversation contexts) will have only the start-time input persisted to SGP.test_sgp_span_input_updated_on_endwas simultaneously weakened to only assert upsert timing, not that the input was actually updated, masking this change.self._worker.cancel()is called butself._workeris never set toNone, so a stray_ensure_startedcall during the brief cancellation window would find the task not-yet-done and skip creating a replacement worker.Confidence Score: 3/5
Not safe to merge as-is — a silent behavioral regression drops updated span inputs that were set after on_span_start.
One P1 present:
sgp_span.inputis no longer updated inon_span_end, and the covering test was weakened to not assert the input value, so the regression ships silently. P1 ceiling is 4/5 but the fact that the test masking the regression was intentionally modified pulls confidence to 3/5.src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py — on_span_end input update and shutdown timeout cleanup; tests/lib/core/tracing/processors/test_sgp_tracing_processor.py — test_sgp_span_input_updated_on_end assertion coverage.
Important Files Changed
sgp_span.inputis no longer updated inon_span_end, silently dropping input mutations made after span start. Shutdown timeout path also leaves_workernon-None after cancel, though this is P2.test_sgp_span_input_updated_on_endwas weakened to check only upsert timing, no longer asserting that the input was actually updated on end, masking the P1 regression in the processor.Sequence Diagram
sequenceDiagram participant Agent participant Processor as SGPAsyncTracingProcessor participant Queue as asyncio.Queue participant Worker as _run() task participant SGP as SGP API Agent->>Processor: on_span_start(span) Processor->>Processor: _ensure_started() Processor->>Queue: put_nowait(sgp_span) Note over Queue: qsize >= 200? set _flush_event Agent->>Processor: on_span_end(span) Processor->>Queue: put_nowait(sgp_span) loop Every 4s or flush_event set Worker->>Worker: _wait_for_flush_signal() Worker->>Queue: drain up to 50 items Worker->>SGP: upsert_batch(items=[<=50]) Note over Worker,SGP: retry up to 4x on APIError end Agent->>Processor: shutdown() Processor->>Queue: re-enqueue unclosed spans Processor->>Worker: set _shutdown_event + _flush_event Worker->>Queue: final drain Worker->>SGP: upsert_batch(remaining items) Worker-->>Processor: done Processor-->>Agent: shutdown completePrompt To Fix All With AI
Reviews (5): Last reviewed commit: "fix(tracing): import APIError from scale..." | Re-trigger Greptile