diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index b3247fd11..b9e32692c 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -196,8 +196,7 @@ mod tests { use super::*; use crate::{ stages::{ - frame_queue::tests::new_test_frames, l1_retrieval::L1Retrieval, - l1_traversal::tests::new_test_traversal, + frame_queue::tests::new_test_frames, l1_retrieval::L1Retrieval, l1_traversal::tests::*, }, traits::test_utils::TestDAP, }; @@ -205,7 +204,7 @@ mod tests { #[test] fn test_ingest_empty_origin() { - let mut traversal = new_test_traversal(false, false); + let mut traversal = new_test_traversal(vec![], vec![]); traversal.block = None; let dap = TestDAP::default(); let retrieval = L1Retrieval::new(traversal, dap); @@ -218,7 +217,7 @@ mod tests { #[test] fn test_ingest_and_prune_channel_bank() { - let traversal = new_test_traversal(true, true); + let traversal = new_populated_test_traversal(); let results = vec![Ok(Bytes::from(vec![0x00]))]; let dap = TestDAP { results }; let retrieval = L1Retrieval::new(traversal, dap); @@ -246,7 +245,7 @@ mod tests { #[tokio::test] async fn test_read_empty_channel_bank() { - let traversal = new_test_traversal(true, true); + let traversal = new_populated_test_traversal(); let results = vec![Ok(Bytes::from(vec![0x00]))]; let dap = TestDAP { results }; let retrieval = L1Retrieval::new(traversal, dap); diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index 6b9371555..d2e58cfae 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -85,7 +85,7 @@ where pub(crate) mod tests { use super::*; use crate::{ - stages::l1_traversal::tests::new_test_traversal, traits::test_utils::TestDAP, + stages::l1_traversal::tests::new_populated_test_traversal, traits::test_utils::TestDAP, DERIVATION_VERSION_0, }; use alloc::{vec, vec::Vec}; @@ -114,7 +114,7 @@ pub(crate) mod tests { #[tokio::test] async fn test_frame_queue_empty_bytes() { - let traversal = new_test_traversal(true, true); + let traversal = new_populated_test_traversal(); let results = vec![Ok(Bytes::from(vec![0x00]))]; let dap = TestDAP { results }; let retrieval = L1Retrieval::new(traversal, dap); @@ -125,7 +125,7 @@ pub(crate) mod tests { #[tokio::test] async fn test_frame_queue_no_frames_decoded() { - let traversal = new_test_traversal(true, true); + let traversal = new_populated_test_traversal(); let results = vec![Err(StageError::Eof), Ok(Bytes::default())]; let dap = TestDAP { results }; let retrieval = L1Retrieval::new(traversal, dap); @@ -136,7 +136,7 @@ pub(crate) mod tests { #[tokio::test] async fn test_frame_queue_wrong_derivation_version() { - let traversal = new_test_traversal(true, true); + let traversal = new_populated_test_traversal(); let results = vec![Ok(Bytes::from(vec![0x01]))]; let dap = TestDAP { results }; let retrieval = L1Retrieval::new(traversal, dap); @@ -147,7 +147,7 @@ pub(crate) mod tests { #[tokio::test] async fn test_frame_queue_frame_too_short() { - let traversal = new_test_traversal(true, true); + let traversal = new_populated_test_traversal(); let results = vec![Ok(Bytes::from(vec![0x00, 0x01]))]; let dap = TestDAP { results }; let retrieval = L1Retrieval::new(traversal, dap); @@ -159,7 +159,7 @@ pub(crate) mod tests { #[tokio::test] async fn test_frame_queue_single_frame() { let data = new_encoded_test_frames(1); - let traversal = new_test_traversal(true, true); + let traversal = new_populated_test_traversal(); let dap = TestDAP { results: vec![Ok(data)] }; let retrieval = L1Retrieval::new(traversal, dap); let mut frame_queue = FrameQueue::new(retrieval); @@ -173,7 +173,7 @@ pub(crate) mod tests { #[tokio::test] async fn test_frame_queue_multiple_frames() { let data = new_encoded_test_frames(3); - let traversal = new_test_traversal(true, true); + let traversal = new_populated_test_traversal(); let dap = TestDAP { results: vec![Ok(data)] }; let retrieval = L1Retrieval::new(traversal, dap); let mut frame_queue = FrameQueue::new(retrieval); diff --git a/crates/derive/src/stages/l1_retrieval.rs b/crates/derive/src/stages/l1_retrieval.rs index 1dc41edd5..c54fc10b9 100644 --- a/crates/derive/src/stages/l1_retrieval.rs +++ b/crates/derive/src/stages/l1_retrieval.rs @@ -86,7 +86,7 @@ where mod tests { use super::*; use crate::{ - stages::l1_traversal::tests::new_test_traversal, + stages::l1_traversal::tests::*, traits::test_utils::{TestDAP, TestIter}, }; use alloc::vec; @@ -94,7 +94,7 @@ mod tests { #[tokio::test] async fn test_l1_retrieval_origin() { - let traversal = new_test_traversal(true, true); + let traversal = new_populated_test_traversal(); let dap = TestDAP { results: vec![] }; let retrieval = L1Retrieval::new(traversal, dap); let expected = BlockInfo::default(); @@ -103,7 +103,7 @@ mod tests { #[tokio::test] async fn test_l1_retrieval_next_data() { - let traversal = new_test_traversal(true, true); + let traversal = new_populated_test_traversal(); let results = vec![Err(StageError::Eof), Ok(Bytes::default())]; let dap = TestDAP { results }; let mut retrieval = L1Retrieval::new(traversal, dap); @@ -130,7 +130,7 @@ mod tests { // Create a new traversal with no blocks or receipts. // This would bubble up an error if the prev stage // (traversal) is called in the retrieval stage. - let traversal = new_test_traversal(false, false); + let traversal = new_test_traversal(vec![], vec![]); let dap = TestDAP { results: vec![] }; let mut retrieval = L1Retrieval { prev: traversal, provider: dap, data: Some(data) }; let data = retrieval.next_data().await.unwrap(); @@ -146,7 +146,7 @@ mod tests { open_data_calls: vec![(BlockInfo::default(), Address::default())], results: vec![Err(StageError::Eof)], }; - let traversal = new_test_traversal(true, true); + let traversal = new_populated_test_traversal(); let dap = TestDAP { results: vec![] }; let mut retrieval = L1Retrieval { prev: traversal, provider: dap, data: Some(data) }; let data = retrieval.next_data().await.unwrap_err(); diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs index 594f39a71..2dfe1f354 100644 --- a/crates/derive/src/stages/l1_traversal.rs +++ b/crates/derive/src/stages/l1_traversal.rs @@ -1,25 +1,30 @@ -//! Contains the L1 traversal stage of the derivation pipeline. +//! Contains the [L1Traversal] stage of the derivation pipeline. use crate::{ traits::{ChainProvider, ResettableStage}, types::{BlockInfo, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, sync::Arc}; -use anyhow::anyhow; use async_trait::async_trait; -/// The L1 traversal stage of the derivation pipeline. +/// The [L1Traversal] stage of the derivation pipeline. +/// +/// This stage sits at the bottom of the pipeline, holding a handle to the data source +/// (a [ChainProvider] implementation) and the current L1 [BlockInfo] in the pipeline, +/// which are used to traverse the L1 chain. When the [L1Traversal] stage is advanced, +/// it fetches the next L1 [BlockInfo] from the data source and updates the [SystemConfig] +/// with the receipts from the block. #[derive(Debug, Clone)] pub struct L1Traversal { /// The current block in the traversal stage. pub(crate) block: Option, /// The data source for the traversal stage. data_source: Provider, - /// Signals whether or not the traversal stage has been completed. + /// Signals whether or not the traversal stage is complete. done: bool, - /// The system config + /// The system config. pub system_config: SystemConfig, - /// The rollup config + /// A reference to the rollup config. pub rollup_config: Arc, } @@ -40,9 +45,10 @@ impl L1Traversal { &self.data_source } - /// Returns the next L1 block in the traversal stage, if the stage has not been completed. - /// This function can only be called once, and will return `None` on subsequent calls - /// unless the stage is reset. + /// Returns the next L1 [BlockInfo] in the [L1Traversal] stage, if the stage is not complete. + /// This function can only be called once while the stage is in progress, and will return + /// [`None`] on subsequent calls unless the stage is reset or complete. If the stage is + /// complete and the [BlockInfo] has been consumed, an [StageError::Eof] error is returned. pub fn next_l1_block(&mut self) -> StageResult> { if !self.done { self.done = true; @@ -52,35 +58,41 @@ impl L1Traversal { } } - /// Returns the current L1 block in the traversal stage, if it exists. + /// Returns the current L1 [BlockInfo] in the [L1Traversal] stage, if it exists. pub fn origin(&self) -> Option<&BlockInfo> { self.block.as_ref() } /// Advances the internal state of the [L1Traversal] stage to the next L1 block. + /// This function fetches the next L1 [BlockInfo] from the data source and updates the + /// [SystemConfig] with the receipts from the block. pub async fn advance_l1_block(&mut self) -> StageResult<()> { - // Pull the next block or return EOF which has special - // handling further up the pipeline. + // Pull the next block or return EOF. + // StageError::EOF has special handling further up the pipeline. let block = self.block.ok_or(StageError::Eof)?; - let next_l1_origin = self.data_source.block_info_by_number(block.number + 1).await?; + let next_l1_origin = match self.data_source.block_info_by_number(block.number + 1).await { + Ok(block) => block, + Err(e) => return Err(StageError::BlockInfoFetch(e)), + }; - // Check for reorgs + // Check block hashes for reorgs. if block.hash != next_l1_origin.parent_hash { - return Err(anyhow!( - "Detected L1 reorg from {} to {} with conflicting parent", - block.hash, - next_l1_origin.hash - ) - .into()); + return Err(StageError::ReorgDetected(block.hash, next_l1_origin.parent_hash)); } - // Fetch receipts. - let receipts = self.data_source.receipts_by_hash(next_l1_origin.hash).await?; - self.system_config.update_with_receipts( + // Fetch receipts for the next l1 block and update the system config. + let receipts = match self.data_source.receipts_by_hash(next_l1_origin.hash).await { + Ok(receipts) => receipts, + Err(e) => return Err(StageError::ReceiptFetch(e)), + }; + + if let Err(e) = self.system_config.update_with_receipts( receipts.as_slice(), &self.rollup_config, next_l1_origin.timestamp, - )?; + ) { + return Err(StageError::SystemConfigUpdate(e)); + } self.block = Some(next_l1_origin); self.done = false; @@ -126,38 +138,48 @@ pub(crate) mod tests { } } + pub(crate) fn new_receipts() -> alloc::vec::Vec { + let mut receipt = Receipt { success: true, ..Receipt::default() }; + let bad = Log::new( + Address::from([2; 20]), + vec![CONFIG_UPDATE_TOPIC, B256::default()], + Bytes::default(), + ) + .unwrap(); + receipt.logs = vec![new_update_batcher_log(), bad, new_update_batcher_log()]; + vec![receipt.clone(), Receipt::default(), receipt] + } + pub(crate) fn new_test_traversal( - blocks: bool, - receipts: bool, + blocks: alloc::vec::Vec, + receipts: alloc::vec::Vec, ) -> L1Traversal { let mut provider = TestChainProvider::default(); let rollup_config = RollupConfig { l1_system_config_address: L1_SYS_CONFIG_ADDR, ..RollupConfig::default() }; - let block = BlockInfo::default(); - if blocks { - provider.insert_block(0, block); - provider.insert_block(1, block); + for (i, block) in blocks.iter().enumerate() { + provider.insert_block(i as u64, *block); } - if receipts { - let mut receipt = Receipt { success: true, ..Receipt::default() }; - let bad = Log::new( - Address::from([2; 20]), - vec![CONFIG_UPDATE_TOPIC, B256::default()], - Bytes::default(), - ) - .unwrap(); - receipt.logs = vec![new_update_batcher_log(), bad, new_update_batcher_log()]; - let receipts = vec![receipt.clone(), Receipt::default(), receipt]; - provider.insert_receipts(block.hash, receipts); + for (i, receipt) in receipts.iter().enumerate() { + let hash = blocks.get(i).map(|b| b.hash).unwrap_or_default(); + provider.insert_receipts(hash, vec![receipt.clone()]); } L1Traversal::new(provider, Arc::new(rollup_config)) } + pub(crate) fn new_populated_test_traversal() -> L1Traversal { + let blocks = vec![BlockInfo::default(), BlockInfo::default()]; + let receipts = new_receipts(); + new_test_traversal(blocks, receipts) + } + #[tokio::test] async fn test_l1_traversal() { - let mut traversal = new_test_traversal(true, true); + let blocks = vec![BlockInfo::default(), BlockInfo::default()]; + let receipts = new_receipts(); + let mut traversal = new_test_traversal(blocks, receipts); assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default())); assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof); assert!(traversal.advance_l1_block().await.is_ok()); @@ -165,23 +187,54 @@ pub(crate) mod tests { #[tokio::test] async fn test_l1_traversal_missing_receipts() { - let mut traversal = new_test_traversal(true, false); + let blocks = vec![BlockInfo::default(), BlockInfo::default()]; + let mut traversal = new_test_traversal(blocks, vec![]); assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default())); assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof); - matches!(traversal.advance_l1_block().await.unwrap_err(), StageError::Custom(_)); + matches!(traversal.advance_l1_block().await.unwrap_err(), StageError::ReceiptFetch(_)); + } + + #[tokio::test] + async fn test_l1_traversal_reorgs() { + let hash = b256!("3333333333333333333333333333333333333333333333333333333333333333"); + let block = BlockInfo { hash, ..BlockInfo::default() }; + let blocks = vec![block, block]; + let receipts = new_receipts(); + let mut traversal = new_test_traversal(blocks, receipts); + assert!(traversal.advance_l1_block().await.is_ok()); + let err = traversal.advance_l1_block().await.unwrap_err(); + assert_eq!(err, StageError::ReorgDetected(block.hash, block.parent_hash)); } #[tokio::test] async fn test_l1_traversal_missing_blocks() { - let mut traversal = new_test_traversal(false, false); + let mut traversal = new_test_traversal(vec![], vec![]); assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default())); assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof); - matches!(traversal.advance_l1_block().await.unwrap_err(), StageError::Custom(_)); + matches!(traversal.advance_l1_block().await.unwrap_err(), StageError::BlockInfoFetch(_)); + } + + #[tokio::test] + async fn test_l1_traversal_system_config_update_fails() { + let first = b256!("3333333333333333333333333333333333333333333333333333333333333333"); + let second = b256!("4444444444444444444444444444444444444444444444444444444444444444"); + let block1 = BlockInfo { hash: first, ..BlockInfo::default() }; + let block2 = BlockInfo { hash: second, ..BlockInfo::default() }; + let blocks = vec![block1, block2]; + let receipts = new_receipts(); + let mut traversal = new_test_traversal(blocks, receipts); + assert!(traversal.advance_l1_block().await.is_ok()); + // Only the second block should fail since the second receipt + // contains invalid logs that will error for a system config update. + let err = traversal.advance_l1_block().await.unwrap_err(); + matches!(err, StageError::SystemConfigUpdate(_)); } #[tokio::test] - async fn test_system_config_updated() { - let mut traversal = new_test_traversal(true, true); + async fn test_l1_traversal_system_config_updated() { + let blocks = vec![BlockInfo::default(), BlockInfo::default()]; + let receipts = new_receipts(); + let mut traversal = new_test_traversal(blocks, receipts); assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default())); assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof); assert!(traversal.advance_l1_block().await.is_ok()); diff --git a/crates/derive/src/types/errors.rs b/crates/derive/src/types/errors.rs index 1b97a6249..3184c5f9e 100644 --- a/crates/derive/src/types/errors.rs +++ b/crates/derive/src/types/errors.rs @@ -1,5 +1,6 @@ //! This module contains derivation errors thrown within the pipeline. +use alloy_primitives::B256; use core::fmt::Display; use super::SpanBatchError; @@ -12,16 +13,33 @@ pub enum StageError { /// There is not enough data progress, but if we wait, the stage will eventually return data /// or produce an EOF error. NotEnoughData, + /// The stage detected a block reorg. + /// The first argument is the expected block hash. + /// The second argument is the paren_hash of the next l1 origin block. + ReorgDetected(B256, B256), + /// Receipt fetching error. + ReceiptFetch(anyhow::Error), + /// [super::BlockInfo] fetching error. + BlockInfoFetch(anyhow::Error), + /// [super::SystemConfig] update error. + SystemConfigUpdate(anyhow::Error), /// Other wildcard error. Custom(anyhow::Error), } impl PartialEq for StageError { fn eq(&self, other: &StageError) -> bool { + // if it's a reorg detected check the block hashes + if let (StageError::ReorgDetected(a, b), StageError::ReorgDetected(c, d)) = (self, other) { + return a == c && b == d; + } matches!( (self, other), (StageError::Eof, StageError::Eof) | (StageError::NotEnoughData, StageError::NotEnoughData) | + (StageError::ReceiptFetch(_), StageError::ReceiptFetch(_)) | + (StageError::BlockInfoFetch(_), StageError::BlockInfoFetch(_)) | + (StageError::SystemConfigUpdate(_), StageError::SystemConfigUpdate(_)) | (StageError::Custom(_), StageError::Custom(_)) ) } @@ -41,6 +59,12 @@ impl Display for StageError { match self { StageError::Eof => write!(f, "End of file"), StageError::NotEnoughData => write!(f, "Not enough data"), + StageError::ReceiptFetch(e) => write!(f, "Receipt fetch error: {}", e), + StageError::SystemConfigUpdate(e) => write!(f, "System config update error: {}", e), + StageError::ReorgDetected(current, next) => { + write!(f, "Block reorg detected: {} -> {}", current, next) + } + StageError::BlockInfoFetch(e) => write!(f, "Block info fetch error: {}", e), StageError::Custom(e) => write!(f, "Custom error: {}", e), } }