Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenTelemetry OTLP setup for tracing #670

Closed
wants to merge 10 commits into from
298 changes: 250 additions & 48 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ move-vm-ext = { path = "types/move-vm-ext" }
num-derive = "0.4.2"
num-traits = "0.2.14"
once_cell = "1.8.0"
opentelemetry = { version = "0.25" }
opentelemetry_sdk = { version = "0.25", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.25" }
parking_lot = { version = "0.12.1" }
poem = { version = "=1.3.59", features = ["anyhow", "rustls"] }
poem-openapi = { version = "=2.0.11", features = ["swagger-ui", "url"] }
Expand Down Expand Up @@ -283,7 +286,7 @@ tonic-reflection = "0.11"
tonic-web = "0.11"
### To try (experimental) std support, add `features = [ "std" ]` to risc0-zkvm
tracing = "0.1.40"
tracing-appender = "0.2"
tracing-opentelemetry = { version = "0.26" }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-test = "0.2.5"
trie-db = "0.28.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ use tokio::sync::RwLock;
use tracing::info;
use url::Url;

const TIMING_LOG_ENV: &str = "SUZUKA_TIMING_LOG";

pub fn get_suzuka_config(
dot_movement: &DotMovement,
) -> Result<suzuka_config::Config, anyhow::Error> {
Expand Down Expand Up @@ -247,10 +245,8 @@ pub async fn basic_coin_transfers(

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let tracing_config = movement_tracing::Config {
timing_log_path: std::env::var_os(TIMING_LOG_ENV).map(Into::into),
};
let _guard = movement_tracing::init_tracing_subscriber(tracing_config);
let tracing_config = movement_tracing::Config::from_env()?;
movement_tracing::init_tracing_subscriber(tracing_config)?;

// get the lead dot movement from the environment
let dot_movement = DotMovement::try_from_env()?;
Expand Down
8 changes: 2 additions & 6 deletions networks/suzuka/suzuka-full-node/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
use suzuka_full_node::manager::Manager;

use std::env;
use std::process::ExitCode;

const TIMING_LOG_ENV: &str = "SUZUKA_TIMING_LOG";

#[tokio::main]
async fn main() -> Result<ExitCode, anyhow::Error> {
let tracing_config =
movement_tracing::Config { timing_log_path: env::var_os(TIMING_LOG_ENV).map(Into::into) };
let _guard = movement_tracing::init_tracing_subscriber(tracing_config);
let tracing_config = movement_tracing::Config::from_env()?;
movement_tracing::init_tracing_subscriber(tracing_config)?;

// get the config file
let dot_movement = dot_movement::DotMovement::try_from_env()?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ where

// get the transactions
let transactions_count = block.transactions().len();
let span = info_span!(target: "movement_timing", "execute_block", id = %block_id);
let span = info_span!(target: "movement_telemetry", "execute_block", id = %block_id);
let commitment =
self.execute_block_with_retries(block, block_timestamp).instrument(span).await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl Task {
Ok(transaction) => match transaction {
Some(transaction) => {
info!(
target : "movement_timing",
target : "movement_telemetry",
batch_id = %batch_id,
tx_hash = %transaction.committed_hash(),
sender = %transaction.sender(),
Expand Down Expand Up @@ -93,7 +93,7 @@ impl Task {

if transactions.len() > 0 {
info!(
target: "movement_timing",
target: "movement_telemetry",
batch_id = %batch_id,
transaction_count = transactions.len(),
"built_batch_write"
Expand Down
11 changes: 11 additions & 0 deletions process-compose/suzuka-full-node/process-compose.telemetry.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
version: "3"

mzabaluev marked this conversation as resolved.
Show resolved Hide resolved
environment:

processes:
suzuka-full-node:
env:
MOVEMENT_OTLP: http://localhost:4317
m1-da-light-node:
env:
MOVEMENT_OTLP: http://localhost:4317
9 changes: 2 additions & 7 deletions protocol-units/da/m1/light-node/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
use m1_da_light_node::v1::{LightNodeV1, Manager};

use std::env;

const TIMING_LOG_ENV: &str = "M1_DA_LIGHT_NODE_TIMING_LOG";

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let tracing_config =
movement_tracing::Config { timing_log_path: env::var_os(TIMING_LOG_ENV).map(Into::into) };
let _guard = movement_tracing::init_tracing_subscriber(tracing_config);
let tracing_config = movement_tracing::Config::from_env()?;
movement_tracing::init_tracing_subscriber(tracing_config)?;

let dot_movement = dot_movement::DotMovement::try_from_env()?;
let config_path = dot_movement.get_config_json_path();
Expand Down
2 changes: 1 addition & 1 deletion protocol-units/da/m1/light-node/src/v1/passthrough.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl LightNodeV1 {
Ok(verified_blobs)
}

#[tracing::instrument(target = "movement_timing", level = "debug")]
#[tracing::instrument(target = "movement_telemetry", level = "debug")]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to have comments on each of these explaining why they are needed to achieve a certain metric as outlined here:

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if we need all of these for telemetry. Originally they were added for perf research.
Do you think we can just remove those that don't contribute to a particular metric goal, or convert them to a plain spans/events with the default target?

async fn get_blobs_at_height(&self, height: u64) -> Result<Vec<Blob>, anyhow::Error> {
let celestia_blobs = self.get_celestia_blobs_at_height(height).await?;
let mut blobs = Vec::new();
Expand Down
22 changes: 11 additions & 11 deletions protocol-units/da/m1/light-node/src/v1/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,25 +79,25 @@ impl LightNodeV1 {
// this has an internal timeout based on its building time
// so in the worst case scenario we will roughly double the internal timeout
let uid = LOGGING_UID.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
debug!(target: "movement_timing", uid = %uid, "waiting_for_next_block",);
debug!(target: "movement_telemetry", uid = %uid, "waiting_for_next_block",);
let block = memseq.wait_for_next_block().await?;
match block {
Some(block) => {
info!(target: "movement_timing", block_id = %block.id(), uid = %uid, transaction_count = block.transactions().len(), "received_block");
info!(target: "movement_telemetry", block_id = %block.id(), uid = %uid, transaction_count = block.transactions().len(), "received_block");
sender.send(block).await?;
Ok(())
}
None => {
// no transactions to include
debug!(target: "movement_timing", uid = %uid, "no_transactions_to_include");
debug!(target: "movement_telemetry", uid = %uid, "no_transactions_to_include");
Ok(())
}
}
}

async fn submit_blocks(&self, blocks: &Vec<block::WrappedBlock>) -> Result<(), anyhow::Error> {
for block in blocks {
info!(target: "movement_timing", block_id = %block.block.id(), "inner_submitting_block");
info!(target: "movement_telemetry", block_id = %block.block.id(), "inner_submitting_block");
}
// get references to celestia blobs in the wrapped blocks
let block_blobs = blocks
Expand All @@ -108,14 +108,14 @@ impl LightNodeV1 {
// use deref on the wrapped block to get the blob
self.pass_through.submit_celestia_blobs(&block_blobs).await?;
for block in blocks {
info!(target: "movement_timing", block_id = %block.block.id(), "inner_submitted_block");
info!(target: "movement_telemetry", block_id = %block.block.id(), "inner_submitted_block");
}
Ok(())
}

pub async fn submit_with_heuristic(&self, blocks: Vec<Block>) -> Result<(), anyhow::Error> {
for block in &blocks {
info!(target: "movement_timing", block_id = %block.id(), "submitting_block");
info!(target: "movement_telemetry", block_id = %block.id(), "submitting_block");
}

// wrap the blocks in a struct that can be split and compressed
Expand Down Expand Up @@ -168,7 +168,7 @@ impl LightNodeV1 {

info!("block group results: {:?}", block_group_results);
for block_group_result in &block_group_results {
info!(target: "movement_timing", block_group_result = ?block_group_result, "block_group_result");
info!(target: "movement_telemetry", block_group_result = ?block_group_result, "block_group_result");
}

Ok(())
Expand Down Expand Up @@ -204,7 +204,7 @@ impl LightNodeV1 {
Err(_) => {
// The operation timed out
debug!(
target: "movement_timing",
target: "movement_telemetry",
batch_size = blocks.len(),
"timed_out_building_block"
);
Expand All @@ -213,7 +213,7 @@ impl LightNodeV1 {
}
}

info!(target: "movement_timing", block_count = blocks.len(), "read_blocks");
info!(target: "movement_telemetry", block_count = blocks.len(), "read_blocks");

Ok(blocks)
}
Expand All @@ -232,11 +232,11 @@ impl LightNodeV1 {

// submit the blobs, resizing as needed
for block_id in &ids {
info!(target: "movement_timing", %block_id, "submitting_block_batch");
info!(target: "movement_telemetry", %block_id, "submitting_block_batch");
}
self.submit_with_heuristic(blocks).await?;
for block_id in &ids {
info!(target: "movement_timing", %block_id, "submitted_block_batch");
info!(target: "movement_telemetry", %block_id, "submitted_block_batch");
}

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl Executor {
self.transactions_in_flight
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
info!(
target: "movement_timing",
target: "movement_telemetry",
count,
current,
"decrementing_transactions_in_flight",
Comment on lines 48 to 52
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also this one looks like a metric would be better (I haven't converted it to OpenTelemetry yet).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl TransactionPipe {
match request {
MempoolClientRequest::SubmitTransaction(transaction, callback) => {
let span = info_span!(
target: "movement_timing",
target: "movement_telemetry",
"submit_transaction",
tx_hash = %transaction.committed_hash(),
sender = %transaction.sender(),
Expand Down Expand Up @@ -124,13 +124,13 @@ impl TransactionPipe {
// For now, we are going to consider a transaction in flight until it exits the mempool and is sent to the DA as is indicated by WriteBatch.
let in_flight = self.transactions_in_flight.load(std::sync::atomic::Ordering::Relaxed);
info!(
target: "movement_timing",
target: "movement_telemetry",
in_flight = %in_flight,
"transactions_in_flight"
);
if in_flight > self.in_flight_limit {
info!(
target: "movement_timing",
target: "movement_telemetry",
"shedding_load"
);
let status = MempoolStatus::new(MempoolStatusCode::MempoolIsFull);
Expand Down
8 changes: 6 additions & 2 deletions util/tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ publish.workspace = true
rust-version.workspace = true

[dependencies]
tracing-appender = { workspace = true }
tracing-subscriber = { workspace = true, features = ["json"] }
anyhow = { workspace = true }
tracing-subscriber = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
opentelemetry-otlp = { workspace = true }
tracing-opentelemetry = { workspace = true }
#console-subscriber = { workspace = true }

[lints]
Expand Down
97 changes: 41 additions & 56 deletions util/tracing/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,78 +1,63 @@
use tracing_appender::non_blocking::WorkerGuard as AppenderGuard;
use anyhow::anyhow;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_otlp::WithExportConfig as _;
use opentelemetry_sdk::runtime;
use tracing_subscriber::filter::{self, EnvFilter, LevelFilter};
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::prelude::*;

use std::{env, fs::File, path::PathBuf};
use std::env;

const TIMING_ENV: &str = "MOVEMENT_TIMING";

/// The default path name for the timing log file.
/// If the path not specified in [`Config`] and the `MOVEMENT_TIMING`
/// environment variable is set, the log file with this name will be created.
pub const DEFAULT_TIMING_LOG_FILE: &str = "movement-timing.log";

/// A guard for background log appender(s) returned by `init_tracing_subscriber`.
pub struct WorkerGuard {
_drop_me: Option<AppenderGuard>,
}
const OTLP_TRACING_ENV: &str = "MOVEMENT_OTLP";

/// Options for the tracing subscriber.
#[derive(Default)]
pub struct Config {
/// Custom name for the timing log file.
pub timing_log_path: Option<PathBuf>,
/// URL of the collector endpoint using the OTLP gRPC protocol.
pub otlp_grpc_url: Option<String>,
}

impl Config {
/// Get the tracing configuration from well-known environment variables.
pub fn from_env() -> Result<Self, anyhow::Error> {
let otlp_grpc_url = match env::var(OTLP_TRACING_ENV) {
Ok(url) => Some(url),
Err(env::VarError::NotPresent) => None,
Err(env::VarError::NotUnicode(s)) => {
return Err(anyhow!(
"value of environment variable {OTLP_TRACING_ENV} is not valid UTF-8: {}",
s.to_string_lossy()
));
}
};
Ok(Self { otlp_grpc_url })
}
}

/// Sets up the tracing subscribers for a Movement process. This should be
/// called at the beginning of a process' `main` function.
/// Returns a guard object that should be dropped at the end of the process'
/// `main`` function scope.
///
/// This function may output encounted errors to the standard error stream,
/// as this is the only facility
pub fn init_tracing_subscriber(config: Config) -> WorkerGuard {
pub fn init_tracing_subscriber(config: Config) -> Result<(), anyhow::Error> {
// TODO: compose console_subscriber as a layer
let env_filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy();
let log_layer = tracing_subscriber::fmt::layer().with_filter(env_filter);

let (timing_layer, timing_writer_guard) = match env::var(TIMING_ENV) {
Err(env::VarError::NotPresent) => {
// Disable timing
(None, None)
}
Ok(timing_directives) => {
let env_filter = EnvFilter::new(timing_directives);
let timing_log_path = config
.timing_log_path
.as_deref()
.unwrap_or_else(|| DEFAULT_TIMING_LOG_FILE.as_ref());
match File::create(timing_log_path) {
Ok(file) => {
let (writer, guard) = tracing_appender::non_blocking(file);
let layer = tracing_subscriber::fmt::layer()
.with_writer(writer)
.json()
.with_span_events(FmtSpan::CLOSE)
.with_filter(env_filter)
.with_filter(filter::filter_fn(|meta| meta.target() == "movement_timing"));
(Some(layer), Some(guard))
}
Err(e) => {
eprintln!("can't create `{}`: {}", timing_log_path.display(), e);
(None, None)
}
}
}
Err(e) => {
eprintln!("invalid {TIMING_ENV}: {e}");
(None, None)
}
let telemetry_layer = if let Some(endpoint) = config.otlp_grpc_url {
let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(endpoint);
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(exporter)
.install_batch(runtime::Tokio)?
.tracer("movement_tracing");
let layer = tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter::filter_fn(|meta| meta.target() == "movement_telemetry"));
Some(layer)
} else {
None
};

tracing_subscriber::registry().with(log_layer).with(timing_layer).init();
tracing_subscriber::registry().with(log_layer).with(telemetry_layer).init();

WorkerGuard { _drop_me: timing_writer_guard }
Ok(())
}