Skip to content

Commit

Permalink
integrate OpenTelemetry to sync_chat.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Mar 8, 2024
1 parent 2bf3730 commit 2962483
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 12 deletions.
10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ tokio-util = "0.7"
webrtc = "0.10.1"
hyper = { version = "0.14.28", features = ["full"] }

#[[example]]
#name = "sync_chat"
#path = "examples/sync_chat.rs"
#test = false
#bench = false
[[example]]
name = "sync_chat"
path = "examples/sync_chat.rs"
test = false
bench = false

[[example]]
name = "async_chat"
Expand Down
4 changes: 2 additions & 2 deletions examples/async_chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use sfu::{

mod async_signal;

use async_signal::{handle_signaling_message, SignalingMessage, SignalingServer};
use async_signal::*;

#[derive(Default, Debug, Copy, Clone, clap::ValueEnum)]
enum Level {
Expand Down Expand Up @@ -163,8 +163,8 @@ fn main() -> anyhow::Result<()> {
.with_sctp_endpoint_config(sctp_endpoint_config)
.with_sctp_server_config(sctp_server_config),
);
let wait_group = WaitGroup::new();
let core_num = num_cpus::get();
let wait_group = WaitGroup::new();
let meter_provider = init_meter_provider(stop_rx.clone(), wait_group.worker());

for port in media_ports {
Expand Down
58 changes: 53 additions & 5 deletions docs/sync_chat.rs → examples/sync_chat.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use clap::Parser;
use dtls::extension::extension_use_srtp::SrtpProtectionProfile;
use log::info;
use opentelemetry::{/*global,*/ KeyValue};
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::{runtime, Resource};
use opentelemetry_stdout::MetricsExporterBuilder;
use rouille::Server;
use sfu::{RTCCertificate, ServerConfig};
use std::collections::HashMap;
Expand Down Expand Up @@ -46,7 +51,7 @@ impl From<Level> for log::LevelFilter {
struct Cli {
#[arg(long, default_value_t = format!("127.0.0.1"))]
host: String,
#[arg(long, default_value_t = 8080)]
#[arg(short, long, default_value_t = 8080)]
signal_port: u16,
#[arg(long, default_value_t = 3478)]
media_port_min: u16,
Expand All @@ -62,7 +67,47 @@ struct Cli {
level: Level,
}

pub fn main() -> anyhow::Result<()> {
fn init_meter_provider(
stop_rx: crossbeam_channel::Receiver<()>,
wait_group: WaitGroup,
) -> SdkMeterProvider {
let (tx, rx) = std::sync::mpsc::channel();

std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
.unwrap();

rt.block_on(async move {
let worker = wait_group.add(1);
let exporter = MetricsExporterBuilder::default()
.with_encoder(|writer, data| {
Ok(serde_json::to_writer_pretty(writer, &data).unwrap())
})
.build();
let reader = PeriodicReader::builder(exporter, runtime::TokioCurrentThread)
.with_interval(Duration::from_secs(30))
.build();
let meter_provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_resource(Resource::new(vec![KeyValue::new("chat", "metrics")]))
.build();
let _ = tx.send(meter_provider.clone());

let _ = stop_rx.recv();
let _ = meter_provider.shutdown();
worker.done();
info!("meter provider is gracefully down");
});
});

let meter_provider = rx.recv().unwrap();
meter_provider
}

fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
if cli.debug {
env_logger::Builder::new()
Expand All @@ -81,8 +126,8 @@ pub fn main() -> anyhow::Result<()> {
.init();
}

let certificate = include_bytes!("../examples/util/cer.pem").to_vec();
let private_key = include_bytes!("../examples/util/key.pem").to_vec();
let certificate = include_bytes!("util/cer.pem").to_vec();
let private_key = include_bytes!("util/key.pem").to_vec();

// Figure out some public IP address, since Firefox will not accept 127.0.0.1 for WebRTC traffic.
let host_addr = if cli.host == "127.0.0.1" && !cli.force_local_loop {
Expand Down Expand Up @@ -119,6 +164,7 @@ pub fn main() -> anyhow::Result<()> {
.with_idle_timeout(Duration::from_secs(30)),
);
let wait_group = WaitGroup::new();
let meter_provider = init_meter_provider(stop_rx.clone(), wait_group.clone());

for port in media_ports {
let worker = wait_group.add(1);
Expand All @@ -132,9 +178,11 @@ pub fn main() -> anyhow::Result<()> {

media_port_thread_map.insert(port, signaling_tx);
let server_config = server_config.clone();
let meter_provider = meter_provider.clone();
// The run loop is on a separate thread to the web server.
std::thread::spawn(move || {
if let Err(err) = sync_run(stop_rx, socket, signaling_rx, server_config) {
if let Err(err) = sync_run(stop_rx, socket, signaling_rx, server_config, meter_provider)
{
eprintln!("run_sfu got error: {}", err);
}
worker.done();
Expand Down
4 changes: 4 additions & 0 deletions examples/sync_signal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

use bytes::{Bytes, BytesMut};
use log::error;
use opentelemetry::metrics::MeterProvider;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use retty::channel::{InboundPipeline, Pipeline};
use retty::transport::{TaggedBytesMut, TransportContext};
use rouille::{Request, Response, ResponseBody};
Expand Down Expand Up @@ -95,10 +97,12 @@ pub fn sync_run(
socket: UdpSocket,
rx: Receiver<SignalingMessage>,
server_config: Arc<ServerConfig>,
meter_provider: SdkMeterProvider,
) -> anyhow::Result<()> {
let server_states = Rc::new(RefCell::new(ServerStates::new(
server_config,
socket.local_addr()?,
meter_provider.meter(format!("{}", socket.local_addr()?)),
)?));

println!("listening {}...", socket.local_addr()?);
Expand Down

0 comments on commit 2962483

Please sign in to comment.