diff --git a/src/guidellm/request/session.py b/src/guidellm/request/session.py
new file mode 100644
index 00000000..4c10b480
--- /dev/null
+++ b/src/guidellm/request/session.py
@@ -0,0 +1,83 @@
+import itertools
+from abc import ABC, abstractmethod
+from typing import Generic, TypeVar
+
+from guidellm.backend.response import ResponseSummary
+from guidellm.request.request import GenerationRequest
+
+__all__ = ["GenerativeRequestSession", "RequestSession"]
+
+# TODO: Replace with specific types that implement needed features
+RequestT = TypeVar("RequestT")
+ResponseT = TypeVar("ResponseT")
+
+
+class RequestSession(ABC, Generic[RequestT, ResponseT]):
+    # len() is the number of requests (turns) in the session
+    @abstractmethod
+    def __len__(self) -> int: ...
+
+    @abstractmethod
+    def get_next_request(self) -> RequestT: ...
+
+    @abstractmethod
+    def get_next_delay(self) -> float: ...
+
+    @abstractmethod
+    def push_response(self, response: ResponseT) -> None: ...
+
+    @property
+    @abstractmethod
+    def complete(self) -> bool: ...
+
+
+# FIXME: Bad implementation. Can only handle string requests
+class GenerativeRequestSession(RequestSession[GenerationRequest, ResponseSummary]):
+    def __init__(self, prompts: list[GenerationRequest]) -> None:
+        if not prompts:
+            raise ValueError("Prompts cannot be empty")
+
+        self.prompts = prompts
+        self.responses: list[str] = []
+
+    def __len__(self) -> int:
+        return len(self.prompts)
+
+    def get_next_request(self) -> GenerationRequest:
+        completed_responses = len(self.responses)
+        # Deep copy so the mutations below do not corrupt the stored
+        # prompt's stats/constraints for later turns
+        base_request = self.prompts[completed_responses].model_copy(deep=True)
+        # Interleave prior prompts with their responses, ending with the
+        # current prompt (the trailing "" keeps zip from dropping it)
+        base_request.content = "".join(
+            itertools.chain.from_iterable(
+                zip((x.content for x in self.prompts), [*self.responses, ""])
+            )
+        )
+        base_request.stats["prompt_tokens"] = sum(
+            x.stats["prompt_tokens"] for x in self.prompts[: completed_responses + 1]
+        )
+        base_request.constraints["output_tokens"] = sum(
+            x.constraints["output_tokens"] for x in self.prompts[:completed_responses]
+        )
+
+        return base_request
+
+    def get_next_delay(self) -> float:
+        # No think time between turns in this implementation
+        return 0.0
+
+    def push_response(self, response: ResponseSummary) -> None:
+        if len(self.responses) < len(self.prompts):
+            if response.response_output_tokens is not None:
+                self.prompts[len(self.responses)].constraints["output_tokens"] = (
+                    response.response_output_tokens
+                )
+            self.responses.append(response.value)
+        else:
+            raise ValueError("Response list full")
+
+    @property
+    def complete(self) -> bool:
+        return len(self.responses) >= len(self.prompts)
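The intended call pattern, as exercised by the scheduler and worker changes below, is roughly this sketch (backend_resolve is a hypothetical stand-in for whatever actually executes a request; the field names mirror how this diff uses GenerationRequest and ResponseSummary):

    from guidellm.request import GenerationRequest
    from guidellm.request.session import GenerativeRequestSession

    def run_session(prompts: list[GenerationRequest], backend_resolve) -> list[str]:
        # backend_resolve is a hypothetical resolver returning a ResponseSummary
        session = GenerativeRequestSession(prompts)
        while not session.complete:
            request = session.get_next_request()  # prior turns folded into .content
            session.push_response(backend_resolve(request))
        return session.responses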
diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py
index 06203827..4c75f57d 100644
--- a/src/guidellm/scheduler/scheduler.py
+++ b/src/guidellm/scheduler/scheduler.py
@@ -1,10 +1,10 @@
 import asyncio
 import math
-import multiprocessing
-import multiprocessing.queues
 import time
 from collections.abc import AsyncGenerator, Iterable, Iterator
 from concurrent.futures import ProcessPoolExecutor
+from multiprocessing import Manager
+from queue import Empty as QueueEmpty, Queue
 from typing import (
     Any,
     Generic,
@@ -15,17 +15,22 @@
 from loguru import logger
 
 from guidellm.config import settings
+from guidellm.request.session import RequestSession
 from guidellm.scheduler.result import (
     SchedulerRequestResult,
     SchedulerResult,
     SchedulerRunInfo,
 )
 from guidellm.scheduler.strategy import SchedulingStrategy
-from guidellm.scheduler.types import RequestT, ResponseT
+from guidellm.scheduler.types import (
+    MPQueues,
+    RequestT,
+    ResponseT,
+    WorkerProcessRequestTime,
+    WorkerProcessResult,
+)
 from guidellm.scheduler.worker import (
     RequestsWorker,
-    WorkerProcessRequest,
-    WorkerProcessResult,
 )
 
 __all__ = ["Scheduler"]
@@ -114,13 +119,13 @@ async def run(
             raise ValueError(f"Invalid max_duration: {max_duration}")
 
         with (
-            multiprocessing.Manager() as manager,
+            Manager() as manager,
             ProcessPoolExecutor(
                 max_workers=scheduling_strategy.processes_limit
             ) as executor,
         ):
             requests_iter: Optional[Iterator[Any]] = None
-            futures, requests_queue, responses_queue = await self._start_processes(
+            futures, queues = await self._start_processes(
                 manager, executor, scheduling_strategy
             )
             run_info, requests_iter, times_iter = self._run_setup(
@@ -149,13 +154,14 @@
                 requests_iter = self._add_requests(
                     requests_iter,
                     times_iter,
-                    requests_queue,
+                    queues.requests,
+                    queues.times,
                     run_info,
                 )
                 await asyncio.sleep(0)  # enable requests to start
 
                 iter_result = self._check_result_ready(
-                    responses_queue,
+                    queues.responses,
                     run_info,
                 )
                 if iter_result is not None:
@@ -171,7 +177,7 @@
                     run_info=run_info,
                 )
 
-            await self._stop_processes(futures, requests_queue)
+            await self._stop_processes(futures, queues.requests)
 
     async def _start_processes(
         self,
@@ -180,14 +186,16 @@
         scheduling_strategy: SchedulingStrategy,
    ) -> tuple[
         list[asyncio.Future],
-        multiprocessing.Queue,
-        multiprocessing.Queue,
+        MPQueues[RequestT, ResponseT],
     ]:
         await self.worker.prepare_multiprocessing()
-        requests_queue = manager.Queue(
-            maxsize=scheduling_strategy.queued_requests_limit
-        )
-        responses_queue = manager.Queue()
+        queues: MPQueues[RequestT, ResponseT] = MPQueues(
+            requests=manager.Queue(
+                maxsize=scheduling_strategy.processing_requests_limit
+            ),
+            times=manager.Queue(maxsize=scheduling_strategy.processing_requests_limit),
+            responses=manager.Queue(),
+        )
 
         num_processes = min(
             scheduling_strategy.processes_limit,
@@ -212,36 +220,20 @@
         futures = []
         loop = asyncio.get_event_loop()
         for id_, requests_limit in zip(process_ids, process_requests_limits):
-            if scheduling_strategy.processing_mode == "sync":
-                futures.append(
-                    loop.run_in_executor(
-                        executor,
-                        self.worker.process_loop_synchronous,
-                        requests_queue,
-                        responses_queue,
-                        id_,
-                    )
-                )
-            elif scheduling_strategy.processing_mode == "async":
-                futures.append(
-                    loop.run_in_executor(
-                        executor,
-                        self.worker.process_loop_asynchronous,
-                        requests_queue,
-                        responses_queue,
-                        requests_limit,
-                        id_,
-                    )
-                )
-            else:
-                raise ValueError(
-                    f"Invalid processing mode: {scheduling_strategy.processing_mode} "
-                    f"for strategy: {scheduling_strategy}"
-                )
+            futures.append(
+                loop.run_in_executor(
+                    executor,
+                    self.worker.process_loop_asynchronous,
+                    queues,
+                    False,  # prioritize_sessions; TODO: Make configurable
+                    requests_limit,
+                    id_,
+                )
+            )
 
         await asyncio.sleep(0.1)  # give time for processes to start
 
-        return futures, requests_queue, responses_queue
+        return futures, queues
 
     def _run_setup(
         self,
@@ -284,7 +276,8 @@
         self,
         requests_iter: Optional[Iterator[Any]],
         times_iter: Iterator[float],
-        requests_queue: multiprocessing.Queue,
+        requests_queue: Queue[RequestSession[RequestT, ResponseT]],
+        times_queue: Queue[WorkerProcessRequestTime],
         run_info: SchedulerRunInfo,
     ) -> Optional[Iterator[Any]]:
         if requests_iter is not None:
@@ -298,23 +291,24 @@
                     if run_info.created_requests >= run_info.end_number:
                         raise StopIteration
 
-                    if (
-                        request_time := next(times_iter)
-                    ) >= run_info.end_time or time.time() >= run_info.end_time:
-                        raise StopIteration
-
-                    request = next(requests_iter)
-                    work_req: WorkerProcessRequest[RequestT] = WorkerProcessRequest(
-                        request=request,
-                        start_time=request_time,
-                        timeout_time=run_info.end_time,
-                        queued_time=time.time(),
-                    )
-                    requests_queue.put(work_req)
-
-                    run_info.created_requests += 1
-                    run_info.queued_requests += 1
-                    added_count += 1
+                    session = next(requests_iter)
+                    # NOTE: if a limit is hit mid-session below, the session is
+                    # already queued with fewer than len(session) time slots
+                    requests_queue.put(session)
+                    for _ in range(len(session)):
+                        if (
+                            request_time := next(times_iter)
+                        ) >= run_info.end_time or time.time() >= run_info.end_time:
+                            raise StopIteration
+
+                        work_req = WorkerProcessRequestTime(
+                            start_time=request_time,
+                            timeout_time=run_info.end_time,
+                            queued_time=time.time(),
+                        )
+                        times_queue.put(work_req)
+
+                        run_info.created_requests += 1
+                        run_info.queued_requests += 1
+                        added_count += 1
             except StopIteration:
                 # we've reached the limit number, limit time, or exhausted the requests
                 # set to None to stop adding more and tell the loop no more requests
@@ -324,14 +318,14 @@
 
     def _check_result_ready(
         self,
-        responses_queue: multiprocessing.Queue,
+        responses_queue: Queue[WorkerProcessResult[RequestT, ResponseT]],
         run_info: SchedulerRunInfo,
     ) -> Optional[SchedulerRequestResult[RequestT, ResponseT]]:
         try:
             process_response: WorkerProcessResult[RequestT, ResponseT] = (
                 responses_queue.get_nowait()
             )
-        except multiprocessing.queues.Empty:  # type: ignore[attr-defined]
+        except QueueEmpty:
            return None
 
         if process_response.type_ == "request_scheduled":
@@ -374,8 +368,9 @@
     async def _stop_processes(
         self,
         futures: list[asyncio.Future],
-        requests_queue: multiprocessing.Queue,
+        requests_queue: Queue[RequestSession[RequestT, ResponseT]],
     ):
+        # FIXME: Need new method for stopping workers
         for _ in futures:
             requests_queue.put(None)
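Note the handshake _add_requests now sets up: each session object goes onto queues.requests exactly once, while one WorkerProcessRequestTime record per turn, len(session) in total, goes onto queues.times; a worker then pairs the session it pops with one timing record per turn. Schematically, with the names as bound inside _add_requests above:

    queues.requests.put(session)       # one entry per session
    for _ in range(len(session)):      # one timing record per turn
        queues.times.put(
            WorkerProcessRequestTime(
                start_time=next(times_iter),
                timeout_time=run_info.end_time,
                queued_time=time.time(),
            )
        )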
diff --git a/src/guidellm/scheduler/strategy.py b/src/guidellm/scheduler/strategy.py
index 200c799e..60dd799e 100644
--- a/src/guidellm/scheduler/strategy.py
+++ b/src/guidellm/scheduler/strategy.py
@@ -226,7 +226,9 @@ def processes_limit(self) -> int:
-        :return: {self.streams} for the concurrent scheduling strategy to limit
-            the worker processes to the number of streams.
+        :return: the lesser of {self.streams} and the available CPU cores
+            (reserving one core) for the concurrent scheduling strategy to
+            limit the worker processes.
         """
-        return self.streams
+        cpu_cores = os.cpu_count() or 1
+
+        return min(max(1, cpu_cores - 1), self.streams)
 
     @property
     def queued_requests_limit(self) -> int:
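As a worked example of the new cap, assuming this hypothetical standalone rewrite of the property:

    import os

    def processes_limit(streams: int) -> int:
        cpu_cores = os.cpu_count() or 1
        return min(max(1, cpu_cores - 1), streams)

    # On a machine where os.cpu_count() == 8:
    #   processes_limit(32) -> 7 (capped at cores minus one)
    #   processes_limit(4)  -> 4 (still bounded by streams)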
""" - return self.streams + cpu_cores = os.cpu_count() or 1 + + return min(max(1, cpu_cores - 1), self.streams) @property def queued_requests_limit(self) -> int: diff --git a/src/guidellm/scheduler/types.py b/src/guidellm/scheduler/types.py index 42535d71..e4efa288 100644 --- a/src/guidellm/scheduler/types.py +++ b/src/guidellm/scheduler/types.py @@ -1,7 +1,43 @@ -from typing import TypeVar +from dataclasses import dataclass +from multiprocessing import Queue +from typing import Generic, Literal, Optional, TypeVar -__all__ = ["RequestT", "ResponseT"] +from guidellm.request.session import RequestSession +from guidellm.scheduler.result import SchedulerRequestInfo + +__all__ = [ + "MPQueues", + "RequestT", + "ResponseT", + "WorkerProcessRequestTime", + "WorkerProcessResult", +] RequestT = TypeVar("RequestT") ResponseT = TypeVar("ResponseT") + + +# TODO: Move dataclasses somewhere else + + +@dataclass +class WorkerProcessRequestTime: + start_time: float + timeout_time: float + queued_time: float + + +@dataclass +class WorkerProcessResult(Generic[RequestT, ResponseT]): + type_: Literal["request_scheduled", "request_start", "request_complete"] + request: RequestT + response: Optional[ResponseT] + info: SchedulerRequestInfo + + +@dataclass +class MPQueues(Generic[RequestT, ResponseT]): + requests: Queue[RequestSession[RequestT, ResponseT]] + times: Queue[WorkerProcessRequestTime] + responses: Queue[WorkerProcessResult[RequestT, ResponseT]] diff --git a/src/guidellm/scheduler/worker.py b/src/guidellm/scheduler/worker.py index a53b14c2..fcbc85d7 100644 --- a/src/guidellm/scheduler/worker.py +++ b/src/guidellm/scheduler/worker.py @@ -1,11 +1,11 @@ import asyncio import math -import multiprocessing -import multiprocessing.queues import time from abc import ABC, abstractmethod from collections.abc import AsyncGenerator from dataclasses import dataclass +from multiprocessing import Queue +from queue import Empty as QueueEmpty from typing import ( Any, Generic, @@ -26,8 +26,9 @@ ) from guidellm.objects import StandardBaseModel from guidellm.request import GenerationRequest +from guidellm.request.session import RequestSession from guidellm.scheduler.result import SchedulerRequestInfo -from guidellm.scheduler.types import RequestT, ResponseT +from guidellm.scheduler.types import MPQueues, RequestT, ResponseT, WorkerProcessResult __all__ = [ "GenerativeRequestsWorker", @@ -35,27 +36,9 @@ "RequestsWorker", "ResolveStatus", "WorkerDescription", - "WorkerProcessRequest", - "WorkerProcessResult", ] -@dataclass -class WorkerProcessRequest(Generic[RequestT]): - request: RequestT - start_time: float - timeout_time: float - queued_time: float - - -@dataclass -class WorkerProcessResult(Generic[RequestT, ResponseT]): - type_: Literal["request_scheduled", "request_start", "request_complete"] - request: RequestT - response: Optional[ResponseT] - info: SchedulerRequestInfo - - @dataclass class ResolveStatus: requested: bool @@ -120,28 +103,25 @@ async def resolve( """ ... 
diff --git a/src/guidellm/scheduler/worker.py b/src/guidellm/scheduler/worker.py
index a53b14c2..fcbc85d7 100644
--- a/src/guidellm/scheduler/worker.py
+++ b/src/guidellm/scheduler/worker.py
@@ -1,11 +1,10 @@
 import asyncio
 import math
-import multiprocessing
-import multiprocessing.queues
 import time
 from abc import ABC, abstractmethod
 from collections.abc import AsyncGenerator
 from dataclasses import dataclass
+from queue import Empty as QueueEmpty, Queue
 from typing import (
     Any,
     Generic,
@@ -26,8 +26,9 @@
 )
 from guidellm.objects import StandardBaseModel
 from guidellm.request import GenerationRequest
+from guidellm.request.session import RequestSession
 from guidellm.scheduler.result import SchedulerRequestInfo
-from guidellm.scheduler.types import RequestT, ResponseT
+from guidellm.scheduler.types import MPQueues, RequestT, ResponseT, WorkerProcessResult
 
 __all__ = [
     "GenerativeRequestsWorker",
@@ -35,27 +36,9 @@
     "RequestsWorker",
     "ResolveStatus",
     "WorkerDescription",
-    "WorkerProcessRequest",
-    "WorkerProcessResult",
 ]
 
 
-@dataclass
-class WorkerProcessRequest(Generic[RequestT]):
-    request: RequestT
-    start_time: float
-    timeout_time: float
-    queued_time: float
-
-
-@dataclass
-class WorkerProcessResult(Generic[RequestT, ResponseT]):
-    type_: Literal["request_scheduled", "request_start", "request_complete"]
-    request: RequestT
-    response: Optional[ResponseT]
-    info: SchedulerRequestInfo
-
-
 @dataclass
 class ResolveStatus:
     requested: bool
@@ -120,28 +103,25 @@
         """
         ...
 
-    async def get_request(
-        self, requests_queue: multiprocessing.Queue
-    ) -> Optional[WorkerProcessRequest[RequestT]]:
-        return await asyncio.to_thread(requests_queue.get)  # type: ignore[attr-defined]
-
     async def send_result(
         self,
-        results_queue: multiprocessing.Queue,
+        results_queue: Queue[WorkerProcessResult[RequestT, ResponseT]],
         result: WorkerProcessResult[RequestT, ResponseT],
     ):
         await asyncio.to_thread(results_queue.put, result)  # type: ignore[attr-defined]
 
     async def resolve_scheduler_request(
         self,
-        request: Any,
+        request_session: RequestSession[RequestT, ResponseT],
         queued_time: float,
         dequeued_time: float,
         start_time: float,
         timeout_time: float,
-        results_queue: multiprocessing.Queue,
+        results_queue: Queue[WorkerProcessResult[RequestT, ResponseT]],
         process_id: int,
-    ):
+    ) -> RequestSession[RequestT, ResponseT]:
+        request = request_session.get_next_request()
+
         info = SchedulerRequestInfo(
             targeted_start_time=start_time,
             queued_time=queued_time,
@@ -185,74 +165,76 @@
         )
         asyncio.create_task(self.send_result(results_queue, result))
 
-    def process_loop_synchronous(
-        self,
-        requests_queue: multiprocessing.Queue,
-        results_queue: multiprocessing.Queue,
-        process_id: int,
-    ):
-        async def _process_runner():
-            while (
-                process_request := await self.get_request(requests_queue)
-            ) is not None:
-                dequeued_time = time.time()
-
-                await self.resolve_scheduler_request(
-                    request=process_request.request,
-                    queued_time=process_request.queued_time,
-                    dequeued_time=dequeued_time,
-                    start_time=process_request.start_time,
-                    timeout_time=process_request.timeout_time,
-                    results_queue=results_queue,
-                    process_id=process_id,
-                )
-
-        try:
-            asyncio.run(_process_runner())
-        except Exception as exc:  # noqa: BLE001
-            logger.error(
-                f"Error in worker process {process_id}: {exc}",
-                exc_info=True,
-                stack_info=True,
-            )
+        request_session.push_response(response)
+        # Return the session so the done-callback can requeue or finish it
+        return request_session
 
     def process_loop_asynchronous(
         self,
-        requests_queue: multiprocessing.Queue,
-        results_queue: multiprocessing.Queue,
+        queues: MPQueues[RequestT, ResponseT],
+        prioritize_sessions: bool,
         max_concurrency: int,
         process_id: int,
     ):
         async def _process_runner():
-            pending = asyncio.Semaphore(max_concurrency)
-
-            if pending.locked():
-                raise ValueError("Async worker called with max_concurrency < 1")
-
-            while (
-                process_request := await self.get_request(requests_queue)
-            ) is not None:
-                dequeued_time = time.time()
+            lock = asyncio.Semaphore(max_concurrency)
+            pending_sessions: list[RequestSession[RequestT, ResponseT]] = []
 
-                await pending.acquire()
+            while True:  # TODO: Exit condition
+                await lock.acquire()
 
-                def _task_done(_: asyncio.Task):
-                    nonlocal pending
-                    pending.release()
+                try:
+                    request_session = (
+                        pending_sessions.pop()
+                        if pending_sessions
+                        else queues.requests.get_nowait()
+                    )
+                    dequeued_time = time.time()
+                    # TODO: blocking get() stalls the event loop; consider
+                    # asyncio.to_thread here
+                    request_times = queues.times.get()
+                except (QueueEmpty, IndexError):
+                    lock.release()
+                    # Yield so in-flight tasks can progress before retrying
+                    await asyncio.sleep(0.01)
+                    continue
+
+                async def wait_then_requeue(
+                    session: RequestSession[RequestT, ResponseT],
+                ):
+                    # Wait to requeue the request session if it specifies a delay
+                    if delay := session.get_next_delay():
+                        await asyncio.sleep(delay)
+
+                    # Push session to the stack
+                    pending_sessions.append(session)
+                    if prioritize_sessions:
+                        # Release the lock with the session on top of the stack
+                        lock.release()
+
+                def _request_callback(
+                    session_future: asyncio.Future[RequestSession[RequestT, ResponseT]],
+                ):
+                    # Done-callbacks run synchronously once the task finishes,
+                    # so the session is read via .result() here.
+                    # If we are prioritizing sessions, hold
+                    # the lock until the session is done
+                    nonlocal lock
+                    if not prioritize_sessions:
+                        lock.release()
+
+                    session = session_future.result()
+                    if not session.complete:
+                        asyncio.create_task(wait_then_requeue(session))
+                    elif prioritize_sessions:
+                        # no more requests in this session, release the lock
+                        lock.release()
 
                 task = asyncio.create_task(
                     self.resolve_scheduler_request(
-                        request=process_request.request,
-                        queued_time=process_request.queued_time,
+                        request_session=request_session,
+                        queued_time=request_times.queued_time,
                         dequeued_time=dequeued_time,
-                        start_time=process_request.start_time,
-                        timeout_time=process_request.timeout_time,
-                        results_queue=results_queue,
+                        start_time=request_times.start_time,
+                        timeout_time=request_times.timeout_time,
+                        results_queue=queues.responses,
                         process_id=process_id,
                     )
                 )
-                task.add_done_callback(_task_done)
-                await asyncio.sleep(0)  # enable start task immediately
+                task.add_done_callback(_request_callback)
 
         try:
             asyncio.run(_process_runner())
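A note on the callback above: add_done_callback invokes a plain callable synchronously with the finished task, so _request_callback is written as a regular function that reads session_future.result(); an async def here would only produce a coroutine that is never awaited. A standalone sketch of the pattern:

    import asyncio

    async def main() -> None:
        task = asyncio.create_task(asyncio.sleep(0, result="done"))
        # Runs when the task finishes; result() is available by then
        task.add_done_callback(lambda fut: print(fut.result()))
        await task

    asyncio.run(main())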
@@ -309,30 +291,17 @@
         """
         await self.backend.prepare_multiprocessing()
 
-    def process_loop_synchronous(
-        self,
-        requests_queue: multiprocessing.Queue,
-        results_queue: multiprocessing.Queue,
-        process_id: int,
-    ):
-        asyncio.run(self.backend.validate())
-        super().process_loop_synchronous(
-            requests_queue=requests_queue,
-            results_queue=results_queue,
-            process_id=process_id,
-        )
-
     def process_loop_asynchronous(
         self,
-        requests_queue: multiprocessing.Queue,
-        results_queue: multiprocessing.Queue,
+        queues: MPQueues[GenerationRequest, ResponseSummary],
+        prioritize_sessions: bool,
         max_concurrency: int,
         process_id: int,
     ):
         asyncio.run(self.backend.validate())
         super().process_loop_asynchronous(
-            requests_queue=requests_queue,
-            results_queue=results_queue,
+            queues=queues,
+            prioritize_sessions=prioritize_sessions,
             max_concurrency=max_concurrency,
             process_id=process_id,
         )