Skip to content

Commit

Permalink
split core process into separate class
Browse files Browse the repository at this point in the history
  • Loading branch information
njhill committed Oct 31, 2024
1 parent 75ff707 commit 669648f
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 81 deletions.
7 changes: 6 additions & 1 deletion vllm/v1/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ class EngineCoreRequest(msgspec.Struct):
# due to circular imports and typing we have in data.py

request_id: str
prompt: Optional[str]
#NOTE(Nick): I don't think we need to pass prompt here since it should
# always be tokenized?
# prompt: Optional[str]
prompt_token_ids: List[int]
sampling_params: SamplingParams
eos_token_id: Optional[int]
Expand All @@ -53,5 +55,8 @@ class EngineCoreOutput:

class EngineCoreOutputs(msgspec.Struct):

#NOTE(Nick): We could consider ways to make this more compact,
# e.g. columnwise layout and using an int enum for finish/stop reason

# [num_reqs]
outputs: List[EngineCoreOutput]
58 changes: 19 additions & 39 deletions vllm/v1/engine/async_llm_engine.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import multiprocessing
from typing import AsyncGenerator, Dict, Mapping, Optional, Type, Union

import msgspec
Expand All @@ -25,7 +24,7 @@
from vllm.v1.engine import LLM_ENGINE_CORE_READY_STR, EngineCoreOutputs
from vllm.v1.engine.async_stream import AsyncStream
from vllm.v1.engine.detokenizer import Detokenizer
from vllm.v1.engine.llm_engine_core import LLMEngineCore
from vllm.v1.engine.llm_engine_core import LLMEngineCoreProcess
from vllm.v1.engine.processor import Processor
from vllm.v1.engine.protocol import LLMEngineProtocol
from vllm.v1.executor.gpu_executor import GPUExecutor
Expand Down Expand Up @@ -77,7 +76,7 @@ def __init__(
# Detokenizer (converts EngineCoreOutputs --> RequestOutput)
self.detokenizer = Detokenizer(model_config.tokenizer,
stream_mode=True)

# IPC Setup
self.ctx = zmq.asyncio.Context() # type: ignore[attr-defined]
self.encoder = msgspec.msgpack.Encoder()
Expand All @@ -96,32 +95,23 @@ def __init__(
self.input_socket = self.ctx.socket(zmq.constants.PUSH)
self.input_socket.bind(input_path)

# The current process might have CUDA context,
# so we need to spawn a new process
context = multiprocessing.get_context("spawn")

# Run LLMEngineCore busy loop in background process.
self.engine_core = context.Process(target=self.run_engine_core,
args=(
executor_class,
model_config,
cache_config,
parallel_config,
scheduler_config,
device_config,
load_config,
lora_config,
speculative_config,
decoding_config,
observability_config,
prompt_adapter_config,
),
kwargs={
"async_mode": True,
"input_path": input_path,
"output_path": output_path,
"ready_path": self.ready_path,
})
self.engine_core = LLMEngineCoreProcess.from_config(
executor_class,
model_config,
cache_config,
parallel_config,
scheduler_config,
device_config,
load_config,
lora_config,
speculative_config,
decoding_config,
observability_config,
prompt_adapter_config,
input_path=input_path,
output_path=output_path,
ready_path=self.ready_path,
)
self.engine_core.start()

# TODO: add background loop shielding
Expand All @@ -132,16 +122,6 @@ def __del__(self):
# Hack.
self.engine_core.kill()

@staticmethod
def run_engine_core(*args, **kwargs):
"""Launch EngineCore busy loop in background process."""

logger.debug("Initializing LLMEngineCore in background process.")
engine_core = LLMEngineCore(*args, **kwargs)

logger.debug("Starting LLMEngineCore busy loop in background process.")
engine_core.run_busy_loop()

@classmethod
def from_engine_args(
cls,
Expand Down
2 changes: 1 addition & 1 deletion vllm/v1/engine/llm_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
from vllm.usage.usage_lib import UsageContext
from vllm.v1.engine.detokenizer import Detokenizer
from vllm.v1.engine.llm_engine_core import LLMEngineCore
from vllm.v1.engine.protocol import LLMEngineProtocol
from vllm.v1.engine.processor import Processor
from vllm.v1.engine.protocol import LLMEngineProtocol
from vllm.v1.executor.gpu_executor import GPUExecutor

logger = init_logger(__name__)
Expand Down
107 changes: 72 additions & 35 deletions vllm/v1/engine/llm_engine_core.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import multiprocessing
from multiprocessing.process import BaseProcess
from typing import List, Optional, Tuple, Type

import msgspec
Expand Down Expand Up @@ -35,10 +37,6 @@ def __init__(
decoding_config: Optional[DecodingConfig],
observability_config: Optional[ObservabilityConfig],
prompt_adapter_config: Optional[PromptAdapterConfig],
async_mode: bool = False,
input_path: Optional[str] = None,
output_path: Optional[str] = None,
ready_path: Optional[str] = None,
):
assert model_config.task != "embedding"

Expand Down Expand Up @@ -100,32 +98,6 @@ def __init__(
# Setup scheduler.
self.scheduler = Scheduler(scheduler_config, cache_config, lora_config)

# Setup IPC if running in async mode.
if async_mode:
assert (input_path is not None and output_path is not None
and ready_path is not None)

self.msgpack_encoder = msgspec.msgpack.Encoder()
self.msgpack_decoder = msgspec.msgpack.Decoder(EngineCoreRequest)

self.ctx = zmq.Context() # type: ignore[attr-defined]

# Get EngineCoreRequests from the LLMEngine.
self.input_socket = self.ctx.socket(zmq.constants.PULL)
self.input_socket.connect(input_path)

# Send EngineCoreOutput to the LLMEngine.
self.output_socket = self.ctx.socket(zmq.constants.PUSH)
self.output_socket.bind(output_path)

# Send Readiness signal to LLMEngine.
try:
ready_socket = self.ctx.socket(zmq.constants.PUSH)
ready_socket.bind(ready_path)
ready_socket.send_string(LLM_ENGINE_CORE_READY_STR)
finally:
ready_socket.close(linger=0)

def _initialize_kv_caches(self,
cache_config: CacheConfig) -> Tuple[int, int]:
num_gpu_blocks, _ = self.model_executor.determine_num_available_blocks(
Expand All @@ -143,16 +115,13 @@ def _initialize_kv_caches(self,
self.model_executor.initialize_cache(num_gpu_blocks)
return num_gpu_blocks, num_cpu_blocks

def check_health(self):
self.model_executor.check_health()

def add_request(self, engine_core_request: EngineCoreRequest):
"""Add request to the scheduler."""

request = Request.from_engine_core_request(engine_core_request)
self.scheduler.add_request(request)

def step(self) -> List[EngineCoreOutputs]:
def step(self) -> List[EngineCoreOutput]:
"""Schedule, execute, and make output."""

if not self.scheduler.has_unfinished_requests():
Expand All @@ -164,6 +133,74 @@ def step(self) -> List[EngineCoreOutputs]:
scheduler_output, output)
return engine_core_outputs

def check_health(self):
self.model_executor.check_health()


class LLMEngineCoreProcess(LLMEngineCore):

@staticmethod
def from_config(*config_args, input_path: str, output_path: str,
ready_path: str) -> BaseProcess:
# The current process might have CUDA context,
# so we need to spawn a new process
context = multiprocessing.get_context("spawn")

# Run LLMEngineCore busy loop in background process.
return context.Process(target=LLMEngineCoreProcess.run_engine_core,
args=config_args,
kwargs={
"input_path": input_path,
"output_path": output_path,
"ready_path": ready_path,
})

@staticmethod
def run_engine_core(*args, **kwargs):
"""Launch EngineCore busy loop in background process."""

logger.debug("Initializing LLMEngineCore in background process.")
engine_core = LLMEngineCoreProcess(*args, **kwargs)

logger.debug("Starting LLMEngineCore busy loop in background process.")
engine_core.run_busy_loop()

def __init__(
self,
input_path: Optional[str] = None,
output_path: Optional[str] = None,
ready_path: Optional[str] = None,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)

assert (input_path is not None and output_path is not None
and ready_path is not None)

self.msgpack_encoder = msgspec.msgpack.Encoder()
self.msgpack_decoder = msgspec.msgpack.Decoder(EngineCoreRequest)

self.ctx = zmq.Context() # type: ignore[attr-defined]

# Get EngineCoreRequests from the LLMEngine.
self.input_socket = self.ctx.socket(zmq.constants.PULL)
self.input_socket.connect(input_path)

# Send EngineCoreOutput to the LLMEngine.
self.output_socket = self.ctx.socket(zmq.constants.PUSH)
self.output_socket.bind(output_path)

# Send Readiness signal to LLMEngine.
ready_socket = None
try:
ready_socket = self.ctx.socket(zmq.constants.PUSH)
ready_socket.bind(ready_path)
ready_socket.send_string(LLM_ENGINE_CORE_READY_STR)
finally:
if ready_socket:
ready_socket.close(linger=0)

def run_busy_loop(self):
"""Core busy loop of the LLMEngineCore for async mode."""

Expand Down Expand Up @@ -204,7 +241,7 @@ def _send_outputs(self,
engine_core_outputs: List[EngineCoreOutput]) -> None:
"""Serialize and send output to the AsyncLLMEngine for async mode."""

if len(engine_core_outputs) == 0:
if not engine_core_outputs:
return

outputs = EngineCoreOutputs(outputs=engine_core_outputs)
Expand Down
5 changes: 2 additions & 3 deletions vllm/v1/engine/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,8 @@ def process_inputs(

# Make Request for EngineCore.
engine_core_request = EngineCoreRequest(
request_id, processed_inputs.get("prompt"),
processed_inputs.get("prompt_token_ids"), sampling_params,
eos_token_id, arrival_time, lora_request)
request_id, processed_inputs.get("prompt_token_ids"),
sampling_params, eos_token_id, arrival_time, lora_request)

return detokenizer_request, engine_core_request

Expand Down
4 changes: 2 additions & 2 deletions vllm/v1/engine/protocol.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import multiprocessing
from abc import ABC, abstractmethod
from abc import ABC
from typing import Union

from vllm.config import (DecodingConfig, EngineConfig, LoRAConfig, ModelConfig,
Expand All @@ -21,7 +21,7 @@ class LLMEngineProtocol(ABC):
processor: Processor

# TODO: These are needed for the get_xxx_config methods
# I think these are basically dead code (other than
# I think these are basically dead code (other than
# get_model_config and mock testing)

model_config: ModelConfig
Expand Down

0 comments on commit 669648f

Please sign in to comment.