
Commit 42dc75e

mpangrazzi and sjrl authored

Add support to automatic "hybrid" streaming (#182)

* Allow async_streaming_generator to also support sync-only streaming components by assigning the proper streaming_callback type
* Refactoring; remove unneeded comments
* Use only 'auto' and False values; refactoring; add docs
* Add extra deps for tests
* Improve the check for streaming-capable components (inspect the run/run_async method signature rather than component attributes)
* Lint
* Update tests and docs without HuggingFace components
* Update docs
* Update docs/concepts/pipeline-wrapper.md (suggestions co-authored by Sebastian Husch Lee <[email protected]>)
* Simplify the find_all_streaming_components check
* Remove duplicate test
* Use allow_sync_streaming_callbacks=True/False; refactoring + add example

Co-authored-by: Sebastian Husch Lee <[email protected]>

1 parent 5f5f5d7 commit 42dc75e

File tree

9 files changed: +657 −30 lines changed

docs/concepts/pipeline-wrapper.md

Lines changed: 140 additions & 0 deletions
@@ -176,6 +176,146 @@ async def run_chat_completion_async(self, model: str, messages: List[dict], body
## Hybrid Streaming: Mixing Async and Sync Components

!!! tip "Compatibility for Legacy Components"
    When working with legacy pipelines or components that only support sync streaming callbacks (like `OpenAIGenerator`), use `allow_sync_streaming_callbacks=True` to enable hybrid mode. For new code, prefer async-compatible components and use the default strict mode.
Some Haystack components only support synchronous streaming callbacks and don't have async equivalents. Examples include:

- `OpenAIGenerator` - legacy OpenAI text generation (⚠️ Note: `OpenAIChatGenerator` IS async-compatible)
- Other components without `run_async()` support
### The Problem

By default, `async_streaming_generator` requires all streaming components to support async callbacks:

```python
async def run_chat_completion_async(self, model: str, messages: list[dict], body: dict) -> AsyncGenerator:
    question = get_last_user_message(messages)

    # This will FAIL if the pipeline contains OpenAIGenerator
    return async_streaming_generator(
        pipeline=self.pipeline,  # AsyncPipeline with OpenAIGenerator
        pipeline_run_args={"prompt": {"query": question}},
    )
```
**Error:**

```text
ValueError: Component 'llm' of type 'OpenAIGenerator' seems to not support
async streaming callbacks...
```
### The Solution: Hybrid Streaming Mode

Enable hybrid streaming mode to automatically handle both async and sync components:

```python
async def run_chat_completion_async(self, model: str, messages: list[dict], body: dict) -> AsyncGenerator:
    question = get_last_user_message(messages)
    return async_streaming_generator(
        pipeline=self.pipeline,
        pipeline_run_args={"prompt": {"query": question}},
        allow_sync_streaming_callbacks=True,  # ✅ Auto-detect and enable hybrid mode
    )
```
### What `allow_sync_streaming_callbacks=True` Does

When you set `allow_sync_streaming_callbacks=True`, the system enables **intelligent auto-detection**:

1. **Scans Components**: Automatically inspects all streaming components in your pipeline
2. **Detects Capabilities**: Checks whether each component has `run_async()` support (a simplified sketch of this check appears below)
3. **Enables Hybrid Mode Only If Needed**:
    - ✅ If **all components support async** → uses pure async mode (no overhead)
    - ✅ If **any component is sync-only** → automatically enables hybrid mode
4. **Bridges Sync to Async**: For sync-only components, wraps their callbacks to work seamlessly with the async event loop
5. **Zero Configuration**: You don't need to know which components are sync or async - it figures this out automatically

!!! success "Smart Behavior"
    Setting `allow_sync_streaming_callbacks=True` does NOT force hybrid mode. It only enables it when actually needed. If your pipeline is fully async-capable, you get pure async performance with no overhead!
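To make the auto-detection concrete, here is a minimal, hypothetical sketch of such a capability check. Per the commit message, the real check inspects the `run`/`run_async` method signatures rather than component attributes; the helper names below are illustrative, not hayhooks' actual internals:

```python
import inspect

def is_streaming_component(component) -> bool:
    # Hypothetical: a streaming component is one whose run() accepts
    # a streaming_callback parameter.
    return "streaming_callback" in inspect.signature(component.run).parameters

def supports_async_streaming(component) -> bool:
    # Hypothetical: a component can stream natively in async mode if it
    # exposes run_async() and that method accepts a streaming_callback.
    run_async = getattr(component, "run_async", None)
    if run_async is None or not callable(run_async):
        return False
    return "streaming_callback" in inspect.signature(run_async).parameters
```

With checks like these, hybrid mode only kicks in when some streaming component fails the `supports_async_streaming` test; otherwise the pipeline runs in pure async mode.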
### Configuration Options

```python
# Option 1: Strict mode (default - recommended)
allow_sync_streaming_callbacks=False
# → Raises an error if sync-only components are found
# → Best for: new code, ensuring proper async components, best performance

# Option 2: Auto-detection (compatibility mode)
allow_sync_streaming_callbacks=True
# → Automatically detects and enables hybrid mode only when needed
# → Best for: legacy pipelines, components without async support, gradual migration
```
### Example: Legacy OpenAI Generator with Async Pipeline

```python
from collections.abc import AsyncGenerator

from haystack import AsyncPipeline
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.utils import Secret

from hayhooks import BasePipelineWrapper, get_last_user_message, async_streaming_generator


class LegacyOpenAIWrapper(BasePipelineWrapper):
    def setup(self) -> None:
        # OpenAIGenerator only supports sync streaming (legacy component)
        llm = OpenAIGenerator(
            api_key=Secret.from_env_var("OPENAI_API_KEY"),
            model="gpt-4o-mini",
        )

        prompt_builder = PromptBuilder(
            template="Answer this question: {{question}}"
        )

        self.pipeline = AsyncPipeline()
        self.pipeline.add_component("prompt", prompt_builder)
        self.pipeline.add_component("llm", llm)
        self.pipeline.connect("prompt.prompt", "llm.prompt")

    async def run_chat_completion_async(
        self, model: str, messages: list[dict], body: dict
    ) -> AsyncGenerator:
        question = get_last_user_message(messages)

        # Enable hybrid mode for OpenAIGenerator
        return async_streaming_generator(
            pipeline=self.pipeline,
            pipeline_run_args={"prompt": {"question": question}},
            allow_sync_streaming_callbacks=True,  # ✅ Handles sync component
        )
```
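Once deployed, a wrapper like this can be exercised through hayhooks' OpenAI-compatible endpoint. Below is an illustrative client-side sketch using the official `openai` Python package; the deployment name `legacy_openai_wrapper` is an assumption for this example:

```python
from openai import OpenAI

# The base URL matches hayhooks' default local address; the model name is
# whatever the wrapper above was deployed under (hypothetical here).
client = OpenAI(base_url="http://localhost:1416/v1", api_key="not-needed")

stream = client.chat.completions.create(
    model="legacy_openai_wrapper",  # hypothetical deployment name
    messages=[{"role": "user", "content": "What is machine learning?"}],
    stream=True,
)
for chunk in stream:
    # Streamed tokens arrive as deltas; chunks without content are skipped.
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
```

From the client's point of view, hybrid mode is invisible: the stream behaves exactly as it would with a fully async pipeline.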
### When to Use Each Mode

**Use strict mode (default) when:**

- Building new pipelines (recommended default)
- You want to ensure all components are **async-compatible**
- Performance is critical (pure async is **~1-2μs faster** per chunk)
- You're building a production system with controlled dependencies

**Use `allow_sync_streaming_callbacks=True` when:**

- Working with legacy pipelines that use `OpenAIGenerator` or other sync-only components
- Deploying YAML pipelines with unknown or legacy component types
- Migrating old code that doesn't have async equivalents yet
- Using third-party components without async support
### Performance Considerations

- **Pure async pipeline**: No overhead
- **Hybrid mode (auto-detected)**: Minimal overhead (~1-2 microseconds per streaming chunk for sync components; see the micro-benchmark sketch below)
- **Network-bound operations**: The overhead is negligible compared to LLM generation time
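To put the per-chunk figure in perspective, the thread-safe handoff that hybrid mode relies on can be timed in isolation. A rough, hypothetical micro-benchmark (actual numbers vary by machine and event loop; this measures scheduling cost only, not end-to-end delivery):

```python
import asyncio
import time

async def main() -> None:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    n = 100_000

    start = time.perf_counter()
    for i in range(n):
        # The same thread-safe handoff a sync-to-async bridge would use.
        loop.call_soon_threadsafe(queue.put_nowait, i)
    elapsed = time.perf_counter() - start
    print(f"{elapsed / n * 1e6:.2f} µs per handoff (scheduling only)")

asyncio.run(main())
```

On typical hardware this lands in the low single-digit microseconds, which is why the overhead disappears next to network latency and token generation time.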
!!! success "Best Practice"
    **For new code**: Use the default strict mode (`allow_sync_streaming_callbacks=False`) to ensure you're using proper async components.

    **For legacy/compatibility**: Use `allow_sync_streaming_callbacks=True` when working with older pipelines or components that don't support async streaming yet.
## Streaming from Multiple Components

!!! info "Smart Streaming Behavior"

docs/examples/async-operations.md

Lines changed: 3 additions & 1 deletion
@@ -28,7 +28,9 @@ curl -X POST http://localhost:1416/v1/chat/completions \

 !!! tip "Best Practices"
     - Prefer `run_chat_completion_async` for streaming and concurrency
-    - Ensure components support async streaming callbacks; otherwise use the sync `streaming_generator`
+    - Use async-compatible components (e.g., `OpenAIChatGenerator`) for best performance
+    - For legacy pipelines with sync-only components (like `OpenAIGenerator`), use `allow_sync_streaming_callbacks=True` to enable hybrid mode
+    - See [Hybrid Streaming](../concepts/pipeline-wrapper.md#hybrid-streaming-mixing-async-and-sync-components) for handling legacy components

 ## Related

examples/README.md

Lines changed: 1 addition & 0 deletions
@@ -8,6 +8,7 @@ This directory contains various examples demonstrating different use cases and f
 |---------|-------------|--------------|----------|
 | [multi_llm_streaming](./pipeline_wrappers/multi_llm_streaming/) | Multiple LLM components with automatic streaming | • Two sequential LLMs<br/>• Automatic multi-component streaming<br/>• No special configuration needed<br/>• Shows default streaming behavior | Demonstrating how hayhooks automatically streams from all components in a pipeline |
 | [async_question_answer](./pipeline_wrappers/async_question_answer/) | Async question-answering pipeline with streaming support | • Async pipeline execution<br/>• Streaming responses<br/>• OpenAI Chat Generator<br/>• Both API and chat completion interfaces | Building conversational AI systems that need async processing and real-time streaming responses |
+| [async_hybrid_streaming](./pipeline_wrappers/async_hybrid_streaming/) | AsyncPipeline with legacy sync-only components using hybrid mode | • AsyncPipeline with OpenAIGenerator<br/>• `allow_sync_streaming_callbacks=True`<br/>• Automatic sync-to-async bridging<br/>• Migration example | Using legacy components (OpenAIGenerator) in async pipelines, migrating from sync to async gradually, handling third-party sync-only components |
 | [chat_with_website](./pipeline_wrappers/chat_with_website/) | Answer questions about website content | • Web content fetching<br/>• HTML to document conversion<br/>• Content-based Q&A<br/>• Configurable URLs | Creating AI assistants that can answer questions about specific websites or web-based documentation |
 | [chat_with_website_mcp](./pipeline_wrappers/chat_with_website_mcp/) | MCP-compatible website chat pipeline | • MCP (Model Context Protocol) support<br/>• Website content analysis<br/>• API-only interface<br/>• Simplified deployment | Integrating website analysis capabilities into MCP-compatible AI systems and tools |
 | [chat_with_website_streaming](./pipeline_wrappers/chat_with_website_streaming/) | Streaming website chat responses | • Real-time streaming<br/>• Website content processing<br/>• Progressive response generation<br/>• Enhanced user experience | Building responsive web applications that provide real-time AI responses about website content |
examples/pipeline_wrappers/async_hybrid_streaming/README.md

Lines changed: 76 additions & 0 deletions
@@ -0,0 +1,76 @@

# Async Hybrid Streaming Example

This example demonstrates using `allow_sync_streaming_callbacks=True` to enable hybrid streaming mode with an AsyncPipeline and legacy sync-only components.

## Overview

This example shows how to use an **AsyncPipeline** with **OpenAIGenerator** (a legacy component that only supports synchronous streaming callbacks) by enabling hybrid mode with `allow_sync_streaming_callbacks=True`.

## The Problem

Some Haystack components, like `OpenAIGenerator`, only support **synchronous** streaming callbacks and don't have `run_async()` support. When you try to use them with `async_streaming_generator` in an AsyncPipeline, you'll get an error:

```text
ValueError: Component 'llm' of type 'OpenAIGenerator' seems to not support async streaming callbacks
```

## The Solution

Set `allow_sync_streaming_callbacks=True` to enable **hybrid mode**:

```python
async_streaming_generator(
    pipeline=self.pipeline,
    pipeline_run_args={...},
    allow_sync_streaming_callbacks=True,  # ✅ Enables hybrid mode
)
```

### What Hybrid Mode Does

When `allow_sync_streaming_callbacks=True`, the system automatically detects components with sync-only streaming callbacks (e.g., `OpenAIGenerator`) and bridges their callbacks so they work in the async context. If all components support async, no bridging is applied (pure async mode).
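For intuition, the bridging can be pictured as forwarding chunks from the sync callback (running in a worker thread) onto the event loop. The sketch below is illustrative only; names like `make_bridge` are hypothetical and hayhooks' actual internals may differ:

```python
import asyncio

def make_bridge(loop: asyncio.AbstractEventLoop, queue: asyncio.Queue):
    """Hypothetical sketch: adapt a sync streaming_callback for async consumption."""
    def sync_streaming_callback(chunk) -> None:
        # Runs in the worker thread executing the sync component;
        # hand the chunk to the event loop thread-safely.
        loop.call_soon_threadsafe(queue.put_nowait, chunk)
    return sync_streaming_callback

async def consume(queue: asyncio.Queue):
    # The async generator side awaits chunks as they arrive.
    while (chunk := await queue.get()) is not None:  # None = "done" sentinel
        yield chunk
```

The key property is that the sync component never blocks the event loop: it only schedules a callback, and the async side consumes chunks at its own pace.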
## When to Use This

**Use `allow_sync_streaming_callbacks=True` when:**

- Working with **legacy components** like `OpenAIGenerator` that don't have async equivalents
- Deploying **YAML pipelines** where you don't control which components are used
- **Migrating** from sync to async pipelines gradually
- Using **third-party components** without async support

**For new code, prefer:**

- Async-compatible components (e.g., `OpenAIChatGenerator` instead of `OpenAIGenerator`)
- The default strict mode (`allow_sync_streaming_callbacks=False`) to ensure proper async components
## Usage

### Deploy with Hayhooks

```bash
# Set your OpenAI API key
export OPENAI_API_KEY=your_api_key_here

# Deploy the pipeline
hayhooks deploy examples/pipeline_wrappers/async_hybrid_streaming

# Test it via the OpenAI-compatible API
curl -X POST http://localhost:1416/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "async_hybrid_streaming",
    "messages": [{"role": "user", "content": "What is machine learning?"}],
    "stream": true
  }'
```
## Performance

Hybrid mode might add minimal overhead (~1-2 microseconds per streaming chunk for sync components). This is negligible compared to network latency and LLM generation time.

## Related Documentation

- [Hybrid Streaming Concept](https://deepset-ai.github.io/hayhooks/concepts/pipeline-wrapper/#hybrid-streaming-mixing-async-and-sync-components)
- [Async Operations](https://deepset-ai.github.io/hayhooks/examples/async-operations/)
- [Pipeline Wrapper Guide](https://deepset-ai.github.io/hayhooks/concepts/pipeline-wrapper/)
examples/pipeline_wrappers/async_hybrid_streaming/hybrid_streaming.yml

Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@

components:
  llm:
    init_parameters:
      api_base_url: null
      api_key:
        env_vars:
        - OPENAI_API_KEY
        strict: true
        type: env_var
      generation_kwargs: {}
      model: gpt-4o-mini
      streaming_callback: null
      system_prompt: null
    type: haystack.components.generators.openai.OpenAIGenerator
  prompt:
    init_parameters:
      required_variables: "*"
      template: |
        Answer the following question concisely and accurately:
        {{query}}

        Answer:
      variables: null
    type: haystack.components.builders.prompt_builder.PromptBuilder
connection_type_validation: true
connections:
- receiver: llm.prompt
  sender: prompt.prompt
max_runs_per_component: 100
metadata: {}
examples/pipeline_wrappers/async_hybrid_streaming/pipeline_wrapper.py

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@

```python
from collections.abc import AsyncGenerator
from pathlib import Path

from haystack import AsyncPipeline

from hayhooks import BasePipelineWrapper, async_streaming_generator, get_last_user_message, log


class PipelineWrapper(BasePipelineWrapper):
    def setup(self) -> None:
        """
        Set up an AsyncPipeline with a legacy OpenAIGenerator component.

        OpenAIGenerator only supports sync streaming callbacks (no run_async() method).
        To use it with AsyncPipeline and async_streaming_generator, we need to enable
        hybrid mode with allow_sync_streaming_callbacks=True.
        """
        pipeline_yaml = (Path(__file__).parent / "hybrid_streaming.yml").read_text()
        self.pipeline = AsyncPipeline.loads(pipeline_yaml)

    async def run_api_async(self, question: str) -> str:
        """
        Simple async API endpoint that returns the final answer.

        Args:
            question: The user's question

        Returns:
            The LLM's answer as a string
        """
        log.trace(f"Running pipeline with question: {question}")

        result = await self.pipeline.run_async({"prompt": {"query": question}})
        return result["llm"]["replies"][0]

    async def run_chat_completion_async(self, model: str, messages: list[dict], body: dict) -> AsyncGenerator:
        """
        OpenAI-compatible chat completion endpoint with streaming support.

        This demonstrates using allow_sync_streaming_callbacks=True to enable hybrid mode,
        which allows the sync-only OpenAIGenerator to work with async_streaming_generator.

        Args:
            model: The model name (ignored in this example)
            messages: Chat messages in OpenAI format
            body: Additional request parameters

        Yields:
            Streaming chunks from the pipeline execution
        """
        log.trace(f"Running pipeline with model: {model}, messages: {messages}, body: {body}")

        question = get_last_user_message(messages)
        log.trace(f"Question: {question}")

        # ✅ Enable hybrid mode with allow_sync_streaming_callbacks=True
        # This is required because OpenAIGenerator (legacy component) only supports
        # sync streaming callbacks. Hybrid mode automatically detects this and
        # bridges the sync callback to work with the async event loop.
        #
        # If all components supported async, this would use pure async mode with no overhead.
        return async_streaming_generator(
            pipeline=self.pipeline,
            pipeline_run_args={"prompt": {"query": question}},
            allow_sync_streaming_callbacks=True,
        )
```
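As a quick sanity check after deployment, the non-streaming `run_api_async` endpoint can be called directly. A hedged sketch, assuming hayhooks' standard `/{pipeline_name}/run` route and the deployment name `async_hybrid_streaming`:

```python
import httpx

# Hypothetical quick test of the run_api_async endpoint exposed by hayhooks;
# the request body mirrors run_api_async's "question" parameter.
response = httpx.post(
    "http://localhost:1416/async_hybrid_streaming/run",
    json={"question": "What is machine learning?"},
    timeout=60.0,
)
response.raise_for_status()
print(response.json())
```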
