diff --git a/.gitignore b/.gitignore
index 892e0790..e95f9df8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
 .venv
+.DS_Store
 __pycache__
 .vscode
diff --git a/README.md b/README.md
index b7bf7940..81224738 100644
--- a/README.md
+++ b/README.md
@@ -29,7 +29,7 @@

-### Overview
+## Overview
 Bespoke Curator makes it very easy to create high-quality synthetic data at scale, which you can use to finetune models or use for structured data extraction at scale.
@@ -38,7 +38,7 @@ Bespoke Curator is an open-source project:
 * A Curator Viewer which makes it easy to view the datasets, thus aiding in the dataset creation.
 * We will also be releasing high-quality datasets that should move the needle on post-training.
-### Key Features
+## Key Features
 1. **Programmability and Structured Outputs**: Synthetic data generation is lot more than just using a single prompt -- it involves calling LLMs multiple times and orchestrating control-flow. Curator treats structured outputs as first class citizens and helps you design complex pipelines.
 2. **Built-in Performance Optimization**: We often see calling LLMs in loops, or inefficient implementation of multi-threading. We have baked in performance optimizations so that you don't need to worry about those!
@@ -46,48 +46,91 @@ Bespoke Curator is an open-source project:
 4. **Native HuggingFace Dataset Integration**: Work directly on HuggingFace Dataset objects throughout your pipeline. Your synthetic data is immediately ready for fine-tuning!
 5. **Interactive Curator Viewer**: Improve and iterate on your prompts using our built-in viewer. Inspect LLM requests and responses in real-time, allowing you to iterate and refine your data generation strategy with immediate feedback.
-### Installation
+## Installation
 ```bash
 pip install bespokelabs-curator
 ```
-### Usage
+## Usage
+To run the examples below, make sure to set your OpenAI API key in
+the environment variable `OPENAI_API_KEY` by running `export OPENAI_API_KEY=sk-...` in your terminal.
+
+### Hello World with `SimpleLLM`: A simple interface for calling LLMs
+
+```python
+from bespokelabs import curator
+llm = curator.SimpleLLM(model_name="gpt-4o-mini")
+poem = llm("Write a poem about the importance of data in AI.")
+print(poem)
+# Or you can pass a list of prompts to generate multiple responses.
+poems = llm(["Write a poem about the importance of data in AI.",
+    "Write a haiku about the importance of data in AI."])
+print(poems)
+```
+Note that retries and caching are enabled by default, so if you run the same prompt again you will get the same response almost instantly.
+You can delete the cache at `~/.cache/curator`.
+
+#### Use the LiteLLM backend for calling other models
+You can use the [LiteLLM](https://docs.litellm.ai/docs/providers) backend to call models from other providers.
+```python
+from bespokelabs import curator
+llm = curator.SimpleLLM(model_name="claude-3-5-sonnet-20240620", backend="litellm")
+poem = llm("Write a poem about the importance of data in AI.")
+print(poem)
+```
+
+### Visualize in Curator Viewer
+Run `curator-viewer` on the command line to see the dataset in the viewer.
+
+You can click on a run and then click on a specific row to see the LLM request and response.
+![Curator Responses](docs/curator-responses.png)
+More examples below.
+
+### `LLM`: A more powerful interface for synthetic data generation
+
+Let's use structured outputs to generate poems.
 ```python
 from bespokelabs import curator
 from datasets import Dataset
 from pydantic import BaseModel, Field
 from typing import List
-# Create a dataset object for the topics you want to create the poems.
 topics = Dataset.from_dict({"topic": [
     "Urban loneliness in a bustling city",
     "Beauty of Bespoke Labs's Curator library"
 ]})
+```
-# Define a class to encapsulate a list of poems.
+Define a class to encapsulate a list of poems.
+```python
 class Poem(BaseModel):
     poem: str = Field(description="A poem.")
 class Poems(BaseModel):
     poems_list: List[Poem] = Field(description="A list of poems.")
+```
-
-# We define an `LLM` object that generates poems which gets applied to the topics dataset.
+We define an `LLM` object that generates poems, which we then apply to the topics dataset.
+```python
 poet = curator.LLM(
-    # `prompt_func` takes a row of the dataset as input.
-    # `row` is a dictionary with a single key 'topic' in this case.
     prompt_func=lambda row: f"Write two poems about {row['topic']}.",
     model_name="gpt-4o-mini",
     response_format=Poems,
-    # `row` is the input row, and `poems` is the `Poems` class which
-    # is parsed from the structured output from the LLM.
     parse_func=lambda row, poems: [
         {"topic": row["topic"], "poem": p.poem} for p in poems.poems_list
     ],
 )
+```
+Here:
+* `prompt_func` takes a row of the dataset as input and returns the prompt for the LLM.
+* `response_format` is the structured output class we defined above.
+* `parse_func` takes the input (`row`) and the structured output (`poems`) and converts them into a list of dictionaries, so that the output can easily be converted to a HuggingFace Dataset object.
+Now we can apply the `LLM` object to the dataset, which reads like idiomatic Python.
+```python
 poem = poet(topics)
 print(poem.to_pandas())
 # Example output:
@@ -102,9 +145,6 @@ and we can scale this up to create tens of thousands of diverse poems.
 You can see a more detailed example in the [examples/poem.py](https://github.com/bespokelabsai/curator/blob/mahesh/update_doc/examples/poem.py) file, and other examples in the [examples](https://github.com/bespokelabsai/curator/blob/mahesh/update_doc/examples) directory.
-To run the examples, make sure to set your OpenAI API key in
-the environment variable `OPENAI_API_KEY` by running `export OPENAI_API_KEY=sk-...` in your terminal.
-
 See the [docs](https://docs.bespokelabs.ai/) for more details as well as for troubleshooting information.
@@ -118,6 +158,12 @@ curator-viewer
 This will pop up a browser window with the viewer running on `127.0.0.1:3000` by default if you haven't specified a different host and port.
+The dataset viewer shows all the different runs you have made.
+![Curator Runs](docs/curator-runs.png)
+
+You can also see the dataset and the responses from the LLM.
+![Curator Dataset](docs/curator-dataset.png)
+
 Optional parameters to run the viewer on a different host and port:
 ```bash
diff --git a/docs/curator-dataset.png b/docs/curator-dataset.png
new file mode 100644
index 00000000..33138ac3
Binary files /dev/null and b/docs/curator-dataset.png differ
diff --git a/docs/curator-responses.png b/docs/curator-responses.png
new file mode 100644
index 00000000..a78277e0
Binary files /dev/null and b/docs/curator-responses.png differ
diff --git a/docs/curator-runs.png b/docs/curator-runs.png
new file mode 100644
index 00000000..d076d9b1
Binary files /dev/null and b/docs/curator-runs.png differ
diff --git a/examples/simple_poem.py b/examples/simple_poem.py
new file mode 100644
index 00000000..8b1f5106
--- /dev/null
+++ b/examples/simple_poem.py
@@ -0,0 +1,25 @@
+"""Curator example that uses `SimpleLLM` to generate poems.
+
+Please see poem.py for more complex use cases.
+"""
+
+from bespokelabs import curator
+
+# Use GPT-4o-mini for this example.
+llm = curator.SimpleLLM(model_name="gpt-4o-mini") +poem = llm("Write a poem about the importance of data in AI.") +print(poem) + +# Use Claude 3.5 Sonnet for this example. +llm = curator.SimpleLLM(model_name="claude-3-5-sonnet-20240620", backend="litellm") +poem = llm("Write a poem about the importance of data in AI.") +print(poem) + +# Note that we can also pass a list of prompts to generate multiple responses. +poems = llm( + [ + "Write a sonnet about the importance of data in AI.", + "Write a haiku about the importance of data in AI.", + ] +) +print(poems) diff --git a/src/bespokelabs/curator/__init__.py b/src/bespokelabs/curator/__init__.py index a9d4cc6e..5ef73092 100644 --- a/src/bespokelabs/curator/__init__.py +++ b/src/bespokelabs/curator/__init__.py @@ -1,2 +1,3 @@ from .dataset import Dataset from .llm.llm import LLM +from .llm.simple_llm import SimpleLLM diff --git a/src/bespokelabs/curator/file_utilities.py b/src/bespokelabs/curator/file_utilities.py new file mode 100644 index 00000000..6ee606e7 --- /dev/null +++ b/src/bespokelabs/curator/file_utilities.py @@ -0,0 +1,14 @@ +# https://stackoverflow.com/questions/845058/how-to-get-the-line-count-of-a-large-file-cheaply-in-python +# https://stackoverflow.com/a/68385697 +def _file_gen(reader): + b = reader(1024 * 1024) + while b: + yield b + b = reader(1024 * 1024) + + +# Instead of requiring counting lines, we can store metadata file that has the number of requests in each file +def count_lines(filename): + f = open(filename, "rb") + f_gen = _file_gen(f.raw.read) + return sum(buf.count(b"\n") for buf in f_gen) diff --git a/src/bespokelabs/curator/llm/llm.py b/src/bespokelabs/curator/llm/llm.py index e5eadaa5..dcf3df83 100644 --- a/src/bespokelabs/curator/llm/llm.py +++ b/src/bespokelabs/curator/llm/llm.py @@ -29,6 +29,7 @@ _CURATOR_DEFAULT_CACHE_DIR = "~/.cache/curator" T = TypeVar("T") +_DictOrBaseModel = Union[Dict[str, Any], BaseModel] logger = logger = logging.getLogger(__name__) @@ -36,62 +37,12 @@ class LLM: """Interface for prompting LLMs.""" - @staticmethod - def _determine_backend( - model_name: str, response_format: Optional[Type[BaseModel]] = None - ) -> str: - """Determine which backend to use based on model name and response format. 
- - Args: - model_name (str): Name of the model - response_format (Optional[Type[BaseModel]]): Response format if specified - - Returns: - str: Backend to use ("openai" or "litellm") - """ - model_name = model_name.lower() - - # GPT-4o models with response format should use OpenAI - if ( - response_format - and OpenAIOnlineRequestProcessor(model_name).check_structured_output_support() - ): - logger.info(f"Requesting structured output from {model_name}, using OpenAI backend") - return "openai" - - # GPT models and O1 models without response format should use OpenAI - if not response_format and any(x in model_name for x in ["gpt-", "o1-preview", "o1-mini"]): - logger.info(f"Requesting text output from {model_name}, using OpenAI backend") - return "openai" - - # Default to LiteLLM for all other cases - logger.info( - f"Requesting {f'structured' if response_format else 'text'} output from {model_name}, using LiteLLM backend" - ) - return "litellm" - - @staticmethod - def _convert_response_to_dict(response): - if hasattr(response, "model_dump"): - return response.model_dump() - elif isinstance(response, dict): - return response - elif hasattr(response, "__dict__"): - return response.__dict__ - return response - def __init__( self, model_name: str, - prompt_func: Callable[[Union[Dict[str, Any], BaseModel]], Dict[str, str]], + prompt_func: Callable[[_DictOrBaseModel], _DictOrBaseModel], parse_func: Optional[ - Callable[ - [ - Union[Dict[str, Any], BaseModel], - Union[Dict[str, Any], BaseModel], - ], - T, - ] + Callable[[_DictOrBaseModel, _DictOrBaseModel], _DictOrBaseModel] ] = None, response_format: Optional[Type[BaseModel]] = None, backend: Optional[str] = None, @@ -106,6 +57,8 @@ def __init__( top_p: Optional[float] = None, presence_penalty: Optional[float] = None, frequency_penalty: Optional[float] = None, + max_retries: Optional[int] = None, + require_all_responses: Optional[bool] = None, ): """Initialize a LLM. @@ -127,6 +80,8 @@ def __init__( top_p: The top_p to use for the LLM, only used if batch is False presence_penalty: The presence_penalty to use for the LLM, only used if batch is False frequency_penalty: The frequency_penalty to use for the LLM, only used if batch is False + max_retries: The maximum number of retries to use for the LLM + require_all_responses: Whether to require all responses """ self.prompt_formatter = PromptFormatter( model_name, prompt_func, parse_func, response_format @@ -162,6 +117,8 @@ def __init__( frequency_penalty=frequency_penalty, delete_successful_batch_files=delete_successful_batch_files, delete_failed_batch_files=delete_failed_batch_files, + max_retries=max_retries, + require_all_responses=require_all_responses, ) else: if batch_size is not None: @@ -176,6 +133,8 @@ def __init__( frequency_penalty=frequency_penalty, max_requests_per_minute=max_requests_per_minute, max_tokens_per_minute=max_tokens_per_minute, + max_retries=max_retries, + require_all_responses=require_all_responses, ) elif self.backend == "litellm": if batch: @@ -190,10 +149,46 @@ def __init__( frequency_penalty=frequency_penalty, max_requests_per_minute=max_requests_per_minute, max_tokens_per_minute=max_tokens_per_minute, + max_retries=max_retries, + require_all_responses=require_all_responses, ) else: raise ValueError(f"Unknown backend: {self.backend}") + @staticmethod + def _determine_backend( + model_name: str, response_format: Optional[Type[BaseModel]] = None + ) -> str: + """Determine which backend to use based on model name and response format. 
+ + Args: + model_name (str): Name of the model + response_format (Optional[Type[BaseModel]]): Response format if specified + + Returns: + str: Backend to use ("openai" or "litellm") + """ + model_name = model_name.lower() + + # GPT-4o models with response format should use OpenAI + if ( + response_format + and OpenAIOnlineRequestProcessor(model_name).check_structured_output_support() + ): + logger.info(f"Requesting structured output from {model_name}, using OpenAI backend") + return "openai" + + # GPT models and O1 models without response format should use OpenAI + if not response_format and any(x in model_name for x in ["gpt-", "o1-preview", "o1-mini"]): + logger.info(f"Requesting text output from {model_name}, using OpenAI backend") + return "openai" + + # Default to LiteLLM for all other cases + logger.info( + f"Requesting {f'structured' if response_format else 'text'} output from {model_name}, using LiteLLM backend" + ) + return "litellm" + def __call__( self, dataset: Optional[Iterable] = None, diff --git a/src/bespokelabs/curator/llm/simple_llm.py b/src/bespokelabs/curator/llm/simple_llm.py new file mode 100644 index 00000000..7cc62dd0 --- /dev/null +++ b/src/bespokelabs/curator/llm/simple_llm.py @@ -0,0 +1,33 @@ +from bespokelabs.curator.llm.llm import LLM +from datasets import Dataset +from typing import Union, List + + +class SimpleLLM: + """A simpler interface for the LLM class. + + Usage: + llm = SimpleLLM(model_name="gpt-4o-mini") + llm("Do you know about the bitter lesson?") + llm(["What is the capital of France?", "What is the capital of Germany?"]) + For more complex use cases (e.g. structured outputs and custom prompt functions), see the LLM class. + """ + + def __init__(self, model_name: str, backend: str = "openai"): + self._model_name = model_name + self._backend = backend + + def __call__(self, prompt: Union[str, List[str]]) -> Union[str, List[str]]: + prompt_list = [prompt] if isinstance(prompt, str) else prompt + dataset: Dataset = Dataset.from_dict({"prompt": prompt_list}) + + llm = LLM( + prompt_func=lambda row: row["prompt"], + model_name=self._model_name, + response_format=None, + backend=self._backend, + ) + response = llm(dataset) + if isinstance(prompt, str): + return response["response"][0] + return response["response"] diff --git a/src/bespokelabs/curator/request_processor/base_online_request_processor.py b/src/bespokelabs/curator/request_processor/base_online_request_processor.py index af7d2ae5..79fb2189 100644 --- a/src/bespokelabs/curator/request_processor/base_online_request_processor.py +++ b/src/bespokelabs/curator/request_processor/base_online_request_processor.py @@ -39,8 +39,8 @@ logger = logging.getLogger(__name__) -DEFAULT_REQUESTS_PER_MINUTE = 100 -DEFAULT_TOKENS_PER_MINUTE = 100_000 +DEFAULT_MAX_REQUESTS_PER_MINUTE = 100 +DEFAULT_MAX_TOKENS_PER_MINUTE = 100_000 # Create a shared console instance console = Console() @@ -51,6 +51,7 @@ format="%(asctime)s - %(levelname)s - %(message)s", handlers=[RichHandler(console=console, rich_tracebacks=True)] ) +DEFAULT_MAX_RETRIES = 10 @dataclass @@ -371,53 +372,58 @@ def __init__( frequency_penalty: Optional[float] = None, max_requests_per_minute: Optional[int] = None, max_tokens_per_minute: Optional[int] = None, + require_all_responses: bool = None, + max_retries: Optional[int] = None, ): - super().__init__(batch_size=None) + super().__init__(batch_size=None, require_all_responses=require_all_responses) self.model: str = model self.temperature: float | None = temperature self.top_p: float | None = top_p 
self.presence_penalty: float | None = presence_penalty self.frequency_penalty: float | None = frequency_penalty self.prompt_formatter: Optional[PromptFormatter] = None - self.max_requests_per_minute: Optional[int] = max_requests_per_minute - self.max_tokens_per_minute: Optional[int] = max_tokens_per_minute - self.DEFAULT_MAX_REQUESTS_PER_MINUTE = DEFAULT_REQUESTS_PER_MINUTE - self.DEFAULT_MAX_TOKENS_PER_MINUTE = DEFAULT_TOKENS_PER_MINUTE - - def get_rate_limit(self, name, header_value): - """Uses manual values if set, otherwise uses headers if available, and if not available uses defaults.""" - manual_value = getattr(self, name) - default_value = getattr(self, f"DEFAULT_{name.upper()}") - if manual_value is not None: - logger.info(f"Manually set {name} to {manual_value}") - return manual_value - elif header_value != 0: - logger.info(f"Automatically set {name} to {header_value}") - return header_value + self.manual_max_requests_per_minute: Optional[int] = max_requests_per_minute + self.manual_max_tokens_per_minute: Optional[int] = max_tokens_per_minute + if max_retries is None: + self.max_retries = DEFAULT_MAX_RETRIES + else: + self.max_retries = max_retries + + @property + def max_requests_per_minute(self) -> int: + if self.manual_max_requests_per_minute: + logger.info( + f"Manually set max_requests_per_minute to {self.manual_max_requests_per_minute}" + ) + return self.manual_max_requests_per_minute + elif self.header_based_max_requests_per_minute: + logger.info( + f"Automatically set max_requests_per_minute to {self.header_based_max_requests_per_minute}" + ) + return self.header_based_max_requests_per_minute else: logger.warning( - f"No manual {name} set, and headers based detection failed, using default value of {default_value}" + f"No manual max_requests_per_minute set, and headers based detection failed, using default value of {DEFAULT_MAX_REQUESTS_PER_MINUTE}" ) - return default_value - - def get_rate_limits(self) -> dict: - """Get rate limits for the API. Returns a dictionary with max_requests_per_minute and max_tokens_per_minute""" + return DEFAULT_MAX_REQUESTS_PER_MINUTE - # Get values from headers - header_based_rate_limits = self.get_header_based_rate_limits() - header_tpm = header_based_rate_limits["max_tokens_per_minute"] - header_rpm = header_based_rate_limits["max_requests_per_minute"] - - # Determine final rate limit - tpm = self.get_rate_limit("max_tokens_per_minute", header_tpm) - rpm = self.get_rate_limit("max_requests_per_minute", header_rpm) - - return {"max_requests_per_minute": rpm, "max_tokens_per_minute": tpm} - - @abstractmethod - def get_header_based_rate_limits(self) -> dict: - """Get rate limits for the API from headers. 
Returns a dictionary with max_requests_per_minute and max_tokens_per_minute""" - pass + @property + def max_tokens_per_minute(self) -> int: + if self.manual_max_tokens_per_minute: + logger.info( + f"Manually set max_tokens_per_minute to {self.manual_max_tokens_per_minute}" + ) + return self.manual_max_tokens_per_minute + elif self.header_based_max_tokens_per_minute: + logger.info( + f"Automatically set max_tokens_per_minute to {self.header_based_max_tokens_per_minute}" + ) + return self.header_based_max_tokens_per_minute + else: + logger.warning( + f"No manual max_tokens_per_minute set, and headers based detection failed, using default value of {DEFAULT_MAX_TOKENS_PER_MINUTE}" + ) + return DEFAULT_MAX_TOKENS_PER_MINUTE @abstractmethod def estimate_total_tokens(self, messages: list) -> int: @@ -460,7 +466,7 @@ def run( self.process_requests_from_file( generic_request_filepath=request_file, save_filepath=response_file, - max_attempts=5, + max_attempts=self.max_retries, resume=True, ) ) @@ -625,21 +631,21 @@ async def process_requests_from_file( if pending_requests: await asyncio.gather(*pending_requests) - # Process any remaining retries in the queue - pending_retries = set() - while not queue_of_requests_to_retry.empty() or pending_retries: - # Process new items from the queue if we have capacity - if not queue_of_requests_to_retry.empty(): - retry_request = await queue_of_requests_to_retry.get() - token_estimate = self.estimate_total_tokens( - retry_request.generic_request.messages - ) - attempt_number = 6 - retry_request.attempts_left - logger.info( - f"Processing retry for request {retry_request.task_id} " - f"(attempt #{attempt_number} of 5). " - f"Previous errors: {retry_request.result}" - ) + # Process any remaining retries in the queue + pending_retries = set() + while not queue_of_requests_to_retry.empty() or pending_retries: + # Process new items from the queue if we have capacity + if not queue_of_requests_to_retry.empty(): + retry_request = await queue_of_requests_to_retry.get() + token_estimate = self.estimate_total_tokens( + retry_request.generic_request.messages + ) + attempt_number = 1 + self.max_retries - retry_request.attempts_left + logger.info( + f"Processing retry for request {retry_request.task_id} " + f"(attempt #{attempt_number} of {self.max_retries}). " + f"Previous errors: {retry_request.result}" + ) # Wait for capacity if needed while not status_tracker.has_capacity(token_estimate): @@ -731,7 +737,7 @@ async def handle_single_request_with_retries( retry_queue.put_nowait(request) else: logger.error( - f"Request {request.task_id} failed permanently after exhausting all 5 retry attempts. " + f"Request {request.task_id} failed permanently after exhausting all {self.max_retries} retry attempts. 
" f"Errors: {[str(e) for e in request.result]}" ) generic_response = GenericResponse( diff --git a/src/bespokelabs/curator/request_processor/base_request_processor.py b/src/bespokelabs/curator/request_processor/base_request_processor.py index b5163e34..b9fb09f4 100644 --- a/src/bespokelabs/curator/request_processor/base_request_processor.py +++ b/src/bespokelabs/curator/request_processor/base_request_processor.py @@ -14,6 +14,7 @@ from datasets.arrow_writer import ArrowWriter from pydantic import BaseModel, ValidationError +from bespokelabs.curator.file_utilities import count_lines from bespokelabs.curator.llm.prompt_formatter import PromptFormatter from bespokelabs.curator.request_processor.event_loop import run_in_event_loop from bespokelabs.curator.request_processor.generic_request import GenericRequest @@ -29,8 +30,9 @@ class BaseRequestProcessor(ABC): Base class for all request processors. """ - def __init__(self, batch_size: Optional[int] = None): + def __init__(self, batch_size: Optional[int] = None, require_all_responses: bool = True): self.batch_size = batch_size + self.require_all_responses = require_all_responses # Increase the number of open file descriptors to avoid "Too many open files" errors soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) desired_limit = min(10_000_000, hard) @@ -39,16 +41,6 @@ def __init__(self, batch_size: Optional[int] = None): ) resource.setrlimit(resource.RLIMIT_NOFILE, (desired_limit, hard)) - @abstractmethod - def get_rate_limits(self) -> dict: - """ - Returns the rate limits for the API. - - Returns: - dict: A dictionary containing the rate limit information. - """ - pass - @abstractmethod def create_api_specific_request(self, generic_request: GenericRequest) -> dict: """ @@ -216,9 +208,6 @@ def create_dataset_files( Returns: Dataset: Completed dataset """ - total_responses_count = 0 - failed_responses_count = 0 - responses_files = glob.glob(f"{working_dir}/responses_*.jsonl") if len(responses_files) == 0: raise ValueError(f"No responses files found in {working_dir}") @@ -230,6 +219,8 @@ def create_dataset_files( ) # Process all response files + total_responses_count = 0 + failed_responses_count = 0 dataset_file = f"{working_dir}/{parse_func_hash}.arrow" with ArrowWriter(path=dataset_file) as writer: for responses_file in responses_files: @@ -319,14 +310,35 @@ def create_dataset_files( writer.write(row) - logger.info(f"Read {total_responses_count} responses, {failed_responses_count} failed") + logger.info("Finalizing writer") + writer.finalize() + + logger.info(f"Read {total_responses_count} responses.") if failed_responses_count == total_responses_count: os.remove(dataset_file) raise ValueError("All requests failed") - logger.info("Finalizing writer") + if failed_responses_count > 0: + logger.warning(f"{failed_responses_count} requests failed.") + if self.require_all_responses: + os.remove(dataset_file) + raise ValueError(f"Some requests failed and require_all_responses is True") - writer.finalize() + # number of responses matches number of requests + request_files = glob.glob(f"{working_dir}/requests_*.jsonl") + n_requests = 0 + for request_file in request_files: + n_requests += count_lines(request_file) + + if n_requests != total_responses_count: + logger.warning( + f"{n_requests - total_responses_count} requests do not have responses. 
n_requests is {n_requests} and n_responses is {total_responses_count}" + ) + if self.require_all_responses: + os.remove(dataset_file) + raise ValueError( + f"Some requests do not have responses and require_all_responses is True." + ) return Dataset.from_file(dataset_file) diff --git a/src/bespokelabs/curator/request_processor/litellm_online_request_processor.py b/src/bespokelabs/curator/request_processor/litellm_online_request_processor.py index 0881ea74..e0a3788f 100644 --- a/src/bespokelabs/curator/request_processor/litellm_online_request_processor.py +++ b/src/bespokelabs/curator/request_processor/litellm_online_request_processor.py @@ -49,6 +49,8 @@ def __init__( frequency_penalty: Optional[float] = None, max_requests_per_minute: Optional[int] = None, max_tokens_per_minute: Optional[int] = None, + require_all_responses: bool = False, + max_retries: Optional[int] = None, ): super().__init__( model=model, @@ -58,8 +60,13 @@ def __init__( frequency_penalty=frequency_penalty, max_requests_per_minute=max_requests_per_minute, max_tokens_per_minute=max_tokens_per_minute, + require_all_responses=require_all_responses, + max_retries=max_retries, ) self.client = instructor.from_litellm(litellm.acompletion) + self.header_based_max_requests_per_minute, self.header_based_max_tokens_per_minute = ( + self.get_header_based_rate_limits() + ) def check_structured_output_support(self): """Verify if the model supports structured output via instructor. @@ -154,11 +161,11 @@ def test_call(self): logger.info(f"Test call headers: {headers}") return headers - def get_header_based_rate_limits(self) -> dict: + def get_header_based_rate_limits(self) -> tuple[int, int]: """Retrieve rate limits from the LLM provider via LiteLLM. Returns: - dict: Contains 'max_requests_per_minute' and 'max_tokens_per_minute' + tuple[int, int]: Contains 'max_requests_per_minute' and 'max_tokens_per_minute' Note: - Makes a test request to get rate limit information from response headers. @@ -170,7 +177,7 @@ def get_header_based_rate_limits(self) -> dict: rpm = int(headers.get("x-ratelimit-limit-requests", 0)) tpm = int(headers.get("x-ratelimit-limit-tokens", 0)) - return {"max_requests_per_minute": rpm, "max_tokens_per_minute": tpm} + return rpm, tpm def create_api_specific_request(self, generic_request: GenericRequest) -> dict: """Convert a generic request into a LiteLLM-compatible format. 
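
Editor's note on the rate-limit refactor above: the old `get_rate_limits()` dictionary plumbing is replaced by `max_requests_per_minute` / `max_tokens_per_minute` properties that resolve a value in a fixed order -- an explicitly passed value wins, then a value parsed from the provider's `x-ratelimit-*` headers, then a hard-coded default. The snippet below is a minimal standalone sketch of that precedence (it is not the library's actual class; only the constant value and the resolution order come from this diff):

```python
from typing import Optional

DEFAULT_MAX_REQUESTS_PER_MINUTE = 100  # fallback used when nothing better is known


def resolve_rate_limit(manual: Optional[int], header_based: int) -> int:
    """Pick a requests-per-minute limit the same way the new properties do."""
    if manual:  # value passed explicitly by the caller always wins
        return manual
    if header_based:  # non-zero value parsed from x-ratelimit-limit-requests
        return header_based
    return DEFAULT_MAX_REQUESTS_PER_MINUTE  # header detection failed


print(resolve_rate_limit(None, 10_000))  # -> 10000 (header-derived limit)
print(resolve_rate_limit(500, 10_000))   # -> 500 (manual override)
print(resolve_rate_limit(None, 0))       # -> 100 (default fallback)
```
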
diff --git a/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py b/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py index 202c214c..9dcf97d4 100644 --- a/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py +++ b/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py @@ -5,13 +5,14 @@ import logging import os from dataclasses import dataclass, field -from typing import Callable +from typing import Callable, Optional import litellm from openai import AsyncOpenAI, NotFoundError from openai.types import Batch from tqdm import tqdm +from bespokelabs.curator.file_utilities import count_lines from bespokelabs.curator.dataset import Dataset from bespokelabs.curator.llm.prompt_formatter import PromptFormatter from bespokelabs.curator.request_processor.base_request_processor import ( @@ -47,6 +48,8 @@ def __init__( url: str = "https://api.openai.com/v1/chat/completions", presence_penalty: float | None = None, frequency_penalty: float | None = None, + require_all_responses: bool = None, + max_retries: Optional[int] = None, ): if batch_size > MAX_REQUESTS_PER_BATCH: raise ValueError( @@ -54,7 +57,7 @@ def __init__( f"{MAX_REQUESTS_PER_BATCH:,} requests per batch that OpenAI supports. " f"Please set your batch_size to be less than or equal to {MAX_REQUESTS_PER_BATCH:,}." ) - super().__init__(batch_size) + super().__init__(batch_size, require_all_responses=require_all_responses) self.model = model self.url: str = url self.check_interval: int = batch_check_interval @@ -64,48 +67,10 @@ def __init__( self.frequency_penalty: float | None = frequency_penalty self.delete_successful_batch_files: bool = delete_successful_batch_files self.delete_failed_batch_files: bool = delete_failed_batch_files - - def get_rate_limits(self) -> dict: - """ - Function to get rate limits for a given annotator. Not available via response headers, so - the following is based on tier 5 limits on Nov 6th, 2024. - - These rate limits vary per model - and are determined by your organization's usage tier. View the following: - https://platform.openai.com/docs/guides/rate-limits/usage-tiers - https://platform.openai.com/settings/organization/limits - - Args: - model (str): The model for which to get the rate limits. - request_url (str): The request URL for which to get the rate limits. 
- - Returns: - dict: A dictionary containing max_tokens_per_day - """ - model_tpd = { - "gpt-3.5-turbo": 5_000_000_000, - "gpt-3.5-turbo-0125": 5_000_000_000, - "gpt-3.5-turbo-1106": 5_000_000_000, - "gpt-3.5-turbo-16k": 5_000_000_000, - "gpt-3.5-turbo-instruct": 200_000, - "gpt-3.5-turbo-instruct-0914": 200_000, - "gpt-4": 150_000_000, - "gpt-4-0613": 150_000_000, - "gpt-4-turbo": 300_000_000, - "gpt-4o": 10_000_000_000, - "gpt-4o-mini": 15_000_000_000, - } - - if self.model not in model_tpd: - tpd = 1_000_000_000 + if max_retries is None: + self.max_retries = MAX_RETRIES_PER_OPERATION else: - tpd = model_tpd[self.model] - - logger.info(f"Automatically set max_tokens_per_day to {tpd}, model: {self.model} ") - - rate_limits = {"max_tokens_per_day": tpd} - - return rate_limits + self.max_retries = max_retries def create_api_specific_request(self, generic_request: GenericRequest) -> dict: """ @@ -335,6 +300,7 @@ def run( prompt_formatter, delete_successful_batch_files=self.delete_successful_batch_files, delete_failed_batch_files=self.delete_failed_batch_files, + max_retries=self.max_retries, ) run_in_event_loop(self.run_batch_operations(batch_manager, request_files)) @@ -353,6 +319,7 @@ def cancel_batches(self, working_dir: str) -> Dataset: self.check_interval, delete_successful_batch_files=self.delete_successful_batch_files, delete_failed_batch_files=self.delete_failed_batch_files, + max_retries=self.max_retries, ) run_in_event_loop(batch_manager.cancel_batches()) @@ -510,6 +477,7 @@ def __init__( prompt_formatter: PromptFormatter | None = None, delete_successful_batch_files: bool = False, delete_failed_batch_files: bool = False, + max_retries: Optional[int] = None, ) -> None: """Initialize BatchManager to handle OpenAI batch processing operations. @@ -523,7 +491,7 @@ def __init__( delete_failed_batch_files (bool): Whether to delete input/error files from OpenAI after batch failure. 
""" - self.client = AsyncOpenAI(max_retries=MAX_RETRIES_PER_OPERATION) + self.client = AsyncOpenAI(max_retries=max_retries) self.check_interval = check_interval self.working_dir = working_dir self.tracker = BatchStatusTracker() @@ -775,8 +743,7 @@ async def track_already_submitted_batches(self): # Edge case where the batch is still validating, and we need to know the total number of requests if batch_object.status == "validating": - n_requests = len(open(request_file_name, "r").readlines()) - batch_object.request_counts.total = n_requests + batch_object.request_counts.total = count_lines(request_file_name) else: n_requests = batch_object.request_counts.total diff --git a/src/bespokelabs/curator/request_processor/openai_online_request_processor.py b/src/bespokelabs/curator/request_processor/openai_online_request_processor.py index 19a76ee1..5efdb6d6 100644 --- a/src/bespokelabs/curator/request_processor/openai_online_request_processor.py +++ b/src/bespokelabs/curator/request_processor/openai_online_request_processor.py @@ -81,6 +81,8 @@ def __init__( frequency_penalty: Optional[float] = None, max_requests_per_minute: Optional[int] = None, max_tokens_per_minute: Optional[int] = None, + require_all_responses: bool = None, + max_retries: Optional[int] = None, ): super().__init__( model=model, @@ -90,16 +92,21 @@ def __init__( frequency_penalty=frequency_penalty, max_requests_per_minute=max_requests_per_minute, max_tokens_per_minute=max_tokens_per_minute, + require_all_responses=require_all_responses, + max_retries=max_retries, ) self.url = url self.api_key = api_key self.token_encoding = tiktoken.get_encoding(get_token_encoding_name(model)) + self.header_based_max_requests_per_minute, self.header_based_max_tokens_per_minute = ( + self.get_header_based_rate_limits() + ) - def get_header_based_rate_limits(self) -> dict: + def get_header_based_rate_limits(self) -> tuple[int, int]: """Get rate limits from OpenAI API headers. Returns: - dict: Contains 'max_requests_per_minute' and 'max_tokens_per_minute' + tuple[int, int]: Contains 'max_requests_per_minute' and 'max_tokens_per_minute' Note: - Makes a dummy request to get actual rate limits @@ -117,7 +124,7 @@ def get_header_based_rate_limits(self) -> dict: rpm = int(response.headers.get("x-ratelimit-limit-requests", 0)) tpm = int(response.headers.get("x-ratelimit-limit-tokens", 0)) - return {"max_requests_per_minute": rpm, "max_tokens_per_minute": tpm} + return rpm, tpm def estimate_output_tokens(self) -> int: """Estimate number of tokens in the response. diff --git a/tests/simple_online.py b/tests/simple_online.py index fd850592..4d5f90df 100644 --- a/tests/simple_online.py +++ b/tests/simple_online.py @@ -18,6 +18,8 @@ def main(args): model_name=args.model, max_requests_per_minute=args.max_requests_per_minute, max_tokens_per_minute=args.max_tokens_per_minute, + max_retries=args.max_retries, + require_all_responses=not args.partial_responses, ) dataset = prompter(dataset, batch_cancel=args.cancel) @@ -41,5 +43,12 @@ def main(args): parser.add_argument( "--max-tokens-per-minute", type=int, help="Max tokens per minute", default=None ) + parser.add_argument("--max-retries", type=int, help="Max retries", default=None) + parser.add_argument( + "--partial-responses", + action="store_true", + default=False, + help="Require all responses", + ) args = parser.parse_args() main(args)