From ef357ce5328e8045a3c3681d0d4af582190c977c Mon Sep 17 00:00:00 2001 From: Seiji Eicher Date: Wed, 8 Oct 2025 16:48:56 -0700 Subject: [PATCH 1/5] [serve][llm] Remove request ID workaround Signed-off-by: Seiji Eicher --- .../serve/configs/openai_api_models.py | 22 --------- .../serve/deployments/llm/vllm/vllm_engine.py | 47 ++----------------- 2 files changed, 4 insertions(+), 65 deletions(-) diff --git a/python/ray/llm/_internal/serve/configs/openai_api_models.py b/python/ray/llm/_internal/serve/configs/openai_api_models.py index 78d4f4687e25..d2fd84ebacaa 100644 --- a/python/ray/llm/_internal/serve/configs/openai_api_models.py +++ b/python/ray/llm/_internal/serve/configs/openai_api_models.py @@ -9,7 +9,6 @@ from pydantic import ( BaseModel, ConfigDict, - Field, ) from vllm.entrypoints.openai.protocol import ( ChatCompletionRequest as vLLMChatCompletionRequest, @@ -26,7 +25,6 @@ ScoreRequest as vLLMScoreRequest, ScoreResponse as vLLMScoreResponse, ) -from vllm.utils import random_uuid if TYPE_CHECKING: from ray.llm._internal.serve.configs.server_models import LLMConfig @@ -52,19 +50,9 @@ class ErrorResponse(vLLMErrorResponse): model_config = ConfigDict(arbitrary_types_allowed=True) -# TODO (Kourosh): Upstream class CompletionRequest(vLLMCompletionRequest): model_config = ConfigDict(arbitrary_types_allowed=True) - request_id: str = Field( - default_factory=lambda: f"{random_uuid()}", - description=( - "The request_id related to this request. If the caller does " - "not set it, a random_uuid will be generated. This id is used " - "through out the inference process and return in response." - ), - ) - class CompletionResponse(vLLMCompletionResponse): model_config = ConfigDict(arbitrary_types_allowed=True) @@ -74,19 +62,9 @@ class CompletionStreamResponse(vLLMCompletionStreamResponse): model_config = ConfigDict(arbitrary_types_allowed=True) -# TODO (Kourosh): Upstream class EmbeddingCompletionRequest(vLLMEmbeddingCompletionRequest): model_config = ConfigDict(arbitrary_types_allowed=True) - request_id: str = Field( - default_factory=lambda: f"{random_uuid()}", - description=( - "The request_id related to this request. If the caller does " - "not set it, a random_uuid will be generated. This id is used " - "through out the inference process and return in response." 
- ), - ) - class EmbeddingChatRequest(vLLMEmbeddingChatRequest): model_config = ConfigDict(arbitrary_types_allowed=True) diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py index d8eeea45801a..dbd8105bd718 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py @@ -3,7 +3,6 @@ from typing import TYPE_CHECKING, AsyncGenerator, Optional, Tuple, Union from starlette.datastructures import State -from starlette.requests import Request from vllm.engine.arg_utils import AsyncEngineArgs from vllm.entrypoints.openai.cli_args import FrontendArgs from vllm.entrypoints.openai.protocol import ErrorResponse as VLLMErrorResponse @@ -358,35 +357,13 @@ async def resolve_lora(self, disk_lora_model: DiskMultiplexConfig): if isinstance(lora_request, VLLMErrorResponse): raise ValueError(f"Failed to load lora model: {lora_request.error.message}") - def _create_raw_request( - self, - request: Union[ - CompletionRequest, ChatCompletionRequest, EmbeddingRequest, ScoreRequest - ], - path: str, - ) -> Request: - scope = { - "type": "http", - "method": "POST", - "path": path, - "headers": [(b"x-request-id", getattr(request, "request_id", "").encode())], - "query_string": b"", - } - return Request(scope) - async def chat( self, request: ChatCompletionRequest ) -> AsyncGenerator[Union[str, ChatCompletionResponse, ErrorResponse], None]: self._validate_openai_serving_chat() - # TODO (Kourosh): Remove when we upstream request_id attribute to vLLM. - # PR: https://github.com/vllm-project/vllm/pull/21009 - # Create a fake starlette.Request object with the x-request-id header - # so that the create_chat_completion API can assign the request_id properly. - raw_request = self._create_raw_request(request, "/chat/completions") - chat_response = await self._oai_serving_chat.create_chat_completion( # type: ignore[attr-defined] - request, raw_request=raw_request + request ) if isinstance(chat_response, AsyncGenerator): @@ -407,15 +384,8 @@ async def completions( ) -> AsyncGenerator[Union[str, CompletionResponse, ErrorResponse], None]: self._validate_openai_serving_completion() - # TODO (Kourosh): Remove when we upstream request_id attribute to vLLM. - # PR: https://github.com/vllm-project/vllm/pull/21009 - # Create a fake starlette.Request object with the x-request-id header - # so that the create_completion API can assign the request_id properly. - raw_request = self._create_raw_request(request, "/completions") - completion_response = await self._oai_serving_completion.create_completion( # type: ignore[attr-defined] - request, - raw_request=raw_request, + request ) if isinstance(completion_response, AsyncGenerator): @@ -438,13 +408,8 @@ async def embeddings( ) -> AsyncGenerator[Union[EmbeddingResponse, ErrorResponse], None]: self._validate_openai_serving_embedding() - # TODO (Kourosh): Remove when upstream is fixed to accept req_id. - # Create a fake starlette.Request object with the x-request-id header - # so that the create_embedding API can assign the request_id properly. 
- raw_request = self._create_raw_request(request, "/embeddings") - embedding_response = await self._oai_serving_embedding.create_embedding( # type: ignore[attr-defined] - request, raw_request=raw_request + request ) if isinstance(embedding_response, VLLMErrorResponse): @@ -459,11 +424,7 @@ async def score( ) -> AsyncGenerator[Union[ScoreResponse, ErrorResponse], None]: self._validate_openai_serving_scores() - raw_request = self._create_raw_request(request, "/score") - - score_response = await self._oai_serving_scores.create_score( - request, raw_request=raw_request - ) + score_response = await self._oai_serving_scores.create_score(request) if isinstance(score_response, VLLMErrorResponse): yield ErrorResponse(**score_response.model_dump()) From e75ccb43006e14d94be1fbe3dd0cd16da14c37ae Mon Sep 17 00:00:00 2001 From: Seiji Eicher Date: Wed, 8 Oct 2025 18:10:50 -0700 Subject: [PATCH 2/5] Remove unnecessary classes Signed-off-by: Seiji Eicher --- .../serve/configs/openai_api_models.py | 99 ++++++------------- 1 file changed, 30 insertions(+), 69 deletions(-) diff --git a/python/ray/llm/_internal/serve/configs/openai_api_models.py b/python/ray/llm/_internal/serve/configs/openai_api_models.py index d2fd84ebacaa..448c2bff1fc8 100644 --- a/python/ray/llm/_internal/serve/configs/openai_api_models.py +++ b/python/ray/llm/_internal/serve/configs/openai_api_models.py @@ -6,82 +6,43 @@ from typing import TYPE_CHECKING, Any, AsyncGenerator, Dict, List, Optional, Union -from pydantic import ( - BaseModel, - ConfigDict, -) +from pydantic import BaseModel, ConfigDict from vllm.entrypoints.openai.protocol import ( - ChatCompletionRequest as vLLMChatCompletionRequest, - ChatCompletionResponse as vLLMChatCompletionResponse, - ChatCompletionStreamResponse as vLLMChatCompletionStreamResponse, - CompletionRequest as vLLMCompletionRequest, - CompletionResponse as vLLMCompletionResponse, - CompletionStreamResponse as vLLMCompletionStreamResponse, - EmbeddingChatRequest as vLLMEmbeddingChatRequest, - EmbeddingCompletionRequest as vLLMEmbeddingCompletionRequest, - EmbeddingResponse as vLLMEmbeddingResponse, - ErrorInfo as vLLMErrorInfo, - ErrorResponse as vLLMErrorResponse, - ScoreRequest as vLLMScoreRequest, - ScoreResponse as vLLMScoreResponse, + ChatCompletionRequest, + ChatCompletionResponse, + ChatCompletionStreamResponse, + CompletionRequest, + CompletionResponse, + CompletionStreamResponse, + EmbeddingChatRequest, + EmbeddingCompletionRequest, + EmbeddingResponse, + ErrorInfo, + ErrorResponse, + ScoreRequest, + ScoreResponse, ) +__all__ = [ + "ChatCompletionRequest", + "ChatCompletionResponse", + "ChatCompletionStreamResponse", + "CompletionRequest", + "CompletionResponse", + "CompletionStreamResponse", + "EmbeddingChatRequest", + "EmbeddingCompletionRequest", + "EmbeddingResponse", + "ErrorInfo", + "ErrorResponse", + "ScoreRequest", + "ScoreResponse", +] + if TYPE_CHECKING: from ray.llm._internal.serve.configs.server_models import LLMConfig -class ChatCompletionRequest(vLLMChatCompletionRequest): - model_config = ConfigDict(arbitrary_types_allowed=True) - - -class ChatCompletionResponse(vLLMChatCompletionResponse): - model_config = ConfigDict(arbitrary_types_allowed=True) - - -class ChatCompletionStreamResponse(vLLMChatCompletionStreamResponse): - model_config = ConfigDict(arbitrary_types_allowed=True) - - -class ErrorInfo(vLLMErrorInfo): - model_config = ConfigDict(arbitrary_types_allowed=True) - - -class ErrorResponse(vLLMErrorResponse): - model_config = ConfigDict(arbitrary_types_allowed=True) - - -class 
CompletionRequest(vLLMCompletionRequest): - model_config = ConfigDict(arbitrary_types_allowed=True) - - -class CompletionResponse(vLLMCompletionResponse): - model_config = ConfigDict(arbitrary_types_allowed=True) - - -class CompletionStreamResponse(vLLMCompletionStreamResponse): - model_config = ConfigDict(arbitrary_types_allowed=True) - - -class EmbeddingCompletionRequest(vLLMEmbeddingCompletionRequest): - model_config = ConfigDict(arbitrary_types_allowed=True) - - -class EmbeddingChatRequest(vLLMEmbeddingChatRequest): - model_config = ConfigDict(arbitrary_types_allowed=True) - - -class EmbeddingResponse(vLLMEmbeddingResponse): - model_config = ConfigDict(arbitrary_types_allowed=True) - - -class ScoreRequest(vLLMScoreRequest): - model_config = ConfigDict(arbitrary_types_allowed=True) - - -class ScoreResponse(vLLMScoreResponse): - model_config = ConfigDict(arbitrary_types_allowed=True) - - EmbeddingRequest = Union[EmbeddingCompletionRequest, EmbeddingChatRequest] LLMEmbeddingsResponse = Union[ From 1ec8d025b3633fd31e53df7c955f8b47632ec5d5 Mon Sep 17 00:00:00 2001 From: Seiji Eicher Date: Thu, 9 Oct 2025 13:33:08 -0700 Subject: [PATCH 3/5] Propagate raw HTTP request in Ray Serve LLM APIs Signed-off-by: Seiji Eicher --- .../serve/deployments/llm/llm_engine.py | 13 ++++-- .../serve/deployments/llm/llm_server.py | 30 ++++++++---- .../serve/deployments/llm/vllm/vllm_engine.py | 19 ++++---- .../_internal/serve/deployments/protocol.py | 6 ++- .../serve/deployments/routers/router.py | 46 +++++++++++++------ 5 files changed, 77 insertions(+), 37 deletions(-) diff --git a/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py index 89bf1a2f5cc9..56fa98d9a541 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py @@ -1,5 +1,7 @@ import abc -from typing import TYPE_CHECKING, AsyncGenerator, Union +from typing import TYPE_CHECKING, AsyncGenerator, Optional, Union + +from fastapi import Request from ray.llm._internal.serve.configs.server_models import ( DiskMultiplexConfig, @@ -42,7 +44,7 @@ async def reset_prefix_cache(self) -> None: @abc.abstractmethod async def chat( - self, request: "ChatCompletionRequest" + self, request: "ChatCompletionRequest", raw_request: Optional[Request] = None ) -> AsyncGenerator[Union[str, "ChatCompletionResponse", "ErrorResponse"], None]: """Run a ChatCompletion with the engine. @@ -56,6 +58,7 @@ async def chat( Args: request: The chat completion request. + raw_request: The raw FastAPI request object. Yields: Union[str, ChatCompletionResponse, ErrorResponse]: A string representing a chunk of the response, a ChatCompletionResponse object, or an ErrorResponse object. @@ -67,7 +70,7 @@ async def chat( @abc.abstractmethod async def completions( - self, request: "CompletionRequest" + self, request: "CompletionRequest", raw_request: Optional[Request] = None ) -> AsyncGenerator[Union[str, "CompletionResponse", "ErrorResponse"], None]: """Run a Completion with the engine. @@ -85,6 +88,7 @@ async def completions( Args: request: The completion request. + raw_request: The raw FastAPI request object. 
Yields: Union[str, CompletionResponse, ErrorResponse]: A string @@ -98,7 +102,7 @@ async def completions( @abc.abstractmethod async def embeddings( - self, request: "EmbeddingRequest" + self, request: "EmbeddingRequest", raw_request: Optional[Request] = None ) -> AsyncGenerator[Union["EmbeddingResponse", "ErrorResponse"], None]: """Run an Embedding with the engine. @@ -112,6 +116,7 @@ async def embeddings( Args: request: The embedding request. + raw_request: The raw FastAPI request object. Returns: An async generator that yields EmbeddingResponse objects or ErrorResponse objects, and returns None when the generator is done. diff --git a/python/ray/llm/_internal/serve/deployments/llm/llm_server.py b/python/ray/llm/_internal/serve/deployments/llm/llm_server.py index 21c07adaa52d..54b94d1b3c8e 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/llm_server.py +++ b/python/ray/llm/_internal/serve/deployments/llm/llm_server.py @@ -287,6 +287,7 @@ async def _run_request( *, engine_method: str, batch_output_stream: bool = False, + raw_request: Optional[Request] = None, ) -> AsyncGenerator[Any, None]: """Run the engine method on the request + perform batching when stream=True. @@ -294,6 +295,7 @@ async def _run_request( request: The request to run. engine_method: The method to call on the engine. batch_output_stream: Whether to batch the output stream. + raw_request: The raw FastAPI request object. Returns: An AsyncGenerator of the response. If stream is True and batching is enabled, then the generator will yield a list of streaming responses (strings of the format data: {response_json}\n\n). Otherwise, it will yield the non-streaming response from engine directly. @@ -305,15 +307,15 @@ async def _run_request( is_stream = hasattr(request, "stream") and request.stream if is_stream and batch_output_stream: stream = self._batch_output_stream( - getattr(self.engine, engine_method)(request) + getattr(self.engine, engine_method)(request, raw_request) ) else: - stream = getattr(self.engine, engine_method)(request) + stream = getattr(self.engine, engine_method)(request, raw_request) return stream async def chat( - self, request: "ChatCompletionRequest" + self, request: "ChatCompletionRequest", raw_request: Optional[Request] = None ) -> AsyncGenerator[ Union[List[Union[str, "ErrorResponse"]], "ChatCompletionResponse"], None ]: @@ -321,6 +323,7 @@ async def chat( Args: request: A ChatCompletionRequest object. + raw_request: The raw FastAPI request object. Returns: An AsyncGenerator of the response. If stream is True and batching is enabled, then the generator will yield a list of chat streaming responses (strings of the format data: {response_json}\n\n). Otherwise, it will yield the ChatCompletionResponse object directly. @@ -329,10 +332,11 @@ async def chat( request, engine_method="chat", batch_output_stream=True, + raw_request=raw_request, ) async def completions( - self, request: "CompletionRequest" + self, request: "CompletionRequest", raw_request: Optional[Request] = None ) -> AsyncGenerator[ Union[List[Union[str, "ErrorResponse"]], "CompletionResponse"], None ]: @@ -340,6 +344,7 @@ async def completions( Args: request: A CompletionRequest object. + raw_request: The raw FastAPI request object. Returns: An AsyncGenerator of the response. If stream is True and batching is enabled, then the generator will yield a list of completion streaming responses (strings of the format data: {response_json}\n\n). Otherwise, it will yield the CompletionResponse object directly. 
@@ -348,10 +353,11 @@ async def completions( request, engine_method="completions", batch_output_stream=True, + raw_request=raw_request, ) async def embeddings( - self, request: "EmbeddingRequest" + self, request: "EmbeddingRequest", raw_request: Optional[Request] = None ) -> AsyncGenerator[Union[List["ErrorResponse"], "EmbeddingResponse"], None]: """Runs an embeddings request to the engine and returns the response. @@ -359,17 +365,21 @@ async def embeddings( Args: request: An EmbeddingRequest object. + raw_request: The raw FastAPI request object. Returns: An AsyncGenerator over the EmbeddingResponse object. """ # NOTE: Embeddings does not need batching. return await self._run_request( - request, engine_method="embeddings", batch_output_stream=False + request, + engine_method="embeddings", + batch_output_stream=False, + raw_request=raw_request, ) async def score( - self, request: "ScoreRequest" + self, request: "ScoreRequest", raw_request: Optional[Request] = None ) -> AsyncGenerator[Union["ScoreResponse", "ErrorResponse"], None]: """Runs a score request to the engine and returns the response. @@ -377,13 +387,17 @@ async def score( Args: request: A ScoreRequest object. + raw_request: The raw FastAPI request object. Returns: An AsyncGenerator over the ScoreResponse object. """ # NOTE: Score does not need batching, similar to embeddings. return await self._run_request( - request, engine_method="score", batch_output_stream=False + request, + engine_method="score", + batch_output_stream=False, + raw_request=raw_request, ) async def check_health(self) -> None: diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py index a3c665546ef4..7a44b416c2f0 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py @@ -2,6 +2,7 @@ import os from typing import TYPE_CHECKING, AsyncGenerator, Optional, Tuple, Union +from fastapi import Request from starlette.datastructures import State from vllm.engine.arg_utils import AsyncEngineArgs from vllm.entrypoints.openai.cli_args import FrontendArgs @@ -343,12 +344,12 @@ async def resolve_lora(self, disk_lora_model: DiskMultiplexConfig): raise ValueError(f"Failed to load lora model: {lora_request.error.message}") async def chat( - self, request: ChatCompletionRequest + self, request: ChatCompletionRequest, raw_request: Optional[Request] = None ) -> AsyncGenerator[Union[str, ChatCompletionResponse, ErrorResponse], None]: self._validate_openai_serving_chat() chat_response = await self._oai_serving_chat.create_chat_completion( # type: ignore[attr-defined] - request + request, raw_request ) if isinstance(chat_response, AsyncGenerator): @@ -365,12 +366,12 @@ async def chat( yield ChatCompletionResponse(**chat_response.model_dump()) async def completions( - self, request: CompletionRequest + self, request: CompletionRequest, raw_request: Optional[Request] = None ) -> AsyncGenerator[Union[str, CompletionResponse, ErrorResponse], None]: self._validate_openai_serving_completion() completion_response = await self._oai_serving_completion.create_completion( # type: ignore[attr-defined] - request + request, raw_request ) if isinstance(completion_response, AsyncGenerator): @@ -389,12 +390,12 @@ async def completions( yield CompletionResponse(**completion_response.model_dump()) async def embeddings( - self, request: EmbeddingRequest + self, request: EmbeddingRequest, raw_request: Optional[Request] 
= None ) -> AsyncGenerator[Union[EmbeddingResponse, ErrorResponse], None]: self._validate_openai_serving_embedding() embedding_response = await self._oai_serving_embedding.create_embedding( # type: ignore[attr-defined] - request + request, raw_request ) if isinstance(embedding_response, VLLMErrorResponse): @@ -405,11 +406,13 @@ async def embeddings( yield EmbeddingResponse(**embedding_response.model_dump()) async def score( - self, request: ScoreRequest + self, request: ScoreRequest, raw_request: Optional[Request] = None ) -> AsyncGenerator[Union[ScoreResponse, ErrorResponse], None]: self._validate_openai_serving_scores() - score_response = await self._oai_serving_scores.create_score(request) + score_response = await self._oai_serving_scores.create_score( + request, raw_request + ) if isinstance(score_response, VLLMErrorResponse): yield ErrorResponse(**score_response.model_dump()) diff --git a/python/ray/llm/_internal/serve/deployments/protocol.py b/python/ray/llm/_internal/serve/deployments/protocol.py index 464865a5e9af..6bd0e1c077d4 100644 --- a/python/ray/llm/_internal/serve/deployments/protocol.py +++ b/python/ray/llm/_internal/serve/deployments/protocol.py @@ -1,6 +1,8 @@ from abc import ABC, abstractmethod from typing import TYPE_CHECKING, Any, AsyncGenerator, Dict, List, Optional, Union +from fastapi import Request + if TYPE_CHECKING: from ray.llm._internal.serve.configs.openai_api_models import ( ChatCompletionRequest, @@ -40,7 +42,7 @@ async def start(self): @abstractmethod async def chat( - self, request: "ChatCompletionRequest" + self, request: "ChatCompletionRequest", raw_request: Optional[Request] = None ) -> AsyncGenerator[Union[str, "ChatCompletionResponse", "ErrorResponse"], None]: """ Inferencing to the engine for chat, and return the response. 
@@ -49,7 +51,7 @@ async def chat( @abstractmethod async def completions( - self, request: "CompletionRequest" + self, request: "CompletionRequest", raw_request: Optional[Request] = None ) -> AsyncGenerator[ Union[List[Union[str, "ErrorResponse"]], "CompletionResponse"], None ]: diff --git a/python/ray/llm/_internal/serve/deployments/routers/router.py b/python/ray/llm/_internal/serve/deployments/routers/router.py index 2d3b51c39689..97b6cea7a123 100644 --- a/python/ray/llm/_internal/serve/deployments/routers/router.py +++ b/python/ray/llm/_internal/serve/deployments/routers/router.py @@ -16,7 +16,7 @@ Union, ) -from fastapi import FastAPI, HTTPException, status +from fastapi import FastAPI, HTTPException, Request, status from fastapi.middleware.cors import CORSMiddleware from starlette.responses import JSONResponse, Response, StreamingResponse @@ -416,6 +416,7 @@ async def _get_response( CompletionRequest, ChatCompletionRequest, EmbeddingRequest, ScoreRequest ], call_method: str, + raw_request: Optional[Request] = None, ) -> AsyncGenerator[ Union[ LLMChatResponse, @@ -442,7 +443,9 @@ async def _get_response( if isinstance(body, ChatCompletionRequest): body = _sanitize_chat_completion_request(body) - async for response in getattr(model_handle, call_method).remote(body): + async for response in getattr(model_handle, call_method).remote( + body, raw_request + ): yield response async def model(self, model_id: str) -> Optional[ModelCard]: @@ -507,7 +510,10 @@ async def model_data(self, model: str) -> ModelCard: return model_data async def _process_llm_request( - self, body: Union[CompletionRequest, ChatCompletionRequest], is_chat: bool + self, + body: Union[CompletionRequest, ChatCompletionRequest], + is_chat: bool, + raw_request: Optional[Request] = None, ) -> Response: NoneStreamingResponseType = ( ChatCompletionResponse if is_chat else CompletionResponse @@ -516,7 +522,9 @@ async def _process_llm_request( async with router_request_timeout(DEFAULT_LLM_ROUTER_HTTP_TIMEOUT): - gen = self._get_response(body=body, call_method=call_method) + gen = self._get_response( + body=body, call_method=call_method, raw_request=raw_request + ) # In streaming with batching enabled, this first response can be a list of chunks. initial_response, gen = await _peek_at_generator(gen) @@ -544,42 +552,47 @@ async def _process_llm_request( openai_stream_generator, media_type="text/event-stream" ) - async def completions(self, body: CompletionRequest) -> Response: + async def completions(self, body: CompletionRequest, request: Request) -> Response: """Given a prompt, the model will return one or more predicted completions, and can also return the probabilities of alternative tokens at each position. Args: - body: The CompletionRequest object. + body: The completion request. + request: The raw FastAPI request object. Returns: A response object with completions. """ - return await self._process_llm_request(body, is_chat=False) + return await self._process_llm_request(body, is_chat=False, raw_request=request) - async def chat(self, body: ChatCompletionRequest) -> Response: + async def chat(self, body: ChatCompletionRequest, request: Request) -> Response: """Given a prompt, the model will return one or more predicted completions, and can also return the probabilities of alternative tokens at each position. Args: - body: The ChatCompletionRequest object. + body: The chat completion request. + request: The raw FastAPI request object. Returns: A response object with completions. 
""" - return await self._process_llm_request(body, is_chat=True) + return await self._process_llm_request(body, is_chat=True, raw_request=request) - async def embeddings(self, body: EmbeddingRequest) -> Response: + async def embeddings(self, body: EmbeddingRequest, request: Request) -> Response: """Create embeddings for the provided input. Args: - body: The EmbeddingRequest object. + body: The embedding request. + request: The raw FastAPI request object. Returns: A response object with embeddings. """ async with router_request_timeout(DEFAULT_LLM_ROUTER_HTTP_TIMEOUT): - results = self._get_response(body=body, call_method="embeddings") + results = self._get_response( + body=body, call_method="embeddings", raw_request=request + ) result = await results.__anext__() if isinstance(result, ErrorResponse): raise OpenAIHTTPException( @@ -591,20 +604,23 @@ async def embeddings(self, body: EmbeddingRequest) -> Response: if isinstance(result, EmbeddingResponse): return JSONResponse(content=result.model_dump()) - async def score(self, body: ScoreRequest) -> Response: + async def score(self, body: ScoreRequest, request: Request) -> Response: """Create scores for the provided text pairs. Note: This is a vLLM specific endpoint. Args: body: The score request containing input text pairs to score. + request: The raw FastAPI request object. Returns: A response object with scores. """ async with router_request_timeout(DEFAULT_LLM_ROUTER_HTTP_TIMEOUT): - results = self._get_response(body=body, call_method="score") + results = self._get_response( + body=body, call_method="score", raw_request=request + ) result = await results.__anext__() if isinstance(result, ErrorResponse): raise OpenAIHTTPException( From 9f7ea6581ece4101fa80752d7d554759e55d56d5 Mon Sep 17 00:00:00 2001 From: Seiji Eicher Date: Thu, 9 Oct 2025 15:17:38 -0700 Subject: [PATCH 4/5] Missing import Signed-off-by: Seiji Eicher --- python/ray/llm/_internal/serve/deployments/llm/llm_server.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/llm/_internal/serve/deployments/llm/llm_server.py b/python/ray/llm/_internal/serve/deployments/llm/llm_server.py index 54b94d1b3c8e..c063caeb31d2 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/llm_server.py +++ b/python/ray/llm/_internal/serve/deployments/llm/llm_server.py @@ -13,6 +13,8 @@ Union, ) +from fastapi import Request + import ray from ray import serve from ray._common.utils import import_attr From c34f8c80294f3f42a2a48179d1591a04eb1c9c53 Mon Sep 17 00:00:00 2001 From: Seiji Eicher Date: Thu, 9 Oct 2025 18:36:38 -0700 Subject: [PATCH 5/5] WIP Signed-off-by: Seiji Eicher --- .../serve/deployments/llm/llm_engine.py | 23 +++-- .../serve/deployments/llm/llm_server.py | 19 +++- .../serve/deployments/llm/vllm/vllm_engine.py | 98 ++++++++++++++++++- .../serve/deployments/routers/router.py | 13 ++- .../cpu/deployments/llm/test_llm_server.py | 54 ++++++++++ .../cpu/deployments/routers/test_router.py | 59 +++++++++++ .../llm/tests/serve/mocks/mock_vllm_engine.py | 19 +++- 7 files changed, 263 insertions(+), 22 deletions(-) diff --git a/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py index 56fa98d9a541..031d0a9d2cf9 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py @@ -1,8 +1,6 @@ import abc from typing import TYPE_CHECKING, AsyncGenerator, Optional, Union -from fastapi import Request - from 
ray.llm._internal.serve.configs.server_models import ( DiskMultiplexConfig, LLMConfig, @@ -18,6 +16,9 @@ EmbeddingResponse, ErrorResponse, ) + from ray.llm._internal.serve.deployments.llm.vllm.vllm_engine import ( + RawRequestInfo, + ) class LLMEngine(abc.ABC): @@ -44,7 +45,9 @@ async def reset_prefix_cache(self) -> None: @abc.abstractmethod async def chat( - self, request: "ChatCompletionRequest", raw_request: Optional[Request] = None + self, + request: "ChatCompletionRequest", + raw_request_info: Optional["RawRequestInfo"] = None, ) -> AsyncGenerator[Union[str, "ChatCompletionResponse", "ErrorResponse"], None]: """Run a ChatCompletion with the engine. @@ -58,7 +61,7 @@ async def chat( Args: request: The chat completion request. - raw_request: The raw FastAPI request object. + raw_request_info: Optional RawRequestInfo containing headers and state from the original request. Yields: Union[str, ChatCompletionResponse, ErrorResponse]: A string representing a chunk of the response, a ChatCompletionResponse object, or an ErrorResponse object. @@ -70,7 +73,9 @@ async def chat( @abc.abstractmethod async def completions( - self, request: "CompletionRequest", raw_request: Optional[Request] = None + self, + request: "CompletionRequest", + raw_request_info: Optional["RawRequestInfo"] = None, ) -> AsyncGenerator[Union[str, "CompletionResponse", "ErrorResponse"], None]: """Run a Completion with the engine. @@ -88,7 +93,7 @@ async def completions( Args: request: The completion request. - raw_request: The raw FastAPI request object. + raw_request_info: Optional RawRequestInfo containing headers and state from the original request. Yields: Union[str, CompletionResponse, ErrorResponse]: A string @@ -102,7 +107,9 @@ async def completions( @abc.abstractmethod async def embeddings( - self, request: "EmbeddingRequest", raw_request: Optional[Request] = None + self, + request: "EmbeddingRequest", + raw_request_info: Optional["RawRequestInfo"] = None, ) -> AsyncGenerator[Union["EmbeddingResponse", "ErrorResponse"], None]: """Run an Embedding with the engine. @@ -116,7 +123,7 @@ async def embeddings( Args: request: The embedding request. - raw_request: The raw FastAPI request object. + raw_request_info: Optional RawRequestInfo containing headers and state from the original request. Returns: An async generator that yields EmbeddingResponse objects or ErrorResponse objects, and returns None when the generator is done. 
diff --git a/python/ray/llm/_internal/serve/deployments/llm/llm_server.py b/python/ray/llm/_internal/serve/deployments/llm/llm_server.py index c063caeb31d2..da43b988ae31 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/llm_server.py +++ b/python/ray/llm/_internal/serve/deployments/llm/llm_server.py @@ -29,7 +29,10 @@ LLMConfig, ) from ray.llm._internal.serve.deployments.llm.llm_engine import LLMEngine -from ray.llm._internal.serve.deployments.llm.vllm.vllm_engine import VLLMEngine +from ray.llm._internal.serve.deployments.llm.vllm.vllm_engine import ( + RawRequestInfo, + VLLMEngine, +) from ray.llm._internal.serve.deployments.protocol import LLMServerProtocol from ray.llm._internal.serve.deployments.utils.batcher import Batcher from ray.llm._internal.serve.deployments.utils.server_utils import ( @@ -306,13 +309,23 @@ async def _run_request( await self._maybe_add_request_id_to_request(request) await self._maybe_resolve_lora_from_multiplex() + # Extract serializable request info from raw_request + raw_request_info = None + if raw_request is not None: + raw_request_info = RawRequestInfo( + headers=dict(raw_request.headers.items()), + state=dict(raw_request.state._state) + if hasattr(raw_request.state, "_state") + else None, + ) + is_stream = hasattr(request, "stream") and request.stream if is_stream and batch_output_stream: stream = self._batch_output_stream( - getattr(self.engine, engine_method)(request, raw_request) + getattr(self.engine, engine_method)(request, raw_request_info) ) else: - stream = getattr(self.engine, engine_method)(request, raw_request) + stream = getattr(self.engine, engine_method)(request, raw_request_info) return stream diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py index 7a44b416c2f0..a3ddcd6f6a03 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py @@ -1,6 +1,7 @@ import argparse import os -from typing import TYPE_CHECKING, AsyncGenerator, Optional, Tuple, Union +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, AsyncGenerator, Dict, Optional, Tuple, Union from fastapi import Request from starlette.datastructures import State @@ -51,6 +52,57 @@ logger = get_logger(__name__) +@dataclass +class RawRequestInfo: + """Container for serializable request information. + + This class holds the minimal request data (headers and state) that can be + serialized and passed across Ray's boundaries, then used to reconstruct + a Request object for vLLM. + """ + + headers: Optional[Dict[str, str]] = None + state: Optional[Dict[str, Any]] = None + + +def _create_raw_request( + headers: Optional[Dict[str, str]] = None, + state: Optional[Dict[str, Any]] = None, + path: str = "/", +) -> Request: + """Create a FastAPI/Starlette Request object with optional headers and state. + + This utility creates a real Starlette Request object + with the specified HTTP headers and custom state, for cases where + only a minimal scope is required (such as vLLM's serving methods). + + Args: + headers: A dictionary of HTTP headers. + state: A dictionary with state data. + path: The request path (defaults to "/"). + + Returns: + A Request object with the specified headers and state set. + """ + from starlette.datastructures import State + from starlette.requests import Request + + # Minimal ASGI scope for an HTTP POST request. 
+ scope = { + "type": "http", + "method": "POST", + "path": path, + "headers": [ + (k.lower().encode(), v.encode()) for k, v in (headers or {}).items() + ], + "query_string": b"", + } + req = Request(scope) + if state is not None: + req.state = State(state) + return req + + def _get_vllm_engine_config( llm_config: LLMConfig, ) -> Tuple["AsyncEngineArgs", "VllmConfig"]: @@ -344,10 +396,19 @@ async def resolve_lora(self, disk_lora_model: DiskMultiplexConfig): raise ValueError(f"Failed to load lora model: {lora_request.error.message}") async def chat( - self, request: ChatCompletionRequest, raw_request: Optional[Request] = None + self, + request: ChatCompletionRequest, + raw_request_info: Optional[RawRequestInfo] = None, ) -> AsyncGenerator[Union[str, ChatCompletionResponse, ErrorResponse], None]: self._validate_openai_serving_chat() + # Create raw request from serializable request info + raw_request = None + if raw_request_info is not None: + raw_request = _create_raw_request( + raw_request_info.headers, raw_request_info.state + ) + chat_response = await self._oai_serving_chat.create_chat_completion( # type: ignore[attr-defined] request, raw_request ) @@ -366,10 +427,19 @@ async def chat( yield ChatCompletionResponse(**chat_response.model_dump()) async def completions( - self, request: CompletionRequest, raw_request: Optional[Request] = None + self, + request: CompletionRequest, + raw_request_info: Optional[RawRequestInfo] = None, ) -> AsyncGenerator[Union[str, CompletionResponse, ErrorResponse], None]: self._validate_openai_serving_completion() + # Create raw request from serializable request info + raw_request = None + if raw_request_info is not None: + raw_request = _create_raw_request( + raw_request_info.headers, raw_request_info.state + ) + completion_response = await self._oai_serving_completion.create_completion( # type: ignore[attr-defined] request, raw_request ) @@ -390,10 +460,19 @@ async def completions( yield CompletionResponse(**completion_response.model_dump()) async def embeddings( - self, request: EmbeddingRequest, raw_request: Optional[Request] = None + self, + request: EmbeddingRequest, + raw_request_info: Optional[RawRequestInfo] = None, ) -> AsyncGenerator[Union[EmbeddingResponse, ErrorResponse], None]: self._validate_openai_serving_embedding() + # Create raw request from serializable request info + raw_request = None + if raw_request_info is not None: + raw_request = _create_raw_request( + raw_request_info.headers, raw_request_info.state + ) + embedding_response = await self._oai_serving_embedding.create_embedding( # type: ignore[attr-defined] request, raw_request ) @@ -406,10 +485,19 @@ async def embeddings( yield EmbeddingResponse(**embedding_response.model_dump()) async def score( - self, request: ScoreRequest, raw_request: Optional[Request] = None + self, + request: ScoreRequest, + raw_request_info: Optional[RawRequestInfo] = None, ) -> AsyncGenerator[Union[ScoreResponse, ErrorResponse], None]: self._validate_openai_serving_scores() + # Create raw request from serializable request info + raw_request = None + if raw_request_info is not None: + raw_request = _create_raw_request( + raw_request_info.headers, raw_request_info.state + ) + score_response = await self._oai_serving_scores.create_score( request, raw_request ) diff --git a/python/ray/llm/_internal/serve/deployments/routers/router.py b/python/ray/llm/_internal/serve/deployments/routers/router.py index 97b6cea7a123..2be7fe4217a8 100644 --- a/python/ray/llm/_internal/serve/deployments/routers/router.py +++ 
b/python/ray/llm/_internal/serve/deployments/routers/router.py @@ -58,6 +58,7 @@ to_model_metadata, ) from ray.llm._internal.serve.configs.server_models import LLMConfig +from ray.llm._internal.serve.deployments.llm.vllm.vllm_engine import RawRequestInfo from ray.llm._internal.serve.deployments.protocol import DeploymentProtocol from ray.llm._internal.serve.deployments.routers.middleware import ( SetRequestIdMiddleware, @@ -443,8 +444,18 @@ async def _get_response( if isinstance(body, ChatCompletionRequest): body = _sanitize_chat_completion_request(body) + # Extract serializable request info from raw_request + raw_request_info = None + if raw_request is not None: + raw_request_info = RawRequestInfo( + headers=dict(raw_request.headers.items()), + state=dict(raw_request.state._state) + if hasattr(raw_request.state, "_state") + else None, + ) + async for response in getattr(model_handle, call_method).remote( - body, raw_request + body, raw_request_info ): yield response diff --git a/python/ray/llm/tests/serve/cpu/deployments/llm/test_llm_server.py b/python/ray/llm/tests/serve/cpu/deployments/llm/test_llm_server.py index 313a8d7fa3f9..d4c9aa1792c0 100644 --- a/python/ray/llm/tests/serve/cpu/deployments/llm/test_llm_server.py +++ b/python/ray/llm/tests/serve/cpu/deployments/llm/test_llm_server.py @@ -383,6 +383,60 @@ async def test_push_telemetry(self, mock_llm_config): await server.start() mock_push_telemetry.assert_called_once() + @pytest.mark.asyncio + async def test_raw_request_reaches_vllm_engine(self, mock_llm_config): + """Test that raw_request is passed to the vllm_engine.""" + from unittest.mock import MagicMock + + from fastapi import Request + + from ray.llm._internal.serve.configs.openai_api_models import ( + ChatCompletionRequest, + ) + + # Track if raw_request was received by the engine + captured_raw_request = [] + + # Create a mock engine that captures raw_request + class RawRequestCapturingEngine(MockVLLMEngine): + async def chat(self, request, raw_request=None): + captured_raw_request.append(raw_request) + # Call parent implementation + async for response in super().chat(request, raw_request): + yield response + + # Create server with custom engine + server = LLMServer.sync_init( + mock_llm_config, engine_cls=RawRequestCapturingEngine + ) + await server.start() + + # Create a mock FastAPI request + from starlette.datastructures import Headers + + mock_request = MagicMock(spec=Request) + mock_request.headers = Headers({"content-type": "application/json"}) + + # Create a chat request + chat_request = ChatCompletionRequest( + model="mock-model", + messages=[{"role": "user", "content": "Hello, world!"}], + max_tokens=10, + stream=False, + ) + + # Make a request through the server + response_gen = server.chat(chat_request, mock_request) + + # Consume the generator + chunks = [] + async for chunk in response_gen: + chunks.append(chunk) + + # Verify that raw_request was passed to the engine + assert len(captured_raw_request) == 1 + assert captured_raw_request[0] is mock_request + @pytest.mark.parametrize("api_type", ["chat", "completions"]) @pytest.mark.parametrize("stream", [True]) @pytest.mark.parametrize("max_tokens", [64]) diff --git a/python/ray/llm/tests/serve/cpu/deployments/routers/test_router.py b/python/ray/llm/tests/serve/cpu/deployments/routers/test_router.py index 8783a3e7d4f4..6431eff6e558 100644 --- a/python/ray/llm/tests/serve/cpu/deployments/routers/test_router.py +++ b/python/ray/llm/tests/serve/cpu/deployments/routers/test_router.py @@ -225,6 +225,65 @@ async def 
test_check_health(self, llm_config: LLMConfig): await router.check_health() + @pytest.mark.asyncio + async def test_raw_request_passed_to_deployment_handle(self, llm_config: LLMConfig): + """Test that raw_request is passed to the deployment handle.""" + from fastapi import Request + + from ray.llm._internal.serve.configs.openai_api_models import ( + ChatCompletionRequest, + ChatCompletionResponse, + ) + + # Track if raw_request was received + captured_raw_request = [] + + # Create a mock deployment handle that captures raw_request + async def mock_chat_generator(request, raw_request): + captured_raw_request.append(raw_request) + # Return a valid response + yield ChatCompletionResponse( + id="test_id", + choices=[ + { + "index": 0, + "message": {"role": "assistant", "content": "Hello!"}, + "finish_reason": "stop", + } + ], + model="llm_model_id", + object="chat.completion", + ) + + mock_handle = MagicMock() + mock_handle.llm_config = MagicMock() + mock_handle.llm_config.remote = AsyncMock(return_value=llm_config) + mock_handle.chat = MagicMock() + mock_handle.chat.remote = mock_chat_generator + + # Create router with mock handle + router = OpenAiIngress(llm_deployments=[mock_handle]) + await router._init_completed.wait() + + # Create a mock FastAPI request + from starlette.datastructures import Headers + + mock_request = MagicMock(spec=Request) + mock_request.headers = Headers({"content-type": "application/json"}) + + # Make a request through the router + request_body = ChatCompletionRequest( + model="llm_model_id", + messages=[{"role": "user", "content": "Hello"}], + stream=False, + ) + + await router.chat(request_body, mock_request) + + # Verify that raw_request was passed to the deployment handle + assert len(captured_raw_request) == 1 + assert captured_raw_request[0] is mock_request + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/llm/tests/serve/mocks/mock_vllm_engine.py b/python/ray/llm/tests/serve/mocks/mock_vllm_engine.py index 068621fbfa71..932611d42c7e 100644 --- a/python/ray/llm/tests/serve/mocks/mock_vllm_engine.py +++ b/python/ray/llm/tests/serve/mocks/mock_vllm_engine.py @@ -2,7 +2,7 @@ import json import random from random import randint -from typing import AsyncGenerator, Dict, Union +from typing import AsyncGenerator, Dict, Optional, Union from ray.llm._internal.common.utils.cloud_utils import LoraMirrorConfig from ray.llm._internal.serve.configs.openai_api_models import ( @@ -21,6 +21,7 @@ LLMConfig, ) from ray.llm._internal.serve.deployments.llm.llm_engine import LLMEngine +from ray.llm._internal.serve.deployments.llm.vllm.vllm_engine import RawRequestInfo from ray.llm._internal.serve.utils.lora_serve_utils import LoraModelLoader @@ -69,7 +70,9 @@ async def stop_profile(self) -> None: raise RuntimeError("Engine not started") async def chat( - self, request: ChatCompletionRequest + self, + request: ChatCompletionRequest, + raw_request_info: Optional[RawRequestInfo] = None, ) -> AsyncGenerator[Union[str, ChatCompletionResponse, ErrorResponse], None]: """Mock chat completion.""" if not self.started: @@ -91,7 +94,9 @@ async def chat( yield response async def completions( - self, request: CompletionRequest + self, + request: CompletionRequest, + raw_request_info: Optional[RawRequestInfo] = None, ) -> AsyncGenerator[Union[str, CompletionResponse, ErrorResponse], None]: """Mock text completion.""" if not self.started: @@ -107,7 +112,9 @@ async def completions( yield response async def embeddings( - self, request: EmbeddingRequest + 
self, + request: EmbeddingRequest, + raw_request_info: Optional[RawRequestInfo] = None, ) -> AsyncGenerator[Union[str, EmbeddingResponse, ErrorResponse], None]: """Mock embeddings generation.""" if not self.started: @@ -138,7 +145,9 @@ async def embeddings( yield response async def score( - self, request: ScoreRequest + self, + request: ScoreRequest, + raw_request_info: Optional[RawRequestInfo] = None, ) -> AsyncGenerator[Union[str, ScoreResponse, ErrorResponse], None]: """Mock score generation for text pairs.""" if not self.started:
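
Note on the patch 5 design: below is a minimal, self-contained sketch of the header round-trip that RawRequestInfo and _create_raw_request implement — headers are captured as a plain dict on the router/LLMServer side so they can cross Ray's serialization boundary, and a starlette Request is rebuilt from a minimal ASGI scope on the engine side for vLLM's serving methods (e.g. to read x-request-id). The rebuild_request helper here is a local stand-in written for this sketch against starlette only; it is not the private helper added to vllm_engine.py, and the optional state handling is omitted for brevity.

# Minimal sketch (starlette only) of the round-trip behind RawRequestInfo /
# _create_raw_request in patch 5. `rebuild_request` is a stand-in for this
# note, not the private helper from vllm_engine.py.
from typing import Dict, Optional

from starlette.requests import Request


def rebuild_request(
    headers: Optional[Dict[str, str]] = None, path: str = "/"
) -> Request:
    # Same minimal ASGI scope shape the patch uses: an HTTP POST whose body
    # is never read, carrying only the forwarded headers.
    scope = {
        "type": "http",
        "method": "POST",
        "path": path,
        "headers": [
            (k.lower().encode(), v.encode()) for k, v in (headers or {}).items()
        ],
        "query_string": b"",
    }
    return Request(scope)


# Router/server side: keep only what can be serialized and shipped to the
# engine actor.
incoming = rebuild_request(headers={"x-request-id": "abc-123"})
captured_headers = dict(incoming.headers.items())

# Engine side: reconstruct a Request object for vLLM from the captured data.
reconstructed = rebuild_request(captured_headers)
assert reconstructed.headers.get("x-request-id") == "abc-123"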