diff --git a/crates/derive/src/errors.rs b/crates/derive/src/errors.rs index 330c66386..006285f3a 100644 --- a/crates/derive/src/errors.rs +++ b/crates/derive/src/errors.rs @@ -99,6 +99,9 @@ pub enum PipelineError { /// [L1Retrieval]: crate::stages::L1Retrieval #[error("L1 Retrieval missing data")] MissingL1Data, + /// Invalid batch type passed. + #[error("Invalid batch type passed to stage")] + InvalidBatchType, /// Invalid batch validity variant. #[error("Invalid batch validity")] InvalidBatchValidity, diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch/batch_queue.rs similarity index 96% rename from crates/derive/src/stages/batch_queue.rs rename to crates/derive/src/stages/batch/batch_queue.rs index d20340c5b..d691dca2e 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch/batch_queue.rs @@ -1,5 +1,6 @@ //! This module contains the `BatchQueue` stage implementation. +use super::NextBatchProvider; use crate::{ batch::{Batch, BatchValidity, BatchWithInclusionBlock, SingleBatch}, errors::{PipelineEncodingError, PipelineError, PipelineErrorKind, PipelineResult, ResetError}, @@ -14,26 +15,6 @@ use op_alloy_genesis::RollupConfig; use op_alloy_protocol::{BlockInfo, L2BlockInfo}; use tracing::{error, info, warn}; -/// Provides [Batch]es for the [BatchQueue] stage. -#[async_trait] -pub trait BatchQueueProvider { - /// Returns the next [Batch] in the [ChannelReader] stage, if the stage is not complete. - /// This function can only be called once while the stage is in progress, and will return - /// [`None`] on subsequent calls unless the stage is reset or complete. If the stage is - /// complete and the batch has been consumed, an [PipelineError::Eof] error is returned. - /// - /// [ChannelReader]: crate::stages::ChannelReader - async fn next_batch( - &mut self, - parent: L2BlockInfo, - l1_origins: &[BlockInfo], - ) -> PipelineResult; - - /// Allows the [BatchQueue] to flush the buffer in the [crate::stages::BatchStream] - /// if an invalid single batch is found. Pre-holocene hardfork, this will be a no-op. - fn flush(&mut self); -} - /// [BatchQueue] is responsible for o rdering unordered batches /// and gnerating empty batches when the sequence window has passed. /// @@ -51,7 +32,7 @@ pub trait BatchQueueProvider { #[derive(Debug)] pub struct BatchQueue where - P: BatchQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, BF: L2ChainProvider + Debug, { /// The rollup config. @@ -79,7 +60,7 @@ where impl BatchQueue where - P: BatchQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, BF: L2ChainProvider + Debug, { /// Creates a new [BatchQueue] stage. @@ -282,7 +263,7 @@ where #[async_trait] impl OriginAdvancer for BatchQueue where - P: BatchQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, BF: L2ChainProvider + Send + Debug, { async fn advance_origin(&mut self) -> PipelineResult<()> { @@ -293,7 +274,7 @@ where #[async_trait] impl AttributesProvider for BatchQueue where - P: BatchQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, BF: L2ChainProvider + Send + Debug, { /// Returns the next valid batch upon the given safe head. @@ -450,7 +431,7 @@ where impl OriginProvider for BatchQueue where - P: BatchQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, BF: L2ChainProvider + Debug, { fn origin(&self) -> Option { @@ -461,7 +442,7 @@ where #[async_trait] impl SignalReceiver for BatchQueue where - P: BatchQueueProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, BF: L2ChainProvider + Send + Debug, { async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { @@ -509,7 +490,7 @@ mod tests { fn new_batch_reader() -> BatchReader { let file_contents = - alloc::string::String::from_utf8_lossy(include_bytes!("../../testdata/batch.hex")); + alloc::string::String::from_utf8_lossy(include_bytes!("../../../testdata/batch.hex")); let file_contents = &(&*file_contents)[..file_contents.len() - 1]; let data = alloy_primitives::hex::decode(file_contents).unwrap(); let bytes: alloy_primitives::Bytes = data.into(); diff --git a/crates/derive/src/stages/batch_stream.rs b/crates/derive/src/stages/batch/batch_stream.rs similarity index 97% rename from crates/derive/src/stages/batch_stream.rs rename to crates/derive/src/stages/batch/batch_stream.rs index 6d7bcca05..504f1fda0 100644 --- a/crates/derive/src/stages/batch_stream.rs +++ b/crates/derive/src/stages/batch/batch_stream.rs @@ -4,7 +4,7 @@ use crate::{ batch::{Batch, BatchValidity, BatchWithInclusionBlock, SingleBatch, SpanBatch}, errors::{PipelineEncodingError, PipelineError, PipelineResult}, pipeline::L2ChainProvider, - stages::BatchQueueProvider, + stages::NextBatchProvider, traits::{OriginAdvancer, OriginProvider, Signal, SignalReceiver}, }; use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; @@ -100,7 +100,7 @@ where } #[async_trait] -impl BatchQueueProvider for BatchStream +impl NextBatchProvider for BatchStream where P: BatchStreamProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, BF: L2ChainProvider + Send + Debug, @@ -113,6 +113,10 @@ where } } + fn span_buffer_size(&self) -> usize { + self.buffer.len() + } + async fn next_batch( &mut self, parent: L2BlockInfo, @@ -334,7 +338,7 @@ mod test { let err = stream.next_batch(Default::default(), &mock_origins).await.unwrap_err(); assert_eq!(err, PipelineError::Eof.temp()); - assert_eq!(stream.buffer.len(), 0); + assert_eq!(stream.span_buffer_size(), 0); assert!(stream.span.is_none()); // Add more data into the provider, see if the buffer is re-hydrated. @@ -359,7 +363,7 @@ mod test { let err = stream.next_batch(Default::default(), &mock_origins).await.unwrap_err(); assert_eq!(err, PipelineError::Eof.temp()); - assert_eq!(stream.buffer.len(), 0); + assert_eq!(stream.span_buffer_size(), 0); assert!(stream.span.is_none()); } @@ -376,7 +380,7 @@ mod test { // The next batch should be passed through to the [BatchQueue] stage. let batch = stream.next_batch(Default::default(), &[]).await.unwrap(); assert!(matches!(batch, Batch::Single(_))); - assert_eq!(stream.buffer.len(), 0); + assert_eq!(stream.span_buffer_size(), 0); assert!(stream.span.is_none()); } } diff --git a/crates/derive/src/stages/batch/batch_validator.rs b/crates/derive/src/stages/batch/batch_validator.rs new file mode 100644 index 000000000..dbe266511 --- /dev/null +++ b/crates/derive/src/stages/batch/batch_validator.rs @@ -0,0 +1,622 @@ +//! Contains the [BatchValidator] stage. + +use super::NextBatchProvider; +use crate::{ + batch::{Batch, BatchValidity, SingleBatch}, + errors::ResetError, + pipeline::{OriginAdvancer, PipelineResult, Signal, SignalReceiver}, + prelude::{OriginProvider, PipelineError, PipelineErrorKind}, + stages::AttributesProvider, + traits::ResetSignal, +}; +use alloc::{boxed::Box, sync::Arc, vec::Vec}; +use async_trait::async_trait; +use core::fmt::Debug; +use op_alloy_genesis::RollupConfig; +use op_alloy_protocol::{BlockInfo, L2BlockInfo}; +use tracing::{debug, error, info, warn}; + +/// The [BatchValidator] stage is responsible for validating the [SingleBatch]es from +/// the [BatchStream] [AttributesQueue]'s consumption. +/// +/// [BatchStream]: crate::stages::BatchStream +/// [AttributesQueue]: crate::stages::attributes_queue::AttributesQueue +#[derive(Debug)] +pub struct BatchValidator

+where + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, +{ + /// The rollup configuration. + cfg: Arc, + /// The previous stage of the derivation pipeline. + prev: P, + /// The L1 origin of the batch sequencer. + origin: Option, + /// A consecutive, time-centric window of L1 Blocks. + /// Every L1 origin of unsafe L2 Blocks must be included in this list. + /// If every L2 Block corresponding to a single L1 Block becomes safe, + /// the block is popped from this list. + /// If new L2 Block's L1 origin is not included in this list, fetch and + /// push it to the list. + l1_blocks: Vec, +} + +impl

BatchValidator

+where + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, +{ + /// Create a new [BatchValidator] stage. + pub const fn new(cfg: Arc, prev: P) -> Self { + Self { cfg, prev, origin: None, l1_blocks: Vec::new() } + } + + /// Returns `true` if the pipeline origin is behind the parent origin. + /// + /// ## Takes + /// - `parent`: The parent block of the current batch. + /// + /// ## Returns + /// - `true` if the origin is behind the parent origin. + fn origin_behind(&self, parent: &L2BlockInfo) -> bool { + self.prev.origin().map_or(true, |origin| origin.number < parent.l1_origin.number) + } + + /// Updates the [BatchValidator]'s view of the L1 origin blocks. + /// + /// ## Takes + /// - `parent`: The parent block of the current batch. + /// + /// ## Returns + /// - `Ok(())` if the update was successful. + /// - `Err(PipelineError)` if the update failed. + pub(crate) fn update_origins(&mut self, parent: &L2BlockInfo) -> PipelineResult<()> { + // NOTE: The origin is used to determine if it's behind. + // It is the future origin that gets saved into the l1 blocks array. + // We always update the origin of this stage if it's not the same so + // after the update code runs, this is consistent. + let origin_behind = self.origin_behind(parent); + + // Advance the origin if needed. + // The entire pipeline has the same origin. + // Batches prior to the l1 origin of the l2 safe head are not accepted. + if self.origin != self.prev.origin() { + self.origin = self.prev.origin(); + if !origin_behind { + let origin = self.origin.as_ref().ok_or(PipelineError::MissingOrigin.crit())?; + self.l1_blocks.push(*origin); + } else { + // This is to handle the special case of startup. + // At startup, the batch validator is reset and includes the + // l1 origin. That is the only time when immediately after + // reset is called, the origin behind is false. + self.l1_blocks.clear(); + } + debug!( + target: "batch-validator", + "Advancing batch validator origin to L1 block #{}.{}", + self.origin.map(|b| b.number).unwrap_or_default(), + origin_behind.then_some(" (origin behind)").unwrap_or_default() + ); + } + + // If the epoch is advanced, update the l1 blocks. + // Advancing epoch must be done after the pipeline successfully applies the entire span + // batch to the chain. + // Because the span batch can be reverted during processing the batch, then we must + // preserve existing l1 blocks to verify the epochs of the next candidate batch. + if !self.l1_blocks.is_empty() && parent.l1_origin.number > self.l1_blocks[0].number { + for (i, block) in self.l1_blocks.iter().enumerate() { + if parent.l1_origin.number == block.number { + self.l1_blocks.drain(0..i); + debug!(target: "batch-validator", "Advancing internal L1 epoch"); + break; + } + } + // If the origin of the parent block is not included, we must advance the origin. + } + + Ok(()) + } + + /// Attempts to derive an empty batch, if the sequencing window is expired. + /// + /// ## Takes + /// - `parent`: The parent block of the current batch. + /// + /// ## Returns + /// - `Ok(SingleBatch)` if an empty batch was derived. + /// - `Err(PipelineError)` if an empty batch could not be derived. + pub(crate) fn try_derive_empty_batch( + &mut self, + parent: &L2BlockInfo, + ) -> PipelineResult { + let epoch = self.l1_blocks[0]; + + // If the current epoch is too old compared to the L1 block we are at, + // i.e. if the sequence window expired, we create empty batches for the current epoch + let stage_origin = self.origin.ok_or(PipelineError::MissingOrigin.crit())?; + let expiry_epoch = epoch.number + self.cfg.seq_window_size; + let force_empty_batches = expiry_epoch <= stage_origin.number; + let first_of_epoch = epoch.number == parent.l1_origin.number + 1; + let next_timestamp = epoch.timestamp + self.cfg.block_time; + + // If the sequencer window did not expire, + // there is still room to receive batches for the current epoch. + // No need to force-create empty batch(es) towards the next epoch yet. + if !force_empty_batches { + return Err(PipelineError::Eof.temp()); + } + + // The next L1 block is needed to proceed towards the next epoch. + if self.l1_blocks.len() < 2 { + return Err(PipelineError::Eof.temp()); + } + + let next_epoch = self.l1_blocks[1]; + + // Fill with empty L2 blocks of the same epoch until we meet the time of the next L1 origin, + // to preserve that L2 time >= L1 time. If this is the first block of the epoch, always + // generate a batch to ensure that we at least have one batch per epoch. + if next_timestamp < next_epoch.timestamp || first_of_epoch { + info!(target: "batch-validator", "Generating empty batch for epoch #{}", epoch.number); + return Ok(SingleBatch { + parent_hash: parent.block_info.hash, + epoch_num: epoch.number, + epoch_hash: epoch.hash, + timestamp: next_timestamp, + transactions: Vec::new(), + }); + } + + // At this point we have auto generated every batch for the current epoch + // that we can, so we can advance to the next epoch. + debug!( + target: "batch-validator", + "Advancing batch validator epoch: {}, timestamp: {}, epoch timestamp: {}", + next_epoch.number, next_timestamp, next_epoch.timestamp + ); + self.l1_blocks.remove(0); + Err(PipelineError::Eof.temp()) + } +} + +#[async_trait] +impl

AttributesProvider for BatchValidator

+where + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, +{ + async fn next_batch(&mut self, parent: L2BlockInfo) -> PipelineResult { + // Update the L1 origin blocks within the stage. + self.update_origins(&parent)?; + + // If the origin is behind, we must drain previous stages to catch up. + let stage_origin = self.origin.ok_or(PipelineError::MissingOrigin.crit())?; + if self.origin_behind(&parent) || parent.l1_origin.number == stage_origin.number { + self.prev.next_batch(parent, self.l1_blocks.as_ref()).await?; + return Err(PipelineError::NotEnoughData.temp()); + } + + // At least the L1 origin of the safe block and the L1 origin of the following block must + // be included in the l1 blocks. + if self.l1_blocks.len() < 2 { + return Err(PipelineError::MissingOrigin.crit()); + } + + // Note: epoch origin can now be one block ahead of the L2 Safe Head + // This is in the case where we auto generate all batches in an epoch & advance the epoch + // but don't advance the L2 Safe Head's epoch + let epoch = self.l1_blocks[0]; + if parent.l1_origin != epoch.id() && parent.l1_origin.number != epoch.number - 1 { + return Err(PipelineErrorKind::Reset(ResetError::L1OriginMismatch( + parent.l1_origin.number, + epoch.number - 1, + ))); + } + + // Pull the next batch from the previous stage. + let next_batch = match self.prev.next_batch(parent, self.l1_blocks.as_ref()).await { + Ok(batch) => batch, + Err(PipelineErrorKind::Temporary(PipelineError::Eof)) => { + return self.try_derive_empty_batch(&parent); + } + Err(e) => { + return Err(e); + } + }; + + // The batch must be a single batch - this stage does not support span batches. + let Batch::Single(next_batch) = next_batch else { + error!( + target: "batch-validator", + "BatchValidator received a batch that is not a SingleBatch" + ); + return Err(PipelineError::InvalidBatchType.crit()); + }; + + // Check the validity of the single batch before forwarding it. + match next_batch.check_batch( + self.cfg.as_ref(), + self.l1_blocks.as_ref(), + parent, + &stage_origin, + ) { + BatchValidity::Accept => { + info!(target: "batch-validator", "Found next batch (epoch #{})", next_batch.epoch_num); + Ok(next_batch) + } + BatchValidity::Past => { + warn!(target: "batch-validator", "Dropping old batch"); + Err(PipelineError::NotEnoughData.temp()) + } + BatchValidity::Drop => { + warn!(target: "batch-validator", "Invalid singular batch, flushing current channel."); + self.prev.flush(); + Err(PipelineError::NotEnoughData.temp()) + } + BatchValidity::Undecided => Err(PipelineError::NotEnoughData.temp()), + BatchValidity::Future => { + error!(target: "batch-validator", "Future batch detected in BatchValidator."); + Err(PipelineError::InvalidBatchValidity.crit()) + } + } + } + + fn is_last_in_span(&self) -> bool { + self.prev.span_buffer_size() == 0 + } +} + +impl

OriginProvider for BatchValidator

+where + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, +{ + fn origin(&self) -> Option { + self.prev.origin() + } +} + +#[async_trait] +impl

OriginAdvancer for BatchValidator

+where + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, +{ + async fn advance_origin(&mut self) -> PipelineResult<()> { + self.prev.advance_origin().await + } +} + +#[async_trait] +impl

SignalReceiver for BatchValidator

+where + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, +{ + async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { + match signal { + s @ Signal::Reset(ResetSignal { l1_origin, .. }) => { + self.prev.signal(s).await?; + self.origin = Some(l1_origin); + // Include the new origin as an origin to build on. + // This is only for the initialization case. + // During normal resets we will later throw out this block. + self.l1_blocks.clear(); + self.l1_blocks.push(l1_origin); + crate::inc!(STAGE_RESETS, &["batch-validator"]); + } + s @ Signal::Activation(_) | s @ Signal::FlushChannel => { + self.prev.signal(s).await?; + } + } + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::BatchValidator; + use crate::{ + batch::{Batch, SingleBatch, SpanBatch}, + errors::{PipelineErrorKind, ResetError}, + pipeline::{PipelineResult, SignalReceiver}, + prelude::PipelineError, + stages::{AttributesProvider, NextBatchProvider}, + test_utils::{CollectingLayer, TestBatchQueueProvider, TraceStorage}, + traits::{OriginAdvancer, ResetSignal, Signal}, + }; + use alloc::sync::Arc; + use alloy_eips::{BlockNumHash, NumHash}; + use alloy_primitives::B256; + use op_alloy_genesis::RollupConfig; + use op_alloy_protocol::{BlockInfo, L2BlockInfo}; + use tracing::Level; + use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + + #[tokio::test] + async fn test_batch_validator_origin_behind_eof() { + let cfg = Arc::new(RollupConfig::default()); + let mut mock = TestBatchQueueProvider::new(vec![]); + mock.origin = Some(BlockInfo::default()); + let mut bv = BatchValidator::new(cfg, mock); + bv.origin = Some(BlockInfo { number: 1, ..Default::default() }); + + let mock_parent = L2BlockInfo { + l1_origin: BlockNumHash { number: 5, ..Default::default() }, + ..Default::default() + }; + assert_eq!(bv.next_batch(mock_parent).await.unwrap_err(), PipelineError::Eof.temp()); + } + + #[tokio::test] + async fn test_batch_validator_origin_behind_startup() { + let cfg = Arc::new(RollupConfig::default()); + let mut mock = TestBatchQueueProvider::new(vec![]); + mock.origin = Some(BlockInfo::default()); + let mut bv = BatchValidator::new(cfg, mock); + + // Reset the pipeline to add the L1 origin to the stage. + bv.signal(Signal::Reset(ResetSignal { + l1_origin: BlockInfo { number: 1, ..Default::default() }, + l2_safe_head: L2BlockInfo::new( + BlockInfo::default(), + NumHash::new(1, Default::default()), + 0, + ), + system_config: None, + })) + .await + .unwrap(); + + let mock_parent = L2BlockInfo { + l1_origin: BlockNumHash { number: 2, ..Default::default() }, + ..Default::default() + }; + assert_eq!(bv.l1_blocks.len(), 1); + bv.update_origins(&mock_parent).unwrap(); + assert_eq!(bv.l1_blocks.len(), 0); + } + + #[tokio::test] + async fn test_batch_validator_origin_behind_advance() { + let cfg = Arc::new(RollupConfig::default()); + let mut mock = TestBatchQueueProvider::new(vec![]); + mock.origin = Some(BlockInfo { number: 2, ..Default::default() }); + let mut bv = BatchValidator::new(cfg, mock); + + // Reset the pipeline to add the L1 origin to the stage. + bv.signal(Signal::Reset(ResetSignal { + l1_origin: BlockInfo { number: 1, ..Default::default() }, + l2_safe_head: L2BlockInfo::new( + BlockInfo::default(), + NumHash::new(1, Default::default()), + 0, + ), + system_config: None, + })) + .await + .unwrap(); + + let mock_parent = L2BlockInfo { + l1_origin: BlockNumHash { number: 1, ..Default::default() }, + ..Default::default() + }; + assert_eq!(bv.l1_blocks.len(), 1); + bv.update_origins(&mock_parent).unwrap(); + assert_eq!(bv.l1_blocks.len(), 2); + } + + #[tokio::test] + async fn test_batch_validator_advance_epoch() { + let cfg = Arc::new(RollupConfig::default()); + let mut mock = TestBatchQueueProvider::new(vec![]); + mock.origin = Some(BlockInfo { number: 2, ..Default::default() }); + let mut bv = BatchValidator::new(cfg, mock); + + // Reset the pipeline to add the L1 origin to the stage. + bv.signal(Signal::Reset(ResetSignal { + l1_origin: BlockInfo { number: 1, ..Default::default() }, + l2_safe_head: L2BlockInfo::new( + BlockInfo::default(), + NumHash::new(1, Default::default()), + 0, + ), + system_config: None, + })) + .await + .unwrap(); + + let mock_parent = L2BlockInfo { + l1_origin: BlockNumHash { number: 2, ..Default::default() }, + ..Default::default() + }; + assert_eq!(bv.l1_blocks.len(), 1); + assert_eq!(bv.l1_blocks[0].number, 1); + assert_eq!(bv.next_batch(mock_parent).await.unwrap_err(), PipelineError::Eof.temp()); + assert_eq!(bv.l1_blocks.len(), 1); + assert_eq!(bv.l1_blocks[0].number, 2); + } + + #[tokio::test] + async fn test_batch_validator_origin_behind_drain_prev() { + let cfg = Arc::new(RollupConfig::default()); + let mut mock = TestBatchQueueProvider::new( + (0..5).map(|_| Ok(Batch::Single(SingleBatch::default()))).collect(), + ); + mock.origin = Some(BlockInfo::default()); + let mut bv = BatchValidator::new(cfg, mock); + bv.origin = Some(BlockInfo::default()); + + let mock_parent = L2BlockInfo { + l1_origin: BlockNumHash { number: 5, ..Default::default() }, + ..Default::default() + }; + assert_eq!(bv.prev.span_buffer_size(), 5); + for i in 0..5 { + assert_eq!( + bv.next_batch(mock_parent).await.unwrap_err(), + PipelineError::NotEnoughData.temp() + ); + assert_eq!(bv.prev.span_buffer_size(), 4 - i); + } + assert_eq!(bv.next_batch(mock_parent).await.unwrap_err(), PipelineError::Eof.temp()); + } + + #[tokio::test] + async fn test_batch_validator_l1_origin_mismatch() { + let cfg = Arc::new(RollupConfig::default()); + let mut mock = TestBatchQueueProvider::new(vec![Ok(Batch::Single(SingleBatch::default()))]); + mock.origin = Some(BlockInfo { number: 1, ..Default::default() }); + let mut bv = BatchValidator::new(cfg, mock); + bv.origin = Some(BlockInfo::default()); + bv.l1_blocks.push(BlockInfo::default()); + + let mock_parent = L2BlockInfo { + l1_origin: BlockNumHash { number: 0, hash: [0xFF; 32].into() }, + ..Default::default() + }; + + assert!(matches!( + bv.next_batch(mock_parent).await.unwrap_err(), + PipelineErrorKind::Reset(ResetError::L1OriginMismatch(_, _)) + )); + } + + #[tokio::test] + async fn test_batch_validator_received_span_batch() { + let cfg = Arc::new(RollupConfig::default()); + let mut mock = TestBatchQueueProvider::new(vec![Ok(Batch::Span(SpanBatch::default()))]); + mock.origin = Some(BlockInfo { number: 1, ..Default::default() }); + let mut bv = BatchValidator::new(cfg, mock); + bv.origin = Some(BlockInfo::default()); + bv.l1_blocks.push(BlockInfo::default()); + + let mock_parent = L2BlockInfo { + l1_origin: BlockNumHash { number: 0, ..Default::default() }, + ..Default::default() + }; + + assert_eq!( + bv.next_batch(mock_parent).await.unwrap_err(), + PipelineError::InvalidBatchType.crit() + ); + assert_eq!(bv.next_batch(mock_parent).await.unwrap_err(), PipelineError::Eof.temp()); + } + + #[tokio::test] + async fn test_batch_validator_next_batch_valid() { + let cfg = Arc::new(RollupConfig { + holocene_time: Some(0), + block_time: 2, + max_sequencer_drift: 700, + ..Default::default() + }); + assert!(cfg.is_holocene_active(0)); + let batch = SingleBatch { + parent_hash: B256::default(), + epoch_num: 2, + epoch_hash: B256::default(), + timestamp: 4, + transactions: Vec::new(), + }; + let parent = L2BlockInfo { + l1_origin: BlockNumHash { number: 0, ..Default::default() }, + block_info: BlockInfo { timestamp: 2, ..Default::default() }, + ..Default::default() + }; + + // Setup batch validator deps + let batch_vec = vec![PipelineResult::Ok(Batch::Single(batch.clone()))]; + let mut mock = TestBatchQueueProvider::new(batch_vec); + mock.origin = Some(BlockInfo { number: 1, ..Default::default() }); + + // Configure batch validator + let mut bv = BatchValidator::new(cfg, mock); + + // Reset the pipeline to add the L1 origin to the stage. + bv.signal(Signal::Reset(ResetSignal { + l1_origin: BlockInfo { number: 1, ..Default::default() }, + ..Default::default() + })) + .await + .unwrap(); + bv.l1_blocks.push(BlockInfo { number: 1, ..Default::default() }); + + // Grab the next batch. + let produced_batch = bv.next_batch(parent).await.unwrap(); + assert_eq!(batch, produced_batch); + } + + #[tokio::test] + async fn test_batch_validator_next_batch_sequence_window_expired() { + let trace_store: TraceStorage = Default::default(); + let layer = CollectingLayer::new(trace_store.clone()); + tracing_subscriber::Registry::default().with(layer).init(); + + let cfg = Arc::new(RollupConfig { seq_window_size: 5, ..Default::default() }); + let mut mock = TestBatchQueueProvider::new(vec![]); + mock.origin = Some(BlockInfo { number: 1, ..Default::default() }); + let mut bv = BatchValidator::new(cfg, mock); + + // Reset the pipeline to add the L1 origin to the stage. + bv.signal(Signal::Reset(ResetSignal { + l1_origin: BlockInfo { number: 1, ..Default::default() }, + ..Default::default() + })) + .await + .unwrap(); + + // Advance the origin of the previous stage to block #6. + for _ in 0..6 { + bv.advance_origin().await.unwrap(); + } + + // The sequence window is expired, so we should generate an empty batch. + let mock_parent = L2BlockInfo { + l1_origin: BlockNumHash { number: 0, ..Default::default() }, + ..Default::default() + }; + assert!(bv.next_batch(mock_parent).await.unwrap().transactions.is_empty()); + + let trace_lock = trace_store.lock(); + assert_eq!(trace_lock.iter().filter(|(l, _)| matches!(l, &Level::DEBUG)).count(), 1); + assert_eq!(trace_lock.iter().filter(|(l, _)| matches!(l, &Level::INFO)).count(), 1); + assert!(trace_lock[0].1.contains("Advancing batch validator origin")); + assert!(trace_lock[1].1.contains("Generating empty batch for epoch")); + } + + #[tokio::test] + async fn test_batch_validator_next_batch_sequence_window_expired_advance_epoch() { + let trace_store: TraceStorage = Default::default(); + let layer = CollectingLayer::new(trace_store.clone()); + tracing_subscriber::Registry::default().with(layer).init(); + + let cfg = Arc::new(RollupConfig { seq_window_size: 5, ..Default::default() }); + let mut mock = TestBatchQueueProvider::new(vec![]); + mock.origin = Some(BlockInfo { number: 1, ..Default::default() }); + let mut bv = BatchValidator::new(cfg, mock); + + // Reset the pipeline to add the L1 origin to the stage. + bv.signal(Signal::Reset(ResetSignal { + l1_origin: BlockInfo { number: 1, ..Default::default() }, + ..Default::default() + })) + .await + .unwrap(); + + // Advance the origin of the previous stage to block #6. + for _ in 0..6 { + bv.advance_origin().await.unwrap(); + } + + // The sequence window is expired, so we should generate an empty batch. + let mock_parent = L2BlockInfo { + l1_origin: BlockNumHash { number: 1, ..Default::default() }, + ..Default::default() + }; + assert_eq!(bv.next_batch(mock_parent).await.unwrap_err(), PipelineError::Eof.temp()); + + let trace_lock = trace_store.lock(); + dbg!(&trace_lock); + assert_eq!(trace_lock.iter().filter(|(l, _)| matches!(l, &Level::DEBUG)).count(), 2); + assert!(trace_lock[0].1.contains("Advancing batch validator origin")); + assert!(trace_lock[1].1.contains("Advancing batch validator epoch")); + } +} diff --git a/crates/derive/src/stages/batch/mod.rs b/crates/derive/src/stages/batch/mod.rs new file mode 100644 index 000000000..f6e7a01ba --- /dev/null +++ b/crates/derive/src/stages/batch/mod.rs @@ -0,0 +1,54 @@ +//! Contains stages pertaining to the processing of [Batch]es. +//! +//! Sitting after the [ChannelReader] stage, the [BatchStream] and [BatchQueue] stages are +//! responsible for validating and ordering the [Batch]es. The [BatchStream] stage is responsible +//! for streaming [SingleBatch]es from [SpanBatch]es, while the [BatchQueue] stage is responsible +//! for ordering the [Batch]es for the [AttributesQueue] stage. +//! +//! [Batch]: crate::batch::Batch +//! [SingleBatch]: crate::batch::SingleBatch +//! [SpanBatch]: crate::batch::SpanBatch +//! [ChannelReader]: crate::stages::channel::ChannelReader +//! [AttributesQueue]: crate::stages::attributes_queue::AttributesQueue + +use crate::{batch::Batch, pipeline::PipelineResult}; +use alloc::boxed::Box; +use async_trait::async_trait; +use op_alloy_protocol::{BlockInfo, L2BlockInfo}; + +mod batch_stream; +pub use batch_stream::{BatchStream, BatchStreamProvider}; + +mod batch_queue; +pub use batch_queue::BatchQueue; + +mod batch_validator; +pub use batch_validator::BatchValidator; + +/// Provides [Batch]es for the [BatchQueue] and [BatchValidator] stages. +#[async_trait] +pub trait NextBatchProvider { + /// Returns the next [Batch] in the [ChannelReader] stage, if the stage is not complete. + /// This function can only be called once while the stage is in progress, and will return + /// [`None`] on subsequent calls unless the stage is reset or complete. If the stage is + /// complete and the batch has been consumed, an [PipelineError::Eof] error is returned. + /// + /// [ChannelReader]: crate::stages::ChannelReader + /// [PipelineError::Eof]: crate::errors::PipelineError::Eof + async fn next_batch( + &mut self, + parent: L2BlockInfo, + l1_origins: &[BlockInfo], + ) -> PipelineResult; + + /// Returns the number of [SingleBatch]es that are currently buffered in the [BatchStream] + /// from a [SpanBatch]. + /// + /// [SpanBatch]: crate::batch::SpanBatch + /// [SingleBatch]: crate::batch::SingleBatch + fn span_buffer_size(&self) -> usize; + + /// Allows the stage to flush the buffer in the [crate::stages::BatchStream] + /// if an invalid single batch is found. Pre-holocene hardfork, this will be a no-op. + fn flush(&mut self); +} diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs index c46c40988..185a35d98 100644 --- a/crates/derive/src/stages/mod.rs +++ b/crates/derive/src/stages/mod.rs @@ -30,11 +30,8 @@ pub use channel::{ NextFrameProvider, }; -mod batch_stream; -pub use batch_stream::{BatchStream, BatchStreamProvider}; - -mod batch_queue; -pub use batch_queue::{BatchQueue, BatchQueueProvider}; +mod batch; +pub use batch::{BatchQueue, BatchStream, BatchStreamProvider, BatchValidator, NextBatchProvider}; mod attributes_queue; pub use attributes_queue::{AttributesProvider, AttributesQueue}; diff --git a/crates/derive/src/test_utils/batch_queue.rs b/crates/derive/src/test_utils/batch_queue.rs index e0319c3a4..396222a51 100644 --- a/crates/derive/src/test_utils/batch_queue.rs +++ b/crates/derive/src/test_utils/batch_queue.rs @@ -3,7 +3,7 @@ use crate::{ batch::Batch, errors::{PipelineError, PipelineResult}, - stages::BatchQueueProvider, + stages::NextBatchProvider, traits::{OriginAdvancer, OriginProvider, Signal, SignalReceiver}, }; use alloc::{boxed::Box, vec::Vec}; @@ -37,11 +37,15 @@ impl OriginProvider for TestBatchQueueProvider { } #[async_trait] -impl BatchQueueProvider for TestBatchQueueProvider { +impl NextBatchProvider for TestBatchQueueProvider { fn flush(&mut self) { self.flushed = true; } + fn span_buffer_size(&self) -> usize { + self.batches.len() + } + async fn next_batch(&mut self, _: L2BlockInfo, _: &[BlockInfo]) -> PipelineResult { self.batches.pop().ok_or(PipelineError::Eof.temp())? } @@ -50,6 +54,10 @@ impl BatchQueueProvider for TestBatchQueueProvider { #[async_trait] impl OriginAdvancer for TestBatchQueueProvider { async fn advance_origin(&mut self) -> PipelineResult<()> { + self.origin = self.origin.map(|mut origin| { + origin.number += 1; + origin + }); Ok(()) } }