-
Notifications
You must be signed in to change notification settings - Fork 1.6k
[Feat] Support more running mode in workforce #3157
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Important Review skippedAuto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the ✨ Finishing touches🧪 Generate unit tests
Tip 👮 Agentic pre-merge checks are now available in preview!Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.
Please see the documentation for more information. Example: reviews:
pre_merge_checks:
custom_checks:
- name: "Undocumented Breaking Changes"
mode: "warning"
instructions: |
Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal). Please share your feedback with us on this Discord post. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
graph TD
A[Task: Generate 5 AI/ML Papers] --> B[Literature Researcher]
B --> C{Fork: 5 Parallel Tasks}
C --> D1[Summary Specialist 1<br/>Summarize Paper 1]
C --> D2[Summary Specialist 2<br/>Summarize Paper 2]
C --> D3[Summary Specialist 3<br/>Summarize Paper 3]
C --> D4[Summary Specialist 4<br/>Summarize Paper 4]
C --> D5[Summary Specialist 5<br/>Summarize Paper 5]
D1 --> E{Join: Collect Summaries}
D2 --> E
D3 --> E
D4 --> E
D5 --> E
E --> F[Research Synthesizer<br/>Analyze AI/ML Trends]
F --> G[Final Result]
style A fill:#e1f5fe
style B fill:#fff3e0
style D1 fill:#f3e5f5
style D2 fill:#f3e5f5
style D3 fill:#f3e5f5
style D4 fill:#f3e5f5
style D5 fill:#f3e5f5
style F fill:#e8f5e8
style G fill:#fff9c4
|
graph TD
A[Task: Generate 4 RESTful API Types] --> B[API Researcher]
B --> C{Fork: 4 Parallel Tasks}
C --> D1[API Analyst 1<br/>Analyze API 1]
C --> D2[API Analyst 2<br/>Analyze API 2]
C --> D3[API Analyst 3<br/>Analyze API 3]
C --> D4[API Analyst 4<br/>Analyze API 4]
D1 --> E{Join: Collect Analyses}
D2 --> E
D3 --> E
D4 --> E
E --> F[Documentation Writer<br/>Generate API Usage Guide]
F --> G[Final Result]
style A fill:#e1f5fe
style B fill:#fff3e0
style D1 fill:#f3e5f5
style D2 fill:#f3e5f5
style D3 fill:#f3e5f5
style D4 fill:#f3e5f5
style F fill:#e8f5e8
style G fill:#fff9c4
|
graph TD
A[Task: Code Review Analysis] --> B[Code Scanner]
B --> C{Fork: 3 Parallel Tasks}
C --> D1[Code Reviewer 1<br/>Review File 1]
C --> D2[Code Reviewer 2<br/>Review File 2]
C --> D3[Code Reviewer 3<br/>Review File 3]
D1 --> E{Join: Collect Reviews}
D2 --> E
D3 --> E
E --> F[Review Summarizer<br/>Generate Comprehensive Report]
F --> G[Final Result]
style A fill:#e1f5fe
style B fill:#fff3e0
style D1 fill:#f3e5f5
style D2 fill:#f3e5f5
style D3 fill:#f3e5f5
style F fill:#e8f5e8
style G fill:#fff9c4
|
Hey @Wendong-Fan @fengju0213 ! I've already add pipeline mode in workforce, and maybe we can try more cases to test this mode. |
thanks @Ol1ver0413 ! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the comprehensive PR @Ol1ver0413. I added a comment. Also I think the method namings could be a bit more consistent and remembrable.
# before
workforce.add_pipeline_task()
workforce.fork_pipeline()
workforce.add_parallel_pipeline_tasks()
builder.fork()
builder.add_parallel_tasks()
# after
workforce.pipeline_add()
workforce.pipeline_fork()
workforce.pipeline_join()
workforce.pipeline_build()
builder.add()
builder.fork()
builder.join()
builder.build()
"""Simulates a database connection.""" | ||
def __init__(self, host, port, database): | ||
self.host = host | ||
self.port = port | ||
self.database = database | ||
self.connection = None | ||
def connect(self): | ||
"""Simulate establishing a database connection.""" | ||
self.connection = f"connected to {self.host}:{self.port}/{self.database}" | ||
return True | ||
def disconnect(self): | ||
"""Simulate closing a database connection.""" | ||
self.connection = None | ||
def execute_query(self, query): | ||
"""Simulate executing a database query.""" | ||
if not self.connection: | ||
raise ConnectionError("Not connected to database") | ||
return f"Executed: {query}" | ||
``` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the docstrings are rendered out of the comment
expected_task_ids = {task.id for task in self._pending_tasks} | ||
expected_task_ids.update(task.id for task in self._completed_tasks) | ||
|
||
completed_successful_ids = { | ||
task.id for task in self._completed_tasks | ||
if task.state == TaskState.DONE | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doesn't this create a situation if which if any of the branched tasks fails, the whole pipeline is marked as failed and the join task never executes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doesn't this create a situation if which if any of the branched tasks fails, the whole pipeline is marked as failed and the join task never executes?
Thanks @hesamsheikh ! Maybe I need to add some branch task failure handling mechanisms into the pipeline.
|
||
# Clear existing tasks and dependencies | ||
self._pending_tasks.clear() | ||
self._task_dependencies.clear() | ||
self._assignees.clear() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Ol1ver0413 , thanks for the PR, but as of now, Is the pipeline designed that it could interrupt running tasks or is it only with predefined tasks only? i.e. before workforce.start()
Description
Add pipeline running mode in workforce. #1663
Checklist
Go over all the following points, and put an
x
in all the boxes that apply.Fixes #issue-number
in the PR description (required)pyproject.toml
anduv lock
If you are unsure about any of these, don't hesitate to ask. We are here to help!