From 15a008fedce367363995601d1392e1dd485e30d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Thu, 7 Nov 2024 12:03:48 -0300 Subject: [PATCH] src: lib: controls: onvif: Refactor Onvif code --- .../controls/onvif/{client.rs => camera.rs} | 114 ++++++--- src/lib/controls/onvif/manager.rs | 222 +++++++++++------- src/lib/controls/onvif/mod.rs | 2 +- 3 files changed, 208 insertions(+), 130 deletions(-) rename src/lib/controls/onvif/{client.rs => camera.rs} (59%) diff --git a/src/lib/controls/onvif/client.rs b/src/lib/controls/onvif/camera.rs similarity index 59% rename from src/lib/controls/onvif/client.rs rename to src/lib/controls/onvif/camera.rs index 582a6957..f4be9293 100644 --- a/src/lib/controls/onvif/client.rs +++ b/src/lib/controls/onvif/camera.rs @@ -4,7 +4,10 @@ use onvif_schema::{devicemgmt::GetDeviceInformationResponse, transport}; use anyhow::{anyhow, Result}; use tracing::*; -pub struct Clients { +use crate::{stream::gst::utils::get_encode_from_rtspsrc, video::types::Format}; + +#[derive(Clone)] +pub struct OnvifCamera { devicemgmt: soap::client::Client, event: Option, deviceio: Option, @@ -13,22 +16,29 @@ pub struct Clients { imaging: Option, ptz: Option, analytics: Option, + pub streams_information: Option>, } pub struct Auth { pub credentials: Option, - pub url: Box, + pub url: url::Url, +} + +#[derive(Debug, Clone)] +pub struct OnvifStreamInformation { + pub stream_uri: url::Url, + pub format: Format, } -impl Clients { - #[instrument(level = "debug", skip(auth))] +impl OnvifCamera { + #[instrument(level = "trace", skip(auth))] pub async fn try_new(auth: &Auth) -> Result { let creds = &auth.credentials; - let devicemgmt_uri = url::Url::parse(&auth.url)?; + let devicemgmt_uri = &auth.url; let base_uri = &devicemgmt_uri.origin().ascii_serialization(); let mut this = Self { - devicemgmt: soap::client::ClientBuilder::new(&devicemgmt_uri) + devicemgmt: soap::client::ClientBuilder::new(devicemgmt_uri) .credentials(creds.clone()) .build(), imaging: None, @@ -38,6 +48,7 @@ impl Clients { media: None, media2: None, analytics: None, + streams_information: None, }; let services = @@ -57,7 +68,7 @@ impl Clients { ); match service.namespace.as_str() { "http://www.onvif.org/ver10/device/wsdl" => { - if service_url != devicemgmt_uri { + if &service_url != devicemgmt_uri { warn!( "advertised device mgmt uri {service_url} not expected {devicemgmt_uri}" ); @@ -70,14 +81,17 @@ impl Clients { "http://www.onvif.org/ver20/imaging/wsdl" => this.imaging = svc, "http://www.onvif.org/ver20/ptz/wsdl" => this.ptz = svc, "http://www.onvif.org/ver20/analytics/wsdl" => this.analytics = svc, - _ => debug!("unknown service: {:?}", service), + _ => trace!("unknown service: {:?}", service), } } + this.streams_information + .replace(this.get_streams_information().await?); + Ok(this) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "trace", skip(self))] pub async fn get_device_information( &self, ) -> Result { @@ -85,16 +99,20 @@ impl Clients { .await } - pub async fn get_stream_uris(&self) -> Result, transport::Error> { - let mut urls: Vec = vec![]; + async fn get_streams_information( + &self, + ) -> Result, transport::Error> { + let mut streams_information = vec![]; + let media_client = self .media .as_ref() .ok_or_else(|| transport::Error::Other("Client media is not available".into()))?; - let profiles = onvif_schema::media::get_profiles(media_client, &Default::default()).await?; - debug!("get_profiles response: {:#?}", &profiles); + let profiles = onvif_schema::media::get_profiles(media_client, &Default::default()) + .await? + .profiles; + trace!("get_profiles response: {:#?}", &profiles); let requests: Vec<_> = profiles - .profiles .iter() .map( |p: &onvif_schema::onvif::Profile| onvif_schema::media::GetStreamUri { @@ -116,34 +134,54 @@ impl Clients { .map(|r| onvif_schema::media::get_stream_uri(media_client, r)), ) .await?; - for (p, resp) in profiles.profiles.iter().zip(responses.iter()) { - debug!("token={} name={}", &p.token.0, &p.name.0); - debug!(" {}", &resp.media_uri.uri); - match url::Url::parse(&resp.media_uri.uri) { - Ok(address) => urls.push(address), + for (profile, stream_uri_response) in profiles.iter().zip(responses.iter()) { + trace!("token={} name={}", &profile.token.0, &profile.name.0); + trace!("\t{}", &stream_uri_response.media_uri.uri); + + let stream_uri = match url::Url::parse(&stream_uri_response.media_uri.uri) { + Ok(stream_url) => stream_url, Err(error) => { error!( "Failed to parse stream url: {}, reason: {error:?}", - &resp.media_uri.uri - ) - } - } - if let Some(ref v) = p.video_encoder_configuration { - debug!( - " {:?}, {}x{}", - v.encoding, v.resolution.width, v.resolution.height - ); - if let Some(ref r) = v.rate_control { - debug!(" {} fps, {} kbps", r.frame_rate_limit, r.bitrate_limit); + &stream_uri_response.media_uri.uri + ); + continue; } - } - if let Some(ref a) = p.audio_encoder_configuration { - debug!( - " audio: {:?}, {} kbps, {} kHz", - a.encoding, a.bitrate, a.sample_rate - ); - } + }; + + let Some(video_encoder_configuration) = &profile.video_encoder_configuration else { + warn!("Skipping uri with no encoders"); + continue; + }; + + let Some(encode) = get_encode_from_rtspsrc(&stream_uri).await else { + continue; + }; + + let video_rate = video_encoder_configuration + .rate_control + .as_ref() + .map(|rate_control| { + (rate_control.frame_rate_limit as f32 + / rate_control.encoding_interval.max(1) as f32) as u32 + }) + .unwrap_or_default(); + + let intervals = vec![crate::video::types::FrameInterval { + numerator: 1, + denominator: video_rate, + }]; + + let sizes = vec![crate::video::types::Size { + width: video_encoder_configuration.resolution.width.max(0) as u32, + height: video_encoder_configuration.resolution.height.max(0) as u32, + intervals, + }]; + + let format = Format { encode, sizes }; + + streams_information.push(OnvifStreamInformation { stream_uri, format }); } - Ok(urls) + Ok(streams_information) } } diff --git a/src/lib/controls/onvif/manager.rs b/src/lib/controls/onvif/manager.rs index 8444670c..e5e22d5b 100644 --- a/src/lib/controls/onvif/manager.rs +++ b/src/lib/controls/onvif/manager.rs @@ -1,55 +1,80 @@ -use std::sync::{Arc, Mutex}; +use std::collections::HashMap; +use std::sync::Arc; -use anyhow::Result; +use anyhow::{anyhow, Context, Result}; +use onvif::soap::client::Credentials; +use tokio::sync::RwLock; use tracing::*; -use crate::{ - stream::{ - manager as stream_manager, - types::{CaptureConfiguration, StreamInformation}, - }, - video::{ - types::VideoSourceType, - video_source_redirect::{VideoSourceRedirect, VideoSourceRedirectType}, - }, - video_stream::types::VideoAndStreamInformation, +use crate::video::{ + types::{Format, VideoSourceType}, + video_source_onvif::{VideoSourceOnvif, VideoSourceOnvifType}, }; -use super::client::*; +use super::camera::*; lazy_static! { - static ref MANAGER: Arc> = Default::default(); + static ref MANAGER: Arc> = Default::default(); } -#[derive(Debug)] pub struct Manager { - _process: tokio::task::JoinHandle>, + mcontext: Arc>, + _task: tokio::task::JoinHandle>, } +pub struct ManagerContext { + cameras: HashMap, + /// Credentials can be either added in runtime, or passed via ENV or CLI args + credentials: HashMap>>, +} + +type StreamURI = String; +type Host = String; + impl Drop for Manager { fn drop(&mut self) { - self._process.abort(); + self._task.abort(); } } impl Default for Manager { #[instrument(level = "trace")] fn default() -> Self { - Self { - _process: tokio::spawn(async move { Manager::discover_loop().await }), - } + let mcontext = Arc::new(RwLock::new(ManagerContext { + cameras: HashMap::new(), + credentials: HashMap::new(), + })); + + let mcontext_clone = mcontext.clone(); + let _task = tokio::spawn(async { Manager::discover_loop(mcontext_clone).await }); + + Self { mcontext, _task } } } impl Manager { // Construct our manager, should be done inside main - #[instrument(level = "debug")] - pub fn init() { + #[instrument(level = "trace")] + pub async fn init() { MANAGER.as_ref(); + + let manager = MANAGER.write().await; + + let mut mcontext = manager.mcontext.write().await; + + // TODO: fill MANAGER.context.credentials with credentials passed by ENV and CLI + // It can be in the form of ":@", but we need to escape special characters need to. + let _ = mcontext.credentials.insert( + "192.168.0.168".to_string(), + Arc::new(RwLock::new(Credentials { + username: "admin".to_string(), + password: "12345".to_string(), + })), + ); } - #[instrument(level = "debug")] - async fn discover_loop() -> Result<()> { + #[instrument(level = "trace", skip(context))] + async fn discover_loop(context: Arc>) -> Result<()> { use futures::stream::StreamExt; use std::net::{IpAddr, Ipv4Addr}; @@ -60,91 +85,106 @@ impl Manager { onvif::discovery::DiscoveryBuilder::default() .listen_address(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) - .duration(tokio::time::Duration::from_secs(5)) + .duration(tokio::time::Duration::from_secs(20)) .run() .await? - .for_each_concurrent(MAX_CONCURRENT_JUMPERS, |device| async move { - debug!("Device found: {device:#?}"); - - //TODO: We should add support to auth later - let credentials = None; - let clients = match Clients::try_new(&Auth { - credentials: credentials.clone(), - url: device.urls.first().unwrap().to_string().into(), - }) - .await - { - Ok(clients) => clients, - Err(error) => { - error!("Failed creating clients: {error:#?}"); - return; - } - }; + .for_each_concurrent(MAX_CONCURRENT_JUMPERS, |device| { + let context = context.clone(); - match clients.get_stream_uris().await { - Ok(stream_uris) => { - let mut url = stream_uris[0].clone(); + async move { + trace!("Device found: {device:#?}"); - let name = if let Ok(device) = &clients.get_device_information().await { - format!("{} - {} - {}", device.model, device.serial_number, url) + for url in device.urls { + let host = url + .host() + .map(|host| host.to_string()) + .unwrap_or(url.to_string()); + + let credentials = if let Some(credentials) = + context.read().await.credentials.get(&host) + { + Some(credentials.read().await.clone()) } else { - if let Some(name) = device.name { - format!("{name} - {url}") - } else { - format!("{url}") - } + None }; - if let Some(credentials) = credentials { - if url.set_username(&credentials.username).is_err() { - error!("Failed setting username for {url}"); - } - if url.set_password(Some(&credentials.password)).is_err() { - error!("Failed setting password for {url}"); + trace!("Device {host}. Using credentials: {credentials:?}"); + + let camera = match OnvifCamera::try_new(&Auth { + credentials: credentials.clone(), + url: url.clone(), + }) + .await + { + Ok(camera) => camera, + Err(error) => { + error!(host, "Failed creating camera: {error:?}"); + return; } - } - let video_source_redirect = VideoSourceRedirect { - name: name.clone(), - source: VideoSourceRedirectType::Redirect( - stream_uris[0].to_string(), - ), }; - let video_and_stream = VideoAndStreamInformation { - name: name.clone(), - stream_information: StreamInformation { - endpoints: vec![url], - configuration: CaptureConfiguration::Redirect( - Default::default(), - ), - extended_configuration: None, - }, - video_source: VideoSourceType::Redirect(video_source_redirect), + let Some(streams_informations) = &camera.streams_information else { + error!(host, "Failed getting stream information"); + continue; }; - if let Ok(streams) = stream_manager::streams().await { - for stream in streams { - if let Err(error) = - video_and_stream.conflicts_with(&stream.video_and_stream) - { - debug!("Stream {name} is already registered: {error}"); - return; - } - } - } + trace!(host, "Found streams {streams_informations:?}"); - if let Err(error) = - stream_manager::add_stream_and_start(video_and_stream).await - { - error!("Failed adding stream: {error:#?}"); + let mut context = context.write().await; + for stream_information in streams_informations { + context + .cameras + .entry(stream_information.stream_uri.to_string()) + .and_modify(|old_camera| *old_camera = camera.clone()) + .or_insert_with(|| { + debug!(host, "New stream inserted: {stream_information:?}"); + + camera.clone() + }); } } - Err(error) => { - error!("Failed getting stream uris: {error:#?}"); - } } }) .await; } } + + #[instrument(level = "trace")] + pub async fn get_formats(stream_uri: &StreamURI) -> Result> { + let mcontext = MANAGER.read().await.mcontext.clone(); + let mcontext = mcontext.read().await; + + let camera = mcontext + .cameras + .get(stream_uri) + .context("Camera not found")?; + + let Some(streams_information) = &camera.streams_information else { + return Err(anyhow!("Failed getting stream information")); + }; + + let stream_information = streams_information + .iter() + .find(|&stream_information| &stream_information.stream_uri.to_string() == stream_uri) + .context("Camera not found")?; + + Ok(vec![stream_information.format.clone()]) + } + + #[instrument(level = "trace")] + pub async fn streams_available() -> Vec { + let mcontext = MANAGER.read().await.mcontext.clone(); + let mcontext = mcontext.read().await; + + mcontext + .cameras + .keys() + .map(|stream_uri| { + VideoSourceType::Onvif(VideoSourceOnvif { + name: format!("{stream_uri}"), + source: VideoSourceOnvifType::Onvif(stream_uri.clone()), + }) + }) + .collect::>() + } } diff --git a/src/lib/controls/onvif/mod.rs b/src/lib/controls/onvif/mod.rs index af2485e8..2c424d20 100644 --- a/src/lib/controls/onvif/mod.rs +++ b/src/lib/controls/onvif/mod.rs @@ -1,2 +1,2 @@ -mod client; +pub mod camera; pub mod manager;