From ec34bc79ae698e577bf8df608e91f64b1c3f4622 Mon Sep 17 00:00:00 2001 From: yngrtc Date: Fri, 8 Mar 2024 11:14:38 -0800 Subject: [PATCH] integrate OpenTelemetry to sync_chat.rs --- Cargo.toml | 10 +++--- examples/async_chat.rs | 4 +-- {docs => examples}/sync_chat.rs | 58 ++++++++++++++++++++++++++++++--- examples/sync_signal/mod.rs | 4 +++ rtc | 1 - webrtc | 1 - 6 files changed, 64 insertions(+), 14 deletions(-) rename {docs => examples}/sync_chat.rs (76%) delete mode 160000 rtc delete mode 160000 webrtc diff --git a/Cargo.toml b/Cargo.toml index fbbfaf3..e3d6691 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,11 +68,11 @@ tokio-util = "0.7" webrtc = "0.10.1" hyper = { version = "0.14.28", features = ["full"] } -#[[example]] -#name = "sync_chat" -#path = "examples/sync_chat.rs" -#test = false -#bench = false +[[example]] +name = "sync_chat" +path = "examples/sync_chat.rs" +test = false +bench = false [[example]] name = "async_chat" diff --git a/examples/async_chat.rs b/examples/async_chat.rs index f1789c6..923dc72 100644 --- a/examples/async_chat.rs +++ b/examples/async_chat.rs @@ -31,7 +31,7 @@ use sfu::{ mod async_signal; -use async_signal::{handle_signaling_message, SignalingMessage, SignalingServer}; +use async_signal::*; #[derive(Default, Debug, Copy, Clone, clap::ValueEnum)] enum Level { @@ -163,8 +163,8 @@ fn main() -> anyhow::Result<()> { .with_sctp_endpoint_config(sctp_endpoint_config) .with_sctp_server_config(sctp_server_config), ); - let wait_group = WaitGroup::new(); let core_num = num_cpus::get(); + let wait_group = WaitGroup::new(); let meter_provider = init_meter_provider(stop_rx.clone(), wait_group.worker()); for port in media_ports { diff --git a/docs/sync_chat.rs b/examples/sync_chat.rs similarity index 76% rename from docs/sync_chat.rs rename to examples/sync_chat.rs index 79e7825..ff08d4e 100644 --- a/docs/sync_chat.rs +++ b/examples/sync_chat.rs @@ -1,5 +1,10 @@ use clap::Parser; use dtls::extension::extension_use_srtp::SrtpProtectionProfile; +use log::info; +use opentelemetry::{/*global,*/ KeyValue}; +use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider}; +use opentelemetry_sdk::{runtime, Resource}; +use opentelemetry_stdout::MetricsExporterBuilder; use rouille::Server; use sfu::{RTCCertificate, ServerConfig}; use std::collections::HashMap; @@ -46,7 +51,7 @@ impl From for log::LevelFilter { struct Cli { #[arg(long, default_value_t = format!("127.0.0.1"))] host: String, - #[arg(long, default_value_t = 8080)] + #[arg(short, long, default_value_t = 8080)] signal_port: u16, #[arg(long, default_value_t = 3478)] media_port_min: u16, @@ -62,7 +67,47 @@ struct Cli { level: Level, } -pub fn main() -> anyhow::Result<()> { +fn init_meter_provider( + stop_rx: crossbeam_channel::Receiver<()>, + wait_group: WaitGroup, +) -> SdkMeterProvider { + let (tx, rx) = std::sync::mpsc::channel(); + + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + .unwrap(); + + rt.block_on(async move { + let worker = wait_group.add(1); + let exporter = MetricsExporterBuilder::default() + .with_encoder(|writer, data| { + Ok(serde_json::to_writer_pretty(writer, &data).unwrap()) + }) + .build(); + let reader = PeriodicReader::builder(exporter, runtime::TokioCurrentThread) + .with_interval(Duration::from_secs(30)) + .build(); + let meter_provider = SdkMeterProvider::builder() + .with_reader(reader) + .with_resource(Resource::new(vec![KeyValue::new("chat", "metrics")])) + .build(); + let _ = tx.send(meter_provider.clone()); + + let _ = stop_rx.recv(); + let _ = meter_provider.shutdown(); + worker.done(); + info!("meter provider is gracefully down"); + }); + }); + + let meter_provider = rx.recv().unwrap(); + meter_provider +} + +fn main() -> anyhow::Result<()> { let cli = Cli::parse(); if cli.debug { env_logger::Builder::new() @@ -81,8 +126,8 @@ pub fn main() -> anyhow::Result<()> { .init(); } - let certificate = include_bytes!("../examples/util/cer.pem").to_vec(); - let private_key = include_bytes!("../examples/util/key.pem").to_vec(); + let certificate = include_bytes!("util/cer.pem").to_vec(); + let private_key = include_bytes!("util/key.pem").to_vec(); // Figure out some public IP address, since Firefox will not accept 127.0.0.1 for WebRTC traffic. let host_addr = if cli.host == "127.0.0.1" && !cli.force_local_loop { @@ -119,6 +164,7 @@ pub fn main() -> anyhow::Result<()> { .with_idle_timeout(Duration::from_secs(30)), ); let wait_group = WaitGroup::new(); + let meter_provider = init_meter_provider(stop_rx.clone(), wait_group.clone()); for port in media_ports { let worker = wait_group.add(1); @@ -132,9 +178,11 @@ pub fn main() -> anyhow::Result<()> { media_port_thread_map.insert(port, signaling_tx); let server_config = server_config.clone(); + let meter_provider = meter_provider.clone(); // The run loop is on a separate thread to the web server. std::thread::spawn(move || { - if let Err(err) = sync_run(stop_rx, socket, signaling_rx, server_config) { + if let Err(err) = sync_run(stop_rx, socket, signaling_rx, server_config, meter_provider) + { eprintln!("run_sfu got error: {}", err); } worker.done(); diff --git a/examples/sync_signal/mod.rs b/examples/sync_signal/mod.rs index efac884..90625c7 100644 --- a/examples/sync_signal/mod.rs +++ b/examples/sync_signal/mod.rs @@ -2,6 +2,8 @@ use bytes::{Bytes, BytesMut}; use log::error; +use opentelemetry::metrics::MeterProvider; +use opentelemetry_sdk::metrics::SdkMeterProvider; use retty::channel::{InboundPipeline, Pipeline}; use retty::transport::{TaggedBytesMut, TransportContext}; use rouille::{Request, Response, ResponseBody}; @@ -95,10 +97,12 @@ pub fn sync_run( socket: UdpSocket, rx: Receiver, server_config: Arc, + meter_provider: SdkMeterProvider, ) -> anyhow::Result<()> { let server_states = Rc::new(RefCell::new(ServerStates::new( server_config, socket.local_addr()?, + meter_provider.meter(format!("{}", socket.local_addr()?)), )?)); println!("listening {}...", socket.local_addr()?); diff --git a/rtc b/rtc deleted file mode 160000 index 8700cbe..0000000 --- a/rtc +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 8700cbe8670ee04f17794b1a05413068f14e4d2e diff --git a/webrtc b/webrtc deleted file mode 160000 index 502671d..0000000 --- a/webrtc +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 502671d8023141b70051e85aaf8748e86ea64dfd