30 changes: 15 additions & 15 deletions docs/concepts/pipeline-wrapper.md
@@ -14,7 +14,7 @@ The pipeline wrapper provides a flexible foundation for deploying Haystack pipel

```python
from pathlib import Path
from typing import List, Generator, Union, AsyncGenerator
from typing import Generator, Union, AsyncGenerator
from haystack import Pipeline, AsyncPipeline
from hayhooks import BasePipelineWrapper, get_last_user_message, streaming_generator, async_streaming_generator

@@ -23,7 +23,7 @@ class PipelineWrapper(BasePipelineWrapper):
pipeline_yaml = (Path(__file__).parent / "pipeline.yml").read_text()
self.pipeline = Pipeline.loads(pipeline_yaml)

def run_api(self, urls: List[str], question: str) -> str:
def run_api(self, urls: list[str], question: str) -> str:
result = self.pipeline.run({"fetcher": {"urls": urls}, "prompt": {"query": question}})
return result["llm"]["replies"][0]
```
@@ -108,7 +108,7 @@ def setup(self) -> None:
The `run_api()` method is called for each API request to the `{pipeline_name}/run` endpoint.

```python
def run_api(self, urls: List[str], question: str) -> str:
def run_api(self, urls: list[str], question: str) -> str:
result = self.pipeline.run({"fetcher": {"urls": urls}, "prompt": {"query": question}})
return result["llm"]["replies"][0]
```
@@ -123,9 +123,9 @@ def run_api(self, urls: List[str], question: str) -> str:
**Input argument rules:**

- Arguments must be JSON-serializable
- Use proper type hints (`List[str]`, `Optional[int]`, etc.)
- Use proper type hints (`list[str]`, `Optional[int]`, etc.)
- Default values are supported
- Complex types like `Dict[str, Any]` are allowed
- Complex types like `dict[str, Any]` are allowed

## Optional Methods

@@ -134,7 +134,7 @@ def run_api(self, urls: List[str], question: str) -> str:
The asynchronous version of `run_api()` for better performance under high load.

```python
async def run_api_async(self, urls: List[str], question: str) -> str:
async def run_api_async(self, urls: list[str], question: str) -> str:
result = await self.pipeline.run_async({"fetcher": {"urls": urls}, "prompt": {"query": question}})
return result["llm"]["replies"][0]
```
@@ -151,7 +151,7 @@ async def run_api_async(self, urls: List[str], question: str) -> str:
Enable OpenAI-compatible chat endpoints for integration with chat interfaces.

```python
def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Union[str, Generator]:
def run_chat_completion(self, model: str, messages: list[dict], body: dict) -> Union[str, Generator]:
question = get_last_user_message(messages)
result = self.pipeline.run({"prompt": {"query": question}})
return result["llm"]["replies"][0]
@@ -168,7 +168,7 @@ def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> U
Async version of chat completion with streaming support.

```python
async def run_chat_completion_async(self, model: str, messages: List[dict], body: dict) -> AsyncGenerator:
async def run_chat_completion_async(self, model: str, messages: list[dict], body: dict) -> AsyncGenerator:
question = get_last_user_message(messages)
return async_streaming_generator(
pipeline=self.pipeline,
@@ -191,7 +191,7 @@ Some Haystack components only support synchronous streaming callbacks and don't
By default, `async_streaming_generator` requires all streaming components to support async callbacks:

```python
async def run_chat_completion_async(self, model: str, messages: List[dict], body: dict) -> AsyncGenerator:
async def run_chat_completion_async(self, model: str, messages: list[dict], body: dict) -> AsyncGenerator:
# This will FAIL if pipeline contains OpenAIGenerator
return async_streaming_generator(
pipeline=self.pipeline, # AsyncPipeline with OpenAIGenerator
@@ -369,7 +369,7 @@ class MultiLLMWrapper(BasePipelineWrapper):
self.pipeline.connect("llm_1.replies", "prompt_2.previous_response")
self.pipeline.connect("prompt_2.prompt", "llm_2.messages")

def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Generator:
def run_chat_completion(self, model: str, messages: list[dict], body: dict) -> Generator:
question = get_last_user_message(messages)

# By default, only llm_2 (the last streaming component) will stream
@@ -386,7 +386,7 @@ class MultiLLMWrapper(BasePipelineWrapper):
For advanced use cases where you want to see outputs from multiple components, use the `streaming_components` parameter:

```python
def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Generator:
def run_chat_completion(self, model: str, messages: list[dict], body: dict) -> Generator:
question = get_last_user_message(messages)

# Enable streaming for BOTH LLMs
@@ -508,9 +508,9 @@ Hayhooks can handle file uploads by adding a `files` parameter:

```python
from fastapi import UploadFile
from typing import Optional, List
from typing import Optional

def run_api(self, files: Optional[List[UploadFile]] = None, query: str = "") -> str:
def run_api(self, files: Optional[list[UploadFile]] = None, query: str = "") -> str:
if files:
# Process uploaded files
filenames = [f.filename for f in files if f.filename]
@@ -555,7 +555,7 @@ Your pipeline wrapper may require additional dependencies:
# pipeline_wrapper.py
import trafilatura # Additional dependency

def run_api(self, urls: List[str], question: str) -> str:
def run_api(self, urls: list[str], question: str) -> str:
# Use additional library
content = trafilatura.fetch(urls[0])
# ... rest of pipeline logic
@@ -623,7 +623,7 @@ class PipelineWrapper(BasePipelineWrapper):
Use docstrings to provide MCP tool descriptions:

```python
def run_api(self, urls: List[str], question: str) -> str:
def run_api(self, urls: list[str], question: str) -> str:
"""
Ask questions about website content.

6 changes: 3 additions & 3 deletions docs/concepts/yaml-pipeline-deployment.md
@@ -254,17 +254,17 @@ with open("pipeline.yml", "w") as f:
```python
# For OpenAI compatibility
class PipelineWrapper(BasePipelineWrapper):
def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Union[str, Generator]:
def run_chat_completion(self, model: str, messages: list[dict], body: dict) -> Union[str, Generator]:
...

# For file uploads
class PipelineWrapper(BasePipelineWrapper):
def run_api(self, files: Optional[List[UploadFile]] = None, query: str = "") -> str:
def run_api(self, files: Optional[list[UploadFile]] = None, query: str = "") -> str:
...

# For streaming
class PipelineWrapper(BasePipelineWrapper):
def run_chat_completion_async(self, model: str, messages: List[dict], body: dict) -> AsyncGenerator:
def run_chat_completion_async(self, model: str, messages: list[dict], body: dict) -> AsyncGenerator:
...
```

8 changes: 4 additions & 4 deletions docs/features/file-upload-support.md
@@ -17,10 +17,10 @@ To accept file uploads in your pipeline, add a `files` parameter to your `run_ap

```python
from fastapi import UploadFile
from typing import Optional, List
from typing import Optional

class PipelineWrapper(BasePipelineWrapper):
def run_api(self, files: Optional[List[UploadFile]] = None, query: str = "") -> str:
def run_api(self, files: Optional[list[UploadFile]] = None, query: str = "") -> str:
if not files:
return "No files provided"

@@ -105,12 +105,12 @@ You can handle both files and parameters in the same request by adding them as a

```python
from fastapi import UploadFile
from typing import Optional, List
from typing import Optional

class PipelineWrapper(BasePipelineWrapper):
def run_api(
self,
files: Optional[List[UploadFile]] = None,
files: Optional[list[UploadFile]] = None,
query: str = "",
additional_param: str = "default"
) -> str:
7 changes: 3 additions & 4 deletions docs/features/mcp-support.md
@@ -135,7 +135,7 @@ For each deployed pipeline, Hayhooks will:

```python
from pathlib import Path
from typing import List
from haystack import Pipeline
from hayhooks import BasePipelineWrapper

@@ -145,7 +144,7 @@ class PipelineWrapper(BasePipelineWrapper):
pipeline_yaml = (Path(__file__).parent / "chat_with_website.yml").read_text()
self.pipeline = Pipeline.loads(pipeline_yaml)

def run_api(self, urls: List[str], question: str) -> str:
def run_api(self, urls: list[str], question: str) -> str:
#
# NOTE: The following docstring will be used as MCP Tool description
#
@@ -245,7 +244,7 @@ Configure Claude Desktop to connect to Hayhooks MCP Server:
Use docstrings to provide better tool descriptions:

```python
def run_api(self, urls: List[str], question: str) -> str:
def run_api(self, urls: list[str], question: str) -> str:
"""
Ask questions about website content using AI.

@@ -270,7 +269,7 @@ Hayhooks automatically validates inputs based on your method signature:
```python
def run_api(
self,
urls: List[str], # Required: List of URLs
urls: list[str], # Required: List of URLs
question: str, # Required: User question
max_tokens: int = 1000 # Optional: Max tokens
) -> str:
21 changes: 11 additions & 10 deletions docs/features/openai-compatibility.md
@@ -22,7 +22,8 @@ Hayhooks can automatically generate OpenAI-compatible endpoints if you implement
### Basic Chat Completion

```python
from typing import List, Union, Generator
from pathlib import Path
from typing import Union, Generator
from haystack import Pipeline
from hayhooks import get_last_user_message, BasePipelineWrapper, log

@@ -32,7 +33,7 @@ class PipelineWrapper(BasePipelineWrapper):
pipeline_yaml = (Path(__file__).parent / "pipeline.yml").read_text()
self.pipeline = Pipeline.loads(pipeline_yaml)

def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Union[str, Generator]:
def run_chat_completion(self, model: str, messages: list[dict], body: dict) -> Union[str, Generator]:
log.trace(f"Running pipeline with model: {model}, messages: {messages}, body: {body}")

question = get_last_user_message(messages)
@@ -55,7 +56,7 @@ class PipelineWrapper(BasePipelineWrapper):
pipeline_yaml = (Path(__file__).parent / "pipeline.yml").read_text()
self.pipeline = AsyncPipeline.loads(pipeline_yaml)

async def run_chat_completion_async(self, model: str, messages: List[dict], body: dict) -> AsyncGenerator:
async def run_chat_completion_async(self, model: str, messages: list[dict], body: dict) -> AsyncGenerator:
log.trace(f"Running pipeline with model: {model}, messages: {messages}, body: {body}")

question = get_last_user_message(messages)
@@ -73,7 +74,7 @@
### run_chat_completion(...)

```python
def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Union[str, Generator]:
def run_chat_completion(self, model: str, messages: list[dict], body: dict) -> Union[str, Generator]:
"""
Run the pipeline for OpenAI-compatible chat completion.

@@ -91,7 +92,7 @@ def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> U
### run_chat_completion_async(...)

```python
async def run_chat_completion_async(self, model: str, messages: List[dict], body: dict) -> Union[str, AsyncGenerator]:
async def run_chat_completion_async(self, model: str, messages: list[dict], body: dict) -> Union[str, AsyncGenerator]:
"""
Async version of run_chat_completion.

@@ -143,7 +144,7 @@ curl http://localhost:1416/v1/models
```python
from hayhooks import streaming_generator

def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Generator:
def run_chat_completion(self, model: str, messages: list[dict], body: dict) -> Generator:
question = get_last_user_message(messages)

return streaming_generator(
@@ -157,7 +158,7 @@ def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> G
```python
from hayhooks import async_streaming_generator

async def run_chat_completion_async(self, model: str, messages: List[dict], body: dict) -> AsyncGenerator:
async def run_chat_completion_async(self, model: str, messages: list[dict], body: dict) -> AsyncGenerator:
question = get_last_user_message(messages)

return async_streaming_generator(
@@ -217,7 +218,7 @@ class SyncChatWrapper(BasePipelineWrapper):
self.pipeline.add_component("llm", llm)
self.pipeline.connect("chat_prompt_builder.prompt", "llm.messages")

def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> str:
def run_chat_completion(self, model: str, messages: list[dict], body: dict) -> str:
question = get_last_user_message(messages)
result = self.pipeline.run({"chat_prompt_builder": {"query": question}})
return result["llm"]["replies"][0].content
@@ -241,7 +242,7 @@ class AsyncStreamingWrapper(BasePipelineWrapper):
self.pipeline.add_component("llm", llm)
self.pipeline.connect("chat_prompt_builder.prompt", "llm.messages")

async def run_chat_completion_async(self, model: str, messages: List[dict], body: dict) -> AsyncGenerator:
async def run_chat_completion_async(self, model: str, messages: list[dict], body: dict) -> AsyncGenerator:
question = get_last_user_message(messages)
return async_streaming_generator(
pipeline=self.pipeline,
@@ -254,7 +255,7 @@
The OpenAI-compatible endpoints support standard parameters from the `body` argument:

```python
def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> str:
def run_chat_completion(self, model: str, messages: list[dict], body: dict) -> str:
# Access additional parameters
temperature = body.get("temperature", 0.7)
max_tokens = body.get("max_tokens", 150)
10 changes: 5 additions & 5 deletions docs/features/openwebui-integration.md
@@ -107,12 +107,12 @@ Hayhooks supports sending events to Open WebUI for enhanced user experience:
### Event Implementation

```python
from typing import AsyncGenerator, List
from typing import AsyncGenerator
from hayhooks import async_streaming_generator, get_last_user_message, BasePipelineWrapper
from hayhooks.open_webui import create_status_event, create_message_event, OpenWebUIEvent

class PipelineWrapper(BasePipelineWrapper):
async def run_chat_completion_async(self, model: str, messages: List[dict], body: dict) -> AsyncGenerator[str | OpenWebUIEvent, None]:
async def run_chat_completion_async(self, model: str, messages: list[dict], body: dict) -> AsyncGenerator[str | OpenWebUIEvent, None]:
# Indicate loading
yield create_status_event("Processing your request...", done=False)

@@ -158,7 +158,7 @@ def on_tool_call_end(tool_name: str, arguments: dict, result: dict, error: bool)


class PipelineWrapper(BasePipelineWrapper):
def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Generator:
def run_chat_completion(self, model: str, messages: list[dict], body: dict) -> Generator:
return streaming_generator(
pipeline=self.pipeline,
pipeline_run_args={"messages": messages},
@@ -250,7 +250,7 @@ Here's a video example of how to deploy a Haystack pipeline from the `open-webui`
Here's a complete example for a website chat pipeline:

```python
from typing import AsyncGenerator, List
from typing import AsyncGenerator
from haystack import Pipeline
from haystack.components.fetchers import LinkContentFetcher
from haystack.components.converters import HTMLToDocument
@@ -282,7 +282,7 @@ class PipelineWrapper(BasePipelineWrapper):
self.pipeline.connect("converter.documents", "chat_prompt_builder.documents")
self.pipeline.connect("chat_prompt_builder.prompt", "llm.messages")

async def run_chat_completion_async(self, model: str, messages: List[dict], body: dict) -> AsyncGenerator:
async def run_chat_completion_async(self, model: str, messages: list[dict], body: dict) -> AsyncGenerator:
question = get_last_user_message(messages)

# Extract URLs from messages or use defaults
@@ -1,5 +1,5 @@
from collections.abc import Generator
from typing import Any, List, Union # noqa: UP035
from typing import Any, Union

from haystack import Pipeline
from haystack.components.builders import ChatPromptBuilder
@@ -92,7 +92,7 @@ def run_api(self, query: str) -> dict[str, Any]:
)
return {"reply": result["llm_2"]["replies"][0].text if result["llm_2"]["replies"] else ""}

def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Union[str, Generator]: # noqa: ARG002, UP006
def run_chat_completion(self, model: str, messages: list[dict], body: dict) -> Union[str, Generator]: # noqa: ARG002
"""
Run the pipeline in streaming mode.

4 changes: 2 additions & 2 deletions src/hayhooks/callbacks.py
@@ -48,12 +48,12 @@ def default_on_tool_call_end(

Args:
tool_name (str): The name of the tool that was called.
arguments (Dict[str, Any]): The arguments that were passed to the tool.
arguments (dict[str, Any]): The arguments that were passed to the tool.
result (str): The result or response from the tool execution.
error (bool): Whether the tool call resulted in an error.

Returns:
List[Union[OpenWebUIEvent, str]]: A list of events to be processed by Open WebUI.
list[Union[OpenWebUIEvent, str]]: A list of events to be processed by Open WebUI.
For successful calls, returns a status event and a details tag with the tool's arguments and response.
For failed calls, returns a hidden status event and an error notification.
The list can contain both OpenWebUIEvent and str objects.
2 changes: 1 addition & 1 deletion src/hayhooks/server/routers/deploy.py
@@ -17,7 +17,7 @@

SAMPLE_PIPELINE_FILES = {
"pipeline_wrapper.py": (
"from typing import Dict, Any\n\ndef process(data: Dict[str, Any]) -> Dict[str, Any]:\n "
"from typing import Any\n\ndef process(data: dict[str, Any]) -> dict[str, Any]:\n "
":# Your processing logic here\n return data"
),
"requirements.txt": "pandas==1.3.5\nnumpy==1.21.0",