Commit

format
Varun Sundar Rabindranath committed Nov 4, 2024
1 parent d3752ad commit fc507d5
Showing 6 changed files with 40 additions and 36 deletions.
5 changes: 3 additions & 2 deletions vllm/v1/engine/__init__.py
@@ -1,5 +1,5 @@
-from dataclasses import dataclass
 import enum
+from dataclasses import dataclass
 from typing import List, Optional, Union
 
 import msgspec
@@ -72,6 +72,7 @@ class EngineCoreAbortRequest(msgspec.Struct):
     request_ids: List[str]
     request_status: RequestStatus
 
+
 class EngineCoreRequestType(enum.Enum):
-    AddRequest = enum.auto()
+    AddRequest = enum.auto()
     AbortRequest = enum.auto()
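
Note: the pattern above, a msgspec.Struct payload paired with an enum request type, round-trips through msgpack. A minimal standalone sketch of that pattern (request_status is simplified to a plain string here, an assumption; vLLM's actual RequestStatus enum lives in vllm.v1.request):

    # Minimal sketch of the struct/enum pattern in this file.
    # request_status is simplified to str (an assumption); vLLM uses
    # the RequestStatus enum from vllm.v1.request.
    import enum
    from typing import List

    import msgspec


    class EngineCoreRequestType(enum.Enum):
        AddRequest = enum.auto()
        AbortRequest = enum.auto()


    class EngineCoreAbortRequest(msgspec.Struct):
        request_ids: List[str]
        request_status: str


    # Encode to msgpack bytes and decode back into the typed struct.
    encoder = msgspec.msgpack.Encoder()
    payload = encoder.encode(
        EngineCoreAbortRequest(request_ids=["req-0"],
                               request_status="FINISHED_STOPPED"))
    decoder = msgspec.msgpack.Decoder(EngineCoreAbortRequest)
    assert decoder.decode(payload).request_ids == ["req-0"]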
16 changes: 9 additions & 7 deletions vllm/v1/engine/async_llm.py
@@ -1,5 +1,6 @@
 import asyncio
-from typing import AsyncGenerator, Dict, Mapping, Optional, Type, Union, Iterable
+from typing import (AsyncGenerator, Dict, Iterable, Mapping, Optional, Type,
+                    Union)
 
 from vllm.config import EngineConfig, ModelConfig
 from vllm.engine.arg_utils import AsyncEngineArgs
@@ -14,13 +15,13 @@
 from vllm.transformers_utils.tokenizer import AnyTokenizer
 from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs
 from vllm.usage.usage_lib import UsageContext
-from vllm.v1.engine.async_stream import AsyncStream
 from vllm.v1.engine import EngineCoreAbortRequest
-from vllm.v1.request import RequestStatus
+from vllm.v1.engine.async_stream import AsyncStream
 from vllm.v1.engine.core_client import EngineCoreClient
 from vllm.v1.engine.detokenizer import Detokenizer
 from vllm.v1.engine.processor import Processor
 from vllm.v1.executor.gpu_executor import GPUExecutor
+from vllm.v1.request import RequestStatus
 
 logger = init_logger(__name__)
 
@@ -112,13 +113,14 @@ def from_engine_args(
     def _get_executor_cls(cls, engine_config: EngineConfig):
         return GPUExecutor
 
-    async def abort_request(self, request_id: Union[str, Iterable[str]]) -> None:
+    async def abort_request(self, request_id: Union[str,
+                                                    Iterable[str]]) -> None:
         if isinstance(request_id, str):
             request_id = [request_id]
         self.engine_core.abort_requests_async(
-            EngineCoreAbortRequest(
-                request_ids = request_id,
-                request_status = RequestStatus.FINISHED_STOPPED))
+            EngineCoreAbortRequest(
+                request_ids=request_id,
+                request_status=RequestStatus.FINISHED_STOPPED))
 
     async def add_request(
         self,
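
Note: abort_request accepts either a single request id or an iterable of ids and normalizes to a list before building the abort message. A standalone sketch of that normalization; _RecordingClient is a hypothetical stub standing in for the engine-core client:

    # Sketch of the str-or-iterable normalization in abort_request.
    # _RecordingClient is a hypothetical stand-in for EngineCoreClient.
    from typing import Iterable, List, Union


    class _RecordingClient:

        def __init__(self) -> None:
            self.aborted: List[str] = []

        def abort_requests(self, request_ids: List[str]) -> None:
            self.aborted.extend(request_ids)


    def abort_request(client: _RecordingClient,
                      request_id: Union[str, Iterable[str]]) -> None:
        # Wrap a lone string so downstream code always sees a list;
        # iterating a bare str would otherwise abort one id per character.
        if isinstance(request_id, str):
            request_id = [request_id]
        client.abort_requests(list(request_id))


    client = _RecordingClient()
    abort_request(client, "req-0")
    abort_request(client, ["req-1", "req-2"])
    assert client.aborted == ["req-0", "req-1", "req-2"]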
1 change: 1 addition & 0 deletions vllm/v1/engine/async_stream.py
@@ -1,5 +1,6 @@
 import asyncio
 from typing import Any, AsyncGenerator, Callable, Optional, Type, Union
+
 from vllm.outputs import EmbeddingRequestOutput, RequestOutput
 
 
12 changes: 5 additions & 7 deletions vllm/v1/engine/core.py
@@ -10,11 +10,9 @@
 from vllm.logger import init_logger
 from vllm.usage.usage_lib import UsageContext
 from vllm.v1.core.scheduler import Scheduler
-from vllm.v1.engine import (POLLING_TIMEOUT_MS, EngineCoreOutput,
-                            EngineCoreOutputs,
-                            EngineCoreRequest,
-                            EngineCoreAbortRequest,
-                            EngineCoreRequestType)
+from vllm.v1.engine import (POLLING_TIMEOUT_MS, EngineCoreAbortRequest,
+                            EngineCoreOutput, EngineCoreOutputs,
+                            EngineCoreRequest, EngineCoreRequestType)
 from vllm.v1.executor.gpu_executor import GPUExecutor
 from vllm.v1.request import Request
 from vllm.version import __version__ as VLLM_VERSION
@@ -284,11 +282,11 @@ def process_abort_requests(request_data: bytes) -> None:
         # frames[0] identifies the type of the request and
         # frames[1] is the actual request data
         assert len(frames) == 2
-        request_type, request_data = frames[0], frames[1]
+        request_type, request_data = frames[0], frames[1]
 
         # Determine request_type
         request_type = self.msgpack_request_type_decoder.decode(
-            frames[0].buffer)
+            frames[0].buffer)
 
         # Process request_data based on request_type
         if request_type == EngineCoreRequestType.AddRequest:
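
Note: the handler above reads a two-frame message, with frame 0 naming the request type and frame 1 carrying the payload. That dispatch can be simulated without sockets; a sketch under that assumption, where _Frame is a stand-in for zmq.Frame exposing only a .buffer attribute:

    # Simulate the two-frame dispatch from EngineCoreProc without zmq.
    import enum

    import msgspec


    class EngineCoreRequestType(enum.Enum):
        AddRequest = enum.auto()
        AbortRequest = enum.auto()


    class _Frame:
        """Stand-in for zmq.Frame; only .buffer is modeled."""

        def __init__(self, data: bytes) -> None:
            self.buffer = data


    encoder = msgspec.msgpack.Encoder()
    frames = [
        _Frame(encoder.encode(EngineCoreRequestType.AbortRequest)),
        _Frame(encoder.encode(["req-0", "req-1"])),  # simplified payload
    ]

    # Mirror the dispatch above: decode frame 0, then branch on the type.
    assert len(frames) == 2
    request_type = msgspec.msgpack.Decoder(EngineCoreRequestType).decode(
        frames[0].buffer)
    if request_type == EngineCoreRequestType.AbortRequest:
        request_ids = msgspec.msgpack.Decoder(list).decode(frames[1].buffer)
        assert request_ids == ["req-0", "req-1"]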
34 changes: 18 additions & 16 deletions vllm/v1/engine/core_client.py
@@ -6,11 +6,9 @@
 
 from vllm.logger import init_logger
 from vllm.utils import get_open_zmq_ipc_path
-from vllm.v1.engine import (POLLING_TIMEOUT_MS, EngineCoreOutput,
-                            EngineCoreOutputs,
-                            EngineCoreRequest,
-                            EngineCoreAbortRequest,
-                            EngineCoreRequestType)
+from vllm.v1.engine import (POLLING_TIMEOUT_MS, EngineCoreAbortRequest,
+                            EngineCoreOutput, EngineCoreOutputs,
+                            EngineCoreRequest, EngineCoreRequestType)
 from vllm.v1.engine.core import EngineCore, EngineCoreProc
 
 logger = init_logger(__name__)
@@ -160,18 +158,20 @@ def send_input(self,
                    request_type: EngineCoreRequestType,
                    request: Union[EngineCoreRequest, EngineCoreAbortRequest])\
                        -> None:
-        self.input_socket.send_multipart(
-            (self.encoder.encode(request_type),
-             self.encoder.encode(request), ),
-            copy=False,
-            flags=zmq.NOBLOCK)
+        self.input_socket.send_multipart((
+            self.encoder.encode(request_type),
+            self.encoder.encode(request),
+        ),
+                                         copy=False,
+                                         flags=zmq.NOBLOCK)
 
     def add_request(self, request: EngineCoreRequest) -> None:
         self.send_input(EngineCoreRequestType.AddRequest, request)
 
     def abort_requests(self, request: EngineCoreAbortRequest) -> None:
         self.send_input(EngineCoreRequestType.AbortRequest, request)
 
+
 class AsyncMPClient(MPClient):
     """Asyncio-compatible client for multi-proc EngineCore."""
@@ -192,14 +192,16 @@ async def send_input(self,
                          request_type: EngineCoreRequestType,
                          request: Union[EngineCoreRequest, EngineCoreAbortRequest])\
                              -> None:
-        await self.input_socket.send_multipart(
-            (self.encoder.encode(request_type),
-             self.encoder.encode(request), ),
-            copy=False,
-            flags=zmq.NOBLOCK)
+        await self.input_socket.send_multipart((
+            self.encoder.encode(request_type),
+            self.encoder.encode(request),
+        ),
+                                               copy=False,
+                                               flags=zmq.NOBLOCK)
 
     async def add_request_async(self, request: EngineCoreRequest) -> None:
         await self.send_input(EngineCoreRequestType.AddRequest, request)
 
-    async def abort_requests_async(self, request: EngineCoreAbortRequest) -> None:
+    async def abort_requests_async(self,
+                                   request: EngineCoreAbortRequest) -> None:
         await self.send_input(EngineCoreRequestType.AbortRequest, request)
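
Note: both clients pack the request type and the request itself into a single two-frame zmq multipart message, each frame msgpack-encoded. A self-contained sketch of that framing over an in-process PAIR socket pair; the inproc address and string payloads are illustrative, and NOBLOCK is omitted for simplicity:

    # Two-frame multipart send/receive over an inproc PAIR pair.
    import msgspec
    import zmq

    ctx = zmq.Context()
    tx = ctx.socket(zmq.PAIR)
    tx.bind("inproc://engine-core-demo")
    rx = ctx.socket(zmq.PAIR)
    rx.connect("inproc://engine-core-demo")

    encoder = msgspec.msgpack.Encoder()
    # Frame 0: request type; frame 1: request payload.
    tx.send_multipart(
        (encoder.encode("AbortRequest"), encoder.encode(["req-0"])),
        copy=False)

    frames = rx.recv_multipart(copy=False)
    assert len(frames) == 2
    assert msgspec.msgpack.decode(frames[0].buffer) == "AbortRequest"
    assert msgspec.msgpack.decode(frames[1].buffer) == ["req-0"]

    rx.close()
    tx.close()
    ctx.term()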
8 changes: 4 additions & 4 deletions vllm/v1/engine/llm_engine.py
@@ -13,11 +13,11 @@
 from vllm.sampling_params import SamplingParams
 from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs
 from vllm.usage.usage_lib import UsageContext
+from vllm.v1.engine import EngineCoreAbortRequest
 from vllm.v1.engine.core_client import EngineCoreClient
 from vllm.v1.engine.detokenizer import Detokenizer
 from vllm.v1.engine.processor import Processor
 from vllm.v1.executor.gpu_executor import GPUExecutor
-from vllm.v1.engine import EngineCoreAbortRequest
 from vllm.v1.request import RequestStatus
 
 logger = init_logger(__name__)
@@ -113,9 +113,9 @@ def abort_request(self, request_id: Union[str, Iterable[str]]) -> None:
         if isinstance(request_id, str):
             request_id = [request_id]
         self.engine_core.abort_requests(
-            EngineCoreAbortRequest(
-                request_ids = request_id,
-                request_status = RequestStatus.FINISHED_STOPPED))
+            EngineCoreAbortRequest(
+                request_ids=request_id,
+                request_status=RequestStatus.FINISHED_STOPPED))
 
     def add_request(
         self,
