-
Notifications
You must be signed in to change notification settings - Fork 5.4k
Fix and Feature: Improve CrewAI Execution Stability and Add Streaming Support #3856
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We'll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
SuZeAI
wants to merge
3
commits into
crewAIInc:main
Choose a base branch
from
SuZeAI:fix/early_stop_crew
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
"""
CrewAI Streaming Module

Provides easy-to-use streaming functionality for CrewAI executions.

Usage:
    # Simple usage
    from crewai.streaming import stream_crew_execution, CrewStreamer

    async for token in stream_crew_execution(crew_instance, inputs):
        print(token, end="", flush=True)
"""

from .streaming import (
    CrewStreamer,
    CrewStreamListener,
    stream_crew_execution,
)

# Explicit public API of the streaming package.
__all__ = [
    # Basic streaming
    "CrewStreamer",
    "CrewStreamListener",
    "stream_crew_execution",
]

# Package version string.
__version__ = "2.1.0"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,217 @@ | ||
| import asyncio | ||
| import logging | ||
| from typing import Any, Dict, List, Optional, AsyncGenerator | ||
| from crewai.events import ( | ||
| AgentExecutionCompletedEvent, | ||
| BaseEventListener, | ||
| CrewKickoffCompletedEvent, | ||
| LLMCallCompletedEvent, | ||
| LLMStreamChunkEvent, | ||
| TaskCompletedEvent | ||
| ) | ||
| from crewai.agents.constants import FINAL_ANSWER_ACTION | ||
|
|
||
| # Configure logger | ||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
class CrewStreamListener(BaseEventListener):
    """Event listener that forwards LLM stream chunks into an asyncio queue.

    Registers handlers on the CrewAI event bus and exposes received chunks
    as an async generator via :meth:`get_tokens`.  The stream terminates when
    an END_OF_STREAM sentinel is queued — either by a crew-completion event
    or an explicit :meth:`stop_streaming` call.
    """

    # Sentinel pushed onto the queue to signal the end of the token stream.
    END_OF_STREAM = "END_OF_STREAM"

    def __init__(self, target_agent_ids: List[str], crew_id: Optional[str] = None):
        """
        Initialize the stream listener.

        Args:
            target_agent_ids: List of agent IDs to monitor for streaming.
            crew_id: Optional crew ID to monitor for completion.  If None,
                the stream ends on *any* crew completion event.
        """
        super().__init__()
        self.target_agent_ids = [str(agent_id) for agent_id in target_agent_ids]
        self.crew_id = crew_id
        # NOTE(review): asyncio.Queue is not thread-safe.  This assumes the
        # event bus invokes handlers on the same thread/event loop that
        # consumes get_tokens() -- confirm against the CrewAI event bus.
        self.event_queue: asyncio.Queue = asyncio.Queue()
        self._is_streaming = False

    def setup_listeners(self, event_bus):
        """Register chunk and completion handlers on the CrewAI event bus."""

        @event_bus.on(LLMStreamChunkEvent)
        def on_llm_stream_chunk(source: Any, event: LLMStreamChunkEvent):
            """Forward chunks emitted by one of the monitored agents."""
            if str(event.agent_id) in self.target_agent_ids:
                # Lazy %-args: message is only built when DEBUG is enabled.
                logger.debug("Received stream chunk from agent %s", event.agent_id)
                self.event_queue.put_nowait(event.chunk)

        @event_bus.on(CrewKickoffCompletedEvent)
        def on_crew_kickoff_complete(source: Any, event: CrewKickoffCompletedEvent):
            """Queue the end-of-stream sentinel when the crew finishes."""
            if self.crew_id and source.fingerprint.uuid_str == self.crew_id:
                logger.info("Crew %s completed", self.crew_id)
                self.event_queue.put_nowait(self.END_OF_STREAM)
            elif not self.crew_id:
                # If no specific crew_id, end stream on any crew completion.
                logger.info("Crew execution completed")
                self.event_queue.put_nowait(self.END_OF_STREAM)

    async def get_tokens(self) -> AsyncGenerator[str, None]:
        """
        Yield streaming tokens as they arrive.

        Yields:
            str: Stream tokens from the target agents.

        Raises:
            Exception: Re-raised after logging if queue consumption fails.
        """
        self._is_streaming = True
        try:
            while self._is_streaming:
                token = await self.event_queue.get()

                if token == self.END_OF_STREAM:
                    # Fix: acknowledge the sentinel too, otherwise a
                    # queue.join() would hang on the unfinished item.
                    self.event_queue.task_done()
                    logger.info("Stream ended")
                    break

                yield token
                self.event_queue.task_done()
        except Exception:
            # logger.exception records the traceback alongside the message.
            logger.exception("Error in token streaming")
            raise
        finally:
            self._is_streaming = False

    def stop_streaming(self):
        """Stop the streaming manually by queueing the end-of-stream sentinel."""
        self._is_streaming = False
        self.event_queue.put_nowait(self.END_OF_STREAM)
|
|
||
|
|
||
class CrewStreamer:
    """
    High-level interface for streaming CrewAI execution.

    This class provides a simple way to stream tokens from CrewAI agents
    without needing to understand the underlying event system.
    """

    def __init__(self, crew_instance, agent_ids: Optional[List[str]] = None):
        """
        Initialize the CrewStreamer.

        Args:
            crew_instance: The CrewAI crew instance to stream from (its
                ``.crew()`` method must return the Crew object).
            agent_ids: Optional list of specific agent IDs to monitor.
                If None, will monitor all agents in the crew.
        """
        self.crew_instance = crew_instance
        self.crew = crew_instance.crew()

        # If no agent_ids specified, get all agent IDs from the crew.
        if agent_ids is None:
            self.agent_ids = [str(agent.id) for agent in self.crew.agents]
        else:
            self.agent_ids = [str(agent_id) for agent_id in agent_ids]

        self.crew_id = self.crew.fingerprint.uuid_str
        self.listener: Optional[CrewStreamListener] = None

    async def stream_execution(
        self,
        inputs: Dict[str, Any],
        sleep_time: float = 0.01,
        wait_for_final_answer: bool = True
    ) -> AsyncGenerator[str, None]:
        """
        Stream the crew execution with real-time tokens.

        Args:
            inputs: Input data for the crew execution.
            sleep_time: Sleep time between token yields (for rate limiting).
            wait_for_final_answer: If True, only start yielding tokens after
                the "Final Answer:" marker appears in the accumulated output.

        Yields:
            str: Streaming tokens from the crew execution.

        Raises:
            Exception: Any errors during crew execution or streaming.
        """
        execution_task: Optional[asyncio.Task] = None
        try:
            # Create and setup the listener.
            self.listener = CrewStreamListener(
                target_agent_ids=self.agent_ids,
                crew_id=self.crew_id
            )

            # Start the crew execution task.
            execution_task = asyncio.create_task(
                self.crew.kickoff_async(inputs=inputs)
            )

            # Fix: if the crew task dies before emitting its completion event
            # (e.g. kickoff_async raises), no END_OF_STREAM sentinel is ever
            # queued and get_tokens() would wait forever.  Unblock the
            # consumer whenever the task ends abnormally; a duplicate
            # sentinel after normal completion is harmless.
            def _unblock_on_done(task: asyncio.Task) -> None:
                if self.listener is not None and (
                    task.cancelled() or task.exception() is not None
                ):
                    self.listener.stop_streaming()

            execution_task.add_done_callback(_unblock_on_done)

            # Stream tokens, optionally gated on the "Final Answer:" marker.
            accumulated_result = ""
            final_answer_reached = not wait_for_final_answer

            async for token in self.listener.get_tokens():
                accumulated_result += token

                # Check if we should start yielding tokens.
                if not final_answer_reached and FINAL_ANSWER_ACTION in accumulated_result:
                    final_answer_reached = True
                    logger.info("Final Answer section reached, starting token stream")

                if final_answer_reached:
                    yield token
                    if sleep_time > 0:
                        await asyncio.sleep(sleep_time)

            # Fix: always await (even when already done) so any exception
            # raised by the crew execution propagates to the caller instead
            # of being silently dropped.
            if not execution_task.done():
                logger.info("Waiting for crew execution to complete...")
            await execution_task

        except asyncio.CancelledError:
            logger.info("Streaming was cancelled")
            if self.listener:
                self.listener.stop_streaming()
            # Don't leave the crew running detached after cancellation.
            if execution_task is not None:
                execution_task.cancel()
            raise
        except Exception:
            logger.exception("Error during crew streaming")
            if self.listener:
                self.listener.stop_streaming()
            raise

    def stop(self):
        """Stop the streaming manually."""
        if self.listener:
            self.listener.stop_streaming()
|
|
||
|
|
||
| # Convenience function for backward compatibility and simple usage | ||
# Convenience function for backward compatibility and simple usage
async def stream_crew_execution(
    crew_instance,
    inputs: Dict[str, Any],
    agent_ids: Optional[List[str]] = None,
    sleep_time: float = 0.01,
    wait_for_final_answer: bool = True
) -> AsyncGenerator[str, None]:
    """
    Stream a crew execution without constructing a CrewStreamer by hand.

    Args:
        crew_instance: The CrewAI crew instance.
        inputs: Input data for the crew execution.
        agent_ids: Optional list of agent IDs to monitor.
        sleep_time: Sleep time between tokens.
        wait_for_final_answer: Whether to wait for "Final Answer:" before streaming.

    Yields:
        str: Streaming tokens.

    Example:
        ```python
        async for token in stream_crew_execution(my_crew, {"input": "Hello"}):
            print(token, end="", flush=True)
        ```
    """
    # Delegate everything to CrewStreamer; this wrapper just hides the
    # object construction for one-shot callers.
    token_stream = CrewStreamer(crew_instance, agent_ids).stream_execution(
        inputs, sleep_time, wait_for_final_answer
    )
    async for chunk in token_stream:
        yield chunk
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.