diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs new file mode 100644 index 000000000..4771a1b37 --- /dev/null +++ b/crates/derive/src/stages/attributes_queue.rs @@ -0,0 +1,328 @@ +//! Contains the logic for the `AttributesQueue` stage. + +use crate::{ + traits::{LogLevel, OriginProvider, ResettableStage, TelemetryProvider}, + types::{ + AttributesWithParent, BlockID, BlockInfo, L2BlockInfo, PayloadAttributes, ResetError, + RollupConfig, SingleBatch, StageError, StageResult, SystemConfig, + }, +}; +use alloc::boxed::Box; +use alloy_primitives::Bytes; +use async_trait::async_trait; +use core::fmt::Debug; + +pub trait AttributesBuilder { + /// Prepare the payload attributes. + fn prepare_payload_attributes( + &mut self, + l2_parent: L2BlockInfo, + epoch: BlockID, + ) -> anyhow::Result; +} + +/// [AttributesProvider] is a trait abstraction that generalizes the [BatchQueue] stage. +#[async_trait] +pub trait AttributesProvider { + /// Returns the next valid batch upon the given safe head. + async fn next_batch(&mut self, parent: L2BlockInfo) -> StageResult; + + /// Returns whether the current batch is the last in its span. + fn is_last_in_span(&self) -> bool; +} + +/// [AttributesQueue] accepts batches from the [BatchQueue] stage +/// and transforms them into [PayloadAttributes]. The outputted payload +/// attributes cannot be buffered because each batch->attributes transformation +/// pulls in data about the current L2 safe head. +/// +/// [AttributesQueue] also buffers batches that have been output because +/// multiple batches can be created at once. +/// +/// This stage can be reset by clearing its batch buffer. +/// This stage does not need to retain any references to L1 blocks. +#[derive(Debug)] +pub struct AttributesQueue +where + P: AttributesProvider + OriginProvider + Debug, + T: TelemetryProvider + Debug, + AB: AttributesBuilder + Debug, +{ + /// The rollup config. + cfg: RollupConfig, + /// The previous stage of the derivation pipeline. + prev: P, + /// Telemetry provider. + telemetry: T, + /// Whether the current batch is the last in its span. + is_last_in_span: bool, + /// The current batch being processed. + batch: Option, + /// The attributes builder. + builder: AB, +} + +impl AttributesQueue +where + P: AttributesProvider + OriginProvider + Debug, + T: TelemetryProvider + Debug, + AB: AttributesBuilder + Debug, +{ + /// Create a new [AttributesQueue] stage. + pub fn new(cfg: RollupConfig, prev: P, telemetry: T, builder: AB) -> Self { + Self { cfg, prev, telemetry, is_last_in_span: false, batch: None, builder } + } + + /// Returns the L1 origin [BlockInfo]. + pub fn origin(&self) -> Option<&BlockInfo> { + self.prev.origin() + } + + /// Loads a [SingleBatch] from the [AttributesProvider] if needed. + pub async fn load_batch(&mut self, parent: L2BlockInfo) -> StageResult { + if self.batch.is_none() { + let batch = self.prev.next_batch(parent).await?; + self.batch = Some(batch); + self.is_last_in_span = self.prev.is_last_in_span(); + } + self.batch.as_ref().cloned().ok_or(StageError::Eof) + } + + /// Returns the next [AttributesWithParent] from the current batch. + pub async fn next_attributes( + &mut self, + parent: L2BlockInfo, + ) -> StageResult { + // Load the batch. + let batch = self.load_batch(parent).await?; + + // Construct the payload attributes from the loaded batch. + let attributes = self.create_next_attributes(batch, parent).await?; + let populated_attributes = + AttributesWithParent { attributes, parent, is_last_in_span: self.is_last_in_span }; + + // Clear out the local state once payload attributes are prepared. + self.batch = None; + self.is_last_in_span = false; + Ok(populated_attributes) + } + + /// Creates the next attributes, transforming a [SingleBatch] into [PayloadAttributes]. + /// This sets `no_tx_pool` and appends the batched txs to the attributes tx list. + pub async fn create_next_attributes( + &mut self, + batch: SingleBatch, + parent: L2BlockInfo, + ) -> StageResult { + // Sanity check parent hash + if batch.parent_hash != parent.block_info.hash { + return Err(StageError::Reset(ResetError::BadParentHash( + batch.parent_hash, + parent.block_info.hash, + ))); + } + + // Sanity check timestamp + let actual = parent.block_info.timestamp + self.cfg.block_time; + if actual != batch.timestamp { + return Err(StageError::Reset(ResetError::BadTimestamp(batch.timestamp, actual))); + } + + // Prepare the payload attributes + let tx_count = batch.transactions.len(); + let mut attributes = self + .builder + .prepare_payload_attributes(parent, batch.epoch()) + .map_err(StageError::AttributesBuild)?; + attributes.no_tx_pool = true; + attributes.transactions.extend(batch.transactions); + + self.telemetry.write( + Bytes::from(alloc::format!( + "generated attributes in payload queue: txs={}, timestamp={}", + tx_count, + batch.timestamp, + )), + LogLevel::Info, + ); + + Ok(attributes) + } +} + +#[async_trait] +impl ResettableStage for AttributesQueue +where + P: AttributesProvider + OriginProvider + Send + Debug, + T: TelemetryProvider + Send + Debug, + AB: AttributesBuilder + Send + Debug, +{ + async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> { + self.telemetry.write(Bytes::from("resetting attributes queue"), LogLevel::Info); + // TODO: metrice the reset using telemetry + // telemetry can provide a method of logging and metricing + self.batch = None; + self.is_last_in_span = false; + Err(StageError::Eof) + } +} + +#[cfg(test)] +mod tests { + use super::{ + AttributesQueue, AttributesWithParent, BlockInfo, L2BlockInfo, PayloadAttributes, + RollupConfig, SingleBatch, StageError, StageResult, + }; + use crate::{ + stages::test_utils::{new_mock_batch_queue, MockAttributesBuilder, MockBatchQueue}, + traits::test_utils::TestTelemetry, + types::RawTransaction, + }; + use alloc::{vec, vec::Vec}; + use alloy_primitives::b256; + + fn new_attributes_queue( + cfg: Option, + origin: Option, + batches: Vec>, + ) -> AttributesQueue { + let cfg = cfg.unwrap_or_default(); + let telemetry = TestTelemetry::new(); + let mock_batch_queue = new_mock_batch_queue(origin, batches); + let mock_attributes_builder = MockAttributesBuilder::default(); + AttributesQueue::new(cfg, mock_batch_queue, telemetry, mock_attributes_builder) + } + + #[tokio::test] + async fn test_load_batch_eof() { + let mut attributes_queue = new_attributes_queue(None, None, vec![]); + let parent = L2BlockInfo::default(); + let result = attributes_queue.load_batch(parent).await.unwrap_err(); + assert_eq!(result, StageError::Eof); + } + + #[tokio::test] + async fn test_load_batch_last_in_span() { + let mut attributes_queue = new_attributes_queue(None, None, vec![Ok(Default::default())]); + let parent = L2BlockInfo::default(); + let result = attributes_queue.load_batch(parent).await.unwrap(); + assert_eq!(result, Default::default()); + assert!(attributes_queue.is_last_in_span); + } + + #[tokio::test] + async fn test_create_next_attributes_bad_parent_hash() { + let mut attributes_queue = new_attributes_queue(None, None, vec![]); + let bad_hash = b256!("6666666666666666666666666666666666666666666666666666666666666666"); + let parent = L2BlockInfo { + block_info: BlockInfo { hash: bad_hash, ..Default::default() }, + ..Default::default() + }; + let batch = SingleBatch::default(); + let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err(); + assert_eq!( + result, + StageError::Reset(super::ResetError::BadParentHash(Default::default(), bad_hash)) + ); + } + + #[tokio::test] + async fn test_create_next_attributes_bad_timestamp() { + let mut attributes_queue = new_attributes_queue(None, None, vec![]); + let parent = L2BlockInfo::default(); + let batch = SingleBatch { timestamp: 1, ..Default::default() }; + let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err(); + assert_eq!(result, StageError::Reset(super::ResetError::BadTimestamp(1, 0))); + } + + #[tokio::test] + async fn test_create_next_attributes_bad_parent_timestamp() { + let mut attributes_queue = new_attributes_queue(None, None, vec![]); + let parent = L2BlockInfo { + block_info: BlockInfo { timestamp: 2, ..Default::default() }, + ..Default::default() + }; + let batch = SingleBatch { timestamp: 1, ..Default::default() }; + let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err(); + assert_eq!(result, StageError::Reset(super::ResetError::BadTimestamp(1, 2))); + } + + #[tokio::test] + async fn test_create_next_attributes_bad_config_timestamp() { + let cfg = RollupConfig { block_time: 1, ..Default::default() }; + let mut attributes_queue = new_attributes_queue(Some(cfg), None, vec![]); + let parent = L2BlockInfo { + block_info: BlockInfo { timestamp: 1, ..Default::default() }, + ..Default::default() + }; + let batch = SingleBatch { timestamp: 1, ..Default::default() }; + let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err(); + assert_eq!(result, StageError::Reset(super::ResetError::BadTimestamp(1, 2))); + } + + #[tokio::test] + async fn test_create_next_attributes_preparation_fails() { + let mut attributes_queue = new_attributes_queue(None, None, vec![]); + let parent = L2BlockInfo::default(); + let batch = SingleBatch::default(); + let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err(); + assert_eq!( + result, + StageError::AttributesBuild(anyhow::anyhow!("missing payload attribute")) + ); + } + + #[tokio::test] + async fn test_create_next_attributes_success() { + let cfg = RollupConfig::default(); + let telemetry = TestTelemetry::new(); + let mock_batch_queue = new_mock_batch_queue(None, vec![]); + let mut payload_attributes = PayloadAttributes::default(); + let mock_builder = + MockAttributesBuilder { attributes: vec![Ok(payload_attributes.clone())] }; + let mut aq = AttributesQueue::new(cfg, mock_batch_queue, telemetry, mock_builder); + let parent = L2BlockInfo::default(); + let txs = vec![RawTransaction::default(), RawTransaction::default()]; + let batch = SingleBatch { transactions: txs.clone(), ..Default::default() }; + let attributes = aq.create_next_attributes(batch, parent).await.unwrap(); + // update the expected attributes + payload_attributes.no_tx_pool = true; + payload_attributes.transactions.extend(txs); + assert_eq!(attributes, payload_attributes); + } + + #[tokio::test] + async fn test_next_attributes_load_batch_eof() { + let mut attributes_queue = new_attributes_queue(None, None, vec![]); + let parent = L2BlockInfo::default(); + let result = attributes_queue.next_attributes(parent).await.unwrap_err(); + assert_eq!(result, StageError::Eof); + } + + #[tokio::test] + async fn test_next_attributes_load_batch_last_in_span() { + let cfg = RollupConfig::default(); + let telemetry = TestTelemetry::new(); + let mock_batch_queue = new_mock_batch_queue(None, vec![Ok(Default::default())]); + let mut pa = PayloadAttributes::default(); + let mock_builder = MockAttributesBuilder { attributes: vec![Ok(pa.clone())] }; + let mut aq = AttributesQueue::new(cfg, mock_batch_queue, telemetry, mock_builder); + // If we load the batch, we should get the last in span. + // But it won't take it so it will be available in the next_attributes call. + let _ = aq.load_batch(L2BlockInfo::default()).await.unwrap(); + assert!(aq.is_last_in_span); + assert!(aq.batch.is_some()); + // This should successfully construct the next payload attributes. + // It should also reset the last in span flag and clear the batch. + let attributes = aq.next_attributes(L2BlockInfo::default()).await.unwrap(); + pa.no_tx_pool = true; + let populated_attributes = AttributesWithParent { + attributes: pa, + parent: L2BlockInfo::default(), + is_last_in_span: true, + }; + assert_eq!(attributes, populated_attributes); + assert!(!aq.is_last_in_span); + assert!(aq.batch.is_none()); + } +} diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index d7fc4498b..06a769d2f 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -88,6 +88,11 @@ where self.prev.origin() } + /// Returns if the previous batch was the last in the span. + pub fn is_last_in_span(&self) -> bool { + self.next_spans.is_empty() + } + /// Pops the next batch from the current queued up span-batch cache. /// The parent is used to set the parent hash of the batch. /// The parent is verified when the batch is later validated. diff --git a/crates/derive/src/stages/engine_queue.rs b/crates/derive/src/stages/engine_queue.rs deleted file mode 100644 index 8b1378917..000000000 --- a/crates/derive/src/stages/engine_queue.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs index 1717c306c..d998b075f 100644 --- a/crates/derive/src/stages/mod.rs +++ b/crates/derive/src/stages/mod.rs @@ -3,6 +3,7 @@ //! the produced execution payloads. //! //! **Stages:** +//! //! 1. L1 Traversal //! 2. L1 Retrieval //! 3. Frame Queue @@ -10,7 +11,7 @@ //! 5. Channel Reader (Batch Decoding) //! 6. Batch Queue //! 7. Payload Attributes Derivation -//! 8. Engine Queue +//! 8. (Omitted) Engine Queue mod l1_traversal; pub use l1_traversal::L1Traversal; @@ -30,5 +31,8 @@ pub use channel_reader::ChannelReader; mod batch_queue; pub use batch_queue::BatchQueue; -mod engine_queue; -mod payload_derivation; +mod attributes_queue; +pub use attributes_queue::AttributesQueue; + +#[cfg(test)] +pub mod test_utils; diff --git a/crates/derive/src/stages/payload_derivation.rs b/crates/derive/src/stages/payload_derivation.rs deleted file mode 100644 index 8b1378917..000000000 --- a/crates/derive/src/stages/payload_derivation.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/crates/derive/src/stages/test_utils/attributes_queue.rs b/crates/derive/src/stages/test_utils/attributes_queue.rs new file mode 100644 index 000000000..cb36f3838 --- /dev/null +++ b/crates/derive/src/stages/test_utils/attributes_queue.rs @@ -0,0 +1,25 @@ +//! Testing utilities for the attributes queue stage. + +use crate::{ + stages::attributes_queue::AttributesBuilder, + types::{BlockID, L2BlockInfo, PayloadAttributes}, +}; +use alloc::vec::Vec; + +/// A mock implementation of the [`AttributesBuilder`] for testing. +#[derive(Debug, Default)] +pub struct MockAttributesBuilder { + /// The attributes to return. + pub attributes: Vec>, +} + +impl AttributesBuilder for MockAttributesBuilder { + /// Prepares the [PayloadAttributes] for the next payload. + fn prepare_payload_attributes( + &mut self, + _l2_parent: L2BlockInfo, + _epoch: BlockID, + ) -> anyhow::Result { + self.attributes.pop().ok_or(anyhow::anyhow!("missing payload attribute"))? + } +} diff --git a/crates/derive/src/stages/test_utils/batch_queue.rs b/crates/derive/src/stages/test_utils/batch_queue.rs new file mode 100644 index 000000000..06afd09da --- /dev/null +++ b/crates/derive/src/stages/test_utils/batch_queue.rs @@ -0,0 +1,44 @@ +//! A mock implementation of the [`BatchQueue`] stage for testing. + +use alloc::{boxed::Box, vec::Vec}; +use async_trait::async_trait; + +use crate::{ + stages::attributes_queue::AttributesProvider, + traits::OriginProvider, + types::{BlockInfo, L2BlockInfo, SingleBatch, StageError, StageResult}, +}; + +/// A mock implementation of the [`BatchQueue`] stage for testing. +#[derive(Debug, Default)] +pub struct MockBatchQueue { + /// The origin of the L1 block. + origin: Option, + /// A list of batches to return. + batches: Vec>, +} + +impl OriginProvider for MockBatchQueue { + fn origin(&self) -> Option<&BlockInfo> { + self.origin.as_ref() + } +} + +#[async_trait] +impl AttributesProvider for MockBatchQueue { + async fn next_batch(&mut self, _parent: L2BlockInfo) -> StageResult { + self.batches.pop().ok_or(StageError::Eof)? + } + + fn is_last_in_span(&self) -> bool { + self.batches.is_empty() + } +} + +/// Creates a new [`MockBatchQueue`] with the given origin and batches. +pub fn new_mock_batch_queue( + origin: Option, + batches: Vec>, +) -> MockBatchQueue { + MockBatchQueue { origin, batches } +} diff --git a/crates/derive/src/stages/test_utils/mod.rs b/crates/derive/src/stages/test_utils/mod.rs new file mode 100644 index 000000000..b05611b71 --- /dev/null +++ b/crates/derive/src/stages/test_utils/mod.rs @@ -0,0 +1,8 @@ +//! Test utilities for the stages module primarily contains +//! mock implementations of the various stages for testing. + +mod batch_queue; +pub use batch_queue::{new_mock_batch_queue, MockBatchQueue}; + +mod attributes_queue; +pub use attributes_queue::MockAttributesBuilder; diff --git a/crates/derive/src/traits/data_sources.rs b/crates/derive/src/traits/data_sources.rs index 89ea08d5e..682fa6c8c 100644 --- a/crates/derive/src/traits/data_sources.rs +++ b/crates/derive/src/traits/data_sources.rs @@ -8,6 +8,12 @@ use anyhow::Result; use async_trait::async_trait; use core::fmt::Debug; +/// Provides a method for accessing the pipeline origin. +pub trait OriginProvider { + /// Returns the optional L1 [BlockInfo] origin. + fn origin(&self) -> Option<&BlockInfo>; +} + /// Describes the functionality of a data source that can provide information from the blockchain. #[async_trait] pub trait ChainProvider { diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index b2db574c5..6700afb02 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -2,7 +2,7 @@ //! pipeline. mod data_sources; -pub use data_sources::{ChainProvider, DataAvailabilityProvider, DataIter, SafeBlockFetcher}; +pub use data_sources::*; mod stages; pub use stages::ResettableStage; diff --git a/crates/derive/src/types/attributes.rs b/crates/derive/src/types/attributes.rs new file mode 100644 index 000000000..16aa7c507 --- /dev/null +++ b/crates/derive/src/types/attributes.rs @@ -0,0 +1,74 @@ +//! Contains Payload Attribute Types. + +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; + +use super::{block::L2BlockInfo, payload::Withdrawals, RawTransaction}; +use alloc::vec::Vec; +use alloy_primitives::{Address, B256}; + +/// Payload attributes. +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct PayloadAttributes { + /// Value for the timestamp field of the new payload. + #[cfg_attr(feature = "serde", serde(rename = "timestamp"))] + pub timestamp: u64, + /// Value for the random field of the new payload. + #[cfg_attr(feature = "serde", serde(rename = "prevRandao"))] + pub prev_randao: B256, + /// Suggested value for the coinbase field of the new payload. + #[cfg_attr(feature = "serde", serde(rename = "suggestedFeeRecipient"))] + pub fee_recipient: Address, + /// Withdrawals to include into the block -- should be nil or empty depending on Shanghai + /// enablement. + #[cfg_attr(feature = "serde", serde(rename = "withdrawals"))] + pub withdrawals: Option, + /// Parent beacon block root optional extension in Dencun. + #[cfg_attr(feature = "serde", serde(rename = "parentBeaconBlockRoot"))] + pub parent_beacon_block_root: Option, + + // Optimism additions. + /// Transactions to force into the block (always at the start of the transactions list). + #[cfg_attr(feature = "serde", serde(rename = "transactions"))] + pub transactions: Vec, + /// NoTxPool to disable adding any transactions from the transaction-pool. + #[cfg_attr(feature = "serde", serde(rename = "noTxPool"))] + pub no_tx_pool: bool, + /// GasLimit override. + #[cfg_attr(feature = "serde", serde(rename = "gasLimit"))] + pub gas_limit: Option, +} + +/// Payload Attributes with parent block reference. +#[derive(Debug, Clone, PartialEq)] +pub struct AttributesWithParent { + /// The payload attributes. + pub attributes: PayloadAttributes, + /// The parent block reference. + pub parent: L2BlockInfo, + /// Whether the current batch is the last in its span. + pub is_last_in_span: bool, +} + +impl AttributesWithParent { + /// Create a new [AttributesWithParent] instance. + pub fn new(attributes: PayloadAttributes, parent: L2BlockInfo, is_last_in_span: bool) -> Self { + Self { attributes, parent, is_last_in_span } + } + + /// Returns the payload attributes. + pub fn attributes(&self) -> &PayloadAttributes { + &self.attributes + } + + /// Returns the parent block reference. + pub fn parent(&self) -> &L2BlockInfo { + &self.parent + } + + /// Returns whether the current batch is the last in its span. + pub fn is_last_in_span(&self) -> bool { + self.is_last_in_span + } +} diff --git a/crates/derive/src/types/batch/single_batch.rs b/crates/derive/src/types/batch/single_batch.rs index 273c0a38a..89d37351d 100644 --- a/crates/derive/src/types/batch/single_batch.rs +++ b/crates/derive/src/types/batch/single_batch.rs @@ -1,7 +1,7 @@ //! This module contains the [SingleBatch] type. use super::validity::BatchValidity; -use crate::types::{BlockInfo, L2BlockInfo, RawTransaction, RollupConfig}; +use crate::types::{BlockID, BlockInfo, L2BlockInfo, RawTransaction, RollupConfig}; use alloc::vec::Vec; use alloy_primitives::BlockHash; use alloy_rlp::{Decodable, Encodable}; @@ -28,6 +28,11 @@ impl SingleBatch { self.transactions.iter().any(|tx| tx.0.is_empty() || tx.0[0] == 0x7E) } + /// Returns the [BlockID] of the batch. + pub fn epoch(&self) -> BlockID { + BlockID { number: self.epoch_num, hash: self.epoch_hash } + } + /// Checks if the batch is valid. pub fn check_batch( &self, diff --git a/crates/derive/src/types/errors.rs b/crates/derive/src/types/errors.rs index 3184c5f9e..2b88cf86a 100644 --- a/crates/derive/src/types/errors.rs +++ b/crates/derive/src/types/errors.rs @@ -1,9 +1,11 @@ //! This module contains derivation errors thrown within the pipeline. +use super::SpanBatchError; use alloy_primitives::B256; use core::fmt::Display; -use super::SpanBatchError; +/// A result type for the derivation pipeline stages. +pub type StageResult = Result; /// An error that is thrown within the stages of the derivation pipeline. #[derive(Debug)] @@ -13,6 +15,10 @@ pub enum StageError { /// There is not enough data progress, but if we wait, the stage will eventually return data /// or produce an EOF error. NotEnoughData, + /// Failed to build the [super::PayloadAttributes] for the next batch. + AttributesBuild(anyhow::Error), + /// Reset the pipeline. + Reset(ResetError), /// The stage detected a block reorg. /// The first argument is the expected block hash. /// The second argument is the paren_hash of the next l1 origin block. @@ -33,10 +39,14 @@ impl PartialEq for StageError { if let (StageError::ReorgDetected(a, b), StageError::ReorgDetected(c, d)) = (self, other) { return a == c && b == d; } + if let (StageError::Reset(a), StageError::Reset(b)) = (self, other) { + return a == b; + } matches!( (self, other), (StageError::Eof, StageError::Eof) | (StageError::NotEnoughData, StageError::NotEnoughData) | + (StageError::AttributesBuild(_), StageError::AttributesBuild(_)) | (StageError::ReceiptFetch(_), StageError::ReceiptFetch(_)) | (StageError::BlockInfoFetch(_), StageError::BlockInfoFetch(_)) | (StageError::SystemConfigUpdate(_), StageError::SystemConfigUpdate(_)) | @@ -45,9 +55,6 @@ impl PartialEq for StageError { } } -/// A result type for the derivation pipeline stages. -pub type StageResult = Result; - impl From for StageError { fn from(e: anyhow::Error) -> Self { StageError::Custom(e) @@ -59,6 +66,8 @@ impl Display for StageError { match self { StageError::Eof => write!(f, "End of file"), StageError::NotEnoughData => write!(f, "Not enough data"), + StageError::AttributesBuild(e) => write!(f, "Attributes build error: {}", e), + StageError::Reset(e) => write!(f, "Reset error: {}", e), StageError::ReceiptFetch(e) => write!(f, "Receipt fetch error: {}", e), StageError::SystemConfigUpdate(e) => write!(f, "System config update error: {}", e), StageError::ReorgDetected(current, next) => { @@ -70,6 +79,46 @@ impl Display for StageError { } } +/// A reset error +#[derive(Debug)] +pub enum ResetError { + /// The batch has a bad parent hash. + /// The first argument is the expected parent hash, and the second argument is the actual + /// parent hash. + BadParentHash(B256, B256), + /// The batch has a bad timestamp. + /// The first argument is the expected timestamp, and the second argument is the actual + /// timestamp. + BadTimestamp(u64, u64), +} + +impl PartialEq for ResetError { + fn eq(&self, other: &ResetError) -> bool { + match (self, other) { + (ResetError::BadParentHash(e1, a1), ResetError::BadParentHash(e2, a2)) => { + e1 == e2 && a1 == a2 + } + (ResetError::BadTimestamp(e1, a1), ResetError::BadTimestamp(e2, a2)) => { + e1 == e2 && a1 == a2 + } + _ => false, + } + } +} + +impl Display for ResetError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + ResetError::BadParentHash(expected, actual) => { + write!(f, "Bad parent hash: expected {}, got {}", expected, actual) + } + ResetError::BadTimestamp(expected, actual) => { + write!(f, "Bad timestamp: expected {}, got {}", expected, actual) + } + } + } +} + /// A decoding error. #[derive(Debug)] pub enum DecodeError { diff --git a/crates/derive/src/types/mod.rs b/crates/derive/src/types/mod.rs index 138fe1c19..ca1dfdb83 100644 --- a/crates/derive/src/types/mod.rs +++ b/crates/derive/src/types/mod.rs @@ -1,9 +1,15 @@ //! This module contains all of the types used within the derivation pipeline. +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; + use alloc::vec::Vec; use alloy_primitives::Bytes; use alloy_rlp::{Decodable, Encodable}; +mod attributes; +pub use attributes::{AttributesWithParent, PayloadAttributes}; + mod system_config; pub use system_config::{SystemAccounts, SystemConfig, SystemConfigUpdateType}; @@ -28,7 +34,9 @@ pub use alloy::{ }; mod payload; -pub use payload::{ExecutionPayload, ExecutionPayloadEnvelope}; +pub use payload::{ + ExecutionPayload, ExecutionPayloadEnvelope, PAYLOAD_MEM_FIXED_COST, PAYLOAD_TX_MEM_OVERHEAD, +}; mod block; pub use block::{BlockID, BlockInfo, BlockKind, L2BlockInfo}; @@ -43,10 +51,11 @@ mod channel; pub use channel::Channel; mod errors; -pub use errors::{DecodeError, StageError, StageResult}; +pub use errors::*; /// A raw transaction -#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[derive(Debug, Default, Clone, PartialEq, Eq)] pub struct RawTransaction(pub Bytes); impl RawTransaction { diff --git a/crates/derive/src/types/payload.rs b/crates/derive/src/types/payload.rs index b06d3b925..38a0fa57a 100644 --- a/crates/derive/src/types/payload.rs +++ b/crates/derive/src/types/payload.rs @@ -3,6 +3,14 @@ use alloc::vec::Vec; use alloy_primitives::{Address, Bytes, B256, U256}; +/// Fixed and variable memory costs for a payload. +/// ~1000 bytes per payload, with some margin for overhead like map data. +pub const PAYLOAD_MEM_FIXED_COST: u64 = 1000; + +/// Memory overhead per payload transaction. +/// 24 bytes per tx overhead (size of slice header in memory). +pub const PAYLOAD_TX_MEM_OVERHEAD: u64 = 24; + #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; @@ -18,6 +26,17 @@ pub struct ExecutionPayloadEnvelope { execution_payload: ExecutionPayload, } +impl ExecutionPayloadEnvelope { + /// Returns the payload memory size. + pub fn mem_size(&self) -> u64 { + let mut out = PAYLOAD_MEM_FIXED_COST; + for tx in &self.execution_payload.transactions { + out += tx.len() as u64 + PAYLOAD_TX_MEM_OVERHEAD; + } + out + } +} + /// The execution payload. #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[derive(Debug, Clone, PartialEq, Eq)] @@ -61,4 +80,4 @@ pub struct ExecutionPayload { /// Withdrawal Type #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) struct Withdrawals {} +pub struct Withdrawals {}