split core process into separate class #22

Merged
7 changes: 6 additions & 1 deletion vllm/v1/engine/__init__.py
@@ -33,7 +33,9 @@ class EngineCoreRequest(msgspec.Struct):
# due to circular imports and typing we have in data.py

request_id: str
prompt: Optional[str]
#NOTE(Nick): I don't think we need to pass prompt here since it should
# always be tokenized?
# prompt: Optional[str]
prompt_token_ids: List[int]
sampling_params: SamplingParams
eos_token_id: Optional[int]
@@ -53,5 +55,8 @@ class EngineCoreOutput:

class EngineCoreOutputs(msgspec.Struct):

#NOTE(Nick): We could consider ways to make this more compact,
# e.g. columnwise layout and using an int enum for finish/stop reason

# [num_reqs]
outputs: List[EngineCoreOutput]
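
The columnwise layout and int-enum finish reason mentioned in the note above could look roughly like the following sketch; the FinishReason values and field names here are illustrative, not part of this PR.

import enum
from typing import List

import msgspec


class FinishReason(enum.IntEnum):
    NOT_FINISHED = 0
    STOP = 1
    LENGTH = 2


class EngineCoreOutputsColumnwise(msgspec.Struct):
    # Column i of every list describes request i, so per-request metadata
    # is sent once per field instead of once per EngineCoreOutput object.
    request_ids: List[str]
    new_token_ids: List[List[int]]
    finished: List[bool]
    finish_reasons: List[int]  # FinishReason values, serialized as plain ints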
58 changes: 19 additions & 39 deletions vllm/v1/engine/async_llm_engine.py
@@ -1,5 +1,4 @@
import asyncio
import multiprocessing
from typing import AsyncGenerator, Dict, Mapping, Optional, Type, Union

import msgspec
@@ -25,7 +24,7 @@
from vllm.v1.engine import LLM_ENGINE_CORE_READY_STR, EngineCoreOutputs
from vllm.v1.engine.async_stream import AsyncStream
from vllm.v1.engine.detokenizer import Detokenizer
from vllm.v1.engine.llm_engine_core import LLMEngineCore
from vllm.v1.engine.llm_engine_core import LLMEngineCoreProcess
from vllm.v1.engine.processor import Processor
from vllm.v1.engine.protocol import LLMEngineProtocol
from vllm.v1.executor.gpu_executor import GPUExecutor
@@ -77,7 +76,7 @@ def __init__(
# Detokenizer (converts EngineCoreOutputs --> RequestOutput)
self.detokenizer = Detokenizer(model_config.tokenizer,
stream_mode=True)

# IPC Setup
self.ctx = zmq.asyncio.Context() # type: ignore[attr-defined]
self.encoder = msgspec.msgpack.Encoder()
@@ -96,32 +95,23 @@ def __init__(
self.input_socket = self.ctx.socket(zmq.constants.PUSH)
self.input_socket.bind(input_path)

# The current process might have CUDA context,
# so we need to spawn a new process
context = multiprocessing.get_context("spawn")

# Run LLMEngineCore busy loop in background process.
self.engine_core = context.Process(target=self.run_engine_core,
args=(
executor_class,
model_config,
cache_config,
parallel_config,
scheduler_config,
device_config,
load_config,
lora_config,
speculative_config,
decoding_config,
observability_config,
prompt_adapter_config,
),
kwargs={
"async_mode": True,
"input_path": input_path,
"output_path": output_path,
"ready_path": self.ready_path,
})
self.engine_core = LLMEngineCoreProcess.from_config(
executor_class,
model_config,
cache_config,
parallel_config,
scheduler_config,
device_config,
load_config,
lora_config,
speculative_config,
decoding_config,
observability_config,
prompt_adapter_config,
input_path=input_path,
output_path=output_path,
ready_path=self.ready_path,
)
self.engine_core.start()

# TODO: add background loop shielding
@@ -132,16 +122,6 @@ def __del__(self):
# Hack.
self.engine_core.kill()

@staticmethod
def run_engine_core(*args, **kwargs):
"""Launch EngineCore busy loop in background process."""

logger.debug("Initializing LLMEngineCore in background process.")
engine_core = LLMEngineCore(*args, **kwargs)

logger.debug("Starting LLMEngineCore busy loop in background process.")
engine_core.run_busy_loop()

@classmethod
def from_engine_args(
cls,
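The parent-side wait for the readiness signal is collapsed in the diff above; a minimal sketch of such a wait, assuming a hypothetical helper name wait_for_engine_core and the socket pairing used elsewhere in this PR (the background process binds a PUSH socket on ready_path), might look like this:

import zmq


def wait_for_engine_core(ready_path: str, ready_str: str) -> None:
    ctx = zmq.Context()
    try:
        # The background process binds PUSH on ready_path, so connect PULL here.
        socket = ctx.socket(zmq.constants.PULL)
        socket.connect(ready_path)
        message = socket.recv_string()
        assert message == ready_str, f"Unexpected readiness message: {message}"
    finally:
        ctx.destroy(linger=0)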
2 changes: 1 addition & 1 deletion vllm/v1/engine/llm_engine.py
@@ -17,8 +17,8 @@
from vllm.usage.usage_lib import UsageContext
from vllm.v1.engine.detokenizer import Detokenizer
from vllm.v1.engine.llm_engine_core import LLMEngineCore
from vllm.v1.engine.protocol import LLMEngineProtocol
from vllm.v1.engine.processor import Processor
from vllm.v1.engine.protocol import LLMEngineProtocol
from vllm.v1.executor.gpu_executor import GPUExecutor

logger = init_logger(__name__)
107 changes: 72 additions & 35 deletions vllm/v1/engine/llm_engine_core.py
@@ -1,3 +1,5 @@
import multiprocessing
from multiprocessing.process import BaseProcess
from typing import List, Optional, Tuple, Type

import msgspec
@@ -35,10 +37,6 @@ def __init__(
decoding_config: Optional[DecodingConfig],
observability_config: Optional[ObservabilityConfig],
prompt_adapter_config: Optional[PromptAdapterConfig],
async_mode: bool = False,
input_path: Optional[str] = None,
output_path: Optional[str] = None,
ready_path: Optional[str] = None,
):
assert model_config.task != "embedding"

@@ -100,32 +98,6 @@ def __init__(
# Setup scheduler.
self.scheduler = Scheduler(scheduler_config, cache_config, lora_config)

# Setup IPC if running in async mode.
if async_mode:
assert (input_path is not None and output_path is not None
and ready_path is not None)

self.msgpack_encoder = msgspec.msgpack.Encoder()
self.msgpack_decoder = msgspec.msgpack.Decoder(EngineCoreRequest)

self.ctx = zmq.Context() # type: ignore[attr-defined]

# Get EngineCoreRequests from the LLMEngine.
self.input_socket = self.ctx.socket(zmq.constants.PULL)
self.input_socket.connect(input_path)

# Send EngineCoreOutput to the LLMEngine.
self.output_socket = self.ctx.socket(zmq.constants.PUSH)
self.output_socket.bind(output_path)

# Send Readiness signal to LLMEngine.
try:
ready_socket = self.ctx.socket(zmq.constants.PUSH)
ready_socket.bind(ready_path)
ready_socket.send_string(LLM_ENGINE_CORE_READY_STR)
finally:
ready_socket.close(linger=0)

def _initialize_kv_caches(self,
cache_config: CacheConfig) -> Tuple[int, int]:
num_gpu_blocks, _ = self.model_executor.determine_num_available_blocks(
@@ -143,16 +115,13 @@ def _initialize_kv_caches(self,
self.model_executor.initialize_cache(num_gpu_blocks)
return num_gpu_blocks, num_cpu_blocks

def check_health(self):
self.model_executor.check_health()

def add_request(self, engine_core_request: EngineCoreRequest):
"""Add request to the scheduler."""

request = Request.from_engine_core_request(engine_core_request)
self.scheduler.add_request(request)

def step(self) -> List[EngineCoreOutputs]:
def step(self) -> List[EngineCoreOutput]:
"""Schedule, execute, and make output."""

if not self.scheduler.has_unfinished_requests():
@@ -164,6 +133,74 @@ def step(self) -> List[EngineCoreOutputs]:
scheduler_output, output)
return engine_core_outputs

def check_health(self):
self.model_executor.check_health()


class LLMEngineCoreProcess(LLMEngineCore):

@staticmethod
def from_config(*config_args, input_path: str, output_path: str,
ready_path: str) -> BaseProcess:
# The current process might have CUDA context,
# so we need to spawn a new process
context = multiprocessing.get_context("spawn")

# Run LLMEngineCore busy loop in background process.
return context.Process(target=LLMEngineCoreProcess.run_engine_core,
args=config_args,
kwargs={
"input_path": input_path,
"output_path": output_path,
"ready_path": ready_path,
})

@staticmethod
def run_engine_core(*args, **kwargs):
"""Launch EngineCore busy loop in background process."""

logger.debug("Initializing LLMEngineCore in background process.")
engine_core = LLMEngineCoreProcess(*args, **kwargs)

logger.debug("Starting LLMEngineCore busy loop in background process.")
engine_core.run_busy_loop()

def __init__(
self,
*args,
input_path: Optional[str] = None,
output_path: Optional[str] = None,
ready_path: Optional[str] = None,
**kwargs,
):
super().__init__(*args, **kwargs)

assert (input_path is not None and output_path is not None
and ready_path is not None)

self.msgpack_encoder = msgspec.msgpack.Encoder()
self.msgpack_decoder = msgspec.msgpack.Decoder(EngineCoreRequest)

self.ctx = zmq.Context() # type: ignore[attr-defined]

# Get EngineCoreRequests from the LLMEngine.
self.input_socket = self.ctx.socket(zmq.constants.PULL)
self.input_socket.connect(input_path)

# Send EngineCoreOutput to the LLMEngine.
self.output_socket = self.ctx.socket(zmq.constants.PUSH)
self.output_socket.bind(output_path)

# Send Readiness signal to LLMEngine.
ready_socket = None
try:
ready_socket = self.ctx.socket(zmq.constants.PUSH)
ready_socket.bind(ready_path)
ready_socket.send_string(LLM_ENGINE_CORE_READY_STR)
finally:
if ready_socket:
ready_socket.close(linger=0)

def run_busy_loop(self):
"""Core busy loop of the LLMEngineCore for async mode."""

@@ -204,7 +241,7 @@ def _send_outputs(self,
engine_core_outputs: List[EngineCoreOutput]) -> None:
"""Serialize and send output to the AsyncLLMEngine for async mode."""

if len(engine_core_outputs) == 0:
if not engine_core_outputs:
return

outputs = EngineCoreOutputs(outputs=engine_core_outputs)
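The body of run_busy_loop is collapsed in this view; based on the methods visible in the diff (add_request, step, _send_outputs, and the msgpack decoder), a minimal sketch of the loop could look like the following, treated as illustrative rather than the PR's exact code:

def run_busy_loop(self):
    while True:
        # Drain any EngineCoreRequests pushed by the LLMEngine frontend.
        while self.input_socket.poll(timeout=0) != 0:
            request_bytes = self.input_socket.recv()
            request = self.msgpack_decoder.decode(request_bytes)
            self.add_request(request)

        # Schedule and execute one iteration, then push results back.
        engine_core_outputs = self.step()
        self._send_outputs(engine_core_outputs)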
5 changes: 2 additions & 3 deletions vllm/v1/engine/processor.py
@@ -98,9 +98,8 @@ def process_inputs(

# Make Request for EngineCore.
engine_core_request = EngineCoreRequest(
request_id, processed_inputs.get("prompt"),
processed_inputs.get("prompt_token_ids"), sampling_params,
eos_token_id, arrival_time, lora_request)
request_id, processed_inputs.get("prompt_token_ids"),
sampling_params, eos_token_id, arrival_time, lora_request)

return detokenizer_request, engine_core_request

4 changes: 2 additions & 2 deletions vllm/v1/engine/protocol.py
@@ -1,5 +1,5 @@
import multiprocessing
from abc import ABC, abstractmethod
from abc import ABC
from typing import Union

from vllm.config import (DecodingConfig, EngineConfig, LoRAConfig, ModelConfig,
@@ -21,7 +21,7 @@ class LLMEngineProtocol(ABC):
processor: Processor

# TODO: These are needed for the get_xxx_config methods
# I think these are basically dead code (other than
# I think these are basically dead code (other than
# get_model_config and mock testing)

model_config: ModelConfig