Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenTelemetry OTLP setup for tracing #670

Closed
wants to merge 10 commits into from
309 changes: 260 additions & 49 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ move-vm-ext = { path = "types/move-vm-ext" }
num-derive = "0.4.2"
num-traits = "0.2.14"
once_cell = "1.8.0"
opentelemetry = { version = "0.26" }
opentelemetry-otlp = { version = "0.26" }
opentelemetry_sdk = { version = "0.26", features = ["rt-tokio"] }
opentelemetry-semantic-conventions = { version = "0.26" }
parking_lot = { version = "0.12.1" }
poem = { version = "=1.3.59", features = ["anyhow", "rustls"] }
poem-openapi = { version = "=2.0.11", features = ["swagger-ui", "url"] }
Expand Down Expand Up @@ -283,7 +287,7 @@ tonic-reflection = "0.11"
tonic-web = "0.11"
### To try (experimental) std support, add `features = [ "std" ]` to risc0-zkvm
tracing = "0.1.40"
tracing-appender = "0.2"
tracing-opentelemetry = { version = "0.27" }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-test = "0.2.5"
trie-db = "0.28.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ use tokio::sync::RwLock;
use tracing::info;
use url::Url;

const TIMING_LOG_ENV: &str = "SUZUKA_TIMING_LOG";

pub fn get_suzuka_config(
dot_movement: &DotMovement,
) -> Result<suzuka_config::Config, anyhow::Error> {
Expand Down Expand Up @@ -247,10 +245,7 @@ pub async fn basic_coin_transfers(

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let tracing_config = movement_tracing::Config {
timing_log_path: std::env::var_os(TIMING_LOG_ENV).map(Into::into),
};
let _guard = movement_tracing::init_tracing_subscriber(tracing_config);
movement_tracing::init_tracing_subscriber();

// get the lead dot movement from the environment
let dot_movement = DotMovement::try_from_env()?;
Expand Down
1 change: 1 addition & 0 deletions networks/suzuka/suzuka-full-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ tonic = { workspace = true }
movement-types = { workspace = true }
movement-rest = { workspace = true }
movement-tracing = { workspace = true }
opentelemetry = { workspace = true }
suzuka-config = { workspace = true }
dot-movement = { workspace = true }
godfig = { workspace = true }
Expand Down
13 changes: 7 additions & 6 deletions networks/suzuka/suzuka-full-node/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use suzuka_full_node::manager::Manager;

use std::env;
use std::process::ExitCode;

const TIMING_LOG_ENV: &str = "SUZUKA_TIMING_LOG";

#[tokio::main]
async fn main() -> Result<ExitCode, anyhow::Error> {
let tracing_config =
movement_tracing::Config { timing_log_path: env::var_os(TIMING_LOG_ENV).map(Into::into) };
let _guard = movement_tracing::init_tracing_subscriber(tracing_config);
movement_tracing::init_tracing_subscriber();
let tracing_config = movement_tracing::telemetry::Config::from_env()?;
movement_tracing::telemetry::init_tracer_provider(
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_VERSION"),
tracing_config,
)?;

// get the config file
let dot_movement = dot_movement::DotMovement::try_from_env()?;
Expand Down
25 changes: 18 additions & 7 deletions networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@ use maptos_dof_execution::{
SignatureVerifiedTransaction, SignedTransaction, Transaction,
};
use mcr_settlement_manager::{CommitmentEventStream, McrSettlementManagerOperations};
use movement_tracing::telemetry;
use movement_types::block::{Block, BlockCommitment, BlockCommitmentEvent};

use anyhow::Context;
use anyhow::Context as _;
use futures::{future::Either, stream};
use opentelemetry::trace::{FutureExt as _, TraceContextExt as _, Tracer as _};
use opentelemetry::{Context as OtelContext, KeyValue};
use suzuka_config::execution_extension;
use tokio::select;
use tokio_stream::{Stream, StreamExt};
use tracing::{debug, error, info, info_span, Instrument};
use tracing::{debug, error, info};

pub struct Task<E, S> {
executor: E,
Expand Down Expand Up @@ -138,11 +141,19 @@ where
})
.await??;

// get the transactions
// get the transactions count before the block is consumed
let transactions_count = block.transactions().len();
let span = info_span!(target: "movement_timing", "execute_block", id = %block_id);
let commitment =
self.execute_block_with_retries(block, block_timestamp).instrument(span).await?;

// execute the block
let tracer = telemetry::tracer();
let span = tracer
.span_builder("execute_block")
.with_attributes([KeyValue::new("id", block_id.to_string())])
.start(&tracer);
let commitment = self
.execute_block_with_retries(block, block_timestamp)
.with_context(OtelContext::current_with_span(span))
.await?;

// decrement the number of transactions in flight on the executor
self.executor.decrement_transactions_in_flight(transactions_count as u64);
Expand All @@ -152,7 +163,7 @@ where
self.da_db.set_synced_height(da_height - 1).await?;

// set the block as executed
self.da_db.add_executed_block(block_id.to_string()).await?;
self.da_db.add_executed_block(block_id.clone()).await?;

// todo: this needs defaults
if self.settlement_enabled() {
Expand Down
64 changes: 43 additions & 21 deletions networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
use m1_da_light_node_client::{BatchWriteRequest, BlobWrite, LightNodeServiceClient};
use m1_da_light_node_util::config::Config as LightNodeConfig;
use maptos_dof_execution::SignedTransaction;
use movement_tracing::telemetry;

use opentelemetry::trace::{FutureExt as _, TraceContextExt as _, Tracer as _};
use opentelemetry::{Context as OtelContext, KeyValue};
use tokio::sync::mpsc;
use tracing::{info, warn};
use tracing::warn;

use std::ops::ControlFlow;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::{self, AtomicU64};
use std::time::{Duration, Instant};

/// Process-wide counter used to tag each batch with a unique id for logging
/// and telemetry. This must be `static`, not `const`: a `const` item with
/// interior mutability is inlined at every use site, so `fetch_add` would
/// operate on a fresh temporary and every batch would get id 0
/// (Clippy: `declare_interior_mutable_const`).
static LOGGING_UID: AtomicU64 = AtomicU64::new(0);
Expand All @@ -29,8 +32,21 @@ impl Task {
}

/// Main loop: repeatedly builds and submits transaction batches until the
/// batch builder signals completion via `ControlFlow::Break`.
///
/// Each iteration opens an OpenTelemetry `build_batch` span tagged with a
/// batch id, and propagates that span's context into
/// `spawn_write_next_transaction_batch` so the whole build/submit cycle is
/// attributed to one trace.
///
/// NOTE(review): this span replaces the original `while let Continue(())`
/// loop; the diff showed both versions interleaved, this is the post-change
/// implementation.
pub async fn run(mut self) -> anyhow::Result<()> {
	let tracer = telemetry::tracer();
	loop {
		// NOTE(review): with LOGGING_UID declared `const` upstream, this
		// fetch_add acts on a temporary and always yields 0 — it should be
		// a `static` for these ids to actually be unique. TODO confirm.
		let batch_id = LOGGING_UID.fetch_add(1, atomic::Ordering::Relaxed);
		let span = tracer
			.span_builder("build_batch")
			// `as i64` would wrap above i64::MAX; harmless for a
			// process-lifetime counter, noted for completeness.
			.with_attributes([KeyValue::new("batch_id", batch_id as i64)])
			.start(&tracer);
		// Run the batch build inside the span's context; stop cleanly when
		// the builder reports there is no more work.
		if let ControlFlow::Break(()) = self
			.spawn_write_next_transaction_batch()
			.with_context(OtelContext::current_with_span(span))
			.await?
		{
			break Ok(());
		}
	}
}

/// Constructs a batch of transactions then spawns the write request to the DA in the background.
Expand All @@ -45,7 +61,6 @@ impl Task {

let mut transactions = Vec::new();

let batch_id = LOGGING_UID.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
loop {
let remaining = match half_building_time.checked_sub(start.elapsed().as_millis() as u64)
{
Expand All @@ -64,13 +79,17 @@ impl Task {
{
Ok(transaction) => match transaction {
Some(transaction) => {
info!(
target : "movement_timing",
batch_id = %batch_id,
tx_hash = %transaction.committed_hash(),
sender = %transaction.sender(),
sequence_number = transaction.sequence_number(),
"received transaction",
let otel_cx = OtelContext::current();
otel_cx.span().add_event(
"received_transaction",
vec![
KeyValue::new("tx_hash", transaction.committed_hash().to_string()),
KeyValue::new("sender", transaction.sender().to_string()),
KeyValue::new(
"sequence_number",
transaction.sequence_number() as i64,
),
],
);
let serialized_aptos_transaction = serde_json::to_vec(&transaction)?;
let movement_transaction = movement_types::transaction::Transaction::new(
Expand All @@ -92,20 +111,23 @@ impl Task {
}

if transactions.len() > 0 {
info!(
target: "movement_timing",
batch_id = %batch_id,
transaction_count = transactions.len(),
"built_batch_write"
let otel_cx = OtelContext::current();
otel_cx.span().add_event(
"built_batch_write",
vec![KeyValue::new("transaction_count", transactions.len() as i64)],
);
let batch_write = BatchWriteRequest { blobs: transactions };
// spawn the actual batch write request in the background
let write_span = telemetry::tracer().start_with_context("batch_write", &otel_cx);
let mut da_light_node_client = self.da_light_node_client.clone();
tokio::spawn(async move {
if let Err(e) = da_light_node_client.batch_write(batch_write).await {
warn!("failed to write batch to DA: {:?}", e);
tokio::spawn(
async move {
if let Err(e) = da_light_node_client.batch_write(batch_write).await {
warn!("failed to write batch to DA: {:?}", e);
}
}
});
.with_context(otel_cx.with_span(write_span)),
);
}

Ok(Continue(()))
Expand Down
24 changes: 24 additions & 0 deletions process-compose/suzuka-full-node/process-compose.telemetry.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
version: "3"

# Global environment section — intentionally empty here; per-process
# environment overrides are set below. TODO confirm this placeholder is
# required by the process-compose schema version in use.
environment:

processes:
  # Jaeger all-in-one acts as the OTLP collector and trace UI.
  # Port 4317 receives OTLP/gRPC; port 16686 serves the Jaeger UI.
  otlp-collector:
    is_daemon: true
    command: |
      docker run -d --rm --name suzuka-otlp-collector -p16686:16686 -p4317:4317 -e COLLECTOR_OTLP_ENABLED=true jaegertracing/all-in-one:latest
    shutdown:
      command: |
        docker stop suzuka-otlp-collector
  # Point the full node's OTLP exporter at the local collector; it must not
  # start before the collector process has launched.
  suzuka-full-node:
    depends_on:
      otlp-collector:
        condition: process_started
    environment:
      - MOVEMENT_OTLP=http://localhost:4317
  # Same wiring for the DA light node.
  m1-da-light-node:
    depends_on:
      otlp-collector:
        condition: process_started
    environment:
      - MOVEMENT_OTLP=http://localhost:4317
4 changes: 2 additions & 2 deletions process-compose/suzuka-full-node/process-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ processes:
suzuka-full-node:
command: |
suzuka-full-node
env:
RUST_LOG: info,aptos-indexer=debug
environment:
- RUST_LOG=info
depends_on:
m1-da-light-node:
condition: process_healthy
Expand Down
14 changes: 7 additions & 7 deletions protocol-units/da/m1/light-node/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use m1_da_light_node::v1::{LightNodeV1, Manager};

use std::env;

const TIMING_LOG_ENV: &str = "M1_DA_LIGHT_NODE_TIMING_LOG";

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let tracing_config =
movement_tracing::Config { timing_log_path: env::var_os(TIMING_LOG_ENV).map(Into::into) };
let _guard = movement_tracing::init_tracing_subscriber(tracing_config);
movement_tracing::init_tracing_subscriber();
let tracing_config = movement_tracing::telemetry::Config::from_env()?;
movement_tracing::telemetry::init_tracer_provider(
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_VERSION"),
tracing_config,
)?;

let dot_movement = dot_movement::DotMovement::try_from_env()?;
let config_path = dot_movement.get_config_json_path();
Expand Down
2 changes: 1 addition & 1 deletion protocol-units/da/m1/light-node/src/v1/passthrough.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl LightNodeV1 {
Ok(verified_blobs)
}

#[tracing::instrument(target = "movement_timing", level = "debug")]
#[tracing::instrument(target = "movement_telemetry", level = "debug")]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to have comments on each of these explaining why they are needed to achieve a certain metric as outlined here:

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if we need all of these for telemetry. Originally they were added for perf research.
Do you think we can just remove those that don't contribute to a particular metric goal, or convert them to a plain spans/events with the default target?

async fn get_blobs_at_height(&self, height: u64) -> Result<Vec<Blob>, anyhow::Error> {
let celestia_blobs = self.get_celestia_blobs_at_height(height).await?;
let mut blobs = Vec::new();
Expand Down
22 changes: 11 additions & 11 deletions protocol-units/da/m1/light-node/src/v1/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,25 +79,25 @@ impl LightNodeV1 {
// this has an internal timeout based on its building time
// so in the worst case scenario we will roughly double the internal timeout
let uid = LOGGING_UID.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
debug!(target: "movement_timing", uid = %uid, "waiting_for_next_block",);
debug!(target: "movement_telemetry", uid = %uid, "waiting_for_next_block",);
let block = memseq.wait_for_next_block().await?;
match block {
Some(block) => {
info!(target: "movement_timing", block_id = %block.id(), uid = %uid, transaction_count = block.transactions().len(), "received_block");
info!(target: "movement_telemetry", block_id = %block.id(), uid = %uid, transaction_count = block.transactions().len(), "received_block");
sender.send(block).await?;
Ok(())
}
None => {
// no transactions to include
debug!(target: "movement_timing", uid = %uid, "no_transactions_to_include");
debug!(target: "movement_telemetry", uid = %uid, "no_transactions_to_include");
Ok(())
}
}
}

async fn submit_blocks(&self, blocks: &Vec<block::WrappedBlock>) -> Result<(), anyhow::Error> {
for block in blocks {
info!(target: "movement_timing", block_id = %block.block.id(), "inner_submitting_block");
info!(target: "movement_telemetry", block_id = %block.block.id(), "inner_submitting_block");
}
// get references to celestia blobs in the wrapped blocks
let block_blobs = blocks
Expand All @@ -108,14 +108,14 @@ impl LightNodeV1 {
// use deref on the wrapped block to get the blob
self.pass_through.submit_celestia_blobs(&block_blobs).await?;
for block in blocks {
info!(target: "movement_timing", block_id = %block.block.id(), "inner_submitted_block");
info!(target: "movement_telemetry", block_id = %block.block.id(), "inner_submitted_block");
}
Ok(())
}

pub async fn submit_with_heuristic(&self, blocks: Vec<Block>) -> Result<(), anyhow::Error> {
for block in &blocks {
info!(target: "movement_timing", block_id = %block.id(), "submitting_block");
info!(target: "movement_telemetry", block_id = %block.id(), "submitting_block");
}

// wrap the blocks in a struct that can be split and compressed
Expand Down Expand Up @@ -168,7 +168,7 @@ impl LightNodeV1 {

info!("block group results: {:?}", block_group_results);
for block_group_result in &block_group_results {
info!(target: "movement_timing", block_group_result = ?block_group_result, "block_group_result");
info!(target: "movement_telemetry", block_group_result = ?block_group_result, "block_group_result");
}

Ok(())
Expand Down Expand Up @@ -204,7 +204,7 @@ impl LightNodeV1 {
Err(_) => {
// The operation timed out
debug!(
target: "movement_timing",
target: "movement_telemetry",
batch_size = blocks.len(),
"timed_out_building_block"
);
Expand All @@ -213,7 +213,7 @@ impl LightNodeV1 {
}
}

info!(target: "movement_timing", block_count = blocks.len(), "read_blocks");
info!(target: "movement_telemetry", block_count = blocks.len(), "read_blocks");

Ok(blocks)
}
Expand All @@ -232,11 +232,11 @@ impl LightNodeV1 {

// submit the blobs, resizing as needed
for block_id in &ids {
info!(target: "movement_timing", %block_id, "submitting_block_batch");
info!(target: "movement_telemetry", %block_id, "submitting_block_batch");
}
self.submit_with_heuristic(blocks).await?;
for block_id in &ids {
info!(target: "movement_timing", %block_id, "submitted_block_batch");
info!(target: "movement_telemetry", %block_id, "submitted_block_batch");
}

Ok(())
Expand Down
Loading