diff --git a/examples/async_chat.rs b/examples/async_chat.rs index 923dc72..6d9c8fa 100644 --- a/examples/async_chat.rs +++ b/examples/async_chat.rs @@ -9,7 +9,6 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use async_broadcast::{broadcast, Receiver}; use clap::Parser; use dtls::extension::extension_use_srtp::SrtpProtectionProfile; use log::{error, info}; @@ -77,7 +76,10 @@ struct Cli { level: Level, } -fn init_meter_provider(mut stop_rx: Receiver<()>, worker: Worker) -> SdkMeterProvider { +fn init_meter_provider( + mut stop_rx: async_broadcast::Receiver<()>, + worker: Worker, +) -> SdkMeterProvider { let (tx, rx) = std::sync::mpsc::channel(); std::thread::spawn(move || { @@ -138,7 +140,7 @@ fn main() -> anyhow::Result<()> { ); let media_ports: Vec = (cli.media_port_min..=cli.media_port_max).collect(); - let (stop_tx, mut stop_rx) = broadcast::<()>(1); + let (stop_tx, mut stop_rx) = async_broadcast::broadcast::<()>(1); let mut media_port_thread_map = HashMap::new(); let key_pair = rcgen::KeyPair::generate(&rcgen::PKCS_ECDSA_P256_SHA256)?; diff --git a/examples/sync_chat.rs b/examples/sync_chat.rs index ff08d4e..2aff569 100644 --- a/examples/sync_chat.rs +++ b/examples/sync_chat.rs @@ -68,7 +68,7 @@ struct Cli { } fn init_meter_provider( - stop_rx: crossbeam_channel::Receiver<()>, + mut stop_rx: async_broadcast::Receiver<()>, wait_group: WaitGroup, ) -> SdkMeterProvider { let (tx, rx) = std::sync::mpsc::channel(); @@ -96,7 +96,7 @@ fn init_meter_provider( .build(); let _ = tx.send(meter_provider.clone()); - let _ = stop_rx.recv(); + let _ = stop_rx.recv().await; let _ = meter_provider.shutdown(); worker.done(); info!("meter provider is gracefully down"); @@ -163,8 +163,9 @@ fn main() -> anyhow::Result<()> { .with_sctp_server_config(sctp_server_config) .with_idle_timeout(Duration::from_secs(30)), ); + let (stop_meter_tx, stop_meter_rx) = async_broadcast::broadcast::<()>(1); let wait_group = WaitGroup::new(); - let meter_provider = init_meter_provider(stop_rx.clone(), wait_group.clone()); + let meter_provider = init_meter_provider(stop_meter_rx, wait_group.clone()); for port in media_ports { let worker = wait_group.add(1); @@ -220,7 +221,11 @@ fn main() -> anyhow::Result<()> { println!("Press Ctrl-C to stop"); std::thread::spawn(move || { let mut stop_tx = Some(stop_tx); + let mut stop_meter_tx = Some(stop_meter_tx); ctrlc::set_handler(move || { + if let Some(stop_meter_tx) = stop_meter_tx.take() { + let _ = stop_meter_tx.try_broadcast(()); + } if let Some(stop_tx) = stop_tx.take() { let _ = stop_tx.send(()); }