diff --git a/src/lib/controls/onvif/client.rs b/src/lib/controls/onvif/camera.rs similarity index 58% rename from src/lib/controls/onvif/client.rs rename to src/lib/controls/onvif/camera.rs index 582a6957..6aea78a6 100644 --- a/src/lib/controls/onvif/client.rs +++ b/src/lib/controls/onvif/camera.rs @@ -4,7 +4,10 @@ use onvif_schema::{devicemgmt::GetDeviceInformationResponse, transport}; use anyhow::{anyhow, Result}; use tracing::*; -pub struct Clients { +use crate::video::types::{Format, VideoEncodeType}; + +#[derive(Clone)] +pub struct OnvifCamera { devicemgmt: soap::client::Client, event: Option, deviceio: Option, @@ -17,18 +20,24 @@ pub struct Clients { pub struct Auth { pub credentials: Option, - pub url: Box, + pub url: url::Url, +} + +#[derive(Debug, Clone)] +pub struct OnvifStreamInformation { + pub stream_uri: url::Url, + pub format: Format, } -impl Clients { - #[instrument(level = "debug", skip(auth))] +impl OnvifCamera { + #[instrument(level = "trace", skip(auth))] pub async fn try_new(auth: &Auth) -> Result { let creds = &auth.credentials; - let devicemgmt_uri = url::Url::parse(&auth.url)?; + let devicemgmt_uri = &auth.url; let base_uri = &devicemgmt_uri.origin().ascii_serialization(); let mut this = Self { - devicemgmt: soap::client::ClientBuilder::new(&devicemgmt_uri) + devicemgmt: soap::client::ClientBuilder::new(devicemgmt_uri) .credentials(creds.clone()) .build(), imaging: None, @@ -57,7 +66,7 @@ impl Clients { ); match service.namespace.as_str() { "http://www.onvif.org/ver10/device/wsdl" => { - if service_url != devicemgmt_uri { + if &service_url != devicemgmt_uri { warn!( "advertised device mgmt uri {service_url} not expected {devicemgmt_uri}" ); @@ -70,14 +79,14 @@ impl Clients { "http://www.onvif.org/ver20/imaging/wsdl" => this.imaging = svc, "http://www.onvif.org/ver20/ptz/wsdl" => this.ptz = svc, "http://www.onvif.org/ver20/analytics/wsdl" => this.analytics = svc, - _ => debug!("unknown service: {:?}", service), + _ => trace!("unknown service: {:?}", service), } } Ok(this) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "trace", skip(self))] pub async fn get_device_information( &self, ) -> Result { @@ -85,16 +94,20 @@ impl Clients { .await } - pub async fn get_stream_uris(&self) -> Result, transport::Error> { - let mut urls: Vec = vec![]; + pub async fn get_streams_information( + &self, + ) -> Result, transport::Error> { + let mut streams_information = vec![]; + let media_client = self .media .as_ref() .ok_or_else(|| transport::Error::Other("Client media is not available".into()))?; - let profiles = onvif_schema::media::get_profiles(media_client, &Default::default()).await?; - debug!("get_profiles response: {:#?}", &profiles); + let profiles = onvif_schema::media::get_profiles(media_client, &Default::default()) + .await? + .profiles; + trace!("get_profiles response: {:#?}", &profiles); let requests: Vec<_> = profiles - .profiles .iter() .map( |p: &onvif_schema::onvif::Profile| onvif_schema::media::GetStreamUri { @@ -116,34 +129,61 @@ impl Clients { .map(|r| onvif_schema::media::get_stream_uri(media_client, r)), ) .await?; - for (p, resp) in profiles.profiles.iter().zip(responses.iter()) { - debug!("token={} name={}", &p.token.0, &p.name.0); - debug!(" {}", &resp.media_uri.uri); - match url::Url::parse(&resp.media_uri.uri) { - Ok(address) => urls.push(address), + for (profile, stream_uri_response) in profiles.iter().zip(responses.iter()) { + trace!("token={} name={}", &profile.token.0, &profile.name.0); + trace!("\t{}", &stream_uri_response.media_uri.uri); + + let stream_uri = match url::Url::parse(&stream_uri_response.media_uri.uri) { + Ok(stream_url) => stream_url, Err(error) => { error!( "Failed to parse stream url: {}, reason: {error:?}", - &resp.media_uri.uri - ) + &stream_uri_response.media_uri.uri + ); + continue; } - } - if let Some(ref v) = p.video_encoder_configuration { - debug!( - " {:?}, {}x{}", - v.encoding, v.resolution.width, v.resolution.height - ); - if let Some(ref r) = v.rate_control { - debug!(" {} fps, {} kbps", r.frame_rate_limit, r.bitrate_limit); + }; + + let Some(video_encoder_configuration) = &profile.video_encoder_configuration else { + warn!("Skipping uri with no encoders"); + continue; + }; + + let encode = match &video_encoder_configuration.encoding { + onvif_schema::onvif::VideoEncoding::H264 => VideoEncodeType::H264, + onvif_schema::onvif::VideoEncoding::__Unknown__(encoding) if encoding == "H265" => { + VideoEncodeType::H265 } - } - if let Some(ref a) = p.audio_encoder_configuration { - debug!( - " audio: {:?}, {} kbps, {} kHz", - a.encoding, a.bitrate, a.sample_rate - ); - } + _unsupported => { + warn!("Skipping unsupported encoding: {_unsupported:?}"); + continue; + } + }; + + let video_rate = video_encoder_configuration + .rate_control + .as_ref() + .map(|rate_control| { + (rate_control.frame_rate_limit as f32 + / rate_control.encoding_interval.max(1) as f32) as u32 + }) + .unwrap_or_default(); + + let intervals = vec![crate::video::types::FrameInterval { + numerator: 1, + denominator: video_rate, + }]; + + let sizes = vec![crate::video::types::Size { + width: video_encoder_configuration.resolution.width.max(0) as u32, + height: video_encoder_configuration.resolution.height.max(0) as u32, + intervals, + }]; + + let format = Format { encode, sizes }; + + streams_information.push(OnvifStreamInformation { stream_uri, format }); } - Ok(urls) + Ok(streams_information) } } diff --git a/src/lib/controls/onvif/manager.rs b/src/lib/controls/onvif/manager.rs index fad17eab..1ae8d395 100644 --- a/src/lib/controls/onvif/manager.rs +++ b/src/lib/controls/onvif/manager.rs @@ -1,49 +1,66 @@ -use std::sync::{Arc, Mutex}; +use std::collections::HashMap; +use std::sync::Arc; -use anyhow::Result; +use anyhow::{Context, Error, Result}; +use onvif::soap::client::Credentials; +use tokio::sync::RwLock; use tracing::*; -use crate::stream::types::CaptureConfiguration; -use crate::stream::{manager as stream_manager, types::StreamInformation}; -use crate::video::types::VideoSourceType; -use crate::video::video_source_redirect::{VideoSourceRedirect, VideoSourceRedirectType}; -use crate::video_stream::types::VideoAndStreamInformation; +use crate::video::types::{Format, VideoSourceType}; +use crate::video::video_source_onvif::{VideoSourceOnvif, VideoSourceOnvifType}; -use super::client::*; +use super::camera::*; lazy_static! { - static ref MANAGER: Arc> = Default::default(); + static ref MANAGER: Arc> = Default::default(); } -#[derive(Debug)] pub struct Manager { - _process: tokio::task::JoinHandle>, + mcontext: Arc>, + _task: tokio::task::JoinHandle>, } +pub struct ManagerContext { + cameras: HashMap, + /// Credentials can be either added in runtime, or passed via ENV or CLI args + credentials: HashMap>>, +} + +type StreamURI = url::Url; +type CameraAddress = String; + impl Drop for Manager { fn drop(&mut self) { - self._process.abort(); + self._task.abort(); } } impl Default for Manager { #[instrument(level = "trace")] fn default() -> Self { - Self { - _process: tokio::spawn(async move { Manager::discover_loop().await }), - } + let mcontext = Arc::new(RwLock::new(ManagerContext { + cameras: HashMap::new(), + credentials: HashMap::new(), + })); + + let mcontext_clone = mcontext.clone(); + let _task = tokio::spawn(async { Manager::discover_loop(mcontext_clone).await }); + + Self { mcontext, _task } } } impl Manager { // Construct our manager, should be done inside main - #[instrument(level = "debug")] + #[instrument(level = "trace")] pub fn init() { MANAGER.as_ref(); + + // todo: fill MANAGER.context.credentials with credentials passed by ENV and CLI } - #[instrument(level = "debug")] - async fn discover_loop() -> Result<()> { + #[instrument(level = "trace", skip(context))] + async fn discover_loop(context: Arc>) -> Result<()> { use futures::stream::StreamExt; use std::net::{IpAddr, Ipv4Addr}; @@ -57,88 +74,96 @@ impl Manager { .duration(tokio::time::Duration::from_secs(5)) .run() .await? - .for_each_concurrent(MAX_CONCURRENT_JUMPERS, |device| async move { - debug!("Device found: {device:#?}"); - - //TODO: We should add support to auth later - let credentials = None; - let clients = match Clients::try_new(&Auth { - credentials: credentials.clone(), - url: device.urls.first().unwrap().to_string().into(), - }) - .await - { - Ok(clients) => clients, - Err(error) => { - error!("Failed creating clients: {error:#?}"); - return; - } - }; - - match clients.get_stream_uris().await { - Ok(stream_uris) => { - let mut url = stream_uris[0].clone(); - - let name = if let Ok(device) = &clients.get_device_information().await { - format!("{} - {} - {}", device.model, device.serial_number, url) - } else { - if let Some(name) = device.name { - format!("{name} - {url}") - } else { - format!("{url}") + .for_each_concurrent(MAX_CONCURRENT_JUMPERS, |device| { + let context = context.clone(); + + async move { + trace!("Device found: {device:#?}"); + + let credentials = context + .read() + .await + .credentials + .get(&device.address) + .map(|c| c.blocking_read().clone()); + + for url in device.urls { + let camera = match OnvifCamera::try_new(&Auth { + credentials: credentials.clone(), + url: url.clone(), + }) + .await + { + Ok(camera) => camera, + Err(error) => { + error!("Failed creating camera: {error:#?}"); + return; } }; - if let Some(credentials) = credentials { - if url.set_username(&credentials.username).is_err() { - error!("Failed setting username for {url}"); - } - if url.set_password(Some(&credentials.password)).is_err() { - error!("Failed setting password for {url}"); + let streams_informations = match camera.get_streams_information().await + { + Ok(streams_informations) => streams_informations, + Err(error) => { + error!("Failed getting stream uris: {error:#?}"); + continue; } - } - let video_source_redirect = VideoSourceRedirect { - name: name.clone(), - source: VideoSourceRedirectType::Redirect( - stream_uris[0].to_string(), - ), }; - let video_and_stream = VideoAndStreamInformation { - name: name.clone(), - stream_information: StreamInformation { - endpoints: vec![url], - configuration: CaptureConfiguration::Redirect( - Default::default(), - ), - extended_configuration: None, - }, - video_source: VideoSourceType::Redirect(video_source_redirect), - }; + trace!("Found streams {streams_informations:?}"); - if let Ok(streams) = stream_manager::streams().await { - for stream in streams { - if let Err(error) = - video_and_stream.conflicts_with(&stream.video_and_stream) - { - debug!("Stream {name} is already registered: {error}"); - return; - } - } - } + let mut context = context.write().await; + for stream_information in streams_informations { + context + .cameras + .entry(stream_information.stream_uri.clone()) + .or_insert_with(|| { + debug!("New stream inserted: {stream_information:?}"); - if let Err(error) = - stream_manager::add_stream_and_start(video_and_stream).await - { - error!("Failed adding stream: {error:#?}"); + camera.clone() + }); } } - Err(error) => { - error!("Failed getting stream uris: {error:#?}"); - } } }) .await; } } + + #[instrument(level = "trace")] + pub async fn get_formats(stream_uri: &StreamURI) -> Result> { + let manager = MANAGER.read().await; + let mcontext = manager.mcontext.read().await; + + let camera = mcontext + .cameras + .get(stream_uri) + .context("Camera not found")?; + + let streams_information = camera.get_streams_information().await.map_err(Error::msg)?; + + let stream_information = streams_information + .iter() + .find(|&stream_information| &stream_information.stream_uri == stream_uri) + .context("Camera not found")?; + + Ok(vec![stream_information.format.clone()]) + } + + #[instrument(level = "trace")] + pub async fn streams_available() -> Vec { + let mut result = vec![]; + + let manager = MANAGER.read().await; + let context = manager.mcontext.read().await; + + for stream_uri in context.cameras.keys() { + result.push(VideoSourceType::Onvif(VideoSourceOnvif { + name: format!("{stream_uri}"), + source: VideoSourceOnvifType::Onvif(stream_uri.clone()), + })); + } + + result + } } diff --git a/src/lib/controls/onvif/mod.rs b/src/lib/controls/onvif/mod.rs index af2485e8..2c424d20 100644 --- a/src/lib/controls/onvif/mod.rs +++ b/src/lib/controls/onvif/mod.rs @@ -1,2 +1,2 @@ -mod client; +pub mod camera; pub mod manager; diff --git a/src/lib/stream/pipeline/mod.rs b/src/lib/stream/pipeline/mod.rs index 0098d876..f57c2a63 100644 --- a/src/lib/stream/pipeline/mod.rs +++ b/src/lib/stream/pipeline/mod.rs @@ -1,4 +1,5 @@ pub mod fake_pipeline; +pub mod onvif_pipeline; pub mod qr_pipeline; pub mod redirect_pipeline; pub mod runner; @@ -26,6 +27,7 @@ use crate::{ }; use fake_pipeline::FakePipeline; +use onvif_pipeline::OnvifPipeline; use qr_pipeline::QrPipeline; use redirect_pipeline::RedirectPipeline; use runner::PipelineRunner; @@ -45,6 +47,7 @@ pub enum Pipeline { V4l(V4lPipeline), Fake(FakePipeline), QR(QrPipeline), + Onvif(OnvifPipeline), Redirect(RedirectPipeline), } @@ -55,6 +58,7 @@ impl Pipeline { Pipeline::V4l(pipeline) => &mut pipeline.state, Pipeline::Fake(pipeline) => &mut pipeline.state, Pipeline::QR(pipeline) => &mut pipeline.state, + Pipeline::Onvif(pipeline) => &mut pipeline.state, Pipeline::Redirect(pipeline) => &mut pipeline.state, } } @@ -65,6 +69,7 @@ impl Pipeline { Pipeline::V4l(pipeline) => &pipeline.state, Pipeline::Fake(pipeline) => &pipeline.state, Pipeline::QR(pipeline) => &pipeline.state, + Pipeline::Onvif(pipeline) => &pipeline.state, Pipeline::Redirect(pipeline) => &pipeline.state, } } @@ -96,6 +101,9 @@ impl Pipeline { }), #[cfg(not(target_os = "linux"))] VideoSourceType::Local(_) => unreachable!("Local is only supported on linux"), + VideoSourceType::Onvif(_) => Pipeline::Onvif(OnvifPipeline { + state: pipeline_state, + }), VideoSourceType::Redirect(_) => Pipeline::Redirect(RedirectPipeline { state: pipeline_state, }), @@ -152,6 +160,9 @@ impl PipelineState { VideoSourceType::Local(_) => { unreachable!("Local source only supported on linux"); } + VideoSourceType::Onvif(_) => { + OnvifPipeline::try_new(pipeline_id, video_and_stream_information) + } VideoSourceType::Redirect(_) => { RedirectPipeline::try_new(pipeline_id, video_and_stream_information) } diff --git a/src/lib/stream/pipeline/onvif_pipeline.rs b/src/lib/stream/pipeline/onvif_pipeline.rs new file mode 100644 index 00000000..7e669c54 --- /dev/null +++ b/src/lib/stream/pipeline/onvif_pipeline.rs @@ -0,0 +1,134 @@ +use crate::{ + stream::types::CaptureConfiguration, video::types::VideoSourceType, + video_stream::types::VideoAndStreamInformation, +}; + +use super::{ + PipelineGstreamerInterface, PipelineState, PIPELINE_FILTER_NAME, PIPELINE_RTP_TEE_NAME, + PIPELINE_VIDEO_TEE_NAME, +}; + +use anyhow::{anyhow, Context, Result}; + +use tracing::*; + +use gst::prelude::*; + +#[derive(Debug)] +pub struct OnvifPipeline { + pub state: PipelineState, +} + +impl OnvifPipeline { + #[instrument(level = "debug")] + pub fn try_new( + pipeline_id: &uuid::Uuid, + video_and_stream_information: &VideoAndStreamInformation, + ) -> Result { + match &video_and_stream_information + .stream_information + .configuration + { + CaptureConfiguration::Video(configuration) => configuration, + unsupported => { + return Err(anyhow!( + "{unsupported:?} is not supported as Onvif Pipeline" + )) + } + }; + + match &video_and_stream_information.video_source { + VideoSourceType::Onvif(source) => source, + unsupported => { + return Err(anyhow!( + "SourceType {unsupported:?} is not supported as V4l Pipeline" + )) + } + }; + + if video_and_stream_information + .stream_information + .endpoints + .len() + > 1 + { + return Err(anyhow!("Onvif must only have one endpoint")); + } + let location = &video_and_stream_information + .stream_information + .endpoints + .first() + .context("Failed to access the fisrt endpoint")?; + + use crate::video::types::VideoEncodeType; + let encode = match &video_and_stream_information + .stream_information + .configuration + { + CaptureConfiguration::Video(configuration) => Some(configuration.encode.clone()), + _unknown => None, + }; + + let filter_name = format!("{PIPELINE_FILTER_NAME}-{pipeline_id}"); + let video_tee_name = format!("{PIPELINE_VIDEO_TEE_NAME}-{pipeline_id}"); + let rtp_tee_name = format!("{PIPELINE_RTP_TEE_NAME}-{pipeline_id}"); + + let description = match encode { + Some(VideoEncodeType::H264) => { + format!( + concat!( + "rtspsrc location={location} is-live=true latency=0", + " ! application/x-rtp", + " ! rtph264depay", + " ! capsfilter name={filter_name} caps=video/x-h264,stream-format=avc,alignment=au", + " ! tee name={video_tee_name} allow-not-linked=true", + " ! rtph264pay aggregate-mode=zero-latency config-interval=10 pt=96", + " ! tee name={rtp_tee_name} allow-not-linked=true" + ), + location = location, + filter_name = filter_name, + video_tee_name = video_tee_name, + rtp_tee_name = rtp_tee_name, + ) + } + Some(VideoEncodeType::H265) => { + format!( + concat!( + "rtspsrc location={location} is-live=true latency=0", + " ! application/x-rtp", + " ! rtph265depay", + " ! capsfilter name={filter_name} caps=video/x-h265,profile={profile},stream-format=byte-stream,alignment=au", + " ! tee name={video_tee_name} allow-not-linked=true", + " ! rtph265pay aggregate-mode=zero-latency config-interval=10 pt=96", + " ! tee name={rtp_tee_name} allow-not-linked=true" + ), + location = location, + filter_name = filter_name, + video_tee_name = video_tee_name, + rtp_tee_name = rtp_tee_name, + profile = "main", + ) + } + unsupported => { + return Err(anyhow!( + "Encode {unsupported:?} is not supported for Onvif Pipeline" + )) + } + }; + + let pipeline = gst::parse::launch(&description)?; + + let pipeline = pipeline + .downcast::() + .expect("Couldn't downcast pipeline"); + + Ok(pipeline) + } +} + +impl PipelineGstreamerInterface for OnvifPipeline { + #[instrument(level = "trace")] + fn is_running(&self) -> bool { + self.state.pipeline_runner.is_running() + } +} diff --git a/src/lib/video/mod.rs b/src/lib/video/mod.rs index 5bc44562..4dee6af1 100644 --- a/src/lib/video/mod.rs +++ b/src/lib/video/mod.rs @@ -6,4 +6,5 @@ pub mod xml; pub mod video_source_gst; pub mod video_source_local; +pub mod video_source_onvif; pub mod video_source_redirect; diff --git a/src/lib/video/types.rs b/src/lib/video/types.rs index eec06957..51811a41 100644 --- a/src/lib/video/types.rs +++ b/src/lib/video/types.rs @@ -1,7 +1,7 @@ -use super::video_source::VideoSource; use super::video_source_gst::VideoSourceGst; use super::video_source_local::VideoSourceLocal; use super::video_source_redirect::VideoSourceRedirect; +use super::{video_source::VideoSource, video_source_onvif::VideoSourceOnvif}; use gst; use paperclip::actix::Apiv2Schema; use serde::{Deserialize, Serialize}; @@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize}; pub enum VideoSourceType { Gst(VideoSourceGst), Local(VideoSourceLocal), + Onvif(VideoSourceOnvif), Redirect(VideoSourceRedirect), } @@ -57,6 +58,7 @@ impl VideoSourceType { match self { VideoSourceType::Local(local) => local, VideoSourceType::Gst(gst) => gst, + VideoSourceType::Onvif(onvif) => onvif, VideoSourceType::Redirect(redirect) => redirect, } } diff --git a/src/lib/video/video_source.rs b/src/lib/video/video_source.rs index 156244e9..9088cade 100644 --- a/src/lib/video/video_source.rs +++ b/src/lib/video/video_source.rs @@ -3,7 +3,9 @@ use crate::controls::types::{Control, ControlType}; use super::types::*; use super::video_source_gst::VideoSourceGst; use super::video_source_local::VideoSourceLocal; +use super::video_source_onvif::VideoSourceOnvif; use super::video_source_redirect::VideoSourceRedirect; + use tracing::*; pub trait VideoSource { diff --git a/src/lib/video/video_source_onvif.rs b/src/lib/video/video_source_onvif.rs new file mode 100644 index 00000000..b3a7e850 --- /dev/null +++ b/src/lib/video/video_source_onvif.rs @@ -0,0 +1,87 @@ +use super::types::*; +use super::video_source::VideoSource; +use crate::controls::onvif::manager::Manager as OnvifManager; +use crate::controls::types::Control; + +use paperclip::actix::Apiv2Schema; +use serde::{Deserialize, Serialize}; + +#[derive(Apiv2Schema, Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum VideoSourceOnvifType { + Onvif(url::Url), +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct VideoSourceOnvif { + pub name: String, + pub source: VideoSourceOnvifType, +} + +impl VideoSourceOnvif { + pub async fn formats(&self) -> Vec { + let VideoSourceOnvifType::Onvif(stream_uri) = &self.source; + OnvifManager::get_formats(stream_uri) + .await + .unwrap_or_default() + } +} + +impl VideoSource for VideoSourceOnvif { + fn name(&self) -> &String { + &self.name + } + + fn source_string(&self) -> &str { + match &self.source { + VideoSourceOnvifType::Onvif(url) => url.as_str(), + } + } + + fn set_control_by_name(&self, _control_name: &str, _value: i64) -> std::io::Result<()> { + Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + "Onvif source doesn't have controls.", + )) + } + + fn set_control_by_id(&self, _control_id: u64, _value: i64) -> std::io::Result<()> { + Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + "Onvif source doesn't have controls.", + )) + } + + fn control_value_by_name(&self, _control_name: &str) -> std::io::Result { + Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + "Onvif source doesn't have controls.", + )) + } + + fn control_value_by_id(&self, _control_id: u64) -> std::io::Result { + Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + "Onvif source doesn't have controls.", + )) + } + + fn controls(&self) -> Vec { + vec![] + } + + fn is_valid(&self) -> bool { + match &self.source { + VideoSourceOnvifType::Onvif(_) => true, + } + } + + fn is_shareable(&self) -> bool { + true + } +} + +impl VideoSourceOnvif { + pub async fn cameras_available() -> Vec { + OnvifManager::streams_available().await.clone() + } +}