From bb5403e2425bfc996a6264da1204e02f5c1f34bb Mon Sep 17 00:00:00 2001
From: Anton Dubovik
Date: Fri, 21 Jun 2024 14:50:48 +0100
Subject: [PATCH] fix: fixing performance regression (#121)

---
 .ort.yml                                   |  8 ++---
 aidial_adapter_openai/app.py               | 18 ++++++++---
 aidial_adapter_openai/constant.py          |  4 ---
 aidial_adapter_openai/databricks.py        | 25 +++++----------
 aidial_adapter_openai/gpt.py               | 36 ++++++++++------------
 aidial_adapter_openai/mistral.py           | 19 +++---------
 aidial_adapter_openai/utils/http_client.py | 20 ++++++++++++
 aidial_adapter_openai/utils/parsers.py     |  4 +++
 aidial_adapter_openai/utils/reflection.py  | 13 ++++++--
 aidial_adapter_openai/utils/streaming.py   | 13 ++++++++
 poetry.lock                                | 23 +++++++-------
 pyproject.toml                             |  5 +--
 tests/conftest.py                          |  4 ++-
 13 files changed, 112 insertions(+), 80 deletions(-)
 create mode 100644 aidial_adapter_openai/utils/http_client.py

diff --git a/.ort.yml b/.ort.yml
index 62ed830..b663ffc 100644
--- a/.ort.yml
+++ b/.ort.yml
@@ -22,9 +22,9 @@ resolutions:
   - message: ".*PyPI::tiktoken:0\\.7\\.0.*"
     reason: "CANT_FIX_EXCEPTION"
     comment: "MIT License: https://github.com/openai/tiktoken/blob/0.7.0/LICENSE"
-  - message: ".*PyPI::httpcore:0\\.18\\.0.*"
+  - message: ".*PyPI::httpcore:1\\.0\\.5.*"
     reason: "CANT_FIX_EXCEPTION"
-    comment: "BSD 3-Clause New or Revised License: https://github.com/encode/httpcore/blob/0.18.0/LICENSE.md"
-  - message: ".*PyPI::httpx:0\\.25\\.0.*"
+    comment: "BSD 3-Clause New or Revised License: https://github.com/encode/httpcore/blob/1.0.5/LICENSE.md"
+  - message: ".*PyPI::httpx:0\\.27\\.0.*"
     reason: "CANT_FIX_EXCEPTION"
-    comment: "BSD 3-Clause New or Revised License: https://github.com/encode/httpx/blob/0.25.0/LICENSE.md"
\ No newline at end of file
+    comment: "BSD 3-Clause New or Revised License: https://github.com/encode/httpx/blob/0.27.0/LICENSE.md"
\ No newline at end of file
diff --git a/aidial_adapter_openai/app.py b/aidial_adapter_openai/app.py
index 12fce06..0b7f28b 100644
--- a/aidial_adapter_openai/app.py
+++ b/aidial_adapter_openai/app.py
@@ -1,5 +1,6 @@
 import json
 import os
+from contextlib import asynccontextmanager
 from typing import Awaitable, Dict, TypeVar
 
 from aidial_sdk.exceptions import HTTPException as DialException
@@ -10,7 +11,6 @@
 from fastapi.responses import JSONResponse, Response
 from openai import APIConnectionError, APIStatusError, APITimeoutError
 
-from aidial_adapter_openai.constant import DEFAULT_TIMEOUT
 from aidial_adapter_openai.dalle3 import (
     chat_completion as dalle3_chat_completion,
 )
@@ -26,7 +26,8 @@
     chat_completion as mistral_chat_completion,
 )
 from aidial_adapter_openai.utils.auth import get_credentials
-from aidial_adapter_openai.utils.log_config import configure_loggers
+from aidial_adapter_openai.utils.http_client import get_http_client
+from aidial_adapter_openai.utils.log_config import configure_loggers, logger
 from aidial_adapter_openai.utils.parsers import (
     embeddings_parser,
     parse_body,
@@ -36,7 +37,16 @@
 from aidial_adapter_openai.utils.storage import create_file_storage
 from aidial_adapter_openai.utils.tokens import Tokenizer
 
-app = FastAPI()
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    yield
+    logger.info("Application shutdown")
+    await get_http_client().aclose()
+
+
+app = FastAPI(lifespan=lifespan)
+
 init_telemetry(app, TelemetryConfig())
 configure_loggers()
 
@@ -200,7 +210,7 @@ async def embedding(deployment_id: str, request: Request):
     upstream_endpoint = request.headers["X-UPSTREAM-ENDPOINT"]
 
     client = embeddings_parser.parse(upstream_endpoint).get_client(
-        {**creds, "api_version": api_version, "timeout": DEFAULT_TIMEOUT}
"api_version": api_version, "timeout": DEFAULT_TIMEOUT} + {**creds, "api_version": api_version} ) return await handle_exceptions( diff --git a/aidial_adapter_openai/constant.py b/aidial_adapter_openai/constant.py index 42016e3..e69de29 100644 --- a/aidial_adapter_openai/constant.py +++ b/aidial_adapter_openai/constant.py @@ -1,4 +0,0 @@ -from openai import Timeout - -# connect timeout and total timeout -DEFAULT_TIMEOUT = Timeout(600, connect=10) diff --git a/aidial_adapter_openai/databricks.py b/aidial_adapter_openai/databricks.py index 98cb800..c28d75e 100644 --- a/aidial_adapter_openai/databricks.py +++ b/aidial_adapter_openai/databricks.py @@ -1,29 +1,25 @@ -from typing import Any +from typing import Any, cast from fastapi.responses import StreamingResponse from openai import AsyncStream from openai.types.chat.chat_completion import ChatCompletion from openai.types.chat.chat_completion_chunk import ChatCompletionChunk -from aidial_adapter_openai.constant import DEFAULT_TIMEOUT from aidial_adapter_openai.utils.auth import OpenAICreds -from aidial_adapter_openai.utils.log_config import logger -from aidial_adapter_openai.utils.parsers import chat_completions_parser +from aidial_adapter_openai.utils.parsers import ( + OpenAIParams, + chat_completions_parser, +) from aidial_adapter_openai.utils.reflection import call_with_extra_body from aidial_adapter_openai.utils.sse_stream import to_openai_sse_stream -from aidial_adapter_openai.utils.streaming import map_stream - - -def debug_print(chunk): - logger.debug(f"chunk: {chunk}") - return chunk +from aidial_adapter_openai.utils.streaming import chunk_to_dict, map_stream async def chat_completion( data: Any, upstream_endpoint: str, creds: OpenAICreds ): client = chat_completions_parser.parse(upstream_endpoint).get_client( - {**creds, "timeout": DEFAULT_TIMEOUT} + cast(OpenAIParams, creds) ) response: AsyncStream[ChatCompletionChunk] | ChatCompletion = ( @@ -32,12 +28,7 @@ async def chat_completion( if isinstance(response, AsyncStream): return StreamingResponse( - to_openai_sse_stream( - map_stream( - debug_print, - map_stream(lambda chunk: chunk.to_dict(), response), - ) - ), + to_openai_sse_stream(map_stream(chunk_to_dict, response)), media_type="text/event-stream", ) else: diff --git a/aidial_adapter_openai/gpt.py b/aidial_adapter_openai/gpt.py index 2544e84..5798653 100644 --- a/aidial_adapter_openai/gpt.py +++ b/aidial_adapter_openai/gpt.py @@ -4,21 +4,19 @@ from openai.types.chat.chat_completion import ChatCompletion from openai.types.chat.chat_completion_chunk import ChatCompletionChunk -from aidial_adapter_openai.constant import DEFAULT_TIMEOUT from aidial_adapter_openai.utils.auth import OpenAICreds -from aidial_adapter_openai.utils.log_config import logger from aidial_adapter_openai.utils.parsers import chat_completions_parser from aidial_adapter_openai.utils.reflection import call_with_extra_body from aidial_adapter_openai.utils.sse_stream import to_openai_sse_stream -from aidial_adapter_openai.utils.streaming import generate_stream, map_stream +from aidial_adapter_openai.utils.streaming import ( + chunk_to_dict, + debug_print, + generate_stream, + map_stream, +) from aidial_adapter_openai.utils.tokens import Tokenizer, discard_messages -def debug_print(chunk): - logger.debug(f"chunk: {chunk}") - return chunk - - async def gpt_chat_completion( data: dict, deployment_id: str, @@ -49,7 +47,7 @@ async def gpt_chat_completion( ) client = chat_completions_parser.parse(upstream_endpoint).get_client( - {**creds, "api_version": api_version, 
"timeout": DEFAULT_TIMEOUT} + {**creds, "api_version": api_version} ) response: AsyncStream[ChatCompletionChunk] | ChatCompletion = ( @@ -57,19 +55,17 @@ async def gpt_chat_completion( ) if isinstance(response, AsyncStream): + prompt_tokens = tokenizer.calculate_prompt_tokens(data["messages"]) return StreamingResponse( to_openai_sse_stream( - map_stream( - debug_print, - generate_stream( - prompt_tokens, - map_stream(lambda obj: obj.to_dict(), response), - tokenizer, - deployment_id, - discarded_messages, - ), - ) + generate_stream( + prompt_tokens, + map_stream(chunk_to_dict, response), + tokenizer, + deployment_id, + discarded_messages, + ), ), media_type="text/event-stream", ) @@ -77,5 +73,5 @@ async def gpt_chat_completion( resp = response.to_dict() if discarded_messages is not None: resp |= {"statistics": {"discarded_messages": discarded_messages}} - debug_print(resp) + debug_print("response", resp) return resp diff --git a/aidial_adapter_openai/mistral.py b/aidial_adapter_openai/mistral.py index 795053a..cab8304 100644 --- a/aidial_adapter_openai/mistral.py +++ b/aidial_adapter_openai/mistral.py @@ -5,17 +5,11 @@ from openai.types.chat.chat_completion import ChatCompletion from openai.types.chat.chat_completion_chunk import ChatCompletionChunk -from aidial_adapter_openai.constant import DEFAULT_TIMEOUT from aidial_adapter_openai.utils.auth import OpenAICreds -from aidial_adapter_openai.utils.log_config import logger +from aidial_adapter_openai.utils.http_client import get_http_client from aidial_adapter_openai.utils.reflection import call_with_extra_body from aidial_adapter_openai.utils.sse_stream import to_openai_sse_stream -from aidial_adapter_openai.utils.streaming import map_stream - - -def debug_print(chunk): - logger.debug(f"chunk: {chunk}") - return chunk +from aidial_adapter_openai.utils.streaming import chunk_to_dict, map_stream async def chat_completion( @@ -24,8 +18,8 @@ async def chat_completion( client = AsyncOpenAI( base_url=upstream_endpoint, - timeout=DEFAULT_TIMEOUT, api_key=creds.get("api_key"), + http_client=get_http_client(), ) response: AsyncStream[ChatCompletionChunk] | ChatCompletion = ( @@ -34,12 +28,7 @@ async def chat_completion( if isinstance(response, AsyncStream): return StreamingResponse( - to_openai_sse_stream( - map_stream( - debug_print, - map_stream(lambda chunk: chunk.to_dict(), response), - ) - ), + to_openai_sse_stream(map_stream(chunk_to_dict, response)), media_type="text/event-stream", ) else: diff --git a/aidial_adapter_openai/utils/http_client.py b/aidial_adapter_openai/utils/http_client.py new file mode 100644 index 0000000..7ccc2ca --- /dev/null +++ b/aidial_adapter_openai/utils/http_client.py @@ -0,0 +1,20 @@ +import functools + +import httpx + +# connect timeout and total timeout +DEFAULT_TIMEOUT = httpx.Timeout(600, connect=10) + +# Borrowed from openai._constants.DEFAULT_CONNECTION_LIMITS +DEFAULT_CONNECTION_LIMITS = httpx.Limits( + max_connections=1000, max_keepalive_connections=100 +) + + +@functools.cache +def get_http_client() -> httpx.AsyncClient: + return httpx.AsyncClient( + timeout=DEFAULT_TIMEOUT, + limits=DEFAULT_CONNECTION_LIMITS, + follow_redirects=True, + ) diff --git a/aidial_adapter_openai/utils/parsers.py b/aidial_adapter_openai/utils/parsers.py index 73d4a04..8646caa 100644 --- a/aidial_adapter_openai/utils/parsers.py +++ b/aidial_adapter_openai/utils/parsers.py @@ -8,6 +8,8 @@ from openai import AsyncAzureOpenAI, AsyncOpenAI, Timeout from pydantic import BaseModel +from aidial_adapter_openai.utils.http_client import 
+
 
 class OpenAIParams(TypedDict, total=False):
     api_key: str
@@ -34,6 +36,7 @@ def get_client(self, params: OpenAIParams) -> AsyncAzureOpenAI:
             azure_ad_token=params.get("azure_ad_token"),
             api_version=params.get("api_version"),
             timeout=params.get("timeout"),
+            http_client=get_http_client(),
         )
 
 
@@ -45,6 +48,7 @@ def get_client(self, params: OpenAIParams) -> AsyncOpenAI:
             base_url=self.base_url,
             api_key=params.get("api_key"),
             timeout=params.get("timeout"),
+            http_client=get_http_client(),
         )
 
 
diff --git a/aidial_adapter_openai/utils/reflection.py b/aidial_adapter_openai/utils/reflection.py
index 916a0f7..af6de98 100644
--- a/aidial_adapter_openai/utils/reflection.py
+++ b/aidial_adapter_openai/utils/reflection.py
@@ -1,8 +1,17 @@
+import functools
 import inspect
 from typing import Any, Callable, Coroutine, TypeVar
 
 from aidial_sdk.exceptions import HTTPException as DialException
 
+
+@functools.lru_cache(maxsize=64)
+def _inspect_signature(
+    func: Callable[..., Coroutine[Any, Any, Any]]
+) -> inspect.Signature:
+    return inspect.signature(func)
+
+
 T = TypeVar("T")
 
 
@@ -12,7 +21,7 @@ async def call_with_extra_body(
     if has_kwargs_argument(func):
         return await func(**arg)
 
-    expected_args = set(inspect.signature(func).parameters.keys())
+    expected_args = set(_inspect_signature(func).parameters.keys())
     actual_args = set(arg.keys())
 
     extra_args = actual_args - expected_args
@@ -37,7 +46,7 @@ def has_kwargs_argument(func: Callable[..., Coroutine[Any, Any, Any]]) -> bool:
     """
     Determines if the given function accepts a variable keyword argument (**kwargs).
     """
-    signature = inspect.signature(func)
+    signature = _inspect_signature(func)
     for param in signature.parameters.values():
         if param.kind == inspect.Parameter.VAR_KEYWORD:
             return True
diff --git a/aidial_adapter_openai/utils/streaming.py b/aidial_adapter_openai/utils/streaming.py
index d1aae2c..70162a9 100644
--- a/aidial_adapter_openai/utils/streaming.py
+++ b/aidial_adapter_openai/utils/streaming.py
@@ -1,3 +1,4 @@
+import logging
 from time import time
 from typing import Any, AsyncIterator, Callable, Optional, TypeVar
 from uuid import uuid4
@@ -6,6 +7,7 @@
 from aidial_sdk.utils.merge_chunks import merge
 from fastapi.responses import JSONResponse, Response, StreamingResponse
 from openai import APIError
+from openai.types.chat.chat_completion_chunk import ChatCompletionChunk
 
 from aidial_adapter_openai.utils.env import get_env_bool
 from aidial_adapter_openai.utils.log_config import logger
@@ -196,3 +198,14 @@ async def map_stream(
         new_item = func(item)
         if new_item is not None:
             yield new_item
+
+
+def debug_print(title: str, chunk: dict) -> None:
+    if logger.isEnabledFor(logging.DEBUG):
+        logger.debug(f"{title}: {chunk}")
+
+
+def chunk_to_dict(chunk: ChatCompletionChunk) -> dict:
+    dict = chunk.to_dict()
+    debug_print("chunk", dict)
+    return dict
diff --git a/poetry.lock b/poetry.lock
index 80c898c..28e4b9c 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand.
 
 [[package]]
 name = "aidial-sdk"
@@ -842,39 +842,40 @@ files = [
 
 [[package]]
 name = "httpcore"
-version = "0.18.0"
+version = "1.0.5"
 description = "A minimal low-level HTTP client."
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "httpcore-0.18.0-py3-none-any.whl", hash = "sha256:adc5398ee0a476567bf87467063ee63584a8bce86078bf748e48754f60202ced"},
-    {file = "httpcore-0.18.0.tar.gz", hash = "sha256:13b5e5cd1dca1a6636a6aaea212b19f4f85cd88c366a2b82304181b769aab3c9"},
+    {file = "httpcore-1.0.5-py3-none-any.whl", hash = "sha256:421f18bac248b25d310f3cacd198d55b8e6125c107797b609ff9b7a6ba7991b5"},
+    {file = "httpcore-1.0.5.tar.gz", hash = "sha256:34a38e2f9291467ee3b44e89dd52615370e152954ba21721378a87b2960f7a61"},
 ]
 
 [package.dependencies]
-anyio = ">=3.0,<5.0"
 certifi = "*"
 h11 = ">=0.13,<0.15"
-sniffio = "==1.*"
 
 [package.extras]
+asyncio = ["anyio (>=4.0,<5.0)"]
 http2 = ["h2 (>=3,<5)"]
 socks = ["socksio (==1.*)"]
+trio = ["trio (>=0.22.0,<0.26.0)"]
 
 [[package]]
 name = "httpx"
-version = "0.25.0"
+version = "0.27.0"
 description = "The next generation HTTP client."
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "httpx-0.25.0-py3-none-any.whl", hash = "sha256:181ea7f8ba3a82578be86ef4171554dd45fec26a02556a744db029a0a27b7100"},
-    {file = "httpx-0.25.0.tar.gz", hash = "sha256:47ecda285389cb32bb2691cc6e069e3ab0205956f681c5b2ad2325719751d875"},
+    {file = "httpx-0.27.0-py3-none-any.whl", hash = "sha256:71d5465162c13681bff01ad59b2cc68dd838ea1f10e51574bac27103f00c91a5"},
+    {file = "httpx-0.27.0.tar.gz", hash = "sha256:a0cb88a46f32dc874e04ee956e4c2764aba2aa228f650b06788ba6bda2962ab5"},
 ]
 
 [package.dependencies]
+anyio = "*"
 certifi = "*"
-httpcore = ">=0.18.0,<0.19.0"
+httpcore = "==1.*"
 idna = "*"
 sniffio = "*"
 
@@ -2436,4 +2437,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.11,<3.13"
-content-hash = "de645b9905e10080c3ae3f9343a8c1001a87046425152dfcf93633db228a7233"
+content-hash = "eadd9f526c195228ed213521514840f62efacd6584c7f9b32ba1c1b739bcdbef"
diff --git a/pyproject.toml b/pyproject.toml
index d37052d..7a4eb61 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -27,11 +27,12 @@ tiktoken = "0.7.0"
 uvicorn = "0.23"
 wrapt = "^1.15.0"
 pydantic = "^1.10.12"
-numpy = "^1.26.0" # need for openai embeddings
+# required by openai embeddings; avoiding openai[datalib],
+# since it also depends on pandas
+numpy = "^1.26.0"
 pillow = "^10.3.0"
 azure-identity = "^1.16.1"
 aidial-sdk = {version = "^0.8.0", extras = ["telemetry"]}
-httpx = "^0.25.0" # TODO: remove once SDK supports conditional instrumentation
 
 [tool.poetry.group.test.dependencies]
 pytest = "7.4.0"
diff --git a/tests/conftest.py b/tests/conftest.py
index d3e51af..3cc2c18 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,5 +1,6 @@
 import httpx
 import pytest_asyncio
+from httpx import ASGITransport
 
 from aidial_adapter_openai.app import app
 
@@ -7,6 +8,7 @@
 @pytest_asyncio.fixture
 async def test_app():
     async with httpx.AsyncClient(
-        app=app, base_url="http://test-app.com"
+        transport=ASGITransport(app=app),  # type: ignore
+        base_url="http://test-app.com",
     ) as client:
         yield client
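
A note for readers who want the shape of the fix outside the diff: the regression appears to stem from constructing a new OpenAI SDK client, and with it a new HTTP connection pool, on every request. The patch routes every SDK client through a single cached httpx.AsyncClient that is closed on application shutdown, caches inspect.signature lookups, and makes debug printing lazy. The central pattern can be sketched standalone as follows; this is an illustrative reconstruction, not code from the commit, and make_openai_client, the base URL, and the API key are hypothetical placeholders.

import functools
from contextlib import asynccontextmanager

import httpx
from fastapi import FastAPI
from openai import AsyncOpenAI

# Total timeout of 600s with a 10s connect timeout, as in the patch.
DEFAULT_TIMEOUT = httpx.Timeout(600, connect=10)

# Pool limits mirroring openai._constants.DEFAULT_CONNECTION_LIMITS.
DEFAULT_CONNECTION_LIMITS = httpx.Limits(
    max_connections=1000, max_keepalive_connections=100
)


@functools.cache
def get_http_client() -> httpx.AsyncClient:
    # functools.cache makes this a process-wide singleton, so every request
    # reuses one connection pool instead of re-opening sockets per request.
    return httpx.AsyncClient(
        timeout=DEFAULT_TIMEOUT,
        limits=DEFAULT_CONNECTION_LIMITS,
        follow_redirects=True,
    )


@asynccontextmanager
async def lifespan(app: FastAPI):
    yield
    # Release pooled connections once the app stops serving traffic.
    await get_http_client().aclose()


app = FastAPI(lifespan=lifespan)


def make_openai_client(endpoint: str, api_key: str) -> AsyncOpenAI:
    # Hypothetical helper: the SDK object is cheap to build per request once
    # the expensive transport is injected via http_client.
    return AsyncOpenAI(
        base_url=endpoint, api_key=api_key, http_client=get_http_client()
    )

Using functools.cache rather than a module-level instance keeps construction lazy (importing the module never opens a pool), and tests can call get_http_client.cache_clear() to swap in their own transport.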