Skip to content

Commit 919c9f7

Browse files
committed
delete input and output files for successful batches
1 parent 84eae2d commit 919c9f7

File tree

2 files changed

+46
-0
lines changed

2 files changed

+46
-0
lines changed

src/bespokelabs/curator/prompter/prompter.py

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,8 @@ def __init__(
5151
top_p: Optional[float] = None,
5252
presence_penalty: Optional[float] = None,
5353
frequency_penalty: Optional[float] = None,
54+
delete_successful_batch_files: bool = True,
55+
delete_failed_batch_files: bool = False, # To allow users to debug failed batches
5456
):
5557
"""Initialize a Prompter.
5658
@@ -99,6 +101,8 @@ def __init__(
99101
top_p=top_p,
100102
presence_penalty=presence_penalty,
101103
frequency_penalty=frequency_penalty,
104+
delete_successful_batch_files=delete_successful_batch_files,
105+
delete_failed_batch_files=delete_failed_batch_files,
102106
)
103107
else:
104108
if batch_size is not None:

src/bespokelabs/curator/request_processor/openai_batch_request_processor.py

Lines changed: 42 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,8 @@ def __init__(
4040
self,
4141
batch_size: int,
4242
model: str,
43+
delete_successful_batch_files: bool,
44+
delete_failed_batch_files: bool,
4345
temperature: float | None = None,
4446
top_p: float | None = None,
4547
check_interval: int = 10,
@@ -63,6 +65,8 @@ def __init__(
6365
self.presence_penalty: float | None = presence_penalty
6466
self.frequency_penalty: float | None = frequency_penalty
6567
self._file_lock = asyncio.Lock()
68+
self.delete_successful_batch_files: bool = delete_successful_batch_files
69+
self.delete_failed_batch_files: bool = delete_failed_batch_files
6670

6771
def get_rate_limits(self) -> dict:
6872
"""
@@ -324,6 +328,8 @@ async def watch_batches():
324328
check_interval=self.check_interval,
325329
n_submitted_requests=n_submitted_requests,
326330
prompt_formatter=prompt_formatter,
331+
delete_successful_batch_files=self.delete_successful_batch_files,
332+
delete_failed_batch_files=self.delete_failed_batch_files,
327333
)
328334
await batch_watcher.watch()
329335
await batch_watcher.close_client()
@@ -362,6 +368,8 @@ def __init__(
362368
check_interval: int,
363369
prompt_formatter: PromptFormatter,
364370
n_submitted_requests: int,
371+
delete_successful_batch_files: bool,
372+
delete_failed_batch_files: bool,
365373
) -> None:
366374
"""Initialize BatchWatcher with batch objects file and check interval.
367375
@@ -386,6 +394,8 @@ def __init__(
386394
self.remaining_batch_ids = set(self.batch_ids)
387395
self.prompt_formatter = prompt_formatter
388396
self.semaphore = asyncio.Semaphore(MAX_CONCURRENT_BATCH_OPERATIONS)
397+
self.delete_successful_batch_files = delete_successful_batch_files
398+
self.delete_failed_batch_files = delete_failed_batch_files
389399

390400
async def close_client(self):
391401
await self.client.close()
@@ -521,9 +531,28 @@ async def watch(self) -> None:
521531
"Please check the logs above and https://platform.openai.com/batches for errors."
522532
)
523533

534+
async def delete_file(self, file_id: str, semaphore: asyncio.Semaphore):
535+
"""
536+
Delete a file by its ID.
537+
538+
Args:
539+
file_id (str): The ID of the file to delete.
540+
semaphore (asyncio.Semaphore): Semaphore to limit concurrent operations.
541+
"""
542+
async with semaphore:
543+
delete_response = await self.client.files.delete(file_id)
544+
if delete_response.deleted:
545+
logger.info(f"Deleted file {file_id}")
546+
else:
547+
logger.warning(f"Failed to delete file {file_id}")
548+
524549
async def download_batch_to_generic_responses_file(self, batch: Batch) -> str | None:
525550
"""Download the result of a completed batch to file.
526551
552+
To prevent an accumulation of files, we delete the batch input and output files
553+
Without this the 100GB limit for files will be reached very quickly
554+
The user can control this behavior with delete_successful_batch_files and delete_failed_batch_files
555+
527556
Args:
528557
batch: The batch object to download results from.
529558
@@ -537,16 +566,23 @@ async def download_batch_to_generic_responses_file(self, batch: Batch) -> str |
537566
elif batch.status == "failed" and batch.error_file_id:
538567
file_content = await self.client.files.content(batch.error_file_id)
539568
logger.warning(f"Batch {batch.id} failed\n. Errors will be parsed below.")
569+
if self.delete_failed_batch_files:
570+
await self.delete_file(batch.input_file_id, self.semaphore)
571+
await self.delete_file(batch.error_file_id, self.semaphore)
540572
elif batch.status == "failed" and not batch.error_file_id:
541573
errors = "\n".join([str(error) for error in batch.errors.data])
542574
logger.error(
543575
f"Batch {batch.id} failed and likely failed validation. "
544576
f"Batch errors: {errors}. "
545577
f"Check https://platform.openai.com/batches/{batch.id} for more details."
546578
)
579+
if self.delete_failed_batch_files:
580+
await self.delete_file(batch.input_file_id, self.semaphore)
547581
return None
548582
elif batch.status == "cancelled" or batch.status == "expired":
549583
logger.warning(f"Batch {batch.id} was cancelled or expired")
584+
if self.delete_failed_batch_files:
585+
await self.delete_file(batch.input_file_id, self.semaphore)
550586
return None
551587

552588
# Naming is consistent with the request file (e.g. requests_0.jsonl -> responses_0.jsonl)
@@ -627,5 +663,11 @@ async def download_batch_to_generic_responses_file(self, batch: Batch) -> str |
627663
response_cost=cost,
628664
)
629665
f.write(json.dumps(generic_response.model_dump(), default=str) + "\n")
666+
630667
logger.info(f"Batch {batch.id} written to {response_file}")
668+
669+
if self.delete_successful_batch_files:
670+
await self.delete_file(batch.input_file_id, self.semaphore)
671+
await self.delete_file(batch.output_file_id, self.semaphore)
672+
631673
return response_file

0 commit comments

Comments (0)