diff --git a/lvsfunc/__init__.py b/lvsfunc/__init__.py index e2fb83c..474001d 100644 --- a/lvsfunc/__init__.py +++ b/lvsfunc/__init__.py @@ -18,5 +18,6 @@ from .grain import * from .hdcam import * from .misc import * +from .packets import * from .presets import * from .util import * diff --git a/lvsfunc/packets/__init__.py b/lvsfunc/packets/__init__.py new file mode 100644 index 0000000..3d1d851 --- /dev/null +++ b/lvsfunc/packets/__init__.py @@ -0,0 +1,3 @@ +# flake8: noqa + +from .info import * diff --git a/lvsfunc/packets/info.py b/lvsfunc/packets/info.py new file mode 100644 index 0000000..6005b7d --- /dev/null +++ b/lvsfunc/packets/info.py @@ -0,0 +1,221 @@ +import io +import json +import shutil +import subprocess as sp +import warnings +from functools import partial +from tempfile import NamedTemporaryFile +from typing import overload + +from stgpytools import DependencyNotFoundError +from vstools import (CustomValueError, FuncExceptT, Keyframes, SPath, + SPathLike, core, vs) + +from ..util import get_file_from_clip + +__all__ = [ + 'get_packet_sizes', + 'get_packet_scene_stats', +] + + +@overload +def get_packet_sizes( + clip: vs.VideoNode, + src_file: SPathLike | None = None, + out_file: SPathLike | None = None, + keyframes: Keyframes | None = None, + offset: int = 0, + return_packet_sizes: bool = False, + func_except: FuncExceptT | None = None +) -> vs.VideoNode: + ... + + +@overload +def get_packet_sizes( # type:ignore[misc] + clip: vs.VideoNode, + src_file: SPathLike | None = None, + out_file: SPathLike | None = None, + keyframes: Keyframes | None = None, + offset: int = 0, + return_packet_sizes: bool = True, + func_except: FuncExceptT | None = None +) -> list[int]: + ... + + +def get_packet_sizes( + clip: vs.VideoNode, + src_file: SPathLike | None = None, + out_file: SPathLike | None = None, + keyframes: Keyframes | None = None, + offset: int = 0, + return_packet_sizes: bool = False, + func_except: FuncExceptT | None = None +) -> vs.VideoNode | list[int]: + """ + A simple function to read and add frame packet sizes as frame props. + + "Packet sizes" are the size of individual frames. These can be used to calculate the average bitrate of a clip + or a scene, and to process certain frames differently depending on how much bitrate is allocated to specific + sections. + + If `out_file` is set, the results will be written to a file. This file will be read in subsequent calls to save + time. This is useful when you're working with a large clip and you don't want to call ffprobe every time you + refresh the preview. + + If a Keyframes object is passed, additional scene-based frame props will be added. These are the min, max, and + average packet sizes of a scene based on these Keyframes. + + If a non-zero `offset` is set, the function will trim the list of packet sizes to match. Negative values will + instead set the packet sizes for the first `offset` frames to -1. This is intended to be used with trimmed clips. + + Dependencies: + + * `ffprobe `_ + + :param clip: Clip to add the properties to. + :param src_file: The path to the original file that was indexed. + If None, tries to read the `idx_filepath` property from `clip`. + Will throw an error if it can't find either. + This parameter is ignored if `out_file` is set and a file can be read. + :param out_file: Output file for packet sizes. If set, the results will be written to that file, + and also read from that file in subsequent calls. This saves us from having to + call ffprobe every time you refresh the preview. + :param keyframes: A Keyframes object to identify scene changes. If set, scene-based metrics will + be calculated and added as frame props alongside the `pkt_size` frame prop. + :param offset: Offset to trim or duplicate the list of packet sizes. This is useful when you're + working with a trimmed clip. Should be the same value as your trim at the start + of the clip. Negative values will set the packet sizes for the first `offset` + frames to -1 instead. + :param return_packet_sizes: If set to True, the function will return the packet sizes as a list of integers. + To get the scene-based stats, you will need to pass this list to the + `get_packet_scene_stats` function along with a Keyframes object. + Default: False. + :param func_except: Function returned for custom error handling. + This should only be set by VS package developers. + + :return: Input clip with `pkt_size` frame props added, with optionally scene-based packet + stats frame props added on top. if `return_packet_sizes` is set to True, it will + return the packet sizes as a list of integers instead. + """ + + func = func_except or get_packet_sizes + + if out_file is not None and SPath(out_file).exists(): + with open(out_file, "r+") as f: + pkt_sizes = [int(pkt) for pkt in f.readlines()] + else: + sfile = get_file_from_clip(clip, src_file, func_except=func) + pkt_sizes = _get_frames(sfile, func) # type:ignore[arg-type] + + if out_file is not None and not (sout := SPath(out_file)).exists(): + print(f"Writing packet sizes to \"{sout.absolute()}\"...") + + sout.parent.mkdir(parents=True, exist_ok=True) + sout.write_text("\n".join([str(pkt) for pkt in pkt_sizes]), "utf-8", newline="\n") + + if offset < 0: + pkt_sizes = [-1] * -offset + pkt_sizes + elif offset > 0: + pkt_sizes = pkt_sizes[offset:] + + if return_packet_sizes: + return pkt_sizes + + def _set_sizes_props(n: int, clip: vs.VideoNode, pkt_sizes: list[int]) -> vs.VideoNode: + if (pkt_size := pkt_sizes[n]) < 0: + warnings.warn(f"{func}: \"Frame {n} bitrate could not be determined!\"") + + return clip.std.SetFrameProps(pkt_size=pkt_size) + + if not keyframes: + return clip.std.FrameEval(partial(_set_sizes_props, clip=clip, pkt_sizes=pkt_sizes)) + + def _set_scene_stats(n: int, clip: vs.VideoNode, stats: list[dict[str, int]]) -> vs.VideoNode: + if (pkt_size := pkt_sizes[n]) < 0: + warnings.warn(f"{func}: \"Frame {n} bitrate could not be determined!\"") + + try: + return clip.std.SetFrameProps(pkt_size=pkt_size, **stats[n]) + except Exception: + warnings.warn(f"{func}: \"Could not find stats for a section... (Frame: {n})\"") + + return clip.std.SetFrameProps( + pkt_scene_avg_size=-1, + pkt_scene_max_size=-1, + pkt_scene_min_size=-1 + ) + + stats = get_packet_scene_stats(keyframes, pkt_sizes) + + return clip.std.FrameEval(partial(_set_scene_stats, clip=clip, stats=stats)) + + +def get_packet_scene_stats(keyframes: Keyframes, packet_sizes: list[int]) -> list[dict[str, float]]: + """ + Get basic scene-based stats from packet sizes and keyframes. + + :param keyframes: Keyframes object. This is used to determine where scenes start and end. + :param packet_sizes: Individual sizes for every frame. + + :return: list of dictionaries containing scene-based packet size stats. + """ + + stats = list[dict[str, float]]() + + no_stats_backup = (0.0, 0.0) + + try: + for start, end in zip(keyframes, keyframes[1:]): + pkt_scenes = packet_sizes[start:end] + + total_pkt_size = sum(pkt_scenes) + + avg_pkt_size = total_pkt_size / (len(pkt_scenes) or 1) + max_pkt_size = max(pkt_scenes or no_stats_backup) + min_pkt_size = min(pkt_scenes or no_stats_backup) + + for _ in pkt_scenes: + stats += [dict( + pkt_scene_avg_size=avg_pkt_size, + pkt_scene_max_size=max_pkt_size, + pkt_scene_min_size=min_pkt_size + )] + except ValueError as e: + raise CustomValueError("Some kind of error occurred!", get_packet_scene_stats, str(e)) + + return stats + + +def _get_frames(sfile: SPath, func: FuncExceptT) -> list[int]: + if not shutil.which("ffprobe"): + raise DependencyNotFoundError(func, "ffprobe", "Could not find {package}! Make sure it's in your PATH!") + + proc = sp.Popen( + [ + "ffprobe", "-hide_banner", "-show_frames", "-show_streams", "-threads", str(core.num_threads), + "-loglevel", "quiet", "-print_format", "json", "-select_streams", "v:0", + sfile + ], + stdout=sp.PIPE + ) + + with NamedTemporaryFile("a+", delete=False) as tempfile: + stempfile = SPath(tempfile.name) + + assert proc.stdout + + for line in io.TextIOWrapper(proc.stdout, "utf-8"): + tempfile.write(line) + + with open(stempfile) as f: + data = dict(json.load(f)) + + frames = data.get("frames", {}) + + if not frames: + raise CustomValueError(f"No frames found in file, \"{sfile}\"! Your file may be corrupted!", func) + + return [int(dict(frame).get("pkt_size", -1)) for frame in frames] diff --git a/lvsfunc/util.py b/lvsfunc/util.py index 62c2b78..29f5506 100644 --- a/lvsfunc/util.py +++ b/lvsfunc/util.py @@ -3,16 +3,19 @@ import colorsys import random import re -from typing import Any +from typing import overload, Any, Literal +from stgpytools import MISSING, FileWasNotFoundError, SPathLike from vstools import (CustomIndexError, CustomValueError, FrameRangeN, - FrameRangesN, FuncExceptT, KwargsT, - check_variable_resolution, core, get_h, get_w, vs) + FrameRangesN, FuncExceptT, KwargsT, SPath, + check_variable_resolution, core, get_h, get_prop, get_w, + vs) __all__ = [ 'colored_clips', 'convert_rfs', 'get_match_centers_scaling', + 'get_file_from_clip', ] @@ -194,3 +197,66 @@ def get_match_centers_scaling( height = clip.height * (target_height - 1) / (clip.height - 1) return KwargsT(width=width, height=height, base_width=target_width, base_height=target_height) + + +@overload +def get_file_from_clip( + clip: vs.VideoNode, fallback: SPathLike | None = ..., + prop: str = ..., strict: bool = True, + func_except: FuncExceptT | None = ... +) -> SPath: + ... + + +@overload +def get_file_from_clip( # type:ignore[misc] + clip: vs.VideoNode, fallback: SPathLike | None = ..., + prop: str = ..., strict: bool = False, + func_except: FuncExceptT | None = ... +) -> SPath | Literal[False]: + ... + + +def get_file_from_clip( + clip: vs.VideoNode, fallback: SPathLike | None = None, + prop: str = "idx_filepath", strict: bool = False, + func_except: FuncExceptT | None = None +) -> SPath | Literal[False]: + """ + Helper function to get the file path from a clip. + + This function also checks to ensure the file exists, + and throws an error if it doesn't. + + :param clip: The clip to get the file path from. + :param fallback: Fallback file path to use if the `prop` is not found. + :param prop: The property to get the file path from. + Default: "idx_filepath" (used by vs-source classes). + :param strict: If True, will raise an error if the `prop` is not found. + This makes it so the function will NEVER return False. + Default: False. + :param func_except: Function returned for custom error handling. + This should only be set by VS package developers. + + :raises FileWasNotFoundError: The file path was not found. + :raises FramePropError: The property was not found in the clip. + """ + + func = func_except or get_file_from_clip + + if fallback is not None and not (fallback_path := SPath(fallback)).exists() and strict: + raise FileWasNotFoundError("Fallback file not found!", func, fallback_path.absolute()) + + if not (path := get_prop(clip, prop, str, default=MISSING if strict else False, func=func)): + return fallback_path or False + + if not (spath := SPath(str(path))).exists() and not fallback: + raise FileWasNotFoundError("File not found!", func, spath.absolute()) + + if spath.exists(): + return spath + + if fallback is not None and fallback_path.exists(): + return fallback_path + + raise FileWasNotFoundError("File not found!", func, spath.absolute()) diff --git a/setup.py b/setup.py index a5582bc..bdd41a6 100755 --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -import setuptools +import setuptools # type:ignore[import-untyped] from pathlib import Path long_description = Path("README.md").read_text() @@ -21,7 +21,8 @@ long_description=long_description, long_description_content_type="text/markdown", packages=[ - package_name + package_name, + f"{package_name}.packets", ], package_data={ package_name: ['py.typed'],