
Python: OTel _finalize_stream triggers result hooks on streaming error, causing after_run providers to fire incorrectly #5231

@droideronline

Summary

AgentTelemetryLayer._trace_agent_invocation registers _finalize_stream as a cleanup hook on the ResponseStream. Cleanup hooks run on both normal completion and streaming errors. Inside _finalize_stream, get_final_response() is called unconditionally to capture telemetry for the OTel span.

The problem: get_final_response() always runs the result hooks registered on the stream. One such result hook is _post_hook (in _agents.py), which calls _run_after_providers. As a result, after_run context providers fire even when the stream errored, which is incorrect behaviour.

Root cause

In observability.py:

async def _finalize_stream() -> None:
    try:
        response = await result_stream.get_final_response()  # ← always called, even on error
        # ... capture telemetry
    except Exception as exception:
        capture_exception(span=span, exception=exception, timestamp=time_ns())
    finally:
        _close_span()

ResponseStream.__anext__ runs cleanup hooks from two branches:

except StopAsyncIteration:       # normal completion
    self._consumed = True
    await self._run_cleanup_hooks()
    await self.get_final_response()
    raise

except Exception:                # streaming error
    await self._run_cleanup_hooks()   # _finalize_stream runs here too
    raise

On the error path, _consumed is still False. However, _finalize_stream never checks it, so it cannot tell which branch invoked it and calls get_final_response() on both paths. On the error path this fires result hooks such as _post_hook / _after_run_hook.
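The sequence above can be reproduced with a small, self-contained model. `ToyResponseStream`, `post_hook`, `finalize_stream` and `run` are hypothetical stand-ins written for illustration, not the real classes or functions from observability.py or _agents.py. The sketch assumes that get_final_response() builds the response once and caches it, so result hooks fire on its first call only.

```python
import asyncio


class ToyResponseStream:
    """Hypothetical stand-in for ResponseStream, reduced to its hook behaviour."""

    def __init__(self, chunks, fail_after=None):
        self._chunks = list(chunks)
        self._fail_after = fail_after  # raise after this many chunks (None: never)
        self._index = 0
        self._consumed = False
        self._final = None
        self.cleanup_hooks = []  # run on normal completion AND on error
        self.result_hooks = []  # run by get_final_response()

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            if self._fail_after is not None and self._index >= self._fail_after:
                raise RuntimeError("simulated streaming error")
            if self._index >= len(self._chunks):
                raise StopAsyncIteration
            self._index += 1
            return self._chunks[self._index - 1]
        except StopAsyncIteration:  # normal completion
            self._consumed = True
            await self._run_cleanup_hooks()
            await self.get_final_response()
            raise
        except Exception:  # streaming error: _consumed stays False
            await self._run_cleanup_hooks()
            raise

    async def _run_cleanup_hooks(self):
        for hook in self.cleanup_hooks:
            await hook()

    async def get_final_response(self):
        # Assumption: the response is built once and cached, so result hooks
        # fire on the first call only, however the stream ended.
        if self._final is None:
            self._final = self._chunks[: self._index]
            for hook in self.result_hooks:
                await hook(self._final)
        return self._final


async def run(fail_after=None):
    after_run_calls, seen_consumed = [], []
    stream = ToyResponseStream(["a", "b"], fail_after=fail_after)

    async def post_hook(response):  # stand-in for _post_hook -> after_run providers
        after_run_calls.append(response)

    async def finalize_stream():  # mirrors the current, unguarded _finalize_stream
        seen_consumed.append(stream._consumed)
        await stream.get_final_response()  # called on the error path too

    stream.result_hooks.append(post_hook)
    stream.cleanup_hooks.append(finalize_stream)
    try:
        async for _ in stream:
            pass
    except RuntimeError:
        pass  # the caller sees the streaming error
    return after_run_calls, seen_consumed


ok_calls, ok_seen = asyncio.run(run())
err_calls, err_seen = asyncio.run(run(fail_after=1))
print(ok_calls, ok_seen)    # [['a', 'b']] [True]
print(err_calls, err_seen)  # [['a']] [False]  <- after_run fired despite the error
```

On the error path the cleanup hook observes `_consumed == False`, yet the after_run stand-in still receives the partial response `['a']`, which is the behaviour this issue describes.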

Impact

This affects every agent that uses AgentTelemetryLayer (i.e., any agent with OTel enabled). The after_run context providers are invoked after a streaming failure, so side effects such as session persistence or audit logging can run on error responses.

The issue was discovered via test_after_run_not_called_on_streaming_error in the github_copilot package. The test fails when ENABLE_INSTRUMENTATION=true is set, or when that setting leaks in from another test worker during a parallel xdist run.

Proposed fix

In _finalize_stream, skip get_final_response() when the stream did not complete normally. result_stream._consumed is True only after StopAsyncIteration (normal end-of-stream), so it can serve as the guard:

async def _finalize_stream() -> None:
    try:
        if not result_stream._consumed:
            # Stream errored before completing; skip get_final_response() to
            # avoid firing result hooks (e.g., after_run providers) on error paths.
            return
        response = await result_stream.get_final_response()
        # ... capture telemetry
    except Exception as exception:
        capture_exception(span=span, exception=exception, timestamp=time_ns())
    finally:
        INNER_RESPONSE_TELEMETRY_CAPTURED_FIELDS.reset(...)
        INNER_ACCUMULATED_USAGE.reset(...)
        _close_span()

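The span is still closed on the error path, because the early return still runs the finally block (which also resets INNER_RESPONSE_TELEMETRY_CAPTURED_FIELDS and INNER_ACCUMULATED_USAGE). The caller's OTel span, which wraps the iteration loop, captures the exception on the error path.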
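A minimal sketch of the proposed guard, using the same kind of toy model. Everything here is hypothetical: `ToyResponseStream`, `post_hook` and `run` are illustrations, and the `span_closed` list merely stands in for `_close_span()`. It checks the two properties the fix is meant to preserve: after_run fires only on normal completion, and the span is closed on both paths.

```python
import asyncio


class ToyResponseStream:
    """Hypothetical stand-in for ResponseStream (not the real class)."""

    def __init__(self, chunks, fail_after=None):
        self._chunks = list(chunks)
        self._fail_after = fail_after  # simulate an error after N chunks
        self._index = 0
        self._consumed = False
        self._final = None
        self.cleanup_hooks = []
        self.result_hooks = []

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            if self._fail_after is not None and self._index >= self._fail_after:
                raise RuntimeError("simulated streaming error")
            if self._index >= len(self._chunks):
                raise StopAsyncIteration
            self._index += 1
            return self._chunks[self._index - 1]
        except StopAsyncIteration:  # normal completion
            self._consumed = True
            await self._run_cleanup_hooks()
            await self.get_final_response()
            raise
        except Exception:  # streaming error
            await self._run_cleanup_hooks()
            raise

    async def _run_cleanup_hooks(self):
        for hook in self.cleanup_hooks:
            await hook()

    async def get_final_response(self):
        # Assumption: built once and cached; result hooks fire on first call only.
        if self._final is None:
            self._final = self._chunks[: self._index]
            for hook in self.result_hooks:
                await hook(self._final)
        return self._final


async def run(fail_after=None):
    after_run_calls, span_closed = [], []
    stream = ToyResponseStream(["a", "b"], fail_after=fail_after)

    async def post_hook(response):  # stand-in for _post_hook / after_run providers
        after_run_calls.append(response)

    async def finalize_stream():  # the proposed, guarded _finalize_stream
        try:
            if not stream._consumed:
                return  # errored before completing: skip result hooks
            await stream.get_final_response()
        finally:
            span_closed.append(True)  # stand-in for _close_span(); runs on return too

    stream.result_hooks.append(post_hook)
    stream.cleanup_hooks.append(finalize_stream)
    try:
        async for _ in stream:
            pass
    except RuntimeError:
        pass  # the caller's span would record this exception
    return after_run_calls, span_closed


ok_calls, ok_closed = asyncio.run(run())
err_calls, err_closed = asyncio.run(run(fail_after=1))
print(ok_calls, ok_closed)    # [['a', 'b']] [True]
print(err_calls, err_closed)  # [] [True]
```

Like the proposal, the guard reads the private `_consumed` attribute, so the telemetry layer depends on a ResponseStream implementation detail rather than a public signal.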
