From afd8ba501aff7ecdfe5ffa1ff36e5d3c64db7943 Mon Sep 17 00:00:00 2001
From: Charlie Cheng-Jie Ji
Date: Tue, 12 Nov 2024 20:30:43 +0000
Subject: [PATCH 01/29] init commit

---
 .gitignore | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/.gitignore b/.gitignore
index 82be6aa6..892e0790 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,4 +2,6 @@
 __pycache__
 .vscode
 
-/src/bespokelabs/curator/viewer/static
\ No newline at end of file
+# Static / Build
+/src/bespokelabs/curator/viewer/static
+/dist
\ No newline at end of file

From 4b5e3056a2b3fce611df27c88d794806def82a2a Mon Sep 17 00:00:00 2001
From: Ryan Marten
Date: Tue, 12 Nov 2024 14:24:18 -0800
Subject: [PATCH 02/29] add fix for max files open when sending lots of requests

---
 .../request_processor/openai_online_request_processor.py | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/src/bespokelabs/curator/request_processor/openai_online_request_processor.py b/src/bespokelabs/curator/request_processor/openai_online_request_processor.py
index 39f94201..400a74d8 100644
--- a/src/bespokelabs/curator/request_processor/openai_online_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/openai_online_request_processor.py
@@ -6,6 +6,7 @@
 import time
 from dataclasses import dataclass, field
 from typing import Any, Callable, Dict, Optional, Set, Tuple, TypeVar
+import resource
 
 import aiohttp
 import requests
@@ -163,6 +164,12 @@ def run(
         Returns:
             Dataset: Completed dataset
         """
+        # Increase the number of open file descriptors to avoid "Too many open files" errors
+        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
+        resource.setrlimit(
+            resource.RLIMIT_NOFILE, (min(hard, 10 * max_requests_per_minute), hard)
+        )
+
         requests_files = self.create_request_files(
             dataset, working_dir, prompt_formatter
         )

From 0d279dc05b85b57abfb67039e43c12a5a34558f6 Mon Sep 17 00:00:00 2001
From: Ryan Marten
Date: Tue, 12 Nov 2024 14:26:49 -0800
Subject: [PATCH 03/29] move within process_api

---
 .../openai_online_request_processor.py | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/src/bespokelabs/curator/request_processor/openai_online_request_processor.py b/src/bespokelabs/curator/request_processor/openai_online_request_processor.py
index 400a74d8..e605369f 100644
--- a/src/bespokelabs/curator/request_processor/openai_online_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/openai_online_request_processor.py
@@ -164,12 +164,6 @@ def run(
         Returns:
             Dataset: Completed dataset
         """
-        # Increase the number of open file descriptors to avoid "Too many open files" errors
-        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
-        resource.setrlimit(
-            resource.RLIMIT_NOFILE, (min(hard, 10 * max_requests_per_minute), hard)
-        )
-
         requests_files = self.create_request_files(
             dataset, working_dir, prompt_formatter
         )
@@ -218,6 +212,13 @@ async def process_api_requests_from_file(
     resume_no_retry: bool = False,
 ) -> None:
     """Processes API requests in parallel, throttling to stay under rate limits."""
+
+    # Increase the number of open file descriptors to avoid "Too many open files" errors
+    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
+    resource.setrlimit(
+        resource.RLIMIT_NOFILE, (min(hard, 10 * max_requests_per_minute), hard)
+    )
+
     # constants
     seconds_to_pause_after_rate_limit_error = 15
     seconds_to_sleep_each_loop = (

From c5ce56dda5cb8d165c1eb663b8ec8568015cacb8 Mon Sep 17 00:00:00 2001
From: Ryan Marten
Date: Wed, 13 Nov 2024 07:52:14 -0800
Subject: [PATCH 04/29] remove file when
 prompt_func is invalid lint

---
 .../base_request_processor.py | 21 ++++++++++++++-----
 1 file changed, 16 insertions(+), 5 deletions(-)

diff --git a/src/bespokelabs/curator/request_processor/base_request_processor.py b/src/bespokelabs/curator/request_processor/base_request_processor.py
index 2f555a12..d9bd7cac 100644
--- a/src/bespokelabs/curator/request_processor/base_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/base_request_processor.py
@@ -267,10 +267,17 @@ def create_dataset_files(
 
             # parse_func can return a single row or a list of rows
             if prompt_formatter.parse_func:
-                dataset_rows = prompt_formatter.parse_func(
-                    response.generic_request.original_row,
-                    response.response_message,
-                )
+                try:
+                    dataset_rows = prompt_formatter.parse_func(
+                        response.generic_request.original_row,
+                        response.response_message,
+                    )
+                except Exception as e:
+                    logger.error(
+                        f"Exception raised in your `parse_func`. {error_help}"
+                    )
+                    os.remove(dataset_file)
+                    raise e
                 if not isinstance(dataset_rows, list):
                     dataset_rows = [dataset_rows]
             else:
@@ -283,10 +290,13 @@ def create_dataset_files(
                     row = row.model_dump()
 
                 if not isinstance(row, dict):
+                    os.remove(dataset_file)
                     raise ValueError(
-                        f"Got invalid row {row} of type {type(row)} from `parse_func`. {error_help}"
+                        f"Got invalid row {row} of type {type(row)} from `parse_func`. "
+                        f"This should be type <class 'dict'>. {error_help}"
                     )
                 if not row:
+                    os.remove(dataset_file)
                     raise ValueError(
                         f"Got empty row {row} from `parse_func`. {error_help}"
                     )
@@ -297,6 +307,7 @@ def create_dataset_files(
             f"Read {total_responses_count} responses, {failed_responses_count} failed"
         )
         if failed_responses_count == total_responses_count:
+            os.remove(dataset_file)
             raise ValueError("All requests failed")
 
         logger.info("Finalizing writer")

From 783f874369ff6cfcec5c1f7fe8ddc077c9a97da3 Mon Sep 17 00:00:00 2001
From: Ryan Marten
Date: Wed, 13 Nov 2024 08:50:29 -0800
Subject: [PATCH 05/29] fix parsing when json is invalid

---
 .../base_request_processor.py          | 17 +++++++++++++++++
 .../openai_batch_request_processor.py  | 19 ++++++++++---------
 .../openai_online_request_processor.py |  7 ++++---
 3 files changed, 31 insertions(+), 12 deletions(-)

diff --git a/src/bespokelabs/curator/request_processor/base_request_processor.py b/src/bespokelabs/curator/request_processor/base_request_processor.py
index 2f555a12..3691907d 100644
--- a/src/bespokelabs/curator/request_processor/base_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/base_request_processor.py
@@ -192,6 +192,23 @@ async def acreate_request_file(
         )
         logger.info(f"Wrote {end_idx - start_idx} requests to {request_file}.")
 
+    def parse_response_message(
+        self, response_message: str, response_format: Optional[BaseModel]
+    ) -> tuple[Any, Optional[list[str]]]:
+        response_errors = None
+        if response_format:
+            try:
+                response_message = json.loads(response_message)
+            except json.JSONDecodeError:
+                logger.warning(
+                    f"Failed to parse response as JSON: {response_message}, skipping this response."
+                )
+                response_message = None
+                response_errors = [
+                    f"Failed to parse response as JSON: {response_message}"
+                ]
+        return response_message, response_errors
+
     def create_dataset_files(
         self,
         working_dir: str,
diff --git a/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py b/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
index c88ddde7..90184d58 100644
--- a/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
@@ -413,15 +413,16 @@ async def download_batch_to_generic_responses_file(
                     else:
                         # NOTE(Ryan): can we actually parse the response into a an OpenAI ChatCompletions object? Easier to access fields?
                        # TODO(Ryan): if you add token tokens to generic response
-                        content = raw_response["response"]["body"]["choices"][0][
-                            "message"
-                        ]["content"]
-
-                        if response_format:
-                            content = json.loads(content)
-
-                        generic_response.response_message = content
-
+                        choices = raw_response["response"]["body"]["choices"]
+                        # Assuming N = 1
+                        response_message = choices[0]["message"]["content"]
+                        response_message, response_errors = (
+                            self.parse_response_message(
+                                response_message, response_format
+                            )
+                        )
+                        generic_response.response_message = response_message
+                        generic_response.response_errors = response_errors
                     f.write(
                         json.dumps(generic_response.model_dump(), default=str) + "\n"
diff --git a/src/bespokelabs/curator/request_processor/openai_online_request_processor.py b/src/bespokelabs/curator/request_processor/openai_online_request_processor.py
index b6aa9cf0..d6127097 100644
--- a/src/bespokelabs/curator/request_processor/openai_online_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/openai_online_request_processor.py
@@ -561,11 +561,12 @@ async def call_api(
                 )
             else:
                 response_message = response["choices"][0]["message"]["content"]
-                if self.generic_request.response_format:
-                    response_message = json.loads(response_message)
+                response_message, response_errors = self.parse_response_message(
+                    response_message, self.generic_request.response_format
+                )
                 generic_response = GenericResponse(
                     response_message=response_message,
-                    response_errors=None,
+                    response_errors=response_errors,
                     raw_request=self.api_specific_request_json,
                     raw_response=response,
                     generic_request=self.generic_request,

From ab4b9c8828b12e8b2c0d0d060b1a3248af335d81 Mon Sep 17 00:00:00 2001
From: Trung Vu
Date: Wed, 13 Nov 2024 16:59:04 +0000
Subject: [PATCH 06/29] use strict: True for structured output

---
 .../curator/request_processor/openai_batch_request_processor.py | 1 +
 .../request_processor/openai_online_request_processor.py        | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py b/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
index 90184d58..e82cba68 100644
--- a/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
@@ -103,6 +103,7 @@ def create_api_specific_request(
             "messages": generic_request.messages,
             "response_format": {
                 "type": "json_schema",
+                "strict": True,
                 "json_schema": {
                     # TODO(ryan): not sure if this should be something else.
                     # TODO(ryan): also not sure if we should use strict: True
diff --git a/src/bespokelabs/curator/request_processor/openai_online_request_processor.py b/src/bespokelabs/curator/request_processor/openai_online_request_processor.py
index d6127097..34cd2ca7 100644
--- a/src/bespokelabs/curator/request_processor/openai_online_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/openai_online_request_processor.py
@@ -104,7 +104,7 @@ def create_api_specific_request(
         request["response_format"] = {
             "type": "json_schema",
             "json_schema": {
-                # TODO(ryan): not sure if we should use strict: True or have name: be something else.
+                "strict": True,
                 "name": "output_schema",
                 "schema": generic_request.response_format,
             },

From e58a435c43dfad6b3624e76c01ba68cd070f3c0c Mon Sep 17 00:00:00 2001
From: Trung Vu
Date: Wed, 13 Nov 2024 17:14:30 +0000
Subject: [PATCH 07/29] remove strict and fix a type error

---
 .../request_processor/openai_batch_request_processor.py  | 5 ++---
 .../request_processor/openai_online_request_processor.py | 1 -
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py b/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
index e82cba68..d5e78d33 100644
--- a/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
@@ -3,10 +3,10 @@
 import logging
 import os
 from typing import Optional, Type, TypeVar
-from pydantic import BaseModel
-from openai import AsyncOpenAI
+
 import aiofiles
 from openai import AsyncOpenAI
+from pydantic import BaseModel
 from tqdm import tqdm
 
 from bespokelabs.curator.dataset import Dataset
@@ -103,7 +103,6 @@ def create_api_specific_request(
             "messages": generic_request.messages,
             "response_format": {
                 "type": "json_schema",
-                "strict": True,
                 "json_schema": {
                     # TODO(ryan): not sure if this should be something else.
                     # TODO(ryan): also not sure if we should use strict: True
diff --git a/src/bespokelabs/curator/request_processor/openai_online_request_processor.py b/src/bespokelabs/curator/request_processor/openai_online_request_processor.py
index 34cd2ca7..26af7a1d 100644
--- a/src/bespokelabs/curator/request_processor/openai_online_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/openai_online_request_processor.py
@@ -104,7 +104,6 @@ def create_api_specific_request(
         request["response_format"] = {
             "type": "json_schema",
             "json_schema": {
-                "strict": True,
                 "name": "output_schema",
                 "schema": generic_request.response_format,
             },

From 904f520019d6c15ba3f0c99ca0b43b95a096dd32 Mon Sep 17 00:00:00 2001
From: Trung Vu
Date: Wed, 13 Nov 2024 17:15:43 +0000
Subject: [PATCH 08/29] fix type error

---
 .../curator/request_processor/base_request_processor.py | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/src/bespokelabs/curator/request_processor/base_request_processor.py b/src/bespokelabs/curator/request_processor/base_request_processor.py
index 3691907d..3ece980d 100644
--- a/src/bespokelabs/curator/request_processor/base_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/base_request_processor.py
@@ -8,17 +8,15 @@
 from typing import Optional
 
 import aiofiles
+import pyarrow
 from datasets import Dataset
 from datasets.arrow_writer import ArrowWriter, SchemaInferenceError
 from pydantic import BaseModel
-import pyarrow
 
 from bespokelabs.curator.prompter.prompt_formatter import PromptFormatter
 from bespokelabs.curator.request_processor.event_loop import run_in_event_loop
 from bespokelabs.curator.request_processor.generic_request import GenericRequest
-from bespokelabs.curator.request_processor.generic_response import (
-    GenericResponse,
-)
+from bespokelabs.curator.request_processor.generic_response import GenericResponse
 
 logger = logging.getLogger(__name__)
@@ -194,7 +192,7 @@ async def acreate_request_file(
 
     def parse_response_message(
         self, response_message: str, response_format: Optional[BaseModel]
-    ) -> tuple[Any, Optional[list[str]]]:
+    ) -> tuple[any, Optional[list[str]]]:
         response_errors = None
         if response_format:

From 8246be1b561abf551f15bdfe66a738c1003850ee Mon Sep 17 00:00:00 2001
From: Trung Vu
Date: Wed, 13 Nov 2024 17:19:48 +0000
Subject: [PATCH 09/29] move parse_response_message outside of the class

---
 .../base_request_processor.py          | 34 +++++++++++++-------
 .../openai_batch_request_processor.py  |  3 ++-
 .../openai_online_request_processor.py |  3 ++-
 3 files changed, 21 insertions(+), 19 deletions(-)

diff --git a/src/bespokelabs/curator/request_processor/base_request_processor.py b/src/bespokelabs/curator/request_processor/base_request_processor.py
index 3ece980d..ba306122 100644
--- a/src/bespokelabs/curator/request_processor/base_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/base_request_processor.py
@@ -190,23 +190,6 @@ async def acreate_request_file(
         )
         logger.info(f"Wrote {end_idx - start_idx} requests to {request_file}.")
 
-    def parse_response_message(
-        self, response_message: str, response_format: Optional[BaseModel]
-    ) -> tuple[any, Optional[list[str]]]:
-        response_errors = None
-        if response_format:
-            try:
-                response_message = json.loads(response_message)
-            except json.JSONDecodeError:
-                logger.warning(
-                    f"Failed to parse response as JSON: {response_message}, skipping this response."
-                )
-                response_message = None
-                response_errors = [
-                    f"Failed to parse response as JSON: {response_message}"
-                ]
-        return response_message, response_errors
-
     def create_dataset_files(
         self,
         working_dir: str,
@@ -321,3 +304,20 @@ def create_dataset_files(
         output_dataset = Dataset.from_file(dataset_file)
 
         return output_dataset
+
+def parse_response_message(
+    response_message: str, response_format: Optional[BaseModel]
+) -> tuple[any, Optional[list[str]]]:
+    response_errors = None
+    if response_format:
+        try:
+            response_message = json.loads(response_message)
+        except json.JSONDecodeError:
+            logger.warning(
+                f"Failed to parse response as JSON: {response_message}, skipping this response."
+            )
+            response_message = None
+            response_errors = [
+                f"Failed to parse response as JSON: {response_message}"
+            ]
+    return response_message, response_errors
diff --git a/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py b/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
index d5e78d33..2f872aea 100644
--- a/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
@@ -15,6 +15,7 @@
     BaseRequestProcessor,
     GenericRequest,
     GenericResponse,
+    parse_response_message,
 )
 from bespokelabs.curator.request_processor.event_loop import run_in_event_loop
 
@@ -417,7 +418,7 @@ async def download_batch_to_generic_responses_file(
                         # Assuming N = 1
                         response_message = choices[0]["message"]["content"]
                         response_message, response_errors = (
-                            self.parse_response_message(
+                            parse_response_message(
                                 response_message, response_format
                             )
                         )
diff --git a/src/bespokelabs/curator/request_processor/openai_online_request_processor.py b/src/bespokelabs/curator/request_processor/openai_online_request_processor.py
index 26af7a1d..9f623093 100644
--- a/src/bespokelabs/curator/request_processor/openai_online_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/openai_online_request_processor.py
@@ -19,6 +19,7 @@
     BaseRequestProcessor,
     GenericRequest,
     GenericResponse,
+    parse_response_message,
 )
 from bespokelabs.curator.request_processor.event_loop import run_in_event_loop
 
@@ -560,7 +561,7 @@ async def call_api(
                 )
             else:
                 response_message = response["choices"][0]["message"]["content"]
-                response_message, response_errors = self.parse_response_message(
+                response_message, response_errors = parse_response_message(
                     response_message, self.generic_request.response_format
                 )
                 generic_response = GenericResponse(

From 5c47dae989977b34b32efa5cab18689f801e10ee Mon Sep 17 00:00:00 2001
From: Trung Vu
Date: Wed, 13 Nov 2024 17:26:42 +0000
Subject: [PATCH 10/29] fix typing

---
 .../curator/request_processor/base_request_processor.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/bespokelabs/curator/request_processor/base_request_processor.py b/src/bespokelabs/curator/request_processor/base_request_processor.py
index ba306122..fea6fa67 100644
--- a/src/bespokelabs/curator/request_processor/base_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/base_request_processor.py
@@ -307,7 +307,7 @@ def create_dataset_files(
 
 def parse_response_message(
     response_message: str, response_format: Optional[BaseModel]
-) -> tuple[any, Optional[list[str]]]:
+) -> tuple[Optional[dict | str], Optional[list[str]]]:
     response_errors = None
     if response_format:
         try:

From c97f3c7d084cd813f332f427a05a6d2652ebd0f1 Mon Sep 17 00:00:00 2001
From: Trung Vu
Date: Wed, 13 Nov 2024 17:28:34 +0000
Subject: [PATCH 11/29] black

---
 .../curator/request_processor/base_request_processor.py | 5 ++++-
 .../request_processor/openai_batch_request_processor.py | 6 ++----
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/src/bespokelabs/curator/request_processor/base_request_processor.py b/src/bespokelabs/curator/request_processor/base_request_processor.py
index fea6fa67..47472ff9 100644
--- a/src/bespokelabs/curator/request_processor/base_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/base_request_processor.py
@@ -16,7 +16,9 @@
 from bespokelabs.curator.prompter.prompt_formatter import PromptFormatter
 from bespokelabs.curator.request_processor.event_loop import run_in_event_loop
 from bespokelabs.curator.request_processor.generic_request import GenericRequest
-from bespokelabs.curator.request_processor.generic_response import GenericResponse
+from bespokelabs.curator.request_processor.generic_response import (
+    GenericResponse,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -305,6 +307,7 @@ def create_dataset_files(
 
     return output_dataset
 
+
 def parse_response_message(
     response_message: str, response_format: Optional[BaseModel]
 ) -> tuple[Optional[dict | str], Optional[list[str]]]:
diff --git a/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py b/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
index 2f872aea..25b6b0f1 100644
--- a/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
@@ -417,10 +417,8 @@ async def download_batch_to_generic_responses_file(
                         choices = raw_response["response"]["body"]["choices"]
                         # Assuming N = 1
                         response_message = choices[0]["message"]["content"]
-                        response_message, response_errors = (
-                            parse_response_message(
-                                response_message, response_format
-                            )
+                        response_message, response_errors = parse_response_message(
+                            response_message, response_format
                         )
                         generic_response.response_message = response_message
                         generic_response.response_errors = response_errors

From fce9b2d3daea2313e65e86d1934768dc71d0df7c Mon Sep 17 00:00:00 2001
From: Trung Vu
Date: Wed, 13 Nov 2024 17:35:20 +0000
Subject: [PATCH 12/29] also catch invalid pydantic schema

---
 .../base_request_processor.py | 20 +++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/src/bespokelabs/curator/request_processor/base_request_processor.py b/src/bespokelabs/curator/request_processor/base_request_processor.py
index 47472ff9..ebe129c8 100644
--- a/src/bespokelabs/curator/request_processor/base_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/base_request_processor.py
@@ -10,8 +10,8 @@
 import aiofiles
 import pyarrow
 from datasets import Dataset
-from datasets.arrow_writer import ArrowWriter, SchemaInferenceError
-from pydantic import BaseModel
+from datasets.arrow_writer import ArrowWriter
+from pydantic import BaseModel, ValidationError
 
 from bespokelabs.curator.prompter.prompt_formatter import PromptFormatter
 from bespokelabs.curator.request_processor.event_loop import run_in_event_loop
@@ -259,11 +259,19 @@ def create_dataset_files(
                 if prompt_formatter.response_format:
                     # Response message is a string, which is converted to a dict
                     # The dict is then used to construct the response_format Pydantic model
-                    response.response_message = (
-                        prompt_formatter.response_format(
-                            **response.response_message
+                    try:
+                        response.response_message = (
+                            prompt_formatter.response_format(
+                                **response.response_message
+                            )
                         )
-                    )
+                    except ValidationError as e:
+                        logger.warning(
+                            f"Pydantic failed to parse response message {response.response_message} with `response_format` {prompt_formatter.response_format}."
+                            f"The model likely returned a JSON that does not match the schema of the `response_format`. Will skip this response."
+                        )
+                        response.response_message = None
+                        response.response_errors = [str(e)]
 
             # parse_func can return a single row or a list of rows
             if prompt_formatter.parse_func:

From c7a50f32de9eb0bdd79bc2040ec641bd6ccf76f5 Mon Sep 17 00:00:00 2001
From: Trung Vu
Date: Wed, 13 Nov 2024 17:47:00 +0000
Subject: [PATCH 13/29] add warning message to error for pydantic

---
 .../curator/request_processor/base_request_processor.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/src/bespokelabs/curator/request_processor/base_request_processor.py b/src/bespokelabs/curator/request_processor/base_request_processor.py
index ebe129c8..d95f4fc5 100644
--- a/src/bespokelabs/curator/request_processor/base_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/base_request_processor.py
@@ -266,12 +266,16 @@ def create_dataset_files(
                             )
                         )
                     except ValidationError as e:
-                        logger.warning(
+                        warning_msg = (
                             f"Pydantic failed to parse response message {response.response_message} with `response_format` {prompt_formatter.response_format}."
                             f"The model likely returned a JSON that does not match the schema of the `response_format`. Will skip this response."
                         )
+
+                        logger.warning(warning_msg)
                         response.response_message = None
-                        response.response_errors = [str(e)]
+                        response.response_errors = [
+                            f"{warning_msg}. Original error: {str(e)}"
+                        ]

From 96f00f69b5d3cca35aa844267856c286ae976cae Mon Sep 17 00:00:00 2001
From: Ryan Marten
Date: Wed, 13 Nov 2024 10:00:15 -0800
Subject: [PATCH 14/29] small error - we need to continue

---
 .../request_processor/base_request_processor.py | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/src/bespokelabs/curator/request_processor/base_request_processor.py b/src/bespokelabs/curator/request_processor/base_request_processor.py
index 617476b9..133ad7ff 100644
--- a/src/bespokelabs/curator/request_processor/base_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/base_request_processor.py
@@ -252,7 +252,8 @@ def create_dataset_files(
                     generic_response_string
                 )
 
-                if response.response_errors:
+                # response.response_errors is not None IFF response.response_message is None
+                if response.response_errors is not None:
                     failed_responses_count += 1
                     continue
 
@@ -270,12 +271,9 @@ def create_dataset_files(
                             f"Pydantic failed to parse response message {response.response_message} with `response_format` {prompt_formatter.response_format}."
                             f"The model likely returned a JSON that does not match the schema of the `response_format`. Will skip this response."
                         )
 
                         logger.warning(warning_msg)
-                        response.response_message = None
-                        response.response_errors = [
-                            f"{warning_msg}. Original error: {str(e)}"
-                        ]
+                        failed_responses_count += 1
+                        continue
 
             # parse_func can return a single row or a list of rows
             if prompt_formatter.parse_func:

From 2b4edea0d26b57c1516c9c8698e75e023592c04c Mon Sep 17 00:00:00 2001
From: Ryan Marten
Date: Wed, 13 Nov 2024 10:04:40 -0800
Subject: [PATCH 15/29] dump schema string

---
 .../curator/request_processor/base_request_processor.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/src/bespokelabs/curator/request_processor/base_request_processor.py b/src/bespokelabs/curator/request_processor/base_request_processor.py
index 133ad7ff..dcc344b7 100644
--- a/src/bespokelabs/curator/request_processor/base_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/base_request_processor.py
@@ -267,8 +267,12 @@ def create_dataset_files(
                             )
                         )
                     except ValidationError as e:
+                        schema_str = json.dumps(
+                            prompt_formatter.response_format.model_json_schema(),
+                            indent=2,
+                        )
                         warning_msg = (
-                            f"Pydantic failed to parse response message {response.response_message} with `response_format` {prompt_formatter.response_format}."
+                            f"Pydantic failed to parse response message {response.response_message} with `response_format` {schema_str}."
                             f"The model likely returned a JSON that does not match the schema of the `response_format`. Will skip this response."
                         )
                         logger.warning(warning_msg)

From ebe123215ed6c223f64a19b6e73b8c3cbc02c168 Mon Sep 17 00:00:00 2001
From: Charlie Cheng-Jie Ji
Date: Wed, 13 Nov 2024 22:23:42 +0000
Subject: [PATCH 16/29] fixed and refactored sort and filter

---
 .../dataset-viewer/DatasetViewer.tsx | 31 ++-----------------
 .../components/ui/sortable-table.tsx | 25 ++++++++++++---
 2 files changed, 22 insertions(+), 34 deletions(-)

diff --git a/bespoke-dataset-viewer/components/dataset-viewer/DatasetViewer.tsx b/bespoke-dataset-viewer/components/dataset-viewer/DatasetViewer.tsx
index d230fa0d..6b0be000 100644
--- a/bespoke-dataset-viewer/components/dataset-viewer/DatasetViewer.tsx
+++ b/bespoke-dataset-viewer/components/dataset-viewer/DatasetViewer.tsx
@@ -33,9 +33,6 @@ interface DatasetViewerProps {
 export function DatasetViewer({ runHash, batchMode }: DatasetViewerProps) {
   const [data, setData] = useState([])
-  const [sortColumn] = useState(null)
-  const [sortDirection] = useState<"asc" | "desc">("asc")
-  const [filters] = useState>({})
   const [theme, setTheme] = useState<"light" | "dark">("light")
   const [mounted, setMounted] = useState(false)
   const [selectedDistribution, setSelectedDistribution] = useState("total_tokens")
@@ -65,30 +62,6 @@ export function DatasetViewer({ runHash, batchMode }: DatasetViewerProps) {
     localStorage.setItem('theme', theme)
   }, [theme, mounted])
 
-  const filteredData = useMemo(() => {
-    const dataArray = Array.isArray(data) ? data : []
-
-    return dataArray.filter((item) => {
-      return Object.entries(filters).every(([column, filterValue]) => {
-        if (!filterValue) return true
-        const cellValue = getColumnValue(item, column)
-        return cellValue.toLowerCase().includes(filterValue.toLowerCase())
-      })
-    })
-  }, [data, filters])
-
-  const sortedData = useMemo(() => {
-    if (!sortColumn) return filteredData
-
-    return [...filteredData].sort((a, b) => {
-      const aValue = getColumnValue(a, sortColumn)
-      const bValue = getColumnValue(b, sortColumn)
-
-      const comparison = aValue.localeCompare(bValue)
-      return sortDirection === "asc" ? comparison : -comparison
-    })
-  }, [filteredData, sortColumn, sortDirection])
-
   const fetchNewResponses = useCallback(async () => {
     if (!runHash) return
 
@@ -262,7 +235,7 @@ export function DatasetViewer({ runHash, batchMode }: DatasetViewerProps) {
         {selectedDistribution && (
@@ -273,7 +246,7 @@ export function DatasetViewer({ runHash, batchMode }: DatasetViewerProps) {
             item.raw_response.id}
           getCellContent={(item, columnKey) => getColumnValue(item, columnKey)}
           onRowClick={(item) => setSelectedItem(item)}
diff --git a/bespoke-dataset-viewer/components/ui/sortable-table.tsx b/bespoke-dataset-viewer/components/ui/sortable-table.tsx
index f73f891e..89cd3019 100644
--- a/bespoke-dataset-viewer/components/ui/sortable-table.tsx
+++ b/bespoke-dataset-viewer/components/ui/sortable-table.tsx
@@ -27,6 +27,10 @@
 
 const MAX_VISIBLE_PAGES = 5
 
+function isNumeric(value: any): boolean {
+  return !isNaN(parseFloat(value)) && isFinite(value)
+}
+
 export function SortableTable({
   columns,
   data,
@@ -85,14 +89,25 @@ export function SortableTable({
       }
     })
 
-    // Apply sorting
+    // Apply sorting with numeric support
     if (sortColumn) {
       result.sort((a, b) => {
-        const aValue = String(getCellContent(a, sortColumn))
-        const bValue = String(getCellContent(b, sortColumn))
+        const aValue = getCellContent(a, sortColumn)
+        const bValue = getCellContent(b, sortColumn)
+
+        // Handle numeric sorting
+        if (isNumeric(aValue) && isNumeric(bValue)) {
+          const aNum = parseFloat(String(aValue))
+          const bNum = parseFloat(String(bValue))
+          return sortDirection === "asc"
+            ? aNum - bNum
+            : bNum - aNum
+        }
+
+        // Fall back to string sorting
         return sortDirection === "asc"
-          ? aValue.localeCompare(bValue)
-          : bValue.localeCompare(aValue)
+          ? String(aValue).localeCompare(String(bValue))
+          : String(bValue).localeCompare(String(aValue))
       })
     }

From 7759486ff4b48a5e4ec907f36e486d7bd5d1a929 Mon Sep 17 00:00:00 2001
From: Charlie Cheng-Jie Ji
Date: Wed, 13 Nov 2024 22:26:19 +0000
Subject: [PATCH 17/29] robust handling for checking whether a given any type
 value is numeric

---
 .../components/ui/sortable-table.tsx | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/bespoke-dataset-viewer/components/ui/sortable-table.tsx b/bespoke-dataset-viewer/components/ui/sortable-table.tsx
index 89cd3019..54ab55af 100644
--- a/bespoke-dataset-viewer/components/ui/sortable-table.tsx
+++ b/bespoke-dataset-viewer/components/ui/sortable-table.tsx
@@ -28,7 +28,21 @@
 const MAX_VISIBLE_PAGES = 5
 
 function isNumeric(value: any): boolean {
-  return !isNaN(parseFloat(value)) && isFinite(value)
+  // Handle null/undefined
+  if (value == null) return false;
+
+  // Handle numbers directly
+  if (typeof value === 'number') return !isNaN(value);
+
+  // Handle strings that represent numbers
+  if (typeof value === 'string') {
+    // Trim whitespace
+    const trimmed = value.trim();
+    if (trimmed === '') return false;
+    return !isNaN(Number(trimmed))
+  }
+
+  return false;
 }

From 4b284a2e3df9ee38ea432ecd34849947ed927c37 Mon Sep 17 00:00:00 2001
From: Charlie Cheng-Jie Ji
Date: Wed, 13 Nov 2024 22:30:00 +0000
Subject: [PATCH 18/29] robust numeric check, move to utils

---
 .../components/ui/sortable-table.tsx | 20 +------------------
 bespoke-dataset-viewer/lib/utils.ts  |  5 +++++
 2 files changed, 6 insertions(+), 19 deletions(-)

diff --git a/bespoke-dataset-viewer/components/ui/sortable-table.tsx b/bespoke-dataset-viewer/components/ui/sortable-table.tsx
index 54ab55af..8b868a2b 100644
--- a/bespoke-dataset-viewer/components/ui/sortable-table.tsx
+++ b/bespoke-dataset-viewer/components/ui/sortable-table.tsx
@@ -15,7 +15,7 @@ import { SortableTableProps, SortDirection } from "@/types/table"
 import { Tooltip } from "@/components/ui/tooltip"
 import { TooltipContent, TooltipProvider, TooltipTrigger } from "@/components/ui/tooltip"
 import { motion } from "framer-motion"
-import { cn } from "@/lib/utils"
+import { cn, isNumeric } from "@/lib/utils"
 import {
   Pagination,
   PaginationContent,
@@ -27,24 +27,6 @@
 
 const MAX_VISIBLE_PAGES = 5
 
-function isNumeric(value: any): boolean {
-  // Handle null/undefined
-  if (value == null) return false;
-
-  // Handle numbers directly
-  if (typeof value === 'number') return !isNaN(value);
-
-  // Handle strings that represent numbers
-  if (typeof value === 'string') {
-    // Trim whitespace
-    const trimmed = value.trim();
-    if (trimmed === '') return false;
-    return !isNaN(Number(trimmed))
-  }
-
-  return false;
-}
-
 export function SortableTable({
   columns,
   data,
diff --git a/bespoke-dataset-viewer/lib/utils.ts b/bespoke-dataset-viewer/lib/utils.ts
index b5d5b1d4..f231a651 100644
--- a/bespoke-dataset-viewer/lib/utils.ts
+++ b/bespoke-dataset-viewer/lib/utils.ts
@@ -2,6 +2,11 @@ import { type ClassValue, clsx } from "clsx"
 import { twMerge } from "tailwind-merge"
 import { DataItem } from "../types/dataset"
 
+export function isNumeric(value: any): boolean {
+  if (value === null || value === undefined) return false;
+  return Number.isFinite(Number(value));
+}
+
 export const getColumnValue = (item: DataItem, column: string): string => {
   if (!item) return "N/A"

From e096da093c675f2dea7192a273b5fd6ee3ddd9fa Mon Sep 17 00:00:00 2001
From: Charlie Cheng-Jie Ji
Date: Wed, 13 Nov 2024 23:44:14 +0000
Subject: [PATCH 19/29] address empty file in online streaming

---
 .../app/api/responses/[runHash]/route.ts | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/bespoke-dataset-viewer/app/api/responses/[runHash]/route.ts b/bespoke-dataset-viewer/app/api/responses/[runHash]/route.ts
index 8546cd98..14314051 100644
--- a/bespoke-dataset-viewer/app/api/responses/[runHash]/route.ts
+++ b/bespoke-dataset-viewer/app/api/responses/[runHash]/route.ts
@@ -57,6 +57,18 @@ export async function GET(
     } else {
       // Online streaming mode
       const responsesPath = join(runDir, 'responses_0.jsonl')
+
+      // Check if the file exists before trying to read it
+      if (!existsSync(responsesPath)) {
+        return NextResponse.json({
+          data: [],
+          totalLines: 0,
+          isBatchMode: false,
+          processedFiles: null,
+          message: "No responses file found yet"
+        })
+      }
+
       const content = await fs.readFile(responsesPath, 'utf-8')
       const lines = content.split('\n').filter(line => line.trim() !== '')

From 44c56a2639a201c554bf77a9c0716e943f175f24 Mon Sep 17 00:00:00 2001
From: Charlie Cheng-Jie Ji
Date: Wed, 13 Nov 2024 23:44:31 +0000
Subject: [PATCH 20/29] no data view and actionable message for users

---
 .../dataset-viewer/DatasetViewer.tsx | 78 +++++++++++++------
 1 file changed, 54 insertions(+), 24 deletions(-)

diff --git a/bespoke-dataset-viewer/components/dataset-viewer/DatasetViewer.tsx b/bespoke-dataset-viewer/components/dataset-viewer/DatasetViewer.tsx
index 6b0be000..87872ddf 100644
--- a/bespoke-dataset-viewer/components/dataset-viewer/DatasetViewer.tsx
+++ b/bespoke-dataset-viewer/components/dataset-viewer/DatasetViewer.tsx
@@ -14,8 +14,8 @@ import { cn, getColumnValue } from "@/lib/utils"
 import { DataItem } from "@/types/dataset"
 import { Column } from "@/types/table"
 import { AnimatePresence } from "framer-motion"
-import { Loader2 } from "lucide-react"
-import { useCallback, useEffect, useMemo, useState } from "react"
+import { FileText, Loader2, RefreshCcw } from "lucide-react"
+import { useCallback, useEffect, useState } from "react"
 import { DetailsSidebar } from "./DetailsSidebar"
 import { DistributionChart } from "./DistributionChart"
@@ -31,6 +31,32 @@ interface DatasetViewerProps {
   batchMode: boolean
 }
 
+function NoDataView({ batchMode, isPolling }: { batchMode: boolean, isPolling: boolean }) {
+  return (
+
+
+
+      No responses available yet
+
+      {batchMode
+        ? "Waiting for the first batch to complete. Once finished, responses will appear here in batches."
+        : "Responses will appear here as they are generated. The table will update automatically. Check if the curator is still running."}
+
+      {isPolling ? (
+        <>
+
+          Polling for new responses...
+
+      ) : (
+        Polling is paused
+      )}
+
+  )
+}
+
 export function DatasetViewer({ runHash, batchMode }: DatasetViewerProps) {
   const [data, setData] = useState([])
   const [theme, setTheme] = useState<"light" | "dark">("light")
@@ -197,29 +223,31 @@ export function DatasetViewer({ runHash, batchMode }: DatasetViewerProps) {
 
             Dataset Details
 
             View and analyze your dataset responses
 
-
-
-
-            setSelectedDistribution(null)}>
-              None
-
-            {["total_tokens", "prompt_tokens", "completion_tokens"].map((column) => (
-              setSelectedDistribution(column)}>
-                {column === "total_tokens" ? "Total Tokens" :
-                  column === "prompt_tokens" ? "Prompt Tokens" :
-                    "Completion Tokens"}
-
-            ))}
-
-
+          {data.length > 0 && (
+
+
+
+            setSelectedDistribution(null)}>
+              None
+
+            {["total_tokens", "prompt_tokens", "completion_tokens"].map((column) => (
+              setSelectedDistribution(column)}>
+                {column === "total_tokens" ? "Total Tokens" :
+                  column === "prompt_tokens" ? "Prompt Tokens" :
+                    "Completion Tokens"}
+
+            ))}
+
+
+          )}
 
         {isInitialLoad ? (
 
             Loading dataset...
 
+        ) : data.length === 0 ? (
+
+        ) : (
           <>
From 4d66feee22c39c80f71f503c25e2b82696ef1182 Mon Sep 17 00:00:00 2001
From: Charlie Cheng-Jie Ji
Date: Thu, 14 Nov 2024 00:28:23 +0000
Subject: [PATCH 21/29] cleanup, created a more generic function for this

---
 build_pkg.py | 37 +++++++++++++++++++++++++----------
 1 file changed, 27 insertions(+), 10 deletions(-)

diff --git a/build_pkg.py b/build_pkg.py
index 86c886c8..ee8950c7 100644
--- a/build_pkg.py
+++ b/build_pkg.py
@@ -14,6 +14,25 @@ def npm_install():
     run_command("npm install", cwd="bespoke-dataset-viewer")
 
 
+def copy_with_excludes(source, target, excludes=None):
+    """Copy files/directories while excluding specified paths"""
+    if excludes is None:
+        excludes = []
+
+    if source.is_file():
+        shutil.copy2(source, target)
+        print(f"Copied file {source} to {target}")
+    elif source.is_dir():
+        if target.exists():
+            shutil.rmtree(target)
+        
+        def ignore_patterns(path, names):
+            return [n for n in names if str(Path(path) / n) in excludes]
+        
+        shutil.copytree(source, target, ignore=ignore_patterns)
+        print(f"Copied directory {source} to {target}")
+
+
 def nextjs_build():
     print("Running Next.js build")
     run_command("npm run build", cwd="bespoke-dataset-viewer")
@@ -28,11 +47,11 @@ def nextjs_build():
         shutil.rmtree(target_base)
     target_base.mkdir(parents=True, exist_ok=True)
 
-    # Copy only the necessary files, excluding node_modules
+    # Files and directories to copy
    files_to_copy = [
         ".next",
         "app",
-        "components",
+        "components", 
         "lib",
         "public",
         "types",
@@ -46,19 +65,17 @@ def nextjs_build():
         "components.json",
     ]
 
+    # Paths to exclude
+    exclude_paths = [
+        str(source_base / ".next" / "cache")
+    ]
+
     for item in files_to_copy:
         source = source_base / item
         target = target_base / item
 
         if source.exists():
-            if source.is_file():
-                shutil.copy2(source, target)
-                print(f"Copied file {source} to {target}")
-            elif source.is_dir():
-                if target.exists():
-                    shutil.rmtree(target)
-                shutil.copytree(source, target)
-                print(f"Copied directory {source} to {target}")
+            copy_with_excludes(source, target, exclude_paths)
         else:
             print(f"Warning: {source} not found")

From 57baacf9c0797089e68da8ecef39de5223c3bdb5 Mon Sep 17 00:00:00 2001
From: Charlie Cheng-Jie Ji
Date: Thu, 14 Nov 2024 00:28:57 +0000
Subject: [PATCH 22/29] unnecessary whitespace

---
 build_pkg.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/build_pkg.py b/build_pkg.py
index ee8950c7..f418d66d 100644
--- a/build_pkg.py
+++ b/build_pkg.py
@@ -51,7 +51,7 @@ def nextjs_build():
     files_to_copy = [
         ".next",
         "app",
-        "components", 
+        "components",
         "lib",
         "public",
         "types",

From 082a04660effdc6fb8e6e68b5025db715db3666d Mon Sep 17 00:00:00 2001
From: Ryan Marten
Date: Wed, 13 Nov 2024 19:44:52 -0800
Subject: [PATCH 23/29] small bug in batch on schema

---
 build_pkg.py                           | 8 +++-----
 .../openai_batch_request_processor.py  | 2 +-
 .../openai_online_request_processor.py | 3 ++-
 3 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/build_pkg.py b/build_pkg.py
index f418d66d..80de2549 100644
--- a/build_pkg.py
+++ b/build_pkg.py
@@ -25,10 +25,10 @@ def copy_with_excludes(source, target, excludes=None):
     elif source.is_dir():
         if target.exists():
             shutil.rmtree(target)
-        
+
         def ignore_patterns(path, names):
             return [n for n in names if str(Path(path) / n) in excludes]
-        
+
         shutil.copytree(source, target, ignore=ignore_patterns)
         print(f"Copied directory {source} to {target}")
 
@@ -66,9 +66,7 @@ def nextjs_build():
     ]
 
     # Paths to exclude
-    exclude_paths = [
-        str(source_base / ".next" / "cache")
-    ]
+    exclude_paths = [str(source_base / ".next" / "cache")]
 
     for item in files_to_copy:
         source = source_base / item
         target = target_base / item
 
         if source.exists():
             copy_with_excludes(source, target, exclude_paths)
         else:
             print(f"Warning: {source} not found")
diff --git a/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py b/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
index 25b6b0f1..73f336ba 100644
--- a/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
@@ -108,7 +108,7 @@ def create_api_specific_request(
                     # TODO(ryan): not sure if this should be something else.
                     # TODO(ryan): also not sure if we should use strict: True
                     "name": "output_schema",
-                    "schema": generic_request.response_format.model_json_schema(),
+                    "schema": generic_request.response_format,
                 },
             },
         }
diff --git a/src/bespokelabs/curator/request_processor/openai_online_request_processor.py b/src/bespokelabs/curator/request_processor/openai_online_request_processor.py
index 96f8d16e..379d3fe3 100644
--- a/src/bespokelabs/curator/request_processor/openai_online_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/openai_online_request_processor.py
@@ -195,7 +195,8 @@ async def process_generic_requests_from_file(
     # Increase the number of open file descriptors to avoid "Too many open files" errors
     soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
     resource.setrlimit(
-        resource.RLIMIT_NOFILE, (min(hard, 10 * max_requests_per_minute), hard)
+        resource.RLIMIT_NOFILE,
+        (min(hard, 10 * max_requests_per_minute), hard),
     )
 
     # constants

From 68881cb7266587ab73f9aef5af43e7d5aad35569 Mon Sep 17 00:00:00 2001
From: Charlie Cheng-Jie Ji
Date: Thu, 14 Nov 2024 03:59:22 +0000
Subject: [PATCH 24/29] fix typing

---
 bespoke-dataset-viewer/lib/utils.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bespoke-dataset-viewer/lib/utils.ts b/bespoke-dataset-viewer/lib/utils.ts
index f231a651..9dc3a6a0 100644
--- a/bespoke-dataset-viewer/lib/utils.ts
+++ b/bespoke-dataset-viewer/lib/utils.ts
@@ -2,7 +2,7 @@ import { type ClassValue, clsx } from "clsx"
 import { twMerge } from "tailwind-merge"
 import { DataItem } from "../types/dataset"
 
-export function isNumeric(value: any): boolean {
+export function isNumeric(value: unknown): boolean {
   if (value === null || value === undefined) return false;
   return Number.isFinite(Number(value));
 }

From 1d2baa49a8fb651fcfe96a617b3b296aa48da844 Mon Sep 17 00:00:00 2001
From: Trung Vu
Date: Thu, 14 Nov 2024 04:25:32 +0000
Subject: [PATCH 25/29] fix asyncio confusing error handling

---
 .../curator/request_processor/event_loop.py | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/src/bespokelabs/curator/request_processor/event_loop.py b/src/bespokelabs/curator/request_processor/event_loop.py
index b920ea75..240f90ca 100644
--- a/src/bespokelabs/curator/request_processor/event_loop.py
+++ b/src/bespokelabs/curator/request_processor/event_loop.py
@@ -16,11 +16,14 @@ def run_in_event_loop(coroutine):
         # If there is an event loop running (the call
         # above doesn't raise an exception), we can
         # use nest_asyncio to patch the event loop.
-        nest_asyncio.apply()
+        nest_asyncio.apply()asd
 
         return asyncio.run(coroutine)
     except RuntimeError as e:
-        # If no event loop is running, asyncio will
-        # return a RuntimeError (https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.get_running_loop).
-        # In that case, we can just use asyncio.run.
-        return asyncio.run(coroutine)
+        # Explicitly pass, since we want to fallback to asyncio.run
+        pass
+
+    # If no event loop is running, asyncio will
+    # return a RuntimeError (https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.get_running_loop).
+    # In that case, we can just use asyncio.run.
+    return asyncio.run(coroutine)
\ No newline at end of file

From 25f46fd3e542279527fc56f31f3e3d0c7b954a50 Mon Sep 17 00:00:00 2001
From: Trung Vu
Date: Thu, 14 Nov 2024 04:25:55 +0000
Subject: [PATCH 26/29] remove asd

---
 src/bespokelabs/curator/request_processor/event_loop.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/bespokelabs/curator/request_processor/event_loop.py b/src/bespokelabs/curator/request_processor/event_loop.py
index 240f90ca..17140662 100644
--- a/src/bespokelabs/curator/request_processor/event_loop.py
+++ b/src/bespokelabs/curator/request_processor/event_loop.py
@@ -16,7 +16,7 @@ def run_in_event_loop(coroutine):
     # If there is an event loop running (the call
     # above doesn't raise an exception), we can
     # use nest_asyncio to patch the event loop.
-        nest_asyncio.apply()asd
+        nest_asyncio.apply()
 
         return asyncio.run(coroutine)
     except RuntimeError as e:

From c05dd2e41f359699505445972d9dba5901ccd6fd Mon Sep 17 00:00:00 2001
From: Trung Vu
Date: Thu, 14 Nov 2024 04:27:20 +0000
Subject: [PATCH 27/29] black

---
 src/bespokelabs/curator/request_processor/event_loop.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/bespokelabs/curator/request_processor/event_loop.py b/src/bespokelabs/curator/request_processor/event_loop.py
index 17140662..92120e7d 100644
--- a/src/bespokelabs/curator/request_processor/event_loop.py
+++ b/src/bespokelabs/curator/request_processor/event_loop.py
@@ -26,4 +26,4 @@ def run_in_event_loop(coroutine):
     # If no event loop is running, asyncio will
     # return a RuntimeError (https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.get_running_loop).
     # In that case, we can just use asyncio.run.
-    return asyncio.run(coroutine)
\ No newline at end of file
+    return asyncio.run(coroutine)

From 02f4fcb8397fb3ed87a3551f5b9e28df33c6f2c9 Mon Sep 17 00:00:00 2001
From: Trung Vu
Date: Thu, 14 Nov 2024 05:08:04 +0000
Subject: [PATCH 28/29] explicitly close batch clients

---
 .../openai_batch_request_processor.py | 33 ++++++++++++-------
 1 file changed, 21 insertions(+), 12 deletions(-)

diff --git a/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py b/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
index 73f336ba..c6266b0f 100644
--- a/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
@@ -134,6 +134,7 @@ def create_api_specific_request(
         return request
 
     async def asubmit_batch(self, batch_file: str) -> dict:
+        async_client = AsyncOpenAI()
         # Create a list to store API-specific requests
         api_specific_requests = []
 
@@ -147,13 +148,13 @@ async def asubmit_batch(self, batch_file: str) -> dict:
 
         # Join requests with newlines and encode to bytes for upload
         file_content = "\n".join(api_specific_requests).encode()
-        batch_file_upload = await self.async_client.files.create(
+        batch_file_upload = await async_client.files.create(
             file=file_content, purpose="batch"
         )
 
         logger.info(f"File uploaded: {batch_file_upload}")
 
-        batch_object = await self.async_client.batches.create(
+        batch_object = await async_client.batches.create(
             input_file_id=batch_file_upload.id,
             endpoint="/v1/chat/completions",
             completion_window="24h",
@@ -164,7 +165,10 @@ async def asubmit_batch(self, batch_file: str) -> dict:
         logger.info(
             f"Batch request submitted, received batch object: {batch_object}"
         )
-
+        # Explicitly close the client. Otherwise we get something like
+        # future: >
+        await async_client.close()
+
         return batch_object
 
     def run(
@@ -198,8 +202,6 @@ def run(
             )
         else:
             # upload requests files and submit batches
-            self.async_client = AsyncOpenAI()
-
             # asyncio gather preserves order
             async def submit_all_batches():
                 tasks = [
@@ -226,18 +228,22 @@ def run(
         # TODO(Ryan): This creates responses_0.jsonl, responses_1.jsonl, etc. errors named same way? or errors_0.jsonl, errors_1.jsonl?
         # TODO(Ryan): retries, resubmits on lagging batches - need to study this a little closer
         # TODO(Ryan): likely can add some logic for smarter check_interval based on batch size and if the batch has started or not, fine to do a dumb ping for now
-        batch_watcher = BatchWatcher(
-            working_dir, check_interval=self.check_interval
-        )
-
         # NOTE(Ryan): If we allow for multiple heterogeneous requests per dataset row, we will need to update this.
         total_requests = 1 if dataset is None else len(dataset)
 
-        run_in_event_loop(
-            batch_watcher.watch(
+        async def watch_batches():
+            batch_watcher = BatchWatcher(
+                working_dir, check_interval=self.check_interval
+            )
+            await batch_watcher.watch(
                 prompt_formatter.response_format, total_requests
             )
-        )
+            # Explicitly close the client. Otherwise we get something like
+            # future: >
+            await batch_watcher.close_client()
+
+
+        run_in_event_loop(watch_batches())
 
         dataset = self.create_dataset_files(
             working_dir, parse_func_hash, prompt_formatter
@@ -266,6 +272,9 @@ def __init__(self, working_dir: str, check_interval) -> None:
         self.check_interval = check_interval
         self.working_dir = working_dir
 
+    async def close_client(self):
+        await self.client.close()
+
     async def check_batch_status(self, batch_id: str) -> tuple[str, str]:
         """Check the status of a batch by its ID.

From 25c1ffe17036b71633f57c34aadc4a83d1c5eb6f Mon Sep 17 00:00:00 2001
From: Trung Vu
Date: Thu, 14 Nov 2024 05:10:13 +0000
Subject: [PATCH 29/29] black

---
 .../request_processor/openai_batch_request_processor.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py b/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
index c6266b0f..cad2eb38 100644
--- a/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
+++ b/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py
@@ -168,7 +168,7 @@ async def asubmit_batch(self, batch_file: str) -> dict:
         # Explicitly close the client. Otherwise we get something like
         # future: >
         await async_client.close()
-        
+
         return batch_object
 
     def run(
@@ -242,7 +242,6 @@ async def watch_batches():
             # future: >
             await batch_watcher.close_client()
 
-
         run_in_event_loop(watch_batches())
 
         dataset = self.create_dataset_files(