From 50ee9e79e5b2f691c70e1d25872065fe0e542265 Mon Sep 17 00:00:00 2001 From: Charlie Cheng-Jie Ji Date: Wed, 4 Dec 2024 02:12:28 +0000 Subject: [PATCH] avoid sequentially process the retried entries. do parallel async --- .../request_processor/base_online_request_processor.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/bespokelabs/curator/request_processor/base_online_request_processor.py b/src/bespokelabs/curator/request_processor/base_online_request_processor.py index a710d21f..7bdd129f 100644 --- a/src/bespokelabs/curator/request_processor/base_online_request_processor.py +++ b/src/bespokelabs/curator/request_processor/base_online_request_processor.py @@ -329,6 +329,7 @@ async def process_requests_from_file( await asyncio.gather(*pending_requests) # Process any remaining retries in the queue + pending_retries = [] while not queue_of_requests_to_retry.empty(): retry_request = await queue_of_requests_to_retry.get() token_estimate = self.estimate_total_tokens(retry_request.generic_request.messages) @@ -356,7 +357,12 @@ async def process_requests_from_file( status_tracker=status_tracker, ) ) - await task + pending_retries.append(task) + await asyncio.sleep(0.1) # Allow other tasks to run + + # Wait for all retry tasks to complete + if pending_retries: + await asyncio.gather(*pending_retries) status_tracker.pbar.close() @@ -406,7 +412,7 @@ async def handle_single_request_with_retries( except Exception as e: logger.warning( - f"Request {request.task_id} failed with Exception {e}, attempts left {request.attempts_left-1}" + f"Request {request.task_id} failed with Exception {e}, attempts left {request.attempts_left}" ) status_tracker.num_other_errors += 1 request.result.append(e)