Skip to content

Commit

Permalink
Merge branch 'fix/pipeline_run' into feat/async_pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
mathislucka committed Feb 5, 2025
2 parents 1ab79cb + 4564c3a commit 401d5c0
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 35 deletions.
74 changes: 60 additions & 14 deletions haystack/core/pipeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, TextIO, Tuple, Type, TypeVar, Union

from networkx import MultiDiGraph # type:ignore
import networkx # type:ignore

from haystack import logging
from haystack.core.component import Component, InputSocket, OutputSocket, component
Expand All @@ -33,18 +33,19 @@
is_any_greedy_socket_ready,
is_socket_lazy_variadic,
)
from haystack.core.pipeline.descriptions import find_pipeline_inputs, find_pipeline_outputs
from haystack.core.pipeline.draw import _to_mermaid_image
from haystack.core.pipeline.template import PipelineTemplate, PredefinedPipeline
from haystack.core.pipeline.utils import FIFOPriorityQueue, parse_connect_string
from haystack.core.serialization import DeserializationCallbacks, component_from_dict, component_to_dict
from haystack.core.type_utils import _type_name, _types_are_compatible
from haystack.marshal import Marshaller, YamlMarshaller
from haystack.utils import is_in_jupyter, type_serialization

from .descriptions import find_pipeline_inputs, find_pipeline_outputs
from .draw import _to_mermaid_image
from .template import PipelineTemplate, PredefinedPipeline

DEFAULT_MARSHALLER = YamlMarshaller()

# We use a generic type to annotate the return value of classmethods,
# We use a generic type to annotate the return value of class methods,
# so that static analyzers won't be confused when derived classes
# use those methods.
T = TypeVar("T", bound="PipelineBase")
Expand Down Expand Up @@ -82,7 +83,7 @@ def __init__(self, metadata: Optional[Dict[str, Any]] = None, max_runs_per_compo
self._telemetry_runs = 0
self._last_telemetry_sent: Optional[datetime] = None
self.metadata = metadata or {}
self.graph = MultiDiGraph()
self.graph = networkx.MultiDiGraph()
self._max_runs_per_component = max_runs_per_component

def __eq__(self, other) -> bool:
Expand Down Expand Up @@ -637,31 +638,76 @@ def outputs(self, include_components_with_connected_outputs: bool = False) -> Di
}
return outputs

def show(self) -> None:
def show(self, server_url: str = "https://mermaid.ink", params: Optional[dict] = None) -> None:
"""
If running in a Jupyter notebook, display an image representing this `Pipeline`.
Display an image representing this `Pipeline` in a Jupyter notebook.
This function generates a diagram of the `Pipeline` using a Mermaid server and displays it directly in
the notebook.
:param server_url:
The base URL of the Mermaid server used for rendering (default: 'https://mermaid.ink').
See https://github.com/jihchi/mermaid.ink and https://github.com/mermaid-js/mermaid-live-editor for more
info on how to set up your own Mermaid server.
:param params:
Dictionary of customization parameters to modify the output. Refer to Mermaid documentation for more details
Supported keys:
- format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
- type: Image type for /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
- theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
- bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
- width: Width of the output image (integer).
- height: Height of the output image (integer).
- scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
- fit: Whether to fit the diagram size to the page (PDF only, boolean).
- paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
- landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.
:raises PipelineDrawingError:
If the function is called outside of a Jupyter notebook or if there is an issue with rendering.
"""
if is_in_jupyter():
from IPython.display import Image, display # type: ignore

image_data = _to_mermaid_image(self.graph)

image_data = _to_mermaid_image(self.graph, server_url=server_url, params=params)
display(Image(image_data))
else:
msg = "This method is only supported in Jupyter notebooks. Use Pipeline.draw() to save an image locally."
raise PipelineDrawingError(msg)

def draw(self, path: Path) -> None:
def draw(self, path: Path, server_url: str = "https://mermaid.ink", params: Optional[dict] = None) -> None:
"""
Save an image representing this `Pipeline` to `path`.
Save an image representing this `Pipeline` to the specified file path.
This function generates a diagram of the `Pipeline` using the Mermaid server and saves it to the provided path.
:param path:
The path to save the image to.
The file path where the generated image will be saved.
:param server_url:
The base URL of the Mermaid server used for rendering (default: 'https://mermaid.ink').
See https://github.com/jihchi/mermaid.ink and https://github.com/mermaid-js/mermaid-live-editor for more
info on how to set up your own Mermaid server.
:param params:
Dictionary of customization parameters to modify the output. Refer to Mermaid documentation for more details
Supported keys:
- format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
- type: Image type for /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
- theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
- bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
- width: Width of the output image (integer).
- height: Height of the output image (integer).
- scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
- fit: Whether to fit the diagram size to the page (PDF only, boolean).
- paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
- landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.
:raises PipelineDrawingError:
If there is an issue with rendering or saving the image.
"""
# Before drawing we edit a bit the graph, to avoid modifying the original that is
# used for running the pipeline we copy it.
image_data = _to_mermaid_image(self.graph)
image_data = _to_mermaid_image(self.graph, server_url=server_url, params=params)
Path(path).write_bytes(image_data)

def walk(self) -> Iterator[Tuple[str, Component]]:
Expand Down
4 changes: 2 additions & 2 deletions haystack/core/pipeline/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ def push(self, item: Any, priority: int) -> None:
:param priority:
Priority level for the item. Lower numbers indicate higher priority.
"""
count = next(self._counter)
entry = (priority, count, item)
next_count = next(self._counter)
entry = (priority, next_count, item)
heapq.heappush(self._queue, entry)

def pop(self) -> Tuple[int, Any]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ def test_run_unit(self, hf_pipeline_mock):
assert result["documents"][1].to_dict()["classification"]["label"] == "negative"

@pytest.mark.integration
def test_run(self):
def test_run(self, monkeypatch):
monkeypatch.delenv("HF_API_TOKEN", raising=False) # https://github.com/deepset-ai/haystack/issues/8811
component = TransformersZeroShotDocumentClassifier(
model="cross-encoder/nli-deberta-v3-xsmall", labels=["positive", "negative"]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,11 @@ def test_run_wrong_input_format(self):
embedder.run(text=list_integers_input)

@pytest.mark.integration
def test_run_trunc(self):
def test_run_trunc(self, monkeypatch):
"""
sentence-transformers/paraphrase-albert-small-v2 maps sentences & paragraphs to a 768 dimensional dense vector space
"""
monkeypatch.delenv("HF_API_TOKEN", raising=False) # https://github.com/deepset-ai/haystack/issues/8811
checkpoint = "sentence-transformers/paraphrase-albert-small-v2"
text = "a nice text to embed"

Expand Down
15 changes: 10 additions & 5 deletions test/components/evaluators/test_sas_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ def test_run_not_warmed_up(self):
evaluator.run(ground_truth_answers=ground_truths, predicted_answers=predictions)

@pytest.mark.integration
def test_run_with_matching_predictions(self):
def test_run_with_matching_predictions(self, monkeypatch):
monkeypatch.delenv("HF_API_TOKEN", raising=False) # https://github.com/deepset-ai/haystack/issues/8811
evaluator = SASEvaluator()
ground_truths = [
"A construction budget of US $2.3 billion",
Expand All @@ -124,7 +125,8 @@ def test_run_with_matching_predictions(self):
assert result["individual_scores"] == pytest.approx([1.0, 1.0, 1.0])

@pytest.mark.integration
def test_run_with_single_prediction(self):
def test_run_with_single_prediction(self, monkeypatch):
monkeypatch.delenv("HF_API_TOKEN", raising=False) # https://github.com/deepset-ai/haystack/issues/8811
evaluator = SASEvaluator()

ground_truths = ["US $2.3 billion"]
Expand All @@ -137,7 +139,8 @@ def test_run_with_single_prediction(self):
assert result["individual_scores"] == pytest.approx([0.689089], abs=1e-5)

@pytest.mark.integration
def test_run_with_mismatched_predictions(self):
def test_run_with_mismatched_predictions(self, monkeypatch):
monkeypatch.delenv("HF_API_TOKEN", raising=False) # https://github.com/deepset-ai/haystack/issues/8811
evaluator = SASEvaluator()
ground_truths = [
"US $2.3 billion",
Expand All @@ -156,7 +159,8 @@ def test_run_with_mismatched_predictions(self):
assert result["individual_scores"] == pytest.approx([0.689089, 0.870389, 0.908679], abs=1e-5)

@pytest.mark.integration
def test_run_with_bi_encoder_model(self):
def test_run_with_bi_encoder_model(self, monkeypatch):
monkeypatch.delenv("HF_API_TOKEN", raising=False) # https://github.com/deepset-ai/haystack/issues/8811
evaluator = SASEvaluator(model="sentence-transformers/all-mpnet-base-v2")
ground_truths = [
"A construction budget of US $2.3 billion",
Expand All @@ -175,7 +179,8 @@ def test_run_with_bi_encoder_model(self):
assert result["individual_scores"] == pytest.approx([1.0, 1.0, 1.0])

@pytest.mark.integration
def test_run_with_cross_encoder_model(self):
def test_run_with_cross_encoder_model(self, monkeypatch):
monkeypatch.delenv("HF_API_TOKEN", raising=False) # https://github.com/deepset-ai/haystack/issues/8811
evaluator = SASEvaluator(model="cross-encoder/ms-marco-MiniLM-L-6-v2")
ground_truths = [
"A construction budget of US $2.3 billion",
Expand Down
3 changes: 2 additions & 1 deletion test/components/generators/chat/test_hugging_face_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ def test_messages_conversion_is_called(self, mock_convert, model_info_mock):

@pytest.mark.integration
@pytest.mark.flaky(reruns=3, reruns_delay=10)
def test_live_run(self):
def test_live_run(self, monkeypatch):
monkeypatch.delenv("HF_API_TOKEN", raising=False) # https://github.com/deepset-ai/haystack/issues/8811
messages = [ChatMessage.from_user("Please create a summary about the following topic: Climate change")]

llm = HuggingFaceLocalChatGenerator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,9 @@ def test_stop_words_criteria_using_hf_tokenizer(self):
assert criteria(generated_text_ids, scores=None) is True

@pytest.mark.integration
def test_hf_pipeline_runs_with_our_criteria(self):
def test_hf_pipeline_runs_with_our_criteria(self, monkeypatch):
"""Test that creating our own StopWordsCriteria and passing it to a Huggingface pipeline works."""
monkeypatch.delenv("HF_API_TOKEN", raising=False) # https://github.com/deepset-ai/haystack/issues/8811
generator = HuggingFaceLocalGenerator(
model="google/flan-t5-small", task="text2text-generation", stop_words=["unambiguously"]
)
Expand All @@ -466,7 +467,8 @@ def test_hf_pipeline_runs_with_our_criteria(self):

@pytest.mark.integration
@pytest.mark.flaky(reruns=3, reruns_delay=10)
def test_live_run(self):
def test_live_run(self, monkeypatch):
monkeypatch.delenv("HF_API_TOKEN", raising=False) # https://github.com/deepset-ai/haystack/issues/8811
llm = HuggingFaceLocalGenerator(model="Qwen/Qwen2.5-0.5B-Instruct", generation_kwargs={"max_new_tokens": 50})
llm.warm_up()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,10 +574,11 @@ def test_pipeline_serialise_deserialise(self):

@pytest.mark.integration
@pytest.mark.parametrize("similarity", ["dot_product", "cosine"])
def test_run(self, similarity):
def test_run(self, similarity, monkeypatch):
"""
Tests that run method returns documents in the correct order
"""
monkeypatch.delenv("HF_API_TOKEN", raising=False) # https://github.com/deepset-ai/haystack/issues/8811
ranker = SentenceTransformersDiversityRanker(
model="sentence-transformers/all-MiniLM-L6-v2", similarity=similarity
)
Expand All @@ -601,7 +602,8 @@ def test_run(self, similarity):

@pytest.mark.integration
@pytest.mark.parametrize("similarity", ["dot_product", "cosine"])
def test_run_real_world_use_case(self, similarity):
def test_run_real_world_use_case(self, similarity, monkeypatch):
monkeypatch.delenv("HF_API_TOKEN", raising=False) # https://github.com/deepset-ai/haystack/issues/8811
ranker = SentenceTransformersDiversityRanker(
model="sentence-transformers/all-MiniLM-L6-v2", similarity=similarity
)
Expand Down Expand Up @@ -673,7 +675,8 @@ def test_run_real_world_use_case(self, similarity):

@pytest.mark.integration
@pytest.mark.parametrize("similarity", ["dot_product", "cosine"])
def test_run_with_maximum_margin_relevance_strategy(self, similarity):
def test_run_with_maximum_margin_relevance_strategy(self, similarity, monkeypatch):
monkeypatch.delenv("HF_API_TOKEN", raising=False) # https://github.com/deepset-ai/haystack/issues/8811
query = "renewable energy sources"
docs = [
Document(content="18th-century French literature"),
Expand Down
9 changes: 6 additions & 3 deletions test/components/readers/test_extractive.py
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,8 @@ def test_deduplicate_by_overlap(


@pytest.mark.integration
def test_t5():
def test_t5(monkeypatch):
monkeypatch.delenv("HF_API_TOKEN", raising=False) # https://github.com/deepset-ai/haystack/issues/8811
reader = ExtractiveReader("sjrhuschlee/flan-t5-base-squad2")
reader.warm_up()
answers = reader.run(example_queries[0], example_documents[0], top_k=2)[
Expand All @@ -800,7 +801,8 @@ def test_t5():


@pytest.mark.integration
def test_roberta():
def test_roberta(monkeypatch):
monkeypatch.delenv("HF_API_TOKEN", raising=False) # https://github.com/deepset-ai/haystack/issues/8811
reader = ExtractiveReader("deepset/tinyroberta-squad2")
reader.warm_up()
answers = reader.run(example_queries[0], example_documents[0], top_k=2)[
Expand Down Expand Up @@ -829,7 +831,8 @@ def test_roberta():


@pytest.mark.integration
def test_matches_hf_pipeline():
def test_matches_hf_pipeline(monkeypatch):
monkeypatch.delenv("HF_API_TOKEN", raising=False) # https://github.com/deepset-ai/haystack/issues/8811
reader = ExtractiveReader(
"deepset/tinyroberta-squad2", device=ComponentDevice.from_str("cpu"), overlap_threshold=None
)
Expand Down
6 changes: 4 additions & 2 deletions test/components/routers/test_transformers_text_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ def test_run_unit(self, hf_pipeline_mock, mock_auto_config_from_pretrained):
assert out == {"en": "What is the color of the sky?"}

@pytest.mark.integration
def test_run(self):
def test_run(self, monkeypatch):
monkeypatch.delenv("HF_API_TOKEN", raising=False) # https://github.com/deepset-ai/haystack/issues/8811
router = TransformersTextRouter(model="papluca/xlm-roberta-base-language-detection")
router.warm_up()
out = router.run("What is the color of the sky?")
Expand Down Expand Up @@ -202,7 +203,8 @@ def test_run(self):
assert out == {"en": "What is the color of the sky?"}

@pytest.mark.integration
def test_wrong_labels(self):
def test_wrong_labels(self, monkeypatch):
monkeypatch.delenv("HF_API_TOKEN", raising=False) # https://github.com/deepset-ai/haystack/issues/8811
router = TransformersTextRouter(model="papluca/xlm-roberta-base-language-detection", labels=["en", "de"])
with pytest.raises(ValueError):
router.warm_up()
3 changes: 2 additions & 1 deletion test/components/routers/test_zero_shot_text_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ def test_run_unit(self, hf_pipeline_mock):
assert out == {"query": "What is the color of the sky?"}

@pytest.mark.integration
def test_run(self):
def test_run(self, monkeypatch):
monkeypatch.delenv("HF_API_TOKEN", raising=False) # https://github.com/deepset-ai/haystack/issues/8811
router = TransformersZeroShotTextRouter(labels=["query", "passage"])
router.warm_up()
out = router.run("What is the color of the sky?")
Expand Down

0 comments on commit 401d5c0

Please sign in to comment.