diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index b1ba033c5..d7fc4498b 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -2,7 +2,10 @@ use crate::{ stages::channel_reader::ChannelReader, - traits::{ChainProvider, DataAvailabilityProvider, ResettableStage, SafeBlockFetcher}, + traits::{ + ChainProvider, DataAvailabilityProvider, ResettableStage, SafeBlockFetcher, + TelemetryProvider, + }, types::{ Batch, BatchValidity, BatchWithInclusionBlock, BlockInfo, L2BlockInfo, RollupConfig, SingleBatch, StageError, StageResult, SystemConfig, @@ -28,16 +31,17 @@ use core::fmt::Debug; /// It is internally responsible for making sure that batches with L1 inclusions block outside it's /// working range are not considered or pruned. #[derive(Debug)] -pub struct BatchQueue +pub struct BatchQueue where DAP: DataAvailabilityProvider + Debug, CP: ChainProvider + Debug, BF: SafeBlockFetcher + Debug, + T: TelemetryProvider + Debug, { /// The rollup config. cfg: RollupConfig, /// The previous stage of the derivation pipeline. - prev: ChannelReader, + prev: ChannelReader, /// The l1 block ref origin: Option, @@ -59,14 +63,15 @@ where fetcher: BF, } -impl BatchQueue +impl BatchQueue where DAP: DataAvailabilityProvider + Debug, CP: ChainProvider + Debug, BF: SafeBlockFetcher + Debug, + T: TelemetryProvider + Debug, { /// Creates a new [BatchQueue] stage. - pub fn new(cfg: RollupConfig, prev: ChannelReader, fetcher: BF) -> Self { + pub fn new(cfg: RollupConfig, prev: ChannelReader, fetcher: BF) -> Self { Self { cfg, prev, @@ -344,11 +349,12 @@ where } #[async_trait] -impl ResettableStage for BatchQueue +impl ResettableStage for BatchQueue where DAP: DataAvailabilityProvider + Send + Debug, CP: ChainProvider + Send + Debug, BF: SafeBlockFetcher + Send + Debug, + T: TelemetryProvider + Send + Debug, { async fn reset(&mut self, base: BlockInfo, _: SystemConfig) -> StageResult<()> { // Copy over the Origin from the next stage. diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index b9e32692c..fd9518328 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -3,7 +3,9 @@ use super::frame_queue::FrameQueue; use crate::{ params::{ChannelID, MAX_CHANNEL_BANK_SIZE}, - traits::{ChainProvider, DataAvailabilityProvider, ResettableStage}, + traits::{ + ChainProvider, DataAvailabilityProvider, LogLevel, ResettableStage, TelemetryProvider, + }, types::{BlockInfo, Channel, Frame, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; @@ -25,29 +27,33 @@ use hashbrown::HashMap; /// to `IngestData`. This means that we can do an ingest and then do a read while becoming too /// large. [ChannelBank] buffers channel frames, and emits full channel data #[derive(Debug)] -pub struct ChannelBank +pub struct ChannelBank where DAP: DataAvailabilityProvider + Debug, CP: ChainProvider + Debug, + T: TelemetryProvider + Debug, { /// The rollup configuration. cfg: Arc, + /// Telemetry + telemetry: T, /// Map of channels by ID. channels: HashMap, /// Channels in FIFO order. channel_queue: VecDeque, /// The previous stage of the derivation pipeline. - prev: FrameQueue, + prev: FrameQueue, } -impl ChannelBank +impl ChannelBank where DAP: DataAvailabilityProvider + Debug, CP: ChainProvider + Debug, + T: TelemetryProvider + Debug, { /// Create a new [ChannelBank] stage. - pub fn new(cfg: Arc, prev: FrameQueue) -> Self { - Self { cfg, channels: HashMap::new(), channel_queue: VecDeque::new(), prev } + pub fn new(cfg: Arc, prev: FrameQueue, telemetry: T) -> Self { + Self { cfg, telemetry, channels: HashMap::new(), channel_queue: VecDeque::new(), prev } } /// Returns the L1 origin [BlockInfo]. @@ -85,11 +91,23 @@ where // Check if the channel is not timed out. If it has, ignore the frame. if current_channel.open_block_number() + self.cfg.channel_timeout < origin.number { + self.telemetry.write( + alloy_primitives::Bytes::from(alloc::format!("Channel {:?} timed out", frame.id)), + LogLevel::Warning, + ); return Ok(()); } // Ingest the frame. If it fails, ignore the frame. + let frame_id = frame.id; if current_channel.add_frame(frame, origin).is_err() { + self.telemetry.write( + alloy_primitives::Bytes::from(alloc::format!( + "Failed to add frame to channel: {:?}", + frame_id + )), + LogLevel::Warning, + ); return Ok(()); } @@ -102,6 +120,8 @@ where pub fn read(&mut self) -> StageResult> { // Bail if there are no channels to read from. if self.channel_queue.is_empty() { + self.telemetry + .write(alloy_primitives::Bytes::from("No channels to read from"), LogLevel::Debug); return Err(StageError::Eof); } @@ -113,6 +133,10 @@ where // Remove all timed out channels from the front of the `channel_queue`. if channel.open_block_number() + self.cfg.channel_timeout < origin.number { + self.telemetry.write( + alloy_primitives::Bytes::from(alloc::format!("Channel {:?} timed out", first)), + LogLevel::Warning, + ); self.channels.remove(&first); self.channel_queue.pop_front(); return Ok(None); @@ -179,10 +203,11 @@ where } #[async_trait] -impl ResettableStage for ChannelBank +impl ResettableStage for ChannelBank where DAP: DataAvailabilityProvider + Send + Debug, CP: ChainProvider + Send + Debug, + T: TelemetryProvider + Send + Debug, { async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> { self.channels.clear(); @@ -198,7 +223,7 @@ mod tests { stages::{ frame_queue::tests::new_test_frames, l1_retrieval::L1Retrieval, l1_traversal::tests::*, }, - traits::test_utils::TestDAP, + traits::test_utils::{TestDAP, TestTelemetry}, }; use alloc::vec; @@ -207,9 +232,10 @@ mod tests { let mut traversal = new_test_traversal(vec![], vec![]); traversal.block = None; let dap = TestDAP::default(); - let retrieval = L1Retrieval::new(traversal, dap); - let frame_queue = FrameQueue::new(retrieval); - let mut channel_bank = ChannelBank::new(Arc::new(RollupConfig::default()), frame_queue); + let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); + let frame_queue = FrameQueue::new(retrieval, TestTelemetry::new()); + let mut channel_bank = + ChannelBank::new(Arc::new(RollupConfig::default()), frame_queue, TestTelemetry::new()); let frame = Frame::default(); let err = channel_bank.ingest_frame(frame).unwrap_err(); assert_eq!(err, StageError::Custom(anyhow!("No origin"))); @@ -220,9 +246,10 @@ mod tests { let traversal = new_populated_test_traversal(); let results = vec![Ok(Bytes::from(vec![0x00]))]; let dap = TestDAP { results }; - let retrieval = L1Retrieval::new(traversal, dap); - let frame_queue = FrameQueue::new(retrieval); - let mut channel_bank = ChannelBank::new(Arc::new(RollupConfig::default()), frame_queue); + let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); + let frame_queue = FrameQueue::new(retrieval, TestTelemetry::new()); + let mut channel_bank = + ChannelBank::new(Arc::new(RollupConfig::default()), frame_queue, TestTelemetry::new()); let mut frames = new_test_frames(100000); // Ingest frames until the channel bank is full and it stops increasing in size let mut current_size = 0; @@ -248,9 +275,10 @@ mod tests { let traversal = new_populated_test_traversal(); let results = vec![Ok(Bytes::from(vec![0x00]))]; let dap = TestDAP { results }; - let retrieval = L1Retrieval::new(traversal, dap); - let frame_queue = FrameQueue::new(retrieval); - let mut channel_bank = ChannelBank::new(Arc::new(RollupConfig::default()), frame_queue); + let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); + let frame_queue = FrameQueue::new(retrieval, TestTelemetry::new()); + let mut channel_bank = + ChannelBank::new(Arc::new(RollupConfig::default()), frame_queue, TestTelemetry::new()); let err = channel_bank.read().unwrap_err(); assert_eq!(err, StageError::Eof); let err = channel_bank.next_data().await.unwrap_err(); diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index e89d28122..5708f97e5 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -2,7 +2,7 @@ use super::channel_bank::ChannelBank; use crate::{ - traits::{ChainProvider, DataAvailabilityProvider}, + traits::{ChainProvider, DataAvailabilityProvider, LogLevel, TelemetryProvider}, types::{Batch, BlockInfo, StageError, StageResult}, }; @@ -13,30 +13,36 @@ use miniz_oxide::inflate::decompress_to_vec_zlib; /// [ChannelReader] is a stateful stage that does the following: #[derive(Debug)] -pub struct ChannelReader +pub struct ChannelReader where DAP: DataAvailabilityProvider + Debug, CP: ChainProvider + Debug, + T: TelemetryProvider + Debug, { /// The previous stage of the derivation pipeline. - prev: ChannelBank, + prev: ChannelBank, + /// Telemetry + telemetry: T, /// The batch reader. next_batch: Option, } -impl ChannelReader +impl ChannelReader where DAP: DataAvailabilityProvider + Debug, CP: ChainProvider + Debug, + T: TelemetryProvider + Debug, { /// Create a new [ChannelReader] stage. - pub fn new(prev: ChannelBank) -> Self { - Self { prev, next_batch: None } + pub fn new(prev: ChannelBank, telemetry: T) -> Self { + Self { prev, telemetry, next_batch: None } } /// Pulls out the next Batch from the available channel. pub async fn next_batch(&mut self) -> StageResult { if let Err(e) = self.set_batch_reader().await { + self.telemetry + .write(alloc::format!("Failed to set batch reader: {:?}", e), LogLevel::Error); self.next_channel(); return Err(e); } diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index d2e58cfae..73d1b9fd0 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -4,7 +4,9 @@ use core::fmt::Debug; use super::l1_retrieval::L1Retrieval; use crate::{ - traits::{ChainProvider, DataAvailabilityProvider, ResettableStage}, + traits::{ + ChainProvider, DataAvailabilityProvider, LogLevel, ResettableStage, TelemetryProvider, + }, types::{BlockInfo, Frame, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, collections::VecDeque}; @@ -14,25 +16,29 @@ use async_trait::async_trait; /// The [FrameQueue] stage of the derivation pipeline. /// This stage takes the output of the [L1Retrieval] stage and parses it into frames. #[derive(Debug)] -pub struct FrameQueue +pub struct FrameQueue where DAP: DataAvailabilityProvider + Debug, CP: ChainProvider + Debug, + T: TelemetryProvider + Debug, { /// The previous stage in the pipeline. - pub prev: L1Retrieval, + pub prev: L1Retrieval, + /// Telemetry + pub telemetry: T, /// The current frame queue. queue: VecDeque, } -impl FrameQueue +impl FrameQueue where DAP: DataAvailabilityProvider + Debug, CP: ChainProvider + Debug, + T: TelemetryProvider + Debug, { /// Create a new [FrameQueue] stage with the given previous [L1Retrieval] stage. - pub fn new(prev: L1Retrieval) -> Self { - Self { prev, queue: VecDeque::new() } + pub fn new(prev: L1Retrieval, telemetry: T) -> Self { + Self { prev, telemetry, queue: VecDeque::new() } } /// Returns the L1 [BlockInfo] origin. @@ -62,6 +68,12 @@ where } // If we did not add more frames but still have more data, retry this function. if self.queue.is_empty() { + self.telemetry.write( + alloy_primitives::Bytes::from( + "Queue is empty after fetching data. Retrying next_frame.", + ), + LogLevel::Debug, + ); return Err(StageError::NotEnoughData); } @@ -70,10 +82,11 @@ where } #[async_trait] -impl ResettableStage for FrameQueue +impl ResettableStage for FrameQueue where DAP: DataAvailabilityProvider + Send + Debug, CP: ChainProvider + Send + Debug, + T: TelemetryProvider + Send + Debug, { async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> { self.queue = VecDeque::default(); @@ -85,7 +98,8 @@ where pub(crate) mod tests { use super::*; use crate::{ - stages::l1_traversal::tests::new_populated_test_traversal, traits::test_utils::TestDAP, + stages::l1_traversal::tests::new_populated_test_traversal, + traits::test_utils::{TestDAP, TestTelemetry}, DERIVATION_VERSION_0, }; use alloc::{vec, vec::Vec}; @@ -114,44 +128,48 @@ pub(crate) mod tests { #[tokio::test] async fn test_frame_queue_empty_bytes() { + let telemetry = TestTelemetry::new(); let traversal = new_populated_test_traversal(); let results = vec![Ok(Bytes::from(vec![0x00]))]; let dap = TestDAP { results }; - let retrieval = L1Retrieval::new(traversal, dap); - let mut frame_queue = FrameQueue::new(retrieval); + let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); + let mut frame_queue = FrameQueue::new(retrieval, telemetry); let err = frame_queue.next_frame().await.unwrap_err(); assert_eq!(err, StageError::NotEnoughData); } #[tokio::test] async fn test_frame_queue_no_frames_decoded() { + let telemetry = TestTelemetry::new(); let traversal = new_populated_test_traversal(); let results = vec![Err(StageError::Eof), Ok(Bytes::default())]; let dap = TestDAP { results }; - let retrieval = L1Retrieval::new(traversal, dap); - let mut frame_queue = FrameQueue::new(retrieval); + let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); + let mut frame_queue = FrameQueue::new(retrieval, telemetry); let err = frame_queue.next_frame().await.unwrap_err(); assert_eq!(err, StageError::NotEnoughData); } #[tokio::test] async fn test_frame_queue_wrong_derivation_version() { + let telemetry = TestTelemetry::new(); let traversal = new_populated_test_traversal(); let results = vec![Ok(Bytes::from(vec![0x01]))]; let dap = TestDAP { results }; - let retrieval = L1Retrieval::new(traversal, dap); - let mut frame_queue = FrameQueue::new(retrieval); + let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); + let mut frame_queue = FrameQueue::new(retrieval, telemetry); let err = frame_queue.next_frame().await.unwrap_err(); assert_eq!(err, StageError::NotEnoughData); } #[tokio::test] async fn test_frame_queue_frame_too_short() { + let telemetry = TestTelemetry::new(); let traversal = new_populated_test_traversal(); let results = vec![Ok(Bytes::from(vec![0x00, 0x01]))]; let dap = TestDAP { results }; - let retrieval = L1Retrieval::new(traversal, dap); - let mut frame_queue = FrameQueue::new(retrieval); + let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); + let mut frame_queue = FrameQueue::new(retrieval, telemetry); let err = frame_queue.next_frame().await.unwrap_err(); assert_eq!(err, StageError::NotEnoughData); } @@ -159,10 +177,11 @@ pub(crate) mod tests { #[tokio::test] async fn test_frame_queue_single_frame() { let data = new_encoded_test_frames(1); + let telemetry = TestTelemetry::new(); let traversal = new_populated_test_traversal(); let dap = TestDAP { results: vec![Ok(data)] }; - let retrieval = L1Retrieval::new(traversal, dap); - let mut frame_queue = FrameQueue::new(retrieval); + let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); + let mut frame_queue = FrameQueue::new(retrieval, telemetry); let frame_decoded = frame_queue.next_frame().await.unwrap(); let frame = new_test_frames(1); assert_eq!(frame[0], frame_decoded); @@ -172,11 +191,12 @@ pub(crate) mod tests { #[tokio::test] async fn test_frame_queue_multiple_frames() { + let telemetry = TestTelemetry::new(); let data = new_encoded_test_frames(3); let traversal = new_populated_test_traversal(); let dap = TestDAP { results: vec![Ok(data)] }; - let retrieval = L1Retrieval::new(traversal, dap); - let mut frame_queue = FrameQueue::new(retrieval); + let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); + let mut frame_queue = FrameQueue::new(retrieval, telemetry); for i in 0..3 { let frame_decoded = frame_queue.next_frame().await.unwrap(); assert_eq!(frame_decoded.number, i); diff --git a/crates/derive/src/stages/l1_retrieval.rs b/crates/derive/src/stages/l1_retrieval.rs index c54fc10b9..39f09bb57 100644 --- a/crates/derive/src/stages/l1_retrieval.rs +++ b/crates/derive/src/stages/l1_retrieval.rs @@ -2,7 +2,10 @@ use super::L1Traversal; use crate::{ - traits::{ChainProvider, DataAvailabilityProvider, DataIter, ResettableStage}, + traits::{ + ChainProvider, DataAvailabilityProvider, DataIter, LogLevel, ResettableStage, + TelemetryProvider, + }, types::{BlockInfo, StageError, StageResult, SystemConfig}, }; use alloc::boxed::Box; @@ -16,28 +19,32 @@ use async_trait::async_trait; /// [DataAvailabilityProvider]. This data is returned as a generic /// [DataIter] that can be iterated over. #[derive(Debug)] -pub struct L1Retrieval +pub struct L1Retrieval where DAP: DataAvailabilityProvider, CP: ChainProvider, + T: TelemetryProvider, { /// The previous stage in the pipeline. - pub prev: L1Traversal, + pub prev: L1Traversal, + /// Telemetry provider for the L1 retrieval stage. + pub telemetry: T, /// The data availability provider to use for the L1 retrieval stage. pub provider: DAP, /// The current data iterator. pub(crate) data: Option, } -impl L1Retrieval +impl L1Retrieval where DAP: DataAvailabilityProvider, CP: ChainProvider, + T: TelemetryProvider, { /// Creates a new [L1Retrieval] stage with the previous [L1Traversal] /// stage and given [DataAvailabilityProvider]. - pub fn new(prev: L1Traversal, provider: DAP) -> Self { - Self { prev, provider, data: None } + pub fn new(prev: L1Traversal, provider: DAP, telemetry: T) -> Self { + Self { prev, telemetry, provider, data: None } } /// Returns the current L1 [BlockInfo] origin from the previous @@ -50,6 +57,10 @@ where /// Returns an error if there is no data. pub async fn next_data(&mut self) -> StageResult { if self.data.is_none() { + self.telemetry.write( + alloc::format!("Retrieving data for block: {:?}", self.prev.block), + LogLevel::Debug, + ); let next = self .prev .next_l1_block()? @@ -71,10 +82,11 @@ where } #[async_trait] -impl ResettableStage for L1Retrieval +impl ResettableStage for L1Retrieval where DAP: DataAvailabilityProvider + Send, CP: ChainProvider + Send, + T: TelemetryProvider + Send, { async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> StageResult<()> { self.data = Some(self.provider.open_data(&base, cfg.batcher_addr).await?); @@ -87,7 +99,7 @@ mod tests { use super::*; use crate::{ stages::l1_traversal::tests::*, - traits::test_utils::{TestDAP, TestIter}, + traits::test_utils::{TestDAP, TestIter, TestTelemetry}, }; use alloc::vec; use alloy_primitives::Address; @@ -96,7 +108,8 @@ mod tests { async fn test_l1_retrieval_origin() { let traversal = new_populated_test_traversal(); let dap = TestDAP { results: vec![] }; - let retrieval = L1Retrieval::new(traversal, dap); + let telemetry = TestTelemetry::new(); + let retrieval = L1Retrieval::new(traversal, dap, telemetry); let expected = BlockInfo::default(); assert_eq!(retrieval.origin(), Some(&expected)); } @@ -106,7 +119,8 @@ mod tests { let traversal = new_populated_test_traversal(); let results = vec![Err(StageError::Eof), Ok(Bytes::default())]; let dap = TestDAP { results }; - let mut retrieval = L1Retrieval::new(traversal, dap); + let telemetry = TestTelemetry::new(); + let mut retrieval = L1Retrieval::new(traversal, dap, telemetry); assert_eq!(retrieval.data, None); let data = retrieval.next_data().await.unwrap(); assert_eq!(data, Bytes::default()); @@ -130,9 +144,11 @@ mod tests { // Create a new traversal with no blocks or receipts. // This would bubble up an error if the prev stage // (traversal) is called in the retrieval stage. + let telemetry = TestTelemetry::new(); let traversal = new_test_traversal(vec![], vec![]); let dap = TestDAP { results: vec![] }; - let mut retrieval = L1Retrieval { prev: traversal, provider: dap, data: Some(data) }; + let mut retrieval = + L1Retrieval { prev: traversal, telemetry, provider: dap, data: Some(data) }; let data = retrieval.next_data().await.unwrap(); assert_eq!(data, Bytes::default()); assert!(retrieval.data.is_some()); @@ -146,9 +162,11 @@ mod tests { open_data_calls: vec![(BlockInfo::default(), Address::default())], results: vec![Err(StageError::Eof)], }; + let telemetry = TestTelemetry::new(); let traversal = new_populated_test_traversal(); let dap = TestDAP { results: vec![] }; - let mut retrieval = L1Retrieval { prev: traversal, provider: dap, data: Some(data) }; + let mut retrieval = + L1Retrieval { prev: traversal, telemetry, provider: dap, data: Some(data) }; let data = retrieval.next_data().await.unwrap_err(); assert_eq!(data, StageError::Eof); assert!(retrieval.data.is_none()); diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs index 2dfe1f354..c4efca9df 100644 --- a/crates/derive/src/stages/l1_traversal.rs +++ b/crates/derive/src/stages/l1_traversal.rs @@ -1,7 +1,7 @@ //! Contains the [L1Traversal] stage of the derivation pipeline. use crate::{ - traits::{ChainProvider, ResettableStage}, + traits::{ChainProvider, LogLevel, ResettableStage, TelemetryProvider}, types::{BlockInfo, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, sync::Arc}; @@ -15,11 +15,13 @@ use async_trait::async_trait; /// it fetches the next L1 [BlockInfo] from the data source and updates the [SystemConfig] /// with the receipts from the block. #[derive(Debug, Clone)] -pub struct L1Traversal { +pub struct L1Traversal { /// The current block in the traversal stage. pub(crate) block: Option, /// The data source for the traversal stage. data_source: Provider, + /// The telemetry provider for the traversal stage. + telemetry: Telemetry, /// Signals whether or not the traversal stage is complete. done: bool, /// The system config. @@ -28,12 +30,13 @@ pub struct L1Traversal { pub rollup_config: Arc, } -impl L1Traversal { +impl L1Traversal { /// Creates a new [L1Traversal] instance. - pub fn new(data_source: F, cfg: Arc) -> Self { + pub fn new(data_source: F, cfg: Arc, telemetry: T) -> Self { Self { block: Some(BlockInfo::default()), data_source, + telemetry, done: false, system_config: SystemConfig::default(), rollup_config: cfg, @@ -69,7 +72,16 @@ impl L1Traversal { pub async fn advance_l1_block(&mut self) -> StageResult<()> { // Pull the next block or return EOF. // StageError::EOF has special handling further up the pipeline. - let block = self.block.ok_or(StageError::Eof)?; + let block = match self.block { + Some(block) => block, + None => { + self.telemetry.write( + alloy_primitives::Bytes::from("L1Traversal: No block to advance to"), + LogLevel::Warning, + ); + return Err(StageError::Eof); + } + }; let next_l1_origin = match self.data_source.block_info_by_number(block.number + 1).await { Ok(block) => block, Err(e) => return Err(StageError::BlockInfoFetch(e)), @@ -101,7 +113,7 @@ impl L1Traversal { } #[async_trait] -impl ResettableStage for L1Traversal { +impl ResettableStage for L1Traversal { async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> StageResult<()> { self.block = Some(base); self.done = false; @@ -114,7 +126,7 @@ impl ResettableStage for L1Traversal { pub(crate) mod tests { use super::*; use crate::{ - traits::test_utils::TestChainProvider, + traits::test_utils::{TestChainProvider, TestTelemetry}, types::{Receipt, CONFIG_UPDATE_EVENT_VERSION_0, CONFIG_UPDATE_TOPIC}, }; use alloc::vec; @@ -153,8 +165,9 @@ pub(crate) mod tests { pub(crate) fn new_test_traversal( blocks: alloc::vec::Vec, receipts: alloc::vec::Vec, - ) -> L1Traversal { + ) -> L1Traversal { let mut provider = TestChainProvider::default(); + let telemetry = TestTelemetry::default(); let rollup_config = RollupConfig { l1_system_config_address: L1_SYS_CONFIG_ADDR, ..RollupConfig::default() @@ -166,10 +179,10 @@ pub(crate) mod tests { let hash = blocks.get(i).map(|b| b.hash).unwrap_or_default(); provider.insert_receipts(hash, vec![receipt.clone()]); } - L1Traversal::new(provider, Arc::new(rollup_config)) + L1Traversal::new(provider, Arc::new(rollup_config), telemetry) } - pub(crate) fn new_populated_test_traversal() -> L1Traversal { + pub(crate) fn new_populated_test_traversal() -> L1Traversal { let blocks = vec![BlockInfo::default(), BlockInfo::default()]; let receipts = new_receipts(); new_test_traversal(blocks, receipts) diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index 5e9281617..b2db574c5 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -7,5 +7,8 @@ pub use data_sources::{ChainProvider, DataAvailabilityProvider, DataIter, SafeBl mod stages; pub use stages::ResettableStage; +mod telemetry; +pub use telemetry::{LogLevel, TelemetryProvider}; + #[cfg(test)] pub mod test_utils; diff --git a/crates/derive/src/traits/telemetry.rs b/crates/derive/src/traits/telemetry.rs new file mode 100644 index 000000000..49d0d57cf --- /dev/null +++ b/crates/derive/src/traits/telemetry.rs @@ -0,0 +1,23 @@ +//! Traits for telemetry. + +use alloy_primitives::Bytes; + +/// Logging Levels. +#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)] +pub enum LogLevel { + /// Debug level. + Debug, + /// Info level. + #[default] + Info, + /// Warning level. + Warning, + /// Error level. + Error, +} + +/// A trait for telemetry providers. +pub trait TelemetryProvider { + /// Write the telemetry data with LOG_LEVEL. + fn write>(&self, data: I, level: LogLevel); +} diff --git a/crates/derive/src/traits/test_utils.rs b/crates/derive/src/traits/test_utils.rs index 94eaa6cbc..d5ad6387f 100644 --- a/crates/derive/src/traits/test_utils.rs +++ b/crates/derive/src/traits/test_utils.rs @@ -5,3 +5,6 @@ pub use data_sources::TestChainProvider; pub mod data_availability; pub use data_availability::{TestDAP, TestIter}; + +mod telemetry; +pub use telemetry::TestTelemetry; diff --git a/crates/derive/src/traits/test_utils/telemetry.rs b/crates/derive/src/traits/test_utils/telemetry.rs new file mode 100644 index 000000000..bb1448cfd --- /dev/null +++ b/crates/derive/src/traits/test_utils/telemetry.rs @@ -0,0 +1,30 @@ +//! Test Utilities for Telemetry + +use crate::traits::{LogLevel, TelemetryProvider}; +use alloc::{rc::Rc, vec::Vec}; +use alloy_primitives::Bytes; +use core::cell::RefCell; + +/// Mock telemetry provider +#[derive(Debug, Default)] +pub struct TestTelemetry { + /// Holds telemetry data with log levels for assertions. + pub(crate) telemetry_calls: Rc>>, +} + +impl TestTelemetry { + /// Creates a new [TestTelemetry] instance. + pub fn new() -> Self { + Self::default() + } +} + +impl TelemetryProvider for TestTelemetry { + fn write>(&self, data: I, level: LogLevel) { + let data = (data.into(), level); + { + let mut calls = self.telemetry_calls.borrow_mut(); + (*calls).push(data); + } + } +}