Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: AsyncPipeline that can schedule components to run concurrently #8812

Merged
merged 99 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
99 commits
Select commit Hold shift + click to select a range
9c34772
add component checks
mathislucka Jan 11, 2025
fc2f2da
pipeline should run deterministically
mathislucka Jan 11, 2025
c28353d
add FIFOQueue
mathislucka Jan 11, 2025
64f4afc
add agent tests
mathislucka Jan 11, 2025
966552e
add order dependent tests
mathislucka Jan 11, 2025
3d0e948
run new tests
mathislucka Jan 11, 2025
7664254
remove code that is not needed
mathislucka Jan 11, 2025
932718f
test: intermediate from cycle outputs are available outside cycle
mathislucka Jan 13, 2025
acce8cd
add tests for component checks (Claude)
mathislucka Jan 13, 2025
21c78b8
adapt tests for component checks (o1 review)
mathislucka Jan 13, 2025
cf23b32
chore: format
mathislucka Jan 13, 2025
54d4f2c
remove tests that aren't needed anymore
mathislucka Jan 13, 2025
05ed852
add _calculate_priority tests
mathislucka Jan 13, 2025
93f7a9d
revert accidental change in pyproject.toml
mathislucka Jan 13, 2025
bbba1b2
test format conversion
mathislucka Jan 13, 2025
c00f8c5
adapt to naming convention
mathislucka Jan 13, 2025
235aa47
chore: proper docstrings and type hints for PQ
mathislucka Jan 13, 2025
99ea5d5
format
mathislucka Jan 13, 2025
7577d9b
add more unit tests
mathislucka Jan 13, 2025
25c64cd
rm unneeded comments
mathislucka Jan 13, 2025
5a1bbd8
test input consumption
mathislucka Jan 13, 2025
6820fc3
lint
mathislucka Jan 13, 2025
6a9cdcc
Merge branch 'main' into fix/pipeline_run
mathislucka Jan 13, 2025
d624e57
fix: docstrings
mathislucka Jan 13, 2025
e1912f8
lint
mathislucka Jan 13, 2025
acc17b1
format
mathislucka Jan 13, 2025
207eaba
format
mathislucka Jan 13, 2025
15611fc
fix license header
mathislucka Jan 13, 2025
87010dd
fix license header
mathislucka Jan 13, 2025
7027572
add component run tests
mathislucka Jan 13, 2025
02c82b8
fix: pass correct input format to tracing
mathislucka Jan 14, 2025
7e81ff9
fix types
mathislucka Jan 14, 2025
b5db015
Merge branch 'main' into fix/pipeline_run
mathislucka Jan 14, 2025
d2bee24
format
mathislucka Jan 14, 2025
ad16403
format
mathislucka Jan 14, 2025
2984fcb
types
mathislucka Jan 14, 2025
64a0125
add defaults from Socket instead of signature
mathislucka Jan 15, 2025
b2b8adc
fix test names
mathislucka Jan 15, 2025
6b25825
still wait for optional inputs on greedy variadic sockets
mathislucka Jan 15, 2025
2566cb8
fix format
mathislucka Jan 15, 2025
ef8e754
wip: warn for ambiguous running order
mathislucka Jan 16, 2025
0252450
wip: alternative warning
mathislucka Jan 16, 2025
580e4bd
fix license header
mathislucka Jan 16, 2025
fd32f77
Merge branch 'main' of https://github.com/deepset-ai/haystack into fi…
Amnah199 Jan 17, 2025
9426bbb
make code more readable
mathislucka Jan 20, 2025
ad1b0aa
Introduce content tracing to a behavioral test
Amnah199 Jan 20, 2025
554a6ef
Merge branch 'fix/pipeline_run' of https://github.com/deepset-ai/hays…
Amnah199 Jan 20, 2025
a38bccb
Fixing linting
Amnah199 Jan 20, 2025
0ac82b2
Remove debug print statements
Amnah199 Jan 20, 2025
339d340
Fix tracer tests
Amnah199 Jan 22, 2025
9441320
Merge branch 'refs/heads/main' into fix/pipeline_run
mathislucka Feb 4, 2025
ae295f7
remove print
mathislucka Feb 4, 2025
592492e
test: test for component inputs
mathislucka Feb 4, 2025
cfdc3dc
test: remove testing for run order
mathislucka Feb 4, 2025
25a4b40
chore: update component checks from experimental
mathislucka Feb 4, 2025
74657cd
chore: update pipeline and base from experimental
mathislucka Feb 4, 2025
3b8d886
refactor: remove unused method
mathislucka Feb 4, 2025
00ed49d
refactor: remove unused method
mathislucka Feb 4, 2025
f1d325a
refactor: outdated comment
mathislucka Feb 4, 2025
711c846
refactor: inputs state is updated as side effect
mathislucka Feb 4, 2025
fa2b65c
format
mathislucka Feb 4, 2025
8b7d761
test: add file conversion test
mathislucka Feb 4, 2025
dc6acb9
format
mathislucka Feb 4, 2025
ba81a24
fix: original implementation deepcopies outputs
mathislucka Feb 4, 2025
c905881
lint
mathislucka Feb 4, 2025
24e41e9
Merge branch 'main' into fix/pipeline_run
mathislucka Feb 4, 2025
e045e90
fix: from_dict was updated
mathislucka Feb 4, 2025
ed7c700
fix: format
mathislucka Feb 4, 2025
558050f
fix: test
mathislucka Feb 4, 2025
09965ba
test: add test for thread safety
mathislucka Feb 4, 2025
a3eb8da
remove unused imports
mathislucka Feb 4, 2025
4d9845a
format
mathislucka Feb 4, 2025
2d11923
test: FIFOPriorityQueue
mathislucka Feb 4, 2025
d6673fe
chore: add release note
mathislucka Feb 4, 2025
a48d802
feat: add AsyncPipeline
mathislucka Feb 4, 2025
fe1e1f1
chore: Add release notes
mathislucka Feb 4, 2025
99e8da3
fix: format
mathislucka Feb 4, 2025
267bbb9
debug: switch run order to debug ubuntu and windows tests
mathislucka Feb 5, 2025
2695163
fix: consider priorities of other components while waiting for DEFER
mathislucka Feb 5, 2025
1ab79cb
refactor: simplify code
mathislucka Feb 5, 2025
0f393ae
Merge branch 'main' into fix/pipeline_run
mathislucka Feb 5, 2025
c3f4c72
fix: resolve merge conflict with mermaid changes
mathislucka Feb 5, 2025
e0d2712
fix: format
mathislucka Feb 5, 2025
3ded49f
fix: remove unused import
mathislucka Feb 5, 2025
4564c3a
refactor: rename to avoid accidental conflicts
mathislucka Feb 5, 2025
401d5c0
Merge branch 'fix/pipeline_run' into feat/async_pipeline
mathislucka Feb 5, 2025
7a5be08
Merge branch 'main' into feat/async_pipeline
mathislucka Feb 6, 2025
e9f4353
fix: track pipeline type
mathislucka Feb 6, 2025
e9fbf7b
fix: and extend test
mathislucka Feb 6, 2025
56d2853
fix: format
mathislucka Feb 6, 2025
f2bde4f
style: sort alphabetically
mathislucka Feb 7, 2025
ff57a5a
Merge branch 'main' into feat/async_pipeline
davidsbatista Feb 7, 2025
06cfad9
Update test/core/pipeline/features/conftest.py
mathislucka Feb 7, 2025
54f33eb
Update test/core/pipeline/features/conftest.py
mathislucka Feb 7, 2025
2d7c33d
Update releasenotes/notes/feat-async-pipeline-338856a142e1318c.yaml
Amnah199 Feb 7, 2025
f56186c
fix: indentation, do not close loop
mathislucka Feb 7, 2025
57b1cac
fix: use asyncio.run
mathislucka Feb 7, 2025
47f419c
fix: format
mathislucka Feb 7, 2025
a326a7b
Merge branch 'main' into feat/async_pipeline
mathislucka Feb 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions haystack/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import haystack.tracing
from haystack.core.component import component
from haystack.core.errors import ComponentError, DeserializationError
from haystack.core.pipeline import Pipeline, PredefinedPipeline
from haystack.core.pipeline import AsyncPipeline, Pipeline, PredefinedPipeline
from haystack.core.serialization import default_from_dict, default_to_dict
from haystack.dataclasses import Answer, Document, ExtractedAnswer, GeneratedAnswer

Expand All @@ -18,15 +18,16 @@
haystack.tracing.auto_enable_tracing()

__all__ = [
"component",
"default_from_dict",
"default_to_dict",
"DeserializationError",
"Answer",
"AsyncPipeline",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) suggestion: keeping these imports ordered alphabetically helps locate something as the list grows

__all__ = [
    "Answer",
    "AsyncPipeline",
    "ComponentError",
    "DeserializationError",
    "Document",
    "ExtractedAnswer",
    "GeneratedAnswer",
    "Pipeline",
    "PredefinedPipeline",
    "component",
    "default_from_dict",
    "default_to_dict",
]

"ComponentError",
"Pipeline",
"PredefinedPipeline",
"DeserializationError",
"Document",
"Answer",
"GeneratedAnswer",
"ExtractedAnswer",
"GeneratedAnswer",
"Pipeline",
"PredefinedPipeline",
"component",
"default_from_dict",
"default_to_dict",
]
3 changes: 2 additions & 1 deletion haystack/core/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
#
# SPDX-License-Identifier: Apache-2.0

from .async_pipeline import AsyncPipeline
from .pipeline import Pipeline
from .template import PredefinedPipeline

__all__ = ["Pipeline", "PredefinedPipeline"]
__all__ = ["AsyncPipeline", "Pipeline", "PredefinedPipeline"]
535 changes: 535 additions & 0 deletions haystack/core/pipeline/async_pipeline.py

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions haystack/telemetry/_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import uuid
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union

import posthog
import yaml
Expand All @@ -18,7 +18,7 @@
from haystack.telemetry._environment import collect_system_specs

if TYPE_CHECKING:
from haystack.core.pipeline import Pipeline
from haystack.core.pipeline import AsyncPipeline, Pipeline


HAYSTACK_TELEMETRY_ENABLED = "HAYSTACK_TELEMETRY_ENABLED"
Expand Down Expand Up @@ -135,7 +135,7 @@ def send_telemetry_wrapper(*args, **kwargs):


@send_telemetry
def pipeline_running(pipeline: "Pipeline") -> Optional[Tuple[str, Dict[str, Any]]]:
def pipeline_running(pipeline: Union["Pipeline", "AsyncPipeline"]) -> Optional[Tuple[str, Dict[str, Any]]]:
"""
Collects telemetry data for a pipeline run and sends it to Posthog.

Expand Down Expand Up @@ -170,6 +170,7 @@ def pipeline_running(pipeline: "Pipeline") -> Optional[Tuple[str, Dict[str, Any]
# Data sent to Posthog
return "Pipeline run (2.x)", {
"pipeline_id": str(id(pipeline)),
"pipeline_type": generate_qualified_class_name(type(pipeline)),
"runs": pipeline._telemetry_runs,
"components": components,
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
highlights: >
We are introducing the `AsyncPipeline`: Supports running pipelines asynchronously. Schedules components concurrently
whenever possible. Leads to major speed improvements for any pipelines that may run workloads in parallel.
features:
- |
Added a new `AsyncPipeline` implementation that allows pipelines to be executed from async code,
supporting concurrent scheduling of pipeline components for faster processing.
62 changes: 60 additions & 2 deletions test/core/pipeline/features/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,25 @@
from typing import Tuple, List, Dict, Any, Set, Union
from pathlib import Path
import re
import pytest
import asyncio

from pytest_bdd import when, then, parsers

from haystack import Pipeline
from haystack import Pipeline, AsyncPipeline

PIPELINE_NAME_REGEX = re.compile(r"\[(.*)\]")


@pytest.fixture(params=[AsyncPipeline, Pipeline])
def pipeline_class(request):
"""
A parametrized fixture that will yield AsyncPipeline for one test run
and Pipeline for the next test run.
"""
return request.param


@dataclass
class PipelineRunData:
"""
Expand All @@ -34,6 +45,54 @@ class _PipelineResult:

@when("I run the Pipeline", target_fixture="pipeline_result")
def run_pipeline(
pipeline_data: Tuple[Union[AsyncPipeline, Pipeline], List[PipelineRunData]], spying_tracer
) -> Union[List[Tuple[_PipelineResult, PipelineRunData]], Exception]:
if isinstance(pipeline_data[0], AsyncPipeline):
return run_async_pipeline(pipeline_data, spying_tracer)
else:
return run_sync_pipeline(pipeline_data, spying_tracer)


def run_async_pipeline(
pipeline_data: Tuple[Union[AsyncPipeline], List[PipelineRunData]], spying_tracer
) -> Union[List[Tuple[_PipelineResult, PipelineRunData]], Exception]:
"""
Attempts to run a pipeline with the given inputs.
`pipeline_data` is a tuple that must contain:
* A Pipeline instance
* The data to run the pipeline with

If successful returns a tuple of the run outputs and the expected outputs.
In case an exceptions is raised returns that.
"""
pipeline, pipeline_run_data = pipeline_data[0], pipeline_data[1]

results: List[_PipelineResult] = []

async def run_inner(data, include_outputs_from):
mathislucka marked this conversation as resolved.
Show resolved Hide resolved
"""Wrapper function to call pipeline.run_async method with required params."""
return await pipeline.run_async(data=data.inputs, include_outputs_from=include_outputs_from)

for data in pipeline_run_data:
try:
outputs = asyncio.run(run_inner(data, data.include_outputs_from))

component_calls = {
(span.tags["haystack.component.name"], span.tags["haystack.component.visits"]): span.tags[
"haystack.component.input"
]
for span in spying_tracer.spans
if "haystack.component.name" in span.tags and "haystack.component.visits" in span.tags
}
results.append(_PipelineResult(outputs=outputs, component_calls=component_calls))
spying_tracer.spans.clear()
except Exception as e:
return e

return [e for e in zip(results, pipeline_run_data)]


def run_sync_pipeline(
pipeline_data: Tuple[Pipeline, List[PipelineRunData]], spying_tracer
) -> Union[List[Tuple[_PipelineResult, PipelineRunData]], Exception]:
"""
Expand Down Expand Up @@ -61,7 +120,6 @@ def run_pipeline(
if "haystack.component.name" in span.tags and "haystack.component.visits" in span.tags
}
results.append(_PipelineResult(outputs=outputs, component_calls=component_calls))

spying_tracer.spans.clear()
except Exception as e:
return e
Expand Down
4 changes: 2 additions & 2 deletions test/core/pipeline/features/pipeline_run.feature
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ Feature: Pipeline running
Scenario Outline: Running a correct Pipeline
Given a pipeline <kind>
When I run the Pipeline
Then it should return the expected result
And components are called with the expected inputs
Then components are called with the expected inputs
And it should return the expected result

Examples:
| kind |
Expand Down
Loading