diff --git a/vllm/v1/engine/__init__.py b/vllm/v1/engine/__init__.py index af33270c5658b..901246e7e1909 100644 --- a/vllm/v1/engine/__init__.py +++ b/vllm/v1/engine/__init__.py @@ -1,5 +1,5 @@ -from dataclasses import dataclass import enum +from dataclasses import dataclass from typing import List, Optional, Union import msgspec @@ -72,6 +72,7 @@ class EngineCoreAbortRequest(msgspec.Struct): request_ids: List[str] request_status: RequestStatus + class EngineCoreRequestType(enum.Enum): - AddRequest = enum.auto() + AddRequest = enum.auto() AbortRequest = enum.auto() diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index 4c6a04b239440..7ba5851ee0b78 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -1,5 +1,6 @@ import asyncio -from typing import AsyncGenerator, Dict, Mapping, Optional, Type, Union, Iterable +from typing import (AsyncGenerator, Dict, Iterable, Mapping, Optional, Type, + Union) from vllm.config import EngineConfig, ModelConfig from vllm.engine.arg_utils import AsyncEngineArgs @@ -14,13 +15,13 @@ from vllm.transformers_utils.tokenizer import AnyTokenizer from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs from vllm.usage.usage_lib import UsageContext -from vllm.v1.engine.async_stream import AsyncStream from vllm.v1.engine import EngineCoreAbortRequest -from vllm.v1.request import RequestStatus +from vllm.v1.engine.async_stream import AsyncStream from vllm.v1.engine.core_client import EngineCoreClient from vllm.v1.engine.detokenizer import Detokenizer from vllm.v1.engine.processor import Processor from vllm.v1.executor.gpu_executor import GPUExecutor +from vllm.v1.request import RequestStatus logger = init_logger(__name__) @@ -112,13 +113,14 @@ def from_engine_args( def _get_executor_cls(cls, engine_config: EngineConfig): return GPUExecutor - async def abort_request(self, request_id: Union[str, Iterable[str]]) -> None: + async def abort_request(self, request_id: Union[str, + Iterable[str]]) -> None: if isinstance(request_id, str): request_id = [request_id] self.engine_core.abort_requests_async( - EngineCoreAbortRequest( - request_ids = request_id, - request_status = RequestStatus.FINISHED_STOPPED)) + EngineCoreAbortRequest( + request_ids=request_id, + request_status=RequestStatus.FINISHED_STOPPED)) async def add_request( self, diff --git a/vllm/v1/engine/async_stream.py b/vllm/v1/engine/async_stream.py index e79f1562a0e67..4bd57b7a9ef89 100644 --- a/vllm/v1/engine/async_stream.py +++ b/vllm/v1/engine/async_stream.py @@ -1,5 +1,6 @@ import asyncio from typing import Any, AsyncGenerator, Callable, Optional, Type, Union + from vllm.outputs import EmbeddingRequestOutput, RequestOutput diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index be31dd630d231..1bd3f061615b7 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -10,11 +10,9 @@ from vllm.logger import init_logger from vllm.usage.usage_lib import UsageContext from vllm.v1.core.scheduler import Scheduler -from vllm.v1.engine import (POLLING_TIMEOUT_MS, EngineCoreOutput, - EngineCoreOutputs, - EngineCoreRequest, - EngineCoreAbortRequest, - EngineCoreRequestType) +from vllm.v1.engine import (POLLING_TIMEOUT_MS, EngineCoreAbortRequest, + EngineCoreOutput, EngineCoreOutputs, + EngineCoreRequest, EngineCoreRequestType) from vllm.v1.executor.gpu_executor import GPUExecutor from vllm.v1.request import Request from vllm.version import __version__ as VLLM_VERSION @@ -284,11 +282,11 @@ def process_abort_requests(request_data: bytes) -> None: # frames[0] identifies the type of the request and # frames[1] is the actual request data assert len(frames) == 2 - request_type, request_data = frames[0], frames[1] + request_type, request_data = frames[0], frames[1] # Determine request_type request_type = self.msgpack_request_type_decoder.decode( - frames[0].buffer) + frames[0].buffer) # Process request_data based on request_type if request_type == EngineCoreRequestType.AddRequest: diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 3643ff4a4de6d..4ae654dfe7ddb 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -6,11 +6,9 @@ from vllm.logger import init_logger from vllm.utils import get_open_zmq_ipc_path -from vllm.v1.engine import (POLLING_TIMEOUT_MS, EngineCoreOutput, - EngineCoreOutputs, - EngineCoreRequest, - EngineCoreAbortRequest, - EngineCoreRequestType) +from vllm.v1.engine import (POLLING_TIMEOUT_MS, EngineCoreAbortRequest, + EngineCoreOutput, EngineCoreOutputs, + EngineCoreRequest, EngineCoreRequestType) from vllm.v1.engine.core import EngineCore, EngineCoreProc logger = init_logger(__name__) @@ -160,11 +158,12 @@ def send_input(self, request_type: EngineCoreRequestType, request: Union[EngineCoreRequest, EngineCoreAbortRequest])\ -> None: - self.input_socket.send_multipart( - (self.encoder.encode(request_type), - self.encoder.encode(request), ), - copy=False, - flags=zmq.NOBLOCK) + self.input_socket.send_multipart(( + self.encoder.encode(request_type), + self.encoder.encode(request), + ), + copy=False, + flags=zmq.NOBLOCK) def add_request(self, request: EngineCoreRequest) -> None: self.send_input(EngineCoreRequestType.AddRequest, request) @@ -172,6 +171,7 @@ def add_request(self, request: EngineCoreRequest) -> None: def abort_requests(self, request: EngineCoreAbortRequest) -> None: self.send_input(EngineCoreRequestType.AbortRequest, request) + class AsyncMPClient(MPClient): """Asyncio-compatible client for multi-proc EngineCore.""" @@ -192,14 +192,16 @@ async def send_input(self, request_type: EngineCoreRequestType, request: Union[EngineCoreRequest, EngineCoreAbortRequest])\ -> None: - await self.input_socket.send_multipart( - (self.encoder.encode(request_type), - self.encoder.encode(request), ), - copy=False, - flags=zmq.NOBLOCK) + await self.input_socket.send_multipart(( + self.encoder.encode(request_type), + self.encoder.encode(request), + ), + copy=False, + flags=zmq.NOBLOCK) async def add_request_async(self, request: EngineCoreRequest) -> None: await self.send_input(EngineCoreRequestType.AddRequest, request) - async def abort_requests_async(self, request: EngineCoreAbortRequest) -> None: + async def abort_requests_async(self, + request: EngineCoreAbortRequest) -> None: await self.send_input(EngineCoreRequestType.AbortRequest, request) diff --git a/vllm/v1/engine/llm_engine.py b/vllm/v1/engine/llm_engine.py index 56005635ab225..44f8524c30934 100644 --- a/vllm/v1/engine/llm_engine.py +++ b/vllm/v1/engine/llm_engine.py @@ -13,11 +13,11 @@ from vllm.sampling_params import SamplingParams from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs from vllm.usage.usage_lib import UsageContext +from vllm.v1.engine import EngineCoreAbortRequest from vllm.v1.engine.core_client import EngineCoreClient from vllm.v1.engine.detokenizer import Detokenizer from vllm.v1.engine.processor import Processor from vllm.v1.executor.gpu_executor import GPUExecutor -from vllm.v1.engine import EngineCoreAbortRequest from vllm.v1.request import RequestStatus logger = init_logger(__name__) @@ -113,9 +113,9 @@ def abort_request(self, request_id: Union[str, Iterable[str]]) -> None: if isinstance(request_id, str): request_id = [request_id] self.engine_core.abort_requests( - EngineCoreAbortRequest( - request_ids = request_id, - request_status = RequestStatus.FINISHED_STOPPED)) + EngineCoreAbortRequest( + request_ids=request_id, + request_status=RequestStatus.FINISHED_STOPPED)) def add_request( self,