Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions python/pathway/xpacks/llm/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import inspect
import io
import logging
import os
import re
import tempfile
import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
Expand Down Expand Up @@ -1305,3 +1307,65 @@ def metadata(page_number: int) -> dict:
docs = [(concatenated_text, {"page_number": 0})]

return docs


class AudioParser(pw.UDF):
"""
Parse audio using OpenAI's Whisper API.

Args:
model: Whisper model to use (default: "whisper-1").
api_key: OpenAI API key.
base_url: OpenAI Base URL.
capacity: Maximum number of concurrent operations.
retry_strategy: Strategy for retries.
cache_strategy: Caching strategy.
**kwargs: Additional arguments for `audio.transcriptions.create`.
"""

def __init__(
self,
model: str = "whisper-1",
api_key: str | None = None,
base_url: str | None = None,
capacity: int | None = None,
retry_strategy: udfs.AsyncRetryStrategy | None = None,
cache_strategy: udfs.CacheStrategy | None = None,
**kwargs,
):
with optional_imports("xpack-llm"):
import openai

executor = _prepare_executor(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This invocation wouldn't work, since there's a required positional argument async_mode, which is currently unspecified. You can accept it in the constructor too, similarly to how other classes in this file do.

capacity=capacity, retry_strategy=retry_strategy
)
super().__init__(executor=executor, cache_strategy=cache_strategy)

self.client = openai.AsyncOpenAI(
api_key=api_key, base_url=base_url, max_retries=0
)
self.model = model
self.kwargs = kwargs

async def __wrapped__(self, contents: bytes) -> list[tuple[str, dict]]:
# openai.audio.transcriptions.create requires a file-like object with a name
# or a path. We use a temporary file.
# We default to .mp3 extension as it's widely supported/compressed.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would propose to have a Literal for the supported audio formats and add it to the parameters of the class constructor to work with the formats other than mp3 gracefully.

As far as I can see from the docs, it would look like

audio_format: Literal["mp3", "mp4", "mpeg", "mpga", "m4a", "wav", "webm"] = "mp3"

in the constructor arguments.

with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp_file:
tmp_file.write(contents)
tmp_path = tmp_file.name

try:
with open(tmp_path, "rb") as audio_file:
Comment on lines +1354 to +1359
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be more efficient to have only one context manager: the one that creates tempfile.NamedTemporaryFile without delete=False. Then, one could perform all the work within it, without having to manually delete the file. It would have looked like this:

    tmp_file.write(contents)  # save the contents as before
    tmp_file.flush()  # complete the write
    tmp_file.seek(0)  # start reading the file from scratch

    # work with tmp_file as if it were opened for reading

Other than simplification, it would help us to avoid reopening the file, so it's a little bit more efficient too.

transcript = await self.client.audio.transcriptions.create(
model=self.model,
file=audio_file,
**self.kwargs
)
# The response type depends on response_format, but default is object with .text
text = getattr(transcript, "text", str(transcript))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to pin the response format so that there is no need to have fallback logic in parsing?

return [(text, {})]
finally:
if os.path.exists(tmp_path):
os.remove(tmp_path)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please run flake8, mypy, and black linters on the code, and fix any issues introduced by your change.
In particular, flake8 reports an extra newline at the end of the file.