From f6062ff465852c5ec1e7fc5a2754dc703211729c Mon Sep 17 00:00:00 2001 From: refcell Date: Wed, 3 Apr 2024 11:14:30 -0400 Subject: [PATCH 1/2] feat(derive): initial pass at telemetry --- crates/derive/src/stages/channel_bank.rs | 63 ++++++++++++++----- crates/derive/src/stages/channel_reader.rs | 19 ++++-- crates/derive/src/stages/frame_queue.rs | 58 +++++++++++------ crates/derive/src/stages/l1_retrieval.rs | 37 ++++++++--- crates/derive/src/stages/l1_traversal.rs | 32 +++++++--- crates/derive/src/traits/mod.rs | 3 + crates/derive/src/traits/telemetry.rs | 23 +++++++ crates/derive/src/traits/test_utils.rs | 3 + .../derive/src/traits/test_utils/telemetry.rs | 31 +++++++++ 9 files changed, 211 insertions(+), 58 deletions(-) create mode 100644 crates/derive/src/traits/telemetry.rs create mode 100644 crates/derive/src/traits/test_utils/telemetry.rs diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index 4da1ee609..1451b55be 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -3,7 +3,9 @@ use super::frame_queue::FrameQueue; use crate::{ params::{ChannelID, MAX_CHANNEL_BANK_SIZE}, - traits::{ChainProvider, DataAvailabilityProvider, ResettableStage}, + traits::{ + ChainProvider, DataAvailabilityProvider, LogLevel, ResettableStage, TelemetryProvider, + }, types::{BlockInfo, Channel, Frame, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::sync::Arc; @@ -26,30 +28,35 @@ use hashbrown::HashMap; /// to `IngestData`. This means that we can do an ingest and then do a read while becoming too large. /// [ChannelBank] buffers channel frames, and emits full channel data #[derive(Debug)] -pub struct ChannelBank +pub struct ChannelBank where DAP: DataAvailabilityProvider + Debug, CP: ChainProvider + Debug, + T: TelemetryProvider + Debug, { /// The rollup configuration. cfg: Arc, + /// Telemetry + telemetry: T, /// Map of channels by ID. channels: HashMap, /// Channels in FIFO order. channel_queue: VecDeque, /// The previous stage of the derivation pipeline. - prev: FrameQueue, + prev: FrameQueue, } -impl ChannelBank +impl ChannelBank where DAP: DataAvailabilityProvider + Debug, CP: ChainProvider + Debug, + T: TelemetryProvider + Debug, { /// Create a new [ChannelBank] stage. - pub fn new(cfg: RollupConfig, prev: FrameQueue) -> Self { + pub fn new(cfg: RollupConfig, prev: FrameQueue, telemetry: T) -> Self { Self { cfg: Arc::new(cfg), + telemetry, channels: HashMap::new(), channel_queue: VecDeque::new(), prev, @@ -97,11 +104,23 @@ where // Check if the channel is not timed out. If it has, ignore the frame. if current_channel.open_block_number() + self.cfg.channel_timeout < origin.number { + self.telemetry.write( + alloy_primitives::Bytes::from(alloc::format!("Channel {:?} timed out", frame.id)), + LogLevel::Warning, + ); return Ok(()); } // Ingest the frame. If it fails, ignore the frame. + let frame_id = frame.id; if current_channel.add_frame(frame, origin).is_err() { + self.telemetry.write( + alloy_primitives::Bytes::from(alloc::format!( + "Failed to add frame to channel: {:?}", + frame_id + )), + LogLevel::Warning, + ); return Ok(()); } @@ -114,6 +133,10 @@ where pub fn read(&mut self) -> StageResult> { // Bail if there are no channels to read from. if self.channel_queue.is_empty() { + self.telemetry.write( + alloy_primitives::Bytes::from("No channels to read from"), + LogLevel::Debug, + ); return Err(StageError::Eof); } @@ -128,6 +151,10 @@ where // Remove all timed out channels from the front of the `channel_queue`. if channel.open_block_number() + self.cfg.channel_timeout < origin.number { + self.telemetry.write( + alloy_primitives::Bytes::from(alloc::format!("Channel {:?} timed out", first)), + LogLevel::Warning, + ); self.channels.remove(&first); self.channel_queue.pop_front(); return Ok(None); @@ -195,10 +222,11 @@ where } #[async_trait] -impl ResettableStage for ChannelBank +impl ResettableStage for ChannelBank where DAP: DataAvailabilityProvider + Send + Debug, CP: ChainProvider + Send + Debug, + T: TelemetryProvider + Send + Debug, { async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> { self.channels.clear(); @@ -213,7 +241,7 @@ mod tests { use crate::stages::frame_queue::tests::new_test_frames; use crate::stages::l1_retrieval::L1Retrieval; use crate::stages::l1_traversal::tests::new_test_traversal; - use crate::traits::test_utils::TestDAP; + use crate::traits::test_utils::{TestDAP, TestTelemetry}; use alloc::vec; #[test] @@ -221,9 +249,10 @@ mod tests { let mut traversal = new_test_traversal(false, false); traversal.block = None; let dap = TestDAP::default(); - let retrieval = L1Retrieval::new(traversal, dap); - let frame_queue = FrameQueue::new(retrieval); - let mut channel_bank = ChannelBank::new(RollupConfig::default(), frame_queue); + let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); + let frame_queue = FrameQueue::new(retrieval, TestTelemetry::new()); + let mut channel_bank = + ChannelBank::new(RollupConfig::default(), frame_queue, TestTelemetry::new()); let frame = Frame::default(); let err = channel_bank.ingest_frame(frame).unwrap_err(); assert_eq!(err, StageError::Custom(anyhow!("No origin"))); @@ -234,9 +263,10 @@ mod tests { let traversal = new_test_traversal(true, true); let results = vec![Ok(Bytes::from(vec![0x00]))]; let dap = TestDAP { results }; - let retrieval = L1Retrieval::new(traversal, dap); - let frame_queue = FrameQueue::new(retrieval); - let mut channel_bank = ChannelBank::new(RollupConfig::default(), frame_queue); + let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); + let frame_queue = FrameQueue::new(retrieval, TestTelemetry::new()); + let mut channel_bank = + ChannelBank::new(RollupConfig::default(), frame_queue, TestTelemetry::new()); let mut frames = new_test_frames(100000); // Ingest frames until the channel bank is full and it stops increasing in size let mut current_size = 0; @@ -262,9 +292,10 @@ mod tests { let traversal = new_test_traversal(true, true); let results = vec![Ok(Bytes::from(vec![0x00]))]; let dap = TestDAP { results }; - let retrieval = L1Retrieval::new(traversal, dap); - let frame_queue = FrameQueue::new(retrieval); - let mut channel_bank = ChannelBank::new(RollupConfig::default(), frame_queue); + let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); + let frame_queue = FrameQueue::new(retrieval, TestTelemetry::new()); + let mut channel_bank = + ChannelBank::new(RollupConfig::default(), frame_queue, TestTelemetry::new()); let err = channel_bank.read().unwrap_err(); assert_eq!(err, StageError::Eof); let err = channel_bank.next_data().await.unwrap_err(); diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 05c2edc78..199b961ec 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -2,7 +2,7 @@ use super::channel_bank::ChannelBank; use crate::{ - traits::{ChainProvider, DataAvailabilityProvider}, + traits::{ChainProvider, DataAvailabilityProvider, LogLevel, TelemetryProvider}, types::{Batch, BlockInfo, StageError, StageResult}, }; use alloc::vec::Vec; @@ -12,26 +12,31 @@ use miniz_oxide::inflate::decompress_to_vec; /// [ChannelReader] is a stateful stage that does the following: #[derive(Debug)] -pub struct ChannelReader +pub struct ChannelReader where DAP: DataAvailabilityProvider + Debug, CP: ChainProvider + Debug, + T: TelemetryProvider + Debug, { /// The previous stage of the derivation pipeline. - prev: ChannelBank, + prev: ChannelBank, + /// Telemetry + telemetry: T, /// The batch reader. next_batch: Option, } -impl ChannelReader +impl ChannelReader where DAP: DataAvailabilityProvider + Debug, CP: ChainProvider + Debug, + T: TelemetryProvider + Debug, { /// Create a new [ChannelReader] stage. - pub fn new(prev: ChannelBank) -> Self { + pub fn new(prev: ChannelBank, telemetry: T) -> Self { Self { prev, + telemetry, next_batch: None, } } @@ -39,6 +44,10 @@ where /// Pulls out the next Batch from the available channel. pub async fn next_batch(&mut self) -> StageResult { if let Err(e) = self.set_batch_reader().await { + self.telemetry.write( + alloc::format!("Failed to set batch reader: {:?}", e), + LogLevel::Error, + ); self.next_channel(); return Err(e); } diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index 51221eb63..d6a2686cd 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -4,7 +4,9 @@ use core::fmt::Debug; use super::l1_retrieval::L1Retrieval; use crate::{ - traits::{ChainProvider, DataAvailabilityProvider, ResettableStage}, + traits::{ + ChainProvider, DataAvailabilityProvider, LogLevel, ResettableStage, TelemetryProvider, + }, types::{BlockInfo, Frame, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, collections::VecDeque}; @@ -13,26 +15,31 @@ use async_trait::async_trait; /// The frame queue stage of the derivation pipeline. #[derive(Debug)] -pub struct FrameQueue +pub struct FrameQueue where DAP: DataAvailabilityProvider + Debug, CP: ChainProvider + Debug, + T: TelemetryProvider + Debug, { /// The previous stage in the pipeline. - pub prev: L1Retrieval, + pub prev: L1Retrieval, + /// Telemetry + pub telemetry: T, /// The current frame queue. queue: VecDeque, } -impl FrameQueue +impl FrameQueue where DAP: DataAvailabilityProvider + Debug, CP: ChainProvider + Debug, + T: TelemetryProvider + Debug, { /// Create a new frame queue stage. - pub fn new(prev: L1Retrieval) -> Self { + pub fn new(prev: L1Retrieval, telemetry: T) -> Self { Self { prev, + telemetry, queue: VecDeque::new(), } } @@ -59,6 +66,12 @@ where } // If we did not add more frames but still have more data, retry this function. if self.queue.is_empty() { + self.telemetry.write( + alloy_primitives::Bytes::from( + "Queue is empty after fetching data. Retrying next_frame.", + ), + LogLevel::Debug, + ); return Err(anyhow!("Not enough data").into()); } @@ -69,10 +82,11 @@ where } #[async_trait] -impl ResettableStage for FrameQueue +impl ResettableStage for FrameQueue where DAP: DataAvailabilityProvider + Send + Debug, CP: ChainProvider + Send + Debug, + T: TelemetryProvider + Send + Debug, { async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> { self.queue = VecDeque::default(); @@ -84,7 +98,7 @@ where pub(crate) mod tests { use super::*; use crate::stages::l1_traversal::tests::new_test_traversal; - use crate::traits::test_utils::TestDAP; + use crate::traits::test_utils::{TestDAP, TestTelemetry}; use crate::DERIVATION_VERSION_0; use alloc::vec; use alloc::vec::Vec; @@ -113,44 +127,48 @@ pub(crate) mod tests { #[tokio::test] async fn test_frame_queue_empty_bytes() { + let telemetry = TestTelemetry::new(); let traversal = new_test_traversal(true, true); let results = vec![Ok(Bytes::from(vec![0x00]))]; let dap = TestDAP { results }; - let retrieval = L1Retrieval::new(traversal, dap); - let mut frame_queue = FrameQueue::new(retrieval); + let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); + let mut frame_queue = FrameQueue::new(retrieval, telemetry); let err = frame_queue.next_frame().await.unwrap_err(); assert_eq!(err, anyhow!("Not enough data").into()); } #[tokio::test] async fn test_frame_queue_no_frames_decoded() { + let telemetry = TestTelemetry::new(); let traversal = new_test_traversal(true, true); let results = vec![Err(StageError::Eof), Ok(Bytes::default())]; let dap = TestDAP { results }; - let retrieval = L1Retrieval::new(traversal, dap); - let mut frame_queue = FrameQueue::new(retrieval); + let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); + let mut frame_queue = FrameQueue::new(retrieval, telemetry); let err = frame_queue.next_frame().await.unwrap_err(); assert_eq!(err, anyhow!("Not enough data").into()); } #[tokio::test] async fn test_frame_queue_wrong_derivation_version() { + let telemetry = TestTelemetry::new(); let traversal = new_test_traversal(true, true); let results = vec![Ok(Bytes::from(vec![0x01]))]; let dap = TestDAP { results }; - let retrieval = L1Retrieval::new(traversal, dap); - let mut frame_queue = FrameQueue::new(retrieval); + let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); + let mut frame_queue = FrameQueue::new(retrieval, telemetry); let err = frame_queue.next_frame().await.unwrap_err(); assert_eq!(err, anyhow!("Unsupported derivation version").into()); } #[tokio::test] async fn test_frame_queue_frame_too_short() { + let telemetry = TestTelemetry::new(); let traversal = new_test_traversal(true, true); let results = vec![Ok(Bytes::from(vec![0x00, 0x01]))]; let dap = TestDAP { results }; - let retrieval = L1Retrieval::new(traversal, dap); - let mut frame_queue = FrameQueue::new(retrieval); + let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); + let mut frame_queue = FrameQueue::new(retrieval, telemetry); let err = frame_queue.next_frame().await.unwrap_err(); assert_eq!(err, anyhow!("Frame too short to decode").into()); } @@ -158,12 +176,13 @@ pub(crate) mod tests { #[tokio::test] async fn test_frame_queue_single_frame() { let data = new_encoded_test_frames(1); + let telemetry = TestTelemetry::new(); let traversal = new_test_traversal(true, true); let dap = TestDAP { results: vec![Ok(data)], }; - let retrieval = L1Retrieval::new(traversal, dap); - let mut frame_queue = FrameQueue::new(retrieval); + let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); + let mut frame_queue = FrameQueue::new(retrieval, telemetry); let frame_decoded = frame_queue.next_frame().await.unwrap(); let frame = new_test_frames(1); assert_eq!(frame[0], frame_decoded); @@ -173,13 +192,14 @@ pub(crate) mod tests { #[tokio::test] async fn test_frame_queue_multiple_frames() { + let telemetry = TestTelemetry::new(); let data = new_encoded_test_frames(3); let traversal = new_test_traversal(true, true); let dap = TestDAP { results: vec![Ok(data)], }; - let retrieval = L1Retrieval::new(traversal, dap); - let mut frame_queue = FrameQueue::new(retrieval); + let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); + let mut frame_queue = FrameQueue::new(retrieval, telemetry); for i in 0..3 { let frame_decoded = frame_queue.next_frame().await.unwrap(); assert_eq!(frame_decoded.number, i); diff --git a/crates/derive/src/stages/l1_retrieval.rs b/crates/derive/src/stages/l1_retrieval.rs index c69dea9c3..273345696 100644 --- a/crates/derive/src/stages/l1_retrieval.rs +++ b/crates/derive/src/stages/l1_retrieval.rs @@ -2,7 +2,10 @@ use super::L1Traversal; use crate::{ - traits::{ChainProvider, DataAvailabilityProvider, DataIter, ResettableStage}, + traits::{ + ChainProvider, DataAvailabilityProvider, DataIter, LogLevel, ResettableStage, + TelemetryProvider, + }, types::{BlockInfo, StageError, StageResult, SystemConfig}, }; use alloc::boxed::Box; @@ -12,28 +15,33 @@ use async_trait::async_trait; /// The L1 retrieval stage of the derivation pipeline. #[derive(Debug)] -pub struct L1Retrieval +pub struct L1Retrieval where DAP: DataAvailabilityProvider, CP: ChainProvider, + T: TelemetryProvider, { /// The previous stage in the pipeline. - pub prev: L1Traversal, + pub prev: L1Traversal, + /// Telemetry provider for the L1 retrieval stage. + pub telemetry: T, /// The data availability provider to use for the L1 retrieval stage. pub provider: DAP, /// The current data iterator. pub(crate) data: Option, } -impl L1Retrieval +impl L1Retrieval where DAP: DataAvailabilityProvider, CP: ChainProvider, + T: TelemetryProvider, { /// Creates a new L1 retrieval stage with the given data availability provider and previous stage. - pub fn new(prev: L1Traversal, provider: DAP) -> Self { + pub fn new(prev: L1Traversal, provider: DAP, telemetry: T) -> Self { Self { prev, + telemetry, provider, data: None, } @@ -49,6 +57,10 @@ where /// If there is no data, it returns an error. pub async fn next_data(&mut self) -> StageResult { if self.data.is_none() { + self.telemetry.write( + alloc::format!("Retrieving data for block: {:?}", self.prev.block), + LogLevel::Debug, + ); let next = self .prev .next_l1_block()? @@ -73,10 +85,11 @@ where } #[async_trait] -impl ResettableStage for L1Retrieval +impl ResettableStage for L1Retrieval where DAP: DataAvailabilityProvider + Send, CP: ChainProvider + Send, + T: TelemetryProvider + Send, { async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> StageResult<()> { self.data = Some(self.provider.open_data(&base, cfg.batcher_addr).await?); @@ -88,7 +101,7 @@ where mod tests { use super::*; use crate::stages::l1_traversal::tests::new_test_traversal; - use crate::traits::test_utils::{TestDAP, TestIter}; + use crate::traits::test_utils::{TestDAP, TestIter, TestTelemetry}; use alloc::vec; use alloy_primitives::Address; @@ -96,7 +109,8 @@ mod tests { async fn test_l1_retrieval_origin() { let traversal = new_test_traversal(true, true); let dap = TestDAP { results: vec![] }; - let retrieval = L1Retrieval::new(traversal, dap); + let telemetry = TestTelemetry::new(); + let retrieval = L1Retrieval::new(traversal, dap, telemetry); let expected = BlockInfo::default(); assert_eq!(retrieval.origin(), Some(&expected)); } @@ -106,7 +120,8 @@ mod tests { let traversal = new_test_traversal(true, true); let results = vec![Err(StageError::Eof), Ok(Bytes::default())]; let dap = TestDAP { results }; - let mut retrieval = L1Retrieval::new(traversal, dap); + let telemetry = TestTelemetry::new(); + let mut retrieval = L1Retrieval::new(traversal, dap, telemetry); assert_eq!(retrieval.data, None); let data = retrieval.next_data().await.unwrap(); assert_eq!(data, Bytes::default()); @@ -131,9 +146,11 @@ mod tests { // This would bubble up an error if the prev stage // (traversal) is called in the retrieval stage. let traversal = new_test_traversal(false, false); + let telemetry = TestTelemetry::new(); let dap = TestDAP { results: vec![] }; let mut retrieval = L1Retrieval { prev: traversal, + telemetry, provider: dap, data: Some(data), }; @@ -150,10 +167,12 @@ mod tests { open_data_calls: vec![(BlockInfo::default(), Address::default())], results: vec![Err(StageError::Eof)], }; + let telemetry = TestTelemetry::new(); let traversal = new_test_traversal(true, true); let dap = TestDAP { results: vec![] }; let mut retrieval = L1Retrieval { prev: traversal, + telemetry, provider: dap, data: Some(data), }; diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs index 4e05f7671..e0aa99a53 100644 --- a/crates/derive/src/stages/l1_traversal.rs +++ b/crates/derive/src/stages/l1_traversal.rs @@ -1,7 +1,7 @@ //! Contains the L1 traversal stage of the derivation pipeline. use crate::{ - traits::{ChainProvider, ResettableStage}, + traits::{ChainProvider, LogLevel, ResettableStage, TelemetryProvider}, types::{BlockInfo, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::boxed::Box; @@ -11,11 +11,13 @@ use async_trait::async_trait; /// The L1 traversal stage of the derivation pipeline. #[derive(Debug, Clone)] -pub struct L1Traversal { +pub struct L1Traversal { /// The current block in the traversal stage. pub(crate) block: Option, /// The data source for the traversal stage. data_source: Provider, + /// The telemetry provider for the traversal stage. + telemetry: Telemetry, /// Signals whether or not the traversal stage has been completed. done: bool, /// The system config @@ -24,12 +26,13 @@ pub struct L1Traversal { pub rollup_config: Arc, } -impl L1Traversal { +impl L1Traversal { /// Creates a new [L1Traversal] instance. - pub fn new(data_source: F, cfg: RollupConfig) -> Self { + pub fn new(data_source: F, cfg: RollupConfig, telemetry: T) -> Self { Self { block: Some(BlockInfo::default()), data_source, + telemetry, done: false, system_config: SystemConfig::default(), rollup_config: Arc::new(cfg), @@ -62,7 +65,17 @@ impl L1Traversal { pub async fn advance_l1_block(&mut self) -> StageResult<()> { // Pull the next block or return EOF which has special // handling further up the pipeline. - let block = self.block.ok_or(StageError::Eof)?; + let block = match self.block { + Some(block) => block, + None => { + self.telemetry.write( + alloy_primitives::Bytes::from("L1Traversal: No block to advance to"), + LogLevel::Warning, + ); + return Err(StageError::Eof); + } + }; + let next_l1_origin = self .data_source .block_info_by_number(block.number + 1) @@ -96,7 +109,7 @@ impl L1Traversal { } #[async_trait] -impl ResettableStage for L1Traversal { +impl ResettableStage for L1Traversal { async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> StageResult<()> { self.block = Some(base); self.done = false; @@ -108,7 +121,7 @@ impl ResettableStage for L1Traversal { #[cfg(test)] pub(crate) mod tests { use super::*; - use crate::traits::test_utils::TestChainProvider; + use crate::traits::test_utils::{TestChainProvider, TestTelemetry}; use crate::types::{Receipt, CONFIG_UPDATE_EVENT_VERSION_0, CONFIG_UPDATE_TOPIC}; use alloc::vec; use alloy_primitives::{address, b256, hex, Address, Bytes, Log, LogData, B256}; @@ -134,8 +147,9 @@ pub(crate) mod tests { pub(crate) fn new_test_traversal( blocks: bool, receipts: bool, - ) -> L1Traversal { + ) -> L1Traversal { let mut provider = TestChainProvider::default(); + let telemetry = TestTelemetry::default(); let rollup_config = RollupConfig { l1_system_config_address: L1_SYS_CONFIG_ADDR, ..RollupConfig::default() @@ -160,7 +174,7 @@ pub(crate) mod tests { let receipts = vec![receipt.clone(), Receipt::default(), receipt]; provider.insert_receipts(block.hash, receipts); } - L1Traversal::new(provider, rollup_config) + L1Traversal::new(provider, rollup_config, telemetry) } #[tokio::test] diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index cd1d1ab9f..6f10332bf 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -6,5 +6,8 @@ pub use data_sources::{ChainProvider, DataAvailabilityProvider, DataIter}; mod stages; pub use stages::ResettableStage; +mod telemetry; +pub use telemetry::{LogLevel, TelemetryProvider}; + #[cfg(test)] pub mod test_utils; diff --git a/crates/derive/src/traits/telemetry.rs b/crates/derive/src/traits/telemetry.rs new file mode 100644 index 000000000..49d0d57cf --- /dev/null +++ b/crates/derive/src/traits/telemetry.rs @@ -0,0 +1,23 @@ +//! Traits for telemetry. + +use alloy_primitives::Bytes; + +/// Logging Levels. +#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)] +pub enum LogLevel { + /// Debug level. + Debug, + /// Info level. + #[default] + Info, + /// Warning level. + Warning, + /// Error level. + Error, +} + +/// A trait for telemetry providers. +pub trait TelemetryProvider { + /// Write the telemetry data with LOG_LEVEL. + fn write>(&self, data: I, level: LogLevel); +} diff --git a/crates/derive/src/traits/test_utils.rs b/crates/derive/src/traits/test_utils.rs index 94eaa6cbc..d5ad6387f 100644 --- a/crates/derive/src/traits/test_utils.rs +++ b/crates/derive/src/traits/test_utils.rs @@ -5,3 +5,6 @@ pub use data_sources::TestChainProvider; pub mod data_availability; pub use data_availability::{TestDAP, TestIter}; + +mod telemetry; +pub use telemetry::TestTelemetry; diff --git a/crates/derive/src/traits/test_utils/telemetry.rs b/crates/derive/src/traits/test_utils/telemetry.rs new file mode 100644 index 000000000..3571cb1fd --- /dev/null +++ b/crates/derive/src/traits/test_utils/telemetry.rs @@ -0,0 +1,31 @@ +//! Test Utilities for Telemetry + +use crate::traits::{LogLevel, TelemetryProvider}; +use alloc::rc::Rc; +use alloc::vec::Vec; +use alloy_primitives::Bytes; +use core::cell::RefCell; + +/// Mock telemetry provider +#[derive(Debug, Default)] +pub struct TestTelemetry { + /// Holds telemetry data with log levels for assertions. + pub(crate) telemetry_calls: Rc>>, +} + +impl TestTelemetry { + /// Creates a new [TestTelemetry] instance. + pub fn new() -> Self { + Self::default() + } +} + +impl TelemetryProvider for TestTelemetry { + fn write>(&self, data: I, level: LogLevel) { + let data = (data.into(), level); + { + let mut calls = self.telemetry_calls.borrow_mut(); + (*calls).push(data); + } + } +} From 62f3efe9f8a89d97f83b7146eafb3c266937fd4f Mon Sep 17 00:00:00 2001 From: refcell Date: Wed, 3 Apr 2024 13:30:14 -0400 Subject: [PATCH 2/2] fix(derive): formatting --- crates/derive/src/stages/channel_bank.rs | 6 ++---- crates/derive/src/stages/channel_reader.rs | 6 ++---- crates/derive/src/stages/frame_queue.rs | 3 ++- crates/derive/src/stages/l1_retrieval.rs | 9 ++++++--- crates/derive/src/traits/test_utils/telemetry.rs | 3 +-- 5 files changed, 13 insertions(+), 14 deletions(-) diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index e157d5933..554d218f3 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -126,10 +126,8 @@ where pub fn read(&mut self) -> StageResult> { // Bail if there are no channels to read from. if self.channel_queue.is_empty() { - self.telemetry.write( - alloy_primitives::Bytes::from("No channels to read from"), - LogLevel::Debug, - ); + self.telemetry + .write(alloy_primitives::Bytes::from("No channels to read from"), LogLevel::Debug); return Err(StageError::Eof); } diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 8182de7d6..70fef4a09 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -40,10 +40,8 @@ where /// Pulls out the next Batch from the available channel. pub async fn next_batch(&mut self) -> StageResult { if let Err(e) = self.set_batch_reader().await { - self.telemetry.write( - alloc::format!("Failed to set batch reader: {:?}", e), - LogLevel::Error, - ); + self.telemetry + .write(alloc::format!("Failed to set batch reader: {:?}", e), LogLevel::Error); self.next_channel(); return Err(e); } diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index 56347f604..6d4179cf0 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -92,7 +92,8 @@ where pub(crate) mod tests { use super::*; use crate::{ - stages::l1_traversal::tests::new_test_traversal, traits::test_utils::{TestDAP, TestTelemetry}, + stages::l1_traversal::tests::new_test_traversal, + traits::test_utils::{TestDAP, TestTelemetry}, DERIVATION_VERSION_0, }; use alloc::{vec, vec::Vec}; diff --git a/crates/derive/src/stages/l1_retrieval.rs b/crates/derive/src/stages/l1_retrieval.rs index 26fbb1007..76800db0e 100644 --- a/crates/derive/src/stages/l1_retrieval.rs +++ b/crates/derive/src/stages/l1_retrieval.rs @@ -37,7 +37,8 @@ where CP: ChainProvider, T: TelemetryProvider, { - /// Creates a new L1 retrieval stage with the given data availability provider and previous stage. + /// Creates a new L1 retrieval stage with the given data availability provider and previous + /// stage. pub fn new(prev: L1Traversal, provider: DAP, telemetry: T) -> Self { Self { prev, telemetry, provider, data: None } } @@ -142,7 +143,8 @@ mod tests { let traversal = new_test_traversal(false, false); let telemetry = TestTelemetry::new(); let dap = TestDAP { results: vec![] }; - let mut retrieval = L1Retrieval { prev: traversal, telemetry, provider: dap, data: Some(data) }; + let mut retrieval = + L1Retrieval { prev: traversal, telemetry, provider: dap, data: Some(data) }; let data = retrieval.next_data().await.unwrap(); assert_eq!(data, Bytes::default()); assert!(retrieval.data.is_some()); @@ -159,7 +161,8 @@ mod tests { let telemetry = TestTelemetry::new(); let traversal = new_test_traversal(true, true); let dap = TestDAP { results: vec![] }; - let mut retrieval = L1Retrieval { prev: traversal, telemetry, provider: dap, data: Some(data) }; + let mut retrieval = + L1Retrieval { prev: traversal, telemetry, provider: dap, data: Some(data) }; let data = retrieval.next_data().await.unwrap_err(); assert_eq!(data, StageError::Eof); assert!(retrieval.data.is_none()); diff --git a/crates/derive/src/traits/test_utils/telemetry.rs b/crates/derive/src/traits/test_utils/telemetry.rs index 3571cb1fd..bb1448cfd 100644 --- a/crates/derive/src/traits/test_utils/telemetry.rs +++ b/crates/derive/src/traits/test_utils/telemetry.rs @@ -1,8 +1,7 @@ //! Test Utilities for Telemetry use crate::traits::{LogLevel, TelemetryProvider}; -use alloc::rc::Rc; -use alloc::vec::Vec; +use alloc::{rc::Rc, vec::Vec}; use alloy_primitives::Bytes; use core::cell::RefCell;