diff --git a/development/stream_interface/rfdetr_coco_same_shape_parity.py b/development/stream_interface/rfdetr_coco_same_shape_parity.py index 2ed30eec50..9050a7be16 100644 --- a/development/stream_interface/rfdetr_coco_same_shape_parity.py +++ b/development/stream_interface/rfdetr_coco_same_shape_parity.py @@ -1,8 +1,9 @@ """Compare RF-DETR instance-segmentation outputs on same-shape COCO images. This harness is used to reproduce the correctness table in the RF-DETR Triton -postprocess PR. It runs a baseline git ref with all RF-DETR fast paths disabled -and a candidate ref with only Triton RLE postprocess enabled, then compares +preprocess PR. It runs a baseline git ref with all RF-DETR fast paths disabled +and a candidate ref with Triton RLE postprocess and Triton preprocess enabled, +then compares detection counts, classes, boxes, scores, and RLE masks. Example: @@ -10,7 +11,7 @@ env PARITY_MODEL_PATH=/path/to/rfdetr-seg-nano-orin-trt-package \ python development/stream_interface/rfdetr_coco_same_shape_parity.py \ --base-ref main \ - --candidate-ref opt-python-postproc \ + --candidate-ref opt-preprocess \ --height 480 \ --width 640 \ --image-count 1000 @@ -47,7 +48,7 @@ } CANDIDATE_FLAGS_ON = { "INFERENCE_MODELS_RFDETR_TRITON_POSTPROC_ENABLED": "true", - "INFERENCE_MODELS_RFDETR_TRITON_PREPROC_ENABLED": "false", + "INFERENCE_MODELS_RFDETR_TRITON_PREPROC_ENABLED": "true", "RFDETR_PIPELINE_DEPTH": "1", "ENABLE_AUTO_CUDA_GRAPHS_FOR_TRT_BACKEND": "false", "RFDETR_NSIGHT_MARKERS": "false", diff --git a/development/stream_interface/rfdetr_preprocess_microbenchmark.py b/development/stream_interface/rfdetr_preprocess_microbenchmark.py new file mode 100644 index 0000000000..d06a1a9263 --- /dev/null +++ b/development/stream_interface/rfdetr_preprocess_microbenchmark.py @@ -0,0 +1,637 @@ +"""Capture/replay benchmark for RF-DETR reference preprocessing. 
def _ensure_local_import_paths() -> None:
    """Make the in-repo packages importable when running as a script.

    Puts ``_INFERENCE_MODELS_ROOT`` and then ``_REPO_ROOT`` at the front of
    ``sys.path`` unless the entry is already present. Each insert goes to
    index 0, so when both are new the repo root ends up ahead of the
    ``inference_models`` root.
    """
    for root in (_INFERENCE_MODELS_ROOT, _REPO_ROOT):
        entry = str(root)
        if entry in sys.path:
            continue
        sys.path.insert(0, entry)
def _bind_target_arguments(args: tuple, kwargs: dict) -> dict:
    """Map a captured call's ``*args``/``**kwargs`` onto the target's parameters.

    Positional values are matched to parameter names in declaration order,
    keyword values are applied afterwards (and therefore win), and the three
    optional parameters default to ``None``. Surplus positionals and unknown
    keywords are dropped from the result.

    Args:
        args: Positional arguments the wrapped function was called with.
        kwargs: Keyword arguments the wrapped function was called with.

    Returns:
        A dict with exactly the seven target parameter names, in order.

    Raises:
        RuntimeError: If any of the four required parameters is absent.
    """
    required = ("images", "image_pre_processing", "network_input", "target_device")
    optional = ("input_color_format", "image_size_wh", "pre_processing_overrides")
    ordered = required + optional

    merged = dict.fromkeys(optional)
    for parameter, value in zip(ordered, args):
        merged[parameter] = value
    merged.update(kwargs)

    missing = [parameter for parameter in required if parameter not in merged]
    if missing:
        raise RuntimeError(f"Cannot capture target call; missing args: {missing}")
    return {parameter: merged[parameter] for parameter in ordered}
def _prepare_cases_dir(cases_dir: Path, overwrite: bool) -> None:
    """Create the capture directory and guard against mixing old and new cases.

    Args:
        cases_dir: Directory that will receive ``case_*.pkl`` files and
            ``manifest.json``; created (with parents) if missing.
        overwrite: When true, delete existing ``case_*.pkl`` files and the
            manifest. Other files in the directory are left alone.

    Raises:
        RuntimeError: If ``overwrite`` is false and the directory already
            holds captured cases or a manifest.
    """
    cases_dir.mkdir(parents=True, exist_ok=True)
    stale_cases = list(cases_dir.glob("case_*.pkl"))
    manifest = cases_dir / "manifest.json"

    if overwrite:
        for stale in stale_cases:
            stale.unlink()
        if manifest.exists():
            manifest.unlink()
        return

    if stale_cases or manifest.exists():
        raise RuntimeError(
            f"{cases_dir} already contains captured cases; pass --overwrite "
            "or choose a different --cases-dir."
        )
def _resolve_device(device: str, captured: str) -> torch.device:
    """Pick the torch device a replayed case should run on.

    Args:
        device: The ``--device`` option: ``"captured"`` to reuse the device
            recorded in the case, ``"auto"`` to prefer CUDA when available,
            or any torch device string.
        captured: Device string stored with the case at capture time.

    Returns:
        The resolved ``torch.device``.

    Raises:
        RuntimeError: If the resolved device is CUDA but CUDA is not
            available, whether it was requested explicitly or recorded in
            the case.
    """
    if device == "auto":
        return torch.device("cuda" if torch.cuda.is_available() else "cpu")

    from_capture = device == "captured"
    resolved = torch.device(captured if from_capture else device)
    if resolved.type == "cuda" and not torch.cuda.is_available():
        if from_capture:
            # Cases captured on a GPU host used to fail much later, while
            # moving tensors, with an error that did not mention --device.
            raise RuntimeError(
                f"Case was captured on {resolved}, but CUDA is not available; "
                "pass --device cpu or --device auto to replay elsewhere."
            )
        raise RuntimeError("--device cuda requested, but CUDA is not available")
    return resolved
def _synchronize(device: torch.device) -> None:
    """Call ``torch.cuda.synchronize`` for CUDA devices; do nothing otherwise.

    Replay calls this on both sides of the timed region.
    """
    if device.type != "cuda":
        return
    torch.cuda.synchronize(device)
def _assert_tensor_equal(
    *,
    actual: torch.Tensor,
    expected: torch.Tensor,
    label: str,
    atol: float,
    rtol: float,
) -> None:
    """Assert that a replayed tensor matches the captured one.

    Floating-point tensors are compared with ``torch.allclose`` when either
    tolerance is non-zero; everything else is compared exactly with
    ``torch.equal``.

    Args:
        actual: Tensor produced by the replayed implementation (any device).
        expected: Captured reference tensor; compared against the CPU copy
            of ``actual``.
        label: Prefix used in the failure message.
        atol: Absolute tolerance; ``0.0`` together with ``rtol == 0.0``
            requests an exact comparison.
        rtol: Relative tolerance.

    Raises:
        AssertionError: If the shapes differ or the values do not match.
    """
    actual_cpu = actual.detach().cpu()
    same_shape = actual_cpu.shape == expected.shape
    if not same_shape:
        # Check the shape before the values: torch.allclose broadcasts, so a
        # (1, 3) reference would "match" a (2, 3) result, and shapes that
        # cannot broadcast raise RuntimeError instead of a clean failure.
        equal = False
    elif torch.is_floating_point(actual_cpu) and (atol != 0.0 or rtol != 0.0):
        equal = torch.allclose(actual_cpu, expected, atol=atol, rtol=rtol)
    else:
        equal = torch.equal(actual_cpu, expected)
    if equal:
        return

    max_abs = (actual_cpu - expected).abs().max().item() if same_shape else None
    raise AssertionError(
        f"{label} differs: actual shape={tuple(actual_cpu.shape)} "
        f"expected shape={tuple(expected.shape)} max_abs_diff={max_abs}"
    )
def _summarize_timings(timings: List[float]) -> dict:
    """Reduce per-call durations (seconds) to summary statistics.

    Percentiles use a nearest-rank lookup into the sorted samples: the index
    is ``round((count - 1) * p)`` with Python's built-in ``round``, capped at
    the last element.

    Args:
        timings: Durations in seconds; may be empty.

    Returns:
        A dict with ``count``, ``total_seconds`` and ``mean``/``min``/``p50``/
        ``p90``/``p99``/``max`` in milliseconds. All millisecond fields are
        ``0.0`` for an empty input.
    """
    ordered = sorted(timings)
    samples = len(ordered)
    elapsed = sum(ordered)

    def nearest_rank_ms(fraction: float) -> float:
        if not samples:
            return 0.0 * 1000
        position = min(samples - 1, int(round((samples - 1) * fraction)))
        return ordered[position] * 1000

    if samples:
        mean_ms = (elapsed / samples) * 1000
        fastest_ms = ordered[0] * 1000
        slowest_ms = ordered[-1] * 1000
    else:
        mean_ms = fastest_ms = slowest_ms = 0.0

    return {
        "count": samples,
        "total_seconds": elapsed,
        "mean_ms": mean_ms,
        "min_ms": fastest_ms,
        "p50_ms": nearest_rank_ms(0.50),
        "p90_ms": nearest_rank_ms(0.90),
        "p99_ms": nearest_rank_ms(0.99),
        "max_ms": slowest_ms,
    }
def _parse_args() -> argparse.Namespace:
    """Parse and validate the command-line options for capture and replay.

    Returns:
        The populated ``argparse.Namespace``.

    Raises:
        ValueError: If a numeric option is outside its valid range.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--mode",
        choices=("capture", "replay", "capture-and-replay"),
        default="capture-and-replay",
    )
    # The underscore spellings are the original flags and stay first so the
    # attribute names are unchanged; the dashed aliases match every other
    # option of this script.
    parser.add_argument(
        "--video_reference",
        "--video-reference",
        default="vehicles_1080p.mp4",
    )
    parser.add_argument("--model_id", "--model-id", default="rfdetr-seg-nano")
    parser.add_argument("--confidence", type=float, default=0.4)
    parser.add_argument("--backend", choices=("trt", "onnx", "torch"), default="trt")
    parser.add_argument(
        "--cases-dir",
        type=Path,
        default=Path("temp/rfdetr_preprocess_cases"),
    )
    parser.add_argument("--capture-count", type=int, default=100)
    parser.add_argument("--progress-every", type=int, default=50)
    parser.add_argument(
        "--overwrite",
        action=argparse.BooleanOptionalAction,
        default=True,
    )
    parser.add_argument(
        "--force-reference-preprocess",
        action=argparse.BooleanOptionalAction,
        default=True,
        help=(
            "Set INFERENCE_MODELS_RFDETR_TRITON_PREPROC_ENABLED=false before "
            "loading the workflow so pre_process_network_input is called."
        ),
    )
    parser.add_argument(
        "--device",
        default="captured",
        help="'captured', 'auto', 'cpu', or a torch device string used on replay.",
    )
    parser.add_argument(
        "--replay-implementation",
        choices=("reference", "triton"),
        default="reference",
        help="Implementation used by replay; capture always hooks pre_process_network_input.",
    )
    parser.add_argument("--repeats", type=int, default=1)
    parser.add_argument("--warmup-repeats", type=int, default=0)
    parser.add_argument("--max-cases", type=int, default=None)
    parser.add_argument("--atol", type=float, default=0.0)
    parser.add_argument("--rtol", type=float, default=0.0)
    args = parser.parse_args()

    if args.capture_count <= 0:
        raise ValueError("--capture-count must be positive")
    if args.repeats <= 0:
        raise ValueError("--repeats must be positive")
    if args.warmup_repeats < 0:
        raise ValueError("--warmup-repeats must be non-negative")
    if args.progress_every <= 0:
        raise ValueError("--progress-every must be positive")
    # Replay applies --max-cases as a slice bound: 0 would surface as a
    # misleading "no case files found" and a negative value would silently
    # drop cases from the end of the list.
    if args.max_cases is not None and args.max_cases <= 0:
        raise ValueError("--max-cases must be positive when provided")
    # A negative tolerance makes every tolerance-based comparison fail.
    if args.atol < 0 or args.rtol < 0:
        raise ValueError("--atol and --rtol must be non-negative")
    return args
--- diff --git a/inference_models/inference_models/configuration.py b/inference_models/inference_models/configuration.py index 9350627ce1..5b10d0daf3 100644 --- a/inference_models/inference_models/configuration.py +++ b/inference_models/inference_models/configuration.py @@ -294,6 +294,11 @@ variable_name="INFERENCE_MODELS_RFDETR_TRITON_POSTPROC_ENABLED", default=DEFAULT_INFERENCE_MODELS_RFDETR_TRITON_POSTPROC_ENABLED, ) +DEFAULT_INFERENCE_MODELS_RFDETR_TRITON_PREPROC_ENABLED = False +INFERENCE_MODELS_RFDETR_TRITON_PREPROC_ENABLED = get_boolean_from_env( + variable_name="INFERENCE_MODELS_RFDETR_TRITON_PREPROC_ENABLED", + default=DEFAULT_INFERENCE_MODELS_RFDETR_TRITON_PREPROC_ENABLED, +) INFERENCE_MODELS_ROBOFLOW_INSTANT_DEFAULT_CONFIDENCE = get_float_from_env( variable_name="INFERENCE_MODELS_ROBOFLOW_INSTANT_DEFAULT_CONFIDENCE", default=0.99, diff --git a/inference_models/inference_models/models/rfdetr/rfdetr_instance_segmentation_trt.py b/inference_models/inference_models/models/rfdetr/rfdetr_instance_segmentation_trt.py index e8196f8eae..1e570e3f9f 100644 --- a/inference_models/inference_models/models/rfdetr/rfdetr_instance_segmentation_trt.py +++ b/inference_models/inference_models/models/rfdetr/rfdetr_instance_segmentation_trt.py @@ -52,6 +52,9 @@ post_process_instance_segmentation_results_to_rle_masks, ) from inference_models.models.rfdetr.pre_processing import pre_process_network_input +from inference_models.models.rfdetr.triton_preprocess_runtime import ( + FastPreprocessRuntime, +) from inference_models.weights_providers.entities import RecommendedParameters try: @@ -218,8 +221,10 @@ def __init__( self._trt_cuda_graph_cache = trt_cuda_graph_cache self._lock = threading.Lock() self._inference_stream = torch.cuda.Stream(device=self._device) + self._pre_process_cuda_stream = torch.cuda.Stream(device=self._device) self._thread_local_storage = threading.local() self.recommended_parameters = recommended_parameters + self._fast_preprocess_runtime = 
FastPreprocessRuntime(device=self._device) @property def class_names(self) -> List[str]: @@ -237,6 +242,17 @@ def pre_process( pre_processing_overrides: Optional[PreProcessingOverrides] = None, **kwargs, ) -> Tuple[torch.Tensor, List[PreProcessingMetadata]]: + fast = self._fast_preprocess_runtime.try_preprocess( + images=images, + input_color_format=input_color_format, + image_size=image_size, + image_pre_processing=self._inference_config.image_pre_processing, + network_input=self._inference_config.network_input, + stream=self._pre_process_stream, + ) + if fast is not None: + self._fast_preproc_event = fast.ready_event + return fast.tensor, fast.metadata with torch.cuda.stream(self._pre_process_stream): pre_processed_images, pre_processing_meta = pre_process_network_input( images=images, @@ -248,6 +264,7 @@ def pre_process( pre_processing_overrides=pre_processing_overrides, ) self._pre_process_stream.synchronize() + pre_processed_images._pre_processing_meta = pre_processing_meta # type: ignore[attr-defined] return pre_processed_images, pre_processing_meta def forward( @@ -257,6 +274,10 @@ def forward( **kwargs, ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: cache = self._trt_cuda_graph_cache if not disable_cuda_graphs else None + preproc_event = getattr(self, "_fast_preproc_event", None) + if preproc_event is not None: + self._inference_stream.wait_event(preproc_event) + self._fast_preproc_event = None with self._lock: with use_cuda_context(context=self._cuda_context): detections, labels, masks = infer_from_trt_engine( @@ -324,11 +345,7 @@ def post_process( @property def _pre_process_stream(self) -> torch.cuda.Stream: - if not hasattr(self._thread_local_storage, "pre_process_stream"): - self._thread_local_storage.pre_process_stream = torch.cuda.Stream( - device=self._device - ) - return self._thread_local_storage.pre_process_stream + return self._pre_process_cuda_stream @property def _post_process_stream(self) -> torch.cuda.Stream: diff --git 
a/inference_models/inference_models/models/rfdetr/triton_preprocess.py b/inference_models/inference_models/models/rfdetr/triton_preprocess.py new file mode 100644 index 0000000000..a3d88d0a69 --- /dev/null +++ b/inference_models/inference_models/models/rfdetr/triton_preprocess.py @@ -0,0 +1,686 @@ +"""Triton preprocessing kernels for RF-DETR. + +Byte-exact port of PIL's separable bilinear-antialias resize (the algorithm +torchvision's `TF.resize(pil, ..., antialias=True)` uses on PIL inputs), with +the subsequent `/255` + ImageNet normalize fused into the same pass. + +PIL's scheme (src/libImaging/Resample.c): + + PRECISION_BITS = 22 + scale = in_size / out_size + filterscale = max(1.0, scale) + support = 1.0 * filterscale # triangle radius = 1 + ksize = ceil(support) * 2 + 1 + center(o) = (o + 0.5) * scale + xmin(o) = int(center - support + 0.5) clipped to [0, in] + xmax(o) = int(center + support + 0.5) clipped to [0, in] + w_f(o, k) = triangle((k + xmin - center + 0.5) / filterscale) + w_f normalised to sum to 1 per output pixel + w_i(o, k) = round(w_f(o, k) * (1 << PRECISION_BITS)) int32 + out(o) = clamp((Σ w_i(o, k) * src_u8) + (1 << (PRECISION_BITS-1)) >> PRECISION_BITS, 0, 255) + +The runtime implementation is the consolidated two-pass path: +horizontal PIL-antialias resize into a uint8 CHW scratch buffer, followed by +the vertical pass plus `/255` + ImageNet normalization into fp32 CHW output. + +Tensor contracts: + +* ``src`` is a CUDA uint8 HWC image with shape ``(raw_h, raw_w, 3)``. The + hot TRT path currently passes a full frame with no static crop, but the + kernels also accept crop offsets and logical crop dimensions. +* ``tmp`` is a CUDA uint8 CHW scratch tensor with shape + ``(3, src_h, target_w)``. It stores the horizontally resized image after + the same fixed-point rounding PIL applies between its separable passes. +* ``out`` is a CUDA fp32 NCHW tensor with shape ``(1, 3, target_h, target_w)`` + in network channel order. 
def _read_power_of_two_env(name: str, default: int) -> int:
    """Resolve a Triton block-size override from an environment variable.

    Block sizes for the preprocess kernels must be powers of two so Triton can
    build static tensor shapes; this helper enforces that on user-supplied
    overrides and reports violations as ``ModelRuntimeError``.

    Args:
        name: Environment variable to read.
        default: Value returned, unvalidated, when the variable is unset or
            contains only whitespace.

    Returns:
        ``default``, or the parsed override (a power of two no larger
        than 512).

    Raises:
        ModelRuntimeError: If the variable is set but is not an integer, is
            not a positive power of two, or exceeds 512.
    """
    help_url = "https://inference-models.roboflow.com/errors/models-runtime/#modelruntimeerror"

    text = os.environ.get(name)
    if text is None or not text.strip():
        return default

    try:
        parsed = int(text)
    except ValueError as error:
        raise ModelRuntimeError(
            message=f"{name} must be an integer, got {text!r}.",
            help_url=help_url,
        ) from error

    # A positive power of two has exactly one bit set, so clearing its lowest
    # set bit leaves zero.
    is_power_of_two = parsed > 0 and (parsed & (parsed - 1)) == 0
    if not is_power_of_two:
        raise ModelRuntimeError(
            message=f"{name} must be a positive power of two, got {parsed}.",
            help_url=help_url,
        )
    if parsed > 512:
        raise ModelRuntimeError(
            message=f"{name} must be <= 512, got {parsed}.",
            help_url=help_url,
        )
    return parsed
+ """ + scale = in_size / out_size + filterscale = max(1.0, scale) + support = filterscale + ksize = int(math.ceil(support)) * 2 + 1 + + starts = np.zeros(out_size, dtype=np.int32) + weights_fp = np.zeros((out_size, ksize), dtype=np.float64) + inv_fs = 1.0 / filterscale + + for o in range(out_size): + center = (o + 0.5) * scale + xmin = int(center - support + 0.5) + if xmin < 0: + xmin = 0 + xmax = int(center + support + 0.5) + if xmax > in_size: + xmax = in_size + actual = xmax - xmin + starts[o] = xmin + total = 0.0 + for k in range(actual): + t = (k + xmin - center + 0.5) * inv_fs + t_abs = -t if t < 0.0 else t + w = 1.0 - t_abs if t_abs < 1.0 else 0.0 + weights_fp[o, k] = w + total += w + if total != 0.0: + weights_fp[o, :actual] /= total + + weights_int = np.rint(weights_fp * (1 << PRECISION_BITS)).astype(np.int32) + return starts, weights_int, ksize + + +if TRITON_AVAILABLE: + + _HALF = 1 << (PRECISION_BITS - 1) + + @triton.jit + def horizontal_resize_uint8_all_channels_kernel( + src_ptr, + tmp_ptr, + xmin_ptr, + wx_ptr, + src_h, + src_w, + src_stride_h, + src_stride_w, + crop_offset_y, + crop_offset_x, + target_w, + CH_R: tl.constexpr, + CH_G: tl.constexpr, + CH_B: tl.constexpr, + KSIZE_X: tl.constexpr, + PRECISION_BITS_C: tl.constexpr, + HALF_C: tl.constexpr, + BLOCK_H: tl.constexpr, + BLOCK_W: tl.constexpr, + ): + """Compute PIL's horizontal resize pass for one tile. + + Args: + src_ptr: CUDA uint8 HWC source image, shape ``(raw_h, raw_w, 3)``. + tmp_ptr: CUDA uint8 CHW scratch output, shape + ``(3, src_h, target_w)``. + xmin_ptr: CUDA int32 starts table, shape ``(target_w,)``. + wx_ptr: CUDA int32 flattened weights table, shape + ``(target_w * KSIZE_X,)``. + src_h/src_w: Logical source height/width after crop. These drive + bounds checks for the resized region. + src_stride_h/src_stride_w: Source strides in elements, used so the + kernel does not assume contiguous row pitch beyond HWC layout. 
+ crop_offset_y/crop_offset_x: Offset into ``src_ptr`` for static + crop support. The TRT fast path passes zero. + target_w: Width of the resized network input. + CH_R/CH_G/CH_B: Source channel indices to emit into network + channel order. For BGR input feeding an RGB model this is + ``2, 1, 0``. + + Output: + Writes the horizontally resized and PIL-rounded uint8 values into + ``tmp_ptr`` in CHW order. The vertical kernel consumes this scratch + buffer as its input image. + """ + # Program ids tile over logical source rows and target columns. The + # y-axis is still source height because this pass only resizes width. + pid_y = tl.program_id(0) + pid_x = tl.program_id(1) + + offs_y = pid_y * BLOCK_H + tl.arange(0, BLOCK_H) + offs_x = pid_x * BLOCK_W + tl.arange(0, BLOCK_W) + mask_y = offs_y < src_h + mask_x = offs_x < target_w + mask_out = mask_y[:, None] & mask_x[None, :] + + # For each output x, PIL precomputes the first contributing source x + # and a fixed-width row of int32 fixed-point triangle weights. + xmin = tl.load(xmin_ptr + offs_x, mask=mask_x, other=0) + sy = offs_y + crop_offset_y + + hacc_r = tl.zeros((BLOCK_H, BLOCK_W), dtype=tl.int32) + hacc_g = tl.zeros((BLOCK_H, BLOCK_W), dtype=tl.int32) + hacc_b = tl.zeros((BLOCK_H, BLOCK_W), dtype=tl.int32) + for kx in tl.static_range(KSIZE_X): + sx = xmin + kx + sx_c = tl.maximum(tl.minimum(sx, src_w - 1), 0) + crop_offset_x + wx = tl.load(wx_ptr + offs_x * KSIZE_X + kx, mask=mask_x, other=0) + base = sy[:, None] * src_stride_h + sx_c[None, :] * src_stride_w + # Load source pixels in the network's channel order so the channel + # swap replaces the original PIL image conversion step. + p_r = tl.load(src_ptr + base + CH_R, mask=mask_out, other=0).to(tl.int32) + p_g = tl.load(src_ptr + base + CH_G, mask=mask_out, other=0).to(tl.int32) + p_b = tl.load(src_ptr + base + CH_B, mask=mask_out, other=0).to(tl.int32) + wx_2d = wx[None, :] + # Fixed-point horizontal convolution: sum(src * PIL_weight_int). 
+ hacc_r += p_r * wx_2d + hacc_g += p_g * wx_2d + hacc_b += p_b * wx_2d + + # Match PIL's intermediate uint8 rounding before the vertical pass. + q_r = (hacc_r + HALF_C) >> PRECISION_BITS_C + q_g = (hacc_g + HALF_C) >> PRECISION_BITS_C + q_b = (hacc_b + HALF_C) >> PRECISION_BITS_C + q_r = tl.minimum(tl.maximum(q_r, 0), 255) + q_g = tl.minimum(tl.maximum(q_g, 0), 255) + q_b = tl.minimum(tl.maximum(q_b, 0), 255) + + out_row = offs_y[:, None] * target_w + offs_x[None, :] + channel_stride = src_h * target_w + # CHW scratch keeps the following vertical pass contiguous along x for + # one output channel at a time. + tl.store(tmp_ptr + 0 * channel_stride + out_row, q_r, mask=mask_out) + tl.store(tmp_ptr + 1 * channel_stride + out_row, q_g, mask=mask_out) + tl.store(tmp_ptr + 2 * channel_stride + out_row, q_b, mask=mask_out) + + @triton.jit + def vertical_normalize_from_horizontal_kernel( + tmp_ptr, + dst_ptr, + ymin_ptr, + wy_ptr, + src_h, + dst_stride_c, + dst_stride_h, + target_h, + target_w, + inv_std_255_r, + inv_std_255_g, + inv_std_255_b, + offset_r, + offset_g, + offset_b, + KSIZE_Y: tl.constexpr, + PRECISION_BITS_C: tl.constexpr, + HALF_C: tl.constexpr, + BLOCK_H: tl.constexpr, + BLOCK_W: tl.constexpr, + ): + """Compute PIL's vertical resize pass and torchvision normalization. + + Args: + tmp_ptr: CUDA uint8 CHW horizontal scratch, shape + ``(3, src_h, target_w)``. + dst_ptr: CUDA fp32 NCHW output, shape + ``(1, 3, target_h, target_w)``. + ymin_ptr: CUDA int32 starts table, shape ``(target_h,)``. + wy_ptr: CUDA int32 flattened weights table, shape + ``(target_h * KSIZE_Y,)``. + src_h: Logical source height after crop and after the horizontal + pass. This is the height of ``tmp_ptr``. + dst_stride_c/dst_stride_h: Output strides in elements. + target_h/target_w: Resized network input shape. + inv_std_255_*: Precomputed ``1 / (255 * std[channel])`` values. + offset_*: Precomputed ``-mean[channel] / std[channel]`` values. 
class ResampleTables:
    """Holder for the fixed-point resize tables of one source/target shape pair.

    Attributes:
        ymin_gpu: int32 tensor with shape ``(target_h,)``; entry ``y`` is the
            first source row contributing to output row ``y``.
        xmin_gpu: int32 tensor with shape ``(target_w,)``; entry ``x`` is the
            first source column contributing to output column ``x``.
        wy_gpu: int32 flattened tensor with shape ``(target_h * ksize_y,)``
            holding the fixed-point vertical weights, one row per output row.
        wx_gpu: int32 flattened tensor with shape ``(target_w * ksize_x,)``
            holding the fixed-point horizontal weights, one row per output
            column.
        ksize_y/ksize_x: Static loop bounds passed to the vertical and
            horizontal Triton kernels.
    """

    # Slot order mirrors the constructor's positional order so ``__init__``
    # can pair names with values directly.
    __slots__ = (
        "ymin_gpu",
        "xmin_gpu",
        "wy_gpu",
        "wx_gpu",
        "ksize_y",
        "ksize_x",
    )

    def __init__(
        self,
        ymin_gpu: torch.Tensor,
        xmin_gpu: torch.Tensor,
        wy_gpu: torch.Tensor,
        wx_gpu: torch.Tensor,
        ksize_y: int,
        ksize_x: int,
    ) -> None:
        field_values = (ymin_gpu, xmin_gpu, wy_gpu, wx_gpu, ksize_y, ksize_x)
        for field_name, field_value in zip(ResampleTables.__slots__, field_values):
            setattr(self, field_name, field_value)
def build_resample_tables(
    src_h: int,
    src_w: int,
    target_h: int,
    target_w: int,
    device: torch.device,
) -> ResampleTables:
    """Compute the per-axis resample tables and move them to ``device``.

    Args:
        src_h/src_w: Effective source image dimensions after optional crop.
        target_h/target_w: Network input dimensions after resize.
        device: Device that receives the table tensors.

    Returns:
        ``ResampleTables`` whose start and flattened weight tensors have been
        transferred to ``device`` (the transfers are requested with
        ``non_blocking=True``).
    """

    def _upload(table: np.ndarray) -> torch.Tensor:
        # Wrap the numpy table without copying, then transfer it.
        return torch.from_numpy(table).to(device=device, non_blocking=True)

    row_starts, row_weights, row_taps = _bilinear_antialias_weights_1d_int(
        src_h, target_h
    )
    col_starts, col_weights, col_taps = _bilinear_antialias_weights_1d_int(
        src_w, target_w
    )

    # Upload order matches the original: starts first, then flattened weights.
    ymin_gpu = _upload(row_starts)
    xmin_gpu = _upload(col_starts)
    wy_gpu = _upload(row_weights.ravel())
    wx_gpu = _upload(col_weights.ravel())
    return ResampleTables(
        ymin_gpu=ymin_gpu,
        xmin_gpu=xmin_gpu,
        wy_gpu=wy_gpu,
        wx_gpu=wx_gpu,
        ksize_y=row_taps,
        ksize_x=col_taps,
    )
It intentionally assumes + the caller already validated shapes, dtypes, device placement, and table + compatibility so each frame only pays for the HtoD copy and two Triton + kernel launches. + + Args: + src: CUDA uint8 HWC source tensor, shape ``(raw_h, raw_w, 3)``. + out: CUDA fp32 NCHW output tensor, shape + ``(1, 3, target_h, target_w)``. + tmp: CUDA uint8 CHW scratch tensor, shape ``(3, src_h, target_w)``. + tables: ``ResampleTables`` built for ``(src_h, src_w)`` to + ``(target_h, target_w)``. + target_h/target_w: Network input dimensions. + means/stds: Per-channel normalization constants in output channel + order. + swap_rb: Whether to swap source red/blue channels while writing + network-order output channels. + launch_config: Block sizes returned by ``resolve_two_pass_launch_config``. + crop_offset_y/crop_offset_x: Optional top-left crop offset in ``src``. + crop_h/crop_w: Optional logical source shape after crop. When omitted, + the full source tensor shape is used. + + Returns: + The same ``out`` tensor after scheduling both kernels on the current + CUDA stream. + """ + raw_src_h, raw_src_w = int(src.shape[0]), int(src.shape[1]) + src_h = crop_h if crop_h is not None else raw_src_h + src_w = crop_w if crop_w is not None else raw_src_w + src_stride_h = int(src.stride(0)) + src_stride_w = int(src.stride(1)) + dst_stride_c = target_h * target_w + dst_stride_h = target_w + + if swap_rb: + ch_r, ch_g, ch_b = 2, 1, 0 + else: + ch_r, ch_g, ch_b = 0, 1, 2 + + inv_std_255_r = 1.0 / (255.0 * stds[0]) + inv_std_255_g = 1.0 / (255.0 * stds[1]) + inv_std_255_b = 1.0 / (255.0 * stds[2]) + offset_r = -means[0] / stds[0] + offset_g = -means[1] / stds[1] + offset_b = -means[2] / stds[2] + block_h, block_w, horizontal_block_h, horizontal_block_w = launch_config + + # First reproduce PIL's horizontal resize into uint8 scratch. This is the + # only pass that reads the raw HWC frame. 
def triton_preprocess_rfdetr_stretch(
    src: torch.Tensor,
    tables: ResampleTables,
    target_h: int,
    target_w: int,
    means: Tuple[float, float, float] = (0.485, 0.456, 0.406),
    stds: Tuple[float, float, float] = (0.229, 0.224, 0.225),
    swap_rb: bool = True,
    crop_offset_y: int = 0,
    crop_offset_x: int = 0,
    crop_h: Optional[int] = None,
    crop_w: Optional[int] = None,
    out: Optional[torch.Tensor] = None,
    tmp: Optional[torch.Tensor] = None,
) -> torch.Tensor:
    """PIL-exact resize + color swap + normalize using the two-pass kernels.

    This is the validating entry point. It checks the source tensor, the crop
    window, the resample tables and any caller-supplied buffers, allocates
    missing buffers, and then delegates to
    ``triton_preprocess_rfdetr_stretch_two_pass_preallocated``.

    Args:
        src: uint8 CUDA tensor, shape ``(raw_h, raw_w, 3)``, HWC layout. A
            non-contiguous tensor is copied to a contiguous one first.
        tables: precomputed int32 resample tables sized against the *cropped*
            source ``(crop_h, crop_w)`` to ``(target_h, target_w)``.
        target_h, target_w: output spatial dims.
        means, stds: normalization in output channel order (R, G, B for
            network_input.color_mode == 'rgb').
        swap_rb: if True, source channel 0 → output B (BGR input, RGB network).
        crop_offset_y/_x: load-time offset into `src` for a static crop. 0
            means no crop.
        crop_h/_w: effective source dims after crop. Defaults to src dims
            when no crop is configured.
        out: optional preallocated, contiguous fp32
            ``(1, 3, target_h, target_w)`` CUDA tensor on ``src``'s device.
        tmp: optional preallocated, contiguous uint8
            ``(3, crop_h/raw_h, target_w)`` CUDA tensor on ``src``'s device,
            used by the horizontal pass.

    Returns:
        fp32 ``(1, 3, target_h, target_w)`` on the same device as ``src``.

    Raises:
        MissingDependencyError: Triton is not installed.
        ModelInputError: ``src`` is not a CUDA uint8 HWC 3-channel tensor, or
            the crop window does not fit inside ``src``.
        ModelRuntimeError: ``tables``, ``out`` or ``tmp`` do not match the
            requested shapes, dtype, device or memory layout.
    """
    input_help_url = "https://inference-models.roboflow.com/errors/input-validation/#modelinputerror"
    runtime_help_url = "https://inference-models.roboflow.com/errors/models-runtime/#modelruntimeerror"

    if not TRITON_AVAILABLE:
        raise MissingDependencyError(
            message="triton is not installed",
            help_url="https://inference-models.roboflow.com/errors/runtime-environment/#missingdependencyerror",
        )
    if not src.is_cuda:
        raise ModelInputError(
            message=f"expected CUDA src tensor, got device={src.device}",
            help_url=input_help_url,
        )
    if src.dtype != torch.uint8:
        raise ModelInputError(
            message=f"expected uint8 src, got {src.dtype}",
            help_url=input_help_url,
        )
    if src.ndim != 3 or src.shape[2] != 3:
        raise ModelInputError(
            message=f"expected HWC 3-channel, got shape={tuple(src.shape)}",
            help_url=input_help_url,
        )

    src = src.contiguous()
    raw_src_h, raw_src_w = int(src.shape[0]), int(src.shape[1])
    src_h = crop_h if crop_h is not None else raw_src_h
    src_w = crop_w if crop_w is not None else raw_src_w

    # The horizontal kernel clamps sample coordinates to the *cropped* extent
    # and then adds the crop offset, so a window that leaves the raw image
    # would read outside ``src``. Reject it here instead.
    if (
        crop_offset_y < 0
        or crop_offset_x < 0
        or src_h <= 0
        or src_w <= 0
        or crop_offset_y + src_h > raw_src_h
        or crop_offset_x + src_w > raw_src_w
    ):
        raise ModelInputError(
            message=(
                f"crop window offset=({crop_offset_y}, {crop_offset_x}) "
                f"size=({src_h}, {src_w}) does not fit inside src of shape "
                f"({raw_src_h}, {raw_src_w})"
            ),
            help_url=input_help_url,
        )

    # Both kernels index the start tables by output coordinate; a table built
    # for another target size would be read out of range.
    if tables.ymin_gpu.numel() != target_h or tables.xmin_gpu.numel() != target_w:
        raise ModelRuntimeError(
            message=(
                f"tables were built for target "
                f"({tables.ymin_gpu.numel()}, {tables.xmin_gpu.numel()}), "
                f"expected ({target_h}, {target_w})"
            ),
            help_url=runtime_help_url,
        )

    if out is None:
        out = torch.empty(
            (1, 3, target_h, target_w), dtype=torch.float32, device=src.device
        )
    else:
        if tuple(out.shape) != (1, 3, target_h, target_w):
            raise ModelRuntimeError(
                message=(
                    f"out has shape {tuple(out.shape)}, expected "
                    f"(1, 3, {target_h}, {target_w})"
                ),
                help_url=runtime_help_url,
            )
        if out.dtype != torch.float32 or not out.is_cuda:
            raise ModelRuntimeError(
                message="out must be fp32 CUDA tensor",
                help_url=runtime_help_url,
            )
        # The launcher derives output strides from the target shape, so the
        # buffer has to be dense and live next to ``src``.
        if out.device != src.device or not out.is_contiguous():
            raise ModelRuntimeError(
                message=(
                    f"out must be contiguous and on {src.device}, got "
                    f"device={out.device}, stride={tuple(out.stride())}"
                ),
                help_url=runtime_help_url,
            )

    if tmp is None:
        tmp = torch.empty((3, src_h, target_w), dtype=torch.uint8, device=src.device)
    else:
        if tuple(tmp.shape) != (3, src_h, target_w):
            raise ModelRuntimeError(
                message=(
                    f"tmp has shape {tuple(tmp.shape)}, expected "
                    f"(3, {src_h}, {target_w})"
                ),
                help_url=runtime_help_url,
            )
        if tmp.dtype != torch.uint8 or not tmp.is_cuda:
            raise ModelRuntimeError(
                message="tmp must be uint8 CUDA tensor",
                help_url=runtime_help_url,
            )
        # The kernels address the scratch buffer with dense CHW offsets.
        if tmp.device != src.device or not tmp.is_contiguous():
            raise ModelRuntimeError(
                message=(
                    f"tmp must be contiguous and on {src.device}, got "
                    f"device={tmp.device}, stride={tuple(tmp.stride())}"
                ),
                help_url=runtime_help_url,
            )

    return triton_preprocess_rfdetr_stretch_two_pass_preallocated(
        src=src,
        out=out,
        tmp=tmp,
        tables=tables,
        target_h=target_h,
        target_w=target_w,
        means=means,
        stds=stds,
        swap_rb=swap_rb,
        launch_config=resolve_two_pass_launch_config(),
        crop_offset_y=crop_offset_y,
        crop_offset_x=crop_offset_x,
        crop_h=crop_h,
        crop_w=crop_w,
    )
@dataclass(frozen=True)
class FastPreprocessResult:
    """Result returned after the Triton preprocessing work is enqueued.

    The dataclass is frozen, so the three fields cannot be rebound after
    construction.

    Attributes:
        tensor: CUDA fp32 tensor with shape ``(1, 3, target_h, target_w)`` in
            network color order and normalized with the model's mean/std.
        metadata: Single-item preprocessing metadata list matching the reference
            RF-DETR TRT preprocessing contract.
        ready_event: Event recorded on the preprocessing stream after the HtoD
            copy and both Triton kernels. The TensorRT stream must wait on this
            event before consuming ``tensor``.
    """

    # Output buffer handed out by the runtime; work producing it may still be
    # in flight until ``ready_event`` has completed.
    tensor: torch.Tensor
    # One ``PreProcessingMetadata`` entry for the single processed image.
    metadata: List[PreProcessingMetadata]
    # Synchronization point consumers wait on before reading ``tensor``.
    ready_event: torch.cuda.Event
+ """ + + __slots__ = ( + "src_h", + "src_w", + "target_h", + "target_w", + "pinned_host", + "src_gpu", + "out_buffers", + "tmp_buffers", + "out_buffer_index", + "tables", + "launch_config", + ) + + def __init__( + self, + src_h: int, + src_w: int, + target_h: int, + target_w: int, + pinned_host: torch.Tensor, + src_gpu: torch.Tensor, + out_buffers: List[torch.Tensor], + tmp_buffers: List[torch.Tensor], + tables: ResampleTables, + launch_config: Tuple[int, int, int, int], + ) -> None: + self.src_h = src_h + self.src_w = src_w + self.target_h = target_h + self.target_w = target_w + self.pinned_host = pinned_host + self.src_gpu = src_gpu + self.out_buffers = out_buffers + self.tmp_buffers = tmp_buffers + self.out_buffer_index = 0 + self.tables = tables + self.launch_config = launch_config + + @classmethod + def build( + cls, + src_h: int, + src_w: int, + target_h: int, + target_w: int, + device: torch.device, + ) -> "FastPreprocessState": + """Allocate shape-specific buffers and build GPU resample tables.""" + pinned_host = torch.empty((src_h, src_w, 3), dtype=torch.uint8, pin_memory=True) + src_gpu = torch.empty((src_h, src_w, 3), dtype=torch.uint8, device=device) + out_buffers = [ + torch.empty((1, 3, target_h, target_w), dtype=torch.float32, device=device) + for _ in range(_BUFFER_RING_SIZE) + ] + tmp_buffers = [ + torch.empty((3, src_h, target_w), dtype=torch.uint8, device=device) + for _ in range(_BUFFER_RING_SIZE) + ] + tables = build_resample_tables( + src_h=src_h, + src_w=src_w, + target_h=target_h, + target_w=target_w, + device=device, + ) + return cls( + src_h=src_h, + src_w=src_w, + target_h=target_h, + target_w=target_w, + pinned_host=pinned_host, + src_gpu=src_gpu, + out_buffers=out_buffers, + tmp_buffers=tmp_buffers, + tables=tables, + launch_config=resolve_two_pass_launch_config(), + ) + + def is_stale(self, src_h: int, src_w: int, target_h: int, target_w: int) -> bool: + """Return true when image/network dimensions no longer match state.""" + return ( 
class FastPreprocessRuntime:
    """Eligibility, launch, and CUDA handoff manager for Triton preprocessing.

    ``RFDetrForInstanceSegmentationTRT`` owns one runtime instance for the life
    of the model adapter. The runtime keeps all mutable fast-path state here so
    the adapter only has to call ``try_preprocess(...)`` and, when a result is
    returned, make the TensorRT stream wait on ``result.ready_event``.

    Stream lifetime:
        The caller supplies the preprocessing CUDA stream for each launch. This
        runtime records the returned event on that stream after the host-to-
        device copy and both Triton kernels. The output tensor also records the
        stream so PyTorch's caching allocator cannot reuse its storage before
        the asynchronous preprocessing work has finished.

    Staging buffer reuse:
        The pinned host staging tensor is shared by every frame of a given
        shape. Before it is overwritten, the runtime waits for the previous
        frame's asynchronous host-to-device copy to finish, so an in-flight
        copy can never observe pixels of the next frame.
    """

    def __init__(self, device: torch.device) -> None:
        self._device = device
        self._state: Optional[FastPreprocessState] = None
        self._warned_reasons: set[str] = set()
        # Event recorded right after the most recent pinned -> device copy.
        self._staging_copy_done: Optional[torch.cuda.Event] = None

    def try_preprocess(
        self,
        images,
        input_color_format: Optional[ColorFormat],
        image_size: Optional[Tuple[int, int]],
        image_pre_processing: ImagePreProcessing,
        network_input: NetworkInputDefinition,
        stream: torch.cuda.Stream,
    ) -> Optional[FastPreprocessResult]:
        """Enqueue Triton preprocessing when the request is supported.

        Args:
            images: Single uint8 HWC numpy image, or a single-item list
                containing one. Batch sizes greater than one use the reference
                preprocessing path.
            input_color_format: Caller-provided image color order. ``None`` is
                treated as BGR, matching the model adapter default.
            image_size: Per-call size override. Overrides are rejected because
                the fast path is keyed to the model's configured network input.
            image_pre_processing: Model package preprocessing config used for
                fast-path eligibility checks.
            network_input: Model package network-input config. Its resize mode,
                color mode, normalization, and target size define the kernel
                contract.
            stream: CUDA stream where the HtoD copy and Triton kernels are
                enqueued. It must be a real stream when the request is eligible.

        Returns:
            ``FastPreprocessResult`` after the GPU work has been scheduled, or
            ``None`` when the opt-in flag is disabled or the request falls
            outside the conservative fast-path contract. In that case the caller
            should run the reference preprocessing path.
        """
        if not _FAST_PATH_ENABLED:
            return None
        unsupported_reason = self._unsupported_reason(
            images=images,
            image_size=image_size,
            image_pre_processing=image_pre_processing,
            network_input=network_input,
        )
        if unsupported_reason is not None:
            self._warn_unsupported(unsupported_reason)
            return None

        candidate = images[0] if isinstance(images, list) else images
        caller_mode = (
            ColorMode(input_color_format)
            if input_color_format is not None
            else ColorMode.BGR
        )
        # The kernel swaps R/B only when caller and network color order differ.
        swap_rb = caller_mode != network_input.color_mode

        means, stds = network_input.normalization
        means_t = (float(means[0]), float(means[1]), float(means[2]))
        stds_t = (float(stds[0]), float(stds[1]), float(stds[2]))
        target_h = network_input.training_input_size.height
        target_w = network_input.training_input_size.width
        orig_h, orig_w = int(candidate.shape[0]), int(candidate.shape[1])

        state = self._state
        if state is None or state.is_stale(
            src_h=orig_h,
            src_w=orig_w,
            target_h=target_h,
            target_w=target_w,
        ):
            # Shape changed (or first call): rebuild buffers and tables.
            state = FastPreprocessState.build(
                src_h=orig_h,
                src_w=orig_w,
                target_h=target_h,
                target_w=target_w,
                device=self._device,
            )
            self._state = state

        # The previous frame's HtoD copy was submitted with non_blocking=True
        # and may still be reading the pinned staging buffer. Wait for it
        # before the CPU overwrites that memory with the new frame.
        pending_copy = self._staging_copy_done
        if pending_copy is not None:
            pending_copy.synchronize()

        np.copyto(state.pinned_host.numpy(), candidate, casting="no")
        out_buffer, tmp_buffer = state.next_buffers()

        # NOTE(review): ``src_gpu`` is a single device buffer; ordering between
        # consecutive frames relies on the caller passing the same ``stream``
        # every time - confirm against the adapter.
        with torch.cuda.stream(stream):
            state.src_gpu.copy_(state.pinned_host, non_blocking=True)
            copy_done = torch.cuda.Event()
            copy_done.record(stream)
            triton_preprocess_rfdetr_stretch_two_pass_preallocated(
                src=state.src_gpu,
                out=out_buffer,
                tmp=tmp_buffer,
                tables=state.tables,
                target_h=target_h,
                target_w=target_w,
                means=means_t,
                stds=stds_t,
                swap_rb=swap_rb,
                launch_config=state.launch_config,
            )
        self._staging_copy_done = copy_done

        ready_event = torch.cuda.Event()
        ready_event.record(stream)
        out_buffer._trt_ready_event = ready_event  # type: ignore[attr-defined]
        out_buffer.record_stream(stream)

        # Metadata describes a plain stretch: no padding, no crop, and
        # independent width/height scale factors.
        metadata = [
            PreProcessingMetadata(
                pad_left=0,
                pad_top=0,
                pad_right=0,
                pad_bottom=0,
                original_size=ImageDimensions(width=orig_w, height=orig_h),
                size_after_pre_processing=ImageDimensions(
                    width=orig_w,
                    height=orig_h,
                ),
                inference_size=ImageDimensions(width=target_w, height=target_h),
                scale_width=target_w / orig_w,
                scale_height=target_h / orig_h,
                static_crop_offset=StaticCropOffset(
                    offset_x=0,
                    offset_y=0,
                    crop_width=orig_w,
                    crop_height=orig_h,
                ),
            )
        ]
        out_buffer._pre_processing_meta = metadata  # type: ignore[attr-defined]
        return FastPreprocessResult(
            tensor=out_buffer,
            metadata=metadata,
            ready_event=ready_event,
        )

    def _unsupported_reason(
        self,
        images,
        image_size: Optional[Tuple[int, int]],
        image_pre_processing: ImagePreProcessing,
        network_input: NetworkInputDefinition,
    ) -> Optional[str]:
        """Explain why the request must use the reference preprocessing path.

        Returns:
            A short human-readable reason, or ``None`` when the request is
            eligible for the Triton fast path.
        """
        if not _TRITON_AVAILABLE:
            return "triton is not installed"
        if self._device.type != "cuda":
            return "CUDA device is required"
        if image_size is not None:
            return "custom image_size overrides are not supported"

        # Overrides can only disable configured transforms; they cannot enable
        # transforms. The fast path deliberately rejects model configs that ask
        # for transforms whose pixel semantics are not implemented in Triton.
        if (
            (
                image_pre_processing.static_crop is not None
                and image_pre_processing.static_crop.enabled
            )
            or (
                image_pre_processing.contrast is not None
                and image_pre_processing.contrast.enabled
            )
            or (
                image_pre_processing.grayscale is not None
                and image_pre_processing.grayscale.enabled
            )
        ):
            return "static crop, contrast, and grayscale preprocessing are unsupported"

        if network_input.dataset_version_resize_dimensions is not None:
            return "dataset-version resize is unsupported"
        if network_input.input_channels != 3:
            return "only 3-channel inputs are supported"
        if network_input.scaling_factor not in (None, 255):
            return "only scaling_factor None or 255 is supported"
        if network_input.normalization is None:
            return "normalization is required"
        # NOTE(review): every accepted mode is executed by the stretch kernel
        # and reported with zero padding in the metadata. Presumably the
        # reference RF-DETR preprocessing also stretches for the letterbox and
        # center-crop modes - confirm before relying on parity for them.
        if network_input.resize_mode not in (
            ResizeMode.STRETCH_TO,
            ResizeMode.LETTERBOX,
            ResizeMode.CENTER_CROP,
            ResizeMode.LETTERBOX_REFLECT_EDGES,
        ):
            return f"resize mode {network_input.resize_mode!r} is unsupported"

        if isinstance(images, list):
            if len(images) != 1:
                return "only batch size 1 is supported"
            candidate = images[0]
        else:
            candidate = images
        if not isinstance(candidate, np.ndarray):
            return "only numpy ndarray inputs are supported"
        if (
            candidate.dtype != np.uint8
            or candidate.ndim != 3
            or candidate.shape[2] != 3
        ):
            return "input must be uint8 HWC with 3 channels"
        return None

    def _warn_unsupported(self, reason: str) -> None:
        """Emit a ``RuntimeWarning`` for ``reason`` once per runtime instance."""
        if reason in self._warned_reasons:
            return
        self._warned_reasons.add(reason)
        warnings.warn(
            f"RF-DETR Triton preprocess path is unsupported: {reason}",
            RuntimeWarning,
            stacklevel=4,
        )
269db95f72..18c7b1f063 100644 --- a/inference_models/tests/integration_tests/models/test_rfdetr_seg_predictions_trt.py +++ b/inference_models/tests/integration_tests/models/test_rfdetr_seg_predictions_trt.py @@ -1,10 +1,44 @@ +import os + import numpy as np import pytest import torch +from inference_models.errors import CorruptedModelPackageError from inference_models.models.common.rle_utils import coco_rle_masks_to_torch_mask +def _assert_instance_segmentation_predictions_match(actual, expected) -> None: + assert len(actual) == len(expected) + for actual_element, expected_element in zip(actual, expected): + torch.testing.assert_close( + actual_element.xyxy.cpu(), + expected_element.xyxy.cpu(), + atol=1.0, + rtol=0, + ) + torch.testing.assert_close( + actual_element.confidence.cpu(), + expected_element.confidence.cpu(), + atol=1e-4, + rtol=0, + ) + torch.testing.assert_close( + actual_element.class_id.cpu(), + expected_element.class_id.cpu(), + atol=0, + rtol=0, + ) + + actual_mask = actual_element.mask.detach().to(torch.bool).cpu() + expected_mask = expected_element.mask.detach().to(torch.bool).cpu() + assert tuple(actual_mask.shape) == tuple(expected_mask.shape) + intersection = torch.logical_and(actual_mask, expected_mask).sum().item() + union = torch.logical_or(actual_mask, expected_mask).sum().item() + assert union > 0 + assert intersection / union >= 0.999 + + @pytest.mark.slow @pytest.mark.trt_extras def test_trt_package_numpy( @@ -430,6 +464,63 @@ def test_trt_package_torch_batch( assert 16179 <= predictions[1].mask.cpu().sum().item() <= 16229 +@pytest.mark.slow +@pytest.mark.trt_extras +def test_trt_triton_preprocess_output_matches_reference_preprocess( + monkeypatch: pytest.MonkeyPatch, + rfdetr_seg_asl_trt_package: str, + asl_image_numpy: np.ndarray, +) -> None: + pytest.importorskip("triton") + if not torch.cuda.is_available(): + pytest.skip("CUDA is required for Triton preprocessing parity") + + from inference_models.models.rfdetr import 
triton_preprocess_runtime + from inference_models.models.rfdetr.rfdetr_instance_segmentation_trt import ( + RFDetrForInstanceSegmentationTRT, + ) + + model_package = os.getenv( + "RFDETR_SEG_TRT_PACKAGE_PATH", + rfdetr_seg_asl_trt_package, + ) + try: + model = RFDetrForInstanceSegmentationTRT.from_pretrained( + model_name_or_path=model_package, + engine_host_code_allowed=True, + ) + except CorruptedModelPackageError as error: + if "Platform specific tag mismatch" in str(error): + pytest.skip("TRT engine package is not compatible with this platform") + raise + + monkeypatch.setattr(triton_preprocess_runtime, "_FAST_PATH_ENABLED", False) + reference_predictions = model(asl_image_numpy) + + original_triton_preprocess = ( + triton_preprocess_runtime.triton_preprocess_rfdetr_stretch_two_pass_preallocated + ) + triton_calls = {"count": 0} + + def counting_triton_preprocess(*args, **kwargs): + triton_calls["count"] += 1 + return original_triton_preprocess(*args, **kwargs) + + monkeypatch.setattr( + triton_preprocess_runtime, + "triton_preprocess_rfdetr_stretch_two_pass_preallocated", + counting_triton_preprocess, + ) + monkeypatch.setattr(triton_preprocess_runtime, "_FAST_PATH_ENABLED", True) + triton_predictions = model(asl_image_numpy) + + assert triton_calls["count"] == 1 + _assert_instance_segmentation_predictions_match( + actual=triton_predictions, + expected=reference_predictions, + ) + + @pytest.mark.slow @pytest.mark.trt_extras def test_trt_cudagraph_output_matches_non_cudagraph_output( diff --git a/inference_models/tests/unit_tests/models/rfdetr/test_triton_preprocess.py b/inference_models/tests/unit_tests/models/rfdetr/test_triton_preprocess.py new file mode 100644 index 0000000000..f9064e1118 --- /dev/null +++ b/inference_models/tests/unit_tests/models/rfdetr/test_triton_preprocess.py @@ -0,0 +1,164 @@ +import numpy as np +import pytest +import torch +import torchvision.transforms.functional as TF +from PIL import Image + +from inference_models.errors import 
def _reference_preprocess(image_rgb: np.ndarray, target_h: int, target_w: int):
    """Produce the reference network input with the PIL/torchvision pipeline.

    The image is wrapped as a PIL image, resized to ``(target_h, target_w)``
    with antialiasing, converted with ``TF.to_tensor``, normalized with the
    module-level ``_IMAGENET_MEAN`` / ``_IMAGENET_STD`` constants, and given
    a leading batch dimension.

    Args:
        image_rgb: Source image array passed to ``Image.fromarray``.
        target_h: Output height.
        target_w: Output width.

    Returns:
        The normalized tensor with an extra dimension inserted at index 0.
    """
    pil_source = Image.fromarray(image_rgb)
    pil_resized = TF.resize(pil_source, (target_h, target_w), antialias=True)
    normalized = TF.normalize(
        TF.to_tensor(pil_resized),
        mean=list(_IMAGENET_MEAN),
        std=list(_IMAGENET_STD),
    )
    return normalized.unsqueeze(0)
@pytest.mark.skipif(
    not (torch.cuda.is_available() and triton_preprocess.TRITON_AVAILABLE),
    reason="CUDA and Triton are required",
)
def test_triton_preprocess_matches_pil_for_rgb_numpy() -> None:
    """Check the Triton stretch preprocess against the PIL reference for RGB input.

    A seeded random 77x51 RGB image is resized to 48x64 on the GPU without
    channel swapping and compared to ``_reference_preprocess`` with an
    absolute tolerance of 1e-6.
    """
    out_h, out_w = 48, 64
    cuda = torch.device("cuda")

    generator = np.random.default_rng(seed=17)
    image_rgb = generator.integers(0, 256, size=(77, 51, 3), dtype=np.uint8)
    in_h, in_w = image_rgb.shape[0], image_rgb.shape[1]

    resample_tables = build_resample_tables(
        src_h=in_h,
        src_w=in_w,
        target_h=out_h,
        target_w=out_w,
        device=cuda,
    )
    gpu_source = torch.from_numpy(image_rgb).to(device=cuda)

    actual = triton_preprocess_rfdetr_stretch(
        src=gpu_source,
        tables=resample_tables,
        target_h=out_h,
        target_w=out_w,
        means=_IMAGENET_MEAN,
        stds=_IMAGENET_STD,
        swap_rb=False,
    )
    # Wait for the GPU work to finish before reading the result back.
    torch.cuda.synchronize()

    expected = _reference_preprocess(image_rgb, target_h=out_h, target_w=out_w)
    torch.testing.assert_close(actual.cpu(), expected, atol=1e-6, rtol=0)
image_bgr.shape[0], target_w), + dtype=torch.uint8, + device=device, + ) + + actual = triton_preprocess_rfdetr_stretch( + src=torch.from_numpy(image_bgr).to(device=device), + tables=tables, + target_h=target_h, + target_w=target_w, + means=_IMAGENET_MEAN, + stds=_IMAGENET_STD, + swap_rb=True, + out=out, + tmp=tmp, + ) + torch.cuda.synchronize() + + expected = _reference_preprocess(image_rgb, target_h=target_h, target_w=target_w) + assert actual.data_ptr() == out.data_ptr() + torch.testing.assert_close(actual.cpu(), expected, atol=1e-6, rtol=0) diff --git a/inference_models/tests/unit_tests/models/rfdetr/test_trt_preprocess_fast_path.py b/inference_models/tests/unit_tests/models/rfdetr/test_trt_preprocess_fast_path.py new file mode 100644 index 0000000000..072889c0b2 --- /dev/null +++ b/inference_models/tests/unit_tests/models/rfdetr/test_trt_preprocess_fast_path.py @@ -0,0 +1,329 @@ +import warnings + +import numpy as np +import pytest +import torch +import torchvision.transforms.functional as TF +from PIL import Image + +from inference_models.entities import ImageDimensions +from inference_models.models.common.roboflow.model_packages import ( + ColorMode, + Contrast, + ContrastType, + Grayscale, + ImagePreProcessing, + NetworkInputDefinition, + ResizeMode, + StaticCrop, + TrainingInputSize, +) +from inference_models.models.rfdetr import ( + triton_preprocess, + triton_preprocess_runtime, +) +from inference_models.models.rfdetr.triton_preprocess_runtime import ( + FastPreprocessRuntime, +) + +_IMAGENET_MEAN = (0.485, 0.456, 0.406) +_IMAGENET_STD = (0.229, 0.224, 0.225) +_DEFAULT_NORMALIZATION = object() + + +def _network_input( + target_h: int = 64, + target_w: int = 64, + dataset_version_resize_dimensions=None, + resize_mode: ResizeMode = ResizeMode.STRETCH_TO, + input_channels: int = 3, + scaling_factor=255, + normalization=_DEFAULT_NORMALIZATION, +) -> NetworkInputDefinition: + if normalization is _DEFAULT_NORMALIZATION: + normalization = [list(_IMAGENET_MEAN), 
def _call_fast_preprocess(
    runtime: FastPreprocessRuntime,
    *,
    images=None,
    image_size=None,
    image_pre_processing=None,
    network_input=None,
):
    """Invoke ``runtime.try_preprocess`` with test defaults for omitted arguments.

    Args:
        runtime: Runtime whose ``try_preprocess`` method is called.
        images: Input images; defaults to an 8x8x3 uint8 zero array.
        image_size: Forwarded unchanged (``None`` when omitted).
        image_pre_processing: Defaults to ``ImagePreProcessing()``.
        network_input: Defaults to ``_network_input()``.

    Returns:
        Whatever ``runtime.try_preprocess`` returns. The call always uses
        ``input_color_format="bgr"`` and ``stream=None``.
    """
    # Defaults are built only for arguments left as None, in the same order
    # as the parameters appear.
    call_kwargs = {
        "images": np.zeros((8, 8, 3), dtype=np.uint8) if images is None else images,
        "input_color_format": "bgr",
        "image_size": image_size,
        "image_pre_processing": (
            ImagePreProcessing()
            if image_pre_processing is None
            else image_pre_processing
        ),
        "network_input": _network_input() if network_input is None else network_input,
        "stream": None,
    }
    return runtime.try_preprocess(**call_kwargs)
def test_trt_fast_preprocess_flag_disabled_returns_none_without_warning(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """With the fast-path flag off, preprocessing returns ``None`` silently.

    ``_FAST_PATH_ENABLED`` is patched to ``False`` (and ``_TRITON_AVAILABLE``
    to ``True``); the call must return ``None`` and record no warnings.
    """
    module_flags = (("_FAST_PATH_ENABLED", False), ("_TRITON_AVAILABLE", True))
    for flag_name, flag_value in module_flags:
        monkeypatch.setattr(triton_preprocess_runtime, flag_name, flag_value)

    runtime = FastPreprocessRuntime(device=torch.device("cuda"))

    with warnings.catch_warnings(record=True) as captured:
        # Record every warning so none is hidden by default filters.
        warnings.simplefilter("always")
        outcome = _call_fast_preprocess(runtime)

    assert outcome is None
    assert not captured
unsupported", + ), + ( + torch.device("cuda"), + { + "image_pre_processing": ImagePreProcessing( + contrast=Contrast( + enabled=True, + type=ContrastType.CONTRAST_STRETCHING, + ) + ) + }, + "static crop, contrast, and grayscale preprocessing are unsupported", + ), + ( + torch.device("cuda"), + { + "image_pre_processing": ImagePreProcessing( + grayscale=Grayscale(enabled=True) + ) + }, + "static crop, contrast, and grayscale preprocessing are unsupported", + ), + ( + torch.device("cuda"), + { + "network_input": _network_input( + dataset_version_resize_dimensions=TrainingInputSize( + height=8, + width=8, + ) + ) + }, + "dataset-version resize is unsupported", + ), + ( + torch.device("cuda"), + {"network_input": _network_input(input_channels=1)}, + "only 3-channel inputs are supported", + ), + ( + torch.device("cuda"), + {"network_input": _network_input(scaling_factor=1)}, + "only scaling_factor None or 255 is supported", + ), + ( + torch.device("cuda"), + {"network_input": _network_input(normalization=None)}, + "normalization is required", + ), + ( + torch.device("cuda"), + {"network_input": _network_input(resize_mode=ResizeMode.FIT_LONGER_EDGE)}, + "resize mode", + ), + ( + torch.device("cuda"), + {"images": torch.zeros((8, 8, 3), dtype=torch.uint8)}, + "only numpy ndarray inputs are supported", + ), + ( + torch.device("cuda"), + {"images": np.zeros((8, 8, 3), dtype=np.float32)}, + "input must be uint8 HWC with 3 channels", + ), + ( + torch.device("cuda"), + {"images": np.zeros((8, 8), dtype=np.uint8)}, + "input must be uint8 HWC with 3 channels", + ), + ( + torch.device("cuda"), + {"images": np.zeros((8, 8, 1), dtype=np.uint8)}, + "input must be uint8 HWC with 3 channels", + ), + ], +) +def test_trt_fast_preprocess_warns_for_unsupported_requests( + monkeypatch: pytest.MonkeyPatch, + runtime_device: torch.device, + kwargs, + reason: str, +) -> None: + monkeypatch.setattr(triton_preprocess_runtime, "_FAST_PATH_ENABLED", True) + 
@pytest.mark.skipif(
    not (torch.cuda.is_available() and triton_preprocess.TRITON_AVAILABLE),
    reason="CUDA and Triton are required",
)
def test_trt_fast_preprocess_matches_reference_and_metadata(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Run the fast preprocess path end to end and verify tensor and metadata.

    A seeded random 96x80 image is supplied in BGR order on a dedicated CUDA
    stream. The resulting tensor must match ``_reference_preprocess`` of the
    RGB image within 1e-6, and the returned metadata must report the
    original, post-preprocessing and inference sizes and be attached to the
    tensor as ``_pre_processing_meta``.
    """
    for flag_name in ("_FAST_PATH_ENABLED", "_TRITON_AVAILABLE"):
        monkeypatch.setattr(triton_preprocess_runtime, flag_name, True)

    out_h, out_w = 64, 64
    in_h, in_w = 96, 80
    runtime = FastPreprocessRuntime(device=torch.device("cuda"))
    stream = torch.cuda.Stream(device=torch.device("cuda"))

    generator = np.random.default_rng(seed=71)
    image_rgb = generator.integers(0, 256, size=(in_h, in_w, 3), dtype=np.uint8)
    # Reverse the channel axis to obtain the BGR input handed to the runtime.
    image_bgr = image_rgb[:, :, ::-1].copy()

    result = runtime.try_preprocess(
        images=image_bgr,
        input_color_format="bgr",
        image_size=None,
        image_pre_processing=ImagePreProcessing(),
        network_input=_network_input(target_h=out_h, target_w=out_w),
        stream=stream,
    )
    assert result is not None
    # Block until the work signalled by the ready event has completed.
    result.ready_event.synchronize()

    expected = _reference_preprocess(image_rgb, target_h=out_h, target_w=out_w)
    torch.testing.assert_close(result.tensor.cpu(), expected, atol=1e-6, rtol=0)

    metadata = result.metadata
    first_entry = metadata[0]
    source_dimensions = ImageDimensions(height=in_h, width=in_w)
    assert first_entry.original_size == source_dimensions
    assert first_entry.size_after_pre_processing == source_dimensions
    assert first_entry.inference_size == ImageDimensions(
        height=out_h,
        width=out_w,
    )
    assert result.tensor._pre_processing_meta == metadata  # type: ignore[attr-defined]