Support both callback and generator-based streaming in all Chat Generators #8742

Open

vblagoje opened this issue Jan 17, 2025 · 4 comments

@vblagoje (Member)

Motivation

Currently, all chat generator components (OpenAI, Anthropic, etc.) support streaming only through callbacks:

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage, StreamingChunk

def callback(chunk: StreamingChunk):
    print(chunk.content)

messages = [ChatMessage.from_user("Hello")]
generator = OpenAIChatGenerator()
result = generator.run(messages, streaming_callback=callback)

This works well for simple use cases (notebooks, etc.), but becomes problematic when:

  1. Integrating with async frameworks like FastAPI that expect a generator for streaming responses (a sketch of the current workaround follows this list):

    @app.get("/stream")
    async def stream_endpoint():
        return StreamingResponse(generator_function())  # Needs a generator
  2. Building pipelines where downstream components expect to consume a stream of tokens:

    pipeline.connect("chat", "stream_processor.stream")  # Currently not possible
  3. Implementing custom streaming logic that doesn't fit the callback pattern:

    # Currently not possible:
    for chunk in result.stream:
        # Custom processing
        await process_chunk(chunk)
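
For reference, bridging today's callback-only API to a generator requires a queue-and-thread workaround along these lines (a sketch; stream_chat is a hypothetical helper, not part of Haystack):

import queue
import threading

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage, StreamingChunk

def stream_chat(messages):
    """Hypothetical bridge: expose the callback-only API as a plain generator."""
    q = queue.Queue()
    done = object()  # sentinel marking the end of the stream

    def callback(chunk: StreamingChunk):
        q.put(chunk)

    def worker():
        try:
            OpenAIChatGenerator().run(messages, streaming_callback=callback)
        finally:
            q.put(done)

    threading.Thread(target=worker, daemon=True).start()
    while (item := q.get()) is not done:
        yield item.content

# Usage, e.g. inside a FastAPI endpoint:
#   return StreamingResponse(stream_chat([ChatMessage.from_user("Hi")]))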

Proposed Solution

Add a second output socket, stream, to all chat generator components:

@component.output_types(
    replies=List[ChatMessage],
    stream=Generator[ChatCompletionChunk, None, None]
)

This allows components to support both streaming patterns (a rough run() sketch follows the examples below):

  1. Callback-based (existing behavior):

    result = generator.run(messages, streaming_callback=callback)
    # result = {"replies": [...], "stream": None}
  2. Generator-based (new behavior):

    result = generator.run(messages)
    # result = {"replies": [], "stream": <generator>}
    for chunk in result["stream"]:
        print(chunk.content)
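
A rough sketch of how a chat generator's run() could branch between the two modes (illustrative only; _run_with_callback and _provider_stream are hypothetical helpers):

def run(self, messages, streaming_callback=None):
    if streaming_callback is not None:
        # Existing behavior: consume the provider stream internally, invoking the
        # callback per chunk, and return the assembled replies.
        replies = self._run_with_callback(messages, streaming_callback)
        return {"replies": replies, "stream": None}

    # New behavior: hand a lazy generator to the caller; replies stays empty because
    # the full message only exists once the stream has been consumed.
    return {"replies": [], "stream": self._provider_stream(messages)}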

Implementation Details

Two possible approaches:

Option 1: Use Component Socket Detection

Add a helper method to detect whether the stream socket is connected:

def has_output_receivers(self, instance, socket_name: str) -> bool:
    if not hasattr(instance, "__haystack_output__"):
        return False
    socket = instance.__haystack_output__.get(socket_name)
    return bool(socket.receivers) if socket else False

Components would enable streaming based on either callback or socket connection:

stream_has_receivers = component.has_output_receivers(self, "stream")
is_streaming = streaming_callback is not None or stream_has_receivers
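
Building on that check, the decision inside run() might look like this (a sketch; the _run_* and _provider_stream helpers are hypothetical):

if streaming_callback is not None:
    # Callback takes precedence: stream via the callback, as today.
    return {"replies": self._run_with_callback(messages, streaming_callback), "stream": None}
if stream_has_receivers:
    # The "stream" socket is wired up in a pipeline: emit a generator on it.
    return {"replies": [], "stream": self._provider_stream(messages)}
# Nothing requested streaming: regular non-streaming completion.
return {"replies": self._run_non_streaming(messages), "stream": None}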

Pros:

  • Clean integration with pipeline system
  • No API changes needed
  • Automatic streaming when socket connected

Cons:

  • Less explicit control over streaming mode
  • Streaming is toggled implicitly by pipeline wiring, which can feel mysterious

Option 2: Use Sentinel Value

Add a sentinel value to signal generator-based streaming:

class GENERATE_STREAM:
    """Sentinel object used to signal generator-based streaming"""
    pass

GENERATE = GENERATE_STREAM()

# Usage:
result = generator.run(messages, streaming_callback=GENERATE)
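
Inside run(), the sentinel would then be detected with an identity check (a sketch, reusing the hypothetical helpers from above):

if streaming_callback is GENERATE:
    # Explicit request for generator-based streaming.
    return {"replies": [], "stream": self._provider_stream(messages)}
if streaming_callback is not None:
    # Regular callback-based streaming.
    return {"replies": self._run_with_callback(messages, streaming_callback), "stream": None}
return {"replies": self._run_non_streaming(messages), "stream": None}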

Pros:

  • Explicit control over streaming mode
  • Clear API intention

Cons:

  • More complex API

Questions to Resolve

  1. Should we support both streaming modes simultaneously?
  2. How should we handle errors in streaming mode?
  3. Should we standardize the chunk format across different LLM providers? That would be nice, no?

@vblagoje (Member Author)

This article by @aryaminus covers the workarounds and mental gymnastics users have to go through to enable this functionality.

@mpangrazzi (Contributor) commented Jan 20, 2025

@vblagoje yeah, I saw it. In general I agree with the proposed approach; my points:

Should we support both streaming modes simultaneously?

I think we first need to investigate what the primary use case for streaming_callback is (apart from the article above). We may decide to support both modes initially, then gently deprecate one.

How should we handle errors in streaming mode?

For the SSE use case (i.e. open-webui), network timeout errors (the most common ones) should be handled on the client side. If one instead simply consumes a generator, errors should be handled while consuming it, with a classic try/except block.
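
(For illustration, a minimal sketch of that try/except pattern when consuming the proposed stream output:)

import logging

try:
    for chunk in result["stream"]:
        print(chunk.content, end="")
except Exception as exc:
    # Network/provider errors surface here, at consumption time.
    logging.getLogger(__name__).warning("Streaming failed: %s", exc)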

Should we standardize the chunk format across different LLM providers? That would be nice, no?

I agree on this!

@aryaminus commented Jan 20, 2025

@vblagoje I felt like dropping my thoughts:

  1. Should we support both streaming modes simultaneously?
    Without overcomplicating things, generator-based has the edge, since it allows adding flavors on top; if we want something simple and pragmatic, callback-based works.

  2. How should we handle errors in streaming mode?
    For generator-based, propagate errors through the generator; for callback-based, pass errors to the callback or log them explicitly.

  3. Should we standardize chunk format across LLM providers?
    not by default.


My leaning is towards Option 1 (Component Socket Detection) for seamless integration, but I suggest adding an optional parameter (e.g., streaming_mode="callback" | "generator") for explicit control.

@vblagoje (Member Author) commented Jan 27, 2025

Thanks @aryaminus - very good suggestion on streaming_mode="callback" | "generator" for explicit control. We are discussing internally how to proceed on this. More feedback from the community is always appreciated!
