diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index 4950fc8fd..4771a1b37 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -1,11 +1,7 @@ //! Contains the logic for the `AttributesQueue` stage. use crate::{ - stages::batch_queue::BatchQueue, - traits::{ - ChainProvider, DataAvailabilityProvider, LogLevel, ResettableStage, SafeBlockFetcher, - TelemetryProvider, - }, + traits::{LogLevel, OriginProvider, ResettableStage, TelemetryProvider}, types::{ AttributesWithParent, BlockID, BlockInfo, L2BlockInfo, PayloadAttributes, ResetError, RollupConfig, SingleBatch, StageError, StageResult, SystemConfig, @@ -19,13 +15,23 @@ use core::fmt::Debug; pub trait AttributesBuilder { /// Prepare the payload attributes. fn prepare_payload_attributes( - &self, + &mut self, l2_parent: L2BlockInfo, epoch: BlockID, ) -> anyhow::Result; } -/// [AttributesQueue] accepts batches from the [super::BatchQueue] stage +/// [AttributesProvider] is a trait abstraction that generalizes the [BatchQueue] stage. +#[async_trait] +pub trait AttributesProvider { + /// Returns the next valid batch upon the given safe head. + async fn next_batch(&mut self, parent: L2BlockInfo) -> StageResult; + + /// Returns whether the current batch is the last in its span. + fn is_last_in_span(&self) -> bool; +} + +/// [AttributesQueue] accepts batches from the [BatchQueue] stage /// and transforms them into [PayloadAttributes]. The outputted payload /// attributes cannot be buffered because each batch->attributes transformation /// pulls in data about the current L2 safe head. @@ -36,18 +42,16 @@ pub trait AttributesBuilder { /// This stage can be reset by clearing its batch buffer. /// This stage does not need to retain any references to L1 blocks. #[derive(Debug)] -pub struct AttributesQueue +pub struct AttributesQueue where - DAP: DataAvailabilityProvider + Debug, - CP: ChainProvider + Debug, - BF: SafeBlockFetcher + Debug, + P: AttributesProvider + OriginProvider + Debug, T: TelemetryProvider + Debug, AB: AttributesBuilder + Debug, { /// The rollup config. cfg: RollupConfig, /// The previous stage of the derivation pipeline. - prev: BatchQueue, + prev: P, /// Telemetry provider. telemetry: T, /// Whether the current batch is the last in its span. @@ -58,21 +62,14 @@ where builder: AB, } -impl AttributesQueue +impl AttributesQueue where - DAP: DataAvailabilityProvider + Debug, - CP: ChainProvider + Debug, - BF: SafeBlockFetcher + Debug, + P: AttributesProvider + OriginProvider + Debug, T: TelemetryProvider + Debug, AB: AttributesBuilder + Debug, { /// Create a new [AttributesQueue] stage. - pub fn new( - cfg: RollupConfig, - prev: BatchQueue, - telemetry: T, - builder: AB, - ) -> Self { + pub fn new(cfg: RollupConfig, prev: P, telemetry: T, builder: AB) -> Self { Self { cfg, prev, telemetry, is_last_in_span: false, batch: None, builder } } @@ -81,7 +78,7 @@ where self.prev.origin() } - /// Loads a batch from the previous stage if needed. + /// Loads a [SingleBatch] from the [AttributesProvider] if needed. pub async fn load_batch(&mut self, parent: L2BlockInfo) -> StageResult { if self.batch.is_none() { let batch = self.prev.next_batch(parent).await?; @@ -91,29 +88,27 @@ where self.batch.as_ref().cloned().ok_or(StageError::Eof) } - /// Returns the next payload attributes from the current batch. + /// Returns the next [AttributesWithParent] from the current batch. pub async fn next_attributes( &mut self, parent: L2BlockInfo, ) -> StageResult { - // Load the batch + // Load the batch. let batch = self.load_batch(parent).await?; - // Construct the payload attributes from the loaded batch + // Construct the payload attributes from the loaded batch. let attributes = self.create_next_attributes(batch, parent).await?; let populated_attributes = AttributesWithParent { attributes, parent, is_last_in_span: self.is_last_in_span }; - // Clear out the local state once we will succeed + // Clear out the local state once payload attributes are prepared. self.batch = None; self.is_last_in_span = false; Ok(populated_attributes) } - /// Creates the next attributes. - /// Transforms a [SingleBatch] into [PayloadAttributes]. - /// This sets `NoTxPool` and appends the batched transactions to the attributes transaction - /// list. + /// Creates the next attributes, transforming a [SingleBatch] into [PayloadAttributes]. + /// This sets `no_tx_pool` and appends the batched txs to the attributes tx list. pub async fn create_next_attributes( &mut self, batch: SingleBatch, @@ -135,7 +130,10 @@ where // Prepare the payload attributes let tx_count = batch.transactions.len(); - let mut attributes = self.builder.prepare_payload_attributes(parent, batch.epoch())?; + let mut attributes = self + .builder + .prepare_payload_attributes(parent, batch.epoch()) + .map_err(StageError::AttributesBuild)?; attributes.no_tx_pool = true; attributes.transactions.extend(batch.transactions); @@ -153,19 +151,178 @@ where } #[async_trait] -impl ResettableStage for AttributesQueue +impl ResettableStage for AttributesQueue where - DAP: DataAvailabilityProvider + Send + Debug, - CP: ChainProvider + Send + Debug, - BF: SafeBlockFetcher + Send + Debug, + P: AttributesProvider + OriginProvider + Send + Debug, T: TelemetryProvider + Send + Debug, AB: AttributesBuilder + Send + Debug, { async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> { self.telemetry.write(Bytes::from("resetting attributes queue"), LogLevel::Info); - // TODO: metrice the reset + // TODO: metrice the reset using telemetry + // telemetry can provide a method of logging and metricing self.batch = None; self.is_last_in_span = false; Err(StageError::Eof) } } + +#[cfg(test)] +mod tests { + use super::{ + AttributesQueue, AttributesWithParent, BlockInfo, L2BlockInfo, PayloadAttributes, + RollupConfig, SingleBatch, StageError, StageResult, + }; + use crate::{ + stages::test_utils::{new_mock_batch_queue, MockAttributesBuilder, MockBatchQueue}, + traits::test_utils::TestTelemetry, + types::RawTransaction, + }; + use alloc::{vec, vec::Vec}; + use alloy_primitives::b256; + + fn new_attributes_queue( + cfg: Option, + origin: Option, + batches: Vec>, + ) -> AttributesQueue { + let cfg = cfg.unwrap_or_default(); + let telemetry = TestTelemetry::new(); + let mock_batch_queue = new_mock_batch_queue(origin, batches); + let mock_attributes_builder = MockAttributesBuilder::default(); + AttributesQueue::new(cfg, mock_batch_queue, telemetry, mock_attributes_builder) + } + + #[tokio::test] + async fn test_load_batch_eof() { + let mut attributes_queue = new_attributes_queue(None, None, vec![]); + let parent = L2BlockInfo::default(); + let result = attributes_queue.load_batch(parent).await.unwrap_err(); + assert_eq!(result, StageError::Eof); + } + + #[tokio::test] + async fn test_load_batch_last_in_span() { + let mut attributes_queue = new_attributes_queue(None, None, vec![Ok(Default::default())]); + let parent = L2BlockInfo::default(); + let result = attributes_queue.load_batch(parent).await.unwrap(); + assert_eq!(result, Default::default()); + assert!(attributes_queue.is_last_in_span); + } + + #[tokio::test] + async fn test_create_next_attributes_bad_parent_hash() { + let mut attributes_queue = new_attributes_queue(None, None, vec![]); + let bad_hash = b256!("6666666666666666666666666666666666666666666666666666666666666666"); + let parent = L2BlockInfo { + block_info: BlockInfo { hash: bad_hash, ..Default::default() }, + ..Default::default() + }; + let batch = SingleBatch::default(); + let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err(); + assert_eq!( + result, + StageError::Reset(super::ResetError::BadParentHash(Default::default(), bad_hash)) + ); + } + + #[tokio::test] + async fn test_create_next_attributes_bad_timestamp() { + let mut attributes_queue = new_attributes_queue(None, None, vec![]); + let parent = L2BlockInfo::default(); + let batch = SingleBatch { timestamp: 1, ..Default::default() }; + let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err(); + assert_eq!(result, StageError::Reset(super::ResetError::BadTimestamp(1, 0))); + } + + #[tokio::test] + async fn test_create_next_attributes_bad_parent_timestamp() { + let mut attributes_queue = new_attributes_queue(None, None, vec![]); + let parent = L2BlockInfo { + block_info: BlockInfo { timestamp: 2, ..Default::default() }, + ..Default::default() + }; + let batch = SingleBatch { timestamp: 1, ..Default::default() }; + let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err(); + assert_eq!(result, StageError::Reset(super::ResetError::BadTimestamp(1, 2))); + } + + #[tokio::test] + async fn test_create_next_attributes_bad_config_timestamp() { + let cfg = RollupConfig { block_time: 1, ..Default::default() }; + let mut attributes_queue = new_attributes_queue(Some(cfg), None, vec![]); + let parent = L2BlockInfo { + block_info: BlockInfo { timestamp: 1, ..Default::default() }, + ..Default::default() + }; + let batch = SingleBatch { timestamp: 1, ..Default::default() }; + let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err(); + assert_eq!(result, StageError::Reset(super::ResetError::BadTimestamp(1, 2))); + } + + #[tokio::test] + async fn test_create_next_attributes_preparation_fails() { + let mut attributes_queue = new_attributes_queue(None, None, vec![]); + let parent = L2BlockInfo::default(); + let batch = SingleBatch::default(); + let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err(); + assert_eq!( + result, + StageError::AttributesBuild(anyhow::anyhow!("missing payload attribute")) + ); + } + + #[tokio::test] + async fn test_create_next_attributes_success() { + let cfg = RollupConfig::default(); + let telemetry = TestTelemetry::new(); + let mock_batch_queue = new_mock_batch_queue(None, vec![]); + let mut payload_attributes = PayloadAttributes::default(); + let mock_builder = + MockAttributesBuilder { attributes: vec![Ok(payload_attributes.clone())] }; + let mut aq = AttributesQueue::new(cfg, mock_batch_queue, telemetry, mock_builder); + let parent = L2BlockInfo::default(); + let txs = vec![RawTransaction::default(), RawTransaction::default()]; + let batch = SingleBatch { transactions: txs.clone(), ..Default::default() }; + let attributes = aq.create_next_attributes(batch, parent).await.unwrap(); + // update the expected attributes + payload_attributes.no_tx_pool = true; + payload_attributes.transactions.extend(txs); + assert_eq!(attributes, payload_attributes); + } + + #[tokio::test] + async fn test_next_attributes_load_batch_eof() { + let mut attributes_queue = new_attributes_queue(None, None, vec![]); + let parent = L2BlockInfo::default(); + let result = attributes_queue.next_attributes(parent).await.unwrap_err(); + assert_eq!(result, StageError::Eof); + } + + #[tokio::test] + async fn test_next_attributes_load_batch_last_in_span() { + let cfg = RollupConfig::default(); + let telemetry = TestTelemetry::new(); + let mock_batch_queue = new_mock_batch_queue(None, vec![Ok(Default::default())]); + let mut pa = PayloadAttributes::default(); + let mock_builder = MockAttributesBuilder { attributes: vec![Ok(pa.clone())] }; + let mut aq = AttributesQueue::new(cfg, mock_batch_queue, telemetry, mock_builder); + // If we load the batch, we should get the last in span. + // But it won't take it so it will be available in the next_attributes call. + let _ = aq.load_batch(L2BlockInfo::default()).await.unwrap(); + assert!(aq.is_last_in_span); + assert!(aq.batch.is_some()); + // This should successfully construct the next payload attributes. + // It should also reset the last in span flag and clear the batch. + let attributes = aq.next_attributes(L2BlockInfo::default()).await.unwrap(); + pa.no_tx_pool = true; + let populated_attributes = AttributesWithParent { + attributes: pa, + parent: L2BlockInfo::default(), + is_last_in_span: true, + }; + assert_eq!(attributes, populated_attributes); + assert!(!aq.is_last_in_span); + assert!(aq.batch.is_none()); + } +} diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs index aef5fa27d..d998b075f 100644 --- a/crates/derive/src/stages/mod.rs +++ b/crates/derive/src/stages/mod.rs @@ -33,3 +33,6 @@ pub use batch_queue::BatchQueue; mod attributes_queue; pub use attributes_queue::AttributesQueue; + +#[cfg(test)] +pub mod test_utils; diff --git a/crates/derive/src/stages/test_utils/attributes_queue.rs b/crates/derive/src/stages/test_utils/attributes_queue.rs new file mode 100644 index 000000000..cb36f3838 --- /dev/null +++ b/crates/derive/src/stages/test_utils/attributes_queue.rs @@ -0,0 +1,25 @@ +//! Testing utilities for the attributes queue stage. + +use crate::{ + stages::attributes_queue::AttributesBuilder, + types::{BlockID, L2BlockInfo, PayloadAttributes}, +}; +use alloc::vec::Vec; + +/// A mock implementation of the [`AttributesBuilder`] for testing. +#[derive(Debug, Default)] +pub struct MockAttributesBuilder { + /// The attributes to return. + pub attributes: Vec>, +} + +impl AttributesBuilder for MockAttributesBuilder { + /// Prepares the [PayloadAttributes] for the next payload. + fn prepare_payload_attributes( + &mut self, + _l2_parent: L2BlockInfo, + _epoch: BlockID, + ) -> anyhow::Result { + self.attributes.pop().ok_or(anyhow::anyhow!("missing payload attribute"))? + } +} diff --git a/crates/derive/src/stages/test_utils/batch_queue.rs b/crates/derive/src/stages/test_utils/batch_queue.rs new file mode 100644 index 000000000..06afd09da --- /dev/null +++ b/crates/derive/src/stages/test_utils/batch_queue.rs @@ -0,0 +1,44 @@ +//! A mock implementation of the [`BatchQueue`] stage for testing. + +use alloc::{boxed::Box, vec::Vec}; +use async_trait::async_trait; + +use crate::{ + stages::attributes_queue::AttributesProvider, + traits::OriginProvider, + types::{BlockInfo, L2BlockInfo, SingleBatch, StageError, StageResult}, +}; + +/// A mock implementation of the [`BatchQueue`] stage for testing. +#[derive(Debug, Default)] +pub struct MockBatchQueue { + /// The origin of the L1 block. + origin: Option, + /// A list of batches to return. + batches: Vec>, +} + +impl OriginProvider for MockBatchQueue { + fn origin(&self) -> Option<&BlockInfo> { + self.origin.as_ref() + } +} + +#[async_trait] +impl AttributesProvider for MockBatchQueue { + async fn next_batch(&mut self, _parent: L2BlockInfo) -> StageResult { + self.batches.pop().ok_or(StageError::Eof)? + } + + fn is_last_in_span(&self) -> bool { + self.batches.is_empty() + } +} + +/// Creates a new [`MockBatchQueue`] with the given origin and batches. +pub fn new_mock_batch_queue( + origin: Option, + batches: Vec>, +) -> MockBatchQueue { + MockBatchQueue { origin, batches } +} diff --git a/crates/derive/src/stages/test_utils/mod.rs b/crates/derive/src/stages/test_utils/mod.rs new file mode 100644 index 000000000..b05611b71 --- /dev/null +++ b/crates/derive/src/stages/test_utils/mod.rs @@ -0,0 +1,8 @@ +//! Test utilities for the stages module primarily contains +//! mock implementations of the various stages for testing. + +mod batch_queue; +pub use batch_queue::{new_mock_batch_queue, MockBatchQueue}; + +mod attributes_queue; +pub use attributes_queue::MockAttributesBuilder; diff --git a/crates/derive/src/traits/data_sources.rs b/crates/derive/src/traits/data_sources.rs index 89ea08d5e..682fa6c8c 100644 --- a/crates/derive/src/traits/data_sources.rs +++ b/crates/derive/src/traits/data_sources.rs @@ -8,6 +8,12 @@ use anyhow::Result; use async_trait::async_trait; use core::fmt::Debug; +/// Provides a method for accessing the pipeline origin. +pub trait OriginProvider { + /// Returns the optional L1 [BlockInfo] origin. + fn origin(&self) -> Option<&BlockInfo>; +} + /// Describes the functionality of a data source that can provide information from the blockchain. #[async_trait] pub trait ChainProvider { diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index b2db574c5..6700afb02 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -2,7 +2,7 @@ //! pipeline. mod data_sources; -pub use data_sources::{ChainProvider, DataAvailabilityProvider, DataIter, SafeBlockFetcher}; +pub use data_sources::*; mod stages; pub use stages::ResettableStage; diff --git a/crates/derive/src/types/attributes.rs b/crates/derive/src/types/attributes.rs index a12fa9fe5..16aa7c507 100644 --- a/crates/derive/src/types/attributes.rs +++ b/crates/derive/src/types/attributes.rs @@ -9,7 +9,7 @@ use alloy_primitives::{Address, B256}; /// Payload attributes. #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Default, Clone, PartialEq, Eq)] pub struct PayloadAttributes { /// Value for the timestamp field of the new payload. #[cfg_attr(feature = "serde", serde(rename = "timestamp"))] diff --git a/crates/derive/src/types/errors.rs b/crates/derive/src/types/errors.rs index f1abf05fa..2b88cf86a 100644 --- a/crates/derive/src/types/errors.rs +++ b/crates/derive/src/types/errors.rs @@ -15,6 +15,8 @@ pub enum StageError { /// There is not enough data progress, but if we wait, the stage will eventually return data /// or produce an EOF error. NotEnoughData, + /// Failed to build the [super::PayloadAttributes] for the next batch. + AttributesBuild(anyhow::Error), /// Reset the pipeline. Reset(ResetError), /// The stage detected a block reorg. @@ -37,10 +39,14 @@ impl PartialEq for StageError { if let (StageError::ReorgDetected(a, b), StageError::ReorgDetected(c, d)) = (self, other) { return a == c && b == d; } + if let (StageError::Reset(a), StageError::Reset(b)) = (self, other) { + return a == b; + } matches!( (self, other), (StageError::Eof, StageError::Eof) | (StageError::NotEnoughData, StageError::NotEnoughData) | + (StageError::AttributesBuild(_), StageError::AttributesBuild(_)) | (StageError::ReceiptFetch(_), StageError::ReceiptFetch(_)) | (StageError::BlockInfoFetch(_), StageError::BlockInfoFetch(_)) | (StageError::SystemConfigUpdate(_), StageError::SystemConfigUpdate(_)) | @@ -60,6 +66,7 @@ impl Display for StageError { match self { StageError::Eof => write!(f, "End of file"), StageError::NotEnoughData => write!(f, "Not enough data"), + StageError::AttributesBuild(e) => write!(f, "Attributes build error: {}", e), StageError::Reset(e) => write!(f, "Reset error: {}", e), StageError::ReceiptFetch(e) => write!(f, "Receipt fetch error: {}", e), StageError::SystemConfigUpdate(e) => write!(f, "System config update error: {}", e), diff --git a/crates/derive/src/types/mod.rs b/crates/derive/src/types/mod.rs index 80817630a..ca1dfdb83 100644 --- a/crates/derive/src/types/mod.rs +++ b/crates/derive/src/types/mod.rs @@ -55,7 +55,7 @@ pub use errors::*; /// A raw transaction #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Default, Clone, PartialEq, Eq)] pub struct RawTransaction(pub Bytes); impl RawTransaction {