diff --git a/vllm/v1/engine/__init__.py b/vllm/v1/engine/__init__.py
index b003e969f220d..820362e8f244d 100644
--- a/vllm/v1/engine/__init__.py
+++ b/vllm/v1/engine/__init__.py
@@ -33,7 +33,9 @@ class EngineCoreRequest(msgspec.Struct):
     # due to circular imports and typing we have in data.py
 
     request_id: str
-    prompt: Optional[str]
+    #NOTE(Nick): I don't think we need to pass prompt here since it should
+    # always be tokenized?
+    # prompt: Optional[str]
     prompt_token_ids: List[int]
     sampling_params: SamplingParams
     eos_token_id: Optional[int]
@@ -53,5 +55,8 @@ class EngineCoreOutput:
 
 class EngineCoreOutputs(msgspec.Struct):
 
+    #NOTE(Nick): We could consider ways to make this more compact,
+    # e.g. columnwise layout and using an int enum for finish/stop reason
+    # [num_reqs]
     outputs: List[EngineCoreOutput]
 
diff --git a/vllm/v1/engine/async_llm_engine.py b/vllm/v1/engine/async_llm_engine.py
index d921b888e1aea..0625833da6d6d 100644
--- a/vllm/v1/engine/async_llm_engine.py
+++ b/vllm/v1/engine/async_llm_engine.py
@@ -1,5 +1,4 @@
 import asyncio
-import multiprocessing
 from typing import AsyncGenerator, Dict, Mapping, Optional, Type, Union
 
 import msgspec
@@ -25,7 +24,7 @@
 from vllm.v1.engine import LLM_ENGINE_CORE_READY_STR, EngineCoreOutputs
 from vllm.v1.engine.async_stream import AsyncStream
 from vllm.v1.engine.detokenizer import Detokenizer
-from vllm.v1.engine.llm_engine_core import LLMEngineCore
+from vllm.v1.engine.llm_engine_core import LLMEngineCoreProcess
 from vllm.v1.engine.processor import Processor
 from vllm.v1.engine.protocol import LLMEngineProtocol
 from vllm.v1.executor.gpu_executor import GPUExecutor
@@ -78,7 +77,7 @@ def __init__(
         # Detokenizer (converts EngineCoreOutputs --> RequestOutput)
         self.detokenizer = Detokenizer(model_config.tokenizer,
                                        stream_mode=True)
-        
+
         # IPC Setup
         self.ctx = zmq.asyncio.Context()  # type: ignore[attr-defined]
         self.encoder = msgspec.msgpack.Encoder()
@@ -97,32 +96,23 @@ def __init__(
         self.input_socket = self.ctx.socket(zmq.constants.PUSH)
         self.input_socket.bind(input_path)
 
-        # The current process might have CUDA context,
-        # so we need to spawn a new process
-        context = multiprocessing.get_context("spawn")
-
-        # Run LLMEngineCore busy loop in background process.
-        self.engine_core = context.Process(target=self.run_engine_core,
-                                           args=(
-                                               executor_class,
-                                               model_config,
-                                               cache_config,
-                                               parallel_config,
-                                               scheduler_config,
-                                               device_config,
-                                               load_config,
-                                               lora_config,
-                                               speculative_config,
-                                               decoding_config,
-                                               observability_config,
-                                               prompt_adapter_config,
-                                           ),
-                                           kwargs={
-                                               "async_mode": True,
-                                               "input_path": input_path,
-                                               "output_path": output_path,
-                                               "ready_path": self.ready_path,
-                                           })
+        self.engine_core = LLMEngineCoreProcess.from_config(
+            executor_class,
+            model_config,
+            cache_config,
+            parallel_config,
+            scheduler_config,
+            device_config,
+            load_config,
+            lora_config,
+            speculative_config,
+            decoding_config,
+            observability_config,
+            prompt_adapter_config,
+            input_path=input_path,
+            output_path=output_path,
+            ready_path=self.ready_path,
+        )
         self.engine_core.start()
 
         # TODO: add background loop shielding
@@ -133,16 +123,6 @@ def __del__(self):
         # Hack.
         self.engine_core.kill()
 
-    @staticmethod
-    def run_engine_core(*args, **kwargs):
-        """Launch EngineCore busy loop in background process."""
-
-        logger.debug("Initializing LLMEngineCore in background process.")
-        engine_core = LLMEngineCore(*args, **kwargs)
-
-        logger.debug("Starting LLMEngineCore busy loop in background process.")
-        engine_core.run_busy_loop()
-
     @classmethod
     def from_engine_args(
         cls,
diff --git a/vllm/v1/engine/llm_engine.py b/vllm/v1/engine/llm_engine.py
index b1d58bff1eb2c..3775a80c6d8e3 100644
--- a/vllm/v1/engine/llm_engine.py
+++ b/vllm/v1/engine/llm_engine.py
@@ -17,8 +17,8 @@
 from vllm.usage.usage_lib import UsageContext
 from vllm.v1.engine.detokenizer import Detokenizer
 from vllm.v1.engine.llm_engine_core import LLMEngineCore
-from vllm.v1.engine.protocol import LLMEngineProtocol
 from vllm.v1.engine.processor import Processor
+from vllm.v1.engine.protocol import LLMEngineProtocol
 from vllm.v1.executor.gpu_executor import GPUExecutor
 
 logger = init_logger(__name__)
diff --git a/vllm/v1/engine/llm_engine_core.py b/vllm/v1/engine/llm_engine_core.py
index 928635c3cc29e..9fa472df1c4e5 100644
--- a/vllm/v1/engine/llm_engine_core.py
+++ b/vllm/v1/engine/llm_engine_core.py
@@ -1,3 +1,5 @@
+import multiprocessing
+from multiprocessing.process import BaseProcess
 from typing import List, Optional, Tuple, Type
 
 import msgspec
@@ -35,10 +37,6 @@ def __init__(
         decoding_config: Optional[DecodingConfig],
         observability_config: Optional[ObservabilityConfig],
         prompt_adapter_config: Optional[PromptAdapterConfig],
-        async_mode: bool = False,
-        input_path: Optional[str] = None,
-        output_path: Optional[str] = None,
-        ready_path: Optional[str] = None,
     ):
         assert model_config.task != "embedding"
 
@@ -100,32 +98,6 @@ def __init__(
         # Setup scheduler.
         self.scheduler = Scheduler(scheduler_config, cache_config, lora_config)
 
-        # Setup IPC if running in async mode.
-        if async_mode:
-            assert (input_path is not None and output_path is not None
-                    and ready_path is not None)
-
-            self.msgpack_encoder = msgspec.msgpack.Encoder()
-            self.msgpack_decoder = msgspec.msgpack.Decoder(EngineCoreRequest)
-
-            self.ctx = zmq.Context()  # type: ignore[attr-defined]
-
-            # Get EngineCoreRequests from the LLMEngine.
-            self.input_socket = self.ctx.socket(zmq.constants.PULL)
-            self.input_socket.connect(input_path)
-
-            # Send EngineCoreOutput to the LLMEngine.
-            self.output_socket = self.ctx.socket(zmq.constants.PUSH)
-            self.output_socket.bind(output_path)
-
-            # Send Readiness signal to LLMEngine.
-            try:
-                ready_socket = self.ctx.socket(zmq.constants.PUSH)
-                ready_socket.bind(ready_path)
-                ready_socket.send_string(LLM_ENGINE_CORE_READY_STR)
-            finally:
-                ready_socket.close(linger=0)
-
     def _initialize_kv_caches(self,
                               cache_config: CacheConfig) -> Tuple[int, int]:
         num_gpu_blocks, _ = self.model_executor.determine_num_available_blocks(
@@ -143,16 +115,13 @@ def _initialize_kv_caches(self,
         self.model_executor.initialize_cache(num_gpu_blocks)
         return num_gpu_blocks, num_cpu_blocks
 
-    def check_health(self):
-        self.model_executor.check_health()
-
     def add_request(self, engine_core_request: EngineCoreRequest):
         """Add request to the scheduler."""
 
         request = Request.from_engine_core_request(engine_core_request)
         self.scheduler.add_request(request)
 
-    def step(self) -> List[EngineCoreOutputs]:
+    def step(self) -> List[EngineCoreOutput]:
         """Schedule, execute, and make output."""
 
         if not self.scheduler.has_unfinished_requests():
@@ -164,6 +133,74 @@ def step(self) -> List[EngineCoreOutput]:
             scheduler_output, output)
         return engine_core_outputs
 
+    def check_health(self):
+        self.model_executor.check_health()
+
+
+class LLMEngineCoreProcess(LLMEngineCore):
+
+    @staticmethod
+    def from_config(*config_args, input_path: str, output_path: str,
+                    ready_path: str) -> BaseProcess:
+        # The current process might have CUDA context,
+        # so we need to spawn a new process
+        context = multiprocessing.get_context("spawn")
+
+        # Run LLMEngineCore busy loop in background process.
+        return context.Process(target=LLMEngineCoreProcess.run_engine_core,
+                               args=config_args,
+                               kwargs={
+                                   "input_path": input_path,
+                                   "output_path": output_path,
+                                   "ready_path": ready_path,
+                               })
+
+    @staticmethod
+    def run_engine_core(*args, **kwargs):
+        """Launch EngineCore busy loop in background process."""
+
+        logger.debug("Initializing LLMEngineCore in background process.")
+        engine_core = LLMEngineCoreProcess(*args, **kwargs)
+
+        logger.debug("Starting LLMEngineCore busy loop in background process.")
+        engine_core.run_busy_loop()
+
+    def __init__(
+        self,
+        input_path: Optional[str] = None,
+        output_path: Optional[str] = None,
+        ready_path: Optional[str] = None,
+        *args,
+        **kwargs,
+    ):
+        super().__init__(*args, **kwargs)
+
+        assert (input_path is not None and output_path is not None
+                and ready_path is not None)
+
+        self.msgpack_encoder = msgspec.msgpack.Encoder()
+        self.msgpack_decoder = msgspec.msgpack.Decoder(EngineCoreRequest)
+
+        self.ctx = zmq.Context()  # type: ignore[attr-defined]
+
+        # Get EngineCoreRequests from the LLMEngine.
+        self.input_socket = self.ctx.socket(zmq.constants.PULL)
+        self.input_socket.connect(input_path)
+
+        # Send EngineCoreOutput to the LLMEngine.
+        self.output_socket = self.ctx.socket(zmq.constants.PUSH)
+        self.output_socket.bind(output_path)
+
+        # Send Readiness signal to LLMEngine.
+        ready_socket = None
+        try:
+            ready_socket = self.ctx.socket(zmq.constants.PUSH)
+            ready_socket.bind(ready_path)
+            ready_socket.send_string(LLM_ENGINE_CORE_READY_STR)
+        finally:
+            if ready_socket:
+                ready_socket.close(linger=0)
+
     def run_busy_loop(self):
         """Core busy loop of the LLMEngineCore for async mode."""
 
@@ -204,7 +241,7 @@ def _send_outputs(self,
                       engine_core_outputs: List[EngineCoreOutput]) -> None:
         """Serialize and send output to the AsyncLLMEngine for async mode."""
 
-        if len(engine_core_outputs) == 0:
+        if not engine_core_outputs:
            return
 
         outputs = EngineCoreOutputs(outputs=engine_core_outputs)
diff --git a/vllm/v1/engine/processor.py b/vllm/v1/engine/processor.py
index f6b3538fe8990..7c516afd32b17 100644
--- a/vllm/v1/engine/processor.py
+++ b/vllm/v1/engine/processor.py
@@ -98,9 +98,8 @@ def process_inputs(
 
         # Make Request for EngineCore.
         engine_core_request = EngineCoreRequest(
-            request_id, processed_inputs.get("prompt"),
-            processed_inputs.get("prompt_token_ids"), sampling_params,
-            eos_token_id, arrival_time, lora_request)
+            request_id, processed_inputs.get("prompt_token_ids"),
+            sampling_params, eos_token_id, arrival_time, lora_request)
 
         return detokenizer_request, engine_core_request
 
diff --git a/vllm/v1/engine/protocol.py b/vllm/v1/engine/protocol.py
index f96800dcf14f5..5dff96468b684 100644
--- a/vllm/v1/engine/protocol.py
+++ b/vllm/v1/engine/protocol.py
@@ -1,5 +1,5 @@
 import multiprocessing
-from abc import ABC, abstractmethod
+from abc import ABC
 from typing import Union
 
 from vllm.config import (DecodingConfig, EngineConfig, LoRAConfig, ModelConfig,
@@ -21,7 +21,7 @@ class LLMEngineProtocol(ABC):
     processor: Processor
 
     # TODO: These are needed for the get_xxx_config methods
-    # I think these are basically dead code (other than 
+    # I think these are basically dead code (other than
     # get_model_config and mock testing)
     model_config: ModelConfig
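
A note on the #NOTE(Nick) comment added to EngineCoreOutputs in vllm/v1/engine/__init__.py above: the suggestion is to shrink the serialized payload by replacing the List[EngineCoreOutput] of per-request structs with a columnwise layout and an integer enum for the finish/stop reason. The sketch below is illustrative only and is not part of this diff; the FinishReason values and the ColumnwiseEngineCoreOutputs field names are hypothetical stand-ins for whatever a real implementation would choose. It only shows how such a layout could be declared with msgspec and round-tripped through the same msgpack Encoder/Decoder pair the diff already uses for IPC.

from enum import IntEnum
from typing import List

import msgspec


class FinishReason(IntEnum):
    # Hypothetical enum; the real finish/stop reasons may differ.
    NONE = 0
    STOP = 1
    LENGTH = 2


class ColumnwiseEngineCoreOutputs(msgspec.Struct):
    """Hypothetical columnwise layout: one flat array per field, each of
    length num_reqs, instead of a List[EngineCoreOutput] of structs."""
    request_ids: List[str]
    # New token ids are flattened across requests; num_new_tokens gives the
    # per-request counts so each request's slice can be recovered.
    new_token_ids: List[int]
    num_new_tokens: List[int]
    finish_reasons: List[int]  # FinishReason values, one per request


encoder = msgspec.msgpack.Encoder()
decoder = msgspec.msgpack.Decoder(ColumnwiseEngineCoreOutputs)

outputs = ColumnwiseEngineCoreOutputs(
    request_ids=["req-0", "req-1"],
    new_token_ids=[11, 12, 21],
    num_new_tokens=[2, 1],
    finish_reasons=[int(FinishReason.NONE), int(FinishReason.STOP)],
)

payload = encoder.encode(outputs)  # bytes pushed over the ZMQ socket
decoded = decoder.decode(payload)  # reconstructed on the receiving side
assert decoded.request_ids == outputs.request_ids

A columnwise struct like this serializes a handful of flat arrays instead of num_reqs nested records, which is generally smaller on the wire and cheaper to decode, at the cost of reassembling per-request views on the receiving side.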