
fix: pipeline run bugs in cyclic and acyclic pipelines #8707

Open
wants to merge 80 commits into
base: main
Conversation

mathislucka
Member

@mathislucka mathislucka commented Jan 11, 2025

Related Issues

Proposed Changes:

The previous Pipeline.run logic had several flaws: pipeline outputs depended on the insertion order of components or connections, cycles weren't processed in the right order, components ran more often than they should have, and the output of a pipeline was not deterministic.

This PR introduces a new Pipeline.run logic that relies almost exclusively on the available data to decide when a component should run.

It is loosely based on the idea of Kahn Process Networks (a distributed model of computing).

Core Logic
Internally, the Pipeline.run method uses a single priority queue to schedule components. Components are popped from the priority queue one at a time; once there are no more components that can run, the pipeline returns an output. The queue is re-computed when needed because the outputs produced by components in the pipeline might change priorities. Since pipelines can have cycles, a component can enter the priority queue multiple times.
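As a rough illustration, this scheduling loop could be sketched with heapq as follows (all function names here are hypothetical, not the PR's actual internals):

```python
import heapq
from itertools import count

def run_schedule(components, can_run, run_component, compute_priority):
    """Repeatedly pop the highest-priority runnable component until none remain.

    Illustrative sketch only: can_run, run_component and compute_priority
    are assumed callbacks, not the real Pipeline API.
    """
    tiebreak = count()  # keeps FIFO order among equal priorities
    queue = [(compute_priority(c), next(tiebreak), c)
             for c in components if can_run(c)]
    heapq.heapify(queue)
    while queue:
        _, _, component = heapq.heappop(queue)
        if not can_run(component):
            continue
        run_component(component)  # may make new inputs available
        # Re-compute the queue: produced outputs can change priorities,
        # and in cyclic pipelines a component may be re-enqueued.
        queue = [(compute_priority(c), next(tiebreak), c)
                 for c in components if can_run(c)]
        heapq.heapify(queue)
```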

When can a component run?
A component can run once it passes two conditions:

  1. All mandatory inputs are available (i.e. if we call component.run we do not get an exception because of missing positional or keyword arguments).
  2. The component was triggered to run.
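Condition 1 can be illustrated with a signature check (a minimal sketch; has_mandatory_inputs and its arguments are hypothetical names, not the actual implementation):

```python
import inspect

def has_mandatory_inputs(run_method, available_inputs):
    """Return True if calling run_method with available_inputs would not
    fail due to missing positional or keyword arguments (sketch)."""
    sig = inspect.signature(run_method)
    for name, param in sig.parameters.items():
        has_default = param.default is not inspect.Parameter.empty
        if not has_default and param.kind in (
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
            inspect.Parameter.KEYWORD_ONLY,
        ):
            if name not in available_inputs:
                return False
    return True
```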

Triggers
There are three types of triggers that cause a component to run:

  1. The component receives input from outside the pipeline (e.g. user input)
  2. The component receives input from another component in the pipeline
  3. The component does not have incoming connections to any other component in the pipeline and Pipeline.run is called

A trigger is "consumed" by the component, meaning that a single trigger can cause it to run only once. For example, per Pipeline.run invocation, a component can run only once because of user input. It can still run again, but it needs to receive a second trigger to do so.

A component does not always run immediately when it receives a trigger; it only runs once it has the highest priority in the priority queue.
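Trigger consumption could be sketched like this (ComponentState and its methods are illustrative names, not the PR's code):

```python
class ComponentState:
    """Sketch of per-component trigger bookkeeping."""

    def __init__(self):
        self.pending_triggers = 0

    def trigger(self):
        # e.g. user input arrives, or a connected component sends output
        self.pending_triggers += 1

    def try_run(self, mandatory_inputs_available):
        # A component runs only if it was triggered AND has its mandatory
        # inputs; a successful run consumes exactly one trigger.
        if self.pending_triggers > 0 and mandatory_inputs_available:
            self.pending_triggers -= 1
            return True
        return False
```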

Priorities
At a high level, we differentiate between three groups: components that can't run because they don't fulfil the conditions to run; components that can run immediately in any order; and components that we want to run later because they might still receive optional or lazy variadic inputs.
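As a sketch, these three groups map naturally onto ordered priority levels (the names and exact levels here are hypothetical):

```python
from enum import IntEnum

class Priority(IntEnum):
    """Hypothetical priority levels; lower value = higher priority."""
    READY = 1    # all conditions fulfilled, can run immediately in any order
    DEFER = 2    # could run, but might still receive optional or lazy variadic inputs
    BLOCKED = 3  # does not fulfil the conditions to run
```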

Inputs & Outputs
A component can receive inputs and it can produce outputs.

When a component runs, it "consumes" (deletes) its inputs, meaning that these same inputs will not be available if the component runs again. Inputs from outside the pipeline are an exception to this rule. They are only consumed when they are passed to a GreedyVariadic socket of the component (e.g. BranchJoiner). Other inputs from outside the pipeline will always be available to a component, no matter how often it runs.

After a component has run, its outputs are distributed to the connected input sockets of other components in the pipeline. Outputs that are not connected to any input socket in the pipeline are returned to the user.
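Output distribution can be sketched as follows (a minimal illustration with hypothetical names; the real implementation also handles variadic sockets and the consumption rules above):

```python
def distribute_outputs(sender, outputs, connections, inputs_buffer, pipeline_outputs):
    """Send each output to its connected sockets; unconnected outputs
    become part of the pipeline's return value (sketch)."""
    for socket_name, value in outputs.items():
        receivers = connections.get((sender, socket_name), [])
        if not receivers:
            # Not connected anywhere: returned to the user.
            pipeline_outputs.setdefault(sender, {})[socket_name] = value
        for receiver, receiver_socket in receivers:
            inputs_buffer.setdefault(receiver, {})[receiver_socket] = value
```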

Impact on existing pipelines

Non-Cyclic Pipelines
For non-cyclic pipelines, the execution order of components might change. This does not have any impact on the outputs of a pipeline, except under one condition:

If the pipeline has two branches that are joined by a lazy variadic socket (e.g. DocumentJoiner), the order of the joined inputs might change. In the existing pipeline logic, the order is determined by the order in which components were added to the pipeline and connected. This behavior is not documented anywhere, and users can't know in which order these components will be executed without studying the underlying implementation in Haystack and NetworkX. This PR introduces a lexicographical sort for these cases; other possibilities could be discussed. When we release the changes from this PR, we could provide users with utility functions to test whether their outputs might change.
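A minimal illustration of the lexicographical tie-break (the component names are made up):

```python
# When several components are equally ready, sort them by name so lazy
# variadic joins (e.g. DocumentJoiner) receive their inputs in a
# reproducible order, independent of the order in which components
# were added or connected.
ready_components = ["retriever_embedding", "retriever_bm25"]
execution_order = sorted(ready_components)  # lexicographical, deterministic
```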

Cyclic Pipelines
Cyclic pipelines are affected by several bugs in the current pipeline logic.
A cyclic pipeline might be affected by this change if it meets any of these conditions:

  • pipelines with more than one optional or greedy variadic edge in the cycle (e.g. PromptBuilder, BranchJoiner)
  • pipelines with two cycles that share an optional or greedy variadic edge
  • ... (more?)

Under these conditions, neither the run order nor the outputs of the pipeline are guaranteed to be deterministic (i.e. the output of the pipeline might change even though the code didn't). Again, it should be possible to provide tooling that helps users understand whether their pipelines are affected.

Open Issues

Cycles without a defined entrypoint
For cycles containing more than one component that has only default inputs and receives inputs from outside the pipeline, there is no defined entrypoint (i.e. we cannot know which component in the cycle should run first).

Consider this pipeline:
[Figure: diagram of the cyclic pipeline described below (code generation and feedback LLMs with their PromptBuilders)]

The use case: one LLM generates code that is checked by a second LLM for correctness; the code is either sent back to the first LLM with feedback or returned to the user if the second LLM decides that the code is good enough. The instructions for the "Feedback LLM" and the task for the "Code LLM" are both provided by the user from outside the cycle. The two PromptBuilder components in the cycle (configured so that all inputs are optional) both receive an input that triggers them to run at the same time, and both are waiting for exactly one more input (but can run without it), so there is no defined order in which these components should be executed.

At least from my understanding, this problem can't be solved purely based on the topology of the pipeline graph or on the available data.

In this implementation, the code_prompt would run first because c comes before f in a lexicographical sort. We can document this behavior but it might still surprise our users. Additional measures could be:

  • test for this condition and log a warning
  • test for this condition and raise an exception (the user could make the edge from code_llm to feedback_prompt required to solve the problem)
  • give users another way to specify the running order for components that could run at the same time when the running order could affect the outputs of the pipeline (might be complex)

Should cycles run like loops in a programming language?

@tstadel pointed out that components outside of a cycle might receive inputs from the cycle while the cycle is still running. In very few edge cases, this could cause the components outside the cycle to run repeatedly (in turn triggering other components in the pipeline).

When we think of our pipeline in terms of a distributed model of computing, then this behavior would be expected.

However, if we assume that a cycle in a pipeline works the same way as a loop in a programming language, then the loop should run to completion before we execute any other components.

Consider this pseudo-implementation:

outside_output = None
for component in cycle:
  if component.output_receiver == 'outside_output':
    outside_output = component.output

# continue with the last value that was set for outside_output

Another option would be to treat the input socket of a component like a FIFO queue, meaning that outputs would not be overwritten if components in the cycle provide the output multiple times.

outside_output = []
for component in cycle:
  if component.output_receiver == 'outside_output':
    outside_output.append(component.output)

# receiving component will run as many times as outputs were appended to outside_output

My recommendation would be to follow the distributed model of computing approach and allow a component to run as soon as it receives inputs and is ready to run. The implementation is a lot less complex, especially once we introduce concurrency to our pipeline execution. If the user does not want a component outside the cycle to run before the cycle has fully executed, that is easy to achieve through a different dataflow in the pipeline or by marking edges as required.

How did you test it?

  • behavioral tests
  • some currently failing tests are expected to fail, because they test the running order of the components, which is not actually the behavior we want to test

Notes for the reviewer

This is a work in progress.

The change needs to be tested extensively.

My recommendation would be to update the behavioral tests so that we test for the inputs that a component received to run and the pipeline outputs instead of testing for the component execution order.

Using content tracing, we can adapt the existing testing approach by exchanging expected_run_order with expected_component_inputs. Expected component inputs could be tested like this (pseudo-code):

expected_component_inputs = {('<component_name>', '<visits>'): {...inputs}}

for key, inputs in actual_inputs.items():
   assert inputs == expected_component_inputs.get(key)

This is better than the current approach because we don't really want to test the execution order of the components; we only care about how often a component runs, whether it receives the same inputs, and whether the pipeline has the same outputs.

Currently, the behavioral tests do not cover these two behaviors:

  • how often a component runs
  • whether it receives the same inputs

For real-world use cases, changes in these two behaviors will have an impact on the output of the pipeline.

Adapting the tests allows us to re-use the test suite when we implement pipelines that run components concurrently.

Checklist

  • I have read the contributors guidelines and the code of conduct
  • I have updated the related issue with new insights and changes
  • I added unit tests and updated the docstrings
  • I've used one of the conventional commit types for my PR title: fix:, feat:, build:, chore:, ci:, docs:, style:, refactor:, perf:, test: and added ! in case the PR includes breaking changes.
  • I documented my code
  • I ran pre-commit hooks and fixed any issue

@@ -2821,3 +2821,148 @@ def run(self, replies: List[str]):
)
],
)

@given("a pipeline that passes outputs that are consumed in cycle to outside the cycle", target_fixture="pipeline_data")
def passes_outputs_outside_cycle():
Member Author

@tstadel could you check if this test covers the use case that you had with passing the prompt outside the cycle?

Member Author

The test fails on main and it yields an empty prompt instead of the expected prompt value.

Member Author

The output from the cycle isn't distributed in the old logic because we are filtering for components that are in the cycle here:

receivers = [item for item in self._find_receivers_from(name) if item[0] in cycle]

And then we remove any output that was already received by a component in the cycle here:

to_remove_from_component_result.add(sender_socket.name)

That means that outputs that are received both by components inside and outside the cycle will only go to components inside the cycle.

@mathislucka
Member Author

mathislucka commented Feb 4, 2025

@Amnah199 @julian-risch @davidsbatista

Updated the PR to the latest state from experimental.
It also incorporates some of the changes that are needed to prepare for the AsyncPipeline (already merged to experimental).
I'll raise the AsyncPipeline-change as a separate PR.

Here is what was added in addition to the latest changes:

  • added a test for thread-safe execution
  • added a test for the file conversion case that we discussed
  • added missing unit tests for the FIFOPriorityQueue
  • completed the work from @Amnah199 on testing for component inputs instead of run order
  • added a release note

@mathislucka mathislucka marked this pull request as ready for review February 4, 2025 16:23
@mathislucka mathislucka requested review from a team as code owners February 4, 2025 16:23
@mathislucka mathislucka requested review from dfokina and davidsbatista and removed request for a team February 4, 2025 16:23
"llm",
"output_validator",
],
expected_component_calls={
Contributor

@Amnah199 Amnah199 Feb 5, 2025

@mathislucka I was considering the option of replacing these few very long component_calls with <Any>. I think they are cluttering the file.

Member Author

Yes, I considered it too at some point but I came to the conclusion that it would weaken the tests.

For the example below, we actually care about the content of these calls because it indicates if prior components ran correctly.

Granted, the example is a bit long. I think we should shorten these examples in future tests so that they test the functionality while still resembling a real-world use case closely enough that a dev would easily understand what's being tested; we don't need a full-length prompt for every case.

Since these tests already existed before this PR, I'd rather not touch the example now though.

I'd vote for keeping the full content to keep the tests strong. For future tests we should make the examples shorter, and if we really don't care about the content we can still use ANY in those cases.

:param priority:
Priority level for the item. Lower numbers indicate higher priority.
"""
count = next(self._counter)
Contributor

Although it is in a different scope, this variable count has the same name as count from from itertools import count.
Maybe use a different name like order, just to be safe.
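For context, the tie-breaker pattern under discussion looks roughly like this (a sketch, not the PR's exact code):

```python
from itertools import count

# A monotonically increasing counter keeps heap entries with equal
# priority in FIFO (insertion) order. Naming the local variable `order`
# avoids shadowing itertools.count.
_counter = count()

def make_entry(priority, item):
    order = next(_counter)  # insertion order, used as tie-breaker
    return (priority, order, item)
```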

@@ -619,76 +637,31 @@ def outputs(self, include_components_with_connected_outputs: bool = False) -> Di
}
return outputs

def show(self, server_url: str = "https://mermaid.ink", params: Optional[dict] = None) -> None:
Contributor

Can you sync the PR with the current main? This should not be happening.

@davidsbatista
Contributor

@mathislucka can you check your base.py and the new work done with mermaid ink that was recently merged? your PR if merged would delete it.

@mathislucka
Member Author

@mathislucka can you check your base.py and the new work done with mermaid ink that was recently merged? your PR if merged would delete it.

I'll check; that PR was merged between me merging main into this branch and changing the base. I'll go through all commits since the last merge and re-apply the changes.

@mathislucka
Member Author

@davidsbatista thanks for catching this. I should have been more careful when copying over the changes from experimental.


Successfully merging this pull request may close these issues.

Cycle detection removes same edge multiple times