diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index 7d3224373..9db254334 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -180,7 +180,9 @@ mod tests { RollupConfig, SingleBatch, StageError, StageResult, }; use crate::{ - stages::test_utils::{new_mock_batch_queue, MockAttributesBuilder, MockBatchQueue}, + stages::test_utils::{ + new_attributes_provider, MockAttributesBuilder, MockAttributesProvider, + }, traits::test_utils::TestTelemetry, types::RawTransaction, }; @@ -191,10 +193,10 @@ mod tests { cfg: Option, origin: Option, batches: Vec>, - ) -> AttributesQueue { + ) -> AttributesQueue { let cfg = cfg.unwrap_or_default(); let telemetry = TestTelemetry::new(); - let mock_batch_queue = new_mock_batch_queue(origin, batches); + let mock_batch_queue = new_attributes_provider(origin, batches); let mock_attributes_builder = MockAttributesBuilder::default(); AttributesQueue::new(cfg, mock_batch_queue, telemetry, mock_attributes_builder) } @@ -282,11 +284,11 @@ mod tests { async fn test_create_next_attributes_success() { let cfg = RollupConfig::default(); let telemetry = TestTelemetry::new(); - let mock_batch_queue = new_mock_batch_queue(None, vec![]); + let mock = new_attributes_provider(None, vec![]); let mut payload_attributes = PayloadAttributes::default(); let mock_builder = MockAttributesBuilder { attributes: vec![Ok(payload_attributes.clone())] }; - let mut aq = AttributesQueue::new(cfg, mock_batch_queue, telemetry, mock_builder); + let mut aq = AttributesQueue::new(cfg, mock, telemetry, mock_builder); let parent = L2BlockInfo::default(); let txs = vec![RawTransaction::default(), RawTransaction::default()]; let batch = SingleBatch { transactions: txs.clone(), ..Default::default() }; @@ -309,10 +311,10 @@ mod tests { async fn test_next_attributes_load_batch_last_in_span() { let cfg = RollupConfig::default(); let telemetry = TestTelemetry::new(); - let mock_batch_queue = new_mock_batch_queue(None, vec![Ok(Default::default())]); + let mock = new_attributes_provider(None, vec![Ok(Default::default())]); let mut pa = PayloadAttributes::default(); let mock_builder = MockAttributesBuilder { attributes: vec![Ok(pa.clone())] }; - let mut aq = AttributesQueue::new(cfg, mock_batch_queue, telemetry, mock_builder); + let mut aq = AttributesQueue::new(cfg, mock, telemetry, mock_builder); // If we load the batch, we should get the last in span. // But it won't take it so it will be available in the next_attributes call. let _ = aq.load_batch(L2BlockInfo::default()).await.unwrap(); diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index de7f68101..6e9809932 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -1,21 +1,29 @@ //! This module contains the `BatchQueue` stage implementation. use crate::{ - stages::channel_reader::ChannelReader, - traits::{ - ChainProvider, DataAvailabilityProvider, OriginProvider, ResettableStage, SafeBlockFetcher, - TelemetryProvider, - }, + stages::attributes_queue::AttributesProvider, + traits::{LogLevel, OriginProvider, ResettableStage, SafeBlockFetcher, TelemetryProvider}, types::{ Batch, BatchValidity, BatchWithInclusionBlock, BlockInfo, L2BlockInfo, RollupConfig, SingleBatch, StageError, StageResult, SystemConfig, }, }; use alloc::{boxed::Box, vec::Vec}; +use alloy_primitives::Bytes; use anyhow::anyhow; use async_trait::async_trait; use core::fmt::Debug; +/// Provides batches for the [BatchQueue] stage. +#[async_trait] +pub trait BatchQueueProvider { + /// Returns the next batch in the [ChannelReader] stage, if the stage is not complete. + /// This function can only be called once while the stage is in progress, and will return + /// [`None`] on subsequent calls unless the stage is reset or complete. If the stage is + /// complete and the batch has been consumed, an [StageError::Eof] error is returned. + async fn next_batch(&mut self) -> StageResult; +} + /// [BatchQueue] is responsible for o rdering unordered batches /// and gnerating empty batches when the sequence window has passed. /// @@ -31,20 +39,22 @@ use core::fmt::Debug; /// It is internally responsible for making sure that batches with L1 inclusions block outside it's /// working range are not considered or pruned. #[derive(Debug)] -pub struct BatchQueue +pub struct BatchQueue where - DAP: DataAvailabilityProvider + Debug, - CP: ChainProvider + Debug, + P: BatchQueueProvider + OriginProvider + Debug, BF: SafeBlockFetcher + Debug, T: TelemetryProvider + Debug, { /// The rollup config. cfg: RollupConfig, /// The previous stage of the derivation pipeline. - prev: ChannelReader, + prev: P, /// The l1 block ref origin: Option, + /// Telemetry + telemetry: T, + /// A consecutive, time-centric window of L1 Blocks. /// Every L1 origin of unsafe L2 Blocks must be included in this list. /// If every L2 Block corresponding to a single L1 Block becomes safe, @@ -63,19 +73,19 @@ where fetcher: BF, } -impl BatchQueue +impl BatchQueue where - DAP: DataAvailabilityProvider + Debug, - CP: ChainProvider + Debug, + P: BatchQueueProvider + OriginProvider + Debug, BF: SafeBlockFetcher + Debug, T: TelemetryProvider + Debug, { /// Creates a new [BatchQueue] stage. - pub fn new(cfg: RollupConfig, prev: ChannelReader, fetcher: BF) -> Self { + pub fn new(cfg: RollupConfig, prev: P, telemetry: T, fetcher: BF) -> Self { Self { cfg, prev, origin: None, + telemetry, l1_blocks: Vec::new(), batches: Vec::new(), next_spans: Vec::new(), @@ -83,11 +93,6 @@ where } } - /// Returns if the previous batch was the last in the span. - pub fn is_last_in_span(&self) -> bool { - self.next_spans.is_empty() - } - /// Pops the next batch from the current queued up span-batch cache. /// The parent is used to set the parent hash of the batch. /// The parent is verified when the batch is later validated. @@ -100,119 +105,6 @@ where Some(next) } - /// Returns the next valid batch upon the given safe head. - /// Also returns the boolean that indicates if the batch is the last block in the batch. - pub async fn next_batch(&mut self, parent: L2BlockInfo) -> StageResult { - if !self.next_spans.is_empty() { - // There are cached singular batches derived from the span batch. - // Check if the next cached batch matches the given parent block. - if self.next_spans[0].timestamp == parent.block_info.timestamp + self.cfg.block_time { - return self - .pop_next_batch(parent) - .ok_or(anyhow!("failed to pop next batch from span batch").into()); - } - // Parent block does not match the next batch. - // Means the previously returned batch is invalid. - // Drop cached batches and find another batch. - self.next_spans.clear(); - // TODO: log that the provided parent block does not match the next batch. - // TODO: metrice the internal batch drop. - } - - // If the epoch is advanced, update the l1 blocks. - // Advancing epoch must be done after the pipeline successfully applies the entire span - // batch to the chain. - // Because the span batch can be reverted during processing the batch, then we must - // preserve existing l1 blocks to verify the epochs of the next candidate batch. - if !self.l1_blocks.is_empty() && parent.l1_origin.number > self.l1_blocks[0].number { - for (i, block) in self.l1_blocks.iter().enumerate() { - if parent.l1_origin.number == block.number { - self.l1_blocks.drain(0..i); - // TODO: log that the pipelien has advanced the epoch. - // TODO: metrice the internal epoch advancement. - break; - } - } - // If the origin of the parent block is not included, we must advance the origin. - } - - // NOTE: The origin is used to determine if it's behind. - // It is the future origin that gets saved into the l1 blocks array. - // We always update the origin of this stage if it's not the same so - // after the update code runs, this is consistent. - let origin_behind = - self.origin.map_or(true, |origin| origin.number < parent.l1_origin.number); - - // Advance the origin if needed. - // The entire pipeline has the same origin. - // Batches prior to the l1 origin of the l2 safe head are not accepted. - if self.origin != self.prev.origin().copied() { - self.origin = self.prev.origin().cloned(); - if !origin_behind { - let origin = self.origin.as_ref().ok_or_else(|| anyhow!("missing origin"))?; - self.l1_blocks.push(*origin); - } else { - // This is to handle the special case of startup. - // At startup, the batch queue is reset and includes the - // l1 origin. That is the only time where immediately after - // reset is called, the origin behind is false. - self.l1_blocks.clear(); - } - // TODO: log batch queue origin advancement. - } - - // Load more data into the batch queue. - let mut out_of_data = false; - match self.prev.next_batch().await { - Ok(b) => { - if !origin_behind { - self.add_batch(b, parent).ok(); - } else { - // TODO: metrice when the batch is dropped because the origin is behind. - } - } - Err(StageError::Eof) => out_of_data = true, - Err(e) => return Err(e), - } - - // Skip adding the data unless up to date with the origin, - // but still fully empty the previous stages. - if origin_behind { - if out_of_data { - return Err(StageError::Eof); - } - return Err(StageError::NotEnoughData); - } - - // Attempt to derive more batches. - let batch = match self.derive_next_batch(out_of_data, parent) { - Ok(b) => b, - Err(e) => match e { - StageError::Eof => { - if out_of_data { - return Err(StageError::Eof); - } - return Err(StageError::NotEnoughData); - } - _ => return Err(e), - }, - }; - - // If the next batch is derived from the span batch, it's the last batch of the span. - // For singular batches, the span batch cache should be empty. - match batch { - Batch::Single(sb) => Ok(sb), - Batch::Span(sb) => { - let batches = sb.get_singular_batches(&self.l1_blocks, parent); - self.next_spans = batches; - let nb = self - .pop_next_batch(parent) - .ok_or_else(|| anyhow!("failed to pop next batch from span batch"))?; - Ok(nb) - } - } - } - /// Derives the next batch to apply on top of the current L2 safe head. /// Follows the validity rules imposed on consecutive batches. /// Based on currently available buffered batch and L1 origin information. @@ -220,9 +112,7 @@ where pub fn derive_next_batch(&mut self, empty: bool, parent: L2BlockInfo) -> StageResult { // Cannot derive a batch if no origin was prepared. if self.l1_blocks.is_empty() { - return Err(StageError::Custom(anyhow!( - "failed to derive batch: no origin was prepared" - ))); + return Err(StageError::MissingOrigin); } // Get the epoch @@ -348,10 +238,135 @@ where } } -impl OriginProvider for BatchQueue +#[async_trait] +impl AttributesProvider for BatchQueue where - DAP: DataAvailabilityProvider + Debug, - CP: ChainProvider + Debug, + P: BatchQueueProvider + OriginProvider + Send + Debug, + BF: SafeBlockFetcher + Send + Debug, + T: TelemetryProvider + Send + Debug, +{ + /// Returns the next valid batch upon the given safe head. + /// Also returns the boolean that indicates if the batch is the last block in the batch. + async fn next_batch(&mut self, parent: L2BlockInfo) -> StageResult { + if !self.next_spans.is_empty() { + // There are cached singular batches derived from the span batch. + // Check if the next cached batch matches the given parent block. + if self.next_spans[0].timestamp == parent.block_info.timestamp + self.cfg.block_time { + return self + .pop_next_batch(parent) + .ok_or(anyhow!("failed to pop next batch from span batch").into()); + } + // Parent block does not match the next batch. + // Means the previously returned batch is invalid. + // Drop cached batches and find another batch. + self.next_spans.clear(); + // TODO: log that the provided parent block does not match the next batch. + // TODO: metrice the internal batch drop. + } + + // If the epoch is advanced, update the l1 blocks. + // Advancing epoch must be done after the pipeline successfully applies the entire span + // batch to the chain. + // Because the span batch can be reverted during processing the batch, then we must + // preserve existing l1 blocks to verify the epochs of the next candidate batch. + if !self.l1_blocks.is_empty() && parent.l1_origin.number > self.l1_blocks[0].number { + for (i, block) in self.l1_blocks.iter().enumerate() { + if parent.l1_origin.number == block.number { + self.l1_blocks.drain(0..i); + self.telemetry + .write(Bytes::from("Advancing internal L1 blocks"), LogLevel::Info); + break; + } + } + // If the origin of the parent block is not included, we must advance the origin. + } + + // NOTE: The origin is used to determine if it's behind. + // It is the future origin that gets saved into the l1 blocks array. + // We always update the origin of this stage if it's not the same so + // after the update code runs, this is consistent. + let origin_behind = + self.origin.map_or(true, |origin| origin.number < parent.l1_origin.number); + + // Advance the origin if needed. + // The entire pipeline has the same origin. + // Batches prior to the l1 origin of the l2 safe head are not accepted. + if self.origin != self.prev.origin().copied() { + self.origin = self.prev.origin().cloned(); + if !origin_behind { + let origin = self.origin.as_ref().ok_or_else(|| anyhow!("missing origin"))?; + self.l1_blocks.push(*origin); + } else { + // This is to handle the special case of startup. + // At startup, the batch queue is reset and includes the + // l1 origin. That is the only time where immediately after + // reset is called, the origin behind is false. + self.l1_blocks.clear(); + } + // TODO: log batch queue origin advancement. + } + + // Load more data into the batch queue. + let mut out_of_data = false; + match self.prev.next_batch().await { + Ok(b) => { + if !origin_behind { + self.add_batch(b, parent).ok(); + } else { + // TODO: metrice when the batch is dropped because the origin is behind. + } + } + Err(StageError::Eof) => out_of_data = true, + Err(e) => return Err(e), + } + + // Skip adding the data unless up to date with the origin, + // but still fully empty the previous stages. + if origin_behind { + if out_of_data { + return Err(StageError::Eof); + } + return Err(StageError::NotEnoughData); + } + + // Attempt to derive more batches. + let batch = match self.derive_next_batch(out_of_data, parent) { + Ok(b) => b, + Err(e) => match e { + StageError::Eof => { + if out_of_data { + return Err(StageError::Eof); + } + return Err(StageError::NotEnoughData); + } + _ => return Err(e), + }, + }; + + // If the next batch is derived from the span batch, it's the last batch of the span. + // For singular batches, the span batch cache should be empty. + match batch { + Batch::Single(sb) => Ok(sb), + Batch::Span(sb) => { + let batches = sb.get_singular_batches(&self.l1_blocks, parent); + self.next_spans = batches; + let nb = self + .pop_next_batch(parent) + .ok_or_else(|| anyhow!("failed to pop next batch from span batch"))?; + Ok(nb) + } + } + } + + /// Returns if the previous batch was the last in the span. + fn is_last_in_span(&self) -> bool { + self.next_spans.is_empty() + } +} + +impl OriginProvider for BatchQueue +where + P: BatchQueueProvider + OriginProvider + Debug, BF: SafeBlockFetcher + Debug, T: TelemetryProvider + Debug, { @@ -361,10 +376,9 @@ where } #[async_trait] -impl ResettableStage for BatchQueue +impl ResettableStage for BatchQueue where - DAP: DataAvailabilityProvider + Send + Debug, - CP: ChainProvider + Send + Debug, + P: BatchQueueProvider + OriginProvider + Send + Debug, BF: SafeBlockFetcher + Send + Debug, T: TelemetryProvider + Send + Debug + Sync, { @@ -383,3 +397,80 @@ where Err(StageError::Eof) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + stages::{channel_reader::BatchReader, test_utils::MockBatchQueueProvider}, + traits::test_utils::{MockBlockFetcher, TestTelemetry}, + types::BatchType, + }; + use alloc::vec; + use miniz_oxide::deflate::compress_to_vec_zlib; + + fn new_batch_reader() -> BatchReader { + let raw_data = include_bytes!("../../testdata/raw_batch.hex"); + let mut typed_data = vec![BatchType::Span as u8]; + typed_data.extend_from_slice(raw_data.as_slice()); + let compressed = compress_to_vec_zlib(typed_data.as_slice(), 5); + BatchReader::from(compressed) + } + + #[test] + fn test_derive_next_batch_missing_origin() { + let telemetry = TestTelemetry::new(); + let data = vec![Ok(Batch::Single(SingleBatch::default()))]; + let cfg = RollupConfig::default(); + let mock = MockBatchQueueProvider::new(data); + let fetcher = MockBlockFetcher::default(); + let mut bq = BatchQueue::new(cfg, mock, telemetry, fetcher); + let parent = L2BlockInfo::default(); + let result = bq.derive_next_batch(false, parent).unwrap_err(); + assert_eq!(result, StageError::MissingOrigin); + } + + #[tokio::test] + async fn test_next_batch_not_enough_data() { + let mut reader = new_batch_reader(); + let batch = reader.next_batch().unwrap(); + let mock = MockBatchQueueProvider::new(vec![Ok(batch)]); + let telemetry = TestTelemetry::new(); + let fetcher = MockBlockFetcher::default(); + let mut bq = BatchQueue::new(RollupConfig::default(), mock, telemetry, fetcher); + let res = bq.next_batch(L2BlockInfo::default()).await.unwrap_err(); + assert_eq!(res, StageError::NotEnoughData); + assert!(bq.is_last_in_span()); + } + + // TODO(refcell): The batch reader here loops forever. + // Maybe the cursor isn't being used? + // #[tokio::test] + // async fn test_next_batch_succeeds() { + // let mut reader = new_batch_reader(); + // let mut batch_vec: Vec> = vec![]; + // while let Some(batch) = reader.next_batch() { + // batch_vec.push(Ok(batch)); + // } + // let mock = MockBatchQueueProvider::new(batch_vec); + // let telemetry = TestTelemetry::new(); + // let fetcher = MockBlockFetcher::default(); + // let mut bq = BatchQueue::new(RollupConfig::default(), mock, telemetry, fetcher); + // let res = bq.next_batch(L2BlockInfo::default()).await.unwrap(); + // assert_eq!(res, SingleBatch::default()); + // assert!(bq.is_last_in_span()); + // } + + #[tokio::test] + async fn test_batch_queue_empty_bytes() { + let telemetry = TestTelemetry::new(); + let data = vec![Ok(Batch::Single(SingleBatch::default()))]; + let cfg = RollupConfig::default(); + let mock = MockBatchQueueProvider::new(data); + let fetcher = MockBlockFetcher::default(); + let mut bq = BatchQueue::new(cfg, mock, telemetry, fetcher); + let parent = L2BlockInfo::default(); + let result = bq.next_batch(parent).await; + assert!(result.is_err()); + } +} diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index 700d549f9..a63a3eab1 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -1,12 +1,9 @@ //! This module contains the `ChannelBank` struct. -use super::frame_queue::FrameQueue; use crate::{ params::{ChannelID, MAX_CHANNEL_BANK_SIZE}, - traits::{ - ChainProvider, DataAvailabilityProvider, LogLevel, OriginProvider, ResettableStage, - TelemetryProvider, - }, + stages::ChannelReaderProvider, + traits::{LogLevel, OriginProvider, ResettableStage, TelemetryProvider}, types::{BlockInfo, Channel, Frame, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; @@ -16,6 +13,13 @@ use async_trait::async_trait; use core::fmt::Debug; use hashbrown::HashMap; +/// Provides frames for the [ChannelBank] stage. +#[async_trait] +pub trait ChannelBankProvider { + /// Retrieves the next [Frame] from the [FrameQueue] stage. + async fn next_frame(&mut self) -> StageResult; +} + /// [ChannelBank] is a stateful stage that does the following: /// 1. Unmarshalls frames from L1 transaction data /// 2. Applies those frames to a channel @@ -28,10 +32,9 @@ use hashbrown::HashMap; /// to `IngestData`. This means that we can do an ingest and then do a read while becoming too /// large. [ChannelBank] buffers channel frames, and emits full channel data #[derive(Debug)] -pub struct ChannelBank +pub struct ChannelBank where - DAP: DataAvailabilityProvider + Debug, - CP: ChainProvider + Debug, + P: ChannelBankProvider + OriginProvider + Debug, T: TelemetryProvider + Debug, { /// The rollup configuration. @@ -43,17 +46,16 @@ where /// Channels in FIFO order. channel_queue: VecDeque, /// The previous stage of the derivation pipeline. - prev: FrameQueue, + prev: P, } -impl ChannelBank +impl ChannelBank where - DAP: DataAvailabilityProvider + Debug, - CP: ChainProvider + Debug, + P: ChannelBankProvider + OriginProvider + Debug, T: TelemetryProvider + Debug, { /// Create a new [ChannelBank] stage. - pub fn new(cfg: Arc, prev: FrameQueue, telemetry: Arc) -> Self { + pub fn new(cfg: Arc, prev: P, telemetry: Arc) -> Self { Self { cfg, telemetry, channels: HashMap::new(), channel_queue: VecDeque::new(), prev } } @@ -156,27 +158,6 @@ where } } - /// Pulls the next piece of data from the channel bank. Note that it attempts to pull data out - /// of the channel bank prior to loading data in (unlike most other stages). This is to - /// ensure maintain consistency around channel bank pruning which depends upon the order - /// of operations. - pub async fn next_data(&mut self) -> StageResult> { - match self.read() { - Err(StageError::Eof) => { - // continue - we will attempt to load data into the channel bank - } - Err(e) => { - return Err(anyhow!("Error fetching next data from channel bank: {:?}", e).into()); - } - data => return data, - }; - - // Load the data into the channel bank - let frame = self.prev.next_frame().await?; - self.ingest_frame(frame)?; - Err(StageError::NotEnoughData) - } - /// Attempts to read the channel at the specified index. If the channel is not ready or timed /// out, it will return an error. /// If the channel read was successful, it will remove the channel from the channel queue. @@ -198,10 +179,33 @@ where } } -impl OriginProvider for ChannelBank +#[async_trait] +impl ChannelReaderProvider for ChannelBank +where + P: ChannelBankProvider + OriginProvider + Send + Debug, + T: TelemetryProvider + Send + Sync + Debug, +{ + async fn next_data(&mut self) -> StageResult> { + match self.read() { + Err(StageError::Eof) => { + // continue - we will attempt to load data into the channel bank + } + Err(e) => { + return Err(anyhow!("Error fetching next data from channel bank: {:?}", e).into()); + } + data => return data, + }; + + // Load the data into the channel bank + let frame = self.prev.next_frame().await?; + self.ingest_frame(frame)?; + Err(StageError::NotEnoughData) + } +} + +impl OriginProvider for ChannelBank where - DAP: DataAvailabilityProvider + Debug, - CP: ChainProvider + Debug, + P: ChannelBankProvider + OriginProvider + Debug, T: TelemetryProvider + Debug, { fn origin(&self) -> Option<&BlockInfo> { @@ -210,10 +214,9 @@ where } #[async_trait] -impl ResettableStage for ChannelBank +impl ResettableStage for ChannelBank where - DAP: DataAvailabilityProvider + Send + Debug, - CP: ChainProvider + Send + Debug, + P: ChannelBankProvider + OriginProvider + Send + Debug, T: TelemetryProvider + Send + Sync + Debug, { async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> { @@ -227,26 +230,18 @@ where mod tests { use super::*; use crate::{ - stages::{ - frame_queue::tests::new_test_frames, l1_retrieval::L1Retrieval, l1_traversal::tests::*, - }, - traits::test_utils::{TestDAP, TestTelemetry}, + stages::{frame_queue::tests::new_test_frames, test_utils::MockChannelBankProvider}, + traits::test_utils::TestTelemetry, }; use alloc::vec; #[test] fn test_ingest_empty_origin() { - let mut traversal = new_test_traversal(vec![], vec![]); - traversal.block = None; - let dap = TestDAP::default(); - let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); - let frame_queue = FrameQueue::new(retrieval, TestTelemetry::new()); + let mut mock = MockChannelBankProvider::new(vec![]); + mock.block_info = None; let telemetry = Arc::new(TestTelemetry::new()); - let mut channel_bank = ChannelBank::new( - Arc::new(RollupConfig::default()), - frame_queue, - Arc::clone(&telemetry), - ); + let cfg = Arc::new(RollupConfig::default()); + let mut channel_bank = ChannelBank::new(cfg, mock, Arc::clone(&telemetry)); let frame = Frame::default(); let err = channel_bank.ingest_frame(frame).unwrap_err(); assert_eq!(err, StageError::MissingOrigin); @@ -254,13 +249,10 @@ mod tests { #[test] fn test_ingest_invalid_frame() { - let traversal = new_test_traversal(vec![], vec![]); - let dap = TestDAP::default(); - let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); - let frame_queue = FrameQueue::new(retrieval, TestTelemetry::new()); + let mock = MockChannelBankProvider::new(vec![]); let telem = Arc::new(TestTelemetry::new()); let mut channel_bank = - ChannelBank::new(Arc::new(RollupConfig::default()), frame_queue, Arc::clone(&telem)); + ChannelBank::new(Arc::new(RollupConfig::default()), mock, Arc::clone(&telem)); let frame = Frame { id: [0xFF; 16], ..Default::default() }; assert_eq!(channel_bank.size(), 0); assert!(channel_bank.channels.is_empty()); @@ -277,18 +269,13 @@ mod tests { #[test] fn test_ingest_and_prune_channel_bank() { - let traversal = new_populated_test_traversal(); - let results = vec![Ok(Bytes::from(vec![0x00]))]; - let dap = TestDAP { results }; - let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); - let frame_queue = FrameQueue::new(retrieval, TestTelemetry::new()); + use alloc::vec::Vec; + let mut frames: Vec = new_test_frames(100000); + // let data = frames.iter().map(|f| Ok(f)).collect::>>(); + let mock = MockChannelBankProvider::new(vec![]); let telemetry = Arc::new(TestTelemetry::new()); - let mut channel_bank = ChannelBank::new( - Arc::new(RollupConfig::default()), - frame_queue, - Arc::clone(&telemetry), - ); - let mut frames = new_test_frames(100000); + let cfg = Arc::new(RollupConfig::default()); + let mut channel_bank = ChannelBank::new(cfg, mock, Arc::clone(&telemetry)); // Ingest frames until the channel bank is full and it stops increasing in size let mut current_size = 0; let next_frame = frames.pop().unwrap(); @@ -310,17 +297,11 @@ mod tests { #[tokio::test] async fn test_read_empty_channel_bank() { - let traversal = new_populated_test_traversal(); - let results = vec![Ok(Bytes::from(vec![0x00]))]; - let dap = TestDAP { results }; - let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); - let frame_queue = FrameQueue::new(retrieval, TestTelemetry::new()); + let frames = new_test_frames(1); + let mock = MockChannelBankProvider::new(vec![Ok(frames[0].clone())]); let telemetry = Arc::new(TestTelemetry::new()); - let mut channel_bank = ChannelBank::new( - Arc::new(RollupConfig::default()), - frame_queue, - Arc::clone(&telemetry), - ); + let cfg = Arc::new(RollupConfig::default()); + let mut channel_bank = ChannelBank::new(cfg, mock, Arc::clone(&telemetry)); let err = channel_bank.read().unwrap_err(); assert_eq!(err, StageError::Eof); let err = channel_bank.next_data().await.unwrap_err(); diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 0d63b9d46..724a23c3c 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -1,47 +1,75 @@ //! This module contains the `ChannelReader` struct. -use super::channel_bank::ChannelBank; use crate::{ - traits::{ - ChainProvider, DataAvailabilityProvider, LogLevel, OriginProvider, TelemetryProvider, - }, + stages::BatchQueueProvider, + traits::{LogLevel, OriginProvider, TelemetryProvider}, types::{Batch, BlockInfo, StageError, StageResult}, }; -use alloc::vec::Vec; -use anyhow::anyhow; +use alloc::{boxed::Box, sync::Arc, vec::Vec}; +use alloy_primitives::Bytes; +use async_trait::async_trait; use core::fmt::Debug; use miniz_oxide::inflate::decompress_to_vec_zlib; +/// The [ChannelReader] provider trait. +#[async_trait] +pub trait ChannelReaderProvider { + /// Pulls the next piece of data from the channel bank. Note that it attempts to pull data out + /// of the channel bank prior to loading data in (unlike most other stages). This is to + /// ensure maintain consistency around channel bank pruning which depends upon the order + /// of operations. + async fn next_data(&mut self) -> StageResult>; +} + /// [ChannelReader] is a stateful stage that does the following: #[derive(Debug)] -pub struct ChannelReader +pub struct ChannelReader where - DAP: DataAvailabilityProvider + Debug, - CP: ChainProvider + Debug, + P: ChannelReaderProvider + OriginProvider + Debug, T: TelemetryProvider + Debug, { /// The previous stage of the derivation pipeline. - prev: ChannelBank, + prev: P, /// Telemetry - telemetry: T, + telemetry: Arc, /// The batch reader. next_batch: Option, } -impl ChannelReader +impl ChannelReader where - DAP: DataAvailabilityProvider + Debug, - CP: ChainProvider + Debug, + P: ChannelReaderProvider + OriginProvider + Debug, T: TelemetryProvider + Debug, { /// Create a new [ChannelReader] stage. - pub fn new(prev: ChannelBank, telemetry: T) -> Self { + pub fn new(prev: P, telemetry: Arc) -> Self { Self { prev, telemetry, next_batch: None } } - /// Pulls out the next Batch from the available channel. - pub async fn next_batch(&mut self) -> StageResult { + /// Creates the batch reader from available channel data. + async fn set_batch_reader(&mut self) -> StageResult<()> { + if self.next_batch.is_none() { + let channel = self.prev.next_data().await?.ok_or(StageError::NoChannel)?; + self.next_batch = Some(BatchReader::from(&channel[..])); + } + Ok(()) + } + + /// Forces the read to continue with the next channel, resetting any + /// decoding / decompression state to a fresh start. + pub fn next_channel(&mut self) { + self.next_batch = None; + } +} + +#[async_trait] +impl BatchQueueProvider for ChannelReader +where + P: ChannelReaderProvider + OriginProvider + Send + Debug, + T: TelemetryProvider + Send + Sync + Debug, +{ + async fn next_batch(&mut self) -> StageResult { if let Err(e) = self.set_batch_reader().await { self.telemetry .write(alloc::format!("Failed to set batch reader: {:?}", e), LogLevel::Error); @@ -62,27 +90,11 @@ where } } } - - /// Creates the batch reader from available channel data. - async fn set_batch_reader(&mut self) -> StageResult<()> { - if self.next_batch.is_none() { - let channel = self.prev.next_data().await?.ok_or(anyhow!("no channel"))?; - self.next_batch = Some(BatchReader::from(&channel[..])); - } - Ok(()) - } - - /// Forces the read to continue with the next channel, resetting any - /// decoding / decompression state to a fresh start. - pub fn next_channel(&mut self) { - self.next_batch = None; - } } -impl OriginProvider for ChannelReader +impl OriginProvider for ChannelReader where - DAP: DataAvailabilityProvider + Debug, - CP: ChainProvider + Debug, + P: ChannelReaderProvider + OriginProvider + Debug, T: TelemetryProvider + Debug, { fn origin(&self) -> Option<&BlockInfo> { @@ -138,11 +150,58 @@ impl From> for BatchReader { #[cfg(test)] mod test { - use crate::{stages::channel_reader::BatchReader, types::BatchType}; + use super::*; + use crate::{ + stages::test_utils::MockChannelReaderProvider, traits::test_utils::TestTelemetry, + types::BatchType, + }; use alloc::vec; use miniz_oxide::deflate::compress_to_vec_zlib; - // TODO(clabby): More tests here for multiple batches, integration w/ channel bank, etc. + fn new_compressed_batch_data() -> Bytes { + let raw_data = include_bytes!("../../testdata/raw_batch.hex"); + let mut typed_data = vec![BatchType::Span as u8]; + typed_data.extend_from_slice(raw_data.as_slice()); + compress_to_vec_zlib(typed_data.as_slice(), 5).into() + } + + #[tokio::test] + async fn test_next_batch_batch_reader_set_fails() { + let mock = MockChannelReaderProvider::new(vec![Err(StageError::Eof)]); + let telemetry = Arc::new(TestTelemetry::new()); + let mut reader = ChannelReader::new(mock, telemetry); + assert_eq!(reader.next_batch().await, Err(StageError::Eof)); + assert!(reader.next_batch.is_none()); + } + + #[tokio::test] + async fn test_next_batch_batch_reader_no_data() { + let mock = MockChannelReaderProvider::new(vec![Ok(None)]); + let telemetry = Arc::new(TestTelemetry::new()); + let mut reader = ChannelReader::new(mock, telemetry); + assert_eq!(reader.next_batch().await, Err(StageError::NoChannel)); + assert!(reader.next_batch.is_none()); + } + + #[tokio::test] + async fn test_next_batch_not_enough_data() { + let mock = MockChannelReaderProvider::new(vec![Ok(Some(Bytes::default()))]); + let telemetry = Arc::new(TestTelemetry::new()); + let mut reader = ChannelReader::new(mock, telemetry); + assert_eq!(reader.next_batch().await, Err(StageError::NotEnoughData)); + assert!(reader.next_batch.is_none()); + } + + #[tokio::test] + async fn test_next_batch_succeeds() { + let raw = new_compressed_batch_data(); + let mock = MockChannelReaderProvider::new(vec![Ok(Some(raw))]); + let telemetry = Arc::new(TestTelemetry::new()); + let mut reader = ChannelReader::new(mock, telemetry); + let res = reader.next_batch().await.unwrap(); + matches!(res, Batch::Span(_)); + assert!(reader.next_batch.is_some()); + } #[test] fn test_batch_reader() { diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index 4de0131c9..3ae37e8f8 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -2,48 +2,62 @@ use core::fmt::Debug; -use super::l1_retrieval::L1Retrieval; use crate::{ - traits::{ - ChainProvider, DataAvailabilityProvider, LogLevel, OriginProvider, ResettableStage, - TelemetryProvider, - }, + stages::ChannelBankProvider, + traits::{LogLevel, OriginProvider, ResettableStage, TelemetryProvider}, types::{into_frames, BlockInfo, Frame, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, collections::VecDeque}; +use alloy_primitives::Bytes; use anyhow::anyhow; use async_trait::async_trait; +/// Provides data frames for the [FrameQueue] stage. +#[async_trait] +pub trait FrameQueueProvider { + /// An item that can be converted into a byte array. + type Item: Into; + + /// Retrieves the next data item from the L1 retrieval stage. + /// If there is data, it pushes it into the next stage. + /// If there is no data, it returns an error. + async fn next_data(&mut self) -> StageResult; +} + /// The [FrameQueue] stage of the derivation pipeline. /// This stage takes the output of the [L1Retrieval] stage and parses it into frames. #[derive(Debug)] -pub struct FrameQueue +pub struct FrameQueue where - DAP: DataAvailabilityProvider + Debug, - CP: ChainProvider + Debug, + P: FrameQueueProvider + OriginProvider + Debug, T: TelemetryProvider + Debug, { /// The previous stage in the pipeline. - pub prev: L1Retrieval, + pub prev: P, /// Telemetry pub telemetry: T, /// The current frame queue. queue: VecDeque, } -impl FrameQueue +impl FrameQueue where - DAP: DataAvailabilityProvider + Debug, - CP: ChainProvider + Debug, + P: FrameQueueProvider + OriginProvider + Debug, T: TelemetryProvider + Debug, { /// Create a new [FrameQueue] stage with the given previous [L1Retrieval] stage. - pub fn new(prev: L1Retrieval, telemetry: T) -> Self { + pub fn new(prev: P, telemetry: T) -> Self { Self { prev, telemetry, queue: VecDeque::new() } } +} - /// Fetches the next frame from the [FrameQueue]. - pub async fn next_frame(&mut self) -> StageResult { +#[async_trait] +impl ChannelBankProvider for FrameQueue +where + P: FrameQueueProvider + OriginProvider + Send + Debug, + T: TelemetryProvider + Send + Debug, +{ + async fn next_frame(&mut self) -> StageResult { if self.queue.is_empty() { match self.prev.next_data().await { Ok(data) => { @@ -52,6 +66,7 @@ where } else { // TODO: log parsing frame error // Failed to parse frames, but there may be more frames in the queue for + // // the pipeline to advance, so don't return an error here. } } @@ -78,10 +93,9 @@ where } } -impl OriginProvider for FrameQueue +impl OriginProvider for FrameQueue where - DAP: DataAvailabilityProvider + Debug, - CP: ChainProvider + Debug, + P: FrameQueueProvider + OriginProvider + Debug, T: TelemetryProvider + Debug, { fn origin(&self) -> Option<&BlockInfo> { @@ -90,10 +104,9 @@ where } #[async_trait] -impl ResettableStage for FrameQueue +impl ResettableStage for FrameQueue where - DAP: DataAvailabilityProvider + Send + Debug, - CP: ChainProvider + Send + Debug, + P: FrameQueueProvider + OriginProvider + Send + Debug, T: TelemetryProvider + Send + Debug, { async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> { @@ -106,12 +119,10 @@ where pub(crate) mod tests { use super::*; use crate::{ - stages::l1_traversal::tests::new_populated_test_traversal, - traits::test_utils::{TestDAP, TestTelemetry}, + stages::test_utils::MockFrameQueueProvider, traits::test_utils::TestTelemetry, DERIVATION_VERSION_0, }; use alloc::{vec, vec::Vec}; - use alloy_primitives::Bytes; pub(crate) fn new_test_frames(count: usize) -> Vec { (0..count) @@ -137,11 +148,9 @@ pub(crate) mod tests { #[tokio::test] async fn test_frame_queue_empty_bytes() { let telemetry = TestTelemetry::new(); - let traversal = new_populated_test_traversal(); - let results = vec![Ok(Bytes::from(vec![0x00]))]; - let dap = TestDAP { results }; - let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); - let mut frame_queue = FrameQueue::new(retrieval, telemetry); + let data = vec![Ok(Bytes::from(vec![0x00]))]; + let mock = MockFrameQueueProvider { data }; + let mut frame_queue = FrameQueue::new(mock, telemetry); let err = frame_queue.next_frame().await.unwrap_err(); assert_eq!(err, StageError::NotEnoughData); } @@ -149,11 +158,9 @@ pub(crate) mod tests { #[tokio::test] async fn test_frame_queue_no_frames_decoded() { let telemetry = TestTelemetry::new(); - let traversal = new_populated_test_traversal(); - let results = vec![Err(StageError::Eof), Ok(Bytes::default())]; - let dap = TestDAP { results }; - let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); - let mut frame_queue = FrameQueue::new(retrieval, telemetry); + let data = vec![Err(StageError::Eof), Ok(Bytes::default())]; + let mock = MockFrameQueueProvider { data }; + let mut frame_queue = FrameQueue::new(mock, telemetry); let err = frame_queue.next_frame().await.unwrap_err(); assert_eq!(err, StageError::NotEnoughData); } @@ -161,11 +168,9 @@ pub(crate) mod tests { #[tokio::test] async fn test_frame_queue_wrong_derivation_version() { let telemetry = TestTelemetry::new(); - let traversal = new_populated_test_traversal(); - let results = vec![Ok(Bytes::from(vec![0x01]))]; - let dap = TestDAP { results }; - let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); - let mut frame_queue = FrameQueue::new(retrieval, telemetry); + let data = vec![Ok(Bytes::from(vec![0x01]))]; + let mock = MockFrameQueueProvider { data }; + let mut frame_queue = FrameQueue::new(mock, telemetry); let err = frame_queue.next_frame().await.unwrap_err(); assert_eq!(err, StageError::NotEnoughData); } @@ -173,11 +178,9 @@ pub(crate) mod tests { #[tokio::test] async fn test_frame_queue_frame_too_short() { let telemetry = TestTelemetry::new(); - let traversal = new_populated_test_traversal(); - let results = vec![Ok(Bytes::from(vec![0x00, 0x01]))]; - let dap = TestDAP { results }; - let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); - let mut frame_queue = FrameQueue::new(retrieval, telemetry); + let data = vec![Ok(Bytes::from(vec![0x00, 0x01]))]; + let mock = MockFrameQueueProvider { data }; + let mut frame_queue = FrameQueue::new(mock, telemetry); let err = frame_queue.next_frame().await.unwrap_err(); assert_eq!(err, StageError::NotEnoughData); } @@ -186,10 +189,8 @@ pub(crate) mod tests { async fn test_frame_queue_single_frame() { let data = new_encoded_test_frames(1); let telemetry = TestTelemetry::new(); - let traversal = new_populated_test_traversal(); - let dap = TestDAP { results: vec![Ok(data)] }; - let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); - let mut frame_queue = FrameQueue::new(retrieval, telemetry); + let mock = MockFrameQueueProvider { data: vec![Ok(data)] }; + let mut frame_queue = FrameQueue::new(mock, telemetry); let frame_decoded = frame_queue.next_frame().await.unwrap(); let frame = new_test_frames(1); assert_eq!(frame[0], frame_decoded); @@ -201,10 +202,8 @@ pub(crate) mod tests { async fn test_frame_queue_multiple_frames() { let telemetry = TestTelemetry::new(); let data = new_encoded_test_frames(3); - let traversal = new_populated_test_traversal(); - let dap = TestDAP { results: vec![Ok(data)] }; - let retrieval = L1Retrieval::new(traversal, dap, TestTelemetry::new()); - let mut frame_queue = FrameQueue::new(retrieval, telemetry); + let mock = MockFrameQueueProvider { data: vec![Ok(data)] }; + let mut frame_queue = FrameQueue::new(mock, telemetry); for i in 0..3 { let frame_decoded = frame_queue.next_frame().await.unwrap(); assert_eq!(frame_decoded.number, i); diff --git a/crates/derive/src/stages/l1_retrieval.rs b/crates/derive/src/stages/l1_retrieval.rs index 556d036e2..c8a5e1893 100644 --- a/crates/derive/src/stages/l1_retrieval.rs +++ b/crates/derive/src/stages/l1_retrieval.rs @@ -1,31 +1,44 @@ //! Contains the [L1Retrieval] stage of the derivation pipeline. -use super::L1Traversal; use crate::{ + stages::FrameQueueProvider, traits::{ - AsyncIterator, ChainProvider, DataAvailabilityProvider, LogLevel, OriginProvider, - ResettableStage, TelemetryProvider, + AsyncIterator, DataAvailabilityProvider, OriginProvider, ResettableStage, TelemetryProvider, }, types::{BlockInfo, StageError, StageResult, SystemConfig}, }; use alloc::boxed::Box; +use alloy_primitives::Address; use anyhow::anyhow; use async_trait::async_trait; +/// Provides L1 blocks for the [L1Retrieval] stage. +/// This is the previous stage in the pipeline. +pub trait L1RetrievalProvider { + /// Returns the next L1 [BlockInfo] in the [L1Traversal] stage, if the stage is not complete. + /// This function can only be called once while the stage is in progress, and will return + /// [`None`] on subsequent calls unless the stage is reset or complete. If the stage is + /// complete and the [BlockInfo] has been consumed, an [StageError::Eof] error is returned. + fn next_l1_block(&mut self) -> StageResult>; + + /// Returns the batcher [Address] from the [crate::types::SystemConfig]. + fn batcher_addr(&self) -> Address; +} + /// The [L1Retrieval] stage of the derivation pipeline. /// For each L1 [BlockInfo] pulled from the [L1Traversal] stage, /// [L1Retrieval] fetches the associated data from a specified /// [DataAvailabilityProvider]. This data is returned as a generic /// [DataIter] that can be iterated over. #[derive(Debug)] -pub struct L1Retrieval +pub struct L1Retrieval where DAP: DataAvailabilityProvider, - CP: ChainProvider, + P: L1RetrievalProvider + OriginProvider, T: TelemetryProvider, { /// The previous stage in the pipeline. - pub prev: L1Traversal, + pub prev: P, /// Telemetry provider for the L1 retrieval stage. pub telemetry: T, /// The data availability provider to use for the L1 retrieval stage. @@ -34,33 +47,35 @@ where pub(crate) data: Option, } -impl L1Retrieval +impl L1Retrieval where DAP: DataAvailabilityProvider, - CP: ChainProvider, + P: L1RetrievalProvider + OriginProvider, T: TelemetryProvider, { /// Creates a new [L1Retrieval] stage with the previous [L1Traversal] /// stage and given [DataAvailabilityProvider]. - pub fn new(prev: L1Traversal, provider: DAP, telemetry: T) -> Self { + pub fn new(prev: P, provider: DAP, telemetry: T) -> Self { Self { prev, telemetry, provider, data: None } } +} + +#[async_trait] +impl FrameQueueProvider for L1Retrieval +where + DAP: DataAvailabilityProvider + Send, + P: L1RetrievalProvider + OriginProvider + Send, + T: TelemetryProvider + Send, +{ + type Item = DAP::Item; - /// Retrieves the next data item from the L1 retrieval stage. - /// If there is data, it pushes it into the next stage. - /// If there is no data, it returns an error. - pub async fn next_data(&mut self) -> StageResult { + async fn next_data(&mut self) -> StageResult { if self.data.is_none() { - self.telemetry.write( - alloc::format!("Retrieving data for block: {:?}", self.prev.block), - LogLevel::Debug, - ); let next = self .prev .next_l1_block()? .ok_or_else(|| anyhow!("No block to retrieve data from"))?; - self.data = - Some(self.provider.open_data(&next, self.prev.system_config.batcher_addr).await?); + self.data = Some(self.provider.open_data(&next, self.prev.batcher_addr()).await?); } let data = self.data.as_mut().expect("Cannot be None").next().await.ok_or(StageError::Eof); @@ -75,10 +90,10 @@ where } } -impl OriginProvider for L1Retrieval +impl OriginProvider for L1Retrieval where DAP: DataAvailabilityProvider, - CP: ChainProvider, + P: L1RetrievalProvider + OriginProvider, T: TelemetryProvider, { fn origin(&self) -> Option<&BlockInfo> { @@ -87,10 +102,10 @@ where } #[async_trait] -impl ResettableStage for L1Retrieval +impl ResettableStage for L1Retrieval where DAP: DataAvailabilityProvider + Send, - CP: ChainProvider + Send, + P: L1RetrievalProvider + OriginProvider + Send, T: TelemetryProvider + Send, { async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> StageResult<()> { @@ -107,7 +122,7 @@ mod tests { traits::test_utils::{TestDAP, TestIter, TestTelemetry}, }; use alloc::vec; - use alloy_primitives::{Address, Bytes}; + use alloy_primitives::Bytes; #[tokio::test] async fn test_l1_retrieval_origin() { diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs index c4e04d32e..f20dcebb2 100644 --- a/crates/derive/src/stages/l1_traversal.rs +++ b/crates/derive/src/stages/l1_traversal.rs @@ -1,10 +1,12 @@ //! Contains the [L1Traversal] stage of the derivation pipeline. use crate::{ + stages::L1RetrievalProvider, traits::{ChainProvider, LogLevel, OriginProvider, ResettableStage, TelemetryProvider}, types::{BlockInfo, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, sync::Arc}; +use alloy_primitives::Address; use async_trait::async_trait; /// The [L1Traversal] stage of the derivation pipeline. @@ -30,6 +32,21 @@ pub struct L1Traversal { pub rollup_config: Arc, } +impl L1RetrievalProvider for L1Traversal { + fn batcher_addr(&self) -> Address { + self.system_config.batcher_addr + } + + fn next_l1_block(&mut self) -> StageResult> { + if !self.done { + self.done = true; + Ok(self.block) + } else { + Err(StageError::Eof) + } + } +} + impl L1Traversal { /// Creates a new [L1Traversal] instance. pub fn new(data_source: F, cfg: Arc, telemetry: T) -> Self { @@ -48,19 +65,6 @@ impl L1Traversal { &self.data_source } - /// Returns the next L1 [BlockInfo] in the [L1Traversal] stage, if the stage is not complete. - /// This function can only be called once while the stage is in progress, and will return - /// [`None`] on subsequent calls unless the stage is reset or complete. If the stage is - /// complete and the [BlockInfo] has been consumed, an [StageError::Eof] error is returned. - pub fn next_l1_block(&mut self) -> StageResult> { - if !self.done { - self.done = true; - Ok(self.block) - } else { - Err(StageError::Eof) - } - } - /// Advances the internal state of the [L1Traversal] stage to the next L1 block. /// This function fetches the next L1 [BlockInfo] from the data source and updates the /// [SystemConfig] with the receipts from the block. @@ -132,7 +136,7 @@ pub(crate) mod tests { types::Receipt, }; use alloc::vec; - use alloy_primitives::{address, b256, hex, Address, Bytes, Log, LogData, B256}; + use alloy_primitives::{address, b256, hex, Bytes, Log, LogData, B256}; const L1_SYS_CONFIG_ADDR: Address = address!("1337000000000000000000000000000000000000"); diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs index d998b075f..78d80cf41 100644 --- a/crates/derive/src/stages/mod.rs +++ b/crates/derive/src/stages/mod.rs @@ -17,22 +17,22 @@ mod l1_traversal; pub use l1_traversal::L1Traversal; mod l1_retrieval; -pub use l1_retrieval::L1Retrieval; +pub use l1_retrieval::{L1Retrieval, L1RetrievalProvider}; mod frame_queue; -pub use frame_queue::FrameQueue; +pub use frame_queue::{FrameQueue, FrameQueueProvider}; mod channel_bank; -pub use channel_bank::ChannelBank; +pub use channel_bank::{ChannelBank, ChannelBankProvider}; mod channel_reader; -pub use channel_reader::ChannelReader; +pub use channel_reader::{ChannelReader, ChannelReaderProvider}; mod batch_queue; -pub use batch_queue::BatchQueue; +pub use batch_queue::{BatchQueue, BatchQueueProvider}; mod attributes_queue; -pub use attributes_queue::AttributesQueue; +pub use attributes_queue::{AttributesProvider, AttributesQueue}; #[cfg(test)] pub mod test_utils; diff --git a/crates/derive/src/stages/test_utils/attributes_queue.rs b/crates/derive/src/stages/test_utils/attributes_queue.rs index cb36f3838..aec67c5b6 100644 --- a/crates/derive/src/stages/test_utils/attributes_queue.rs +++ b/crates/derive/src/stages/test_utils/attributes_queue.rs @@ -1,10 +1,14 @@ //! Testing utilities for the attributes queue stage. use crate::{ - stages::attributes_queue::AttributesBuilder, - types::{BlockID, L2BlockInfo, PayloadAttributes}, + stages::attributes_queue::{AttributesBuilder, AttributesProvider}, + traits::OriginProvider, + types::{ + BlockID, BlockInfo, L2BlockInfo, PayloadAttributes, SingleBatch, StageError, StageResult, + }, }; -use alloc::vec::Vec; +use alloc::{boxed::Box, vec::Vec}; +use async_trait::async_trait; /// A mock implementation of the [`AttributesBuilder`] for testing. #[derive(Debug, Default)] @@ -23,3 +27,37 @@ impl AttributesBuilder for MockAttributesBuilder { self.attributes.pop().ok_or(anyhow::anyhow!("missing payload attribute"))? } } + +/// A mock implementation of the [`BatchQueue`] stage for testing. +#[derive(Debug, Default)] +pub struct MockAttributesProvider { + /// The origin of the L1 block. + origin: Option, + /// A list of batches to return. + batches: Vec>, +} + +impl OriginProvider for MockAttributesProvider { + fn origin(&self) -> Option<&BlockInfo> { + self.origin.as_ref() + } +} + +#[async_trait] +impl AttributesProvider for MockAttributesProvider { + async fn next_batch(&mut self, _parent: L2BlockInfo) -> StageResult { + self.batches.pop().ok_or(StageError::Eof)? + } + + fn is_last_in_span(&self) -> bool { + self.batches.is_empty() + } +} + +/// Creates a new [`MockAttributesProvider`] with the given origin and batches. +pub fn new_attributes_provider( + origin: Option, + batches: Vec>, +) -> MockAttributesProvider { + MockAttributesProvider { origin, batches } +} diff --git a/crates/derive/src/stages/test_utils/batch_queue.rs b/crates/derive/src/stages/test_utils/batch_queue.rs index 06afd09da..0165b452e 100644 --- a/crates/derive/src/stages/test_utils/batch_queue.rs +++ b/crates/derive/src/stages/test_utils/batch_queue.rs @@ -1,44 +1,38 @@ //! A mock implementation of the [`BatchQueue`] stage for testing. -use alloc::{boxed::Box, vec::Vec}; -use async_trait::async_trait; - use crate::{ - stages::attributes_queue::AttributesProvider, + stages::batch_queue::BatchQueueProvider, traits::OriginProvider, - types::{BlockInfo, L2BlockInfo, SingleBatch, StageError, StageResult}, + types::{Batch, BlockInfo, StageError, StageResult}, }; +use alloc::{boxed::Box, vec::Vec}; +use async_trait::async_trait; -/// A mock implementation of the [`BatchQueue`] stage for testing. +/// A mock provider for the [BatchQueue] stage. #[derive(Debug, Default)] -pub struct MockBatchQueue { +pub struct MockBatchQueueProvider { /// The origin of the L1 block. origin: Option, /// A list of batches to return. - batches: Vec>, + batches: Vec>, } -impl OriginProvider for MockBatchQueue { +impl MockBatchQueueProvider { + /// Creates a new [MockBatchQueueProvider] with the given origin and batches. + pub fn new(batches: Vec>) -> Self { + Self { origin: Some(BlockInfo::default()), batches } + } +} + +impl OriginProvider for MockBatchQueueProvider { fn origin(&self) -> Option<&BlockInfo> { self.origin.as_ref() } } #[async_trait] -impl AttributesProvider for MockBatchQueue { - async fn next_batch(&mut self, _parent: L2BlockInfo) -> StageResult { +impl BatchQueueProvider for MockBatchQueueProvider { + async fn next_batch(&mut self) -> StageResult { self.batches.pop().ok_or(StageError::Eof)? } - - fn is_last_in_span(&self) -> bool { - self.batches.is_empty() - } -} - -/// Creates a new [`MockBatchQueue`] with the given origin and batches. -pub fn new_mock_batch_queue( - origin: Option, - batches: Vec>, -) -> MockBatchQueue { - MockBatchQueue { origin, batches } } diff --git a/crates/derive/src/stages/test_utils/channel_bank.rs b/crates/derive/src/stages/test_utils/channel_bank.rs new file mode 100644 index 000000000..62db2933a --- /dev/null +++ b/crates/derive/src/stages/test_utils/channel_bank.rs @@ -0,0 +1,38 @@ +//! Mock testing utilities for the [ChannelBank] stage. + +use crate::{ + stages::ChannelBankProvider, + traits::OriginProvider, + types::{BlockInfo, Frame, StageError, StageResult}, +}; +use alloc::{boxed::Box, vec::Vec}; +use async_trait::async_trait; + +/// A mock [ChannelBankProvider] for testing the [ChannelBank] stage. +#[derive(Debug)] +pub struct MockChannelBankProvider { + /// The data to return. + pub data: Vec>, + /// The block info + pub block_info: Option, +} + +impl MockChannelBankProvider { + /// Creates a new [MockChannelBankProvider] with the given data. + pub fn new(data: Vec>) -> Self { + Self { data, block_info: Some(BlockInfo::default()) } + } +} + +impl OriginProvider for MockChannelBankProvider { + fn origin(&self) -> Option<&BlockInfo> { + self.block_info.as_ref() + } +} + +#[async_trait] +impl ChannelBankProvider for MockChannelBankProvider { + async fn next_frame(&mut self) -> StageResult { + self.data.pop().unwrap_or(Err(StageError::Eof)) + } +} diff --git a/crates/derive/src/stages/test_utils/channel_reader.rs b/crates/derive/src/stages/test_utils/channel_reader.rs new file mode 100644 index 000000000..23cea6416 --- /dev/null +++ b/crates/derive/src/stages/test_utils/channel_reader.rs @@ -0,0 +1,39 @@ +//! Test utilities for the [ChannelReader] stage. + +use crate::{ + stages::ChannelReaderProvider, + traits::OriginProvider, + types::{BlockInfo, StageError, StageResult}, +}; +use alloc::{boxed::Box, vec::Vec}; +use alloy_primitives::Bytes; +use async_trait::async_trait; + +/// A mock [ChannelReaderProvider] for testing the [ChannelReader] stage. +#[derive(Debug)] +pub struct MockChannelReaderProvider { + /// The data to return. + pub data: Vec>>, + /// The origin block info + pub block_info: Option, +} + +impl MockChannelReaderProvider { + /// Creates a new [MockChannelReaderProvider] with the given data. + pub fn new(data: Vec>>) -> Self { + Self { data, block_info: Some(BlockInfo::default()) } + } +} + +impl OriginProvider for MockChannelReaderProvider { + fn origin(&self) -> Option<&BlockInfo> { + self.block_info.as_ref() + } +} + +#[async_trait] +impl ChannelReaderProvider for MockChannelReaderProvider { + async fn next_data(&mut self) -> StageResult> { + self.data.pop().unwrap_or(Err(StageError::Eof)) + } +} diff --git a/crates/derive/src/stages/test_utils/frame_queue.rs b/crates/derive/src/stages/test_utils/frame_queue.rs new file mode 100644 index 000000000..a7cd4f8e5 --- /dev/null +++ b/crates/derive/src/stages/test_utils/frame_queue.rs @@ -0,0 +1,39 @@ +//! Mock types for the [FrameQueue] stage. + +use crate::{ + stages::FrameQueueProvider, + traits::OriginProvider, + types::{BlockInfo, StageError, StageResult}, +}; +use alloc::{boxed::Box, vec::Vec}; +use alloy_primitives::Bytes; +use async_trait::async_trait; + +/// A mock [FrameQueueProvider] for testing the [FrameQueue] stage. +#[derive(Debug)] +pub struct MockFrameQueueProvider { + /// The data to return. + pub data: Vec>, +} + +impl MockFrameQueueProvider { + /// Creates a new [MockFrameQueueProvider] with the given data. + pub fn new(data: Vec>) -> Self { + Self { data } + } +} + +impl OriginProvider for MockFrameQueueProvider { + fn origin(&self) -> Option<&BlockInfo> { + None + } +} + +#[async_trait] +impl FrameQueueProvider for MockFrameQueueProvider { + type Item = Bytes; + + async fn next_data(&mut self) -> StageResult { + self.data.pop().unwrap_or(Err(StageError::Eof)) + } +} diff --git a/crates/derive/src/stages/test_utils/mod.rs b/crates/derive/src/stages/test_utils/mod.rs index b05611b71..69516c25a 100644 --- a/crates/derive/src/stages/test_utils/mod.rs +++ b/crates/derive/src/stages/test_utils/mod.rs @@ -2,7 +2,18 @@ //! mock implementations of the various stages for testing. mod batch_queue; -pub use batch_queue::{new_mock_batch_queue, MockBatchQueue}; +pub use batch_queue::MockBatchQueueProvider; mod attributes_queue; -pub use attributes_queue::MockAttributesBuilder; +pub use attributes_queue::{ + new_attributes_provider, MockAttributesBuilder, MockAttributesProvider, +}; + +mod frame_queue; +pub use frame_queue::MockFrameQueueProvider; + +mod channel_bank; +pub use channel_bank::MockChannelBankProvider; + +mod channel_reader; +pub use channel_reader::MockChannelReaderProvider; diff --git a/crates/derive/src/traits/test_utils.rs b/crates/derive/src/traits/test_utils.rs index d5ad6387f..062714e71 100644 --- a/crates/derive/src/traits/test_utils.rs +++ b/crates/derive/src/traits/test_utils.rs @@ -1,7 +1,7 @@ //! Test Utilities for derive traits pub mod data_sources; -pub use data_sources::TestChainProvider; +pub use data_sources::{MockBlockFetcher, TestChainProvider}; pub mod data_availability; pub use data_availability::{TestDAP, TestIter}; diff --git a/crates/derive/src/traits/test_utils/data_sources.rs b/crates/derive/src/traits/test_utils/data_sources.rs index 9fb5aa04e..17161cda1 100644 --- a/crates/derive/src/traits/test_utils/data_sources.rs +++ b/crates/derive/src/traits/test_utils/data_sources.rs @@ -1,14 +1,49 @@ //! Data Sources Test Utilities use crate::{ - traits::ChainProvider, - types::{BlockInfo, Receipt, TxEnvelope}, + traits::{ChainProvider, SafeBlockFetcher}, + types::{BlockInfo, ExecutionPayloadEnvelope, L2BlockInfo, Receipt, TxEnvelope}, }; use alloc::{boxed::Box, vec::Vec}; use alloy_primitives::B256; use anyhow::Result; use async_trait::async_trait; +/// A mock block fetcher. +#[derive(Debug, Default)] +pub struct MockBlockFetcher { + /// Blocks + pub blocks: Vec, + /// Payloads + pub payloads: Vec, +} + +impl MockBlockFetcher { + /// Creates a new [MockBlockFetcher] with the given origin and batches. + pub fn new(blocks: Vec, payloads: Vec) -> Self { + Self { blocks, payloads } + } +} + +#[async_trait] +impl SafeBlockFetcher for MockBlockFetcher { + async fn l2_block_info_by_number(&self, number: u64) -> Result { + self.blocks + .iter() + .find(|b| b.block_info.number == number) + .cloned() + .ok_or_else(|| anyhow::anyhow!("Block not found")) + } + + async fn payload_by_number(&self, number: u64) -> Result { + self.payloads + .iter() + .find(|p| p.execution_payload.block_number == number) + .cloned() + .ok_or_else(|| anyhow::anyhow!("Payload not found")) + } +} + /// A mock chain provider for testing. #[derive(Debug, Clone, Default)] pub struct TestChainProvider { diff --git a/crates/derive/src/types/errors.rs b/crates/derive/src/types/errors.rs index 18a3fb4bf..d226c559b 100644 --- a/crates/derive/src/types/errors.rs +++ b/crates/derive/src/types/errors.rs @@ -23,6 +23,8 @@ pub enum StageError { Empty, /// No channels are available in the channel bank. NoChannelsAvailable, + /// No channel returned by the [crate::stages::ChannelReader] stage. + NoChannel, /// Failed to find channel. ChannelNotFound, /// Missing L1 origin. @@ -59,6 +61,7 @@ impl PartialEq for StageError { (StageError::Eof, StageError::Eof) | (StageError::NotEnoughData, StageError::NotEnoughData) | (StageError::NoChannelsAvailable, StageError::NoChannelsAvailable) | + (StageError::NoChannel, StageError::NoChannel) | (StageError::ChannelNotFound, StageError::ChannelNotFound) | (StageError::MissingOrigin, StageError::MissingOrigin) | (StageError::AttributesBuild(_), StageError::AttributesBuild(_)) | @@ -94,6 +97,7 @@ impl Display for StageError { } StageError::Empty => write!(f, "Empty"), StageError::NoChannelsAvailable => write!(f, "No channels available"), + StageError::NoChannel => write!(f, "No channel"), StageError::ChannelNotFound => write!(f, "Channel not found"), StageError::MissingOrigin => write!(f, "Missing L1 origin"), StageError::AttributesBuild(e) => write!(f, "Attributes build error: {}", e), diff --git a/crates/derive/src/types/payload.rs b/crates/derive/src/types/payload.rs index 38a0fa57a..9ade00b40 100644 --- a/crates/derive/src/types/payload.rs +++ b/crates/derive/src/types/payload.rs @@ -20,10 +20,10 @@ use serde::{Deserialize, Serialize}; pub struct ExecutionPayloadEnvelope { /// Parent beacon block root. #[cfg_attr(feature = "serde", serde(rename = "parentBeaconBlockRoot"))] - parent_beacon_block_root: Option, + pub parent_beacon_block_root: Option, /// The inner execution payload. #[cfg_attr(feature = "serde", serde(rename = "executionPayload"))] - execution_payload: ExecutionPayload, + pub execution_payload: ExecutionPayload, } impl ExecutionPayloadEnvelope { @@ -41,40 +41,57 @@ impl ExecutionPayloadEnvelope { #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[derive(Debug, Clone, PartialEq, Eq)] pub struct ExecutionPayload { + /// The parent hash. #[cfg_attr(feature = "serde", serde(rename = "parentHash"))] - parent_hash: B256, + pub parent_hash: B256, + /// The coinbase address. #[cfg_attr(feature = "serde", serde(rename = "feeRecipient"))] - fee_recipient: Address, + pub fee_recipient: Address, + /// The state root. #[cfg_attr(feature = "serde", serde(rename = "stateRoot"))] - state_root: B256, + pub state_root: B256, + /// The transactions root. #[cfg_attr(feature = "serde", serde(rename = "receiptsRoot"))] - receipts_root: B256, + pub receipts_root: B256, + /// The logs bloom. #[cfg_attr(feature = "serde", serde(rename = "logsBloom"))] - logs_bloom: B256, + pub logs_bloom: B256, + /// The mix hash. #[cfg_attr(feature = "serde", serde(rename = "prevRandao"))] - prev_randao: B256, + pub prev_randao: B256, + /// The difficulty. #[cfg_attr(feature = "serde", serde(rename = "blockNumber"))] - block_number: u64, + pub block_number: u64, + /// The gas limit. #[cfg_attr(feature = "serde", serde(rename = "gasLimit"))] - gas_limit: u64, + pub gas_limit: u64, + /// The gas used. #[cfg_attr(feature = "serde", serde(rename = "gasUsed"))] - gas_used: u64, + pub gas_used: u64, + /// The timestamp. #[cfg_attr(feature = "serde", serde(rename = "timestamp"))] - timestamp: u64, + pub timestamp: u64, + /// The extra data. #[cfg_attr(feature = "serde", serde(rename = "extraData"))] - extra_data: B256, + pub extra_data: B256, + /// Base fee per gas. #[cfg_attr(feature = "serde", serde(rename = "baseFeePerGas"))] - base_fee_per_gas: U256, + pub base_fee_per_gas: U256, + /// Block hash. #[cfg_attr(feature = "serde", serde(rename = "blockHash"))] - block_hash: B256, + pub block_hash: B256, + /// The transactions. #[cfg_attr(feature = "serde", serde(rename = "transactions"))] - transactions: Vec, + pub transactions: Vec, + /// The withdrawals. #[cfg_attr(feature = "serde", serde(rename = "withdrawals"))] - withdrawals: Option, + pub withdrawals: Option, + /// The blob gas used. #[cfg_attr(feature = "serde", serde(rename = "blobGasUsed"))] - blob_gas_used: Option, + pub blob_gas_used: Option, + /// The excess blob gas. #[cfg_attr(feature = "serde", serde(rename = "excessBlobGas"))] - excess_blob_gas: Option, + pub excess_blob_gas: Option, } /// Withdrawal Type