diff --git a/ffsubsync/ffsubsync.py b/ffsubsync/ffsubsync.py
index b0114cc..2d10c8d 100755
--- a/ffsubsync/ffsubsync.py
+++ b/ffsubsync/ffsubsync.py
@@ -30,6 +30,7 @@
 from ffsubsync.speech_transformers import (
     VideoSpeechTransformer,
     DeserializeSpeechTransformer,
+    PGSSpeechTransformer,
     make_subtitle_speech_pipeline,
 )
 from ffsubsync.subtitle_parser import make_subtitle_parser
@@ -149,8 +150,10 @@ def try_sync(
                     continue
                 else:
                     srt_pipe.fit(srtin)
-            if not skip_infer_framerate_ratio and hasattr(
-                reference_pipe[-1], "num_frames"
+            if (
+                not skip_infer_framerate_ratio
+                and hasattr(reference_pipe[-1], "num_frames")
+                and reference_pipe[-1].num_frames is not None
             ):
                 inferred_framerate_ratio_from_length = (
                     float(reference_pipe[-1].num_frames)
@@ -220,6 +223,26 @@ def try_sync(
 
 
 def make_reference_pipe(args: argparse.Namespace) -> Pipeline:
+    pgs_stream = getattr(args, "pgs_ref_stream", None)
+    if pgs_stream is not None:
+        # "auto" (bare --pgs-ref-stream flag) → let PGSSpeechTransformer auto-detect
+        resolved_stream: Optional[str] = None if pgs_stream == "auto" else pgs_stream
+        if resolved_stream is not None and not resolved_stream.startswith("0:"):
+            resolved_stream = "0:" + resolved_stream
+        return Pipeline(
+            [
+                (
+                    "speech_extract",
+                    PGSSpeechTransformer(
+                        sample_rate=SAMPLE_RATE,
+                        start_seconds=args.start_seconds,
+                        ffmpeg_path=args.ffmpeg_path,
+                        ref_stream=resolved_stream,
+                        gui_mode=args.gui_mode,
+                    ),
+                ),
+            ]
+        )
     ref_format = _ref_format(args.reference)
     if ref_format in SUBTITLE_EXTENSIONS:
         if args.vad is not None:
@@ -451,7 +474,7 @@ def _run_impl(args: argparse.Namespace, result: Dict[str, Any]) -> bool:
 
 
 def validate_and_transform_args(
-    parser_or_args: Union[argparse.ArgumentParser, argparse.Namespace]
+    parser_or_args: Union[argparse.ArgumentParser, argparse.Namespace],
 ) -> Optional[argparse.Namespace]:
     if isinstance(parser_or_args, argparse.Namespace):
         parser = None
@@ -484,7 +507,7 @@ def validate_and_transform_args(
 
 
 def run(
-    parser_or_args: Union[argparse.ArgumentParser, argparse.Namespace]
+    parser_or_args: Union[argparse.ArgumentParser, argparse.Namespace],
 ) -> Dict[str, Any]:
     sync_was_successful = False
     result = {
@@ -556,6 +579,21 @@ def add_main_args_for_cli(parser: argparse.ArgumentParser) -> None:
             "Example: `ffs ref.mkv -i in.srt -o out.srt --reference-stream s:2`"
         ),
     )
+    parser.add_argument(
+        "--pgs-ref-stream",
+        "--pgsstream",
+        nargs="?",
+        const="auto",
+        default=None,
+        help=(
+            "Use a PGS (Presentation Graphic Stream) image-based subtitle track from "
+            "the reference MKV as the sync reference instead of audio VAD. "
+            "Optionally specify the stream (leading `0:` is optional, e.g. `s:0` or `3`). "
+            "Omit the value to auto-detect the first hdmv_pgs_subtitle track. "
+            "Example: `ffs ref.mkv -i in.srt -o out.srt --pgs-ref-stream` (auto) "
+            "or `ffs ref.mkv -i in.srt -o out.srt --pgs-ref-stream s:2` (explicit)."
+        ),
+    )
 
 
 def add_cli_only_args(parser: argparse.ArgumentParser) -> None:
diff --git a/ffsubsync/speech_transformers.py b/ffsubsync/speech_transformers.py
index 38883d1..c6236c5 100644
--- a/ffsubsync/speech_transformers.py
+++ b/ffsubsync/speech_transformers.py
@@ -6,7 +6,7 @@
 import subprocess
 import sys
 from datetime import timedelta
-from typing import cast, Callable, Dict, List, Optional, Union
+from typing import cast, Callable, Dict, List, Optional, Tuple, Union
 
 import ffmpeg
 import numpy as np
@@ -531,3 +531,208 @@ def fit(self, fname, *_) -> "DeserializeSpeechTransformer":
     def transform(self, *_) -> np.ndarray:
         assert self.deserialized_speech_results_ is not None
         return self.deserialized_speech_results_
+
+
+def find_pgs_stream(
+    fname: str,
+    ffmpeg_path: Optional[str] = None,
+    gui_mode: bool = False,
+) -> Optional[str]:
+    """Return the ffmpeg stream specifier for the first PGS subtitle track in *fname*.
+
+    Uses ``ffprobe`` to inspect the file. Returns a string like ``"0:s:0"`` on
+    success, or ``None`` if ffprobe fails or the file has no
+    ``hdmv_pgs_subtitle`` streams.
+    """
+    try:
+        probe = ffmpeg.probe(
+            fname,
+            cmd=ffmpeg_bin_path("ffprobe", gui_mode, ffmpeg_resources_path=ffmpeg_path),
+        )
+    except Exception as e:
+        logger.warning("ffprobe failed while searching for PGS streams: %s", e)
+        return None
+
+    # The "s:N" specifier counts subtitle streams only, so track the position
+    # among subtitle streams instead of using the absolute stream index.
+    sub_index = 0
+    for stream in probe.get("streams", []):
+        if stream.get("codec_type") == "subtitle":
+            if stream.get("codec_name") == "hdmv_pgs_subtitle":
+                specifier = "0:s:{}".format(sub_index)
+                logger.info(
+                    "auto-detected PGS stream: %s (ffmpeg stream index %s)",
+                    specifier,
+                    stream.get("index"),
+                )
+                return specifier
+            sub_index += 1
+
+    return None
+
+
+def _get_pgs_timings_via_ffprobe(
+    fname: str,
+    stream: str,
+    ffmpeg_path: Optional[str] = None,
+    gui_mode: bool = False,
+) -> Optional[List[Tuple[float, float]]]:
+    """Read PGS timings from container metadata using ffprobe.
+
+    MKV stores per-packet PTS and duration for subtitle streams, so we can
+    get start/end timestamps without extracting or parsing the raw SUP binary.
+    Show events are large packets with a numeric ``duration_time``; clear events
+    are tiny (~30-byte) packets with ``duration_time=N/A``.
+
+    Returns a list of ``(start_seconds, end_seconds)`` tuples, or ``None`` if
+    ffprobe fails or returns no usable durations.
+    """
+    ffprobe_cmd = ffmpeg_bin_path(
+        "ffprobe", gui_mode, ffmpeg_resources_path=ffmpeg_path
+    )
+    # ffprobe -select_streams does not accept the "0:" input-index prefix;
+    # strip it so "0:s:0" → "s:0" and "0:3" → "3".
+    probe_stream = stream[2:] if stream.startswith("0:") else stream
+    try:
+        probe_data = ffmpeg.probe(
+            fname,
+            cmd=ffprobe_cmd,
+            show_packets=None,
+            select_streams=probe_stream,
+            show_entries="packet=pts_time,duration_time,size",
+        )
+    except Exception as e:
+        # Log the cause so that a failed probe can be diagnosed by the user.
+        logger.warning("ffprobe failed while reading PGS packet timings: %s", e)
+        return None
+
+    results: List[Tuple[float, float]] = []
+    for packet in probe_data.get("packets", []):
+        pts_time_str = packet.get("pts_time")
+        duration_time_str = packet.get("duration_time")
+        size_str = packet.get("size")
+        if pts_time_str is None or duration_time_str is None or size_str is None:
+            continue
+        if duration_time_str == "N/A":
+            continue
+        try:
+            pts_time = float(pts_time_str)
+            duration_time = float(duration_time_str)
+            size = int(size_str)
+        except ValueError:
+            continue
+        if size > 50:  # skip clear events (~30 bytes)
+            results.append((pts_time, pts_time + duration_time))
+
+    if not results:
+        return None
+    return results
+
+
+class PGSSpeechTransformer(TransformerMixin, ComputeSpeechFrameBoundariesMixin):
+    """Use PGS (Presentation Graphic Stream) subtitle timings as a sync reference.
+
+    PGS subtitles are bitmap-based (e.g. Blu-ray) and cannot be converted to
+    text by ffmpeg. Instead of decoding the bitmaps, this transformer asks
+    ffprobe for the per-packet timestamps and durations of the PGS stream and
+    builds the same kind of sparse binary signal that
+    :class:`SubtitleSpeechTransformer` produces for text subtitles. The
+    resulting signal can then be aligned against the input subtitle file in
+    the normal ffsubsync pipeline.
+    """
+
+    # PGS is already in the MKV timebase so its duration cannot be compared
+    # against the SRT to infer a framerate ratio. Returning None here prevents
+    # the duration-based framerate inference in try_sync from running.
+    @property
+    def num_frames(self) -> None:
+        return None
+
+    def __init__(
+        self,
+        sample_rate: int,
+        start_seconds: int = 0,
+        ffmpeg_path: Optional[str] = None,
+        ref_stream: Optional[str] = None,
+        gui_mode: bool = False,
+    ) -> None:
+        super(PGSSpeechTransformer, self).__init__()
+        self.sample_rate: int = sample_rate
+        self.start_seconds: int = start_seconds
+        self.ffmpeg_path: Optional[str] = ffmpeg_path
+        self.ref_stream: Optional[str] = ref_stream
+        self.gui_mode: bool = gui_mode
+        self.pgs_speech_results_: Optional[np.ndarray] = None
+
+    def fit(self, fname: str, *_) -> "PGSSpeechTransformer":
+        """Build the binary subtitle-presence signal for the PGS stream of *fname*.
+
+        Raises ``ValueError`` if no PGS stream can be found, if ffprobe yields
+        no usable timings, or if every segment ends before ``start_seconds``.
+        """
+        if self.ref_stream is None:
+            stream = find_pgs_stream(fname, self.ffmpeg_path, self.gui_mode)
+            if stream is None:
+                raise ValueError(
+                    "No hdmv_pgs_subtitle stream found in {}. "
+                    "Specify one explicitly with --pgs-ref-stream.".format(fname)
+                )
+        else:
+            stream = self.ref_stream
+            if not stream.startswith("0:"):
+                stream = "0:" + stream
+
+        logger.info("reading PGS timings for stream %s from %s...", stream, fname)
+        timings = _get_pgs_timings_via_ffprobe(
+            fname, stream, self.ffmpeg_path, self.gui_mode
+        )
+        # The helper returns None both on failure and when no packet is usable,
+        # so a non-None result is always a non-empty list.
+        if timings is None:
+            raise ValueError(
+                "Failed to get PGS timings via ffprobe for stream {} from {}. "
+                "Make sure the stream exists and is an hdmv_pgs_subtitle track "
+                "(check with: ffprobe -show_streams {}).".format(stream, fname, fname)
+            )
+
+        logger.info("found %d PGS subtitle segments", len(timings))
+        for i, (s, e) in enumerate(timings[:8]):
+            logger.debug(
+                " PGS[%d]: %s --> %s (%.3fs)",
+                i,
+                str(timedelta(seconds=s)),
+                str(timedelta(seconds=e)),
+                e - s,
+            )
+
+        # Sample 0 corresponds to start_seconds, so the signal only has to span
+        # the time between that offset and the end of the last segment.
+        duration = max(end for _, end in timings) - self.start_seconds
+        if duration <= 0:
+            raise ValueError(
+                "All PGS segments in stream {} end before start_seconds={}.".format(
+                    stream, self.start_seconds
+                )
+            )
+        num_samples = int(duration * self.sample_rate) + 2
+        samples = np.zeros(num_samples, dtype=float)
+
+        for start, end in timings:
+            start_sample = int(round((start - self.start_seconds) * self.sample_rate))
+            end_sample = int(round((end - self.start_seconds) * self.sample_rate))
+            start_sample = max(start_sample, 0)
+            end_sample = min(end_sample, num_samples)
+            if start_sample < end_sample:
+                samples[start_sample:end_sample] = 1.0
+
+        self.pgs_speech_results_ = samples
+        self.fit_boundaries(self.pgs_speech_results_)
+        logger.info(
+            "total PGS subtitle frames: %d", int(np.sum(self.pgs_speech_results_))
+        )
+        return self
+
+    def transform(self, *_) -> np.ndarray:
+        """Return the signal computed by :meth:`fit`."""
+        assert self.pgs_speech_results_ is not None
+        return self.pgs_speech_results_
diff --git a/tests/test_pgs.py b/tests/test_pgs.py
new file mode 100644
index 0000000..ea48f2e
--- /dev/null
+++ b/tests/test_pgs.py
@@ -0,0 +1,146 @@
+# -*- coding: utf-8 -*-
+from unittest.mock import patch
+
+import pytest
+
+from ffsubsync.speech_transformers import (
+    PGSSpeechTransformer,
+    _get_pgs_timings_via_ffprobe,
+)
+
+
+def _make_packet(pts_time, duration_time, size):
+    return {
+        "pts_time": str(pts_time),
+        "duration_time": "N/A" if duration_time is None else str(duration_time),
+        "size": str(size),
+    }
+
+
+@patch("ffsubsync.speech_transformers.ffmpeg_bin_path", return_value="ffprobe")
+@patch("ffsubsync.speech_transformers.ffmpeg.probe")
+def test_basic(mock_probe, mock_bin):
+    mock_probe.return_value = {
+        "packets": [
+            _make_packet(1.0, 2.5, 1000),
+            _make_packet(5.0, 1.0, 800),
+        ]
+    }
+    result = _get_pgs_timings_via_ffprobe("test.mkv", "0:s:0")
+    assert result == [(1.0, 3.5), (5.0, 6.0)]
+
+
+@patch("ffsubsync.speech_transformers.ffmpeg_bin_path", return_value="ffprobe")
+@patch("ffsubsync.speech_transformers.ffmpeg.probe")
+def test_strips_0_prefix_from_stream(mock_probe, mock_bin):
+    """'0:s:0' should be passed to ffprobe as 's:0'."""
+    mock_probe.return_value = {"packets": [_make_packet(0.0, 1.0, 100)]}
+    _get_pgs_timings_via_ffprobe("test.mkv", "0:s:0")
+    _, kwargs = mock_probe.call_args
+    assert kwargs["select_streams"] == "s:0"
+
+
+@patch("ffsubsync.speech_transformers.ffmpeg_bin_path", return_value="ffprobe")
+@patch("ffsubsync.speech_transformers.ffmpeg.probe")
+def test_stream_without_prefix_unchanged(mock_probe, mock_bin):
+    mock_probe.return_value = {"packets": [_make_packet(0.0, 1.0, 100)]}
+    _get_pgs_timings_via_ffprobe("test.mkv", "s:1")
+    _, kwargs = mock_probe.call_args
+    assert kwargs["select_streams"] == "s:1"
+
+
+@patch("ffsubsync.speech_transformers.ffmpeg_bin_path", return_value="ffprobe")
+@patch("ffsubsync.speech_transformers.ffmpeg.probe")
+def test_skips_clear_events_small_size(mock_probe, mock_bin):
+    """Packets with size <= 50 are clear events and must be skipped."""
+    mock_probe.return_value = {
+        "packets": [
+            _make_packet(1.0, 2.0, 1000),  # show event
+            _make_packet(3.0, 0.001, 30),  # clear event, size <= 50
+        ]
+    }
+    result = _get_pgs_timings_via_ffprobe("test.mkv", "0:s:0")
+    assert result == [(1.0, 3.0)]
+
+
+@patch("ffsubsync.speech_transformers.ffmpeg_bin_path", return_value="ffprobe")
+@patch("ffsubsync.speech_transformers.ffmpeg.probe")
+def test_skips_na_duration(mock_probe, mock_bin):
+    """Packets with duration_time=N/A must be skipped."""
+    mock_probe.return_value = {
+        "packets": [
+            _make_packet(1.0, None, 1000),  # N/A duration
+            _make_packet(5.0, 2.0, 900),
+        ]
+    }
+    result = _get_pgs_timings_via_ffprobe("test.mkv", "0:s:0")
+    assert result == [(5.0, 7.0)]
+
+
+@patch("ffsubsync.speech_transformers.ffmpeg_bin_path", return_value="ffprobe")
+@patch("ffsubsync.speech_transformers.ffmpeg.probe")
+def test_returns_none_when_no_usable_packets(mock_probe, mock_bin):
+    """Returns None if all packets are filtered out."""
+    mock_probe.return_value = {
+        "packets": [
+            _make_packet(1.0, None, 1000),  # N/A duration
+            _make_packet(2.0, 1.0, 20),  # too small
+        ]
+    }
+    assert _get_pgs_timings_via_ffprobe("test.mkv", "0:s:0") is None
+
+
+@patch("ffsubsync.speech_transformers.ffmpeg_bin_path", return_value="ffprobe")
+@patch("ffsubsync.speech_transformers.ffmpeg.probe")
+def test_returns_none_on_empty_packets(mock_probe, mock_bin):
+    mock_probe.return_value = {"packets": []}
+    assert _get_pgs_timings_via_ffprobe("test.mkv", "0:s:0") is None
+
+
+@patch("ffsubsync.speech_transformers.ffmpeg_bin_path", return_value="ffprobe")
+@patch("ffsubsync.speech_transformers.ffmpeg.probe")
+def test_returns_none_when_ffprobe_raises(mock_probe, mock_bin):
+    mock_probe.side_effect = Exception("ffprobe not found")
+    assert _get_pgs_timings_via_ffprobe("test.mkv", "0:s:0") is None
+
+
+@patch("ffsubsync.speech_transformers.ffmpeg_bin_path", return_value="ffprobe")
+@patch("ffsubsync.speech_transformers.ffmpeg.probe")
+def test_skips_packets_with_missing_fields(mock_probe, mock_bin):
+    """Packets missing any required field are silently skipped."""
+    mock_probe.return_value = {
+        "packets": [
+            {"pts_time": "1.0", "duration_time": "2.0"},  # missing size
+            {"pts_time": "3.0", "size": "500"},  # missing duration_time
+            {"duration_time": "1.0", "size": "500"},  # missing pts_time
+            _make_packet(10.0, 1.0, 200),  # valid
+        ]
+    }
+    result = _get_pgs_timings_via_ffprobe("test.mkv", "0:s:0")
+    assert result == [(10.0, 11.0)]
+
+
+@patch("ffsubsync.speech_transformers._get_pgs_timings_via_ffprobe")
+def test_fit_offsets_signal_by_start_seconds(mock_timings):
+    """Sample 0 of the fitted signal corresponds to start_seconds."""
+    mock_timings.return_value = [(1.0, 2.0), (3.0, 4.0)]
+    transformer = PGSSpeechTransformer(
+        sample_rate=10, start_seconds=1, ref_stream="s:0"
+    )
+    signal = transformer.fit("test.mkv").transform()
+    assert len(signal) == 32
+    assert signal[:10].sum() == 10
+    assert signal[10:20].sum() == 0
+    assert signal[20:30].sum() == 10
+    assert signal[30:].sum() == 0
+
+
+@patch("ffsubsync.speech_transformers._get_pgs_timings_via_ffprobe")
+def test_fit_raises_when_all_segments_precede_start(mock_timings):
+    """A start offset past the last segment leaves nothing to align against."""
+    mock_timings.return_value = [(1.0, 2.0)]
+    transformer = PGSSpeechTransformer(
+        sample_rate=10, start_seconds=10, ref_stream="s:0"
+    )
+    with pytest.raises(ValueError):
+        transformer.fit("test.mkv")