From 880de741f029a8a1f22264d8d89b2df8f376a874 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Tue, 5 Nov 2024 15:48:11 -0300 Subject: [PATCH] WIP --- .gitignore | 2 + src/lib/controls/onvif/camera.rs | 13 +- src/lib/controls/onvif/manager.rs | 46 +++- src/lib/custom/bluerov.rs | 239 ++++++++++-------- src/lib/custom/mod.rs | 15 +- src/lib/logger/manager.rs | 47 +++- src/lib/mavlink/mavlink_camera.rs | 17 +- src/lib/server/pages.rs | 97 +++---- src/lib/settings/manager.rs | 76 +++--- src/lib/stream/gst/utils.rs | 62 +++++ src/lib/stream/manager.rs | 37 +-- src/lib/stream/mod.rs | 47 ++-- src/lib/stream/pipeline/mod.rs | 9 +- src/lib/stream/pipeline/onvif_pipeline.rs | 40 ++- src/lib/stream/pipeline/runner.rs | 24 +- src/lib/stream/sink/webrtc_sink.rs | 36 +-- src/lib/stream/webrtc/signalling_server.rs | 23 +- .../video/local/video_source_local_linux.rs | 143 ++++++----- .../video/local/video_source_local_none.rs | 21 +- src/lib/video/types.rs | 23 +- src/lib/video/video_source.rs | 51 ++-- src/lib/video/video_source_gst.rs | 51 ++-- src/lib/video/video_source_onvif.rs | 20 +- src/lib/video/video_source_redirect.rs | 33 ++- src/lib/video/xml.rs | 16 +- src/main.rs | 4 +- 26 files changed, 717 insertions(+), 475 deletions(-) diff --git a/.gitignore b/.gitignore index cbc56376..3a8ec856 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ /src/html/vue.js /target **/*.rs.bk +.vscode +logs diff --git a/src/lib/controls/onvif/camera.rs b/src/lib/controls/onvif/camera.rs index 6aea78a6..76dd51cd 100644 --- a/src/lib/controls/onvif/camera.rs +++ b/src/lib/controls/onvif/camera.rs @@ -4,7 +4,7 @@ use onvif_schema::{devicemgmt::GetDeviceInformationResponse, transport}; use anyhow::{anyhow, Result}; use tracing::*; -use crate::video::types::{Format, VideoEncodeType}; +use crate::{stream::gst::utils::get_encode_from_rtspsrc, video::types::Format}; #[derive(Clone)] pub struct OnvifCamera { @@ -149,15 +149,8 @@ impl OnvifCamera { continue; 
}; - let encode = match &video_encoder_configuration.encoding { - onvif_schema::onvif::VideoEncoding::H264 => VideoEncodeType::H264, - onvif_schema::onvif::VideoEncoding::__Unknown__(encoding) if encoding == "H265" => { - VideoEncodeType::H265 - } - _unsupported => { - warn!("Skipping unsupported encoding: {_unsupported:?}"); - continue; - } + let Some(encode) = get_encode_from_rtspsrc(&stream_uri).await else { + continue; }; let video_rate = video_encoder_configuration diff --git a/src/lib/controls/onvif/manager.rs b/src/lib/controls/onvif/manager.rs index 1ae8d395..c88b0c13 100644 --- a/src/lib/controls/onvif/manager.rs +++ b/src/lib/controls/onvif/manager.rs @@ -53,10 +53,22 @@ impl Default for Manager { impl Manager { // Construct our manager, should be done inside main #[instrument(level = "trace")] - pub fn init() { + pub async fn init() { MANAGER.as_ref(); + let manager = MANAGER.write().await; + + let mut mcontext = manager.mcontext.write().await; + // todo: fill MANAGER.context.credentials with credentials passed by ENV and CLI + // It can be in the form of ":@", but we need to escape special characters need to. + let _ = mcontext.credentials.insert( + "192.168.0.168".to_string(), + Arc::new(RwLock::new(Credentials { + username: "admin".to_string(), + password: "12345".to_string(), + })), + ); } #[instrument(level = "trace", skip(context))] @@ -71,7 +83,7 @@ impl Manager { onvif::discovery::DiscoveryBuilder::default() .listen_address(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) - .duration(tokio::time::Duration::from_secs(5)) + .duration(tokio::time::Duration::from_secs(20)) .run() .await? 
.for_each_concurrent(MAX_CONCURRENT_JUMPERS, |device| { @@ -80,14 +92,22 @@ impl Manager { async move { trace!("Device found: {device:#?}"); - let credentials = context - .read() - .await - .credentials - .get(&device.address) - .map(|c| c.blocking_read().clone()); - for url in device.urls { + let host = url + .host() + .map(|host| host.to_string()) + .unwrap_or(url.to_string()); + + let credentials = if let Some(credentials) = + context.read().await.credentials.get(&host) + { + Some(credentials.read().await.clone()) + } else { + None + }; + + trace!("Device {host}. Using credentials: {credentials:?}"); + let camera = match OnvifCamera::try_new(&Auth { credentials: credentials.clone(), url: url.clone(), @@ -96,7 +116,7 @@ impl Manager { { Ok(camera) => camera, Err(error) => { - error!("Failed creating camera: {error:#?}"); + error!(host, "Failed creating camera: {error:?}"); return; } }; @@ -105,12 +125,12 @@ impl Manager { { Ok(streams_informations) => streams_informations, Err(error) => { - error!("Failed getting stream uris: {error:#?}"); + error!(host, "Failed getting stream information: {error}"); continue; } }; - trace!("Found streams {streams_informations:?}"); + trace!(host, "Found streams {streams_informations:?}"); let mut context = context.write().await; for stream_information in streams_informations { @@ -118,7 +138,7 @@ impl Manager { .cameras .entry(stream_information.stream_uri.clone()) .or_insert_with(|| { - debug!("New stream inserted: {stream_information:?}"); + debug!(host, "New stream inserted: {stream_information:?}"); camera.clone() }); diff --git a/src/lib/custom/bluerov.rs b/src/lib/custom/bluerov.rs index fa9f7c44..f20d02ce 100644 --- a/src/lib/custom/bluerov.rs +++ b/src/lib/custom/bluerov.rs @@ -1,23 +1,34 @@ +use tracing::*; use url::Url; -use crate::network::utils::get_visible_qgc_address; -use crate::stream::types::*; -use crate::video::{self, types::*, video_source::VideoSourceAvailable}; -use 
crate::video_stream::types::VideoAndStreamInformation; -use tracing::*; +use crate::{ + network::utils::get_visible_qgc_address, + stream::types::*, + video::{ + self, + types::*, + video_source::{CamerasAvailable, VideoSourceFormats}, + }, + video_stream::types::VideoAndStreamInformation, +}; -fn get_cameras_with_encode_type(encode: VideoEncodeType) -> Vec { - let cameras = video::video_source_local::VideoSourceLocal::cameras_available(); - cameras - .iter() - .filter(move |cam| { - cam.inner() - .formats() - .iter() - .any(|format| format.encode == encode) - }) - .cloned() - .collect() +async fn get_cameras_with_encode_type(encode: VideoEncodeType) -> Vec { + let mut result = Vec::new(); + + let cameras = video::video_source_local::VideoSourceLocal::cameras_available().await; + + for camera in cameras.iter() { + if camera + .formats() + .await + .iter() + .any(|format| format.encode == encode) + { + result.push(camera.clone()); + } + } + + result } fn sort_sizes(sizes: &mut [Size]) { @@ -27,91 +38,117 @@ fn sort_sizes(sizes: &mut [Size]) { }); } -pub fn udp() -> Vec { - get_cameras_with_encode_type(VideoEncodeType::H264) - .iter() - .enumerate() - .filter_map(|(index, cam)| { - let formats = cam.inner().formats(); - let Some(format) = formats - .iter() - .find(|format| format.encode == VideoEncodeType::H264) - else { - warn!("Unable to find a valid format for {cam:?}"); - return None; - }; - - // Get the biggest resolution possible - let mut sizes = format.sizes.clone(); - sort_sizes(&mut sizes); - - let Some(size) = sizes.last() else { - warn!("Unable to find a valid size for {cam:?}"); - return None; - }; - - Some(VideoAndStreamInformation { - name: format!("UDP Stream {}", index), - stream_information: StreamInformation { - endpoints: vec![ - Url::parse(&format!("udp://192.168.2.1:{}", 5600 + index)).ok()? 
- ], - configuration: CaptureConfiguration::Video(VideoCaptureConfiguration { - encode: format.encode.clone(), - height: size.height, - width: size.width, - frame_interval: size.intervals.first()?.clone(), - }), - extended_configuration: None, - }, - video_source: cam.clone(), - }) +pub async fn udp() -> Vec { + let mut result = Vec::new(); + + let sources = get_cameras_with_encode_type(VideoEncodeType::H264).await; + + for (index, source) in sources.iter().enumerate() { + let formats = source.formats().await; + + let Some(format) = formats + .iter() + .find(|format| format.encode == VideoEncodeType::H264) + else { + warn!("Unable to find a valid format for {source:?}"); + continue; + }; + + // Get the biggest resolution possible + let mut sizes = format.sizes.clone(); + sort_sizes(&mut sizes); + let Some(size) = sizes.last() else { + warn!("Unable to find a valid size for {source:?}"); + continue; + }; + + let Some(frame_interval) = size.intervals.first().cloned() else { + warn!("Unable to find a frame interval"); + continue; + }; + + let endpoint = match Url::parse(&format!("udp://192.168.2.1:{}", 5600 + index)) { + Ok(url) => url, + Err(error) => { + warn!("Failed to parse URL: {error:?}"); + continue; + } + }; + + result.push(VideoAndStreamInformation { + name: format!("UDP Stream {index}"), + stream_information: StreamInformation { + endpoints: vec![endpoint], + configuration: CaptureConfiguration::Video(VideoCaptureConfiguration { + encode: format.encode.clone(), + height: size.height, + width: size.width, + frame_interval, + }), + extended_configuration: None, + }, + video_source: source.clone(), }) - .collect() + } + + result } -pub fn rtsp() -> Vec { - get_cameras_with_encode_type(VideoEncodeType::H264) - .iter() - .enumerate() - .filter_map(|(index, cam)| { - let formats = cam.inner().formats(); - let Some(format) = formats - .iter() - .find(|format| format.encode == VideoEncodeType::H264) - else { - warn!("Unable to find a valid format for {cam:?}"); - 
return None; - }; - - // Get the biggest resolution possible - let mut sizes = format.sizes.clone(); - sort_sizes(&mut sizes); - - let Some(size) = sizes.last() else { - warn!("Unable to find a valid size for {cam:?}"); - return None; - }; - - let visible_qgc_ip_address = get_visible_qgc_address(); - - Some(VideoAndStreamInformation { - name: format!("RTSP Stream {index}"), - stream_information: StreamInformation { - endpoints: vec![Url::parse(&format!( - "rtsp://{visible_qgc_ip_address}:8554/video_{index}" - )) - .ok()?], - configuration: CaptureConfiguration::Video(VideoCaptureConfiguration { - encode: format.encode.clone(), - height: size.height, - width: size.width, - frame_interval: size.intervals.first()?.clone(), - }), - extended_configuration: None, - }, - video_source: cam.clone(), - }) - }) - .collect() +pub async fn rtsp() -> Vec { + let mut result = Vec::new(); + + let sources = get_cameras_with_encode_type(VideoEncodeType::H264).await; + + for (index, source) in sources.iter().enumerate() { + let formats = source.formats().await; + + let Some(format) = formats + .iter() + .find(|format| format.encode == VideoEncodeType::H264) + else { + warn!("Unable to find a valid format for {source:?}"); + continue; + }; + + // Get the biggest resolution possible + let mut sizes = format.sizes.clone(); + sort_sizes(&mut sizes); + let Some(size) = sizes.last() else { + warn!("Unable to find a valid size for {source:?}"); + continue; + }; + + let Some(frame_interval) = size.intervals.first().cloned() else { + warn!("Unable to find a frame interval"); + continue; + }; + + let visible_qgc_ip_address = get_visible_qgc_address(); + let endpoint = match Url::parse(&format!( + "rtsp://{visible_qgc_ip_address}:8554/video_{index}" + )) { + Ok(url) => url, + Err(error) => { + warn!("Failed to parse URL: {error:?}"); + continue; + } + }; + + result.push(VideoAndStreamInformation { + name: format!("RTSP Stream {index}"), + stream_information: StreamInformation { + endpoints: 
vec![endpoint], + configuration: CaptureConfiguration::Video(VideoCaptureConfiguration { + encode: format.encode.clone(), + height: size.height, + width: size.width, + frame_interval, + }), + extended_configuration: None, + }, + video_source: source.clone(), + }); + } + + result } diff --git a/src/lib/custom/mod.rs b/src/lib/custom/mod.rs index 4f2ef0f3..44220acb 100644 --- a/src/lib/custom/mod.rs +++ b/src/lib/custom/mod.rs @@ -1,11 +1,10 @@ -use clap::ValueEnum; - -use crate::cli; -use crate::video_stream::types::VideoAndStreamInformation; - mod bluerov; mod test; +use clap::ValueEnum; + +use crate::{cli, video_stream::types::VideoAndStreamInformation}; + #[derive(ValueEnum, PartialEq, Debug, Clone)] #[clap(rename_all = "verbatim")] pub enum CustomEnvironment { @@ -14,10 +13,10 @@ pub enum CustomEnvironment { WebRTCTest, } -pub fn create_default_streams() -> Vec { +pub async fn create_default_streams() -> Vec { match cli::manager::default_settings() { - Some(CustomEnvironment::BlueROVUDP) => bluerov::udp(), - Some(CustomEnvironment::BlueROVRTSP) => bluerov::rtsp(), + Some(CustomEnvironment::BlueROVUDP) => bluerov::udp().await, + Some(CustomEnvironment::BlueROVRTSP) => bluerov::rtsp().await, Some(CustomEnvironment::WebRTCTest) => test::take_webrtc_stream(), _ => vec![], } diff --git a/src/lib/logger/manager.rs b/src/lib/logger/manager.rs index 7ac03094..517a78e6 100644 --- a/src/lib/logger/manager.rs +++ b/src/lib/logger/manager.rs @@ -1,9 +1,9 @@ -use crate::cli; - use tracing::{metadata::LevelFilter, *}; use tracing_log::LogTracer; use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer}; +use crate::cli; + // Start logger, should be done inside main pub fn init() { // Redirect all logs from libs using "Log" @@ -23,7 +23,50 @@ pub fn init() { .add_directive("hyper=off".parse().unwrap()) // Reducing onvif-related libs verbosity .add_directive("yaserde=off".parse().unwrap()) + .add_directive("reqwest=off".parse().unwrap()) + 
.add_directive("onvif=off".parse().unwrap()) + .add_directive("schema=off".parse().unwrap()) + .add_directive("transport=off".parse().unwrap()) + .add_directive("validate=off".parse().unwrap()) + .add_directive("common=off".parse().unwrap()) + .add_directive("metadatastream=off".parse().unwrap()) + .add_directive("onvif_xsd=off".parse().unwrap()) + .add_directive("radiometry=off".parse().unwrap()) + .add_directive("rules=off".parse().unwrap()) + .add_directive("soap_envelope=off".parse().unwrap()) + .add_directive("types=off".parse().unwrap()) + .add_directive("xmlmime=off".parse().unwrap()) + .add_directive("xop=off".parse().unwrap()) + .add_directive("accesscontrol=off".parse().unwrap()) + .add_directive("accessrules=off".parse().unwrap()) + .add_directive("actionengine=off".parse().unwrap()) + .add_directive("advancedsecurity=off".parse().unwrap()) + .add_directive("analytics=off".parse().unwrap()) + .add_directive("authenticationbehavior=off".parse().unwrap()) + .add_directive("b_2=off".parse().unwrap()) + .add_directive("bf_2=off".parse().unwrap()) + .add_directive("credential=off".parse().unwrap()) + .add_directive("deviceio=off".parse().unwrap()) + .add_directive("devicemgmt=off".parse().unwrap()) + .add_directive("display=off".parse().unwrap()) + .add_directive("doorcontrol=off".parse().unwrap()) + .add_directive("event=off".parse().unwrap()) + .add_directive("imaging=off".parse().unwrap()) + .add_directive("media=off".parse().unwrap()) + .add_directive("media2=off".parse().unwrap()) + .add_directive("provisioning=off".parse().unwrap()) + .add_directive("ptz=off".parse().unwrap()) + .add_directive("receiver=off".parse().unwrap()) + .add_directive("recording=off".parse().unwrap()) + .add_directive("replay=off".parse().unwrap()) + .add_directive("schedule=off".parse().unwrap()) + .add_directive("search=off".parse().unwrap()) + .add_directive("t_1=off".parse().unwrap()) + .add_directive("thermal=off".parse().unwrap()) + 
.add_directive("uplink=off".parse().unwrap()) + .add_directive("ws_addr=off".parse().unwrap()) .add_directive("ws_discovery=off".parse().unwrap()) + .add_directive("xml_xsd=off".parse().unwrap()) .add_directive("onvif::discovery=off".parse().unwrap()); let console_layer = fmt::Layer::new() diff --git a/src/lib/mavlink/mavlink_camera.rs b/src/lib/mavlink/mavlink_camera.rs index ba5a2896..d2156c74 100644 --- a/src/lib/mavlink/mavlink_camera.rs +++ b/src/lib/mavlink/mavlink_camera.rs @@ -1,19 +1,18 @@ use std::sync::Arc; -use crate::{ - cli, mavlink::mavlink_camera_component::MavlinkCameraComponent, - network::utils::get_visible_qgc_address, video::types::VideoSourceType, - video_stream::types::VideoAndStreamInformation, -}; - use anyhow::{anyhow, Context, Result}; use mavlink::{common::MavMessage, MavHeader}; use tokio::sync::broadcast; use tracing::*; use url::Url; -use super::manager::Message; -use super::utils::*; +use crate::{ + cli, mavlink::mavlink_camera_component::MavlinkCameraComponent, + network::utils::get_visible_qgc_address, video::types::VideoSourceType, + video_stream::types::VideoAndStreamInformation, +}; + +use super::{manager::Message, utils::*}; #[derive(Debug)] pub struct MavlinkCamera { @@ -394,7 +393,7 @@ impl MavlinkCameraInner { send_ack(&sender, our_header, their_header, data.command, result); let source_string = camera.video_source_type.inner().source_string(); - let result = match crate::video::video_source::reset_controls(source_string) { + let result = match crate::video::video_source::reset_controls(source_string).await { Ok(_) => mavlink::common::MavResult::MAV_RESULT_ACCEPTED, Err(error) => { error!("Failed to reset {source_string:?} controls with its default values as {:#?}:{:#?}. 
Reason: {error:?}", our_header.system_id, our_header.component_id); diff --git a/src/lib/server/pages.rs b/src/lib/server/pages.rs index de80c9a7..8b795fb4 100644 --- a/src/lib/server/pages.rs +++ b/src/lib/server/pages.rs @@ -1,26 +1,26 @@ -use crate::controls::types::Control; -use crate::helper; -use crate::settings; -use crate::stream::{gst as gst_stream, manager as stream_manager, types::StreamInformation}; -use crate::video::{ - types::{Format, VideoSourceType}, - video_source, - video_source::VideoSource, - xml, -}; -use crate::video_stream::types::VideoAndStreamInformation; -use actix_web::http::header; +use std::io::prelude::*; + use actix_web::{ + http::header, web::{self, Json}, HttpRequest, HttpResponse, }; - use paperclip::actix::{api_v2_operation, Apiv2Schema, CreatedJson}; use serde::{Deserialize, Serialize}; use tracing::*; use validator::Validate; -use std::io::prelude::*; +use crate::{ + controls::types::Control, + helper, settings, + stream::{gst as gst_stream, manager as stream_manager, types::StreamInformation}, + video::{ + types::{Format, VideoSourceType}, + video_source::{self, VideoSource, VideoSourceFormats}, + xml, + }, + video_stream::types::VideoAndStreamInformation, +}; #[derive(Apiv2Schema, Debug, Serialize)] pub struct ApiVideoSource { @@ -185,39 +185,50 @@ pub async fn info() -> CreatedJson { #[api_v2_operation] /// Provides list of all video sources, with controls and formats pub async fn v4l() -> Json> { - let cameras = video_source::cameras_available(); - let cameras: Vec = cameras - .iter() - .map(|cam| match cam { - VideoSourceType::Local(cam) => ApiVideoSource { - name: cam.name().clone(), - source: cam.source_string().to_string(), - formats: cam.formats(), - controls: cam.controls(), - }, - VideoSourceType::Gst(gst) => ApiVideoSource { - name: gst.name().clone(), - source: gst.source_string().to_string(), - formats: gst.formats(), - controls: gst.controls(), - }, - VideoSourceType::Redirect(redirect) => ApiVideoSource { - 
name: redirect.name().clone(), - source: redirect.source_string().to_string(), - formats: redirect.formats(), - controls: redirect.controls(), - }, + let cameras = video_source::cameras_available().await; + + use futures::stream::{self, StreamExt}; + + let cameras: Vec = stream::iter(cameras) + .then(|cam| async { + match cam { + VideoSourceType::Local(local) => ApiVideoSource { + name: local.name().clone(), + source: local.source_string().to_string(), + formats: local.formats().await, + controls: local.controls(), + }, + VideoSourceType::Gst(gst) => ApiVideoSource { + name: gst.name().clone(), + source: gst.source_string().to_string(), + formats: gst.formats().await, + controls: gst.controls(), + }, + VideoSourceType::Onvif(onvif) => ApiVideoSource { + name: onvif.name().clone(), + source: onvif.source_string().to_string(), + formats: onvif.formats().await, + controls: onvif.controls(), + }, + VideoSourceType::Redirect(redirect) => ApiVideoSource { + name: redirect.name().clone(), + source: redirect.source_string().to_string(), + formats: redirect.formats().await, + controls: redirect.controls(), + }, + } }) - .collect(); + .collect() + .await; Json(cameras) } #[api_v2_operation] /// Change video control for a specific source -pub fn v4l_post(json: web::Json) -> HttpResponse { +pub async fn v4l_post(json: web::Json) -> HttpResponse { let control = json.into_inner(); - let answer = video_source::set_control(&control.device, control.v4l_id, control.value); + let answer = video_source::set_control(&control.device, control.v4l_id, control.value).await; if let Err(error) = answer { return HttpResponse::NotAcceptable() @@ -232,7 +243,7 @@ pub fn v4l_post(json: web::Json) -> HttpResponse { /// Reset service settings pub async fn reset_settings(query: web::Query) -> HttpResponse { if query.all.unwrap_or_default() { - settings::manager::reset(); + settings::manager::reset().await; if let Err(error) = stream_manager::start_default().await { return 
HttpResponse::InternalServerError() .content_type("text/plain") @@ -273,7 +284,7 @@ pub async fn streams() -> HttpResponse { pub async fn streams_post(json: web::Json) -> HttpResponse { let json = json.into_inner(); - let video_source = match video_source::get_video_source(&json.source) { + let video_source = match video_source::get_video_source(&json.source).await { Ok(video_source) => video_source, Err(error) => { return HttpResponse::NotAcceptable() @@ -329,7 +340,7 @@ pub fn remove_stream(query: web::Query) -> HttpResponse { #[api_v2_operation] /// Reset controls from a given camera source pub fn camera_reset_controls(json: web::Json) -> HttpResponse { - if let Err(errors) = video_source::reset_controls(&json.device) { + if let Err(errors) = video_source::reset_controls(&json.device).await { let mut error: String = Default::default(); errors .iter() @@ -365,7 +376,7 @@ pub fn camera_reset_controls(json: web::Json) -> HttpRespon /// Provides a xml description file that contains information for a specific device, based on: https://mavlink.io/en/services/camera_def.html pub fn xml(xml_file_request: web::Query) -> HttpResponse { debug!("{xml_file_request:#?}"); - let cameras = video_source::cameras_available(); + let cameras = video_source::cameras_available().await; let camera = cameras .iter() .find(|source| source.inner().source_string() == xml_file_request.file); diff --git a/src/lib/settings/manager.rs b/src/lib/settings/manager.rs index 441c50b8..d8a96174 100644 --- a/src/lib/settings/manager.rs +++ b/src/lib/settings/manager.rs @@ -1,14 +1,15 @@ -use anyhow::{anyhow, Error, Result}; +use std::{ + io::Write, + path::Path, + sync::{Arc, RwLock}, +}; + +use anyhow::{anyhow, Result}; use directories::ProjectDirs; use serde::{Deserialize, Serialize}; -use std::io::prelude::*; -use std::path::Path; -use std::sync::{Arc, RwLock}; use tracing::*; -use crate::cli; -use crate::custom; -use crate::video_stream::types::VideoAndStreamInformation; +use crate::{cli, 
custom, video_stream::types::VideoAndStreamInformation}; #[derive(Clone, Debug, Deserialize, Serialize)] pub struct HeaderSettingsFile { @@ -37,21 +38,21 @@ lazy_static! { static ref MANAGER: Arc> = Arc::new(RwLock::new(Manager { content: None })); } -impl Default for SettingsStruct { - fn default() -> Self { +impl SettingsStruct { + async fn new() -> Self { SettingsStruct { header: HeaderSettingsFile { name: "Camera Manager".to_string(), version: 0, }, mavlink_endpoint: cli::manager::mavlink_connection_string(), - streams: custom::create_default_streams(), + streams: custom::create_default_streams().await, } } } impl Manager { - fn with(file_name: &str) -> ManagerStruct { + async fn with(file_name: &str) -> ManagerStruct { let file_name = if !Path::new(file_name).is_absolute() { match ProjectDirs::from("com", "Blue Robotics", env!("CARGO_PKG_NAME")) { Some(project) => { @@ -75,10 +76,10 @@ impl Manager { let config = if cli::manager::is_reset() { debug!("Settings reset, an empty settings will be loaded and stored as {file_name:?}."); - fallback_settings_with_backup_file(&file_name) + fallback_settings_with_backup_file(&file_name).await } else { debug!("Using settings file: {file_name:?}"); - load_settings_from_file(&file_name) + load_settings_from_file(&file_name).await }; let settings = ManagerStruct { @@ -100,13 +101,15 @@ impl Manager { // Init settings manager with the desired settings file, // will be created if does not exist -pub fn init(file_name: Option<&str>) { - let mut manager = MANAGER.write().unwrap(); +pub async fn init(file_name: Option<&str>) { let file_name = file_name.unwrap_or("settings.json"); - manager.content = Some(Manager::with(file_name)); + let new_content = Manager::with(file_name).await; + + let mut manager = MANAGER.write().unwrap(); + manager.content.replace(new_content); } -fn fallback_settings_with_backup_file(file_name: &str) -> SettingsStruct { +async fn fallback_settings_with_backup_file(file_name: &str) -> SettingsStruct { let 
backup_file_name = format!("{file_name}.bak"); if std::fs::metadata(file_name).is_ok() { @@ -117,17 +120,23 @@ fn fallback_settings_with_backup_file(file_name: &str) -> SettingsStruct { } } - SettingsStruct::default() + SettingsStruct::new().await } -fn load_settings_from_file(file_name: &str) -> SettingsStruct { - std::fs::read_to_string(file_name) - .map_err(Error::msg) - .and_then(|value| serde_json::from_str(&value).map_err(Error::msg)) - .unwrap_or_else(|error| { +async fn load_settings_from_file(file_name: &str) -> SettingsStruct { + match std::fs::read_to_string(file_name) { + Ok(value) => match serde_json::from_str(&value) { + Ok(settings) => settings, + Err(error) => { + warn!("Failed to parse settings file {file_name:?}. Reason: {error}"); + fallback_settings_with_backup_file(file_name).await + } + }, + Err(error) => { warn!("Failed to load settings file {file_name:?}. Reason: {error}"); - fallback_settings_with_backup_file(file_name) - }) + fallback_settings_with_backup_file(file_name).await + } + } } fn create_directories(file_name: &str) -> Result<()> { @@ -218,11 +227,12 @@ pub fn set_streams(streams: &[VideoAndStreamInformation]) { save(); } -pub fn reset() { +pub async fn reset() { + let new_settings = SettingsStruct::new().await; // Take care of scope RwLock { let mut manager = MANAGER.write().unwrap(); - manager.content.as_mut().unwrap().config = SettingsStruct::default(); + manager.content.as_mut().unwrap().config = new_settings } save(); } @@ -251,9 +261,9 @@ mod tests { format!("/tmp/{}.json", rand_string) } - #[test] - fn test_no_aboslute_path() { - init(None); + #[tokio::test] + async fn test_no_aboslute_path() { + init(None).await; let manager = MANAGER.read().unwrap(); let file_name = &manager.content.as_ref().unwrap().file_name; assert!( @@ -262,9 +272,9 @@ mod tests { ); } - #[test] - fn test_store() { - init(Some(&generate_random_settings_file_name())); + #[tokio::test] + async fn test_store() { + 
init(Some(&generate_random_settings_file_name())).await; let header = header(); assert_eq!(header.name, "Camera Manager".to_string()); diff --git a/src/lib/stream/gst/utils.rs b/src/lib/stream/gst/utils.rs index c15abbf4..d05e5fdd 100644 --- a/src/lib/stream/gst/utils.rs +++ b/src/lib/stream/gst/utils.rs @@ -1,5 +1,8 @@ use anyhow::{anyhow, Result}; use gst::prelude::*; +use tracing::*; + +use crate::video::types::VideoEncodeType; #[derive(Debug)] pub struct PluginRankConfig { @@ -105,3 +108,62 @@ pub async fn wait_for_element_state_async( Ok(()) } + +#[instrument(level = "debug")] +pub async fn get_encode_from_rtspsrc(stream_uri: &url::Url) -> Option { + use gst::prelude::*; + + let description = format!( + concat!( + "rtspsrc location={location} is-live=true latency=0", + " ! fakesink name=fakesink sync=false" + ), + location = stream_uri.to_string(), + ); + + let pipeline = gst::parse::launch(&description) + .expect("Failed to create pipeline") + .downcast::() + .expect("Pipeline is not a valid gst::Pipeline"); + + pipeline + .set_state(gst::State::Playing) + .expect("Failed to set pipeline to Playing"); + + let fakesink = pipeline + .by_name("fakesink") + .expect("Fakesink not found in pipeline"); + let pad = fakesink.static_pad("sink").expect("Sink pad not found"); + + let encode = tokio::time::timeout(tokio::time::Duration::from_secs(5), wait_for_encode(pad)) + .await + .ok() + .flatten(); + + pipeline + .set_state(gst::State::Null) + .expect("Failed to set pipeline to Null"); + + encode +} + +pub async fn wait_for_encode(pad: gst::Pad) -> Option { + loop { + if let Some(caps) = pad.current_caps() { + trace!("caps from rtspsrc: {caps:?}"); + + if let Some(structure) = caps.structure(0) { + if let Ok(encoding_name) = structure.get::("encoding-name") { + let encoding = match encoding_name.to_ascii_uppercase().as_str() { + "H264" => Some(VideoEncodeType::H264), + "H265" => Some(VideoEncodeType::H265), + _unsupported => None, + }; + + break encoding; + } + } + } 
+ tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } +} diff --git a/src/lib/stream/manager.rs b/src/lib/stream/manager.rs index 11d70444..0d68216d 100644 --- a/src/lib/stream/manager.rs +++ b/src/lib/stream/manager.rs @@ -1,26 +1,22 @@ use std::{collections::HashMap, sync::Arc}; +use anyhow::{anyhow, Context, Error, Result}; +use cached::proc_macro::cached; +use futures::stream::StreamExt; use tokio::sync::RwLock; +use tracing::*; use crate::{ settings, - stream::{types::CaptureConfiguration, webrtc::signalling_protocol::BindAnswer}, - video::video_source, -}; -use crate::{stream::sink::SinkInterface, video::types::VideoSourceType}; -use crate::{ - stream::sink::{webrtc_sink::WebRTCSink, Sink}, + stream::{ + sink::{webrtc_sink::WebRTCSink, Sink, SinkInterface}, + types::CaptureConfiguration, + webrtc::signalling_protocol::BindAnswer, + }, + video::{types::VideoSourceType, video_source}, video_stream::types::VideoAndStreamInformation, }; -use anyhow::{anyhow, Context, Error, Result}; - -type ClonableResult = Result>; - -use cached::proc_macro::cached; -use futures::stream::StreamExt; -use tracing::*; - use super::{ pipeline::PipelineGstreamerInterface, types::StreamStatus, @@ -28,6 +24,8 @@ use super::{ Stream, }; +type ClonableResult = Result>; + #[derive(Default)] pub struct Manager { streams: HashMap, @@ -118,8 +116,8 @@ pub async fn start_default() -> Result<()> { remove_all_streams().await?; // Update all local video sources to make sure that they are available - let mut candidates = video_source::cameras_available(); - update_devices(&mut streams, &mut candidates, true); + let mut candidates = video_source::cameras_available().await; + update_devices(&mut streams, &mut candidates, true).await; // Remove all invalid video_sources let streams: Vec = streams @@ -139,7 +137,7 @@ pub async fn start_default() -> Result<()> { } #[instrument(level = "debug")] -pub fn update_devices( +pub async fn update_devices( streams: &mut Vec, candidates: &mut 
Vec, verbose: bool, @@ -154,7 +152,10 @@ pub fn update_devices( continue; }; - match source.try_identify_device(capture_configuration, candidates) { + match source + .try_identify_device(capture_configuration, candidates) + .await + { Ok(Some(candidate_source_string)) => { let Some((idx, candidate)) = candidates.iter().enumerate().find_map(|(idx, candidate)| { diff --git a/src/lib/stream/mod.rs b/src/lib/stream/mod.rs index 2a7e3f88..885282a5 100644 --- a/src/lib/stream/mod.rs +++ b/src/lib/stream/mod.rs @@ -8,29 +8,30 @@ pub mod webrtc; use std::sync::Arc; -use tokio::sync::RwLock; - -use crate::mavlink::mavlink_camera::MavlinkCamera; -use crate::video::types::{VideoEncodeType, VideoSourceType}; -use crate::video::video_source::cameras_available; -use crate::video_stream::types::VideoAndStreamInformation; - +use ::gst::prelude::*; +use anyhow::{anyhow, Result}; use manager::Manager; use pipeline::Pipeline; use sink::{create_image_sink, create_rtsp_sink, create_udp_sink}; +use tokio::sync::RwLock; +use tracing::*; use types::*; use webrtc::signalling_protocol::PeerId; -use anyhow::{anyhow, Result}; - -use tracing::*; - -use self::gst::utils::wait_for_element_state; -use self::rtsp::rtsp_scheme::RTSPScheme; -use self::rtsp::rtsp_server::RTSP_SERVER_PORT; -use self::sink::SinkInterface; - -use ::gst::prelude::*; +use crate::{ + mavlink::mavlink_camera::MavlinkCamera, + video::{ + types::{VideoEncodeType, VideoSourceType}, + video_source::cameras_available, + }, + video_stream::types::VideoAndStreamInformation, +}; + +use self::{ + gst::utils::wait_for_element_state, + rtsp::{rtsp_scheme::RTSPScheme, rtsp_server::RTSP_SERVER_PORT}, + sink::SinkInterface, +}; #[derive(Debug)] pub struct Stream { @@ -119,7 +120,7 @@ impl Stream { // If it's a camera, try to update the device if let VideoSourceType::Local(_) = video_and_stream_information.video_source { let mut streams = vec![video_and_stream_information.clone()]; - let mut candidates = cameras_available(); + let mut 
candidates = cameras_available().await; // Discards any source from other running streams, otherwise we'd be trying to create a stream from a device in use (which is not possible) let current_running_streams = manager::streams() @@ -138,7 +139,7 @@ impl Stream { std::time::Instant::now() - last_report_time >= report_interval; // Find the best candidate - manager::update_devices(&mut streams, &mut candidates, should_report); + manager::update_devices(&mut streams, &mut candidates, should_report).await; video_and_stream_information = streams.first().unwrap().clone(); // Check if the chosen video source is available @@ -147,7 +148,9 @@ impl Stream { .video_source .inner() .source_string(), - ) { + ) + .await + { Ok(best_candidate) => { video_and_stream_information.video_source = best_candidate; } @@ -254,10 +257,10 @@ impl StreamState { mavlink_camera: None, }; - // Do not add any Sink if it's a redirect Pipeline + // Do not add any Sink if it's a Redirect/Onvif pipeline if !matches!( &video_and_stream_information.video_source, - VideoSourceType::Redirect(_) + VideoSourceType::Redirect(_), ) { let endpoints = &video_and_stream_information.stream_information.endpoints; diff --git a/src/lib/stream/pipeline/mod.rs b/src/lib/stream/pipeline/mod.rs index f57c2a63..c4b271fa 100644 --- a/src/lib/stream/pipeline/mod.rs +++ b/src/lib/stream/pipeline/mod.rs @@ -8,13 +8,10 @@ pub mod v4l_pipeline; use std::collections::HashMap; -use enum_dispatch::enum_dispatch; - use anyhow::{anyhow, Context, Result}; - -use tracing::*; - +use enum_dispatch::enum_dispatch; use gst::prelude::*; +use tracing::*; use crate::{ stream::{ @@ -285,7 +282,7 @@ impl PipelineState { ); let sink = self.sinks.remove(sink_id).context(format!( - "Failed to remove sink {sink_id} from Sinks of the Pipeline {pipeline_id}" + "Sink {sink_id} not found in Pipeline {pipeline_id}" ))?; // Terminate the Sink diff --git a/src/lib/stream/pipeline/onvif_pipeline.rs b/src/lib/stream/pipeline/onvif_pipeline.rs index 
7e669c54..28b5ea9c 100644 --- a/src/lib/stream/pipeline/onvif_pipeline.rs +++ b/src/lib/stream/pipeline/onvif_pipeline.rs @@ -1,5 +1,13 @@ +use anyhow::{anyhow, Result}; +use gst::prelude::*; +use tracing::*; + use crate::{ - stream::types::CaptureConfiguration, video::types::VideoSourceType, + stream::types::CaptureConfiguration, + video::{ + types::{VideoEncodeType, VideoSourceType}, + video_source_onvif::VideoSourceOnvifType, + }, video_stream::types::VideoAndStreamInformation, }; @@ -8,12 +16,6 @@ use super::{ PIPELINE_VIDEO_TEE_NAME, }; -use anyhow::{anyhow, Context, Result}; - -use tracing::*; - -use gst::prelude::*; - #[derive(Debug)] pub struct OnvifPipeline { pub state: PipelineState, @@ -37,7 +39,7 @@ impl OnvifPipeline { } }; - match &video_and_stream_information.video_source { + let video_source = match &video_and_stream_information.video_source { VideoSourceType::Onvif(source) => source, unsupported => { return Err(anyhow!( @@ -46,21 +48,11 @@ impl OnvifPipeline { } }; - if video_and_stream_information - .stream_information - .endpoints - .len() - > 1 - { - return Err(anyhow!("Onvif must only have one endpoint")); - } - let location = &video_and_stream_information - .stream_information - .endpoints - .first() - .context("Failed to access the fisrt endpoint")?; + let location = { + let VideoSourceOnvifType::Onvif(url) = &video_source.source; + url.to_string() + }; - use crate::video::types::VideoEncodeType; let encode = match &video_and_stream_information .stream_information .configuration @@ -80,6 +72,7 @@ impl OnvifPipeline { "rtspsrc location={location} is-live=true latency=0", " ! application/x-rtp", " ! rtph264depay", + // " ! h264parse", // we might want to add this in the future to expand the compatibility, since it can transform the stream format " ! capsfilter name={filter_name} caps=video/x-h264,stream-format=avc,alignment=au", " ! tee name={video_tee_name} allow-not-linked=true", " ! 
rtph264pay aggregate-mode=zero-latency config-interval=10 pt=96", @@ -97,6 +90,7 @@ impl OnvifPipeline { "rtspsrc location={location} is-live=true latency=0", " ! application/x-rtp", " ! rtph265depay", + // " ! h265parse", // we might want to add this in the future to expand the compatibility, since it can transform the stream format " ! capsfilter name={filter_name} caps=video/x-h265,profile={profile},stream-format=byte-stream,alignment=au", " ! tee name={video_tee_name} allow-not-linked=true", " ! rtph265pay aggregate-mode=zero-latency config-interval=10 pt=96", @@ -105,8 +99,8 @@ impl OnvifPipeline { location = location, filter_name = filter_name, video_tee_name = video_tee_name, + profile = "main", rtp_tee_name = rtp_tee_name, - profile = "main", ) } unsupported => { diff --git a/src/lib/stream/pipeline/runner.rs b/src/lib/stream/pipeline/runner.rs index 277b4b43..aaa22d9e 100644 --- a/src/lib/stream/pipeline/runner.rs +++ b/src/lib/stream/pipeline/runner.rs @@ -46,15 +46,25 @@ impl PipelineRunner { debug!("Starting PipelineRunner task..."); - Ok(Self { - start: start_tx, - handle: Some(tokio::spawn(async move { - debug!("PipelineRunner task started!"); + let span = span!( + Level::DEBUG, + "PipelineRunner task", + id = pipeline_id.to_string() + ); + let task_handle = tokio::spawn( + async move { + debug!("task started!"); match Self::runner(pipeline_weak, pipeline_id, start_rx, allow_block).await { - Ok(_) => debug!("PipelineRunner task eneded with no errors"), - Err(error) => warn!("PipelineRunner task ended with error: {error:#?}"), + Ok(_) => debug!("task ended with no errors"), + Err(error) => warn!("task ended with error: {error:#?}"), }; - })), + } + .instrument(span), + ); + + Ok(Self { + start: start_tx, + handle: Some(task_handle), pipeline_id, }) } diff --git a/src/lib/stream/sink/webrtc_sink.rs b/src/lib/stream/sink/webrtc_sink.rs index 4100bc24..54aaf074 100644 --- a/src/lib/stream/sink/webrtc_sink.rs +++ b/src/lib/stream/sink/webrtc_sink.rs @@ 
-766,6 +766,8 @@ fn sanitize_sdp(sdp: &gst_sdp::SDPMessage) -> Result { fn customize_sent_sdp(sdp: &gst_sdp::SDPMessage) -> Result { let mut new_sdp = sdp.clone(); + println!("SDP: {:?}", new_sdp.as_text()); + new_sdp.medias_mut().enumerate().for_each(|(media_idx, media)| { let old_media = sdp.media(media_idx as u32).unwrap(); @@ -776,7 +778,7 @@ fn customize_sent_sdp(sdp: &gst_sdp::SDPMessage) -> Result let value = attribute.value().unwrap_or_default(); - trace!("Found a rtpmap attribute w/ value: {value:?}"); + println!("Found a rtpmap attribute w/ value: {value:?}"); lazy_static! { // Looking for something like "96 H264/90000" @@ -793,7 +795,7 @@ fn customize_sent_sdp(sdp: &gst_sdp::SDPMessage) -> Result let encoding = &caps["encoding"]; let clockrate = &caps["clockrate"]; - trace!("rtpmap attribute parsed: payload: {payload:?}, encoding: {encoding:?}, clockrate: {clockrate:?}"); + println!("rtpmap attribute parsed: payload: {payload:?}, encoding: {encoding:?}, clockrate: {clockrate:?}"); if let Some((fmtp_idx, fmtp_attribute)) = old_media.attributes().enumerate().find(|(_, attribute)| { @@ -808,43 +810,47 @@ fn customize_sent_sdp(sdp: &gst_sdp::SDPMessage) -> Result .value() .expect("The fmtp we have found should have a value"); - trace!("Found a fmtp attribute: {value:?}"); + println!("Found a fmtp attribute: {value:?}"); - let Some((payload, values)) = value.split_once(' ') else { + let Some((payload, configs_str)) = value.split_once(' ') else { return; }; - trace!("fmtp attribute parsed: payload: {payload:?}, values: {values:?}"); + let mut new_configs = configs_str.split(';').map(|v|v.to_string()).collect::>(); + new_configs.retain(|v| { + v.starts_with("sprop-parameter-sets") + }); - let mut new_value = vec![payload.to_string()]; + println!("fmtp attribute parsed: payload: {payload:?}, values: {new_configs:?}"); match encoding { "H264" => { // Reference: https://www.iana.org/assignments/media-types/video/H264 const CONSTRAINED_BASELINE_LEVEL_ID: u32 = 
0x42e01f; - new_value.push(format!("profile-level-id={CONSTRAINED_BASELINE_LEVEL_ID}")); - new_value.push("level-asymmetry-allowed=1".to_string()); - new_value.push("packetization-mode=1".to_string()); + new_configs.push("packetization-mode=1".to_string()); + new_configs.push(format!("profile-level-id={CONSTRAINED_BASELINE_LEVEL_ID:x}")); + new_configs.push("level-asymmetry-allowed=1".to_string()); + } "H265" => { // Rererence: https://www.iana.org/assignments/media-types/video/H265 const LEVEL_ID: u8 = 93; - new_value.push(format!("level-id={LEVEL_ID}")); + new_configs.push(format!("level-id={LEVEL_ID}")); + } _ => (), } - new_value.push(values.to_string()); - - let new_value = new_value.join(" "); + let new_configs_str = new_configs.join(";"); + let new_value = [payload, &new_configs_str].join(" "); let new_fmtp_attribute = gst_sdp::SDPAttribute::new("fmtp", Some(&new_value)); if let Err(error) = media.replace_attribute(fmtp_idx as u32, new_fmtp_attribute) { - warn!("Failed to customize fmtp attribute from {value:?} to {new_value:?}. 
Error: {error:?}"); + println!("Failed to customize fmtp attribute \nfrom: {value:?}\nto: {new_value:?}.\nError: {error:?}"); } - trace!("fmtp attribute changed from {value:?} to {new_value:?}"); + println!("fmtp attribute changed \nfrom: {value:?}\nto: {new_value:?}"); } }); }); diff --git a/src/lib/stream/webrtc/signalling_server.rs b/src/lib/stream/webrtc/signalling_server.rs index 19cec31f..31dc392c 100644 --- a/src/lib/stream/webrtc/signalling_server.rs +++ b/src/lib/stream/webrtc/signalling_server.rs @@ -1,15 +1,16 @@ use std::net::SocketAddr; -use crate::{cli, stream}; use anyhow::{anyhow, Context, Result}; -use async_tungstenite::tokio::TokioAdapter; -use async_tungstenite::{tungstenite, WebSocketStream}; +use async_tungstenite::{tokio::TokioAdapter, tungstenite, WebSocketStream}; use futures::{SinkExt, StreamExt}; -use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::mpsc; - +use tokio::{ + net::{TcpListener, TcpStream}, + sync::mpsc, +}; use tracing::*; +use crate::{cli, stream}; + use super::signalling_protocol::{self, *}; #[derive(Debug)] @@ -124,10 +125,10 @@ impl SignallingServer { if let Err(error) = stream::Manager::remove_session(&bind, reason).await { - error!("Failed removing session {bind:?}. Reason: {error}",); + error!("Failed removing session: {bind:?}. 
Reason: {error}",); } - info!("Session {bind:?} ended by consumer"); + info!("Session: {bind:?} ended by consumer"); continue; } @@ -291,9 +292,9 @@ impl SignallingServer { let (height, width, encode, interval) = match &stream.video_and_stream.stream_information.configuration { crate::stream::types::CaptureConfiguration::Video(configuration) => { - // Filter out non-H264 local streams - if configuration.encode != crate::video::types::VideoEncodeType::H264 { - trace!("Stream {:?} will not be listed in available streams because it's encoding isn't H264 (it's {:?} instead)", stream.video_and_stream.name, configuration.encode); + // Filter out non-H264/H265 local streams + if !matches!(configuration.encode, crate::video::types::VideoEncodeType::H264 | crate::video::types::VideoEncodeType::H265) { + trace!("Stream {:?} will not be listed in available streams because its encoding isn't H264 or H265 (it's {:?} instead)", stream.video_and_stream.name, configuration.encode); return None; } ( diff --git a/src/lib/video/local/video_source_local_linux.rs b/src/lib/video/local/video_source_local_linux.rs index e96863e3..f72a50e1 100644 --- a/src/lib/video/local/video_source_local_linux.rs +++ b/src/lib/video/local/video_source_local_linux.rs @@ -2,20 +2,21 @@ use std::cmp::max; use std::collections::HashMap; use std::sync::{Arc, Mutex}; -use crate::controls::types::*; -use crate::stream::types::VideoCaptureConfiguration; - -use crate::video::types::*; -use crate::video::video_source::{VideoSource, VideoSourceAvailable}; +use anyhow::{anyhow, Result}; +use lazy_static::lazy_static; use paperclip::actix::Apiv2Schema; use regex::Regex; use serde::{Deserialize, Serialize}; - -use anyhow::{anyhow, Result}; - use tracing::*; -use lazy_static::lazy_static; +use crate::{ controls::types::*, stream::types::VideoCaptureConfiguration, video::{ types::*, video_source::{CamerasAvailable, VideoSource, VideoSourceFormats}, }, }; lazy_static! 
{ static ref VIDEO_FORMATS: Arc>>> = Default::default(); @@ -119,7 +120,7 @@ impl VideoSourceLocalType { } impl VideoSourceLocal { - pub fn try_identify_device( + pub async fn try_identify_device( &mut self, capture_configuration: &VideoCaptureConfiguration, candidates: &[VideoSourceType], @@ -136,7 +137,7 @@ impl VideoSourceLocal { // Rule n.2 - All candidates must share the same encode let candidates = - Self::get_cameras_with_same_encode(&candidates, &capture_configuration.encode); + Self::get_cameras_with_same_encode(&candidates, &capture_configuration.encode).await; let len = candidates.len(); if len == 0 { @@ -200,21 +201,24 @@ impl VideoSourceLocal { .collect() } - fn get_cameras_with_same_encode( + async fn get_cameras_with_same_encode( candidates: &[VideoSourceType], encode: &VideoEncodeType, ) -> Vec { - candidates - .iter() - .filter(|candidate| { - candidate - .inner() - .formats() - .iter() - .any(|format| &format.encode == encode) - }) - .cloned() - .collect() + let mut result = Vec::new(); + + for candidate in candidates.iter() { + if candidate + .formats() + .await + .iter() + .any(|format| &format.encode == encode) + { + result.push(candidate.clone()); + } + } + + result } fn get_cameras_with_same_bus( @@ -322,16 +326,8 @@ fn validate_control(control: &Control, value: i64) -> Result<(), String> { Ok(()) } -impl VideoSource for VideoSourceLocal { - fn name(&self) -> &String { - &self.name - } - - fn source_string(&self) -> &str { - &self.device_path - } - - fn formats(&self) -> Vec { +impl VideoSourceFormats for VideoSourceLocal { + async fn formats(&self) -> Vec { let device_path = self.device_path.clone(); let typ = self.typ.clone(); @@ -483,6 +479,16 @@ impl VideoSource for VideoSourceLocal { formats }) } +} + +impl VideoSource for VideoSourceLocal { + fn name(&self) -> &String { + &self.name + } + + fn source_string(&self) -> &str { + &self.device_path + } fn set_control_by_name(&self, control_name: &str, value: i64) -> std::io::Result<()> { let 
Some(control_id) = self @@ -687,8 +693,8 @@ impl VideoSource for VideoSourceLocal { } } -impl VideoSourceAvailable for VideoSourceLocal { - fn cameras_available() -> Vec { +impl CamerasAvailable for VideoSourceLocal { + async fn cameras_available() -> Vec { let mut cameras: Vec = vec![]; let cameras_path = { @@ -758,8 +764,11 @@ impl VideoSourceAvailable for VideoSourceLocal { #[cfg(test)] mod tests { + use tracing_test::traced_test; + use super::*; + #[traced_test] #[test] fn bus_decode() { let descriptions = vec![ @@ -798,6 +807,9 @@ mod tests { #[cfg(test)] mod device_identification_tests { + use serial_test::serial; + use tracing_test::traced_test; + use super::*; use crate::{ stream::types::{CaptureConfiguration, StreamInformation}, @@ -805,10 +817,6 @@ mod device_identification_tests { }; use VideoEncodeType::*; - use serial_test::serial; - - use tracing_test::traced_test; - fn add_available_camera( name: &str, device_path: &str, @@ -872,9 +880,10 @@ mod device_identification_tests { } } - #[serial("Using a mocked global VIDEO_FORMATS")] - #[test] - fn test_get_cameras_with_same_name() { + #[serial] + #[traced_test] + #[tokio::test] + async fn test_get_cameras_with_same_name() { VIDEO_FORMATS.lock().unwrap().clear(); let candidates = vec![ @@ -892,9 +901,10 @@ mod device_identification_tests { VIDEO_FORMATS.lock().unwrap().clear(); } - #[serial("Using a mocked global VIDEO_FORMATS")] - #[test] - fn test_get_cameras_with_same_encode() { + #[serial] + #[traced_test] + #[tokio::test] + async fn test_get_cameras_with_same_encode() { VIDEO_FORMATS.lock().unwrap().clear(); let candidates = vec![ @@ -905,15 +915,16 @@ mod device_identification_tests { ]; let same_encode_candidates = - VideoSourceLocal::get_cameras_with_same_encode(&candidates, &H264); + VideoSourceLocal::get_cameras_with_same_encode(&candidates, &H264).await; assert_eq!(candidates[..2].to_vec(), same_encode_candidates); VIDEO_FORMATS.lock().unwrap().clear(); } - #[serial("Using a mocked global 
VIDEO_FORMATS")] - #[test] - fn test_get_cameras_with_same_bus() { + #[serial] + #[traced_test] + #[tokio::test] + async fn test_get_cameras_with_same_bus() { VIDEO_FORMATS.lock().unwrap().clear(); let candidates = vec![ @@ -932,10 +943,11 @@ mod device_identification_tests { VIDEO_FORMATS.lock().unwrap().clear(); } + // #[traced_test] + #[serial] #[traced_test] - #[serial("Using a mocked global VIDEO_FORMATS")] - #[test] - fn identify_a_candidate_with_same_name_and_encode() { + #[tokio::test] + async fn identify_a_candidate_with_same_name_and_encode() { VIDEO_FORMATS.lock().unwrap().clear(); let candidates = vec![ @@ -956,6 +968,7 @@ mod device_identification_tests { let Ok(Some(candidate_source_string)) = source .to_owned() .try_identify_device(capture_configuration, &candidates) + .await else { panic!("Failed to identify the only device with the same name and encode") }; @@ -969,15 +982,17 @@ mod device_identification_tests { source .to_owned() .try_identify_device(capture_configuration, &candidates[1..]) + .await .expect_err("Failed to identify the only device with the same name and encode"); VIDEO_FORMATS.lock().unwrap().clear(); } + // #[traced_test] + #[serial] #[traced_test] - #[serial("Using a mocked global VIDEO_FORMATS")] - #[test] - fn identify_a_candidate_when_usb_port_changed() { + #[tokio::test] + async fn identify_a_candidate_when_usb_port_changed() { VIDEO_FORMATS.lock().unwrap().clear(); // Before this boot, the device candidates[0] was in "usb_port_0" and the device candidates[1] was in "usb_port_1": @@ -1007,6 +1022,7 @@ mod device_identification_tests { let Ok(Some(candidate_source_string)) = source .to_owned() .try_identify_device(capture_configuration, &candidates) + .await else { panic!("Failed to identify the only device with the same name and encode") }; @@ -1021,16 +1037,18 @@ mod device_identification_tests { source .to_owned() .try_identify_device(capture_configuration, &other_candidates) + .await .expect_err("Failed to identify the 
only device with the same name and encode"); } VIDEO_FORMATS.lock().unwrap().clear(); } + // #[traced_test] + #[serial] #[traced_test] - #[serial("Using a mocked global VIDEO_FORMATS")] - #[test] - fn identify_a_candidate_when_path_changed() { + #[tokio::test] + async fn identify_a_candidate_when_path_changed() { VIDEO_FORMATS.lock().unwrap().clear(); // Before this boot, the device candidates[0] was in "/dev/video1" and the device candidates[1] was in "/dev/video0": @@ -1061,6 +1079,7 @@ mod device_identification_tests { let Ok(Some(candidate_source_string)) = source .to_owned() .try_identify_device(capture_configuration, &candidates) + .await else { panic!("Failed to identify the only device with the same name and encode") }; @@ -1073,10 +1092,11 @@ mod device_identification_tests { VIDEO_FORMATS.lock().unwrap().clear(); } + // #[traced_test] + #[serial] #[traced_test] - #[serial("Using a mocked global VIDEO_FORMATS")] - #[test] - fn do_not_identify_if_several_devices_with_same_name_and_encode() { + #[tokio::test] + async fn do_not_identify_if_several_devices_with_same_name_and_encode() { VIDEO_FORMATS.lock().unwrap().clear(); // Before this boot, the device candidates[0] was in "usb_port_0" and the device candidates[1] was in "usb_port_1": @@ -1106,6 +1126,7 @@ mod device_identification_tests { assert!(source .to_owned() .try_identify_device(capture_configuration, &candidates) + .await .expect("Failed to identify the only device with the same name and encode") .is_none()) } diff --git a/src/lib/video/local/video_source_local_none.rs b/src/lib/video/local/video_source_local_none.rs index 41ad0a47..d301ffef 100644 --- a/src/lib/video/local/video_source_local_none.rs +++ b/src/lib/video/local/video_source_local_none.rs @@ -1,12 +1,15 @@ -use crate::controls::types::Control; -use crate::stream::types::VideoCaptureConfiguration; -use crate::video::types::*; -use crate::video::video_source::{VideoSource, VideoSourceAvailable}; - +use anyhow::Result; use 
paperclip::actix::Apiv2Schema; use serde::{Deserialize, Serialize}; -use anyhow::Result; +use crate::{ + controls::types::Control, + stream::types::VideoCaptureConfiguration, + video::{ + types::*, + video_source::{VideoSource, VideoSourceAvailable}, + }, +}; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum VideoSourceLocalType {} @@ -36,7 +39,7 @@ impl VideoSource for VideoSourceLocal { &self.device_path } - fn formats(&self) -> Vec { + async fn formats(&self) -> Vec { return vec![]; } @@ -81,8 +84,8 @@ impl VideoSource for VideoSourceLocal { } } -impl VideoSourceAvailable for VideoSourceLocal { - fn cameras_available() -> Vec { +impl VideoSourceLocal { + pub async fn cameras_available() -> Vec { return vec![]; } } diff --git a/src/lib/video/types.rs b/src/lib/video/types.rs index 51811a41..b31f4050 100644 --- a/src/lib/video/types.rs +++ b/src/lib/video/types.rs @@ -1,11 +1,15 @@ -use super::video_source_gst::VideoSourceGst; -use super::video_source_local::VideoSourceLocal; -use super::video_source_redirect::VideoSourceRedirect; -use super::{video_source::VideoSource, video_source_onvif::VideoSourceOnvif}; use gst; use paperclip::actix::Apiv2Schema; use serde::{Deserialize, Serialize}; +use super::{ + video_source::{VideoSource, VideoSourceFormats}, + video_source_gst::VideoSourceGst, + video_source_local::VideoSourceLocal, + video_source_onvif::VideoSourceOnvif, + video_source_redirect::VideoSourceRedirect, +}; + #[derive(Apiv2Schema, Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum VideoSourceType { Gst(VideoSourceGst), @@ -64,6 +68,17 @@ impl VideoSourceType { } } +impl VideoSourceFormats for VideoSourceType { + async fn formats(&self) -> Vec { + match self { + VideoSourceType::Gst(gst) => gst.formats().await, + VideoSourceType::Local(local) => local.formats().await, + VideoSourceType::Onvif(onvif) => onvif.formats().await, + VideoSourceType::Redirect(redirect) => redirect.formats().await, + } + } +} + impl std::str::FromStr for 
VideoEncodeType { type Err = std::convert::Infallible; diff --git a/src/lib/video/video_source.rs b/src/lib/video/video_source.rs index 9088cade..6bda9714 100644 --- a/src/lib/video/video_source.rs +++ b/src/lib/video/video_source.rs @@ -1,17 +1,23 @@ +use tracing::*; + use crate::controls::types::{Control, ControlType}; -use super::types::*; -use super::video_source_gst::VideoSourceGst; -use super::video_source_local::VideoSourceLocal; -use super::video_source_onvif::VideoSourceOnvif; -use super::video_source_redirect::VideoSourceRedirect; +use super::{ + types::*, video_source_gst::VideoSourceGst, video_source_local::VideoSourceLocal, + video_source_onvif::VideoSourceOnvif, video_source_redirect::VideoSourceRedirect, +}; -use tracing::*; +pub(crate) trait VideoSourceFormats { + async fn formats(&self) -> Vec; +} + +pub(crate) trait CamerasAvailable { + async fn cameras_available() -> Vec; +} pub trait VideoSource { fn name(&self) -> &String; fn source_string(&self) -> &str; - fn formats(&self) -> Vec; fn set_control_by_name(&self, control_name: &str, value: i64) -> std::io::Result<()>; fn set_control_by_id(&self, control_id: u64, value: i64) -> std::io::Result<()>; fn control_value_by_name(&self, control_name: &str) -> std::io::Result; @@ -21,21 +27,18 @@ pub trait VideoSource { fn is_shareable(&self) -> bool; } -pub trait VideoSourceAvailable { - fn cameras_available() -> Vec; -} - -pub fn cameras_available() -> Vec { +pub async fn cameras_available() -> Vec { [ - &VideoSourceLocal::cameras_available()[..], - &VideoSourceGst::cameras_available()[..], - &VideoSourceRedirect::cameras_available()[..], + &VideoSourceLocal::cameras_available().await[..], + &VideoSourceGst::cameras_available().await[..], + &VideoSourceOnvif::cameras_available().await[..], + &VideoSourceRedirect::cameras_available().await[..], ] .concat() } -pub fn get_video_source(source_string: &str) -> Result { - let cameras = cameras_available(); +pub async fn get_video_source(source_string: &str) 
-> Result { + let cameras = cameras_available().await; if let Some(camera) = cameras .iter() @@ -57,14 +60,14 @@ pub fn get_video_source(source_string: &str) -> Result std::io::Result<()> { - let camera = get_video_source(source_string)?; +pub async fn set_control(source_string: &str, control_id: u64, value: i64) -> std::io::Result<()> { + let camera = get_video_source(source_string).await?; debug!("Set camera ({source_string}) control ({control_id}) value ({value})."); return camera.inner().set_control_by_id(control_id, value); } -pub fn reset_controls(source_string: &str) -> Result<(), Vec> { - let camera = match get_video_source(source_string) { +pub async fn reset_controls(source_string: &str) -> Result<(), Vec> { + let camera = match get_video_source(source_string).await { Ok(camera) => camera, Err(error) => return Err(vec![error]), }; @@ -103,8 +106,8 @@ pub fn reset_controls(source_string: &str) -> Result<(), Vec> { mod tests { use super::*; - #[test] - fn simple_test() { - println!("{:#?}", cameras_available()); + #[tokio::test] + async fn simple_test() { + println!("{:#?}", cameras_available().await); } } diff --git a/src/lib/video/video_source_gst.rs b/src/lib/video/video_source_gst.rs index ed2174a2..b4cb4521 100644 --- a/src/lib/video/video_source_gst.rs +++ b/src/lib/video/video_source_gst.rs @@ -1,13 +1,14 @@ -use crate::controls::types::Control; -use crate::stream::gst::utils::is_gst_plugin_available; - -use super::types::*; -use super::video_source::{VideoSource, VideoSourceAvailable}; -use super::video_source_local::VideoSourceLocal; - use paperclip::actix::Apiv2Schema; use serde::{Deserialize, Serialize}; +use crate::{controls::types::Control, stream::gst::utils::is_gst_plugin_available}; + +use super::{ + types::*, + video_source::{CamerasAvailable, VideoSource, VideoSourceFormats}, + video_source_local::VideoSourceLocal, +}; + #[derive(Apiv2Schema, Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum VideoSourceGstType { // TODO: local 
should have a pipeline also @@ -22,22 +23,10 @@ pub struct VideoSourceGst { pub source: VideoSourceGstType, } -impl VideoSource for VideoSourceGst { - fn name(&self) -> &String { - &self.name - } - - fn source_string(&self) -> &str { +impl VideoSourceFormats for VideoSourceGst { + async fn formats(&self) -> Vec { match &self.source { - VideoSourceGstType::Local(local) => local.source_string(), - VideoSourceGstType::Fake(string) => string, - VideoSourceGstType::QR(string) => string, - } - } - - fn formats(&self) -> Vec { - match &self.source { - VideoSourceGstType::Local(local) => local.formats(), + VideoSourceGstType::Local(local) => local.formats().await, VideoSourceGstType::Fake(_) => { let intervals: Vec = [60, 30, 24, 16, 10, 5] .iter() @@ -127,6 +116,20 @@ impl VideoSource for VideoSourceGst { } } } +} + +impl VideoSource for VideoSourceGst { + fn name(&self) -> &String { + &self.name + } + + fn source_string(&self) -> &str { + match &self.source { + VideoSourceGstType::Local(local) => local.source_string(), + VideoSourceGstType::Fake(string) => string, + VideoSourceGstType::QR(string) => string, + } + } fn set_control_by_name(&self, _control_name: &str, _value: i64) -> std::io::Result<()> { Err(std::io::Error::new( @@ -179,8 +182,8 @@ impl VideoSource for VideoSourceGst { } } -impl VideoSourceAvailable for VideoSourceGst { - fn cameras_available() -> Vec { +impl CamerasAvailable for VideoSourceGst { + async fn cameras_available() -> Vec { let mut sources = vec![VideoSourceType::Gst(VideoSourceGst { name: "Fake source".into(), source: VideoSourceGstType::Fake("ball".into()), diff --git a/src/lib/video/video_source_onvif.rs b/src/lib/video/video_source_onvif.rs index b3a7e850..ebb7b70a 100644 --- a/src/lib/video/video_source_onvif.rs +++ b/src/lib/video/video_source_onvif.rs @@ -1,11 +1,13 @@ -use super::types::*; -use super::video_source::VideoSource; -use crate::controls::onvif::manager::Manager as OnvifManager; -use crate::controls::types::Control; - use 
paperclip::actix::Apiv2Schema; use serde::{Deserialize, Serialize}; +use crate::controls::{onvif::manager::Manager as OnvifManager, types::Control}; + +use super::{ + types::*, + video_source::{CamerasAvailable, VideoSource, VideoSourceFormats}, +}; + #[derive(Apiv2Schema, Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum VideoSourceOnvifType { Onvif(url::Url), @@ -17,8 +19,8 @@ pub struct VideoSourceOnvif { pub source: VideoSourceOnvifType, } -impl VideoSourceOnvif { - pub async fn formats(&self) -> Vec { +impl VideoSourceFormats for VideoSourceOnvif { + async fn formats(&self) -> Vec { let VideoSourceOnvifType::Onvif(stream_uri) = &self.source; OnvifManager::get_formats(stream_uri) .await @@ -80,8 +82,8 @@ impl VideoSource for VideoSourceOnvif { } } -impl VideoSourceOnvif { - pub async fn cameras_available() -> Vec { +impl CamerasAvailable for VideoSourceOnvif { + async fn cameras_available() -> Vec { OnvifManager::streams_available().await.clone() } } diff --git a/src/lib/video/video_source_redirect.rs b/src/lib/video/video_source_redirect.rs index 353fd947..4f8c9bc7 100644 --- a/src/lib/video/video_source_redirect.rs +++ b/src/lib/video/video_source_redirect.rs @@ -1,10 +1,13 @@ -use super::types::*; -use super::video_source::{VideoSource, VideoSourceAvailable}; -use crate::controls::types::Control; - use paperclip::actix::Apiv2Schema; use serde::{Deserialize, Serialize}; +use crate::controls::types::Control; + +use super::{ + types::*, + video_source::{CamerasAvailable, VideoSource, VideoSourceFormats}, +}; + #[derive(Apiv2Schema, Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum VideoSourceRedirectType { Redirect(String), @@ -16,6 +19,16 @@ pub struct VideoSourceRedirect { pub source: VideoSourceRedirectType, } +impl VideoSourceFormats for VideoSourceRedirect { + async fn formats(&self) -> Vec { + match &self.source { + VideoSourceRedirectType::Redirect(_) => { + vec![] + } + } + } +} + impl VideoSource for VideoSourceRedirect { fn 
name(&self) -> &String { &self.name @@ -27,14 +40,6 @@ impl VideoSource for VideoSourceRedirect { } } - fn formats(&self) -> Vec { - match &self.source { - VideoSourceRedirectType::Redirect(_) => { - vec![] - } - } - } - fn set_control_by_name(&self, _control_name: &str, _value: i64) -> std::io::Result<()> { Err(std::io::Error::new( std::io::ErrorKind::NotFound, @@ -78,8 +83,8 @@ impl VideoSource for VideoSourceRedirect { } } -impl VideoSourceAvailable for VideoSourceRedirect { - fn cameras_available() -> Vec { +impl CamerasAvailable for VideoSourceRedirect { + async fn cameras_available() -> Vec { vec![VideoSourceType::Redirect(VideoSourceRedirect { name: "Redirect source".into(), source: VideoSourceRedirectType::Redirect("Redirect".into()), diff --git a/src/lib/video/xml.rs b/src/lib/video/xml.rs index a38fb69d..9b27bdf5 100644 --- a/src/lib/video/xml.rs +++ b/src/lib/video/xml.rs @@ -1,9 +1,10 @@ -use super::video_source::VideoSource; -use crate::controls::types::ControlType; - use anyhow::{anyhow, Result}; use serde::Serialize; +use crate::controls::types::ControlType; + +use super::video_source::VideoSource; + #[derive(Debug, Serialize)] #[serde(rename = "mavlinkcamera")] pub struct MavlinkCamera { @@ -191,15 +192,16 @@ pub fn from_video_source(video_source: &dyn VideoSource) -> Result { #[cfg(test)] mod tests { + use quick_xml::se::to_string; + use crate::video::types::VideoSourceType; use super::*; - use quick_xml::se::to_string; - #[test] - fn test_device() { + #[tokio::test] + async fn test_device() { use crate::video::video_source; - for camera in video_source::cameras_available() { + for camera in video_source::cameras_available().await { if let VideoSourceType::Local(camera) = camera { let xml_string = from_video_source(&camera).unwrap(); println!("{}", xml_string); diff --git a/src/main.rs b/src/main.rs index d14be36f..72a2374b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,9 +9,9 @@ async fn main() -> Result<(), std::io::Error> { // Logger should 
start before everything else to register any log information logger::manager::init(); // Settings should start before everybody else to ensure that the CLI are stored - settings::manager::init(Some(&cli::manager::settings_file())); + settings::manager::init(Some(&cli::manager::settings_file())).await; - controls::onvif::manager::Manager::init(); + controls::onvif::manager::Manager::init().await; mavlink::manager::Manager::init();