diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py index 7d80420bf7401..862e117b6c9af 100644 --- a/vllm/engine/async_llm_engine.py +++ b/vllm/engine/async_llm_engine.py @@ -1,8 +1,5 @@ import asyncio import time -import zmq -import zmq.asyncio -import pickle from functools import partial from typing import (AsyncGenerator, Callable, Dict, Iterable, List, Mapping, Optional, Set, Tuple, Type, Union) @@ -257,22 +254,13 @@ class _AsyncLLMEngine(LLMEngine): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.logger_ctx = zmq.asyncio.Context() - - self.to_logger = self.logger_ctx.socket(zmq.constants.PUSH) - self.to_logger.bind("inproc://doesitwork") - - self.from_engine = self.logger_ctx.socket(zmq.constants.PULL) - self.from_engine.connect("inproc://doesitwork") - self.logging_task = asyncio.create_task(self.run_logging_loop()) - + self.logging_queue = asyncio.Queue() async def run_logging_loop(self): - while True: - msg = await self.from_engine.recv_string() - self.do_log_stats() + data = await self.logging_queue.get() + self.do_log_stats(data[0], data[1]) async def step_async( self, virtual_engine: int @@ -311,13 +299,9 @@ async def step_async( output, scheduler_outputs.scheduled_seq_groups, scheduler_outputs.ignored_seq_groups, seq_group_metadata_list) - # Log stats. - # log_task = asyncio.create_task(self.do_log_stats_async( - # scheduler_outputs, output)) - # _running_tasks.add(log_task) - # log_task.add_done_callback(_running_tasks.discard) - # self.do_log_stats(scheduler_outputs, output) - await self.to_logger.send_string("Do log") + self.logging_queue.put_nowait(( + scheduler_outputs, output + )) # Tracing self.do_tracing(scheduler_outputs)