Commit: Audio passthrough

j0sh committed Jan 24, 2025
1 parent cd56616 commit 93a3427
Showing 6 changed files with 243 additions and 12 deletions.
7 changes: 6 additions & 1 deletion runner/app/live/streamer/protocol/trickle.py
@@ -67,8 +67,13 @@ def dequeue_jpeg():
     jpeg_bytes = self.subscribe_queue.get()
     if not jpeg_bytes:
         return None
+    image = jpeg_bytes['image']
+    if not image:
+        return None
     try:
-        return from_jpeg_bytes(jpeg_bytes)
+        if image.mode != "RGBA":
+            image = image.convert("RGBA")
+        return image
     except Exception as e:
         logging.error(f"Error decoding JPEG: {e}")
         raise e
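With this change, the subscribe queue no longer carries raw JPEG bytes: it carries the result-entry dicts produced by `decode_av` in the new `decoder.py` (below), and `dequeue_jpeg` reads only the `'image'` field. A minimal sketch of the consuming side, with a hypothetical entry standing in for what the decoder actually enqueues:

    from PIL import Image

    # Hypothetical entry mirroring decoder.create_result_entry(); only the
    # 'image' key matters to dequeue_jpeg, and a missing image means "skip".
    entry = {
        "image": Image.new("RGB", (512, 512)),
        "audio_frames": [],
        "audio_pts_list": [],
    }

    image = entry["image"]
    if image is not None and image.mode != "RGBA":
        image = image.convert("RGBA")  # same RGBA normalization as dequeue_jpeg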
199 changes: 199 additions & 0 deletions runner/app/live/trickle/decoder.py
@@ -0,0 +1,199 @@
import sys
import av
from PIL import Image

def decode_av(pipe_input, frame_callback, container_format=None):
    """
    Reads from a pipe (or file-like object). If both audio and video
    streams exist, for each decoded video frame, we gather all audio frames
    whose PTS is <= the video frame's PTS, then call `frame_callback`.

    Cases handled:
      - No audio (video only).
      - No video (audio only).
      - Both audio and video.

    :param pipe_input: File path, 'pipe:', sys.stdin, or another file-like object.
    :param frame_callback: A function that accepts a dictionary, e.g.:
        {
            'video_pts': int or None,
            'video_time_sec': float or None,
            'image': PIL.Image or None,
            'audio_frames': list of PyAV AudioFrame,
            'audio_pts_list': list of int,
            'metadata': {
                'width': int,
                'height': int,
                'pict_type': str,
                ...
            },
            'audio_metadata': dict or None  # e.g., sample_rate, channels, layout
        }
    :param container_format: Optional format hint for PyAV (e.g., 'mov', 'mp4', etc.).
    """
    container = av.open(pipe_input, format=container_format)

    # Locate the first video and first audio stream (if they exist)
    video_stream = None
    audio_stream = None
    for s in container.streams:
        if s.type == 'video' and video_stream is None:
            video_stream = s
        elif s.type == 'audio' and audio_stream is None:
            audio_stream = s

    # Prepare a list of streams to demux
    streams_to_demux = []
    if video_stream is not None:
        streams_to_demux.append(video_stream)
    if audio_stream is not None:
        streams_to_demux.append(audio_stream)

    if not streams_to_demux:
        print("No audio or video streams found in the input.")
        container.close()
        return

    # Prepare audio-related metadata (if audio is present)
    audio_metadata = None
    if audio_stream is not None:
        audio_metadata = {
            "codec": audio_stream.codec_context.name,
            "sample_rate": audio_stream.codec_context.sample_rate,
            "format": audio_stream.codec_context.format,
            "channels": audio_stream.codec_context.channels,
            "layout": str(audio_stream.layout),
            "time_base": str(audio_stream.time_base),
            "bit_rate": audio_stream.codec_context.bit_rate,
        }

    print(f"Audio metadata: {audio_metadata}")

    # We'll store decoded audio frames in a buffer if both audio and video exist.
    # If there's no video, we have the option to either:
    #   (A) call the callback for each audio frame, or
    #   (B) accumulate them and do something else.
    # Here, we'll do (A) for the audio-only case, and the original logic if video also exists.
    audio_buffer = []

    # Helper function to create a "result entry" for calling the callback
    def create_result_entry(
        video_pts=None,
        video_time=None,
        pil_img=None,
        matched_audio_frames=None,
        matched_audio_pts=None
    ):
        return {
            "video_pts": video_pts,
            "video_time_sec": video_time,
            "image": pil_img,  # None if no video
            "audio_frames": matched_audio_frames if matched_audio_frames else [],
            "audio_pts_list": matched_audio_pts if matched_audio_pts else [],
            "metadata": {
                # If we have a video frame, store width, height, etc.
                "width": pil_img.width if pil_img else None,
                "height": pil_img.height if pil_img else None,
                "pict_type": str(pil_img.info.get("pict_type")) if pil_img else None,
            },
            "audio_metadata": audio_metadata,
        }

    try:
        for packet in container.demux(streams_to_demux):
            if packet.dts is None:
                continue

            if audio_stream and packet.stream == audio_stream:
                # Decode audio frames
                for aframe in packet.decode():
                    if aframe.pts is None:
                        continue

                    if video_stream:
                        # If we also have video, buffer the audio frames
                        audio_buffer.append((aframe, aframe.pts))
                    else:
                        # If there's no video, we can call the callback immediately
                        # for each audio frame (audio-only use case).
                        # We set video_pts, image, etc. to None.
                        result_entry = create_result_entry(
                            video_pts=None,
                            video_time=None,
                            pil_img=None,
                            matched_audio_frames=[aframe],
                            matched_audio_pts=[aframe.pts],
                        )
                        frame_callback(result_entry)

            elif video_stream and packet.stream == video_stream:
                # Decode video frames
                for frame in packet.decode():
                    if frame.pts is None:
                        continue

                    video_pts = frame.pts
                    video_time = float(video_pts * video_stream.time_base)
                    pil_img = frame.to_image()

                    # If there's no audio stream, we can just call the callback with empty audio
                    if not audio_stream:
                        result_entry = create_result_entry(
                            video_pts=video_pts,
                            video_time=video_time,
                            pil_img=pil_img
                        )
                        frame_callback(result_entry)
                        continue

                    # Otherwise, gather audio frames up to this video_pts
                    matched_audio_frames = []
                    leftover_audio_buffer = []
                    for (aframe, apts) in audio_buffer:
                        if apts <= video_pts:
                            matched_audio_frames.append((aframe, apts))
                        else:
                            leftover_audio_buffer.append((aframe, apts))

                    # Remove matched frames from the buffer
                    audio_buffer = leftover_audio_buffer

                    # Build the callback entry
                    result_entry = create_result_entry(
                        video_pts=video_pts,
                        video_time=video_time,
                        pil_img=pil_img,
                        matched_audio_frames=[af[0] for af in matched_audio_frames],
                        matched_audio_pts=[af[1] for af in matched_audio_frames],
                    )
                    frame_callback(result_entry)

        # Optionally handle leftover audio frames if both audio and video exist
        # and you need to associate leftover audio with the final video frame, etc.

    except KeyboardInterrupt:
        print("Received Ctrl-C: stopping decode gracefully...")

    finally:
        container.close()

# ------------------------------------------------------------------------------
# Example usage:
def example_callback(result_entry):
    # This callback is invoked once per video frame (when video exists),
    # or once per audio frame (when no video exists).
    v_pts = result_entry["video_pts"]
    v_time = result_entry["video_time_sec"]
    n_audio = len(result_entry["audio_frames"])

    if v_pts is not None:
        print(
            f"Video frame PTS={v_pts} "
            f"(time={v_time:.3f}s), matched {n_audio} audio frames."
        )
    else:
        print(f"Audio-only frame, matched {n_audio} audio frames (PTS unknown for video).")

if __name__ == "__main__":
    # Example: reading from stdin. E.g., `python script.py < inputfile`
    decode_av(sys.stdin.buffer, example_callback, container_format="mov")
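The matching rule in `decode_av` is easiest to see with concrete numbers. A standalone sketch of the same `apts <= video_pts` partition (values invented for illustration; note the code compares audio and video PTS directly, so it assumes the two streams use comparable time bases):

    # Buffered (frame, pts) pairs awaiting the next video frame.
    audio_buffer = [("a0", 0), ("a1", 512), ("a2", 1024), ("a3", 1536)]
    video_pts = 1024

    matched = [(f, pts) for f, pts in audio_buffer if pts <= video_pts]
    leftover = [(f, pts) for f, pts in audio_buffer if pts > video_pts]

    assert [pts for _, pts in matched] == [0, 512, 1024]  # delivered with this video frame
    assert [pts for _, pts in leftover] == [1536]         # stays buffered for the next one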
30 changes: 25 additions & 5 deletions runner/app/live/trickle/media.py
@@ -8,6 +8,7 @@
 from .trickle_subscriber import TrickleSubscriber
 from .trickle_publisher import TricklePublisher
 from .jpeg_parser import JPEGStreamParser
+from .decoder import decode_av
 from . import segmenter

 # target framerate
@@ -19,11 +20,10 @@
 async def run_subscribe(subscribe_url: str, image_callback):
     # TODO add some pre-processing parameters, eg image size
     try:
-        ffmpeg = await launch_ffmpeg()
-        logging_task = asyncio.create_task(log_pipe_async(ffmpeg.stderr))
-        subscribe_task = asyncio.create_task(subscribe(subscribe_url, ffmpeg.stdin))
-        jpeg_task = asyncio.create_task(parse_jpegs(ffmpeg.stdout, image_callback))
-        await asyncio.gather(ffmpeg.wait(), logging_task, subscribe_task, jpeg_task)
+        read_fd, write_fd = os.pipe()
+        parse_task = asyncio.create_task(decode_in(read_fd, image_callback))
+        subscribe_task = asyncio.create_task(subscribe(subscribe_url, await AsyncifyFdWriter(write_fd)))
+        await asyncio.gather(subscribe_task, parse_task)
         logging.info("run_subscribe complete")
     except Exception as e:
         logging.error(f"preprocess got error {e}", e)
@@ -119,6 +119,26 @@ async def parse_jpegs(in_pipe, image_callback):
             break
         await parser.feed(chunk)

+async def AsyncifyFdWriter(write_fd):
+    loop = asyncio.get_event_loop()
+    write_protocol = asyncio.StreamReaderProtocol(asyncio.StreamReader(), loop=loop)
+    write_transport, _ = await loop.connect_write_pipe(lambda: write_protocol, os.fdopen(write_fd, 'wb'))
+    writer = asyncio.StreamWriter(write_transport, write_protocol, None, loop)
+    return writer
+
+async def decode_in(in_pipe, frame_callback):
+    def decode_runner():
+        try:
+            decode_av(f"pipe:{in_pipe}", frame_callback)
+        except Exception as e:
+            logging.error(f"Decoding error {e}", exc_info=True)
+        finally:
+            os.close(in_pipe)
+            logging.info("Decoding finished")
+
+    loop = asyncio.get_running_loop()
+    await loop.run_in_executor(None, decode_runner)
+
 def feed_ffmpeg(ffmpeg_fd, image_generator):
     while True:
         image = image_generator()
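Taken together, the new plumbing replaces the external ffmpeg subprocess: `run_subscribe` opens an `os.pipe()`, `AsyncifyFdWriter` wraps its write end in an asyncio `StreamWriter` so segment downloads stay on the event loop, and `decode_in` runs the blocking PyAV decode against the read end in a thread-pool executor. A self-contained sketch of that pattern, with a plain `read()` standing in for the decoder:

    import asyncio
    import os

    async def main():
        read_fd, write_fd = os.pipe()
        loop = asyncio.get_running_loop()

        # Wrap the write end in a StreamWriter, as AsyncifyFdWriter does.
        protocol = asyncio.StreamReaderProtocol(asyncio.StreamReader(), loop=loop)
        transport, _ = await loop.connect_write_pipe(lambda: protocol, os.fdopen(write_fd, "wb"))
        writer = asyncio.StreamWriter(transport, protocol, None, loop)

        # Blocking consumer kept off the event loop, as decode_in does with
        # decode_av (which receives the fd as a "pipe:<fd>" URL instead).
        def consume():
            with os.fdopen(read_fd, "rb") as r:
                return r.read()

        consumer = loop.run_in_executor(None, consume)

        writer.write(b"segment bytes")  # placeholder payload
        await writer.drain()
        writer.close()
        print(await consumer)           # b'segment bytes'

    asyncio.run(main())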
6 changes: 6 additions & 0 deletions runner/docker/Dockerfile.live-app-noop
@@ -17,6 +17,10 @@ RUN pyenv install $PYTHON_VERSION && \
 # Upgrade pip and install required packages
 ARG PIP_VERSION=23.3.2

+ENV PKG_CONFIG_PATH=/usr/local/lib/pkgconfig
+
+RUN ls /usr/local/lib && ls /usr/local/lib/pkgconfig && ls -lha /usr/local/bin && echo $PKG_CONFIG_PATH
+
 # Install any additional Python packages
 RUN pip install --no-cache-dir \
     uvicorn==0.30.0 \
@@ -29,6 +33,8 @@ RUN pip install --no-cache-dir \
     pyzmq==26.2.0 \
     nvidia-ml-py==12.560.30 \
     opencv-python==4.10.0.84 \
+    --no-binary=av \
+    av==14.0.1 \
     psutil==6.0.0

 # Set environment variables
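Here `PKG_CONFIG_PATH` lets pip's source build of PyAV discover the custom FFmpeg's `.pc` files, and `--no-binary=av` stops pip from installing the prebuilt wheel, which bundles its own FFmpeg without the codecs enabled in the base image. A quick sanity check of the resulting install (a sketch; `library_versions` and `codecs_available` are PyAV introspection attributes):

    import av

    # FFmpeg library versions PyAV was compiled against; with a source build
    # these should match the FFmpeg installed under /usr/local.
    print(av.library_versions)

    # The audio passthrough relies on these codecs being present in the build.
    for name in ("libopus", "libfdk_aac"):
        print(name, name in av.codecs_available)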
10 changes: 5 additions & 5 deletions runner/docker/Dockerfile.live-base
@@ -25,7 +25,7 @@ ENV PATH $PYENV_ROOT/shims:$PYENV_ROOT/bin:$PATH

 RUN apt-get update && apt-get install -y --no-install-recommends \
     autoconf automake build-essential cmake git-core libtool pkg-config wget \
-    nasm yasm zlib1g-dev libpng-dev libx264-dev && \
+    nasm yasm zlib1g-dev libpng-dev libx264-dev libopus-dev libfdk-aac-dev && \
     rm -rf /var/lib/apt/lists/*

 # Set up environment variables
@@ -42,21 +42,21 @@ RUN git clone --branch n7.0.2 --depth 1 https://github.com/FFmpeg/FFmpeg.git ffm
 # Build FFmpeg with static linking and the desired hardware-accelerated features
 RUN cd ffmpeg && \
     ./configure --prefix=/compiled \
-    --pkg-config-flags="--static" \
     --extra-cflags="-I/ffmpeg_build/include -I/usr/local/cuda/include" \
     --extra-ldflags="-L/ffmpeg_build/lib -L/usr/local/cuda/lib64" \
     --extra-libs="-lpthread -lm" \
     --enable-nonfree \
     --enable-cuda-nvcc \
     --enable-cuda \
     --enable-libnpp \
     --enable-cuvid \
     --enable-nvenc \
     --enable-nvdec \
-    --enable-static \
     --enable-gpl \
+    --enable-shared \
+    --disable-static \
     --enable-libx264 \
-    --disable-shared \
+    --enable-libopus \
+    --enable-libfdk-aac \
     --disable-debug \
     --disable-doc && \
     make -j$(nproc) && \
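The configure changes flip FFmpeg from a static to a shared build (`--enable-shared`/`--disable-static`) and enable the Opus and fdk-aac codecs, so the PyAV source build in the app images can link against these libraries at install time. One way to verify the installed libraries resolve through pkg-config (a sketch, assuming the `/compiled` prefix ends up on `PKG_CONFIG_PATH`):

    import subprocess

    # These are the queries PyAV's build performs to locate FFmpeg.
    for lib in ("libavcodec", "libavformat", "libavutil"):
        version = subprocess.run(
            ["pkg-config", "--modversion", lib],
            capture_output=True, text=True, check=True,
        ).stdout.strip()
        print(lib, version)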
3 changes: 2 additions & 1 deletion runner/requirements.live-ai.txt
@@ -12,7 +12,8 @@ deepcache==0.1.1
 safetensors==0.4.3
 scipy==1.13.0
 numpy==1.26.4
-av==12.1.0
+--no-binary=av
+av==14.0.1
 sentencepiece== 0.2.0
 bitsandbytes==0.43.3
 psutil==6.0.0