
Commit

Merge pull request #264 from bespokelabsai/ryanm/retry-and-content-filter

Small changes to retry and content filter
RyanMarten authored Dec 16, 2024
2 parents 739c153 + 9fd48cf commit 75b6439
Showing 3 changed files with 34 additions and 23 deletions.

@@ -222,7 +222,6 @@ def run(
             self.process_requests_from_file(
                 generic_request_filepath=request_file,
                 save_filepath=response_file,
-                max_attempts=self.max_retries,
                 resume=True,
             )
         )

@@ -233,7 +232,6 @@ async def process_requests_from_file(
         self,
         generic_request_filepath: str,
         save_filepath: str,
-        max_attempts: int,
         resume: bool,
         resume_no_retry: bool = False,
     ) -> None:

@@ -353,7 +351,7 @@ async def process_requests_from_file(
                 task_id=status_tracker.num_tasks_started,
                 generic_request=generic_request,
                 api_specific_request=self.create_api_specific_request(generic_request),
-                attempts_left=max_attempts,
+                attempts_left=self.max_retries,
                 prompt_formatter=self.prompt_formatter,
             )

@@ -406,10 +404,10 @@ async def process_requests_from_file(
                 token_estimate = self.estimate_total_tokens(
                     retry_request.generic_request.messages
                 )
-                attempt_number = 1 + self.max_retries - retry_request.attempts_left
-                logger.info(
-                    f"Processing retry for request {retry_request.task_id} "
-                    f"(attempt #{attempt_number} of {self.max_retries}). "
+                attempt_number = self.max_retries - retry_request.attempts_left
+                logger.debug(
+                    f"Retrying request {retry_request.task_id} "
+                    f"(attempt #{attempt_number} of {self.max_retries})"
                     f"Previous errors: {retry_request.result}"
                 )

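For reference, attempts_left counts down from self.max_retries, so the attempt number printed by the new debug message is simply the difference between the two. A small standalone illustration of that arithmetic (the values are made up):

max_retries = 3
# attempts_left starts at max_retries and is decremented after each failure, so the
# retry currently being processed is attempt (max_retries - attempts_left).
for attempts_left in (2, 1, 0):
    attempt_number = max_retries - attempts_left
    print(f"attempt #{attempt_number} of {max_retries}")  # prints 1, 2, then 3
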
@@ -482,18 +480,15 @@ async def handle_single_request_with_retries(
                 status_tracker.pbar.update(1)

         except Exception as e:
-            logger.warning(
-                f"Request {request.task_id} failed with Exception {e}, attempts left {request.attempts_left}"
-            )
             status_tracker.num_other_errors += 1
             request.result.append(e)

             if request.attempts_left > 0:
                 request.attempts_left -= 1
-                # Add retry queue logging
-                logger.info(
-                    f"Adding request {request.task_id} to retry queue. Will retry in next available slot. "
-                    f"Attempts remaining: {request.attempts_left}"
+                logger.warning(
+                    f"Encountered '{e.__class__.__name__}: {e}' during attempt "
+                    f"{self.max_retries - request.attempts_left} of {self.max_retries} "
+                    f"while processing request {request.task_id}"
                 )
                 retry_queue.put_nowait(request)
             else:
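
The surrounding retry machinery (only partially visible in this hunk) follows a common asyncio pattern: a failed request records its error, decrements attempts_left, and is pushed onto a retry queue that the main loop drains later. Below is a rough, self-contained sketch of that pattern using simplified stand-in names (Request, do_api_call, handle_with_retries are illustrative, not the repository's actual classes):

import asyncio
from dataclasses import dataclass, field


@dataclass
class Request:
    task_id: int
    attempts_left: int
    result: list = field(default_factory=list)


async def do_api_call(request: Request) -> None:
    # Stand-in for the real API call; always fails here to exercise the retry path.
    raise RuntimeError("simulated API error")


async def handle_with_retries(request: Request, retry_queue: asyncio.Queue, max_retries: int) -> None:
    try:
        await do_api_call(request)
    except Exception as e:
        request.result.append(e)
        if request.attempts_left > 0:
            request.attempts_left -= 1
            print(
                f"Encountered '{e.__class__.__name__}: {e}' during attempt "
                f"{max_retries - request.attempts_left} of {max_retries}"
            )
            # Put the failed request back on the queue to be retried in a later slot.
            retry_queue.put_nowait(request)
        else:
            print(f"Request {request.task_id} gave up; errors: {request.result}")


async def main() -> None:
    max_retries = 3
    retry_queue: asyncio.Queue = asyncio.Queue()
    retry_queue.put_nowait(Request(task_id=0, attempts_left=max_retries))
    while not retry_queue.empty():
        await handle_with_retries(retry_queue.get_nowait(), retry_queue, max_retries)


asyncio.run(main())
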
@@ -77,15 +77,17 @@ def run(
         """
         pass

-    def _verify_existing_request_files(self, working_dir: str, dataset: Optional[Dataset]) -> List[int]:
+    def _verify_existing_request_files(
+        self, working_dir: str, dataset: Optional[Dataset]
+    ) -> List[int]:
         """
         Verify integrity of the cache (each request file has associated metadata, and the number of rows is correct),
         and return the indices of request files that need to be regenerated (so that no work is repeated).

         Args:
             working_dir (str): Working directory where cache files are expected to be (requests.jsonl, metadata.json)
             dataset (Optional[Dataset]): The dataset that we want to create requests from

         Returns:
             List[int]: Indices of missing files
         """

@@ -114,11 +116,15 @@ def _verify_existing_request_files(
                 if num_jobs != expected_num_jobs:
                     incomplete_files.append(i)

-            logger.info(f"Cache missing {len(incomplete_files)} complete request files - regenerating missing ones.")
+            logger.info(
+                f"Cache missing {len(incomplete_files)} complete request files - regenerating missing ones."
+            )
             return incomplete_files

         except:
-            logger.info("Cache verification failed for unexpected reasons - regenerating all request files.")
+            logger.info(
+                "Cache verification failed for unexpected reasons - regenerating all request files."
+            )
             incomplete_files = list(range(expected_num_files))
             return incomplete_files

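The reformatted log lines above sit inside the cache check described in the docstring: each requests.jsonl file is compared against the row count recorded in its metadata.json, and any mismatch marks that file for regeneration. A rough standalone sketch of that check, assuming illustrative file names (requests_{i}.jsonl, metadata_{i}.json) and a num_jobs key in the metadata; the repository's actual layout may differ:

import json
import os
from typing import List


def verify_request_files(working_dir: str, expected_num_files: int) -> List[int]:
    incomplete_files: List[int] = []
    try:
        for i in range(expected_num_files):
            request_file = os.path.join(working_dir, f"requests_{i}.jsonl")
            metadata_file = os.path.join(working_dir, f"metadata_{i}.json")
            if not (os.path.exists(request_file) and os.path.exists(metadata_file)):
                incomplete_files.append(i)
                continue
            with open(request_file) as f:
                num_jobs = sum(1 for _ in f)  # one JSONL row per request
            with open(metadata_file) as f:
                expected_num_jobs = json.load(f)["num_jobs"]
            if num_jobs != expected_num_jobs:
                incomplete_files.append(i)
        print(f"Cache missing {len(incomplete_files)} complete request files - regenerating missing ones.")
        return incomplete_files
    except Exception:
        # Anything unexpected: treat the whole cache as invalid and regenerate everything.
        print("Cache verification failed for unexpected reasons - regenerating all request files.")
        return list(range(expected_num_files))
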
@@ -195,13 +201,16 @@ async def create_all_request_files():
                         metadata_files[i],
                         start_idx=i * self.batch_size,
                     )
-                    for i in range(num_batches) if i in incomplete_files
+                    for i in range(num_batches)
+                    if i in incomplete_files
                 ]
                 await asyncio.gather(*tasks)

             run_in_event_loop(create_all_request_files())
         else:
-            run_in_event_loop(self.acreate_request_file(dataset, prompt_formatter, request_file, metadata_file))
+            run_in_event_loop(
+                self.acreate_request_file(dataset, prompt_formatter, request_file, metadata_file)
+            )

         return request_files

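The comprehension above only schedules creation coroutines for the indices flagged as incomplete and then awaits them together with asyncio.gather, so request files that passed verification are left untouched. A compressed, runnable sketch of that fan-out with placeholder names (acreate_request_file here is a stub, not the repository's method):

import asyncio


async def acreate_request_file(batch_idx: int) -> str:
    # Placeholder for writing requests_{batch_idx}.jsonl plus its metadata file.
    await asyncio.sleep(0)
    return f"requests_{batch_idx}.jsonl"


async def create_all_request_files(num_batches: int, incomplete_files: list) -> list:
    tasks = [
        acreate_request_file(i)
        for i in range(num_batches)
        if i in incomplete_files  # only regenerate the files flagged by the cache check
    ]
    return await asyncio.gather(*tasks)


print(asyncio.run(create_all_request_files(num_batches=4, incomplete_files=[1, 3])))
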
@@ -301,9 +301,16 @@ async def call_single_request(
         except litellm.NotFoundError as e:
             cost = 0

+        finish_reason = completion_obj.choices[0].finish_reason
+        if finish_reason != "stop":
+            logger.debug(
+                f"finish_reason {finish_reason} was not 'stop' with raw response {completion_obj.model_dump()} for request {request.generic_request.messages}"
+            )
+            raise ValueError(f"finish_reason was {finish_reason}")
+
         if response_message is None:
             raise ValueError(
-                f"Request {request.task_id} returned no response message with raw response {completion_obj.model_dump()}"
+                f"response_message was None with raw response {completion_obj.model_dump()}"
             )

         # Create and return response
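
The added guard treats any completion whose finish_reason is not "stop" (for example "content_filter" or "length") as an error, so the request flows into the retry handling changed earlier in this commit rather than silently returning an empty message. A minimal sketch of that check against a stubbed response object (not the processor's full call path):

from types import SimpleNamespace


def check_finish_reason(completion_obj) -> None:
    # "stop" marks a normal completion; anything else (e.g. "content_filter", "length")
    # is raised so the surrounding retry logic can retry or fail the request.
    finish_reason = completion_obj.choices[0].finish_reason
    if finish_reason != "stop":
        raise ValueError(f"finish_reason was {finish_reason}")


# Stubbed response object standing in for a chat-completion result:
fake = SimpleNamespace(choices=[SimpleNamespace(finish_reason="content_filter")])
try:
    check_finish_reason(fake)
except ValueError as e:
    print(e)  # finish_reason was content_filter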
