feat(llm): Add AudioParser using OpenAI Whisper #160
base: main
```diff
@@ -9,7 +9,9 @@
 import inspect
 import io
 import logging
+import os
 import re
+import tempfile
 import warnings
 from abc import ABC, abstractmethod
 from collections import defaultdict
```
|
```diff
@@ -1305,3 +1307,65 @@ def metadata(page_number: int) -> dict:
        docs = [(concatenated_text, {"page_number": 0})]

        return docs
+
+
+class AudioParser(pw.UDF):
+    """
+    Parse audio using OpenAI's Whisper API.
+
+    Args:
+        model: Whisper model to use (default: "whisper-1").
+        api_key: OpenAI API key.
+        base_url: OpenAI Base URL.
+        capacity: Maximum number of concurrent operations.
+        retry_strategy: Strategy for retries.
+        cache_strategy: Caching strategy.
+        **kwargs: Additional arguments for `audio.transcriptions.create`.
+    """
+
+    def __init__(
+        self,
+        model: str = "whisper-1",
+        api_key: str | None = None,
+        base_url: str | None = None,
+        capacity: int | None = None,
+        retry_strategy: udfs.AsyncRetryStrategy | None = None,
+        cache_strategy: udfs.CacheStrategy | None = None,
+        **kwargs,
+    ):
+        with optional_imports("xpack-llm"):
+            import openai
+
+        executor = _prepare_executor(
+            capacity=capacity, retry_strategy=retry_strategy
+        )
+        super().__init__(executor=executor, cache_strategy=cache_strategy)
+
+        self.client = openai.AsyncOpenAI(
+            api_key=api_key, base_url=base_url, max_retries=0
+        )
+        self.model = model
+        self.kwargs = kwargs
+
+    async def __wrapped__(self, contents: bytes) -> list[tuple[str, dict]]:
+        # openai.audio.transcriptions.create requires a file-like object with a name
+        # or a path. We use a temporary file.
+        # We default to .mp3 extension as it's widely supported/compressed.
```
**Collaborator:** I would propose to have a configurable audio format instead of hard-coding the `.mp3` extension. As far as I can see from the docs, it would fit as an option in the constructor arguments.
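The reviewer's proposal could look roughly like the stdlib-only sketch below. The parameter name `file_format`, the helper `suffix_for`, and the format list are all hypothetical, not part of this PR:

```python
# Hypothetical sketch of a configurable audio format for the constructor;
# "file_format", "suffix_for", and SUPPORTED_FORMATS are assumed names.
SUPPORTED_FORMATS = {"mp3", "mp4", "m4a", "wav", "webm"}

def suffix_for(file_format: str = "mp3") -> str:
    """Map a user-supplied format name to a tempfile suffix like '.mp3'."""
    if file_format not in SUPPORTED_FORMATS:
        raise ValueError(f"unsupported audio format: {file_format!r}")
    return "." + file_format

print(suffix_for())       # -> .mp3
print(suffix_for("wav"))  # -> .wav
```

`AudioParser.__init__` could then accept `file_format` and pass `suffix=suffix_for(file_format)` to `NamedTemporaryFile` instead of the literal `".mp3"`.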
```diff
+        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp_file:
+            tmp_file.write(contents)
+            tmp_path = tmp_file.name
+
+        try:
+            with open(tmp_path, "rb") as audio_file:
```
**Collaborator** (on lines +1354 to +1359): It would be more efficient to have only one context manager: the one that creates `tmp_file`.

```python
tmp_file.write(contents)  # save the contents as before
tmp_file.flush()          # complete the write
tmp_file.seek(0)          # start reading the file from scratch
# work with tmp_file as if it were opened for reading
```

Other than simplification, it would help us to avoid reopening the file, so it's a little bit more efficient too.
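The write/flush/rewind pattern the reviewer describes round-trips correctly with the standard library alone; this is a minimal sketch with dummy bytes, not the PR's actual code:

```python
import tempfile

contents = b"dummy audio bytes"  # stand-in for the real audio payload

# One context manager: write, flush, rewind, then read from the same handle.
with tempfile.NamedTemporaryFile(suffix=".mp3") as tmp_file:
    tmp_file.write(contents)  # save the contents as before
    tmp_file.flush()          # complete the write
    tmp_file.seek(0)          # start reading the file from scratch
    data = tmp_file.read()    # work with tmp_file as if opened for reading

print(data == contents)  # -> True
```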
```diff
+                transcript = await self.client.audio.transcriptions.create(
+                    model=self.model,
+                    file=audio_file,
+                    **self.kwargs
+                )
+            # The response type depends on response_format, but default is object with .text
+            text = getattr(transcript, "text", str(transcript))
```
**Collaborator:** Would it be possible to pin the response format so that there is no need to have fallback logic in parsing?
```diff
+            return [(text, {})]
+        finally:
+            if os.path.exists(tmp_path):
+                os.remove(tmp_path)
```
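The `delete=False` plus manual-removal pattern in the diff can be exercised standalone; a stdlib-only sketch with dummy bytes in place of real audio:

```python
import os
import tempfile

# Same shape as the diff: delete=False so the file survives the first
# context manager, then explicit removal in a finally block.
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp_file:
    tmp_file.write(b"\x00\x01")
    tmp_path = tmp_file.name

try:
    with open(tmp_path, "rb") as audio_file:
        data = audio_file.read()
finally:
    if os.path.exists(tmp_path):
        os.remove(tmp_path)

print(data)                      # -> b'\x00\x01'
print(os.path.exists(tmp_path))  # -> False
```

The reviewer's single-context-manager suggestion above would make both the reopen and this manual cleanup unnecessary.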
**Collaborator:** Please run […]
**Collaborator:** This invocation wouldn't work, since there's a required positional argument `async_mode`, which is currently unspecified. You can accept it in the constructor too, similarly to how other classes in this file do.