From 0e7bdb7796fa1a03f6a3ac385098227558892743 Mon Sep 17 00:00:00 2001 From: refcell Date: Sun, 31 Mar 2024 22:26:51 -0400 Subject: [PATCH 1/8] feat(derive): attributes queue stage --- crates/derive/src/stages/attributes_queue.rs | 165 ++++++++++++++++++ crates/derive/src/stages/batch_queue.rs | 5 + crates/derive/src/stages/mod.rs | 4 +- .../derive/src/stages/payload_derivation.rs | 1 - crates/derive/src/types/attributes.rs | 79 +++++++++ crates/derive/src/types/errors.rs | 48 ++++- crates/derive/src/types/mod.rs | 13 +- crates/derive/src/types/payload.rs | 21 ++- crates/derive/src/types/single_batch.rs | 10 +- 9 files changed, 336 insertions(+), 10 deletions(-) create mode 100644 crates/derive/src/stages/attributes_queue.rs delete mode 100644 crates/derive/src/stages/payload_derivation.rs create mode 100644 crates/derive/src/types/attributes.rs diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs new file mode 100644 index 000000000..339a86119 --- /dev/null +++ b/crates/derive/src/stages/attributes_queue.rs @@ -0,0 +1,165 @@ +//! Contains the logic for the `AttributesQueue` stage. + +use crate::stages::batch_queue::BatchQueue; +use crate::traits::{ChainProvider, DataAvailabilityProvider, ResettableStage, SafeBlockFetcher}; +use crate::types::{AttributesWithParent, PayloadAttributes, SingleBatch}; +use crate::types::{BlockID, BlockInfo, L2BlockRef}; +use crate::types::{ResetError, StageError, StageResult}; +use crate::types::{RollupConfig, SystemConfig}; +use alloc::boxed::Box; +use async_trait::async_trait; +use core::fmt::Debug; + +pub trait AttributesBuilder { + /// Prepare the payload attributes. + fn prepare_payload_attributes( + &self, + l2_parent: L2BlockRef, + epoch: BlockID, + ) -> anyhow::Result; +} + +/// [AttributesQueue] accepts batches from the [super::BatchQueue] stage +/// and transforms them into [PayloadAttributes]. The outputted payload +/// attributes cannot be buffered because each batch->attributes transformation +/// pulls in data about the current L2 safe head. +/// +/// [AttributesQueue] also buffers batches that have been output because +/// multiple batches can be created at once. +/// +/// This stage can be reset by clearing its batch buffer. +/// This stage does not need to retain any references to L1 blocks. +#[derive(Debug)] +pub struct AttributesQueue +where + DAP: DataAvailabilityProvider + Debug, + CP: ChainProvider + Debug, + BF: SafeBlockFetcher + Debug, + AB: AttributesBuilder + Debug, +{ + /// The rollup config. + cfg: RollupConfig, + /// The previous stage of the derivation pipeline. + prev: BatchQueue, + /// Whether the current batch is the last in its span. + is_last_in_span: bool, + /// The current batch being processed. + batch: Option, + /// The attributes builder. + builder: AB, +} + +impl AttributesQueue +where + DAP: DataAvailabilityProvider + Debug, + CP: ChainProvider + Debug, + BF: SafeBlockFetcher + Debug, + AB: AttributesBuilder + Debug, +{ + /// Create a new [AttributesQueue] stage. + pub fn new(cfg: RollupConfig, prev: BatchQueue, builder: AB) -> Self { + Self { + cfg, + prev, + is_last_in_span: false, + batch: None, + builder, + } + } + + /// Returns the L1 origin [BlockInfo]. + pub fn origin(&self) -> Option<&BlockInfo> { + self.prev.origin() + } + + /// Loads a batch from the previous stage if needed. + pub async fn load_batch(&mut self, parent: L2BlockRef) -> StageResult { + if self.batch.is_none() { + let batch = self.prev.next_batch(parent).await?; + self.batch = Some(batch); + self.is_last_in_span = self.prev.is_last_in_span(); + } + self.batch.as_ref().cloned().ok_or(StageError::Eof) + } + + /// Returns the next payload attributes from the current batch. + pub async fn next_attributes( + &mut self, + parent: L2BlockRef, + ) -> StageResult { + // Load the batch + let batch = self.load_batch(parent).await?; + + // Construct the payload attributes from the loaded batch + let attributes = self.create_next_attributes(batch, parent).await?; + let populated_attributes = AttributesWithParent { + attributes, + parent, + is_last_in_span: self.is_last_in_span, + }; + + // Clear out the local state once we will succeed + self.batch = None; + self.is_last_in_span = false; + Ok(populated_attributes) + } + + /// Creates the next attributes. + /// Transforms a [SingleBatch] into [PayloadAttributes]. + /// This sets `NoTxPool` and appends the batched transactions to the attributes transaction list. + pub async fn create_next_attributes( + &mut self, + batch: SingleBatch, + parent: L2BlockRef, + ) -> StageResult { + // Sanity check parent hash + if batch.parent_hash != parent.info.hash { + return Err(StageError::Reset(ResetError::BadParentHash( + batch.parent_hash, + parent.info.hash, + ))); + } + + // Sanity check timestamp + let actual = parent.info.timestamp + self.cfg.block_time; + if actual != batch.timestamp { + return Err(StageError::Reset(ResetError::BadTimestamp( + batch.timestamp, + actual, + ))); + } + + // Prepare the payload attributes + let mut attributes = self + .builder + .prepare_payload_attributes(parent, batch.epoch())?; + attributes.no_tx_pool = true; + attributes.transactions.extend(batch.transactions); + + // TODO: log + // log::info!( + // "generated attributes in payload queue", + // "txs" => tx_count, + // "timestamp" => batch.timestamp, + // ); + + Ok(attributes) + } +} + +#[async_trait] +impl ResettableStage for AttributesQueue +where + DAP: DataAvailabilityProvider + Send + Debug, + CP: ChainProvider + Send + Debug, + BF: SafeBlockFetcher + Send + Debug, + AB: AttributesBuilder + Send + Debug, +{ + async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> { + // TODO: log the reset + // TODO: metrice the reset + self.batch = None; + self.is_last_in_span = false; + Err(StageError::Eof) + } +} diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index b0312344d..e28501664 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -82,6 +82,11 @@ where self.prev.origin() } + /// Returns if the previous batch was the last in the span. + pub fn is_last_in_span(&self) -> bool { + self.next_spans.is_empty() + } + /// Pops the next batch from the current queued up span-batch cache. /// The parent is used to set the parent hash of the batch. /// The parent is verified when the batch is later validated. diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs index 2c69cd2c1..01113de70 100644 --- a/crates/derive/src/stages/mod.rs +++ b/crates/derive/src/stages/mod.rs @@ -30,5 +30,5 @@ pub use channel_reader::ChannelReader; mod batch_queue; pub use batch_queue::BatchQueue; -mod engine_queue; -mod payload_derivation; +mod attributes_queue; +pub use attributes_queue::AttributesQueue; diff --git a/crates/derive/src/stages/payload_derivation.rs b/crates/derive/src/stages/payload_derivation.rs deleted file mode 100644 index 8b1378917..000000000 --- a/crates/derive/src/stages/payload_derivation.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/crates/derive/src/types/attributes.rs b/crates/derive/src/types/attributes.rs new file mode 100644 index 000000000..19c08daee --- /dev/null +++ b/crates/derive/src/types/attributes.rs @@ -0,0 +1,79 @@ +//! Contains Payload Attribute Types. + +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; + +use super::block::L2BlockRef; +use super::payload::Withdrawals; +use super::RawTransaction; +use alloc::vec::Vec; +use alloy_primitives::{Address, B256}; + +/// Payload attributes. +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PayloadAttributes { + /// Value for the timestamp field of the new payload. + #[cfg_attr(feature = "serde", serde(rename = "timestamp"))] + pub timestamp: u64, + /// Value for the random field of the new payload. + #[cfg_attr(feature = "serde", serde(rename = "prevRandao"))] + pub prev_randao: B256, + /// Suggested value for the coinbase field of the new payload. + #[cfg_attr(feature = "serde", serde(rename = "suggestedFeeRecipient"))] + pub fee_recipient: Address, + /// Withdrawals to include into the block -- should be nil or empty depending on Shanghai enablement. + #[cfg_attr(feature = "serde", serde(rename = "withdrawals"))] + pub withdrawals: Option, + /// Parent beacon block root optional extension in Dencun. + #[cfg_attr(feature = "serde", serde(rename = "parentBeaconBlockRoot"))] + pub parent_beacon_block_root: Option, + + // Optimism additions. + /// Transactions to force into the block (always at the start of the transactions list). + #[cfg_attr(feature = "serde", serde(rename = "transactions"))] + pub transactions: Vec, + /// NoTxPool to disable adding any transactions from the transaction-pool. + #[cfg_attr(feature = "serde", serde(rename = "noTxPool"))] + pub no_tx_pool: bool, + /// GasLimit override. + #[cfg_attr(feature = "serde", serde(rename = "gasLimit"))] + pub gas_limit: Option, +} + +/// Payload Attributes with parent block reference. +#[derive(Debug, Clone, PartialEq)] +pub struct AttributesWithParent { + /// The payload attributes. + pub attributes: PayloadAttributes, + /// The parent block reference. + pub parent: L2BlockRef, + /// Whether the current batch is the last in its span. + pub is_last_in_span: bool, +} + +impl AttributesWithParent { + /// Create a new [AttributesWithParent] instance. + pub fn new(attributes: PayloadAttributes, parent: L2BlockRef, is_last_in_span: bool) -> Self { + Self { + attributes, + parent, + is_last_in_span, + } + } + + /// Returns the payload attributes. + pub fn attributes(&self) -> &PayloadAttributes { + &self.attributes + } + + /// Returns the parent block reference. + pub fn parent(&self) -> &L2BlockRef { + &self.parent + } + + /// Returns whether the current batch is the last in its span. + pub fn is_last_in_span(&self) -> bool { + self.is_last_in_span + } +} diff --git a/crates/derive/src/types/errors.rs b/crates/derive/src/types/errors.rs index 98d3220b8..355082686 100644 --- a/crates/derive/src/types/errors.rs +++ b/crates/derive/src/types/errors.rs @@ -1,7 +1,11 @@ //! This module contains derivation errors thrown within the pipeline. +use alloy_primitives::B256; use core::fmt::Display; +/// A result type for the derivation pipeline stages. +pub type StageResult = Result; + /// An error that is thrown within the stages of the derivation pipeline. #[derive(Debug)] pub enum StageError { @@ -10,6 +14,8 @@ pub enum StageError { /// There is not enough data progress, but if we wait, the stage will eventually return data /// or produce an EOF error. NotEnoughData, + /// Reset the pipeline. + Reset(ResetError), /// Other wildcard error. Custom(anyhow::Error), } @@ -25,9 +31,6 @@ impl PartialEq for StageError { } } -/// A result type for the derivation pipeline stages. -pub type StageResult = Result; - impl From for StageError { fn from(e: anyhow::Error) -> Self { StageError::Custom(e) @@ -39,7 +42,46 @@ impl Display for StageError { match self { StageError::Eof => write!(f, "End of file"), StageError::NotEnoughData => write!(f, "Not enough data"), + StageError::Reset(e) => write!(f, "Reset error: {}", e), StageError::Custom(e) => write!(f, "Custom error: {}", e), } } } + +/// A reset error +#[derive(Debug)] +pub enum ResetError { + /// The batch has a bad parent hash. + /// The first argument is the expected parent hash, and the second argument is the actual parent hash. + BadParentHash(B256, B256), + /// The batch has a bad timestamp. + /// The first argument is the expected timestamp, and the second argument is the actual timestamp. + BadTimestamp(u64, u64), +} + +impl PartialEq for ResetError { + fn eq(&self, other: &ResetError) -> bool { + match (self, other) { + (ResetError::BadParentHash(e1, a1), ResetError::BadParentHash(e2, a2)) => { + e1 == e2 && a1 == a2 + } + (ResetError::BadTimestamp(e1, a1), ResetError::BadTimestamp(e2, a2)) => { + e1 == e2 && a1 == a2 + } + _ => false, + } + } +} + +impl Display for ResetError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + ResetError::BadParentHash(expected, actual) => { + write!(f, "Bad parent hash: expected {}, got {}", expected, actual) + } + ResetError::BadTimestamp(expected, actual) => { + write!(f, "Bad timestamp: expected {}, got {}", expected, actual) + } + } + } +} diff --git a/crates/derive/src/types/mod.rs b/crates/derive/src/types/mod.rs index db5060611..8c68450db 100644 --- a/crates/derive/src/types/mod.rs +++ b/crates/derive/src/types/mod.rs @@ -1,8 +1,14 @@ //! This module contains all of the types used within the derivation pipeline. +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; + use alloc::vec::Vec; use alloy_rlp::{Decodable, Encodable}; +mod attributes; +pub use attributes::{AttributesWithParent, PayloadAttributes}; + mod batch; pub use batch::{Batch, BatchWithInclusionBlock, SpanBatch}; @@ -37,7 +43,9 @@ mod receipt; pub use receipt::{Receipt, ReceiptWithBloom}; mod payload; -pub use payload::{ExecutionPayload, ExecutionPayloadEnvelope}; +pub use payload::{ + ExecutionPayload, ExecutionPayloadEnvelope, PAYLOAD_MEM_FIXED_COST, PAYLOAD_TX_MEM_OVERHEAD, +}; mod eips; pub use eips::{ @@ -55,12 +63,13 @@ mod channel; pub use channel::Channel; mod errors; -pub use errors::{StageError, StageResult}; +pub use errors::{ResetError, StageError, StageResult}; mod single_batch; pub use single_batch::SingleBatch; /// A raw transaction +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[derive(Debug, Clone, PartialEq, Eq)] pub struct RawTransaction(pub Vec); diff --git a/crates/derive/src/types/payload.rs b/crates/derive/src/types/payload.rs index b06d3b925..38a0fa57a 100644 --- a/crates/derive/src/types/payload.rs +++ b/crates/derive/src/types/payload.rs @@ -3,6 +3,14 @@ use alloc::vec::Vec; use alloy_primitives::{Address, Bytes, B256, U256}; +/// Fixed and variable memory costs for a payload. +/// ~1000 bytes per payload, with some margin for overhead like map data. +pub const PAYLOAD_MEM_FIXED_COST: u64 = 1000; + +/// Memory overhead per payload transaction. +/// 24 bytes per tx overhead (size of slice header in memory). +pub const PAYLOAD_TX_MEM_OVERHEAD: u64 = 24; + #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; @@ -18,6 +26,17 @@ pub struct ExecutionPayloadEnvelope { execution_payload: ExecutionPayload, } +impl ExecutionPayloadEnvelope { + /// Returns the payload memory size. + pub fn mem_size(&self) -> u64 { + let mut out = PAYLOAD_MEM_FIXED_COST; + for tx in &self.execution_payload.transactions { + out += tx.len() as u64 + PAYLOAD_TX_MEM_OVERHEAD; + } + out + } +} + /// The execution payload. #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[derive(Debug, Clone, PartialEq, Eq)] @@ -61,4 +80,4 @@ pub struct ExecutionPayload { /// Withdrawal Type #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) struct Withdrawals {} +pub struct Withdrawals {} diff --git a/crates/derive/src/types/single_batch.rs b/crates/derive/src/types/single_batch.rs index 6c914b734..65d740a0a 100644 --- a/crates/derive/src/types/single_batch.rs +++ b/crates/derive/src/types/single_batch.rs @@ -5,7 +5,7 @@ use alloy_primitives::BlockHash; use alloy_rlp::{Decodable, Encodable}; use super::batch_validity::BatchValidity; -use super::block::{BlockInfo, L2BlockRef}; +use super::block::{BlockID, BlockInfo, L2BlockRef}; use super::rollup_config::RollupConfig; use super::RawTransaction; @@ -60,6 +60,14 @@ impl SingleBatch { .any(|tx| tx.0.is_empty() || tx.0[0] == 0x7E) } + /// Returns the [BlockID] of the batch. + pub fn epoch(&self) -> BlockID { + BlockID { + number: self.epoch_num, + hash: self.epoch_hash, + } + } + /// Checks if the batch is valid. pub fn check_batch( &self, From 61f0cbb49d09eaa22a1c8f30cb42b866a3b1da0b Mon Sep 17 00:00:00 2001 From: refcell Date: Wed, 3 Apr 2024 14:56:52 -0400 Subject: [PATCH 2/8] fix(derive): omit the engine queue stage --- crates/derive/src/stages/engine_queue.rs | 1 - crates/derive/src/stages/mod.rs | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) delete mode 100644 crates/derive/src/stages/engine_queue.rs diff --git a/crates/derive/src/stages/engine_queue.rs b/crates/derive/src/stages/engine_queue.rs deleted file mode 100644 index 8b1378917..000000000 --- a/crates/derive/src/stages/engine_queue.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs index a54328c99..aef5fa27d 100644 --- a/crates/derive/src/stages/mod.rs +++ b/crates/derive/src/stages/mod.rs @@ -3,6 +3,7 @@ //! the produced execution payloads. //! //! **Stages:** +//! //! 1. L1 Traversal //! 2. L1 Retrieval //! 3. Frame Queue @@ -10,7 +11,7 @@ //! 5. Channel Reader (Batch Decoding) //! 6. Batch Queue //! 7. Payload Attributes Derivation -//! 8. Engine Queue +//! 8. (Omitted) Engine Queue mod l1_traversal; pub use l1_traversal::L1Traversal; From 85a7a19f272b97a9b8cf933ec39061af68da83cd Mon Sep 17 00:00:00 2001 From: refcell Date: Wed, 3 Apr 2024 22:15:30 -0400 Subject: [PATCH 3/8] fix(derive): attributes queue --- crates/derive/src/stages/attributes_queue.rs | 45 ++++++++++++++------ 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index f0067c399..4950fc8fd 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -2,13 +2,17 @@ use crate::{ stages::batch_queue::BatchQueue, - traits::{ChainProvider, DataAvailabilityProvider, ResettableStage, SafeBlockFetcher}, + traits::{ + ChainProvider, DataAvailabilityProvider, LogLevel, ResettableStage, SafeBlockFetcher, + TelemetryProvider, + }, types::{ AttributesWithParent, BlockID, BlockInfo, L2BlockInfo, PayloadAttributes, ResetError, RollupConfig, SingleBatch, StageError, StageResult, SystemConfig, }, }; use alloc::boxed::Box; +use alloy_primitives::Bytes; use async_trait::async_trait; use core::fmt::Debug; @@ -32,17 +36,20 @@ pub trait AttributesBuilder { /// This stage can be reset by clearing its batch buffer. /// This stage does not need to retain any references to L1 blocks. #[derive(Debug)] -pub struct AttributesQueue +pub struct AttributesQueue where DAP: DataAvailabilityProvider + Debug, CP: ChainProvider + Debug, BF: SafeBlockFetcher + Debug, + T: TelemetryProvider + Debug, AB: AttributesBuilder + Debug, { /// The rollup config. cfg: RollupConfig, /// The previous stage of the derivation pipeline. - prev: BatchQueue, + prev: BatchQueue, + /// Telemetry provider. + telemetry: T, /// Whether the current batch is the last in its span. is_last_in_span: bool, /// The current batch being processed. @@ -51,16 +58,22 @@ where builder: AB, } -impl AttributesQueue +impl AttributesQueue where DAP: DataAvailabilityProvider + Debug, CP: ChainProvider + Debug, BF: SafeBlockFetcher + Debug, + T: TelemetryProvider + Debug, AB: AttributesBuilder + Debug, { /// Create a new [AttributesQueue] stage. - pub fn new(cfg: RollupConfig, prev: BatchQueue, builder: AB) -> Self { - Self { cfg, prev, is_last_in_span: false, batch: None, builder } + pub fn new( + cfg: RollupConfig, + prev: BatchQueue, + telemetry: T, + builder: AB, + ) -> Self { + Self { cfg, prev, telemetry, is_last_in_span: false, batch: None, builder } } /// Returns the L1 origin [BlockInfo]. @@ -121,31 +134,35 @@ where } // Prepare the payload attributes + let tx_count = batch.transactions.len(); let mut attributes = self.builder.prepare_payload_attributes(parent, batch.epoch())?; attributes.no_tx_pool = true; attributes.transactions.extend(batch.transactions); - // TODO: log - // log::info!( - // "generated attributes in payload queue", - // "txs" => tx_count, - // "timestamp" => batch.timestamp, - // ); + self.telemetry.write( + Bytes::from(alloc::format!( + "generated attributes in payload queue: txs={}, timestamp={}", + tx_count, + batch.timestamp, + )), + LogLevel::Info, + ); Ok(attributes) } } #[async_trait] -impl ResettableStage for AttributesQueue +impl ResettableStage for AttributesQueue where DAP: DataAvailabilityProvider + Send + Debug, CP: ChainProvider + Send + Debug, BF: SafeBlockFetcher + Send + Debug, + T: TelemetryProvider + Send + Debug, AB: AttributesBuilder + Send + Debug, { async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> { - // TODO: log the reset + self.telemetry.write(Bytes::from("resetting attributes queue"), LogLevel::Info); // TODO: metrice the reset self.batch = None; self.is_last_in_span = false; From 82cc894e6b69d1c22cb5222b7ad65da98f75d6b0 Mon Sep 17 00:00:00 2001 From: refcell Date: Thu, 4 Apr 2024 09:29:38 -0400 Subject: [PATCH 4/8] fix(derive): rework abstractions and attributes queue testing --- crates/derive/src/stages/attributes_queue.rs | 73 ++++++++++++------- crates/derive/src/stages/mod.rs | 3 + .../src/stages/test_utils/attributes_queue.rs | 25 +++++++ .../src/stages/test_utils/batch_queue.rs | 44 +++++++++++ crates/derive/src/stages/test_utils/mod.rs | 8 ++ crates/derive/src/traits/data_sources.rs | 6 ++ crates/derive/src/traits/mod.rs | 2 +- 7 files changed, 133 insertions(+), 28 deletions(-) create mode 100644 crates/derive/src/stages/test_utils/attributes_queue.rs create mode 100644 crates/derive/src/stages/test_utils/batch_queue.rs create mode 100644 crates/derive/src/stages/test_utils/mod.rs diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index 4950fc8fd..742f7b194 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -1,11 +1,7 @@ //! Contains the logic for the `AttributesQueue` stage. use crate::{ - stages::batch_queue::BatchQueue, - traits::{ - ChainProvider, DataAvailabilityProvider, LogLevel, ResettableStage, SafeBlockFetcher, - TelemetryProvider, - }, + traits::{LogLevel, OriginProvider, ResettableStage, TelemetryProvider}, types::{ AttributesWithParent, BlockID, BlockInfo, L2BlockInfo, PayloadAttributes, ResetError, RollupConfig, SingleBatch, StageError, StageResult, SystemConfig, @@ -19,13 +15,23 @@ use core::fmt::Debug; pub trait AttributesBuilder { /// Prepare the payload attributes. fn prepare_payload_attributes( - &self, + &mut self, l2_parent: L2BlockInfo, epoch: BlockID, ) -> anyhow::Result; } -/// [AttributesQueue] accepts batches from the [super::BatchQueue] stage +/// [AttributesProvider] is a trait abstraction that generalizes the [BatchQueue] stage. +#[async_trait] +pub trait AttributesProvider { + /// Returns the next valid batch upon the given safe head. + async fn next_batch(&mut self, parent: L2BlockInfo) -> StageResult; + + /// Returns whether the current batch is the last in its span. + fn is_last_in_span(&self) -> bool; +} + +/// [AttributesQueue] accepts batches from the [BatchQueue] stage /// and transforms them into [PayloadAttributes]. The outputted payload /// attributes cannot be buffered because each batch->attributes transformation /// pulls in data about the current L2 safe head. @@ -36,18 +42,16 @@ pub trait AttributesBuilder { /// This stage can be reset by clearing its batch buffer. /// This stage does not need to retain any references to L1 blocks. #[derive(Debug)] -pub struct AttributesQueue +pub struct AttributesQueue where - DAP: DataAvailabilityProvider + Debug, - CP: ChainProvider + Debug, - BF: SafeBlockFetcher + Debug, + P: AttributesProvider + OriginProvider + Debug, T: TelemetryProvider + Debug, AB: AttributesBuilder + Debug, { /// The rollup config. cfg: RollupConfig, /// The previous stage of the derivation pipeline. - prev: BatchQueue, + prev: P, /// Telemetry provider. telemetry: T, /// Whether the current batch is the last in its span. @@ -58,21 +62,14 @@ where builder: AB, } -impl AttributesQueue +impl AttributesQueue where - DAP: DataAvailabilityProvider + Debug, - CP: ChainProvider + Debug, - BF: SafeBlockFetcher + Debug, + P: AttributesProvider + OriginProvider + Debug, T: TelemetryProvider + Debug, AB: AttributesBuilder + Debug, { /// Create a new [AttributesQueue] stage. - pub fn new( - cfg: RollupConfig, - prev: BatchQueue, - telemetry: T, - builder: AB, - ) -> Self { + pub fn new(cfg: RollupConfig, prev: P, telemetry: T, builder: AB) -> Self { Self { cfg, prev, telemetry, is_last_in_span: false, batch: None, builder } } @@ -153,19 +150,41 @@ where } #[async_trait] -impl ResettableStage for AttributesQueue +impl ResettableStage for AttributesQueue where - DAP: DataAvailabilityProvider + Send + Debug, - CP: ChainProvider + Send + Debug, - BF: SafeBlockFetcher + Send + Debug, + P: AttributesProvider + OriginProvider + Send + Debug, T: TelemetryProvider + Send + Debug, AB: AttributesBuilder + Send + Debug, { async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> { self.telemetry.write(Bytes::from("resetting attributes queue"), LogLevel::Info); - // TODO: metrice the reset + // TODO: metrice the reset using telemetry + // telemetry can provide a method of logging and metricing self.batch = None; self.is_last_in_span = false; Err(StageError::Eof) } } + +#[cfg(test)] +mod tests { + use super::{AttributesQueue, L2BlockInfo, RollupConfig, StageError}; + use crate::{ + stages::test_utils::{new_mock_batch_queue, MockAttributesBuilder}, + traits::test_utils::TestTelemetry, + }; + use alloc::vec; + + #[tokio::test] + async fn test_load_batch_eof() { + let cfg = RollupConfig::default(); + let telemetry = TestTelemetry::new(); + let mock_batch_queue = new_mock_batch_queue(None, vec![]); + let mock_attributes_builder = MockAttributesBuilder::default(); + let mut attributes_queue = + AttributesQueue::new(cfg, mock_batch_queue, telemetry, mock_attributes_builder); + let parent = L2BlockInfo::default(); + let result = attributes_queue.load_batch(parent).await.unwrap_err(); + assert_eq!(result, StageError::Eof); + } +} diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs index aef5fa27d..d998b075f 100644 --- a/crates/derive/src/stages/mod.rs +++ b/crates/derive/src/stages/mod.rs @@ -33,3 +33,6 @@ pub use batch_queue::BatchQueue; mod attributes_queue; pub use attributes_queue::AttributesQueue; + +#[cfg(test)] +pub mod test_utils; diff --git a/crates/derive/src/stages/test_utils/attributes_queue.rs b/crates/derive/src/stages/test_utils/attributes_queue.rs new file mode 100644 index 000000000..17081de52 --- /dev/null +++ b/crates/derive/src/stages/test_utils/attributes_queue.rs @@ -0,0 +1,25 @@ +//! Testing utilities for the attributes queue stage. + +use crate::{ + stages::attributes_queue::AttributesBuilder, + types::{BlockID, L2BlockInfo, PayloadAttributes}, +}; +use alloc::vec::Vec; + +/// A mock implementation of the [`AttributesBuilder`] for testing. +#[derive(Debug, Default)] +pub struct MockAttributesBuilder { + /// The attributes to return. + attributes: Vec>, +} + +impl AttributesBuilder for MockAttributesBuilder { + /// Prepares the [PayloadAttributes] for the next payload. + fn prepare_payload_attributes( + &mut self, + _l2_parent: L2BlockInfo, + _epoch: BlockID, + ) -> anyhow::Result { + self.attributes.pop().ok_or(anyhow::anyhow!("missing payload attribute"))? + } +} diff --git a/crates/derive/src/stages/test_utils/batch_queue.rs b/crates/derive/src/stages/test_utils/batch_queue.rs new file mode 100644 index 000000000..06afd09da --- /dev/null +++ b/crates/derive/src/stages/test_utils/batch_queue.rs @@ -0,0 +1,44 @@ +//! A mock implementation of the [`BatchQueue`] stage for testing. + +use alloc::{boxed::Box, vec::Vec}; +use async_trait::async_trait; + +use crate::{ + stages::attributes_queue::AttributesProvider, + traits::OriginProvider, + types::{BlockInfo, L2BlockInfo, SingleBatch, StageError, StageResult}, +}; + +/// A mock implementation of the [`BatchQueue`] stage for testing. +#[derive(Debug, Default)] +pub struct MockBatchQueue { + /// The origin of the L1 block. + origin: Option, + /// A list of batches to return. + batches: Vec>, +} + +impl OriginProvider for MockBatchQueue { + fn origin(&self) -> Option<&BlockInfo> { + self.origin.as_ref() + } +} + +#[async_trait] +impl AttributesProvider for MockBatchQueue { + async fn next_batch(&mut self, _parent: L2BlockInfo) -> StageResult { + self.batches.pop().ok_or(StageError::Eof)? + } + + fn is_last_in_span(&self) -> bool { + self.batches.is_empty() + } +} + +/// Creates a new [`MockBatchQueue`] with the given origin and batches. +pub fn new_mock_batch_queue( + origin: Option, + batches: Vec>, +) -> MockBatchQueue { + MockBatchQueue { origin, batches } +} diff --git a/crates/derive/src/stages/test_utils/mod.rs b/crates/derive/src/stages/test_utils/mod.rs new file mode 100644 index 000000000..b05611b71 --- /dev/null +++ b/crates/derive/src/stages/test_utils/mod.rs @@ -0,0 +1,8 @@ +//! Test utilities for the stages module primarily contains +//! mock implementations of the various stages for testing. + +mod batch_queue; +pub use batch_queue::{new_mock_batch_queue, MockBatchQueue}; + +mod attributes_queue; +pub use attributes_queue::MockAttributesBuilder; diff --git a/crates/derive/src/traits/data_sources.rs b/crates/derive/src/traits/data_sources.rs index 89ea08d5e..682fa6c8c 100644 --- a/crates/derive/src/traits/data_sources.rs +++ b/crates/derive/src/traits/data_sources.rs @@ -8,6 +8,12 @@ use anyhow::Result; use async_trait::async_trait; use core::fmt::Debug; +/// Provides a method for accessing the pipeline origin. +pub trait OriginProvider { + /// Returns the optional L1 [BlockInfo] origin. + fn origin(&self) -> Option<&BlockInfo>; +} + /// Describes the functionality of a data source that can provide information from the blockchain. #[async_trait] pub trait ChainProvider { diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index b2db574c5..6700afb02 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -2,7 +2,7 @@ //! pipeline. mod data_sources; -pub use data_sources::{ChainProvider, DataAvailabilityProvider, DataIter, SafeBlockFetcher}; +pub use data_sources::*; mod stages; pub use stages::ResettableStage; From 6c67e03409df409b46c709ec398f9daff5ab6aab Mon Sep 17 00:00:00 2001 From: refcell Date: Thu, 4 Apr 2024 09:55:25 -0400 Subject: [PATCH 5/8] fix(derive): error equality fixes and tests --- crates/derive/src/stages/attributes_queue.rs | 95 ++++++++++++++++++-- crates/derive/src/types/errors.rs | 3 + 2 files changed, 89 insertions(+), 9 deletions(-) diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index 742f7b194..1cab7bb79 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -168,23 +168,100 @@ where #[cfg(test)] mod tests { - use super::{AttributesQueue, L2BlockInfo, RollupConfig, StageError}; + use super::{ + AttributesQueue, BlockInfo, L2BlockInfo, RollupConfig, SingleBatch, StageError, StageResult, + }; use crate::{ - stages::test_utils::{new_mock_batch_queue, MockAttributesBuilder}, + stages::test_utils::{new_mock_batch_queue, MockAttributesBuilder, MockBatchQueue}, traits::test_utils::TestTelemetry, }; - use alloc::vec; + use alloc::{vec, vec::Vec}; + use alloy_primitives::b256; - #[tokio::test] - async fn test_load_batch_eof() { - let cfg = RollupConfig::default(); + fn new_attributes_queue( + cfg: Option, + origin: Option, + batches: Vec>, + ) -> AttributesQueue { + let cfg = cfg.unwrap_or_default(); let telemetry = TestTelemetry::new(); - let mock_batch_queue = new_mock_batch_queue(None, vec![]); + let mock_batch_queue = new_mock_batch_queue(origin, batches); let mock_attributes_builder = MockAttributesBuilder::default(); - let mut attributes_queue = - AttributesQueue::new(cfg, mock_batch_queue, telemetry, mock_attributes_builder); + AttributesQueue::new(cfg, mock_batch_queue, telemetry, mock_attributes_builder) + } + + #[tokio::test] + async fn test_load_batch_eof() { + let mut attributes_queue = new_attributes_queue(None, None, vec![]); let parent = L2BlockInfo::default(); let result = attributes_queue.load_batch(parent).await.unwrap_err(); assert_eq!(result, StageError::Eof); } + + #[tokio::test] + async fn test_load_batch_last_in_span() { + let mut attributes_queue = new_attributes_queue(None, None, vec![Ok(Default::default())]); + let parent = L2BlockInfo::default(); + let result = attributes_queue.load_batch(parent).await.unwrap(); + assert_eq!(result, Default::default()); + assert!(attributes_queue.is_last_in_span); + } + + #[tokio::test] + async fn test_create_next_attributes_bad_parent_hash() { + let mut attributes_queue = new_attributes_queue(None, None, vec![]); + let bad_hash = b256!("6666666666666666666666666666666666666666666666666666666666666666"); + let parent = L2BlockInfo { + block_info: BlockInfo { hash: bad_hash, ..Default::default() }, + ..Default::default() + }; + let batch = SingleBatch::default(); + let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err(); + assert_eq!( + result, + StageError::Reset(super::ResetError::BadParentHash(Default::default(), bad_hash)) + ); + } + + #[tokio::test] + async fn test_create_next_attributes_bad_timestamp() { + let mut attributes_queue = new_attributes_queue(None, None, vec![]); + let parent = L2BlockInfo::default(); + let batch = SingleBatch { timestamp: 1, ..Default::default() }; + let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err(); + assert_eq!(result, StageError::Reset(super::ResetError::BadTimestamp(1, 0))); + } + + #[tokio::test] + async fn test_create_next_attributes_bad_parent_timestamp() { + let mut attributes_queue = new_attributes_queue(None, None, vec![]); + let parent = L2BlockInfo { + block_info: BlockInfo { timestamp: 2, ..Default::default() }, + ..Default::default() + }; + let batch = SingleBatch { timestamp: 1, ..Default::default() }; + let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err(); + assert_eq!(result, StageError::Reset(super::ResetError::BadTimestamp(1, 2))); + } + + #[tokio::test] + async fn test_create_next_attributes_bad_config_timestamp() { + let cfg = RollupConfig { block_time: 1, ..Default::default() }; + let mut attributes_queue = new_attributes_queue(Some(cfg), None, vec![]); + let parent = L2BlockInfo { + block_info: BlockInfo { timestamp: 1, ..Default::default() }, + ..Default::default() + }; + let batch = SingleBatch { timestamp: 1, ..Default::default() }; + let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err(); + assert_eq!(result, StageError::Reset(super::ResetError::BadTimestamp(1, 2))); + } + + #[tokio::test] + async fn test_next_attributes_load_batch_eof() { + let mut attributes_queue = new_attributes_queue(None, None, vec![]); + let parent = L2BlockInfo::default(); + let result = attributes_queue.next_attributes(parent).await.unwrap_err(); + assert_eq!(result, StageError::Eof); + } } diff --git a/crates/derive/src/types/errors.rs b/crates/derive/src/types/errors.rs index f1abf05fa..08e7d796d 100644 --- a/crates/derive/src/types/errors.rs +++ b/crates/derive/src/types/errors.rs @@ -37,6 +37,9 @@ impl PartialEq for StageError { if let (StageError::ReorgDetected(a, b), StageError::ReorgDetected(c, d)) = (self, other) { return a == c && b == d; } + if let (StageError::Reset(a), StageError::Reset(b)) = (self, other) { + return a == b; + } matches!( (self, other), (StageError::Eof, StageError::Eof) | From 70563c08e5357274fa4f906955309e16e4d9fad3 Mon Sep 17 00:00:00 2001 From: refcell Date: Thu, 4 Apr 2024 10:15:58 -0400 Subject: [PATCH 6/8] fix(derive): successful payload attributes building tests --- crates/derive/src/stages/attributes_queue.rs | 56 +++++++++++++++---- .../src/stages/test_utils/attributes_queue.rs | 2 +- crates/derive/src/types/attributes.rs | 2 +- crates/derive/src/types/errors.rs | 4 ++ crates/derive/src/types/mod.rs | 2 +- 5 files changed, 52 insertions(+), 14 deletions(-) diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index 1cab7bb79..63122a713 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -78,7 +78,7 @@ where self.prev.origin() } - /// Loads a batch from the previous stage if needed. + /// Loads a [SingleBatch] from the [AttributesProvider] if needed. pub async fn load_batch(&mut self, parent: L2BlockInfo) -> StageResult { if self.batch.is_none() { let batch = self.prev.next_batch(parent).await?; @@ -88,29 +88,27 @@ where self.batch.as_ref().cloned().ok_or(StageError::Eof) } - /// Returns the next payload attributes from the current batch. + /// Returns the next [AttributesWithParent] from the current batch. pub async fn next_attributes( &mut self, parent: L2BlockInfo, ) -> StageResult { - // Load the batch + // Load the batch. let batch = self.load_batch(parent).await?; - // Construct the payload attributes from the loaded batch + // Construct the payload attributes from the loaded batch. let attributes = self.create_next_attributes(batch, parent).await?; let populated_attributes = AttributesWithParent { attributes, parent, is_last_in_span: self.is_last_in_span }; - // Clear out the local state once we will succeed + // Clear out the local state once payload attributes are prepared. self.batch = None; self.is_last_in_span = false; Ok(populated_attributes) } - /// Creates the next attributes. - /// Transforms a [SingleBatch] into [PayloadAttributes]. - /// This sets `NoTxPool` and appends the batched transactions to the attributes transaction - /// list. + /// Creates the next attributes, transforming a [SingleBatch] into [PayloadAttributes]. + /// This sets `no_tx_pool` and appends the batched txs to the attributes tx list. pub async fn create_next_attributes( &mut self, batch: SingleBatch, @@ -132,7 +130,10 @@ where // Prepare the payload attributes let tx_count = batch.transactions.len(); - let mut attributes = self.builder.prepare_payload_attributes(parent, batch.epoch())?; + let mut attributes = self + .builder + .prepare_payload_attributes(parent, batch.epoch()) + .map_err(StageError::AttributesBuild)?; attributes.no_tx_pool = true; attributes.transactions.extend(batch.transactions); @@ -169,11 +170,13 @@ where #[cfg(test)] mod tests { use super::{ - AttributesQueue, BlockInfo, L2BlockInfo, RollupConfig, SingleBatch, StageError, StageResult, + AttributesQueue, BlockInfo, L2BlockInfo, PayloadAttributes, RollupConfig, SingleBatch, + StageError, StageResult, }; use crate::{ stages::test_utils::{new_mock_batch_queue, MockAttributesBuilder, MockBatchQueue}, traits::test_utils::TestTelemetry, + types::RawTransaction, }; use alloc::{vec, vec::Vec}; use alloy_primitives::b256; @@ -257,6 +260,37 @@ mod tests { assert_eq!(result, StageError::Reset(super::ResetError::BadTimestamp(1, 2))); } + #[tokio::test] + async fn test_create_next_attributes_preparation_fails() { + let mut attributes_queue = new_attributes_queue(None, None, vec![]); + let parent = L2BlockInfo::default(); + let batch = SingleBatch::default(); + let result = attributes_queue.create_next_attributes(batch, parent).await.unwrap_err(); + assert_eq!( + result, + StageError::AttributesBuild(anyhow::anyhow!("missing payload attribute")) + ); + } + + #[tokio::test] + async fn test_create_next_attributes_success() { + let cfg = RollupConfig::default(); + let telemetry = TestTelemetry::new(); + let mock_batch_queue = new_mock_batch_queue(None, vec![]); + let mut payload_attributes = PayloadAttributes::default(); + let mock_builder = + MockAttributesBuilder { attributes: vec![Ok(payload_attributes.clone())] }; + let mut aq = AttributesQueue::new(cfg, mock_batch_queue, telemetry, mock_builder); + let parent = L2BlockInfo::default(); + let txs = vec![RawTransaction::default(), RawTransaction::default()]; + let batch = SingleBatch { transactions: txs.clone(), ..Default::default() }; + let attributes = aq.create_next_attributes(batch, parent).await.unwrap(); + // update the expected attributes + payload_attributes.no_tx_pool = true; + payload_attributes.transactions.extend(txs); + assert_eq!(attributes, payload_attributes); + } + #[tokio::test] async fn test_next_attributes_load_batch_eof() { let mut attributes_queue = new_attributes_queue(None, None, vec![]); diff --git a/crates/derive/src/stages/test_utils/attributes_queue.rs b/crates/derive/src/stages/test_utils/attributes_queue.rs index 17081de52..cb36f3838 100644 --- a/crates/derive/src/stages/test_utils/attributes_queue.rs +++ b/crates/derive/src/stages/test_utils/attributes_queue.rs @@ -10,7 +10,7 @@ use alloc::vec::Vec; #[derive(Debug, Default)] pub struct MockAttributesBuilder { /// The attributes to return. - attributes: Vec>, + pub attributes: Vec>, } impl AttributesBuilder for MockAttributesBuilder { diff --git a/crates/derive/src/types/attributes.rs b/crates/derive/src/types/attributes.rs index a12fa9fe5..16aa7c507 100644 --- a/crates/derive/src/types/attributes.rs +++ b/crates/derive/src/types/attributes.rs @@ -9,7 +9,7 @@ use alloy_primitives::{Address, B256}; /// Payload attributes. #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Default, Clone, PartialEq, Eq)] pub struct PayloadAttributes { /// Value for the timestamp field of the new payload. #[cfg_attr(feature = "serde", serde(rename = "timestamp"))] diff --git a/crates/derive/src/types/errors.rs b/crates/derive/src/types/errors.rs index 08e7d796d..2b88cf86a 100644 --- a/crates/derive/src/types/errors.rs +++ b/crates/derive/src/types/errors.rs @@ -15,6 +15,8 @@ pub enum StageError { /// There is not enough data progress, but if we wait, the stage will eventually return data /// or produce an EOF error. NotEnoughData, + /// Failed to build the [super::PayloadAttributes] for the next batch. + AttributesBuild(anyhow::Error), /// Reset the pipeline. Reset(ResetError), /// The stage detected a block reorg. @@ -44,6 +46,7 @@ impl PartialEq for StageError { (self, other), (StageError::Eof, StageError::Eof) | (StageError::NotEnoughData, StageError::NotEnoughData) | + (StageError::AttributesBuild(_), StageError::AttributesBuild(_)) | (StageError::ReceiptFetch(_), StageError::ReceiptFetch(_)) | (StageError::BlockInfoFetch(_), StageError::BlockInfoFetch(_)) | (StageError::SystemConfigUpdate(_), StageError::SystemConfigUpdate(_)) | @@ -63,6 +66,7 @@ impl Display for StageError { match self { StageError::Eof => write!(f, "End of file"), StageError::NotEnoughData => write!(f, "Not enough data"), + StageError::AttributesBuild(e) => write!(f, "Attributes build error: {}", e), StageError::Reset(e) => write!(f, "Reset error: {}", e), StageError::ReceiptFetch(e) => write!(f, "Receipt fetch error: {}", e), StageError::SystemConfigUpdate(e) => write!(f, "System config update error: {}", e), diff --git a/crates/derive/src/types/mod.rs b/crates/derive/src/types/mod.rs index 80817630a..ca1dfdb83 100644 --- a/crates/derive/src/types/mod.rs +++ b/crates/derive/src/types/mod.rs @@ -55,7 +55,7 @@ pub use errors::*; /// A raw transaction #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Default, Clone, PartialEq, Eq)] pub struct RawTransaction(pub Bytes); impl RawTransaction { From 2adbaf5792c11cc7ecd08b04ba9d30dd986d47b3 Mon Sep 17 00:00:00 2001 From: refcell Date: Thu, 4 Apr 2024 10:31:06 -0400 Subject: [PATCH 7/8] feat(derive): add next_attributes test --- crates/derive/src/stages/attributes_queue.rs | 29 ++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index 63122a713..8612cefdc 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -170,8 +170,8 @@ where #[cfg(test)] mod tests { use super::{ - AttributesQueue, BlockInfo, L2BlockInfo, PayloadAttributes, RollupConfig, SingleBatch, - StageError, StageResult, + AttributesQueue, AttributesWithParent, BlockInfo, L2BlockInfo, PayloadAttributes, + RollupConfig, SingleBatch, StageError, StageResult, }; use crate::{ stages::test_utils::{new_mock_batch_queue, MockAttributesBuilder, MockBatchQueue}, @@ -298,4 +298,29 @@ mod tests { let result = attributes_queue.next_attributes(parent).await.unwrap_err(); assert_eq!(result, StageError::Eof); } + + #[tokio::test] + async fn test_next_attributes_load_batch_last_in_span() { + let cfg = RollupConfig::default(); + let telemetry = TestTelemetry::new(); + let mock_batch_queue = new_mock_batch_queue(None, vec![Ok(Default::default())]); + let mut pa = PayloadAttributes::default(); + let mock_builder = MockAttributesBuilder { attributes: vec![Ok(pa.clone())] }; + let mut aq = AttributesQueue::new(cfg, mock_batch_queue, telemetry, mock_builder); + // If we load the batch, we should get the last in span. + // But it won't take it so it will be available in the next_attributes call. + let _ = aq.load_batch(L2BlockInfo::default()).await.unwrap(); + // This should successfully construct the next payload attributes. + // It should also reset the last in span flag and clear the batch. + let attributes = aq.next_attributes(L2BlockInfo::default()).await.unwrap(); + pa.no_tx_pool = true; + let populated_attributes = AttributesWithParent { + attributes: pa, + parent: L2BlockInfo::default(), + is_last_in_span: true, + }; + assert_eq!(attributes, populated_attributes); + assert!(!aq.is_last_in_span); + assert!(aq.batch.is_none()); + } } From d1470b003dd8ac09bfe7db351a9da46c52bcb8d9 Mon Sep 17 00:00:00 2001 From: refcell Date: Thu, 4 Apr 2024 10:32:01 -0400 Subject: [PATCH 8/8] fix(derive): extend attributes queue unit test --- crates/derive/src/stages/attributes_queue.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index 8612cefdc..4771a1b37 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -310,6 +310,8 @@ mod tests { // If we load the batch, we should get the last in span. // But it won't take it so it will be available in the next_attributes call. let _ = aq.load_batch(L2BlockInfo::default()).await.unwrap(); + assert!(aq.is_last_in_span); + assert!(aq.batch.is_some()); // This should successfully construct the next payload attributes. // It should also reset the last in span flag and clear the batch. let attributes = aq.next_attributes(L2BlockInfo::default()).await.unwrap();