Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 42 additions & 4 deletions ffsubsync/ffsubsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from ffsubsync.speech_transformers import (
VideoSpeechTransformer,
DeserializeSpeechTransformer,
PGSSpeechTransformer,
make_subtitle_speech_pipeline,
)
from ffsubsync.subtitle_parser import make_subtitle_parser
Expand Down Expand Up @@ -149,8 +150,10 @@ def try_sync(
continue
else:
srt_pipe.fit(srtin)
if not skip_infer_framerate_ratio and hasattr(
reference_pipe[-1], "num_frames"
if (
not skip_infer_framerate_ratio
and hasattr(reference_pipe[-1], "num_frames")
and reference_pipe[-1].num_frames is not None
):
inferred_framerate_ratio_from_length = (
float(reference_pipe[-1].num_frames)
Expand Down Expand Up @@ -220,6 +223,26 @@ def try_sync(


def make_reference_pipe(args: argparse.Namespace) -> Pipeline:
pgs_stream = getattr(args, "pgs_ref_stream", None)
if pgs_stream is not None:
# "auto" (bare --pgs-ref-stream flag) → let PGSSpeechTransformer auto-detect
resolved_stream: Optional[str] = None if pgs_stream == "auto" else pgs_stream
if resolved_stream is not None and not resolved_stream.startswith("0:"):
resolved_stream = "0:" + resolved_stream
return Pipeline(
[
(
"speech_extract",
PGSSpeechTransformer(
sample_rate=SAMPLE_RATE,
start_seconds=args.start_seconds,
ffmpeg_path=args.ffmpeg_path,
ref_stream=resolved_stream,
gui_mode=args.gui_mode,
),
),
]
)
ref_format = _ref_format(args.reference)
if ref_format in SUBTITLE_EXTENSIONS:
if args.vad is not None:
Expand Down Expand Up @@ -451,7 +474,7 @@ def _run_impl(args: argparse.Namespace, result: Dict[str, Any]) -> bool:


def validate_and_transform_args(
parser_or_args: Union[argparse.ArgumentParser, argparse.Namespace]
parser_or_args: Union[argparse.ArgumentParser, argparse.Namespace],
) -> Optional[argparse.Namespace]:
if isinstance(parser_or_args, argparse.Namespace):
parser = None
Expand Down Expand Up @@ -484,7 +507,7 @@ def validate_and_transform_args(


def run(
parser_or_args: Union[argparse.ArgumentParser, argparse.Namespace]
parser_or_args: Union[argparse.ArgumentParser, argparse.Namespace],
) -> Dict[str, Any]:
sync_was_successful = False
result = {
Expand Down Expand Up @@ -556,6 +579,21 @@ def add_main_args_for_cli(parser: argparse.ArgumentParser) -> None:
"Example: `ffs ref.mkv -i in.srt -o out.srt --reference-stream s:2`"
),
)
parser.add_argument(
"--pgs-ref-stream",
"--pgsstream",
nargs="?",
const="auto",
default=None,
help=(
"Use a PGS (Presentation Graphic Stream) image-based subtitle track from "
"the reference MKV as the sync reference instead of audio VAD. "
"Optionally specify the stream (leading `0:` is optional, e.g. `s:0` or `3`). "
"Omit the value to auto-detect the first hdmv_pgs_subtitle track. "
"Example: `ffs ref.mkv -i in.srt -o out.srt --pgs-ref-stream` (auto) "
"or `ffs ref.mkv -i in.srt -o out.srt --pgs-ref-stream s:2` (explicit)."
),
)


def add_cli_only_args(parser: argparse.ArgumentParser) -> None:
Expand Down
191 changes: 190 additions & 1 deletion ffsubsync/speech_transformers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import subprocess
import sys
from datetime import timedelta
from typing import cast, Callable, Dict, List, Optional, Union
from typing import cast, Callable, Dict, List, Optional, Tuple, Union

import ffmpeg
import numpy as np
Expand Down Expand Up @@ -531,3 +531,192 @@ def fit(self, fname, *_) -> "DeserializeSpeechTransformer":
def transform(self, *_) -> np.ndarray:
assert self.deserialized_speech_results_ is not None
return self.deserialized_speech_results_


def find_pgs_stream(
    fname: str,
    ffmpeg_path: Optional[str] = None,
    gui_mode: bool = False,
) -> Optional[str]:
    """Locate the first PGS subtitle track in *fname*.

    The file is inspected with ``ffprobe``. On success the ffmpeg stream
    specifier of the first ``hdmv_pgs_subtitle`` track (e.g. ``"0:s:0"``) is
    returned; if probing fails or no such track exists, ``None`` is returned.
    """
    try:
        probe = ffmpeg.probe(
            fname,
            cmd=ffmpeg_bin_path("ffprobe", gui_mode, ffmpeg_resources_path=ffmpeg_path),
        )
    except Exception as e:
        logger.warning("ffprobe failed while searching for PGS streams: %s", e)
        return None

    # The "s:N" specifier counts subtitle streams only, so enumerate just those.
    subtitle_streams = (
        s for s in probe.get("streams", []) if s.get("codec_type") == "subtitle"
    )
    for sub_index, stream in enumerate(subtitle_streams):
        if stream.get("codec_name") != "hdmv_pgs_subtitle":
            continue
        specifier = "0:s:{}".format(sub_index)
        logger.info(
            "auto-detected PGS stream: %s (ffmpeg stream index %s)",
            specifier,
            stream.get("index"),
        )
        return specifier

    return None


def _get_pgs_timings_via_ffprobe(
    fname: str,
    stream: str,
    ffmpeg_path: Optional[str] = None,
    gui_mode: bool = False,
) -> Optional[List[Tuple[float, float]]]:
    """Read PGS subtitle timings from container metadata using ffprobe.

    MKV stores per-packet PTS and duration for subtitle streams, so start/end
    timestamps are available without extracting or parsing the raw SUP binary.
    Show events are large packets with a numeric ``duration_time``; clear
    events are tiny (~30-byte) packets with ``duration_time=N/A``.

    Returns a list of ``(start_seconds, end_seconds)`` tuples, or ``None`` if
    ffprobe fails or yields no usable durations.
    """
    ffprobe_cmd = ffmpeg_bin_path(
        "ffprobe", gui_mode, ffmpeg_resources_path=ffmpeg_path
    )
    # ffprobe -select_streams does not accept the "0:" input-index prefix;
    # strip it so "0:s:0" → "s:0" and "0:3" → "3".
    probe_stream = stream[2:] if stream.startswith("0:") else stream
    try:
        probe_data = ffmpeg.probe(
            fname,
            cmd=ffprobe_cmd,
            show_packets=None,
            select_streams=probe_stream,
            show_entries="packet=pts_time,duration_time,size",
        )
    except Exception:
        return None

    def packet_interval(packet) -> Optional[Tuple[float, float]]:
        # Yield (start, end) for a show event; None for anything unusable.
        pts_raw = packet.get("pts_time")
        duration_raw = packet.get("duration_time")
        size_raw = packet.get("size")
        if pts_raw is None or duration_raw is None or size_raw is None:
            return None
        if duration_raw == "N/A":
            return None
        try:
            start = float(pts_raw)
            duration = float(duration_raw)
            num_bytes = int(size_raw)
        except ValueError:
            return None
        if num_bytes <= 50:  # skip clear events (~30 bytes)
            return None
        return start, start + duration

    intervals = [
        interval
        for interval in map(packet_interval, probe_data.get("packets", []))
        if interval is not None
    ]
    return intervals or None


class PGSSpeechTransformer(TransformerMixin, ComputeSpeechFrameBoundariesMixin):
    """Use PGS (Presentation Graphic Stream) subtitle timings as a sync reference.

    PGS subtitles are bitmap-based (e.g. Blu-ray) and cannot be converted to
    text by ffmpeg. This transformer reads the on-screen / off-screen
    timestamps of the selected PGS track from container metadata (via
    ffprobe) and rasterizes them into the same kind of sparse binary signal
    that :class:`SubtitleSpeechTransformer` produces for text subtitles, so
    the resulting signal can be aligned against the input subtitle file in
    the normal ffsubsync pipeline.
    """

    # PGS timestamps are already in the MKV timebase, so comparing the
    # reference duration against the SRT cannot yield a meaningful framerate
    # ratio. Exposing num_frames as None makes the duration-based framerate
    # inference in try_sync skip this reference.
    @property
    def num_frames(self) -> None:
        return None

    def __init__(
        self,
        sample_rate: int,
        start_seconds: int = 0,
        ffmpeg_path: Optional[str] = None,
        ref_stream: Optional[str] = None,
        gui_mode: bool = False,
    ) -> None:
        super().__init__()
        self.sample_rate: int = sample_rate
        self.start_seconds: int = start_seconds
        self.ffmpeg_path: Optional[str] = ffmpeg_path
        self.ref_stream: Optional[str] = ref_stream
        self.gui_mode: bool = gui_mode
        # Populated by fit(); binary signal sampled at self.sample_rate.
        self.pgs_speech_results_: Optional[np.ndarray] = None

    def _resolve_stream(self, fname: str) -> str:
        """Return the ffmpeg stream specifier to probe, with "0:" prefix."""
        if self.ref_stream is None:
            detected = find_pgs_stream(fname, self.ffmpeg_path, self.gui_mode)
            if detected is None:
                raise ValueError(
                    "No hdmv_pgs_subtitle stream found in {}. "
                    "Specify one explicitly with --pgs-ref-stream.".format(fname)
                )
            return detected
        return (
            self.ref_stream
            if self.ref_stream.startswith("0:")
            else "0:" + self.ref_stream
        )

    def _rasterize(self, timings: List[Tuple[float, float]]) -> np.ndarray:
        """Turn (start, end) second pairs into a 0/1 signal at sample_rate."""
        # Cover the latest off-screen timestamp, with a small tail so the
        # final segment's end sample stays in range.
        latest_end = max(seg_end for _, seg_end in timings)
        num_samples = int(latest_end * self.sample_rate) + 2
        signal = np.zeros(num_samples, dtype=float)
        for seg_start, seg_end in timings:
            lo = int(round((seg_start - self.start_seconds) * self.sample_rate))
            hi = int(round((seg_end - self.start_seconds) * self.sample_rate))
            lo = max(lo, 0)
            hi = min(hi, num_samples)
            if lo < hi:
                signal[lo:hi] = 1.0
        return signal

    def fit(self, fname: str, *_) -> "PGSSpeechTransformer":
        stream = self._resolve_stream(fname)

        logger.info("reading PGS timings for stream %s from %s...", stream, fname)
        timings = _get_pgs_timings_via_ffprobe(
            fname, stream, self.ffmpeg_path, self.gui_mode
        )
        if timings is None:
            raise ValueError(
                "Failed to get PGS timings via ffprobe for stream {} from {}. "
                "Make sure the stream exists and is an hdmv_pgs_subtitle track "
                "(check with: ffprobe -show_streams {}).".format(stream, fname, fname)
            )

        if not timings:
            raise ValueError(
                "No subtitle timings found in PGS stream {}.".format(stream)
            )

        logger.info("found %d PGS subtitle segments", len(timings))
        for idx, (seg_start, seg_end) in enumerate(timings[:8]):
            logger.debug(
                " PGS[%d]: %s --> %s (%.3fs)",
                idx,
                str(timedelta(seconds=seg_start)),
                str(timedelta(seconds=seg_end)),
                seg_end - seg_start,
            )

        self.pgs_speech_results_ = self._rasterize(timings)
        self.fit_boundaries(self.pgs_speech_results_)
        logger.info(
            "total PGS subtitle frames: %d", int(np.sum(self.pgs_speech_results_))
        )
        return self

    def transform(self, *_) -> np.ndarray:
        assert self.pgs_speech_results_ is not None
        return self.pgs_speech_results_
Loading