Skip to content

Conversation

Ol1ver0413
Copy link
Collaborator

@Ol1ver0413 Ol1ver0413 commented Sep 18, 2025

Description

Add pipeline running mode in workforce. #1663

Checklist

Go over all the following points, and put an x in all the boxes that apply.

  • I have read the CONTRIBUTION guide (required)
  • I have linked this PR to an issue using the Development section on the right sidebar or by adding Fixes #issue-number in the PR description (required)
  • I have checked if any dependencies need to be added or updated in pyproject.toml and uv lock
  • I have updated the tests accordingly (required for a bug fix or a new feature)
  • I have updated the documentation if needed:
  • I have added examples if this is a new feature

If you are unsure about any of these, don't hesitate to ask. We are here to help!

Copy link
Contributor

coderabbitai bot commented Sep 18, 2025

Important

Review skipped

Auto reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

✨ Finishing touches
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch workforce_pipeline

Tip

👮 Agentic pre-merge checks are now available in preview!

Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.

  • Built-in checks – Quickly apply ready-made checks to enforce title conventions, require pull request descriptions that follow templates, validate linked issues for compliance, and more.
  • Custom agentic checks – Define your own rules using CodeRabbit’s advanced agentic capabilities to enforce organization-specific policies and workflows. For example, you can instruct CodeRabbit’s agent to verify that API documentation is updated whenever API schema files are modified in a PR. Note: Upto 5 custom checks are currently allowed during the preview period. Pricing for this feature will be announced in a few weeks.

Please see the documentation for more information.

Example:

reviews:
  pre_merge_checks:
    custom_checks:
      - name: "Undocumented Breaking Changes"
        mode: "warning"
        instructions: |
          Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).

Please share your feedback with us on this Discord post.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@Ol1ver0413 Ol1ver0413 self-assigned this Sep 18, 2025
@Ol1ver0413
Copy link
Collaborator Author

graph TD
    A[Task: Generate 5 AI/ML Papers] --> B[Literature Researcher]
    B --> C{Fork: 5 Parallel Tasks}
    
    C --> D1[Summary Specialist 1<br/>Summarize Paper 1]
    C --> D2[Summary Specialist 2<br/>Summarize Paper 2]
    C --> D3[Summary Specialist 3<br/>Summarize Paper 3]
    C --> D4[Summary Specialist 4<br/>Summarize Paper 4]
    C --> D5[Summary Specialist 5<br/>Summarize Paper 5]
    
    D1 --> E{Join: Collect Summaries}
    D2 --> E
    D3 --> E
    D4 --> E
    D5 --> E
    
    E --> F[Research Synthesizer<br/>Analyze AI/ML Trends]
    F --> G[Final Result]
    
    style A fill:#e1f5fe
    style B fill:#fff3e0
    style D1 fill:#f3e5f5
    style D2 fill:#f3e5f5
    style D3 fill:#f3e5f5
    style D4 fill:#f3e5f5
    style D5 fill:#f3e5f5
    style F fill:#e8f5e8
    style G fill:#fff9c4
Loading

@Ol1ver0413
Copy link
Collaborator Author

Ol1ver0413 commented Sep 21, 2025

graph TD
    A[Task: Generate 4 RESTful API Types] --> B[API Researcher]
    B --> C{Fork: 4 Parallel Tasks}
    
    C --> D1[API Analyst 1<br/>Analyze API 1]
    C --> D2[API Analyst 2<br/>Analyze API 2]
    C --> D3[API Analyst 3<br/>Analyze API 3]
    C --> D4[API Analyst 4<br/>Analyze API 4]
    
    D1 --> E{Join: Collect Analyses}
    D2 --> E
    D3 --> E
    D4 --> E
    
    E --> F[Documentation Writer<br/>Generate API Usage Guide]
    F --> G[Final Result]
    
    style A fill:#e1f5fe
    style B fill:#fff3e0
    style D1 fill:#f3e5f5
    style D2 fill:#f3e5f5
    style D3 fill:#f3e5f5
    style D4 fill:#f3e5f5
    style F fill:#e8f5e8
    style G fill:#fff9c4
Loading

@Ol1ver0413
Copy link
Collaborator Author

Ol1ver0413 commented Sep 21, 2025

graph TD
    A[Task: Code Review Analysis] --> B[Code Scanner]
    B --> C{Fork: 3 Parallel Tasks}
    
    C --> D1[Code Reviewer 1<br/>Review File 1]
    C --> D2[Code Reviewer 2<br/>Review File 2]
    C --> D3[Code Reviewer 3<br/>Review File 3]
    
    D1 --> E{Join: Collect Reviews}
    D2 --> E
    D3 --> E
    
    E --> F[Review Summarizer<br/>Generate Comprehensive Report]
    F --> G[Final Result]
    
    style A fill:#e1f5fe
    style B fill:#fff3e0
    style D1 fill:#f3e5f5
    style D2 fill:#f3e5f5
    style D3 fill:#f3e5f5
    style F fill:#e8f5e8
    style G fill:#fff9c4
Loading

@Ol1ver0413 Ol1ver0413 marked this pull request as ready for review September 21, 2025 15:01
@Ol1ver0413
Copy link
Collaborator Author

Hey @Wendong-Fan @fengju0213 ! I've already add pipeline mode in workforce, and maybe we can try more cases to test this mode.

@Wendong-Fan
Copy link
Member

Hey @Wendong-Fan @fengju0213 ! I've already add pipeline mode in workforce, and maybe we can try more cases to test this mode.

thanks @Ol1ver0413 !

@Wendong-Fan Wendong-Fan added the Review Required PR need to be reviewed label Sep 29, 2025
@Wendong-Fan Wendong-Fan added this to the Sprint 38 milestone Sep 29, 2025
@Wendong-Fan Wendong-Fan linked an issue Sep 29, 2025 that may be closed by this pull request
2 tasks
Copy link
Collaborator

@hesamsheikh hesamsheikh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comprehensive PR @Ol1ver0413. I added a comment. Also I think the method namings could be a bit more consistent and remembrable.

  # before
  workforce.add_pipeline_task()
  workforce.fork_pipeline()
  workforce.add_parallel_pipeline_tasks()
  builder.fork()  
  builder.add_parallel_tasks() 

  # after
  workforce.pipeline_add()
  workforce.pipeline_fork()
  workforce.pipeline_join()
  workforce.pipeline_build()
  builder.add()
  builder.fork()
  builder.join()
  builder.build()

Comment on lines +1018 to +1040
"""Simulates a database connection."""
def __init__(self, host, port, database):
self.host = host
self.port = port
self.database = database
self.connection = None
def connect(self):
"""Simulate establishing a database connection."""
self.connection = f"connected to {self.host}:{self.port}/{self.database}"
return True
def disconnect(self):
"""Simulate closing a database connection."""
self.connection = None
def execute_query(self, query):
"""Simulate executing a database query."""
if not self.connection:
raise ConnectionError("Not connected to database")
return f"Executed: {query}"
```
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the docstrings are rendered out of the comment

Comment on lines +1533 to +1540
expected_task_ids = {task.id for task in self._pending_tasks}
expected_task_ids.update(task.id for task in self._completed_tasks)

completed_successful_ids = {
task.id for task in self._completed_tasks
if task.state == TaskState.DONE
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this create a situation if which if any of the branched tasks fails, the whole pipeline is marked as failed and the join task never executes?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this create a situation if which if any of the branched tasks fails, the whole pipeline is marked as failed and the join task never executes?

Thanks @hesamsheikh ! Maybe I need to add some branch task failure handling mechanisms into the pipeline.

Comment on lines +702 to +707

# Clear existing tasks and dependencies
self._pending_tasks.clear()
self._task_dependencies.clear()
self._assignees.clear()

Copy link
Collaborator

@a7m-1st a7m-1st Oct 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Ol1ver0413 , thanks for the PR, but as of now, Is the pipeline designed that it could interrupt running tasks or is it only with predefined tasks only? i.e. before workforce.start()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Review Required PR need to be reviewed
Projects
Status: No status
Development

Successfully merging this pull request may close these issues.

[Feature Request] Support more running mode in workforce
4 participants