-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: pipeline run bugs in cyclic and acyclic pipelines #8707
base: main
Are you sure you want to change the base?
Conversation
@@ -2821,3 +2821,148 @@ def run(self, replies: List[str]): | |||
) | |||
], | |||
) | |||
|
|||
@given("a pipeline that passes outputs that are consumed in cycle to outside the cycle", target_fixture="pipeline_data") | |||
def passes_outputs_outside_cycle(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tstadel could you check if this test covers the use case that you had with passing the prompt outside the cycle?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test fails on main and it yields an empty prompt instead of the expected prompt value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The output from the cycle isn't distributed in the old logic because we are filtering for components that are in the cycle here:
haystack/haystack/core/pipeline/pipeline.py
Line 213 in 167ede1
receivers = [item for item in self._find_receivers_from(name) if item[0] in cycle] |
And then we remove any output that was already received by a component in the cycle here:
haystack/haystack/core/pipeline/base.py
Line 889 in 167ede1
to_remove_from_component_result.add(sender_socket.name) |
That means that outputs that are received both by components inside and outside the cycle will only go to components inside the cycle.
@Amnah199 @julian-risch @davidsbatista Updated the PR to the latest state from experimental. Here is what was added in addition to the latest changes:
|
"llm", | ||
"output_validator", | ||
], | ||
expected_component_calls={ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mathislucka I was considering the option of replacing these few very long component_calls with <Any>
. I think they are convoluting the file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I considered it too at some point but I came to the conclusion that it would weaken the tests.
For the example below, we actually care about the content of these calls because it indicates if prior components ran correctly.
Granted, the example is a bit long and I think we should shorten these examples in future tests so that they test the functionality and have enough resemblance with a real-world use case so that a dev would easily understand what's being tested, but we don't need a full length prompt for every case.
Since these tests already existed before, I wouldn't like to touch the example now though.
I'd vote for keeping the full content to keep the tests strong. For future tests we should make the examples shorter and if we really don't care about the content we can still add ANY
for these cases.
haystack/core/pipeline/utils.py
Outdated
:param priority: | ||
Priority level for the item. Lower numbers indicate higher priority. | ||
""" | ||
count = next(self._counter) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
although within a different scope, this variable count
has the same name as from itertools import count
maybe a different name like order
- just to be safe
haystack/core/pipeline/base.py
Outdated
@@ -619,76 +637,31 @@ def outputs(self, include_components_with_connected_outputs: bool = False) -> Di | |||
} | |||
return outputs | |||
|
|||
def show(self, server_url: str = "https://mermaid.ink", params: Optional[dict] = None) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you sync the PR with the current main
- this should not be happening
@mathislucka can you check your |
I'll check, the PR was merged in between me merging |
83fc53c
to
c3f4c72
Compare
@davidsbatista thanks for catching this. I should have been more careful when copying over the changes from experimental. |
Related Issues
Proposed Changes:
The previous
Pipeline.run
logic had several flaws. Outputs of the pipeline depended on the insertion order of components or connections, cycles weren't processed in the right order, components ran more times than they should, and the output of a pipeline was not deterministic.This PR introduces a new
Pipeline.run
logic that relies almost exclusively on the available data to decide when a component should run.It is loosely based on the idea of Kahn Process Networks (a distributed model of computing).
Core Logic
Internally, the
Pipeline.run
method uses a single priority queue to schedule components to run. Components are popped from the priority queue one at a time until there are no more components that can run. Once there are no more components that can run, the pipeline returns an output. The queue is re-computed when needed because the produced outputs of the components in the pipeline might change priorities. Since pipelines can have cycles, the component can enter the priority queue multiple times.When can a component run?
A component can run once it passes two conditions:
component.run
we do not get an exception because of missing positional or keyword arguments).Triggers
There are three types of triggers that cause a component to run:
Pipeline.run
is calledA trigger is "consumed" by the component, meaning that it can only cause it to run once. For example, for each
Pipeline.run
invocation, the component can only run once because of user input. It could still run again, but it needs to receive a second trigger to do that.A component does not always run immediately when it receives a trigger, it only runs when it has highest priority in the priority queue.
Priorities
At a high level, we differentiate between components that can't run, because they don't fulfil the conditions to run. Components that can run immediately in any order, and components that we want to run later because they might still receive optional or lazy variadic inputs.
Inputs & Outputs
A component can receive inputs and it can produce outputs.
When a component runs, it "consumes" (deletes) its inputs, meaning that these same inputs will not be available in case the component runs another time. Inputs from outside the pipeline are an exception to this rule. They are only consumed when they are passed to a
GreedyVariadic
socket of the component (e.g.BranchJoiner
). Other inputs from outside the pipeline will always be available to a component, no matter how often it runs.After a component ran, its outputs are distributed to the connected input sockets of other components in the pipeline. Outputs that are not connected to any other input sockets in the pipeline, are returned to the user.
Impact on existing pipelines
Non-Cyclic Pipelines
For non-cyclic pipelines, the execution order of components might change. This does not have any impact on the outputs of a pipeline, except in one condition:
If the pipeline has two branches that are joined by a lazy variadic socket (e.g.
DocumentJoiner
), the order of the joined inputs might change. In the existing pipeline logic, the order is determined by the order of adding components to the pipeline and the order of connecting components. This behavior is not documented anywhere and the user can't know in which order these components will be executed without studying the underlying implementation in Haystack and NetworkX. This PR introduces a lexicographical sort for these cases, other possibilities could be discussed. We can potentially provide the users with utility functions to test if the output might change, when we release the changes from this PR.Cyclic Pipelines
Cyclic pipelines are affected by several bugs in the current pipeline logic.
Cyclic pipelines might be affected if they meet any of these conditions:
For these conditions, neither the run order nor the outputs of the pipeline might be deterministic (i.e. the output of the pipeline might change although the code didn't change). Again, it should be possible to provide tooling that helps users understand if their pipelines are affected.
Open Issues
Cycles without a defined entrypoint
For cycles with more than one component that only have default inputs and that receive inputs from outside the pipeline, there is no defined entrypoint (i.e. we can not know which component in the cycle should run first).
Consider this pipeline:
The use case is that one LLM generates code that is checked by another LLM for correctness and then either returned to the first LLM if it has feedback or returned to the user, if the second LLM decides that the code is good enough. The instructions for the "Feedback LLM" and the task for the "Code LLM" are both provided by the user from outside the cycle. Since we have 2
PromptBuilder
components in the cycle (configured so that all inputs are optional) and they both receive an input that triggers them to run at the same time, and they are both waiting for exactly one more input (but can run without it), there is no defined order for which of these components should be executed first.At least from my understanding, this problem can't be solved purely based on the topology of the pipeline graph or on the available data.
In this implementation, the
code_prompt
would run first becausec
comes beforef
in a lexicographical sort. We can document this behavior but it might still surprise our users. Additional measures could be:code_llm
tofeedback_prompt
required to solve the problem)Should cycles run like loops in a programming language?
@tstadel pointed out that components outside of a cycle might receive inputs from the cycle while the cycle is still running. In very few edge cases, this could cause the components outside the cycle to run repeatedly (and in turn triggering other components in the pipeline).
When we think of our pipeline in terms of a distributed model of computing, then this behavior would be expected.
However, if we assume that a cycle in a pipeline works the same way as a loop in a programming language, then the loop should run to completion before we execute any other components.
Consider this pseudo-implementation:
Another option would be to treat the input socket of a component like a FIFO queue, meaning that the outputs would not be overwritten in case components in the cycle provide the output multiple times.
My recommendation would be to follow the distributed model of computing approach and allow a component to run as soon as it receives inputs and is ready to run. The implementation is a lot less complex, especially so, when we introduce concurrency to our pipeline execution. If the user does not want the component outside the cycle to run before the cycle has fully executed, it is easy to achieve that by a different dataflow in the pipeline or by marking edges as required.
How did you test it?
Notes for the reviewer
This is a work in progress.
The change needs to be tested extensively.
My recommendation would be to update the behavioral tests so that we test for the inputs that a component received to run and the pipeline outputs instead of testing for the component execution order.
Using content tracing, we can adapt the existing testing approach by exchanging
expected_run_order
withexpected_component_inputs
. Expected component inputs could be tested like this (pseudo-code):This is better than the current approach because we don't really want to test the execution order of the components, we only care about how often a component runs, if it has the same inputs, and if the pipeline has the same outputs.
Currently, the behavioral tests do not test these two behaviors (as demonstrated here):
For real-world use cases, changes in these two behaviors will have an impact on the output of the pipeline.
Adapting the tests allows us to re-use the test suite when we implement pipelines that run components concurrently.
Checklist
fix:
,feat:
,build:
,chore:
,ci:
,docs:
,style:
,refactor:
,perf:
,test:
and added!
in case the PR includes breaking changes.