
Conversation


@Copilot Copilot AI commented Oct 14, 2025

Problem

When using DSPy's streaming feature (dspy.streamify()) with async tools (like MCP tools or ReAct tool calls), the stream would freeze indefinitely after a tool completed execution. The stream never continued past the "Tool calling finished!" status message, causing the application to hang.

import asyncio
import dspy

async def async_tool(query: str):
    await asyncio.sleep(0.5)  # Simulate async operation
    return f"Results for {query}"

class MyProgram(dspy.Module):
    def __init__(self):
        super().__init__()
        self.tool = dspy.Tool(async_tool)
        self.predict = dspy.Predict("data->analysis")
    
    async def aforward(self, query: str):
        data = await self.tool.acall(query=query)
        return await self.predict.acall(data=data)

program = dspy.streamify(MyProgram(), is_async_program=True)

async def main():
    # Assumes an LM has already been configured, e.g. dspy.configure(lm=...)
    output = program(query="test")
    async for value in output:
        print(value)  # Would freeze after "Tool calling finished!" message

asyncio.run(main())

Root Cause

The sync_send_to_stream function was using ThreadPoolExecutor to send messages when running in an event loop:

# OLD CODE - DEADLOCK PRONE
# run_in_new_loop executes the async send on a fresh event loop in the worker thread
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(run_in_new_loop)
    return future.result()  # Blocks the event loop

The issue occurred because:

  1. Async tool callbacks run synchronously within the event loop context (via @with_callbacks decorator)
  2. They call sync_send_to_stream to send status messages
  3. The function created a new thread with its own event loop
  4. future.result() blocked the event loop's thread, so the loop could never make the progress needed for the send to complete, resulting in a deadlock (a minimal sketch of this pattern follows below)
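
The following is a minimal, generic illustration of this class of deadlock, not the DSPy code itself: a synchronous callback running on the event loop's thread blocks on a future whose completion requires that same loop to run. The queue, the function names, and the two-second timeout are illustrative choices; the timeout only exists so the demo terminates instead of hanging forever.

import asyncio
import concurrent.futures
import contextlib

async def consumer(queue: asyncio.Queue):
    # Can only run while the event loop is free to schedule it.
    return await queue.get()

def blocking_send(queue: asyncio.Queue, loop: asyncio.AbstractEventLoop):
    # Called from *inside* the event loop's thread, like a sync callback.
    # It schedules work back onto that same loop and then blocks on the
    # result, so the scheduled work can never run.
    future = asyncio.run_coroutine_threadsafe(queue.put("Tool calling finished!"), loop)
    return future.result(timeout=2)  # would block forever without the timeout

async def main():
    queue: asyncio.Queue = asyncio.Queue()
    consumer_task = asyncio.create_task(consumer(queue))
    try:
        blocking_send(queue, asyncio.get_running_loop())
    except concurrent.futures.TimeoutError:
        print("deadlock: the blocked loop never ran the scheduled put()")
    consumer_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await consumer_task

asyncio.run(main())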

Solution

Replaced the ThreadPoolExecutor approach with anyio's send_nowait(), a synchronous, non-blocking method available on MemoryObjectSendStream:

# NEW CODE - NON-BLOCKING
# `stream` is the anyio MemoryObjectSendStream; `_send` is the async helper
# that awaits stream.send(message)
try:
    asyncio.get_running_loop()

    # If we're in an event loop, use send_nowait to avoid blocking
    stream.send_nowait(message)
    return None
except RuntimeError:
    # Not in an event loop, use syncify as before
    return syncify(_send)()

This allows callbacks to send status messages without blocking the event loop, fixing the deadlock while maintaining backward compatibility.
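
For reference, here is a small standalone example (illustrative only, not the DSPy implementation) of how MemoryObjectSendStream.send_nowait() behaves: it returns immediately while the loop is running, and raises anyio.WouldBlock when the buffer has no room, instead of waiting.

import anyio

async def main():
    # A buffered memory stream: send_nowait succeeds while there is room.
    send_stream, receive_stream = anyio.create_memory_object_stream(max_buffer_size=8)

    # Synchronous and non-blocking, so it is safe to call from a sync
    # callback running inside the event loop.
    send_stream.send_nowait("Tool calling finished!")
    print(receive_stream.receive_nowait())

    # With a zero-sized buffer and no waiting receiver, send_nowait raises
    # rather than blocking.
    zero_send, _zero_recv = anyio.create_memory_object_stream(max_buffer_size=0)
    try:
        zero_send.send_nowait("dropped")
    except anyio.WouldBlock:
        print("no buffer space: WouldBlock raised, the event loop never blocked")

anyio.run(main)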

Changes

  • Modified sync_send_to_stream in dspy/streaming/messages.py to use stream.send_nowait()
  • Removed unused concurrent.futures import
  • Added regression test test_async_tool_streaming_no_deadlock to prevent future regressions

Testing

  • ✅ All existing streaming tests pass (7/7)
  • ✅ New regression test validates the fix with async tools (a sketch of the test's general shape follows this list)
  • ✅ Verified with MCP-like async tool scenarios
  • ✅ No breaking changes to existing functionality
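
For readers who want the general shape of such a regression test, here is a hypothetical sketch. It is not the committed test: the tool, program, and timeout values are invented for illustration, it assumes pytest-asyncio, and a real test would stub the LM rather than call a provider.

import asyncio
import pytest
import dspy

@pytest.mark.asyncio
async def test_async_tool_streaming_no_deadlock():
    # NOTE: a real test would configure a dummy LM before running the program.
    async def lookup(query: str) -> str:
        await asyncio.sleep(0.01)  # genuinely asynchronous tool work
        return f"results for {query}"

    class Program(dspy.Module):
        def __init__(self):
            super().__init__()
            self.tool = dspy.Tool(lookup)
            self.predict = dspy.Predict("data -> analysis")

        async def aforward(self, query: str):
            data = await self.tool.acall(query=query)
            return await self.predict.acall(data=data)

    stream = dspy.streamify(Program(), is_async_program=True)(query="test")

    async def drain():
        return [chunk async for chunk in stream]

    # Before the fix this would hang forever; the timeout turns a
    # potential deadlock into a test failure instead.
    chunks = await asyncio.wait_for(drain(), timeout=10)
    assert chunks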

Impact

This fix enables:

  • MCP tool integration with streaming
  • ReAct agents with async tool calls
  • Any custom async tools in streamified programs

The change also improves performance by eliminating thread creation overhead.

Original prompt

This section describes the original issue you should resolve

<issue_title>Streaming deadlocks with async tools - sync_send_to_stream blocks event loop</issue_title>
<issue_description># Streaming deadlocks when using async tools/callbacks - sync_send_to_stream blocks event loop

🐛 Bug Description

When using DSPy's streaming feature (dspy.streamify()) with async tools (like MCP tools or ReAct tool calls), the streaming freezes indefinitely after a tool completes execution. The stream never continues past "Tool calling finished!" status message.

💥 Error Message

WARNING dspy.utils.callback: Error when applying callback <dspy.streaming.messages.StatusStreamingCallback object>'s end handler on function acall: Not running inside an AnyIO worker thread, and no event loop token was provided.

🔍 Root Cause

The sync_send_to_stream() function in dspy/streaming/messages.py blocks the event loop in two ways:

  1. When the event loop IS running (lines 45-47):
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(run_in_new_loop)
    return future.result()  # ← BLOCKS the main event loop!
  2. When the event loop is NOT running (line 50):
return syncify(_send)()  # ← Raises AnyIO error without proper context

The blocking future.result() call prevents the stream from continuing, causing a deadlock when StatusStreamingCallback.on_tool_end() fires after async tool execution.

📋 Reproduction Steps

import asyncio
import dspy
from dspy.streaming import StreamListener

# 1. Set up streaming with async tool
lm = dspy.LM(model="openai/gpt-4o")

async def my_async_tool(query: str) -> str:
    """An async tool that simulates work."""
    await asyncio.sleep(0.1)  # Simulate async work
    return f"Result for: {query}"

signature = dspy.Signature(
    {"input": (str, dspy.InputField()), "output": (str, dspy.OutputField())}
)

# 2. Create ReAct program with tools
program = dspy.ReAct(signature, tools=[my_async_tool], max_iters=2)

# 3. Wrap with streamify
streaming_task = dspy.streamify(
    program,
    is_async_program=True,
    stream_listeners=[StreamListener(signature_field_name="output")]
)

# 4. Execute - this will freeze after tool execution
async def run():
    with dspy.context(lm=lm):
        stream = streaming_task(input="test query")
        async for value in stream:
            print(value)

asyncio.run(run())
# ⚠️ Stream freezes at "Tool calling finished!" - never completes

🔧 Proposed Fix

Replace the blocking implementation with non-blocking fire-and-forget:

def sync_send_to_stream(stream, message):
    """Send message to stream without blocking the event loop."""

    async def _send():
        try:
            await stream.send(message)
        except Exception:
            pass  # Gracefully handle send failures

    try:
        loop = asyncio.get_running_loop()
        # Non-blocking: schedule as background task (fire-and-forget)
        loop.create_task(_send())
    except RuntimeError:
        # No event loop running - safe to create new one
        try:
            asyncio.run(_send())
        except Exception:
            pass  # Gracefully handle failures

Key changes (a self-contained usage sketch follows this list):

  • Remove ThreadPoolExecutor + blocking future.result()
  • Use loop.create_task() for fire-and-forget pattern
  • Never block the calling thread
  • Gracefully handle errors
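
To make the proposed behaviour concrete, here is a self-contained usage sketch (my own illustration, not code from the DSPy repository) exercising the function with an anyio memory stream from inside a running event loop:

import asyncio
import anyio

def sync_send_to_stream(stream, message):
    """Proposed non-blocking send, reproduced from the snippet above."""

    async def _send():
        try:
            await stream.send(message)
        except Exception:
            pass  # Gracefully handle send failures

    try:
        loop = asyncio.get_running_loop()
        loop.create_task(_send())  # fire-and-forget: returns immediately
    except RuntimeError:
        asyncio.run(_send())  # no loop running, so it is safe to create one

async def main():
    send_stream, receive_stream = anyio.create_memory_object_stream(max_buffer_size=4)
    # Called from inside the event loop, as a sync callback would be:
    sync_send_to_stream(send_stream, "Tool calling finished!")
    # The send runs as a background task; the receiver simply awaits it.
    print(await receive_stream.receive())

asyncio.run(main())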

✅ Testing

We've tested this fix extensively with:

  • Multiple concurrent streams
  • MCP tool integrations
  • WebSocket broadcasting
  • Long-running tool executions

The fix eliminates the deadlock while maintaining all streaming functionality.

🌍 Impact

This bug affects any use case involving:

  • Async tools with streaming
  • ReAct agents with tool calls
  • Real-time status updates during tool execution
  • Multi-agent systems with parallel streaming

📦 Version Info

  • DSPy Version: 3.0.4b1 (also present in 3.0.0)
  • Python: 3.10+
  • Environment: Async event loop context

🤝 Contribution

We have a working implementation and can submit a PR with tests if this approach looks good to the maintainers.

Let me know if you need any additional details or reproduction examples!
</issue_description>

<agent_instructions>Can you investigate the deadlock issue and file a fix?</agent_instructions>

Comments on the Issue (you are @copilot in this section)

Fixes #8934



@Copilot Copilot AI changed the title [WIP] Fix streaming deadlocks with async tools Fix streaming deadlock with async tools by using send_nowait Oct 14, 2025
@Copilot Copilot AI requested a review from TomeHirata October 14, 2025 09:33
Copilot finished work on behalf of TomeHirata October 14, 2025 09:33
@TomeHirata
Collaborator

@chenmoneygithub What do you think about the approach? If you agree, I can clean up the code and make this ready for review.
