Skip to content

Commit 89b980f

Browse files
Fix async streaming support for Agno Team.arun() - Resolves #1262 (#1265)
* Fix async streaming support for Agno Team.arun() Fixes #1262 The issue was that Team.arun() with stream=True returns an async generator directly, not a coroutine. The team async wrapper was defined as 'async def' which caused it to wrap the async generator in a coroutine, leading to the error: 'async for' requires an object with __aiter__ method, got coroutine. Changes: - Added AsyncStreamingResultWrapper class to properly wrap async generators - Modified create_team_async_wrapper to be a regular function (not async) - Added inspect.isasyncgen() check to detect async generators - Updated create_streaming_agent_async_wrapper to check for __aiter__ first The fix ensures that async generators are returned directly without being wrapped in a coroutine, allowing proper async iteration with 'async for'. Co-Authored-By: Alex <[email protected]> * Fix Agent async streaming support Also fixes Agent.arun() async streaming in addition to Team.arun(). Changes: - Changed create_streaming_agent_async_wrapper from async def to def - Added inspect.isasyncgen() check for detecting async generators - For non-streaming case, created inner async_wrapper function - Both Team and Agent async streaming now work correctly Co-Authored-By: Alex <[email protected]> --------- Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: Alex <[email protected]>
1 parent 3b1481f commit 89b980f

File tree

1 file changed

+84
-34
lines changed

1 file changed

+84
-34
lines changed

agentops/instrumentation/agentic/agno/instrumentor.py

Lines changed: 84 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,42 @@ def __getattr__(self, name):
115115
return getattr(self.original_result, name)
116116

117117

118+
class AsyncStreamingResultWrapper:
119+
"""Wrapper for async streaming results that maintains agent span as active throughout iteration."""
120+
121+
def __init__(self, original_result, span, agent_id, agent_context, streaming_context_manager):
122+
self.original_result = original_result
123+
self.span = span
124+
self.agent_id = agent_id
125+
self.agent_context = agent_context
126+
self.streaming_context_manager = streaming_context_manager
127+
self._consumed = False
128+
129+
def __aiter__(self):
130+
"""Return async iterator that keeps agent span active during iteration."""
131+
return self
132+
133+
async def __anext__(self):
134+
"""Async iteration that keeps agent span active."""
135+
context_token = otel_context.attach(self.agent_context)
136+
try:
137+
item = await self.original_result.__anext__()
138+
return item
139+
except StopAsyncIteration:
140+
# Clean up when iteration is complete
141+
if not self._consumed:
142+
self._consumed = True
143+
self.span.end()
144+
self.streaming_context_manager.remove_context(self.agent_id)
145+
raise
146+
finally:
147+
otel_context.detach(context_token)
148+
149+
def __getattr__(self, name):
150+
"""Delegate attribute access to the original result."""
151+
return getattr(self.original_result, name)
152+
153+
118154
def create_streaming_workflow_wrapper(tracer, streaming_context_manager):
119155
"""Create a streaming-aware wrapper for workflow run methods."""
120156

@@ -391,7 +427,9 @@ def wrapper(wrapped, instance, args, kwargs):
391427
def create_streaming_agent_async_wrapper(tracer, streaming_context_manager):
392428
"""Create a streaming-aware async wrapper for agent run methods."""
393429

394-
async def wrapper(wrapped, instance, args, kwargs):
430+
def wrapper(wrapped, instance, args, kwargs):
431+
import inspect
432+
395433
# Get agent ID for context storage
396434
agent_id = getattr(instance, "agent_id", None) or getattr(instance, "id", None) or id(instance)
397435
agent_id = str(agent_id)
@@ -427,7 +465,7 @@ async def wrapper(wrapped, instance, args, kwargs):
427465
# Execute the original function within agent context
428466
context_token = otel_context.attach(current_context)
429467
try:
430-
result = await wrapped(*args, **kwargs)
468+
result = wrapped(*args, **kwargs)
431469
finally:
432470
otel_context.detach(context_token)
433471

@@ -442,7 +480,11 @@ async def wrapper(wrapped, instance, args, kwargs):
442480
span.set_status(Status(StatusCode.OK))
443481

444482
# Wrap the result to maintain context and end span when complete
445-
if hasattr(result, "__iter__"):
483+
if inspect.isasyncgen(result):
484+
return AsyncStreamingResultWrapper(
485+
result, span, agent_id, current_context, streaming_context_manager
486+
)
487+
elif hasattr(result, "__iter__"):
446488
return StreamingResultWrapper(result, span, agent_id, current_context, streaming_context_manager)
447489
else:
448490
# Not actually streaming, clean up immediately
@@ -457,32 +499,35 @@ async def wrapper(wrapped, instance, args, kwargs):
457499
streaming_context_manager.remove_context(agent_id)
458500
raise
459501
else:
460-
# For non-streaming, use normal context manager
461-
with tracer.start_as_current_span(span_name) as span:
462-
try:
463-
# Set agent attributes
464-
attributes = get_agent_run_attributes(args=(instance,) + args, kwargs=kwargs)
465-
for key, value in attributes.items():
466-
span.set_attribute(key, value)
502+
# For non-streaming, need to handle async call
503+
async def async_wrapper():
504+
with tracer.start_as_current_span(span_name) as span:
505+
try:
506+
# Set agent attributes
507+
attributes = get_agent_run_attributes(args=(instance,) + args, kwargs=kwargs)
508+
for key, value in attributes.items():
509+
span.set_attribute(key, value)
467510

468-
# Execute the original function
469-
result = await wrapped(*args, **kwargs)
511+
# Execute the original function
512+
result = await wrapped(*args, **kwargs)
470513

471-
# Set result attributes
472-
result_attributes = get_agent_run_attributes(
473-
args=(instance,) + args, kwargs=kwargs, return_value=result
474-
)
475-
for key, value in result_attributes.items():
476-
if key not in attributes: # Avoid duplicates
477-
span.set_attribute(key, value)
514+
# Set result attributes
515+
result_attributes = get_agent_run_attributes(
516+
args=(instance,) + args, kwargs=kwargs, return_value=result
517+
)
518+
for key, value in result_attributes.items():
519+
if key not in attributes: # Avoid duplicates
520+
span.set_attribute(key, value)
478521

479-
span.set_status(Status(StatusCode.OK))
480-
return result
522+
span.set_status(Status(StatusCode.OK))
523+
return result
481524

482-
except Exception as e:
483-
span.set_status(Status(StatusCode.ERROR, str(e)))
484-
span.record_exception(e)
485-
raise
525+
except Exception as e:
526+
span.set_status(Status(StatusCode.ERROR, str(e)))
527+
span.record_exception(e)
528+
raise
529+
530+
return async_wrapper()
486531

487532
return wrapper
488533

@@ -835,7 +880,9 @@ def wrapper(wrapped, instance, args, kwargs):
835880
def create_team_async_wrapper(tracer, streaming_context_manager):
836881
"""Create an async wrapper for Team methods that establishes the team context."""
837882

838-
async def wrapper(wrapped, instance, args, kwargs):
883+
def wrapper(wrapped, instance, args, kwargs):
884+
import inspect
885+
839886
# Get team ID for context storage
840887
team_id = getattr(instance, "team_id", None) or getattr(instance, "id", None) or id(instance)
841888
team_id = str(team_id)
@@ -863,17 +910,20 @@ async def wrapper(wrapped, instance, args, kwargs):
863910
# Execute the original function within team context
864911
context_token = otel_context.attach(current_context)
865912
try:
866-
result = await wrapped(*args, **kwargs)
867-
868-
# For non-streaming, close the span
869-
if not is_streaming:
870-
span.end()
871-
streaming_context_manager.remove_context(team_id)
872-
873-
return result
913+
result = wrapped(*args, **kwargs)
874914
finally:
875915
otel_context.detach(context_token)
876916

917+
# For streaming, wrap the result to maintain context
918+
if is_streaming and inspect.isasyncgen(result):
919+
return AsyncStreamingResultWrapper(result, span, team_id, current_context, streaming_context_manager)
920+
elif hasattr(result, "__iter__"):
921+
return StreamingResultWrapper(result, span, team_id, current_context, streaming_context_manager)
922+
else:
923+
span.end()
924+
streaming_context_manager.remove_context(team_id)
925+
return result
926+
877927
except Exception as e:
878928
span.set_status(Status(StatusCode.ERROR, str(e)))
879929
span.record_exception(e)

0 commit comments

Comments
 (0)