vLLM example for OpenAIOnlineParallelProcessor #78

Draft · wants to merge 2 commits into base: dev
29 changes: 29 additions & 0 deletions examples/vllm_benchmark.py
@@ -0,0 +1,29 @@
from bespokelabs import curator
from datasets import load_dataset
import logging

dataset = load_dataset("allenai/WildChat", split="train")
dataset = dataset.select(range(10_000))

# To see more detail about how requests are being processed
logger = logging.getLogger("bespokelabs.curator")
logger.setLevel(logging.INFO)

def prompt_func(row):
    return row["conversation"][0]["content"]

def parse_func(row, response):
    instruction = row["conversation"][0]["content"]
    return {"instruction": instruction, "new_response": response}


distill_prompter = curator.Prompter(
    prompt_func=prompt_func,
    parse_func=parse_func,
    model_name="nvidia/Llama-3.1-Nemotron-70B-Instruct-HF",
    api_base="http://192.222.54.64:8000/"
)

distilled_dataset = distill_prompter(dataset)
print(distilled_dataset)
print(distilled_dataset[0])
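Note: the api_base above is not an OpenAI endpoint, so the rewritten get_rate_limits cannot read x-ratelimit-* response headers and will raise a ValueError unless both limits are supplied explicitly. A minimal variant of the call above with manual limits, continuing from the definitions in the example; the two limit values are assumed placeholders, not measured settings:

# Same example as above, but with manual rate limits for the non-OpenAI endpoint.
# The two limit values below are placeholders; tune them to the vLLM deployment.
distill_prompter = curator.Prompter(
    prompt_func=prompt_func,
    parse_func=parse_func,
    model_name="nvidia/Llama-3.1-Nemotron-70B-Instruct-HF",
    api_base="http://192.222.54.64:8000/",
    max_requests_per_minute=1_000,
    max_tokens_per_minute=1_000_000,
)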
14 changes: 12 additions & 2 deletions src/bespokelabs/curator/prompter/prompter.py
@@ -50,6 +50,9 @@ def __init__(
        response_format: Optional[Type[BaseModel]] = None,
        batch: bool = False,
        batch_size: Optional[int] = None,
        api_base: Optional[str] = "https://api.openai.com",
        max_requests_per_minute: Optional[int] = None,
        max_tokens_per_minute: Optional[int] = None,
    ):
        """Initialize a Prompter.

@@ -80,18 +83,25 @@ def __init__(
        self.prompt_formatter = PromptFormatter(
            model_name, prompt_func, parse_func, response_format
        )

        if api_base is not None:
            api_base = api_base.rstrip("/")
            url = api_base + "/v1/chat/completions"
        else:
            url = "https://api.openai.com/v1/chat/completions"

        self.batch_mode = batch
        if batch:
            self._request_processor = OpenAIBatchRequestProcessor(
                model=model_name, batch_size=batch_size
                model=model_name, batch_size=batch_size, url=url, max_requests_per_minute=max_requests_per_minute, max_tokens_per_minute=max_tokens_per_minute
            )
        else:
            if batch_size is not None:
                logger.warning(
                    f"Prompter argument `batch_size` {batch_size} is ignored because `batch` is False"
                )
            self._request_processor = OpenAIOnlineRequestProcessor(
                model=model_name
                model=model_name, url=url, max_requests_per_minute=max_requests_per_minute, max_tokens_per_minute=max_tokens_per_minute
            )

    def __call__(
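For reference, a standalone sketch (not the repo's code) of the api_base handling added above: trailing slashes are stripped before the chat-completions path is appended, and passing api_base=None falls back to the OpenAI endpoint.

from typing import Optional

# Standalone sketch of the api_base -> request-URL normalization introduced above.
def build_chat_url(api_base: Optional[str]) -> str:
    if api_base is not None:
        return api_base.rstrip("/") + "/v1/chat/completions"
    return "https://api.openai.com/v1/chat/completions"

assert build_chat_url("http://192.222.54.64:8000/") == "http://192.222.54.64:8000/v1/chat/completions"
assert build_chat_url(None) == "https://api.openai.com/v1/chat/completions"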
@@ -39,6 +39,8 @@ def __init__(
        self.model: str = model
        self.url: str = url
        self.api_key: str = api_key
        self.max_requests_per_minute: Optional[int] = max_requests_per_minute
        self.max_tokens_per_minute: Optional[int] = max_tokens_per_minute

    def get_rate_limits(self) -> dict:
        """
@@ -55,31 +57,49 @@ def get_rate_limits(self) -> dict:
        Returns:
            dict: A dictionary containing the maximum number of requests and tokens per minute.
        """
        # Send a dummy request to get rate limit information
        response = requests.post(
            self.url,
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={"model": self.model, "messages": []},
        )

        rpm = int(response.headers.get("x-ratelimit-limit-requests", 0))
        tpm = int(response.headers.get("x-ratelimit-limit-tokens", 0))
        rpm = None
        tpm = None

        if self.max_requests_per_minute is not None:
            rpm = self.max_requests_per_minute
            logger.info(f"Manually set max_requests_per_minute to {rpm}")

        if self.max_tokens_per_minute is not None:
            tpm = self.max_tokens_per_minute
            logger.info(f"Manually set max_tokens_per_minute to {tpm}")

        if tpm is None or rpm is None:
            if self.url.startswith("https://api.openai.com"):
                # Send a dummy request to get rate limit information
                response = requests.post(
                    self.url,
                    headers={"Authorization": f"Bearer {self.api_key}"},
                    json={"model": self.model, "messages": []},
                )

        if not rpm or not tpm:
            logger.warning(
                "Failed to get rate limits from OpenAI API, using default values"
            )
            rpm = 30_000
            tpm = 150_000_000
                if tpm is None:
                    tpm = int(response.headers.get("x-ratelimit-limit-tokens", 0))
                    if tpm == 0:
                        tpm = 150_000_000
                        logger.warning(f"Failed to get x-ratelimit-limit-tokens from OpenAI API, using default value {tpm}")
                    else:
                        logger.info(f"Automatically set max_tokens_per_minute to {tpm}")

                if rpm is None:
                    rpm = int(response.headers.get("x-ratelimit-limit-requests", 0))
                    if rpm == 0:
                        rpm = 30_000
                        logger.warning(f"Failed to get x-ratelimit-limit-requests from OpenAI API, using default value {rpm}")
                    else:
                        logger.info(f"Automatically set max_requests_per_minute to {rpm}")
            else:
                raise ValueError("max_requests_per_minute and max_tokens_per_minute must be set if url is not https://api.openai.com")

        logger.info(f"Automatically set max_requests_per_minute to {rpm}")
        logger.info(f"Automatically set max_tokens_per_minute to {tpm}")

        rate_limits = {
            "max_requests_per_minute": rpm,
            "max_tokens_per_minute": tpm,
        }

        return rate_limits
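The fallback order in the rewritten get_rate_limits is easy to lose in the diff, so here is a distilled standalone sketch of the same decision logic (a plain function rather than the class method; header values of 0 stand in for missing x-ratelimit-* headers):

from typing import Optional, Tuple

def resolve_rate_limits(
    url: str,
    manual_rpm: Optional[int],
    manual_tpm: Optional[int],
    header_rpm: int = 0,
    header_tpm: int = 0,
) -> Tuple[int, int]:
    # 1. Manual overrides always win.
    rpm, tpm = manual_rpm, manual_tpm
    if rpm is None or tpm is None:
        if url.startswith("https://api.openai.com"):
            # 2. OpenAI endpoints: read the rate-limit headers, else fall back to hard-coded defaults.
            rpm = rpm if rpm is not None else (header_rpm or 30_000)
            tpm = tpm if tpm is not None else (header_tpm or 150_000_000)
        else:
            # 3. Any other endpoint (e.g. a vLLM server) must set both limits manually.
            raise ValueError(
                "max_requests_per_minute and max_tokens_per_minute must be set "
                "if url is not https://api.openai.com"
            )
    return rpm, tpm

assert resolve_rate_limits("https://api.openai.com/v1/chat/completions", None, None) == (30_000, 150_000_000)
assert resolve_rate_limits("http://192.222.54.64:8000/v1/chat/completions", 1_000, 1_000_000) == (1_000, 1_000_000)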

    def create_api_specific_request(
@@ -579,50 +599,6 @@ def get_token_encoding_name(model: str) -> str:
    )
    return "cl100k_base"


def get_rate_limits(
    model: str, request_url: str, api_key: str
) -> Tuple[int, int]:
    """
    Function to get rate limits for a given annotator. Makes a single request to openAI API
    and gets the rate limits from the response headers. These rate limits vary per model
    and are determined by your organization's usage tier. View the following:
    https://platform.openai.com/docs/guides/rate-limits/usage-tiers
    https://platform.openai.com/settings/organization/limits

    Args:
        model (str): The model for which to get the rate limits.
        request_url (str): The request URL for which to get the rate limits.

    Returns:
        Tuple[int, int]: The maximum number of requests and tokens per minute.
    """
    if "api.openai.com" in request_url:
        # Send a dummy request to get rate limit information
        response = requests.post(
            request_url,
            headers={"Authorization": f"Bearer {api_key}"},
            json={"model": model, "messages": []},
        )
        # Extract rate limit information from headers
        max_requests = int(
            response.headers.get("x-ratelimit-limit-requests", 30_000)
        )
        max_tokens = int(
            response.headers.get("x-ratelimit-limit-tokens", 150_000_000)
        )
    elif "api.sambanova.ai" in request_url:
        # Send a dummy request to get rate limit information
        max_requests = 50
        max_tokens = 100_000_000
    else:
        raise NotImplementedError(
            f'Rate limits for API endpoint "{request_url}" not implemented'
        )

    return max_requests, max_tokens


def get_api_key(request_url: str) -> str:
    """Get the API key for a given request URL."""
    if "api.openai.com" in request_url:
@@ -634,7 +610,6 @@ def get_api_key(request_url: str) -> str:
            f'Default API key environment variable for API endpoint "{request_url}" not implemented'
        )


def api_endpoint_from_url(request_url: str) -> str:
    """Extract the API endpoint from the request URL.
    This is used to determine the number of tokens consumed by the request.