From f036fccf02db471616a48a054d058796971c4679 Mon Sep 17 00:00:00 2001
From: William Bakst
Date: Fri, 21 Jun 2024 10:10:51 -0700
Subject: [PATCH] refactor: decorator got messy, so I pulled everything into individual files

---
 mirascope/core/base/__init__.py               |  10 +-
 mirascope/core/base/_utils.py                 |   4 +
 mirascope/core/base/call_response.py          |   2 +-
 mirascope/core/base/call_response_chunk.py    |   2 +-
 mirascope/core/base/function_return.py        |   2 +-
 mirascope/core/base/{prompts.py => prompt.py} |   0
 mirascope/core/base/stream.py                 |  61 +++
 .../core/base/{streams.py => stream_async.py} |  52 +-
 ...ctured_streams.py => structured_stream.py} |  25 +-
 .../core/base/structured_stream_async.py      |  35 ++
 mirascope/core/base/{tools.py => tool.py}     |   0
 mirascope/core/openai/__init__.py             |  17 +-
 mirascope/core/openai/_call.py                |  51 ++
 mirascope/core/openai/_call_async.py          |  54 ++
 mirascope/core/openai/_extract.py             |  54 ++
 mirascope/core/openai/_extract_async.py       |  54 ++
 mirascope/core/openai/_stream.py              | 107 ++++
 mirascope/core/openai/_stream_async.py        | 120 ++++
 mirascope/core/openai/_structured_stream.py   |  76 +++
 .../core/openai/_structured_stream_async.py   |  92 ++++
 mirascope/core/openai/_utils.py               |  67 ++-
 mirascope/core/openai/call.py                 | 144 +++++
 mirascope/core/openai/call_async.py           | 153 ++++++
 mirascope/core/openai/call_response.py        |   2 +-
 mirascope/core/openai/call_response_chunk.py  |   2 +-
 mirascope/core/openai/calls.py                | 517 ------------------
 mirascope/core/openai/streams.py              | 159 ------
 mirascope/core/openai/structured_streams.py   |  68 ---
 mirascope/core/openai/{tools.py => tool.py}   |   0
 tests/core/base/test_utils.py                 |   3 +-
 30 files changed, 1087 insertions(+), 846 deletions(-)
 rename mirascope/core/base/{prompts.py => prompt.py} (100%)
 create mode 100644 mirascope/core/base/stream.py
 rename mirascope/core/base/{streams.py => stream_async.py} (55%)
 rename mirascope/core/base/{structured_streams.py => structured_stream.py} (56%)
 create mode 100644 mirascope/core/base/structured_stream_async.py
 rename mirascope/core/base/{tools.py => tool.py} (100%)
 create mode 100644 mirascope/core/openai/_call.py
 create mode 100644 mirascope/core/openai/_call_async.py
 create mode 100644 mirascope/core/openai/_extract.py
 create mode 100644 mirascope/core/openai/_extract_async.py
 create mode 100644 mirascope/core/openai/_stream.py
 create mode 100644 mirascope/core/openai/_stream_async.py
 create mode 100644 mirascope/core/openai/_structured_stream.py
 create mode 100644 mirascope/core/openai/_structured_stream_async.py
 create mode 100644 mirascope/core/openai/call.py
 create mode 100644 mirascope/core/openai/call_async.py
 delete mode 100644 mirascope/core/openai/calls.py
 delete mode 100644 mirascope/core/openai/streams.py
 delete mode 100644 mirascope/core/openai/structured_streams.py
 rename mirascope/core/openai/{tools.py => tool.py} (100%)

diff --git a/mirascope/core/base/__init__.py b/mirascope/core/base/__init__.py
index 2a395bb03..7ace8b203 100644
--- a/mirascope/core/base/__init__.py
+++ b/mirascope/core/base/__init__.py
@@ -6,10 +6,12 @@
 from .call_response_chunk import BaseCallResponseChunk
 from .function_return import BaseFunctionReturn
 from .message_param import BaseMessageParam
-from .prompts import BasePrompt, tags
-from .streams import BaseAsyncStream, BaseStream
-from .structured_streams import BaseAsyncStructuredStream, BaseStructuredStream
-from .tools import BaseTool
+from .prompt import BasePrompt, tags
+from .stream import BaseStream
+from .stream_async import BaseAsyncStream
+from .structured_stream import
BaseStructuredStream +from .structured_stream_async import BaseAsyncStructuredStream +from .tool import BaseTool __all__ = [ "BaseAsyncStream", diff --git a/mirascope/core/base/_utils.py b/mirascope/core/base/_utils.py index 57577f0de..6691ff859 100644 --- a/mirascope/core/base/_utils.py +++ b/mirascope/core/base/_utils.py @@ -281,6 +281,10 @@ def extract_tool_return( ) if is_base_type(response_model): temp_model = convert_base_type_to_base_tool(response_model, BaseModel) # type: ignore + if allow_partial: + return partial(temp_model).model_validate(json_obj).value # type: ignore return temp_model.model_validate(json_obj).value # type: ignore + if allow_partial: + return partial(response_model).model_validate(json_obj) # type: ignore return response_model.model_validate(json_obj) # type: ignore diff --git a/mirascope/core/base/call_response.py b/mirascope/core/base/call_response.py index 6069385c1..7045a622c 100644 --- a/mirascope/core/base/call_response.py +++ b/mirascope/core/base/call_response.py @@ -15,7 +15,7 @@ from .call_params import BaseCallParams from .function_return import BaseFunctionReturn -from .tools import BaseTool +from .tool import BaseTool _ResponseT = TypeVar("_ResponseT", bound=Any) _BaseToolT = TypeVar("_BaseToolT", bound=BaseTool) diff --git a/mirascope/core/base/call_response_chunk.py b/mirascope/core/base/call_response_chunk.py index 5886a3148..a1f5389ab 100644 --- a/mirascope/core/base/call_response_chunk.py +++ b/mirascope/core/base/call_response_chunk.py @@ -7,7 +7,7 @@ from pydantic import BaseModel, ConfigDict, field_serializer -from .tools import BaseTool +from .tool import BaseTool _ChunkT = TypeVar("_ChunkT", bound=Any) _BaseToolT = TypeVar("_BaseToolT", bound=BaseTool) diff --git a/mirascope/core/base/function_return.py b/mirascope/core/base/function_return.py index b3f16e542..859f95427 100644 --- a/mirascope/core/base/function_return.py +++ b/mirascope/core/base/function_return.py @@ -5,7 +5,7 @@ from typing_extensions import NotRequired, TypedDict from .call_params import BaseCallParams -from .tools import BaseTool +from .tool import BaseTool _MessageParamT = TypeVar("_MessageParamT", bound=Any) _CallParamsT = TypeVar("_CallParamsT", bound=BaseCallParams) diff --git a/mirascope/core/base/prompts.py b/mirascope/core/base/prompt.py similarity index 100% rename from mirascope/core/base/prompts.py rename to mirascope/core/base/prompt.py diff --git a/mirascope/core/base/stream.py b/mirascope/core/base/stream.py new file mode 100644 index 000000000..76101b4c8 --- /dev/null +++ b/mirascope/core/base/stream.py @@ -0,0 +1,61 @@ +"""This module contains the base classes for streaming responses from LLMs.""" + +from abc import ABC +from collections.abc import Generator +from typing import Any, Generic, TypeVar + +from .call_response_chunk import BaseCallResponseChunk +from .tool import BaseTool + +_BaseCallResponseChunkT = TypeVar( + "_BaseCallResponseChunkT", bound=BaseCallResponseChunk +) +_UserMessageParamT = TypeVar("_UserMessageParamT", bound=Any) +_AssistantMessageParamT = TypeVar("_AssistantMessageParamT", bound=Any) +_BaseToolT = TypeVar("_BaseToolT", bound=BaseTool) + + +class BaseStream( + Generic[ + _BaseCallResponseChunkT, + _UserMessageParamT, + _AssistantMessageParamT, + _BaseToolT, + ], + ABC, +): + """A base class for streaming responses from LLMs.""" + + stream: Generator[_BaseCallResponseChunkT, None, None] + message_param_type: type[_AssistantMessageParamT] + + cost: float | None = None + user_message_param: _UserMessageParamT | None = None + 
message_param: _BaseToolT + + def __init__( + self, + stream: Generator[_BaseCallResponseChunkT, None, None], + message_param_type: type[_AssistantMessageParamT], + ): + """Initializes an instance of `BaseStream`.""" + self.stream = stream + self.message_param_type = message_param_type + + def __iter__( + self, + ) -> Generator[tuple[_BaseCallResponseChunkT, _BaseToolT | None], None, None]: + """Iterator over the stream and stores useful information.""" + content = "" + for chunk in self.stream: + content += chunk.content + if chunk.cost is not None: + self.cost = chunk.cost + yield chunk, None + self.user_message_param = chunk.user_message_param + kwargs = {"role": "assistant"} + if "message" in self.message_param_type.__annotations__: + kwargs["message"] = content + else: + kwargs["content"] = content + self.message_param = self.message_param_type(**kwargs) diff --git a/mirascope/core/base/streams.py b/mirascope/core/base/stream_async.py similarity index 55% rename from mirascope/core/base/streams.py rename to mirascope/core/base/stream_async.py index 6ecc801f3..ea8267c82 100644 --- a/mirascope/core/base/streams.py +++ b/mirascope/core/base/stream_async.py @@ -1,11 +1,11 @@ -"""This module contains the base classes for streaming responses from LLMs.""" +"""This module contains the base classes for async streaming responses from LLMs.""" from abc import ABC -from collections.abc import AsyncGenerator, Generator +from collections.abc import AsyncGenerator from typing import Any, Generic, TypeVar from .call_response_chunk import BaseCallResponseChunk -from .tools import BaseTool +from .tool import BaseTool _BaseCallResponseChunkT = TypeVar( "_BaseCallResponseChunkT", bound=BaseCallResponseChunk @@ -15,52 +15,6 @@ _BaseToolT = TypeVar("_BaseToolT", bound=BaseTool) -class BaseStream( - Generic[ - _BaseCallResponseChunkT, - _UserMessageParamT, - _AssistantMessageParamT, - _BaseToolT, - ], - ABC, -): - """A base class for streaming responses from LLMs.""" - - stream: Generator[_BaseCallResponseChunkT, None, None] - message_param_type: type[_AssistantMessageParamT] - - cost: float | None = None - user_message_param: _UserMessageParamT | None = None - message_param: _BaseToolT - - def __init__( - self, - stream: Generator[_BaseCallResponseChunkT, None, None], - message_param_type: type[_AssistantMessageParamT], - ): - """Initializes an instance of `BaseStream`.""" - self.stream = stream - self.message_param_type = message_param_type - - def __iter__( - self, - ) -> Generator[tuple[_BaseCallResponseChunkT, _BaseToolT | None], None, None]: - """Iterator over the stream and stores useful information.""" - content = "" - for chunk in self.stream: - content += chunk.content - if chunk.cost is not None: - self.cost = chunk.cost - yield chunk, None - self.user_message_param = chunk.user_message_param - kwargs = {"role": "assistant"} - if "message" in self.message_param_type.__annotations__: - kwargs["message"] = content - else: - kwargs["content"] = content - self.message_param = self.message_param_type(**kwargs) - - class BaseAsyncStream( Generic[ _BaseCallResponseChunkT, diff --git a/mirascope/core/base/structured_streams.py b/mirascope/core/base/structured_stream.py similarity index 56% rename from mirascope/core/base/structured_streams.py rename to mirascope/core/base/structured_stream.py index 0795e23be..805eb417b 100644 --- a/mirascope/core/base/structured_streams.py +++ b/mirascope/core/base/structured_stream.py @@ -1,7 +1,7 @@ """This module contains the base classes for structured streams from 
LLMs.""" from abc import ABC, abstractmethod -from collections.abc import AsyncGenerator, Generator +from collections.abc import Generator from typing import Any, Generic, TypeVar from pydantic import BaseModel @@ -33,26 +33,3 @@ def __init__( @abstractmethod def __iter__(self) -> Generator[_ResponseModelT, None, None]: """Iterates over the stream and extracts structured outputs.""" - - -class BaseAsyncStructuredStream(Generic[_ChunkT, _ResponseModelT], ABC): - """A base class for async streaming structured outputs from LLMs.""" - - stream: AsyncGenerator[_ChunkT, None] - response_model: type[_ResponseModelT] - json_mode: bool - - def __init__( - self, - stream: AsyncGenerator[_ChunkT, None], - response_model: type[_ResponseModelT], - json_mode: bool = False, - ): - """Initializes an instance of `BaseAsyncStructuredStream`.""" - self.stream = stream - self.response_model = response_model - self.json_mode = json_mode - - @abstractmethod - def __aiter__(self) -> AsyncGenerator[_ResponseModelT, None]: - """Iterates over the stream and extracts structured outputs.""" diff --git a/mirascope/core/base/structured_stream_async.py b/mirascope/core/base/structured_stream_async.py new file mode 100644 index 000000000..6a82d0692 --- /dev/null +++ b/mirascope/core/base/structured_stream_async.py @@ -0,0 +1,35 @@ +"""This module contains the base classes for async structured streams from LLMs.""" + +from abc import ABC, abstractmethod +from collections.abc import AsyncGenerator +from typing import Any, Generic, TypeVar + +from pydantic import BaseModel + +from ._utils import BaseType + +_ChunkT = TypeVar("_ChunkT", bound=Any) +_ResponseModelT = TypeVar("_ResponseModelT", bound=BaseModel | BaseType) + + +class BaseAsyncStructuredStream(Generic[_ChunkT, _ResponseModelT], ABC): + """A base class for async streaming structured outputs from LLMs.""" + + stream: AsyncGenerator[_ChunkT, None] + response_model: type[_ResponseModelT] + json_mode: bool + + def __init__( + self, + stream: AsyncGenerator[_ChunkT, None], + response_model: type[_ResponseModelT], + json_mode: bool = False, + ): + """Initializes an instance of `BaseAsyncStructuredStream`.""" + self.stream = stream + self.response_model = response_model + self.json_mode = json_mode + + @abstractmethod + def __aiter__(self) -> AsyncGenerator[_ResponseModelT, None]: + """Iterates over the stream and extracts structured outputs.""" diff --git a/mirascope/core/base/tools.py b/mirascope/core/base/tool.py similarity index 100% rename from mirascope/core/base/tools.py rename to mirascope/core/base/tool.py diff --git a/mirascope/core/openai/__init__.py b/mirascope/core/openai/__init__.py index 1eb1090fa..8a59be39c 100644 --- a/mirascope/core/openai/__init__.py +++ b/mirascope/core/openai/__init__.py @@ -1,26 +1,23 @@ """The Mirascope OpenAI Module.""" +from .call import openai_call +from .call import openai_call as call +from .call_async import openai_call_async +from .call_async import openai_call_async as call_async from .call_params import OpenAICallParams from .call_response import OpenAICallResponse from .call_response_chunk import OpenAICallResponseChunk -from .calls import openai_call, openai_call_async -from .calls import openai_call as call -from .calls import openai_call_async as call_async from .function_return import OpenAICallFunctionReturn -from .streams import OpenAIAsyncStream, OpenAIStream -from .structured_streams import OpenAIAsyncStructuredStream, OpenAIStructuredStream -from .tools import OpenAITool +from .tool import OpenAITool __all__ = [ 
"call", - "OpenAIAsyncStream", - "OpenAIAsyncStructuredStream", + "call_async", "OpenAICallFunctionReturn", "OpenAICallParams", "OpenAICallResponse", "OpenAICallResponseChunk", - "OpenAIStream", - "OpenAIStructuredStream", "OpenAITool", "openai_call", + "openai_call_async", ] diff --git a/mirascope/core/openai/_call.py b/mirascope/core/openai/_call.py new file mode 100644 index 000000000..f4f861a76 --- /dev/null +++ b/mirascope/core/openai/_call.py @@ -0,0 +1,51 @@ +"""This module contains the OpenAI `call_decorator` function.""" + +import datetime +import inspect +from functools import wraps +from typing import Callable, ParamSpec + +from openai import OpenAI + +from ..base import BaseTool +from ._utils import openai_api_calculate_cost, setup_call +from .call_params import OpenAICallParams +from .call_response import OpenAICallResponse +from .function_return import OpenAICallFunctionReturn + +_P = ParamSpec("_P") + + +def call_decorator( + fn: Callable[_P, OpenAICallFunctionReturn], + model: str, + tools: list[type[BaseTool] | Callable] | None, + call_params: OpenAICallParams, +) -> Callable[_P, OpenAICallResponse]: + @wraps(fn) + def inner(*args: _P.args, **kwargs: _P.kwargs) -> OpenAICallResponse: + fn_args = inspect.signature(fn).bind(*args, **kwargs).arguments + fn_return = fn(*args, **kwargs) + prompt_template, messages, tool_types, call_kwargs = setup_call( + fn, fn_args, fn_return, tools, call_params + ) + client = OpenAI() + start_time = datetime.datetime.now().timestamp() * 1000 + response = client.chat.completions.create( + model=model, stream=False, messages=messages, **call_kwargs + ) + return OpenAICallResponse( + response=response, + tool_types=tool_types, + prompt_template=prompt_template, + fn_args=fn_args, + fn_return=fn_return, + messages=messages, + call_params=call_kwargs, + user_message_param=messages[-1] if messages[-1]["role"] == "user" else None, + start_time=start_time, + end_time=datetime.datetime.now().timestamp() * 1000, + cost=openai_api_calculate_cost(response.usage, response.model), + ) + + return inner diff --git a/mirascope/core/openai/_call_async.py b/mirascope/core/openai/_call_async.py new file mode 100644 index 000000000..5e8254335 --- /dev/null +++ b/mirascope/core/openai/_call_async.py @@ -0,0 +1,54 @@ +"""This module contains the OpenAI `call_async_decorator` function.""" + +import datetime +import inspect +from functools import wraps +from typing import Awaitable, Callable, ParamSpec + +from openai import AsyncOpenAI + +from ..base import BaseTool +from ._utils import ( + openai_api_calculate_cost, + setup_call, +) +from .call_params import OpenAICallParams +from .call_response import OpenAICallResponse +from .function_return import OpenAICallFunctionReturn + +_P = ParamSpec("_P") + + +def call_async_decorator( + fn: Callable[_P, Awaitable[OpenAICallFunctionReturn]], + model: str, + tools: list[type[BaseTool] | Callable] | None, + call_params: OpenAICallParams, +) -> Callable[_P, Awaitable[OpenAICallResponse]]: + @wraps(fn) + async def inner_async(*args: _P.args, **kwargs: _P.kwargs) -> OpenAICallResponse: + fn_args = inspect.signature(fn).bind(*args, **kwargs).arguments + fn_return = await fn(*args, **kwargs) + prompt_template, messages, tool_types, call_kwargs = setup_call( + fn, fn_args, fn_return, tools, call_params + ) + client = AsyncOpenAI() + start_time = datetime.datetime.now().timestamp() * 1000 + response = await client.chat.completions.create( + model=model, stream=False, messages=messages, **call_kwargs + ) + return OpenAICallResponse( 
+ response=response, + tool_types=tool_types, + prompt_template=prompt_template, + fn_args=fn_args, + fn_return=fn_return, + messages=messages, + call_params=call_kwargs, + user_message_param=messages[-1] if messages[-1]["role"] == "user" else None, + start_time=start_time, + end_time=datetime.datetime.now().timestamp() * 1000, + cost=openai_api_calculate_cost(response.usage, response.model), + ) + + return inner_async diff --git a/mirascope/core/openai/_extract.py b/mirascope/core/openai/_extract.py new file mode 100644 index 000000000..f3566d90c --- /dev/null +++ b/mirascope/core/openai/_extract.py @@ -0,0 +1,54 @@ +"""This module contains the OpenAI `extract_decorator` function.""" + +import inspect +from functools import wraps +from typing import Callable, ParamSpec, TypeVar + +from openai import OpenAI +from pydantic import BaseModel + +from ..base import _utils +from ._utils import setup_extract +from .call_params import OpenAICallParams +from .function_return import OpenAICallFunctionReturn +from .tool import OpenAITool + +_P = ParamSpec("_P") +_ResponseModelT = TypeVar("_ResponseModelT", bound=BaseModel | _utils.BaseType) + + +def extract_decorator( + fn: Callable[_P, OpenAICallFunctionReturn], + model: str, + response_model: type[_ResponseModelT], + call_params: OpenAICallParams, +) -> Callable[_P, _ResponseModelT]: + assert response_model is not None + tool = _utils.setup_extract_tool(response_model, OpenAITool) + + @wraps(fn) + def inner(*args: _P.args, **kwargs: _P.kwargs) -> _ResponseModelT: + assert response_model is not None + fn_args = inspect.signature(fn).bind(*args, **kwargs).arguments + fn_return = fn(*args, **kwargs) + json_mode, messages, call_kwargs = setup_extract( + fn, fn_args, fn_return, tool, call_params + ) + client = OpenAI() + response = client.chat.completions.create( + model=model, stream=False, messages=messages, **call_kwargs + ) + + if json_mode and (content := response.choices[0].message.content): + json_output = content + elif tool_calls := response.choices[0].message.tool_calls: + json_output = tool_calls[0].function.arguments + else: + raise ValueError("No tool call or JSON object found in response.") + + output = _utils.extract_tool_return(response_model, json_output, False) + if isinstance(response_model, BaseModel): + output._response = response # type: ignore + return output + + return inner diff --git a/mirascope/core/openai/_extract_async.py b/mirascope/core/openai/_extract_async.py new file mode 100644 index 000000000..b68e6a75c --- /dev/null +++ b/mirascope/core/openai/_extract_async.py @@ -0,0 +1,54 @@ +"""This module contains the OpenAI `extract_async_decorator` function.""" + +import inspect +from functools import wraps +from typing import Awaitable, Callable, ParamSpec, TypeVar + +from openai import AsyncOpenAI +from pydantic import BaseModel + +from ..base import _utils +from ._utils import setup_extract +from .call_params import OpenAICallParams +from .function_return import OpenAICallFunctionReturn +from .tool import OpenAITool + +_P = ParamSpec("_P") +_ResponseModelT = TypeVar("_ResponseModelT", bound=BaseModel | _utils.BaseType) + + +def extract_async_decorator( + fn: Callable[_P, Awaitable[OpenAICallFunctionReturn]], + model: str, + response_model: type[_ResponseModelT], + call_params: OpenAICallParams, +) -> Callable[_P, Awaitable[_ResponseModelT]]: + assert response_model is not None + tool = _utils.setup_extract_tool(response_model, OpenAITool) + + @wraps(fn) + async def inner(*args: _P.args, **kwargs: _P.kwargs) -> 
_ResponseModelT: + assert response_model is not None + fn_args = inspect.signature(fn).bind(*args, **kwargs).arguments + fn_return = await fn(*args, **kwargs) + json_mode, messages, call_kwargs = setup_extract( + fn, fn_args, fn_return, tool, call_params + ) + client = AsyncOpenAI() + response = await client.chat.completions.create( + model=model, stream=False, messages=messages, **call_kwargs + ) + + if json_mode and (content := response.choices[0].message.content): + json_output = content + elif tool_calls := response.choices[0].message.tool_calls: + json_output = tool_calls[0].function.arguments + else: + raise ValueError("No tool call or JSON object found in response.") + + output = _utils.extract_tool_return(response_model, json_output, False) + if isinstance(response_model, BaseModel): + output._response = response # type: ignore + return output + + return inner diff --git a/mirascope/core/openai/_stream.py b/mirascope/core/openai/_stream.py new file mode 100644 index 000000000..98276bd8d --- /dev/null +++ b/mirascope/core/openai/_stream.py @@ -0,0 +1,107 @@ +"""This module contains the OpenAI `stream_decorator` function.""" + +import inspect +from collections.abc import Generator +from functools import wraps +from typing import Callable, ParamSpec + +from openai import AzureOpenAI, OpenAI +from openai.types.chat import ( + ChatCompletionAssistantMessageParam, + ChatCompletionMessageToolCall, + ChatCompletionUserMessageParam, +) +from openai.types.chat.chat_completion_message_tool_call import Function + +from ..base import BaseStream, BaseTool +from ._utils import handle_chunk, openai_api_calculate_cost, setup_call +from .call_params import OpenAICallParams +from .call_response import OpenAICallResponse +from .call_response_chunk import OpenAICallResponseChunk +from .function_return import OpenAICallFunctionReturn +from .tool import OpenAITool + +_P = ParamSpec("_P") + + +class OpenAIStream( + BaseStream[ + OpenAICallResponseChunk, + ChatCompletionUserMessageParam, + ChatCompletionAssistantMessageParam, + OpenAITool, + ] +): + """A class for streaming responses from OpenAI's API.""" + + def __init__(self, stream: Generator[OpenAICallResponseChunk, None, None]): + """Initializes an instance of `OpenAIStream`.""" + super().__init__(stream, ChatCompletionAssistantMessageParam) + + def __iter__( + self, + ) -> Generator[tuple[OpenAICallResponseChunk, OpenAITool | None], None, None]: + """Iterator over the stream and constructs tools as they are streamed.""" + current_tool_call = ChatCompletionMessageToolCall( + id="", function=Function(arguments="", name=""), type="function" + ) + current_tool_type, tool_calls = None, [] + for chunk, _ in super().__iter__(): + if not chunk.tool_types or not chunk.tool_calls: + if current_tool_type: + yield chunk, current_tool_type.from_tool_call(current_tool_call) + tool_calls.append(current_tool_call) + current_tool_type = None + else: + yield chunk, None + tool, current_tool_call, current_tool_type = handle_chunk( + chunk, current_tool_call, current_tool_type + ) + if tool is not None: + yield chunk, tool + tool_calls.append(tool.tool_call) + if tool_calls: + self.message_param["tool_calls"] = tool_calls # type: ignore + + @classmethod + def tool_message_params(cls, tools_and_outputs): + """Returns the tool message parameters for tool call results.""" + return OpenAICallResponse.tool_message_params(tools_and_outputs) + + +def stream_decorator( + fn: Callable[_P, OpenAICallFunctionReturn], + model: str, + tools: list[type[BaseTool] | Callable] | None, + 
call_params: OpenAICallParams, +) -> Callable[_P, OpenAIStream]: + @wraps(fn) + def inner(*args: _P.args, **kwargs: _P.kwargs) -> OpenAIStream: + fn_args = inspect.signature(fn).bind(*args, **kwargs).arguments + fn_return = fn(*args, **kwargs) + _, messages, tool_types, call_kwargs = setup_call( + fn, fn_args, fn_return, tools, call_params + ) + client = OpenAI() + + if not isinstance(client, AzureOpenAI): + call_kwargs["stream_options"] = {"include_usage": True} + + stream = client.chat.completions.create( + model=model, stream=True, messages=messages, **call_kwargs + ) + + def generator(): + for chunk in stream: + yield OpenAICallResponseChunk( + chunk=chunk, + user_message_param=messages[-1] + if messages[-1]["role"] == "user" + else None, + tool_types=tool_types, + cost=openai_api_calculate_cost(chunk.usage, chunk.model), + ) + + return OpenAIStream(generator()) + + return inner diff --git a/mirascope/core/openai/_stream_async.py b/mirascope/core/openai/_stream_async.py new file mode 100644 index 000000000..908bc1ff4 --- /dev/null +++ b/mirascope/core/openai/_stream_async.py @@ -0,0 +1,120 @@ +"""This module contains the OpenAI `stream_async_decorator` function.""" + +import inspect +from collections.abc import AsyncGenerator +from functools import wraps +from typing import ( + Awaitable, + Callable, + ParamSpec, +) + +from openai import AsyncAzureOpenAI, AsyncOpenAI +from openai.types.chat import ( + ChatCompletionAssistantMessageParam, + ChatCompletionMessageToolCall, + ChatCompletionUserMessageParam, +) +from openai.types.chat.chat_completion_message_tool_call import Function + +from ..base import BaseAsyncStream, BaseTool +from ._utils import ( + handle_chunk, + openai_api_calculate_cost, + setup_call, +) +from .call_params import OpenAICallParams +from .call_response import OpenAICallResponse +from .call_response_chunk import OpenAICallResponseChunk +from .function_return import OpenAICallFunctionReturn +from .tool import OpenAITool + +_P = ParamSpec("_P") + + +class OpenAIAsyncStream( + BaseAsyncStream[ + OpenAICallResponseChunk, + ChatCompletionUserMessageParam, + ChatCompletionAssistantMessageParam, + OpenAITool, + ] +): + """A class for streaming responses from OpenAI's API.""" + + def __init__(self, stream: AsyncGenerator[OpenAICallResponseChunk, None]): + """Initializes an instance of `OpenAIAsyncStream`.""" + super().__init__(stream, ChatCompletionAssistantMessageParam) + + def __aiter__( + self, + ) -> AsyncGenerator[tuple[OpenAICallResponseChunk, OpenAITool | None], None]: + """Iterator over the stream and constructs tools as they are streamed.""" + stream = super().__aiter__() + + async def generator(): + current_tool_call = ChatCompletionMessageToolCall( + id="", function=Function(arguments="", name=""), type="function" + ) + current_tool_type, tool_calls = None, [] + async for chunk, _ in stream: + if not chunk.tool_types or not chunk.tool_calls: + if current_tool_type: + yield chunk, current_tool_type.from_tool_call(current_tool_call) + tool_calls.append(current_tool_call) + current_tool_type = None + else: + yield chunk, None + tool, current_tool_call, current_tool_type = handle_chunk( + chunk, current_tool_call, current_tool_type + ) + if tool is not None: + yield chunk, tool + tool_calls.append(tool.tool_call) + if tool_calls: + self.message_param["tool_calls"] = tool_calls # type: ignore + + return generator() + + @classmethod + def tool_message_params(cls, tools_and_outputs): + """Returns the tool message parameters for tool call results.""" + return 
OpenAICallResponse.tool_message_params(tools_and_outputs) + + +def stream_async_decorator( + fn: Callable[_P, Awaitable[OpenAICallFunctionReturn]], + model: str, + tools: list[type[BaseTool] | Callable] | None, + call_params: OpenAICallParams, +) -> Callable[_P, Awaitable[OpenAIAsyncStream]]: + @wraps(fn) + async def inner_async(*args: _P.args, **kwargs: _P.kwargs) -> OpenAIAsyncStream: + fn_args = inspect.signature(fn).bind(*args, **kwargs).arguments + fn_return = await fn(*args, **kwargs) + _, messages, tool_types, call_kwargs = setup_call( + fn, fn_args, fn_return, tools, call_params + ) + client = AsyncOpenAI() + + if not isinstance(client, AsyncAzureOpenAI): + call_kwargs["stream_options"] = {"include_usage": True} + + stream = await client.chat.completions.create( + model=model, stream=True, messages=messages, **call_kwargs + ) + + async def generator(): + async for chunk in stream: + yield OpenAICallResponseChunk( + chunk=chunk, + user_message_param=messages[-1] + if messages[-1]["role"] == "user" + else None, + tool_types=tool_types, + cost=openai_api_calculate_cost(chunk.usage, chunk.model), + ) + + return OpenAIAsyncStream(generator()) + + return inner_async diff --git a/mirascope/core/openai/_structured_stream.py b/mirascope/core/openai/_structured_stream.py new file mode 100644 index 000000000..2336f6987 --- /dev/null +++ b/mirascope/core/openai/_structured_stream.py @@ -0,0 +1,76 @@ +"""This module contains the OpenAI `structured_stream_decorator` function.""" + +import inspect +from collections.abc import Generator +from functools import wraps +from typing import Callable, Generic, Iterable, ParamSpec, TypeVar + +from openai import OpenAI +from openai.types.chat import ChatCompletionChunk +from pydantic import BaseModel + +from ..base import BaseStructuredStream, _utils +from ._utils import setup_extract +from .call_params import OpenAICallParams +from .function_return import OpenAICallFunctionReturn +from .tool import OpenAITool + +_P = ParamSpec("_P") +_ResponseModelT = TypeVar("_ResponseModelT", bound=BaseModel | _utils.BaseType) + + +class OpenAIStructuredStream( + Generic[_ResponseModelT], + BaseStructuredStream[ChatCompletionChunk, _ResponseModelT], +): + """A class for streaming structured outputs from OpenAI's API.""" + + def __iter__(self) -> Generator[_ResponseModelT, None, None]: + """Iterates over the stream and extracts structured outputs.""" + json_output = "" + for chunk in self.stream: + if self.json_mode and (content := chunk.choices[0].delta.content): + json_output += content + elif ( + (tool_calls := chunk.choices[0].delta.tool_calls) + and (function := tool_calls[0].function) + and (arguments := function.arguments) + ): + json_output += arguments + else: + ValueError("No tool call or JSON object found in response.") + if json_output: + yield _utils.extract_tool_return(self.response_model, json_output, True) + yield _utils.extract_tool_return(self.response_model, json_output, False) + + +def structured_stream_decorator( + fn: Callable[_P, OpenAICallFunctionReturn], + model: str, + response_model: type[_ResponseModelT], + call_params: OpenAICallParams, +) -> Callable[_P, Iterable[_ResponseModelT]]: + assert response_model is not None + tool = _utils.setup_extract_tool(response_model, OpenAITool) + + @wraps(fn) + def inner(*args: _P.args, **kwargs: _P.kwargs) -> Iterable[_ResponseModelT]: + assert response_model is not None + fn_args = inspect.signature(fn).bind(*args, **kwargs).arguments + fn_return = fn(*args, **kwargs) + json_mode, messages, call_kwargs = 
setup_extract( + fn, fn_args, fn_return, tool, call_params + ) + client = OpenAI() + return OpenAIStructuredStream( + stream=( + chunk + for chunk in client.chat.completions.create( + model=model, stream=True, messages=messages, **call_kwargs + ) + ), + response_model=response_model, + json_mode=json_mode, + ) + + return inner diff --git a/mirascope/core/openai/_structured_stream_async.py b/mirascope/core/openai/_structured_stream_async.py new file mode 100644 index 000000000..b70129604 --- /dev/null +++ b/mirascope/core/openai/_structured_stream_async.py @@ -0,0 +1,92 @@ +"""This module contains the OpenAI `structured_stream_async_decorator` function.""" + +import inspect +from collections.abc import AsyncGenerator +from functools import wraps +from typing import ( + AsyncIterable, + Awaitable, + Callable, + Generic, + ParamSpec, + TypeVar, +) + +from openai import AsyncOpenAI +from openai.types.chat import ChatCompletionChunk +from pydantic import BaseModel + +from ..base import BaseAsyncStructuredStream, _utils +from ._utils import setup_extract +from .call_params import OpenAICallParams +from .function_return import OpenAICallFunctionReturn +from .tool import OpenAITool + +_P = ParamSpec("_P") +_ResponseModelT = TypeVar("_ResponseModelT", bound=BaseModel | _utils.BaseType) + + +class OpenAIAsyncStructuredStream( + Generic[_ResponseModelT], + BaseAsyncStructuredStream[ChatCompletionChunk, _ResponseModelT], +): + """A class for async streaming structured outputs from OpenAI's API.""" + + def __aiter__(self) -> AsyncGenerator[_ResponseModelT, None]: + """Iterates over the stream and extracts structured outputs.""" + + async def generator(): + nonlocal self + json_output = "" + async for chunk in self.stream: + if self.json_mode and (content := chunk.choices[0].delta.content): + json_output += content + elif ( + (tool_calls := chunk.choices[0].delta.tool_calls) + and (function := tool_calls[0].function) + and (arguments := function.arguments) + ): + json_output += arguments + else: + ValueError("No tool call or JSON object found in response.") + if json_output: + yield _utils.extract_tool_return( + self.response_model, json_output, True + ) + yield _utils.extract_tool_return(self.response_model, json_output, False) + + return generator() + + +def structured_stream_async_decorator( + fn: Callable[_P, Awaitable[OpenAICallFunctionReturn]], + model: str, + response_model: type[_ResponseModelT], + call_params: OpenAICallParams, +) -> Callable[_P, Awaitable[AsyncIterable[_ResponseModelT]]]: + assert response_model is not None + tool = _utils.setup_extract_tool(response_model, OpenAITool) + + @wraps(fn) + async def inner( + *args: _P.args, **kwargs: _P.kwargs + ) -> AsyncIterable[_ResponseModelT]: + assert response_model is not None + fn_args = inspect.signature(fn).bind(*args, **kwargs).arguments + fn_return = await fn(*args, **kwargs) + json_mode, messages, call_kwargs = setup_extract( + fn, fn_args, fn_return, tool, call_params + ) + client = AsyncOpenAI() + return OpenAIAsyncStructuredStream( + stream=( + chunk + async for chunk in await client.chat.completions.create( + model=model, stream=True, messages=messages, **call_kwargs + ) + ), + response_model=response_model, + json_mode=json_mode, + ) + + return inner diff --git a/mirascope/core/openai/_utils.py b/mirascope/core/openai/_utils.py index 4e5ed33b6..b03d40293 100644 --- a/mirascope/core/openai/_utils.py +++ b/mirascope/core/openai/_utils.py @@ -5,15 +5,19 @@ from textwrap import dedent from typing import Any, Callable, TypeVar, 
overload -import jiter -from openai.types.chat import ChatCompletionMessageParam +from openai.types.chat import ( + ChatCompletionMessageParam, + ChatCompletionMessageToolCall, +) +from openai.types.chat.chat_completion_message_tool_call import Function from openai.types.completion_usage import CompletionUsage from pydantic import BaseModel -from ..base import BaseTool, _partial, _utils +from ..base import BaseTool, _utils from .call_params import OpenAICallParams +from .call_response_chunk import OpenAICallResponseChunk from .function_return import OpenAICallFunctionReturn -from .tools import OpenAITool +from .tool import OpenAITool _ResponseModelT = TypeVar("_ResponseModelT", bound=BaseModel | _utils.BaseType) @@ -30,8 +34,7 @@ def setup_call( list[ChatCompletionMessageParam], None, OpenAICallParams, -]: - ... # pragma: no cover +]: ... # pragma: no cover @overload @@ -46,8 +49,7 @@ def setup_call( list[ChatCompletionMessageParam], list[type[OpenAITool]], OpenAICallParams, -]: - ... # pragma: no cover +]: ... # pragma: no cover def setup_call( @@ -96,6 +98,55 @@ def setup_call( return prompt_template, messages, tool_types, call_kwargs # type: ignore +def handle_chunk( + chunk: OpenAICallResponseChunk, + current_tool_call: ChatCompletionMessageToolCall, + current_tool_type: type[OpenAITool] | None, +) -> tuple[ + OpenAITool | None, + ChatCompletionMessageToolCall, + type[OpenAITool] | None, +]: + """Handles a chunk of the stream.""" + if not chunk.tool_types or not chunk.tool_calls: + return None, current_tool_call, current_tool_type + + tool_call = chunk.tool_calls[0] + # Reset on new tool + if tool_call.id and tool_call.function is not None: + previous_tool_call = current_tool_call.model_copy() + previous_tool_type = current_tool_type + current_tool_call = ChatCompletionMessageToolCall( + id=tool_call.id, + function=Function( + arguments="", + name=tool_call.function.name if tool_call.function.name else "", + ), + type="function", + ) + current_tool_type = None + for tool_type in chunk.tool_types: + if tool_type._name() == tool_call.function.name: + current_tool_type = tool_type + break + if current_tool_type is None: + raise RuntimeError( + f"Unknown tool type in stream: {tool_call.function.name}" + ) + if previous_tool_call.id and previous_tool_type is not None: + return ( + previous_tool_type.from_tool_call(previous_tool_call), + current_tool_call, + current_tool_type, + ) + + # Update arguments with each chunk + if tool_call.function and tool_call.function.arguments: + current_tool_call.function.arguments += tool_call.function.arguments + + return None, current_tool_call, current_tool_type + + def setup_extract( fn: Callable, fn_args: dict[str, Any], diff --git a/mirascope/core/openai/call.py b/mirascope/core/openai/call.py new file mode 100644 index 000000000..72ea235b3 --- /dev/null +++ b/mirascope/core/openai/call.py @@ -0,0 +1,144 @@ +"""The `openai_call` decorator for functions as LLM calls.""" + +from functools import partial +from typing import ( + Callable, + Iterable, + Literal, + ParamSpec, + TypeVar, + Unpack, + overload, +) + +from pydantic import BaseModel + +from ..base import BaseTool, _utils +from ._call import call_decorator +from ._extract import extract_decorator +from ._stream import OpenAIStream, stream_decorator +from ._structured_stream import structured_stream_decorator +from .call_params import OpenAICallParams +from .call_response import OpenAICallResponse +from .function_return import OpenAICallFunctionReturn + +_P = ParamSpec("_P") +_ResponseModelT = 
TypeVar("_ResponseModelT", bound=BaseModel | _utils.BaseType) + + +@overload +def openai_call( + model: str, + *, + stream: Literal[False] = False, + tools: list[type[BaseTool] | Callable] | None = None, + response_model: None = None, + **call_params: Unpack[OpenAICallParams], +) -> Callable[ + [Callable[_P, OpenAICallFunctionReturn]], + Callable[_P, OpenAICallResponse], +]: ... # pragma: no cover + + +@overload +def openai_call( + model: str, + *, + stream: Literal[False] = False, + tools: list[type[BaseTool] | Callable] | None = None, + response_model: type[_ResponseModelT], + **call_params: Unpack[OpenAICallParams], +) -> Callable[ + [Callable[_P, OpenAICallFunctionReturn]], + Callable[_P, _ResponseModelT], +]: ... # pragma: no cover + + +@overload +def openai_call( + model: str, + *, + stream: Literal[True], + tools: list[type[BaseTool] | Callable] | None = None, + response_model: None = None, + **call_params: Unpack[OpenAICallParams], +) -> Callable[ + [Callable[_P, OpenAICallFunctionReturn]], + Callable[_P, OpenAIStream], +]: ... # pragma: no cover + + +@overload +def openai_call( + model: str, + *, + stream: Literal[True], + tools: list[type[BaseTool] | Callable] | None = None, + response_model: type[_ResponseModelT], + **call_params: Unpack[OpenAICallParams], +) -> Callable[ + [Callable[_P, OpenAICallFunctionReturn]], + Callable[_P, Iterable[_ResponseModelT]], +]: ... # pragma: no cover + + +def openai_call( + model: str, + *, + stream: bool = False, + tools: list[type[BaseTool] | Callable] | None = None, + response_model: type[_ResponseModelT] | None = None, + **call_params: Unpack[OpenAICallParams], +) -> Callable[ + [Callable[_P, OpenAICallFunctionReturn]], + Callable[ + _P, + OpenAICallResponse | OpenAIStream | _ResponseModelT | Iterable[_ResponseModelT], + ], +]: + '''A decorator for calling the OpenAI API with a typed function. + + This decorator is used to wrap a typed function that calls the OpenAI API. It parses + the docstring of the wrapped function as the messages array and templates the input + arguments for the function into each message's template. + + Example: + + ```python + @openai_call(model="gpt-4o") + def recommend_book(genre: str): + """Recommend a {genre} book.""" + + response = recommend_book("fantasy") + ``` + + Args: + model: The OpenAI model to use in the API call. + stream: Whether to stream the response from the API call. + tools: The tools to use in the OpenAI API call. + **call_params: The `OpenAICallParams` call parameters to use in the API call. + + Returns: + The decorator for turning a typed function into an OpenAI API call. 
+ ''' + + if response_model: + if stream: + return partial( + structured_stream_decorator, + model=model, + response_model=response_model, + call_params=call_params, + ) + else: + return partial( + extract_decorator, + model=model, + response_model=response_model, + call_params=call_params, + ) + if stream: + return partial( + stream_decorator, model=model, tools=tools, call_params=call_params + ) + return partial(call_decorator, model=model, tools=tools, call_params=call_params) diff --git a/mirascope/core/openai/call_async.py b/mirascope/core/openai/call_async.py new file mode 100644 index 000000000..08a0f4de9 --- /dev/null +++ b/mirascope/core/openai/call_async.py @@ -0,0 +1,153 @@ +"""The `openai_call_async` decorator for easy OpenAI API typed functions.""" + +from functools import partial +from typing import ( + AsyncIterable, + Awaitable, + Callable, + Literal, + ParamSpec, + TypeVar, + Unpack, + overload, +) + +from pydantic import BaseModel + +from ..base import BaseTool, _utils +from ._call_async import call_async_decorator +from ._extract_async import extract_async_decorator +from ._stream_async import OpenAIAsyncStream, stream_async_decorator +from ._structured_stream_async import structured_stream_async_decorator +from .call_params import OpenAICallParams +from .call_response import OpenAICallResponse +from .function_return import OpenAICallFunctionReturn + +_P = ParamSpec("_P") +_ResponseModelT = TypeVar("_ResponseModelT", bound=BaseModel | _utils.BaseType) + + +@overload +def openai_call_async( + model: str, + *, + stream: Literal[False] = False, + tools: list[type[BaseTool] | Callable] | None = None, + response_model: None = None, + **call_params: Unpack[OpenAICallParams], +) -> Callable[ + [Callable[_P, Awaitable[OpenAICallFunctionReturn]]], + Callable[_P, Awaitable[OpenAICallResponse]], +]: ... # pragma: no cover + + +@overload +def openai_call_async( + model: str, + *, + stream: Literal[False] = False, + tools: list[type[BaseTool] | Callable] | None = None, + response_model: type[_ResponseModelT], + **call_params: Unpack[OpenAICallParams], +) -> Callable[ + [Callable[_P, Awaitable[OpenAICallFunctionReturn]]], + Callable[_P, Awaitable[_ResponseModelT]], +]: ... # pragma: no cover + + +@overload +def openai_call_async( + model: str, + *, + stream: Literal[True], + tools: list[type[BaseTool] | Callable] | None = None, + response_model: None = None, + **call_params: Unpack[OpenAICallParams], +) -> Callable[ + [Callable[_P, Awaitable[OpenAICallFunctionReturn]]], + Callable[_P, Awaitable[OpenAIAsyncStream]], +]: ... # pragma: no cover + + +@overload +def openai_call_async( + model: str, + *, + stream: Literal[True], + tools: list[type[BaseTool] | Callable] | None = None, + response_model: type[_ResponseModelT], + **call_params: Unpack[OpenAICallParams], +) -> Callable[ + [Callable[_P, Awaitable[OpenAICallFunctionReturn]]], + Callable[_P, Awaitable[AsyncIterable[_ResponseModelT]]], +]: ... # pragma: no cover + + +def openai_call_async( + model: str, + *, + stream: bool = False, + tools: list[type[BaseTool] | Callable] | None = None, + response_model: type[_ResponseModelT] | None = None, + **call_params: Unpack[OpenAICallParams], +) -> Callable[ + [Callable[_P, Awaitable[OpenAICallFunctionReturn]]], + Callable[ + _P, + Awaitable[OpenAICallResponse] + | Awaitable[OpenAIAsyncStream] + | Awaitable[_ResponseModelT] + | Awaitable[AsyncIterable[_ResponseModelT]], + ], +]: + '''A decorator for calling the AsyncOpenAI API with a typed function. 
+ + This decorator is used to wrap a typed function that calls the OpenAI API. It parses + the docstring of the wrapped function as the messages array and templates the input + arguments for the function into each message's template. + + Example: + + ```python + @openai_call_async(model="gpt-4o") + async def recommend_book(genre: str): + """Recommend a {genre} book.""" + + async def run(): + response = await recommend_book("fantasy") + + asyncio.run(run()) + ``` + + Args: + model: The OpenAI model to use in the API call. + stream: Whether to stream the response from the API call. + tools: The tools to use in the OpenAI API call. + **call_params: The `OpenAICallParams` call parameters to use in the API call. + + Returns: + The decorator for turning a typed function into an AsyncOpenAI API call. + ''' + + if response_model: + if stream: + return partial( + structured_stream_async_decorator, + model=model, + response_model=response_model, + call_params=call_params, + ) + else: + return partial( + extract_async_decorator, + model=model, + response_model=response_model, + call_params=call_params, + ) + if stream: + return partial( + stream_async_decorator, model=model, tools=tools, call_params=call_params + ) + return partial( + call_async_decorator, model=model, tools=tools, call_params=call_params + ) diff --git a/mirascope/core/openai/call_response.py b/mirascope/core/openai/call_response.py index 2b2aee4bc..f587e926a 100644 --- a/mirascope/core/openai/call_response.py +++ b/mirascope/core/openai/call_response.py @@ -16,7 +16,7 @@ from ..base import BaseCallResponse from .call_params import OpenAICallParams from .function_return import OpenAICallFunctionReturn -from .tools import OpenAITool +from .tool import OpenAITool class OpenAICallResponse( diff --git a/mirascope/core/openai/call_response_chunk.py b/mirascope/core/openai/call_response_chunk.py index 2395bc9d5..8e9194810 100644 --- a/mirascope/core/openai/call_response_chunk.py +++ b/mirascope/core/openai/call_response_chunk.py @@ -12,7 +12,7 @@ from openai.types.completion_usage import CompletionUsage from ..base import BaseCallResponseChunk -from .tools import OpenAITool +from .tool import OpenAITool class OpenAICallResponseChunk( diff --git a/mirascope/core/openai/calls.py b/mirascope/core/openai/calls.py deleted file mode 100644 index 0c5e9692b..000000000 --- a/mirascope/core/openai/calls.py +++ /dev/null @@ -1,517 +0,0 @@ -"""The `openai_call` decorator for easy OpenAI API typed functions.""" - -import datetime -import inspect -from functools import wraps -from typing import ( - AsyncIterable, - Awaitable, - Callable, - Iterable, - Literal, - ParamSpec, - TypeVar, - Unpack, - overload, -) - -from openai import AsyncAzureOpenAI, AsyncOpenAI, AzureOpenAI, OpenAI -from pydantic import BaseModel - -from ..base import BaseTool, _utils -from ._utils import ( - openai_api_calculate_cost, - setup_call, - setup_extract, -) -from .call_params import OpenAICallParams -from .call_response import OpenAICallResponse -from .call_response_chunk import OpenAICallResponseChunk -from .function_return import OpenAICallFunctionReturn -from .streams import OpenAIAsyncStream, OpenAIStream -from .structured_streams import OpenAIAsyncStructuredStream, OpenAIStructuredStream -from .tools import OpenAITool - -_P = ParamSpec("_P") -_ResponseModelT = TypeVar("_ResponseModelT", bound=BaseModel | _utils.BaseType) - - -@overload -def openai_call( - model: str, - *, - stream: Literal[False] = False, - tools: list[type[BaseTool] | Callable] | None = None, - 
response_model: None = None, - **call_params: Unpack[OpenAICallParams], -) -> Callable[ - [Callable[_P, OpenAICallFunctionReturn]], - Callable[_P, OpenAICallResponse], -]: - ... # pragma: no cover - - -@overload -def openai_call( - model: str, - *, - stream: Literal[False] = False, - tools: list[type[BaseTool] | Callable] | None = None, - response_model: type[_ResponseModelT], - **call_params: Unpack[OpenAICallParams], -) -> Callable[ - [Callable[_P, OpenAICallFunctionReturn]], - Callable[_P, _ResponseModelT], -]: - ... # pragma: no cover - - -@overload -def openai_call( - model: str, - *, - stream: Literal[True], - tools: list[type[BaseTool] | Callable] | None = None, - response_model: None = None, - **call_params: Unpack[OpenAICallParams], -) -> Callable[ - [Callable[_P, OpenAICallFunctionReturn]], - Callable[_P, OpenAIStream], -]: - ... # pragma: no cover - - -@overload -def openai_call( - model: str, - *, - stream: Literal[True], - tools: list[type[BaseTool] | Callable] | None = None, - response_model: type[_ResponseModelT], - **call_params: Unpack[OpenAICallParams], -) -> Callable[ - [Callable[_P, OpenAICallFunctionReturn]], - Callable[_P, Iterable[_ResponseModelT]], -]: - ... # pragma: no cover - - -def openai_call( - model: str, - *, - stream: bool = False, - tools: list[type[BaseTool] | Callable] | None = None, - response_model: type[_ResponseModelT] | None = None, - **call_params: Unpack[OpenAICallParams], -) -> Callable[ - [Callable[_P, OpenAICallFunctionReturn]], - Callable[ - _P, - OpenAICallResponse | OpenAIStream | _ResponseModelT | Iterable[_ResponseModelT], - ], -]: - '''A decorator for calling the OpenAI API with a typed function. - - This decorator is used to wrap a typed function that calls the OpenAI API. It parses - the docstring of the wrapped function as the messages array and templates the input - arguments for the function into each message's template. - - Example: - - ```python - @openai_call(model="gpt-4o") - def recommend_book(genre: str): - """Recommend a {genre} book.""" - - response = recommend_book("fantasy") - ``` - - Args: - model: The OpenAI model to use in the API call. - stream: Whether to stream the response from the API call. - tools: The tools to use in the OpenAI API call. - **call_params: The `OpenAICallParams` call parameters to use in the API call. - - Returns: - The decorator for turning a typed function into an OpenAI API call. 
- ''' - - def call_decorator( - fn: Callable[_P, OpenAICallFunctionReturn], - ) -> Callable[_P, OpenAICallResponse]: - @wraps(fn) - def inner(*args: _P.args, **kwargs: _P.kwargs) -> OpenAICallResponse: - fn_args = inspect.signature(fn).bind(*args, **kwargs).arguments - fn_return = fn(*args, **kwargs) - prompt_template, messages, tool_types, call_kwargs = setup_call( - fn, fn_args, fn_return, tools, call_params - ) - client = OpenAI() - start_time = datetime.datetime.now().timestamp() * 1000 - response = client.chat.completions.create( - model=model, stream=False, messages=messages, **call_kwargs - ) - return OpenAICallResponse( - response=response, - tool_types=tool_types, - prompt_template=prompt_template, - fn_args=fn_args, - fn_return=fn_return, - messages=messages, - call_params=call_kwargs, - user_message_param=messages[-1] - if messages[-1]["role"] == "user" - else None, - start_time=start_time, - end_time=datetime.datetime.now().timestamp() * 1000, - cost=openai_api_calculate_cost(response.usage, response.model), - ) - - return inner - - def stream_decorator( - fn: Callable[_P, OpenAICallFunctionReturn], - ) -> Callable[_P, OpenAIStream]: - @wraps(fn) - def inner(*args: _P.args, **kwargs: _P.kwargs) -> OpenAIStream: - fn_args = inspect.signature(fn).bind(*args, **kwargs).arguments - fn_return = fn(*args, **kwargs) - _, messages, tool_types, call_kwargs = setup_call( - fn, fn_args, fn_return, tools, call_params - ) - client = OpenAI() - - if not isinstance(client, AzureOpenAI): - call_kwargs["stream_options"] = {"include_usage": True} - - stream = client.chat.completions.create( - model=model, stream=True, messages=messages, **call_kwargs - ) - - def generator(): - for chunk in stream: - yield OpenAICallResponseChunk( - chunk=chunk, - user_message_param=messages[-1] - if messages[-1]["role"] == "user" - else None, - tool_types=tool_types, - cost=openai_api_calculate_cost(chunk.usage, chunk.model), - ) - - return OpenAIStream(generator()) - - return inner - - def extract_decorator( - fn: Callable[_P, OpenAICallFunctionReturn], - ) -> Callable[_P, _ResponseModelT]: - assert response_model is not None - tool = _utils.setup_extract_tool(response_model, OpenAITool) - - @wraps(fn) - def inner(*args: _P.args, **kwargs: _P.kwargs) -> _ResponseModelT: - assert response_model is not None - fn_args = inspect.signature(fn).bind(*args, **kwargs).arguments - fn_return = fn(*args, **kwargs) - json_mode, messages, call_kwargs = setup_extract( - fn, fn_args, fn_return, tool, call_params - ) - client = OpenAI() - response = client.chat.completions.create( - model=model, stream=False, messages=messages, **call_kwargs - ) - - if json_mode and (content := response.choices[0].message.content): - json_output = content - elif tool_calls := response.choices[0].message.tool_calls: - json_output = tool_calls[0].function.arguments - else: - raise ValueError("No tool call or JSON object found in response.") - - output = _utils.extract_tool_return(response_model, json_output, False) - if isinstance(response_model, BaseModel): - output._response = response # type: ignore - return output - - return inner - - def extract_stream_decorator( - fn: Callable[_P, OpenAICallFunctionReturn], - ) -> Callable[_P, Iterable[_ResponseModelT]]: - assert response_model is not None - tool = _utils.setup_extract_tool(response_model, OpenAITool) - - @wraps(fn) - def inner(*args: _P.args, **kwargs: _P.kwargs) -> Iterable[_ResponseModelT]: - assert response_model is not None - fn_args = inspect.signature(fn).bind(*args, 
**kwargs).arguments - fn_return = fn(*args, **kwargs) - json_mode, messages, call_kwargs = setup_extract( - fn, fn_args, fn_return, tool, call_params - ) - client = OpenAI() - return OpenAIStructuredStream( - stream=( - chunk - for chunk in client.chat.completions.create( - model=model, stream=True, messages=messages, **call_kwargs - ) - ), - response_model=response_model, - json_mode=json_mode, - ) - - return inner - - if response_model: - return extract_stream_decorator if stream else extract_decorator - if stream: - return stream_decorator - return call_decorator - - -@overload -def openai_call_async( - model: str, - *, - stream: Literal[False] = False, - tools: list[type[BaseTool] | Callable] | None = None, - response_model: None = None, - **call_params: Unpack[OpenAICallParams], -) -> Callable[ - [Callable[_P, Awaitable[OpenAICallFunctionReturn]]], - Callable[_P, Awaitable[OpenAICallResponse]], -]: - ... # pragma: no cover - - -@overload -def openai_call_async( - model: str, - *, - stream: Literal[False] = False, - tools: list[type[BaseTool] | Callable] | None = None, - response_model: type[_ResponseModelT], - **call_params: Unpack[OpenAICallParams], -) -> Callable[ - [Callable[_P, Awaitable[OpenAICallFunctionReturn]]], - Callable[_P, Awaitable[_ResponseModelT]], -]: - ... # pragma: no cover - - -@overload -def openai_call_async( - model: str, - *, - stream: Literal[True], - tools: list[type[BaseTool] | Callable] | None = None, - response_model: None = None, - **call_params: Unpack[OpenAICallParams], -) -> Callable[ - [Callable[_P, Awaitable[OpenAICallFunctionReturn]]], - Callable[_P, Awaitable[OpenAIAsyncStream]], -]: - ... # pragma: no cover - - -@overload -def openai_call_async( - model: str, - *, - stream: Literal[True], - tools: list[type[BaseTool] | Callable] | None = None, - response_model: type[_ResponseModelT], - **call_params: Unpack[OpenAICallParams], -) -> Callable[ - [Callable[_P, Awaitable[OpenAICallFunctionReturn]]], - Callable[_P, Awaitable[AsyncIterable[_ResponseModelT]]], -]: - ... # pragma: no cover - - -def openai_call_async( - model: str, - *, - stream: bool = False, - tools: list[type[BaseTool] | Callable] | None = None, - response_model: type[_ResponseModelT] | None = None, - **call_params: Unpack[OpenAICallParams], -) -> Callable[ - [Callable[_P, Awaitable[OpenAICallFunctionReturn]]], - Callable[ - _P, - Awaitable[OpenAICallResponse] - | Awaitable[OpenAIAsyncStream] - | Awaitable[_ResponseModelT] - | Awaitable[AsyncIterable[_ResponseModelT]], - ], -]: - '''A decorator for calling the AsyncOpenAI API with a typed function. - - This decorator is used to wrap a typed function that calls the OpenAI API. It parses - the docstring of the wrapped function as the messages array and templates the input - arguments for the function into each message's template. - - Example: - - ```python - @openai_call_async(model="gpt-4o") - async def recommend_book(genre: str): - """Recommend a {genre} book.""" - - async def run(): - response = await recommend_book("fantasy") - - asyncio.run(run()) - ``` - - Args: - model: The OpenAI model to use in the API call. - stream: Whether to stream the response from the API call. - tools: The tools to use in the OpenAI API call. - **call_params: The `OpenAICallParams` call parameters to use in the API call. - - Returns: - The decorator for turning a typed function into an AsyncOpenAI API call. 
-    '''
-
-    def call_decorator(
-        fn: Callable[_P, Awaitable[OpenAICallFunctionReturn]],
-    ) -> Callable[_P, Awaitable[OpenAICallResponse]]:
-        @wraps(fn)
-        async def inner_async(
-            *args: _P.args, **kwargs: _P.kwargs
-        ) -> OpenAICallResponse:
-            fn_args = inspect.signature(fn).bind(*args, **kwargs).arguments
-            fn_return = await fn(*args, **kwargs)
-            prompt_template, messages, tool_types, call_kwargs = setup_call(
-                fn, fn_args, fn_return, tools, call_params
-            )
-            client = AsyncOpenAI()
-            start_time = datetime.datetime.now().timestamp() * 1000
-            response = await client.chat.completions.create(
-                model=model, stream=False, messages=messages, **call_kwargs
-            )
-            return OpenAICallResponse(
-                response=response,
-                tool_types=tool_types,
-                prompt_template=prompt_template,
-                fn_args=fn_args,
-                fn_return=fn_return,
-                messages=messages,
-                call_params=call_kwargs,
-                user_message_param=messages[-1]
-                if messages[-1]["role"] == "user"
-                else None,
-                start_time=start_time,
-                end_time=datetime.datetime.now().timestamp() * 1000,
-                cost=openai_api_calculate_cost(response.usage, response.model),
-            )
-
-        return inner_async
-
-    def stream_decorator(
-        fn: Callable[_P, Awaitable[OpenAICallFunctionReturn]],
-    ) -> Callable[_P, Awaitable[OpenAIAsyncStream]]:
-        @wraps(fn)
-        async def inner_async(*args: _P.args, **kwargs: _P.kwargs) -> OpenAIAsyncStream:
-            fn_args = inspect.signature(fn).bind(*args, **kwargs).arguments
-            fn_return = await fn(*args, **kwargs)
-            _, messages, tool_types, call_kwargs = setup_call(
-                fn, fn_args, fn_return, tools, call_params
-            )
-            client = AsyncOpenAI()
-
-            if not isinstance(client, AsyncAzureOpenAI):
-                call_kwargs["stream_options"] = {"include_usage": True}
-
-            stream = await client.chat.completions.create(
-                model=model, stream=True, messages=messages, **call_kwargs
-            )
-
-            async def generator():
-                async for chunk in stream:
-                    yield OpenAICallResponseChunk(
-                        chunk=chunk,
-                        user_message_param=messages[-1]
-                        if messages[-1]["role"] == "user"
-                        else None,
-                        tool_types=tool_types,
-                        cost=openai_api_calculate_cost(chunk.usage, chunk.model),
-                    )
-
-            return OpenAIAsyncStream(generator())
-
-        return inner_async
-
-    def extract_decorator(
-        fn: Callable[_P, Awaitable[OpenAICallFunctionReturn]],
-    ) -> Callable[_P, Awaitable[_ResponseModelT]]:
-        nonlocal response_model
-        assert response_model is not None
-        tool = _utils.setup_extract_tool(response_model, OpenAITool)
-
-        @wraps(fn)
-        async def inner(*args: _P.args, **kwargs: _P.kwargs) -> _ResponseModelT:
-            assert response_model is not None
-            fn_args = inspect.signature(fn).bind(*args, **kwargs).arguments
-            fn_return = await fn(*args, **kwargs)
-            json_mode, messages, call_kwargs = setup_extract(
-                fn, fn_args, fn_return, tool, call_params
-            )
-            client = AsyncOpenAI()
-            response = await client.chat.completions.create(
-                model=model, stream=False, messages=messages, **call_kwargs
-            )
-
-            if json_mode and (content := response.choices[0].message.content):
-                json_output = content
-            elif tool_calls := response.choices[0].message.tool_calls:
-                json_output = tool_calls[0].function.arguments
-            else:
-                raise ValueError("No tool call or JSON object found in response.")
-
-            output = _utils.extract_tool_return(response_model, json_output, False)
-            if isinstance(response_model, BaseModel):
-                output._response = response  # type: ignore
-            return output
-
-        return inner
-
-    def extract_stream_decorator(
-        fn: Callable[_P, Awaitable[OpenAICallFunctionReturn]],
-    ) -> Callable[_P, Awaitable[AsyncIterable[_ResponseModelT]]]:
-        assert response_model is not None
-        tool = _utils.setup_extract_tool(response_model, OpenAITool)
-
-        @wraps(fn)
-        async def inner(
-            *args: _P.args, **kwargs: _P.kwargs
-        ) -> AsyncIterable[_ResponseModelT]:
-            assert response_model is not None
-            fn_args = inspect.signature(fn).bind(*args, **kwargs).arguments
-            fn_return = await fn(*args, **kwargs)
-            json_mode, messages, call_kwargs = setup_extract(
-                fn, fn_args, fn_return, tool, call_params
-            )
-            client = AsyncOpenAI()
-            return OpenAIAsyncStructuredStream(
-                stream=(
-                    chunk
-                    async for chunk in await client.chat.completions.create(
-                        model=model, stream=True, messages=messages, **call_kwargs
-                    )
-                ),
-                response_model=response_model,
-                json_mode=json_mode,
-            )
-
-        return inner
-
-    if response_model:
-        return extract_stream_decorator if stream else extract_decorator
-    if stream:
-        return stream_decorator
-    return call_decorator
diff --git a/mirascope/core/openai/streams.py b/mirascope/core/openai/streams.py
deleted file mode 100644
index 8e385a4b6..000000000
--- a/mirascope/core/openai/streams.py
+++ /dev/null
@@ -1,159 +0,0 @@
-"""This module contains classes for streaming responses from OpenAI's API."""
-
-from collections.abc import AsyncGenerator, Generator
-
-from openai.types.chat import (
-    ChatCompletionAssistantMessageParam,
-    ChatCompletionMessageToolCall,
-    ChatCompletionUserMessageParam,
-)
-from openai.types.chat.chat_completion_message_tool_call import Function
-
-from ..base import BaseAsyncStream, BaseStream
-from .call_response import OpenAICallResponse
-from .call_response_chunk import OpenAICallResponseChunk
-from .tools import OpenAITool
-
-
-def _handle_chunk(
-    chunk: OpenAICallResponseChunk,
-    current_tool_call: ChatCompletionMessageToolCall,
-    current_tool_type: type[OpenAITool] | None,
-) -> tuple[
-    OpenAITool | None,
-    ChatCompletionMessageToolCall,
-    type[OpenAITool] | None,
-]:
-    """Handles a chunk of the stream."""
-    if not chunk.tool_types or not chunk.tool_calls:
-        return None, current_tool_call, current_tool_type
-
-    tool_call = chunk.tool_calls[0]
-    # Reset on new tool
-    if tool_call.id and tool_call.function is not None:
-        previous_tool_call = current_tool_call.model_copy()
-        previous_tool_type = current_tool_type
-        current_tool_call = ChatCompletionMessageToolCall(
-            id=tool_call.id,
-            function=Function(
-                arguments="",
-                name=tool_call.function.name if tool_call.function.name else "",
-            ),
-            type="function",
-        )
-        current_tool_type = None
-        for tool_type in chunk.tool_types:
-            if tool_type._name() == tool_call.function.name:
-                current_tool_type = tool_type
-                break
-        if current_tool_type is None:
-            raise RuntimeError(
-                f"Unknown tool type in stream: {tool_call.function.name}"
-            )
-        if previous_tool_call.id and previous_tool_type is not None:
-            return (
-                previous_tool_type.from_tool_call(previous_tool_call),
-                current_tool_call,
-                current_tool_type,
-            )
-
-    # Update arguments with each chunk
-    if tool_call.function and tool_call.function.arguments:
-        current_tool_call.function.arguments += tool_call.function.arguments
-
-    return None, current_tool_call, current_tool_type
-
-
-class OpenAIStream(
-    BaseStream[
-        OpenAICallResponseChunk,
-        ChatCompletionUserMessageParam,
-        ChatCompletionAssistantMessageParam,
-        OpenAITool,
-    ]
-):
-    """A class for streaming responses from OpenAI's API."""
-
-    def __init__(self, stream: Generator[OpenAICallResponseChunk, None, None]):
-        """Initializes an instance of `OpenAIStream`."""
-        super().__init__(stream, ChatCompletionAssistantMessageParam)
-
-    def __iter__(
-        self,
-    ) -> Generator[tuple[OpenAICallResponseChunk, OpenAITool | None], None, None]:
-        """Iterator over the stream and constructs tools as they are streamed."""
-        current_tool_call = ChatCompletionMessageToolCall(
-            id="", function=Function(arguments="", name=""), type="function"
-        )
-        current_tool_type, tool_calls = None, []
-        for chunk, _ in super().__iter__():
-            if not chunk.tool_types or not chunk.tool_calls:
-                if current_tool_type:
-                    yield chunk, current_tool_type.from_tool_call(current_tool_call)
-                    tool_calls.append(current_tool_call)
-                    current_tool_type = None
-                else:
-                    yield chunk, None
-            tool, current_tool_call, current_tool_type = _handle_chunk(
-                chunk, current_tool_call, current_tool_type
-            )
-            if tool is not None:
-                yield chunk, tool
-                tool_calls.append(tool.tool_call)
-        if tool_calls:
-            self.message_param["tool_calls"] = tool_calls  # type: ignore
-
-    @classmethod
-    def tool_message_params(cls, tools_and_outputs):
-        """Returns the tool message parameters for tool call results."""
-        return OpenAICallResponse.tool_message_params(tools_and_outputs)
-
-
-class OpenAIAsyncStream(
-    BaseAsyncStream[
-        OpenAICallResponseChunk,
-        ChatCompletionUserMessageParam,
-        ChatCompletionAssistantMessageParam,
-        OpenAITool,
-    ]
-):
-    """A class for streaming responses from OpenAI's API."""
-
-    def __init__(self, stream: AsyncGenerator[OpenAICallResponseChunk, None]):
-        """Initializes an instance of `OpenAIAsyncStream`."""
-        super().__init__(stream, ChatCompletionAssistantMessageParam)
-
-    def __aiter__(
-        self,
-    ) -> AsyncGenerator[tuple[OpenAICallResponseChunk, OpenAITool | None], None]:
-        """Iterator over the stream and constructs tools as they are streamed."""
-        stream = super().__aiter__()
-
-        async def generator():
-            current_tool_call = ChatCompletionMessageToolCall(
-                id="", function=Function(arguments="", name=""), type="function"
-            )
-            current_tool_type, tool_calls = None, []
-            async for chunk, _ in stream:
-                if not chunk.tool_types or not chunk.tool_calls:
-                    if current_tool_type:
-                        yield chunk, current_tool_type.from_tool_call(current_tool_call)
-                        tool_calls.append(current_tool_call)
-                        current_tool_type = None
-                    else:
-                        yield chunk, None
-                tool, current_tool_call, current_tool_type = _handle_chunk(
-                    chunk, current_tool_call, current_tool_type
-                )
-                if tool is not None:
-                    yield chunk, tool
-                    tool_calls.append(tool.tool_call)
-            if tool_calls:
-                self.message_param["tool_calls"] = tool_calls  # type: ignore
-
-        return generator()
-
-    @classmethod
-    def tool_message_params(cls, tools_and_outputs):
-        """Returns the tool message parameters for tool call results."""
-        return OpenAICallResponse.tool_message_params(tools_and_outputs)
diff --git a/mirascope/core/openai/structured_streams.py b/mirascope/core/openai/structured_streams.py
deleted file mode 100644
index d5a1c29b6..000000000
--- a/mirascope/core/openai/structured_streams.py
+++ /dev/null
@@ -1,68 +0,0 @@
-"""This module contains classes for structured output streaming with OpenAI's API."""
-
-from collections.abc import AsyncGenerator, Generator
-from typing import Generic, TypeVar
-
-from openai.types.chat import ChatCompletionChunk
-from pydantic import BaseModel
-
-from ..base import BaseAsyncStructuredStream, BaseStructuredStream, _utils
-
-_ResponseModelT = TypeVar("_ResponseModelT", bound=BaseModel | _utils.BaseType)
-
-
-class OpenAIStructuredStream(
-    Generic[_ResponseModelT],
-    BaseStructuredStream[ChatCompletionChunk, _ResponseModelT],
-):
-    """A class for streaming structured outputs from OpenAI's API."""
-
-    def __iter__(self) -> Generator[_ResponseModelT, None, None]:
-        """Iterates over the stream and extracts structured outputs."""
-        json_output = ""
-        for chunk in self.stream:
-            if self.json_mode and (content := chunk.choices[0].delta.content):
-                json_output += content
-            elif (
-                (tool_calls := chunk.choices[0].delta.tool_calls)
-                and (function := tool_calls[0].function)
-                and (arguments := function.arguments)
-            ):
-                json_output += arguments
-            else:
-                ValueError("No tool call or JSON object found in response.")
-            if json_output:
-                yield _utils.extract_tool_return(self.response_model, json_output, True)
-        yield _utils.extract_tool_return(self.response_model, json_output, False)
-
-
-class OpenAIAsyncStructuredStream(
-    Generic[_ResponseModelT],
-    BaseAsyncStructuredStream[ChatCompletionChunk, _ResponseModelT],
-):
-    """A class for async streaming structured outputs from OpenAI's API."""
-
-    def __aiter__(self) -> AsyncGenerator[_ResponseModelT, None]:
-        """Iterates over the stream and extracts structured outputs."""
-
-        async def generator():
-            nonlocal self
-            json_output = ""
-            async for chunk in self.stream:
-                if self.json_mode and (content := chunk.choices[0].delta.content):
-                    json_output += content
-                elif (
-                    (tool_calls := chunk.choices[0].delta.tool_calls)
-                    and (function := tool_calls[0].function)
-                    and (arguments := function.arguments)
-                ):
-                    json_output += arguments
-                else:
-                    ValueError("No tool call or JSON object found in response.")
-                if json_output:
-                    yield _utils.extract_tool_return(
-                        self.response_model, json_output, True
-                    )
-            yield _utils.extract_tool_return(self.response_model, json_output, False)
-
-        return generator()
diff --git a/mirascope/core/openai/tools.py b/mirascope/core/openai/tool.py
similarity index 100%
rename from mirascope/core/openai/tools.py
rename to mirascope/core/openai/tool.py
diff --git a/tests/core/base/test_utils.py b/tests/core/base/test_utils.py
index 989fcc524..f0e2593ad 100644
--- a/tests/core/base/test_utils.py
+++ b/tests/core/base/test_utils.py
@@ -114,8 +114,7 @@ def fn(model_name: str = "", self=None, cls=None) -> str:
 def test_convert_function_to_base_model_errors() -> None:
     """Tests the various `ValueErro` cases in `convert_function_to_base_model`."""
 
-    def empty(param) -> str:
-        ...  # pragma: no cover
+    def empty(param) -> str: ...  # pragma: no cover
 
     with pytest.raises(ValueError):
         _utils.convert_function_to_base_tool(empty, BaseTool)
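
Note: the rewritten decorators above select between four return modes (plain response, stream, extraction, structured stream) based on `stream` and `response_model`. The sketch below shows how the async modes might be exercised once this patch lands. It is a minimal sketch, not part of the patch: the `mirascope.core.openai` import path is an assumption about how the new `call_async.py` will be re-exported, and the wrapped functions mirror the `recommend_book` example from the docstring.

```python
import asyncio

# Assumed import path for the refactored decorator; adjust to the actual
# re-export once the package layout in this patch is final.
from mirascope.core.openai import openai_call_async


@openai_call_async(model="gpt-4o")
async def recommend_book(genre: str):
    """Recommend a {genre} book."""


@openai_call_async(model="gpt-4o", stream=True)
async def stream_recommend_book(genre: str):
    """Recommend a {genre} book."""


async def run():
    # stream=False, response_model=None -> an `OpenAICallResponse`.
    response = await recommend_book("fantasy")
    print(response)

    # stream=True -> an `OpenAIAsyncStream` that yields (chunk, tool) tuples,
    # mirroring the `OpenAIAsyncStream.__aiter__` implementation above.
    stream = await stream_recommend_book("fantasy")
    async for chunk, tool in stream:
        if tool is None:
            print(chunk)


asyncio.run(run())
```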
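The structured streaming path deserves its own illustration: as the deleted `OpenAIStructuredStream` shows (and its `_structured_stream.py` replacement presumably preserves), each chunk appends to a JSON buffer and yields a partial extraction via `extract_tool_return(..., True)`, with one final fully validated yield after the stream ends. Below is a sketch of consuming it through the sync decorator; the `Book` model and the `openai_call` import path are illustrative assumptions, not defined by this patch.

```python
from pydantic import BaseModel

# Assumed import path; `Book` is a hypothetical response model.
from mirascope.core.openai import openai_call


class Book(BaseModel):
    title: str
    author: str


@openai_call(model="gpt-4o", stream=True, response_model=Book)
def extract_book(genre: str):
    """Recommend a {genre} book."""


# Intermediate yields are partial `Book` instances validated with
# allow_partial=True (missing fields tolerated); the final yield
# re-validates the accumulated JSON buffer with allow_partial=False.
for partial_book in extract_book("fantasy"):
    print(partial_book)
```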