From 03c9c9e7fdb04cfcfcb1db036d6b4c8ab87f91ae Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?=
Date: Thu, 18 Jul 2024 17:33:02 -0300
Subject: [PATCH] fixup! tests: Add udp h264 latency and jitter test

---
 tests/udp_h264_test.rs           | 515 ------------------------
 tests/v4l2_latency_and_jitter.rs | 652 +++++++++++++++++++++++++++++++
 2 files changed, 652 insertions(+), 515 deletions(-)
 delete mode 100644 tests/udp_h264_test.rs
 create mode 100644 tests/v4l2_latency_and_jitter.rs

diff --git a/tests/udp_h264_test.rs b/tests/udp_h264_test.rs
deleted file mode 100644
index 607d9a7c..00000000
--- a/tests/udp_h264_test.rs
+++ /dev/null
@@ -1,515 +0,0 @@
-use tracing::*;
-
-use std::{str::FromStr, sync::Arc};
-
-use tokio::sync::RwLock;
-
-use anyhow::*;
-use gst::prelude::*;
-use url::Url;
-
-use mavlink_camera_manager::{
-    settings,
-    stream::{
-        self,
-        types::{
-            CaptureConfiguration, ExtendedConfiguration, StreamInformation,
-            VideoCaptureConfiguration,
-        },
-        Stream,
-    },
-    video::{
-        types::{FrameInterval, VideoEncodeType, VideoSourceType},
-        video_source::VideoSourceAvailable,
-        video_source_local::{VideoSourceLocal, VideoSourceLocalType},
-    },
-    video_stream::types::VideoAndStreamInformation,
-};
-
-fn get_loopback_device() -> Result<VideoSourceType> {
-    let cameras_available = VideoSourceLocal::cameras_available();
-    dbg!(&cameras_available);
-
-    let loopback_device = cameras_available
-        .iter()
-        .find(|&video_source| {
-            dbg!(video_source.inner().source_string());
-            if let VideoSourceType::Local(video_source_local) = video_source {
-                if let VideoSourceLocalType::V4L2Loopback(_) = video_source_local.typ {
-                    return true;
-                }
-            }
-
-            false
-        })
-        .unwrap()
-        .to_owned();
-    dbg!(&loopback_device);
-
-    Ok(loopback_device)
-}
-
-struct V4l2LoopBack {
-    bus_task: tokio::task::JoinHandle<()>,
-    pipeline: gst::Pipeline,
-}
-
-impl V4l2LoopBack {
-    pub fn try_new(video_and_stream_information: &VideoAndStreamInformation) -> Result<Self> {
-        let device_path = video_and_stream_information
-            .video_source
-            .inner()
-            .source_string();
-
-        let CaptureConfiguration::Video(configuration) = &video_and_stream_information
-            .stream_information
-            .configuration
-        else {
-            unreachable!();
-        };
-
-        let pipeline_description = match configuration.encode {
-            VideoEncodeType::H264 => format!(
-                concat!(
-                    "qrtimestampsrc do-timestamp=true",
-                    " ! video/x-raw,width={width},height={height},framerate={framerate_num}/{framerate_den}",
-                    " ! videoconvert",
-                    " ! x264enc tune=zerolatency speed-preset=ultrafast bitrate=5000",
-                    " ! h264parse",
-                    " ! video/x-h264,profile=constrained-baseline,stream-format=byte-stream,alignment=au",
-                    " ! v4l2sink device={device_path} sync=false",
-                ),
-                height = configuration.height,
-                width = configuration.width,
-                framerate_num = configuration.frame_interval.denominator,
-                framerate_den = configuration.frame_interval.numerator,
-                device_path = device_path,
-            ),
-            VideoEncodeType::H265 => format!(
-                concat!(
-                    "qrtimestampsrc do-timestamp=true",
-                    " ! video/x-raw,width={width},height={height},framerate={framerate_num}/{framerate_den}",
-                    " ! videoconvert",
-                    " ! x265enc tune=zerolatency speed-preset=ultrafast bitrate=5000",
-                    " ! h265parse",
-                    " ! video/x-h265,profile=constrained-baseline,stream-format=byte-stream,alignment=au",
-                    " ! v4l2sink device={device_path} sync=false",
-                ),
-                height = configuration.height,
-                width = configuration.width,
-                framerate_num = configuration.frame_interval.denominator,
-                framerate_den = configuration.frame_interval.numerator,
-                device_path = device_path,
-            ),
-            VideoEncodeType::Mjpg => format!(
-                concat!(
-                    "qrtimestampsrc do-timestamp=true",
-                    " ! video/x-raw,width={width},height={height},framerate={framerate_num}/{framerate_den}",
-                    " ! videoconvert",
-                    " ! jpegenc quality=85, idct-method=ifast",
-                    " ! jpegparse",
-                    " ! video/image/jpeg",
-                    " ! v4l2sink device={device_path} sync=false",
-                ),
-                height = configuration.height,
-                width = configuration.width,
-                framerate_num = configuration.frame_interval.denominator,
-                framerate_den = configuration.frame_interval.numerator,
-                device_path = device_path,
-            ),
-            VideoEncodeType::Rgb => format!(
-                concat!(
-                    "qrtimestampsrc do-timestamp=true",
-                    " ! video/x-raw,width={width},height={height},framerate={framerate_num}/{framerate_den}",
-                    " ! videoconvert",
-                    " ! rawvideoparse",
-                    " ! video/x-raw,format=RGB",
-                    " ! v4l2sink device={device_path} sync=false",
-                ),
-                height = configuration.height,
-                width = configuration.width,
-                framerate_num = configuration.frame_interval.denominator,
-                framerate_den = configuration.frame_interval.numerator,
-                device_path = device_path,
-            ),
-            VideoEncodeType::Yuyv => format!(
-                concat!(
-                    "qrtimestampsrc do-timestamp=true",
-                    " ! video/x-raw,width={width},height={height},framerate={framerate_num}/{framerate_den}",
-                    " ! videoconvert",
-                    " ! rawvideoparse",
-                    " ! video/x-raw,format=UYVY",
-                    " ! v4l2sink device={device_path} sync=false",
-                ),
-                height = configuration.height,
-                width = configuration.width,
-                framerate_num = configuration.frame_interval.denominator,
-                framerate_den = configuration.frame_interval.numerator,
-                device_path = device_path,
-            ),
-            _ => unimplemented!(),
-        };
-        dbg!(&pipeline_description);
-
-        let pipeline = gst::parse::launch(&pipeline_description)?
-            .downcast::<gst::Pipeline>()
-            .unwrap();
-
-        let bus_task = new_bus_task(pipeline.downgrade());
-
-        Ok(Self { bus_task, pipeline })
-    }
-
-    pub async fn finish(self) -> Result<()> {
-        println!("Posting EOS to v4l2pipeline...");
-        self.pipeline.post_message(gst::message::Eos::new())?;
-
-        println!("Waiting for v4l2pipeline to receive EOS...");
-        self.bus_task.await?;
-
-        println!("Setting v4l2pipeline to Null...");
-        self.pipeline.set_state(gst::State::Null)?;
-        while self.pipeline.current_state() != gst::State::Null {
-            std::thread::sleep(std::time::Duration::from_millis(10));
-        }
-
-        Ok(())
-    }
-}
-
-struct QrTimeStampSink {
-    pipeline: gst::Pipeline,
-    bus_task: tokio::task::JoinHandle<()>,
-    latencies: Arc<RwLock<Vec<i64>>>,
-}
-
-impl QrTimeStampSink {
-    pub async fn try_new(
-        video_and_stream_information: &VideoAndStreamInformation,
-        buffers: usize,
-    ) -> Result<Self> {
-        let CaptureConfiguration::Video(configuration) = &video_and_stream_information
-            .stream_information
-            .configuration
-        else {
-            unreachable!();
-        };
-
-        let endpoint = &video_and_stream_information.stream_information.endpoints[0];
-        let address = endpoint.host_str().unwrap();
-        let port = endpoint.port().unwrap();
-
-        let pipeline_description = match configuration.encode {
-            VideoEncodeType::H264 => format!(
-                concat!(
-                    "udpsrc do-timestamp=false address={address} port={port}",
-                    " ! application/x-rtp,payload=96",
-                    " ! rtph264depay",
-                    " ! h264parse",
-                    " ! video/x-h264,width={width},height={height},framerate={framerate_num}/{framerate_den}",
-                    " ! avdec_h264",
-                    " ! videoconvert",
-                    " ! qrtimestampsink name=qrsink",
-                ),
-                address = address,
-                port = port,
-                width = configuration.width,
-                height = configuration.height,
-                framerate_num = configuration.frame_interval.denominator,
-                framerate_den = configuration.frame_interval.numerator,
-            ),
-            VideoEncodeType::H265 => format!(
-                concat!(
-                    "udpsrc do-timestamp=false address={address} port={port}",
-                    " ! application/x-rtp,payload=96",
-                    " ! rtph265depay",
-                    " ! h265parse",
-                    " ! video/x-h265,width={width},height={height},framerate={framerate_num}/{framerate_den}",
-                    " ! avdec_h265",
-                    " ! videoconvert",
-                    " ! qrtimestampsink name=qrsink",
-                ),
-                address = address,
-                port = port,
-                width = configuration.width,
-                height = configuration.height,
-                framerate_num = configuration.frame_interval.denominator,
-                framerate_den = configuration.frame_interval.numerator,
-            ),
-            VideoEncodeType::Mjpg => format!(
-                concat!(
-                    "udpsrc do-timestamp=false address={address} port={port}",
-                    " ! application/x-rtp,payload=96",
-                    " ! rtpjpegdepay",
-                    " ! jpegparse",
-                    " ! image/jpeg,width={width},height={height},framerate={framerate_num}/{framerate_den}",
-                    " ! jpegdec",
-                    " ! videoconvert",
-                    " ! qrtimestampsink name=qrsink",
-                ),
-                address = address,
-                port = port,
-                width = configuration.width,
-                height = configuration.height,
-                framerate_num = configuration.frame_interval.denominator,
-                framerate_den = configuration.frame_interval.numerator,
-            ),
-            VideoEncodeType::Rgb | VideoEncodeType::Yuyv => format!(
-                concat!(
-                    "udpsrc do-timestamp=false address={address} port={port}",
-                    " ! application/x-rtp,payload=96",
-                    " ! rtpvrawdepay",
-                    " ! rawvideoparse",
-                    " ! video/x-raw,width={width},height={height},framerate={framerate_num}/{framerate_den}",
-                    " ! videoconvert",
-                    " ! qrtimestampsink name=qrsink",
-                ),
-                address = address,
-                port = port,
-                width = configuration.width,
-                height = configuration.height,
-                framerate_num = configuration.frame_interval.denominator,
-                framerate_den = configuration.frame_interval.numerator,
-            ),
-            _ => unimplemented!(),
-        };
-
-        dbg!(&pipeline_description);
-
-        let pipeline = gst::parse::launch(&pipeline_description)
-            .unwrap()
-            .downcast::<gst::Pipeline>()
-            .unwrap();
-
-        let qrtimestampsink = pipeline.by_name("qrsink").unwrap();
-
-        let latencies = Arc::new(RwLock::new(Vec::with_capacity(buffers)));
-
-        let latencies_cloned = latencies.clone();
-        let pipeline_weak = pipeline.downgrade();
-        qrtimestampsink.connect("on-render", false, move |values| {
-            let _element = values[0].get::<gst::Element>().expect("Invalid argument");
-            let _info = values[1]
-                .get::()
-                .expect("Invalid argument");
-            let diff: i64 = values[2].get::<i64>().expect("Invalid argument");
-
-            let pipeline = pipeline_weak.upgrade()?;
-
-            let mut latencies_cloned = latencies_cloned.blocking_write();
-            latencies_cloned.push(diff);
-            if latencies_cloned.len() == buffers {
-                eprintln!("All buffers received, posting EOS to qrtimestampsink pipeline...");
-                pipeline.post_message(gst::message::Eos::new()).unwrap();
-            }
-
-            None
-        });
-
-        let bus_task = new_bus_task(pipeline.downgrade());
-
-        Ok(Self {
-            pipeline,
-            bus_task,
-            latencies,
-        })
-    }
-
-    pub async fn get_latencies(self) -> Result<Vec<f64>> {
-        self.bus_task.await?;
-
-        // Cleanup
-        println!("Setting qrtimestamp to Null...");
-        self.pipeline.set_state(gst::State::Null).unwrap();
-        while self.pipeline.current_state() != gst::State::Null {
-            std::thread::sleep(std::time::Duration::from_millis(10));
-        }
-
-        let latencies = self
-            .latencies
-            .read()
-            .await
-            .iter()
-            .cloned()
-            .map(|i| i as f64)
-            .collect::<Vec<f64>>();
-
-        Ok(latencies)
-    }
-}
-
-pub fn start_pipeline(pipeline: &gst::Pipeline, wait: bool) -> Result<()> {
-    pipeline.set_state(gst::State::Playing)?;
-
-    if wait {
-        while pipeline.current_state() != gst::State::Playing {
-            std::thread::sleep(std::time::Duration::from_millis(1));
-        }
-    }
-
-    Ok(())
-}
-
-fn check_latency_and_jitter(latencies: &[f64], expected_latency: f64, expected_max_jitter: f64) {
-    // Notes:
-    // 1. We are skipping the first frame as it will always have a high value, possibly from the negotiation
-    let latencies = &latencies[1..];
-    dbg!(&latencies);
-
-    let jitters = &latencies
-        .windows(2)
-        .map(|a| a[1] - a[0])
-        .collect::<Vec<f64>>()[..];
-    dbg!(&jitters);
-
-    // Asserts
-    use statrs::statistics::Statistics;
-    let latency_min = latencies.min();
-    let latency_max = latencies.max();
-    let latency_mean = latencies.mean();
-    let latency_std_dev = latencies.std_dev();
-    let latency_variance = latencies.variance();
-    let jitter_min = jitters.min();
-    let jitter_max = jitters.max();
-    let jitter_mean = jitters.mean();
-    let jitter_std_dev = jitters.std_dev();
-    let jitter_variance = jitters.variance();
-
-    dbg!(
-        &latency_min,
-        &latency_max,
-        &latency_mean,
-        &latency_std_dev,
-        &latency_variance
-    );
-    dbg!(
-        &jitter_min,
-        &jitter_max,
-        &jitter_mean,
-        &jitter_std_dev,
-        &jitter_variance
-    );
-
-    dbg!(&expected_latency, &expected_max_jitter);
-
-    assert!(latency_mean >= expected_latency - expected_max_jitter);
-    assert!(latency_max <= expected_latency + expected_max_jitter);
-    assert!(jitter_mean <= expected_max_jitter);
-    assert!(jitter_max <= expected_max_jitter);
-}
-
-fn new_bus_task(pipeline_weak: gst::glib::WeakRef<gst::Pipeline>) -> tokio::task::JoinHandle<()> {
-    tokio::spawn(async move {
-        // FIXME: THIS TASK SHOULD BE CANCELABLE AND NONBLOCKING
-
-        let Some(pipeline) = pipeline_weak.upgrade() else {
-            return;
-        };
-
-        let Some(bus) = pipeline.bus() else {
-            return;
-        };
-
-        for msg in bus.iter_timed(gst::ClockTime::NONE) {
-            use gst::MessageView;
-
-            match msg.view() {
-                MessageView::Eos(..) => {
-                    println!("EOS Recived");
-                    break;
-                }
-                MessageView::Error(err) => {
-                    error!(
-                        "Error from {:?}: {} ({:?})",
-                        err.src().map(|s| s.path_string()),
-                        err.error(),
-                        err.debug()
-                    );
-                    break;
-                }
-                MessageView::Latency(_latency) => {
-                    pipeline.recalculate_latency().unwrap();
-                }
-                _ => (),
-            }
-        }
-
-        println!("FINISHIG BUS TASK");
-    })
-}
-
-#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
-async fn main() {
-    settings::manager::init(None);
-    stream::manager::init();
-
-    gst::init().unwrap();
-
-    let fps = 30;
-    let width = 320;
-    let height = width;
-    let address = "127.0.0.1";
-    let port = 5600;
-    let buffers = 100;
-
-    let video_and_stream_information = VideoAndStreamInformation {
-        name: "QRTimeStamp - QR".to_string(),
-        stream_information: StreamInformation {
-            endpoints: vec![Url::from_str(format!("udp://{address}:{port}").as_str()).unwrap()],
-            configuration: CaptureConfiguration::Video(VideoCaptureConfiguration {
-                encode: VideoEncodeType::H264,
-                height,
-                width,
-                frame_interval: FrameInterval {
-                    numerator: 1,
-                    denominator: fps,
-                },
-            }),
-            extended_configuration: Some(ExtendedConfiguration {
-                thermal: false,
-                disable_mavlink: true,
-            }),
-        },
-        video_source: get_loopback_device().unwrap(),
-    };
-    dbg!(&video_and_stream_information);
-
-    eprintln!("Building v4lloopback pipeline (video generation with qrtimestampsrc)...");
-    let loopback = V4l2LoopBack::try_new(&video_and_stream_information).unwrap();
-    start_pipeline(&loopback.pipeline, false).unwrap();
-
-    eprintln!("Building qrtimestamp pipeline (video receiver with qrtimestampsink)...");
-    let qrtimestamp = QrTimeStampSink::try_new(&video_and_stream_information, buffers)
-        .await
-        .unwrap();
-    start_pipeline(&qrtimestamp.pipeline, false).unwrap();
-
-    eprintln!("Building MCM stream...");
-    let stream = Stream::try_new(&video_and_stream_information)
-        .await
-        .unwrap();
-    let stream_id = stream.id().await.unwrap();
-    stream::manager::Manager::add_stream(stream).await.unwrap();
-
-    eprintln!("Waiting for qrtimestamp pipeline to finish...");
-    let latencies_result = qrtimestamp.get_latencies().await;
-
-    eprintln!("Finishing loopback pipeline...");
-    let loopback_result = loopback.finish().await;
-
-    eprintln!("Finishing MCM stream...");
-    stream::manager::Manager::remove_stream(&stream_id)
-        .await
-        .unwrap();
-
-    eprintln!("Evaluating results...");
-    loopback_result.unwrap();
-    let latencies = latencies_result.unwrap();
-    assert!(!latencies.is_empty());
-
-    let expected_latency = 40.0;
-    let expected_max_jitter = 1.0;
-    check_latency_and_jitter(&latencies, expected_latency, expected_max_jitter);
-
-    println!("END");
-}
diff --git a/tests/v4l2_latency_and_jitter.rs b/tests/v4l2_latency_and_jitter.rs
new file mode 100644
index 00000000..5dbed204
--- /dev/null
+++ b/tests/v4l2_latency_and_jitter.rs
@@ -0,0 +1,652 @@
+use tracing::*;
+
+use std::{str::FromStr, sync::Arc};
+
+use tokio::sync::RwLock;
+
+use anyhow::Result;
+use gst::prelude::*;
+use url::Url;
+
+use mavlink_camera_manager::{
+    logger, settings,
+    stream::{
+        self,
+        types::{
+            CaptureConfiguration, ExtendedConfiguration, StreamInformation,
+            VideoCaptureConfiguration,
+        },
+        Stream,
+    },
+    video::{
+        types::{FrameInterval, VideoEncodeType, VideoSourceType},
+        video_source::VideoSourceAvailable,
+        video_source_local::{VideoSourceLocal, VideoSourceLocalType},
+    },
+    video_stream::types::VideoAndStreamInformation,
+};
+
+fn get_loopback_device() -> Result<VideoSourceType> {
+    let cameras_available = VideoSourceLocal::cameras_available();
+
+    let loopback_device = cameras_available
+        .iter()
+        .find(|&video_source| {
+            if let VideoSourceType::Local(video_source_local) = video_source {
+                if let VideoSourceLocalType::V4L2Loopback(_) = video_source_local.typ {
+                    return true;
+                }
+            }
+
+            false
+        })
+        .expect("No v4l2loopback device found.")
+        .to_owned();
+
+    Ok(loopback_device)
+}
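+// This test therefore assumes a v4l2loopback device already exists on the
+// host (typically created by loading the `v4l2loopback` kernel module, e.g.
+// via `modprobe v4l2loopback`); otherwise the `expect()` above aborts the
+// test before any pipeline is built.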
+
+struct V4l2LoopBack {
+    bus_task: tokio::task::JoinHandle<()>,
+    pipeline: gst::Pipeline,
+}
+
+impl V4l2LoopBack {
+    pub fn try_new(video_and_stream_information: &VideoAndStreamInformation) -> Result<Self> {
+        let device_path = video_and_stream_information
+            .video_source
+            .inner()
+            .source_string();
+
+        let CaptureConfiguration::Video(configuration) = &video_and_stream_information
+            .stream_information
+            .configuration
+        else {
+            unreachable!();
+        };
+
+        let pipeline_description = match configuration.encode {
+            VideoEncodeType::H264 => format!(
+                concat!(
+                    "qrtimestampsrc do-timestamp=true",
+                    " ! videoconvert",
+                    " ! video/x-raw,width={width},height={height},framerate={framerate_num}/{framerate_den},format=I420",
+                    " ! x264enc tune=zerolatency speed-preset=ultrafast bitrate=5000",
+                    " ! h264parse",
+                    " ! video/x-h264,profile=constrained-baseline,stream-format=byte-stream,alignment=au",
+                    " ! v4l2sink device={device_path} sync=false",
+                ),
+                height = configuration.height,
+                width = configuration.width,
+                framerate_num = configuration.frame_interval.denominator,
+                framerate_den = configuration.frame_interval.numerator,
+                device_path = device_path,
+            ),
+            VideoEncodeType::Mjpg => format!(
+                concat!(
+                    "qrtimestampsrc do-timestamp=true",
+                    " ! videoconvert",
+                    " ! video/x-raw,width={width},height={height},framerate={framerate_num}/{framerate_den},format=I420",
+                    " ! jpegenc quality=85 idct-method=ifast",
+                    " ! jpegparse",
+                    " ! v4l2sink device={device_path} sync=false",
+                ),
+                height = configuration.height,
+                width = configuration.width,
+                framerate_num = configuration.frame_interval.denominator,
+                framerate_den = configuration.frame_interval.numerator,
+                device_path = device_path,
+            ),
+            VideoEncodeType::Yuyv => format!(
+                concat!(
+                    "qrtimestampsrc do-timestamp=true",
+                    " ! videoconvert",
+                    " ! video/x-raw,width={width},height={height},framerate={framerate_num}/{framerate_den},format=YUY2",
+                    " ! rawvideoparse use-sink-caps=true",
+                    " ! v4l2sink device={device_path} sync=false",
+                ),
+                height = configuration.height,
+                width = configuration.width,
+                framerate_num = configuration.frame_interval.denominator,
+                framerate_den = configuration.frame_interval.numerator,
+                device_path = device_path,
+            ),
+            _ => unimplemented!(),
+        };
+
+        dbg!(&pipeline_description);
+
+        let pipeline = gst::parse::launch(&pipeline_description)?
+            .downcast::<gst::Pipeline>()
+            .unwrap();
+
+        let bus_task = new_bus_task(pipeline.downgrade());
+
+        Ok(Self { bus_task, pipeline })
+    }
+
+    pub async fn finish(self) -> Result<()> {
+        debug!("Posting EOS to v4l2pipeline...");
+        self.pipeline.post_message(gst::message::Eos::new())?;
+
+        debug!("Waiting for v4l2pipeline to receive EOS...");
+        self.bus_task.await?;
+
+        debug!("Setting v4l2pipeline to Null...");
+        self.pipeline.set_state(gst::State::Null)?;
+        while self.pipeline.current_state() != gst::State::Null {
+            std::thread::sleep(std::time::Duration::from_millis(10));
+        }
+
+        Ok(())
+    }
+}
+
+struct QrTimeStampSink {
+    pipeline: gst::Pipeline,
+    bus_task: tokio::task::JoinHandle<()>,
+    latencies: Arc<RwLock<Vec<i64>>>,
+}
+
+impl QrTimeStampSink {
+    pub async fn try_new(
+        video_and_stream_information: &VideoAndStreamInformation,
+        buffers: usize,
+    ) -> Result<Self> {
+        let CaptureConfiguration::Video(configuration) = &video_and_stream_information
+            .stream_information
+            .configuration
+        else {
+            unreachable!();
+        };
+
+        let endpoint = &video_and_stream_information.stream_information.endpoints[0];
+        let address = endpoint.host_str().unwrap();
+        let port = endpoint.port().unwrap();
+
+        let source_description = match endpoint.scheme().to_lowercase().as_str() {
+            "udp" => format!("udpsrc address={address} port={port} do-timestamp=true"),
+            "rtsp" => {
+                format!("rtspsrc location={endpoint} is-live=true latency=0")
+            }
+            _ => unimplemented!(),
+        };
+
+        let pipeline_description = match configuration.encode {
+            VideoEncodeType::H264 => format!(
+                concat!(
+                    "{source_description}",
+                    " ! application/x-rtp,payload=96",
+                    " ! rtph264depay",
+                    " ! h264parse",
+                    " ! video/x-h264,width={width},height={height},framerate={framerate_num}/{framerate_den}",
+                    " ! avdec_h264 discard-corrupted-frames=true",
+                    " ! videoconvert",
+                    " ! qrtimestampsink name=qrsink",
+                ),
+                source_description = source_description,
+                width = configuration.width,
+                height = configuration.height,
+                framerate_num = configuration.frame_interval.denominator,
+                framerate_den = configuration.frame_interval.numerator,
+            ),
+            VideoEncodeType::Mjpg => format!(
+                concat!(
+                    "{source_description}",
+                    " ! application/x-rtp,payload=96",
+                    " ! rtpjpegdepay",
+                    " ! jpegparse",
+                    " ! image/jpeg,width={width},height={height},framerate={framerate_num}/{framerate_den}",
+                    " ! jpegdec",
+                    " ! videoconvert",
+                    " ! qrtimestampsink name=qrsink",
+                ),
+                source_description = source_description,
+                width = configuration.width,
+                height = configuration.height,
+                // For some reason, rtpjpegpay makes the framerate 0/1
+                // framerate_num = configuration.frame_interval.denominator,
+                // framerate_den = configuration.frame_interval.numerator,
+                framerate_num = 0,
+                framerate_den = 1,
+            ),
+            VideoEncodeType::Yuyv => format!(
+                concat!(
+                    "{source_description}",
+                    " ! capsfilter caps=\"application/x-rtp, media=(string)video, clock-rate=(int)90000, encoding-name=(string)RAW, sampling=(string)YCbCr-4:2:0, depth=(string)8, width=(string)320, height=(string)320, colorimetry=(string)BT601-5, payload=(int)96, a-framerate=(string)30\"",
+                    " ! rtpvrawdepay",
+                    " ! videorate skip-to-first=true silent=true",
+                    " ! capsfilter caps=\"video/x-raw, width=(int){width}, height=(int){height}, framerate=(fraction){framerate_num}/{framerate_den}, pixel-aspect-ratio=(fraction)1/1, interlace-mode=(string)progressive, format=(string)I420, colorimetry=(string)bt601\"",
+                    " ! rawvideoparse use-sink-caps=true",
+                    " ! videoconvert",
+                    " ! qrtimestampsink name=qrsink",
+                ),
+                source_description = source_description,
+                width = configuration.width,
+                height = configuration.height,
+                framerate_num = configuration.frame_interval.denominator,
+                framerate_den = configuration.frame_interval.numerator,
+            ),
+            _ => unimplemented!(),
+        };
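+        // NOTE: the RAW/RTP caps in the Yuyv arm above are hardcoded to the
+        // 320x320@30 configuration used by `main()` below instead of being
+        // derived from `configuration`, so that arm only matches this exact
+        // test setup.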
+
+        dbg!(&pipeline_description);
+
+        let pipeline = gst::parse::launch(&pipeline_description)
+            .unwrap()
+            .downcast::<gst::Pipeline>()
+            .unwrap();
+
+        let qrtimestampsink = pipeline.by_name("qrsink").unwrap();
+
+        let latencies = Arc::new(RwLock::new(Vec::with_capacity(buffers)));
+
+        let latencies_cloned = latencies.clone();
+        let pipeline_weak = pipeline.downgrade();
+        qrtimestampsink.connect("on-render", false, move |values| {
+            let _element = values[0].get::<gst::Element>().expect("Invalid argument");
+            let _info = values[1]
+                .get::()
+                .expect("Invalid argument");
+            let diff: i64 = values[2].get::<i64>().expect("Invalid argument");
+
+            let pipeline = pipeline_weak.upgrade()?;
+
+            let mut latencies_cloned = latencies_cloned.blocking_write();
+            latencies_cloned.push(diff);
+            if latencies_cloned.len() == buffers {
+                debug!("All buffers received, posting EOS to qrtimestampsink pipeline...");
+                pipeline.post_message(gst::message::Eos::new()).unwrap();
+            }
+
+            None
+        });
+
+        let bus_task = new_bus_task(pipeline.downgrade());
+
+        Ok(Self {
+            pipeline,
+            bus_task,
+            latencies,
+        })
+    }
+
+    pub async fn get_latencies(self) -> Result<Latencies> {
+        get_latencies(&self.pipeline, self.bus_task, self.latencies).await
+    }
+}
+
+struct Baseline {
+    pipeline: gst::Pipeline,
+    bus_task: tokio::task::JoinHandle<()>,
+    latencies: Arc<RwLock<Vec<i64>>>,
+}
+
+impl Baseline {
+    pub async fn try_new(
+        video_and_stream_information: &VideoAndStreamInformation,
+        buffers: usize,
+    ) -> Result<Self> {
+        let CaptureConfiguration::Video(configuration) = &video_and_stream_information
+            .stream_information
+            .configuration
+        else {
+            unreachable!();
+        };
+
+        let pipeline_description = match configuration.encode {
+            VideoEncodeType::H264 => format!(
+                concat!(
+                    "qrtimestampsrc do-timestamp=true",
+                    " ! videoconvert",
+                    " ! video/x-raw,width={width},height={height},framerate={framerate_num}/{framerate_den},format=I420",
+                    " ! x264enc tune=zerolatency speed-preset=ultrafast bitrate=5000",
+                    " ! h264parse",
+                    " ! video/x-h264,profile=constrained-baseline,stream-format=byte-stream,alignment=au",
+                    " ! h264parse",
+                    " ! video/x-h264,width={width},height={height},framerate={framerate_num}/{framerate_den}",
+                    " ! avdec_h264 discard-corrupted-frames=true",
+                    " ! videoconvert",
+                    " ! qrtimestampsink name=qrsink",
+                ),
+                width = configuration.width,
+                height = configuration.height,
+                framerate_num = configuration.frame_interval.denominator,
+                framerate_den = configuration.frame_interval.numerator,
+            ),
+            VideoEncodeType::Mjpg => format!(
+                concat!(
+                    "qrtimestampsrc do-timestamp=true",
+                    " ! videoconvert",
+                    " ! video/x-raw,width={width},height={height},framerate={framerate_num}/{framerate_den},format=I420",
+                    " ! jpegenc quality=85 idct-method=ifast",
+                    " ! jpegparse",
+                    " ! jpegparse",
+                    " ! image/jpeg,width={width},height={height},framerate={framerate_num}/{framerate_den}",
+                    " ! jpegdec",
+                    " ! videoconvert",
+                    " ! qrtimestampsink name=qrsink",
+                ),
+                width = configuration.width,
+                height = configuration.height,
+                framerate_num = configuration.frame_interval.denominator,
+                framerate_den = configuration.frame_interval.numerator,
+            ),
+            VideoEncodeType::Yuyv => format!(
+                concat!(
+                    "qrtimestampsrc do-timestamp=true",
+                    " ! videoconvert",
+                    " ! video/x-raw,width={width},height={height},framerate={framerate_num}/{framerate_den},format=YUY2",
+                    " ! rawvideoparse use-sink-caps=true",
+                    " ! videorate skip-to-first=true silent=true",
+                    " ! capsfilter caps=\"video/x-raw, width=(int){width}, height=(int){height}, framerate=(fraction){framerate_num}/{framerate_den}, pixel-aspect-ratio=(fraction)1/1, interlace-mode=(string)progressive, format=(string)YUY2, colorimetry=(string)bt601\"",
+                    " ! rawvideoparse use-sink-caps=true",
+                    " ! videoconvert",
+                    " ! qrtimestampsink name=qrsink",
+                ),
+                width = configuration.width,
+                height = configuration.height,
+                framerate_num = configuration.frame_interval.denominator,
+                framerate_den = configuration.frame_interval.numerator,
+            ),
+            _ => unimplemented!(),
+        };
+
+        dbg!(&pipeline_description);
+
+        let pipeline = gst::parse::launch(&pipeline_description)
+            .unwrap()
+            .downcast::<gst::Pipeline>()
+            .unwrap();
+
+        let qrtimestampsink = pipeline.by_name("qrsink").unwrap();
+
+        let latencies = Arc::new(RwLock::new(Vec::with_capacity(buffers)));
+
+        let latencies_cloned = latencies.clone();
+        let pipeline_weak = pipeline.downgrade();
+        qrtimestampsink.connect("on-render", false, move |values| {
+            let _element = values[0].get::<gst::Element>().expect("Invalid argument");
+            let _info = values[1]
+                .get::()
+                .expect("Invalid argument");
+            let diff: i64 = values[2].get::<i64>().expect("Invalid argument");
+
+            let pipeline = pipeline_weak.upgrade()?;
+
+            let mut latencies_cloned = latencies_cloned.blocking_write();
+            latencies_cloned.push(diff);
+            if latencies_cloned.len() == buffers {
+                debug!("All buffers received, posting EOS to qrtimestampsink pipeline...");
+                pipeline.post_message(gst::message::Eos::new()).unwrap();
+            }
+
+            None
+        });
+
+        let bus_task = new_bus_task(pipeline.downgrade());
+
+        Ok(Self {
+            pipeline,
+            bus_task,
+            latencies,
+        })
+    }
+
+    pub async fn get_latencies(self) -> Result<Latencies> {
+        get_latencies(&self.pipeline, self.bus_task, self.latencies).await
+    }
+}
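+// The Baseline pipeline above chains the same encode and decode steps as the
+// loopback sender plus the network receiver, but inside a single in-process
+// pipeline (no v4l2loopback device, no UDP/RTSP hop). Its measurements thus
+// approximate the codec cost alone, which `main()` subtracts from the
+// full-stream measurements when asserting latency bounds.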
+
+pub async fn start_pipeline(pipeline: &gst::Pipeline, wait: bool) -> Result<()> {
+    pipeline.set_state(gst::State::Playing)?;
+
+    if wait {
+        while pipeline.current_state() != gst::State::Playing {
+            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
+        }
+    }
+
+    Ok(())
+}
+
+async fn get_latencies(
+    pipeline: &gst::Pipeline,
+    bus_task: tokio::task::JoinHandle<()>,
+    latencies: Arc<RwLock<Vec<i64>>>,
+) -> Result<Latencies> {
+    bus_task.await?;
+
+    // Cleanup
+    debug!("Setting qrtimestamp to Null...");
+    pipeline.set_state(gst::State::Null).unwrap();
+
+    Ok(Latencies::new(
+        latencies
+            .read()
+            .await
+            .iter()
+            .cloned()
+            .map(|i| i as f64)
+            .collect::<Vec<f64>>(),
+    ))
+}
+
+fn new_bus_task(pipeline_weak: gst::glib::WeakRef<gst::Pipeline>) -> tokio::task::JoinHandle<()> {
+    tokio::spawn(async move {
+        let Some(pipeline) = pipeline_weak.upgrade() else {
+            return;
+        };
+
+        let Some(bus) = pipeline.bus() else {
+            return;
+        };
+
+        // TODO: Move this to gst bus tokio async pattern so this tokio task can be cancellable
+        for msg in bus.iter_timed(gst::ClockTime::NONE) {
+            use gst::MessageView;
+
+            match msg.view() {
+                MessageView::Eos(..) => {
+                    debug!("EOS Received");
+                    break;
+                }
+                MessageView::Error(err) => {
+                    error!(
+                        "Error from {:?}: {} ({:?})",
+                        err.src().map(|s| s.path_string()),
+                        err.error(),
+                        err.debug()
+                    );
+                    break;
+                }
+                MessageView::Latency(_latency) => {
+                    pipeline.recalculate_latency().unwrap();
+                }
+                _ => (),
+            }
+        }
+
+        debug!("FINISHING BUS TASK");
+    })
+}
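+// One possible shape for the cancellable variant mentioned in the TODO above
+// (a sketch only, not used by this test; it assumes gstreamer-rs's
+// `Bus::stream()`, which exposes bus messages as a `futures::Stream`):
+//
+//     use futures::StreamExt;
+//     let mut messages = bus.stream();
+//     while let Some(msg) = messages.next().await {
+//         match msg.view() { /* same arms as the loop above */ }
+//     }
+//
+// Because it awaits instead of blocking on `iter_timed`, the spawned task
+// could then be cancelled through its `JoinHandle::abort()`.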
+
+#[derive(Debug)]
+struct LatenciesStats {
+    latency_min: f64,
+    latency_max: f64,
+    latency_mean: f64,
+    latency_std_dev: f64,
+    latency_variance: f64,
+    jitter_min: f64,
+    jitter_max: f64,
+    jitter_mean: f64,
+    jitter_std_dev: f64,
+    jitter_variance: f64,
+}
+
+#[derive(Debug)]
+struct Latencies {
+    latencies: Vec<f64>,
+    jitters: Vec<f64>,
+    stats: LatenciesStats,
+}
+
+impl Latencies {
+    pub fn new(latencies: Vec<f64>) -> Self {
+        // Note: We are skipping the first frame as it will always have a high value, possibly from the negotiation
+        let latencies = &latencies[1..];
+        trace!("latencies: {latencies:?}");
+
+        let jitters = &latencies
+            .windows(2)
+            .map(|a| a[1] - a[0])
+            .collect::<Vec<f64>>()[..];
+        trace!("jitters: {jitters:?}");
+
+        use statrs::statistics::Statistics;
+        Self {
+            latencies: latencies.to_owned(),
+            jitters: jitters.to_owned(),
+            stats: LatenciesStats {
+                latency_min: latencies.min(),
+                latency_max: latencies.max(),
+                latency_mean: latencies.mean(),
+                latency_std_dev: latencies.std_dev(),
+                latency_variance: latencies.variance(),
+                jitter_min: jitters.min(),
+                jitter_max: jitters.max(),
+                jitter_mean: jitters.mean(),
+                jitter_std_dev: jitters.std_dev(),
+                jitter_variance: jitters.variance(),
+            },
+        }
+    }
+}
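+// For example, latencies of [40.0, 42.0, 41.0] produce jitters of [2.0, -1.0]
+// via `windows(2)`: each jitter sample is the difference between two
+// consecutive frame latencies, so `jitter_max` bounds the worst
+// frame-to-frame swing while `latency_mean` tracks the steady-state delay.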
+
+async fn compute_baseline_latency(
+    video_and_stream_information: &VideoAndStreamInformation,
+    buffers: usize,
+) -> Latencies {
+    let baseline = Baseline::try_new(video_and_stream_information, buffers)
+        .await
+        .unwrap();
+    start_pipeline(&baseline.pipeline, true).await.unwrap();
+    baseline.get_latencies().await.unwrap()
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
+async fn main() {
+    settings::manager::init(None);
+    stream::manager::init();
+    logger::manager::init();
+
+    gst::init().unwrap();
+
+    let fps = 30;
+    let width = 320;
+    let height = width;
+    let address = "127.0.0.1";
+    let buffers = 100;
+
+    let test_cases = [
+        (VideoEncodeType::H264, format!("udp://{address}:5600"), 5.),
+        (VideoEncodeType::Mjpg, format!("udp://{address}:5600"), 5.),
+        (VideoEncodeType::Yuyv, format!("udp://{address}:5600"), 8.),
+        (
+            VideoEncodeType::H264,
+            format!("rtsp://{address}:8554/test"),
+            5.,
+        ),
+        // (
+        //     VideoEncodeType::Mjpg,
+        //     format!("rtsp://{address}:8554/test"),
+        //     80.,
+        // ),
+        // (
+        //     VideoEncodeType::Yuyv,
+        //     format!("rtsp://{address}:8554/test"),
+        //     80.,
+        // ),
+    ];
+
+    for (encode, endpoint, expected_pipeline_latency) in test_cases {
+        let video_and_stream_information = VideoAndStreamInformation {
+            name: "QRTimeStamp - QR".to_string(),
+            stream_information: StreamInformation {
+                endpoints: vec![Url::from_str(endpoint.as_str()).unwrap()],
+                configuration: CaptureConfiguration::Video(VideoCaptureConfiguration {
+                    encode,
+                    height,
+                    width,
+                    frame_interval: FrameInterval {
+                        numerator: 1,
+                        denominator: fps,
+                    },
+                }),
+                extended_configuration: Some(ExtendedConfiguration {
+                    thermal: false,
+                    disable_mavlink: true,
+                }),
+            },
+            video_source: get_loopback_device().unwrap(),
+        };
+        info!("Testing for: {video_and_stream_information:#?}");
+
+        info!("Building v4lloopback pipeline (video generation with qrtimestampsrc)...");
+        let loopback = V4l2LoopBack::try_new(&video_and_stream_information).unwrap();
+        start_pipeline(&loopback.pipeline, true).await.unwrap();
+
+        info!("Building MCM stream...");
+        let stream = Stream::try_new(&video_and_stream_information)
+            .await
+            .unwrap();
+        let stream_id = stream.id().await.unwrap();
+        stream::manager::Manager::add_stream(stream).await.unwrap();
+
+        info!("Getting baseline latency...");
+        let baseline_latencies =
+            compute_baseline_latency(&video_and_stream_information, buffers).await;
+        let baseline_latency = baseline_latencies.stats.latency_mean;
+        let baseline_max_jitter = baseline_latencies.stats.jitter_max;
+        info!("Baseline latency: {:#?}", &baseline_latencies.stats);
+
+        info!("Building qrtimestamp pipeline (video receiver with qrtimestampsink)...");
+        let qrtimestampsink = QrTimeStampSink::try_new(&video_and_stream_information, buffers)
+            .await
+            .unwrap();
+        start_pipeline(&qrtimestampsink.pipeline, false)
+            .await
+            .unwrap();
+
+        info!("Waiting for qrtimestampsink pipeline to finish...");
+        let latencies_result = qrtimestampsink.get_latencies().await;
+
+        info!("Finishing loopback pipeline...");
+        let loopback_result = loopback.finish().await;
+
+        info!("Finishing MCM stream...");
+        stream::manager::Manager::remove_stream(&stream_id)
+            .await
+            .unwrap();
+
+        info!("Evaluating results...");
+        loopback_result.unwrap();
+        let latencies = latencies_result.unwrap();
+        info!("Pipeline latency: {:#?}", &latencies.stats);
+        assert!(!latencies.latencies.is_empty());
+
+        assert!(
+            latencies.stats.latency_mean - baseline_latency
+                <= expected_pipeline_latency + baseline_max_jitter
+        );
+        assert!(
+            latencies.stats.latency_max - baseline_latency
+                <= expected_pipeline_latency + baseline_max_jitter
+        );
+        assert!(latencies.stats.jitter_mean <= baseline_max_jitter);
+        assert!(latencies.stats.jitter_max <= baseline_max_jitter);
+    }
+
+    debug!("END");
+}