From 52036db604f4ec0bfd8df9588554b1553f3b29cd Mon Sep 17 00:00:00 2001 From: refcell Date: Fri, 26 Apr 2024 18:08:18 -0700 Subject: [PATCH] feat(derive): Pipeline Builder (#127) * feat(derive): span batch validation * feat(derive): span batch validity unit tests * feat(derive): span batch unit tests with acceptance test * fix(derive): unit tests * fix(derive): add more unit tests * feat(derive): span batch validity unit tests for txs * feat(derive): pipeline builder * fix(derive): so close :sadge * fix(derive): ugly refactor * fix(derive): pipeline construction and trait abstractions * fix(derive): nit fixes * fix(derive): temp manual deposit type check --- crates/derive/Cargo.toml | 1 + crates/derive/src/builder.rs | 121 +++++++++ crates/derive/src/lib.rs | 27 +- crates/derive/src/online/mod.rs | 38 ++- crates/derive/src/stages/attributes_queue.rs | 60 ++++- crates/derive/src/stages/batch_queue.rs | 241 +++++++++++++++--- crates/derive/src/stages/channel_bank.rs | 40 ++- crates/derive/src/stages/channel_reader.rs | 47 +++- crates/derive/src/stages/frame_queue.rs | 38 ++- crates/derive/src/stages/l1_retrieval.rs | 43 +++- crates/derive/src/stages/l1_traversal.rs | 50 ++-- crates/derive/src/stages/mod.rs | 3 +- .../src/stages/test_utils/attributes_queue.rs | 24 +- .../src/stages/test_utils/batch_queue.rs | 28 +- .../src/stages/test_utils/channel_bank.rs | 26 +- .../src/stages/test_utils/channel_reader.rs | 26 +- .../src/stages/test_utils/frame_queue.rs | 26 +- crates/derive/src/traits/mod.rs | 2 +- crates/derive/src/traits/stages.rs | 14 + .../src/types/batch/span_batch/batch.rs | 18 +- crates/derive/src/types/payload.rs | 15 +- 21 files changed, 751 insertions(+), 137 deletions(-) create mode 100644 crates/derive/src/builder.rs diff --git a/crates/derive/Cargo.toml b/crates/derive/Cargo.toml index b807ce127..794865904 100644 --- a/crates/derive/Cargo.toml +++ b/crates/derive/Cargo.toml @@ -60,6 +60,7 @@ online = [ "dep:alloy-provider", "dep:alloy-transport-http", "dep:reqwest", + "alloy-provider/reqwest", "alloy-consensus/serde", "c-kzg/serde", "revm-primitives/serde", diff --git a/crates/derive/src/builder.rs b/crates/derive/src/builder.rs new file mode 100644 index 000000000..b3023231e --- /dev/null +++ b/crates/derive/src/builder.rs @@ -0,0 +1,121 @@ +//! Contains a concrete implementation of the [DerivationPipeline]. + +use crate::{ + stages::NextAttributes, + traits::{OriginAdvancer, ResettableStage}, + types::{ + BlockInfo, L2AttributesWithParent, L2BlockInfo, StageError, StageResult, SystemConfig, + }, +}; +use alloc::{boxed::Box, collections::VecDeque}; +use async_trait::async_trait; +use core::fmt::Debug; + +/// Provides the [BlockInfo] and [SystemConfig] for the stack to reset the stages. +#[async_trait] +pub trait ResetProvider { + /// Returns the current [BlockInfo] for the pipeline to reset. + async fn block_info(&self) -> BlockInfo; + + /// Returns the current [SystemConfig] for the pipeline to reset. + async fn system_config(&self) -> SystemConfig; +} + +/// The derivation pipeline is responsible for deriving L2 inputs from L1 data. +#[derive(Debug)] +pub struct DerivationPipeline< + S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send, + R: ResetProvider + Send, +> { + /// A handle to the next attributes. + pub attributes: S, + /// Reset provider for the pipeline. + pub reset: R, + /// A list of prepared [L2AttributesWithParent] to be used by the derivation pipeline consumer. + pub prepared: VecDeque, + /// A flag to tell the pipeline to reset. + pub needs_reset: bool, + /// A cursor for the [L2BlockInfo] parent to be used when pulling the next attributes. + pub cursor: L2BlockInfo, +} + +impl< + S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send, + R: ResetProvider + Send, + > DerivationPipeline +{ + /// Creates a new instance of the [DerivationPipeline]. + pub fn new(attributes: S, reset: R, cursor: L2BlockInfo) -> Self { + Self { attributes, prepared: VecDeque::new(), reset, needs_reset: false, cursor } + } + + /// Set the [L2BlockInfo] cursor to be used when pulling the next attributes. + pub fn set_cursor(&mut self, cursor: L2BlockInfo) { + self.cursor = cursor; + } + + /// Returns the next [L2AttributesWithParent] from the pipeline. + pub fn next_attributes(&mut self) -> Option { + self.prepared.pop_front() + } + + /// Flags the pipeline to reset on the next [DerivationPipeline::step] call. + pub fn reset(&mut self) { + self.needs_reset = true; + } + + /// Resets the pipeline. + async fn reset_pipe(&mut self, bi: BlockInfo, sc: &SystemConfig) -> StageResult<()> { + match self.attributes.reset(bi, sc).await { + Ok(()) => { + tracing::info!("Stages reset"); + } + Err(StageError::Eof) => { + tracing::info!("Stages reset with EOF"); + } + Err(err) => { + tracing::error!("Stages reset failed: {:?}", err); + return Err(err); + } + } + Ok(()) + } + + /// Attempts to progress the pipeline. + /// A [StageError::Eof] is returned if the pipeline is blocked by waiting for new L1 data. + /// Any other error is critical and the derivation pipeline should be reset. + /// An error is expected when the underlying source closes. + /// When [DerivationPipeline::step] returns [Ok(())], it should be called again, to continue the + /// derivation process. + pub async fn step(&mut self) -> StageResult<()> { + tracing::info!("DerivationPipeline::step"); + + // Reset the pipeline if needed. + if self.needs_reset { + let block_info = self.reset.block_info().await; + let system_config = self.reset.system_config().await; + self.reset_pipe(block_info, &system_config).await?; + self.needs_reset = false; + } + + match self.attributes.next_attributes(self.cursor).await { + Ok(a) => { + tracing::info!("attributes queue stage step returned l2 attributes"); + tracing::info!("prepared L2 attributes: {:?}", a); + self.prepared.push_back(a); + return Ok(()); + } + Err(StageError::Eof) => { + tracing::info!("attributes queue stage complete"); + self.attributes.advance_origin().await?; + } + // TODO: match on the EngineELSyncing error here and log + Err(err) => { + tracing::error!("attributes queue stage failed: {:?}", err); + return Err(err); + } + } + + Ok(()) + } +} diff --git a/crates/derive/src/lib.rs b/crates/derive/src/lib.rs index 00c8bc34e..087ba4ac6 100644 --- a/crates/derive/src/lib.rs +++ b/crates/derive/src/lib.rs @@ -6,11 +6,6 @@ extern crate alloc; -use alloc::sync::Arc; -use core::fmt::Debug; -use traits::ChainProvider; -use types::RollupConfig; - mod params; pub use params::{ ChannelID, CHANNEL_ID_LENGTH, CONFIG_UPDATE_EVENT_VERSION_0, CONFIG_UPDATE_TOPIC, @@ -19,6 +14,9 @@ pub use params::{ MAX_SPAN_BATCH_BYTES, SEQUENCER_FEE_VAULT_ADDRESS, }; +pub mod builder; +pub use builder::DerivationPipeline; + pub mod sources; pub mod stages; pub mod traits; @@ -27,18 +25,7 @@ pub mod types; #[cfg(feature = "online")] mod online; #[cfg(feature = "online")] -pub use online::prelude::*; - -/// The derivation pipeline is responsible for deriving L2 inputs from L1 data. -#[derive(Debug, Clone, Copy)] -pub struct DerivationPipeline; - -impl DerivationPipeline { - /// Creates a new instance of the [DerivationPipeline]. - pub fn new

(_rollup_config: Arc, _chain_provider: P) -> Self - where - P: ChainProvider + Clone + Debug + Send, - { - unimplemented!("TODO: High-level pipeline composition helper.") - } -} +pub use online::{ + new_online_stack, AlloyChainProvider, AlloyL2ChainProvider, BeaconClient, OnlineBeaconClient, + OnlineBlobProvider, SimpleSlotDerivation, +}; diff --git a/crates/derive/src/online/mod.rs b/crates/derive/src/online/mod.rs index 996052e09..c851b0deb 100644 --- a/crates/derive/src/online/mod.rs +++ b/crates/derive/src/online/mod.rs @@ -1,11 +1,37 @@ //! Contains "online" implementations for providers. -/// Prelude for online providers. -pub(crate) mod prelude { - pub use super::{ - AlloyChainProvider, AlloyL2ChainProvider, BeaconClient, OnlineBeaconClient, - OnlineBlobProvider, SimpleSlotDerivation, - }; +use crate::{ + stages::{ + AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, L1Retrieval, + L1Traversal, NextAttributes, StatefulAttributesBuilder, + }, + traits::{DataAvailabilityProvider, ResettableStage}, + types::RollupConfig, +}; + +use alloc::sync::Arc; +use alloy_provider::ReqwestProvider; +use core::fmt::Debug; + +/// Creates a new [OnlineStageStack]. +#[cfg(feature = "online")] +pub fn new_online_stack( + rollup_config: Arc, + chain_provider: AlloyChainProvider, + dap_source: impl DataAvailabilityProvider + Send + Sync + Debug, + fetcher: AlloyL2ChainProvider, + builder: StatefulAttributesBuilder< + AlloyChainProvider, + AlloyL2ChainProvider, + >, +) -> impl NextAttributes + ResettableStage + Debug + Send { + let l1_traversal = L1Traversal::new(chain_provider, rollup_config.clone()); + let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source); + let frame_queue = FrameQueue::new(l1_retrieval); + let channel_bank = ChannelBank::new(rollup_config.clone(), frame_queue); + let channel_reader = ChannelReader::new(channel_bank, rollup_config.clone()); + let batch_queue = BatchQueue::new(rollup_config.clone(), channel_reader, fetcher); + AttributesQueue::new(*rollup_config, batch_queue, builder) } #[cfg(test)] diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index 98383de83..c3f4e87b4 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -1,7 +1,7 @@ //! Contains the logic for the `AttributesQueue` stage. use crate::{ - traits::{OriginProvider, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{ BlockInfo, L2AttributesWithParent, L2BlockInfo, L2PayloadAttributes, ResetError, RollupConfig, SingleBatch, StageError, StageResult, SystemConfig, @@ -30,6 +30,14 @@ pub trait AttributesProvider { fn is_last_in_span(&self) -> bool; } +/// [NextAttributes] is a trait abstraction that generalizes the [AttributesQueue] stage. +#[async_trait] +pub trait NextAttributes { + /// Returns the next [L2AttributesWithParent] from the current batch. + async fn next_attributes(&mut self, parent: L2BlockInfo) + -> StageResult; +} + /// [AttributesQueue] accepts batches from the [BatchQueue] stage /// and transforms them into [L2PayloadAttributes]. The outputted payload /// attributes cannot be buffered because each batch->attributes transformation @@ -45,7 +53,7 @@ pub trait AttributesProvider { #[derive(Debug)] pub struct AttributesQueue where - P: AttributesProvider + OriginProvider + Debug, + P: AttributesProvider + PreviousStage + Debug, AB: AttributesBuilder + Debug, { /// The rollup config. @@ -62,7 +70,7 @@ where impl AttributesQueue where - P: AttributesProvider + OriginProvider + Debug, + P: AttributesProvider + PreviousStage + Debug, AB: AttributesBuilder + Debug, { /// Create a new [AttributesQueue] stage. @@ -139,9 +147,44 @@ where } } +impl PreviousStage for AttributesQueue +where + P: AttributesProvider + PreviousStage + Send + Debug, + AB: AttributesBuilder + Send + Debug, +{ + fn previous(&self) -> Option> { + Some(Box::new(&self.prev)) + } +} + +#[async_trait] +impl OriginAdvancer for AttributesQueue +where + P: AttributesProvider + PreviousStage + Debug + Send, + AB: AttributesBuilder + Debug + Send, +{ + async fn advance_origin(&mut self) -> StageResult<()> { + self.prev.advance_origin().await + } +} + +#[async_trait] +impl NextAttributes for AttributesQueue +where + P: AttributesProvider + PreviousStage + Debug + Send, + AB: AttributesBuilder + Debug + Send, +{ + async fn next_attributes( + &mut self, + parent: L2BlockInfo, + ) -> StageResult { + self.next_attributes(parent).await + } +} + impl OriginProvider for AttributesQueue where - P: AttributesProvider + OriginProvider + Debug, + P: AttributesProvider + PreviousStage + Debug, AB: AttributesBuilder + Debug, { fn origin(&self) -> Option<&BlockInfo> { @@ -152,10 +195,15 @@ where #[async_trait] impl ResettableStage for AttributesQueue where - P: AttributesProvider + OriginProvider + Send + Debug, + P: AttributesProvider + PreviousStage + Send + Debug, AB: AttributesBuilder + Send + Debug, { - async fn reset(&mut self, _: BlockInfo, _: &SystemConfig) -> StageResult<()> { + async fn reset( + &mut self, + block_info: BlockInfo, + system_config: &SystemConfig, + ) -> StageResult<()> { + self.prev.reset(block_info, system_config).await?; info!("resetting attributes queue"); self.batch = None; self.is_last_in_span = false; diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index 9a66fb20a..ca4bd31f2 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -2,7 +2,7 @@ use crate::{ stages::attributes_queue::AttributesProvider, - traits::{L2ChainProvider, OriginProvider, ResettableStage}, + traits::{L2ChainProvider, OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{ Batch, BatchValidity, BatchWithInclusionBlock, BlockInfo, L2BlockInfo, RollupConfig, SingleBatch, StageError, StageResult, SystemConfig, @@ -43,7 +43,7 @@ pub trait BatchQueueProvider { #[derive(Debug)] pub struct BatchQueue where - P: BatchQueueProvider + OriginProvider + Debug, + P: BatchQueueProvider + PreviousStage + Debug, BF: L2ChainProvider + Debug, { /// The rollup config. @@ -75,7 +75,7 @@ where impl BatchQueue where - P: BatchQueueProvider + OriginProvider + Debug, + P: BatchQueueProvider + PreviousStage + Debug, BF: L2ChainProvider + Debug, { /// Creates a new [BatchQueue] stage. @@ -242,10 +242,21 @@ where } } +#[async_trait] +impl OriginAdvancer for BatchQueue +where + P: BatchQueueProvider + PreviousStage + Send + Debug, + BF: L2ChainProvider + Send + Debug, +{ + async fn advance_origin(&mut self) -> StageResult<()> { + self.prev.advance_origin().await + } +} + #[async_trait] impl AttributesProvider for BatchQueue where - P: BatchQueueProvider + OriginProvider + Send + Debug, + P: BatchQueueProvider + PreviousStage + Send + Debug, BF: L2ChainProvider + Send + Debug, { /// Returns the next valid batch upon the given safe head. @@ -290,7 +301,7 @@ where // We always update the origin of this stage if it's not the same so // after the update code runs, this is consistent. let origin_behind = - self.origin.map_or(true, |origin| origin.number < parent.l1_origin.number); + self.prev.origin().map_or(true, |origin| origin.number < parent.l1_origin.number); // Advance the origin if needed. // The entire pipeline has the same origin. @@ -374,7 +385,7 @@ where impl OriginProvider for BatchQueue where - P: BatchQueueProvider + OriginProvider + Debug, + P: BatchQueueProvider + PreviousStage + Debug, BF: L2ChainProvider + Debug, { fn origin(&self) -> Option<&BlockInfo> { @@ -382,13 +393,24 @@ where } } +impl PreviousStage for BatchQueue +where + P: BatchQueueProvider + PreviousStage + Send + Debug, + BF: L2ChainProvider + Send + Debug, +{ + fn previous(&self) -> Option> { + Some(Box::new(&self.prev)) + } +} + #[async_trait] impl ResettableStage for BatchQueue where - P: BatchQueueProvider + OriginProvider + Send + Debug, + P: BatchQueueProvider + PreviousStage + Send + Debug, BF: L2ChainProvider + Send + Debug, { - async fn reset(&mut self, base: BlockInfo, _: &SystemConfig) -> StageResult<()> { + async fn reset(&mut self, base: BlockInfo, system_config: &SystemConfig) -> StageResult<()> { + self.prev.reset(base, system_config).await?; // Copy over the Origin from the next stage. // It is set in the engine queue (two stages away) // such that the L2 Safe Head origin is the progress. @@ -408,12 +430,23 @@ where mod tests { use super::*; use crate::{ - stages::{channel_reader::BatchReader, test_utils::MockBatchQueueProvider}, + stages::{ + channel_reader::BatchReader, + test_utils::{CollectingLayer, MockBatchQueueProvider, TraceStorage}, + }, traits::test_utils::MockBlockFetcher, - types::BatchType, + types::{ + BatchType, BlockID, Genesis, L1BlockInfoBedrock, L1BlockInfoTx, L2ExecutionPayload, + L2ExecutionPayloadEnvelope, + }, }; use alloc::vec; + use alloy_primitives::{address, b256, Address, Bytes, TxKind, B256, U256}; + use alloy_rlp::{BytesMut, Encodable}; use miniz_oxide::deflate::compress_to_vec_zlib; + use op_alloy_consensus::{OpTxType, TxDeposit}; + use tracing::Level; + use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; fn new_batch_reader() -> BatchReader { let raw_data = include_bytes!("../../testdata/raw_batch.hex"); @@ -448,24 +481,172 @@ mod tests { assert!(bq.is_last_in_span()); } - // TODO(refcell): The batch reader here loops forever. - // Maybe the cursor isn't being used? - // UPDATE: the batch data is not valid - // #[tokio::test] - // async fn test_next_batch_succeeds() { - // let mut reader = new_batch_reader(); - // let mut batch_vec: Vec> = vec![]; - // while let Some(batch) = reader.next_batch() { - // batch_vec.push(Ok(batch)); - // } - // let mock = MockBatchQueueProvider::new(batch_vec); - // let telemetry = TestTelemetry::new(); - // let fetcher = MockBlockFetcher::default(); - // let mut bq = BatchQueue::new(RollupConfig::default(), mock, telemetry, fetcher); - // let res = bq.next_batch(L2BlockInfo::default()).await.unwrap(); - // assert_eq!(res, SingleBatch::default()); - // assert!(bq.is_last_in_span()); - // } + #[tokio::test] + async fn test_next_batch_origin_behind() { + let mut reader = new_batch_reader(); + let cfg = Arc::new(RollupConfig::default()); + let mut batch_vec: Vec> = vec![]; + while let Some(batch) = reader.next_batch(cfg.as_ref()) { + batch_vec.push(Ok(batch)); + } + let mut mock = MockBatchQueueProvider::new(batch_vec); + mock.origin = Some(BlockInfo::default()); + let fetcher = MockBlockFetcher::default(); + let mut bq = BatchQueue::new(cfg, mock, fetcher); + let parent = L2BlockInfo { + l1_origin: BlockID { number: 10, ..Default::default() }, + ..Default::default() + }; + let res = bq.next_batch(parent).await.unwrap_err(); + assert_eq!(res, StageError::NotEnoughData); + } + + #[tokio::test] + async fn test_next_batch_missing_origin() { + let trace_store: TraceStorage = Default::default(); + let layer = CollectingLayer::new(trace_store.clone()); + tracing_subscriber::Registry::default().with(layer).init(); + + let mut reader = new_batch_reader(); + let payload_block_hash = + b256!("4444444444444444444444444444444444444444444444444444444444444444"); + let cfg = Arc::new(RollupConfig { + delta_time: Some(0), + block_time: 100, + max_sequencer_drift: 10000000, + seq_window_size: 10000000, + genesis: Genesis { + l2: BlockID { number: 8, hash: payload_block_hash }, + l1: BlockID { number: 16988980031808077784, ..Default::default() }, + ..Default::default() + }, + ..Default::default() + }); + let mut batch_vec: Vec> = vec![]; + let mut batch_txs: Vec = vec![]; + let mut second_batch_txs: Vec = vec![]; + while let Some(batch) = reader.next_batch(cfg.as_ref()) { + if let Batch::Span(span) = &batch { + let bys = span.batches[0] + .transactions + .iter() + .cloned() + .map(|tx| tx.0) + .collect::>(); + let sbys = span.batches[1] + .transactions + .iter() + .cloned() + .map(|tx| tx.0) + .collect::>(); + second_batch_txs.extend(sbys); + batch_txs.extend(bys); + } + batch_vec.push(Ok(batch)); + } + // Insert a deposit transaction in the front of the second batch txs + let expected = L1BlockInfoBedrock { + number: 16988980031808077784, + time: 1697121143, + base_fee: 10419034451, + block_hash: b256!("392012032675be9f94aae5ab442de73c5f4fb1bf30fa7dd0d2442239899a40fc"), + sequence_number: 4, + batcher_address: address!("6887246668a3b87f54deb3b94ba47a6f63f32985"), + l1_fee_overhead: U256::from(0xbc), + l1_fee_scalar: U256::from(0xa6fe0), + }; + let deposit_tx_calldata: Bytes = L1BlockInfoTx::Bedrock(expected).encode_calldata(); + let tx = TxDeposit { + source_hash: B256::left_padding_from(&[0xde, 0xad]), + from: Address::left_padding_from(&[0xbe, 0xef]), + mint: Some(1), + gas_limit: 2, + to: TxKind::Call(Address::left_padding_from(&[3])), + value: U256::from(4_u64), + input: deposit_tx_calldata, + is_system_transaction: false, + }; + let mut buf = BytesMut::new(); + tx.encode(&mut buf); + let prefixed = [&[OpTxType::Deposit as u8], &buf[..]].concat(); + second_batch_txs.insert(0, Bytes::copy_from_slice(&prefixed)); + let mut mock = MockBatchQueueProvider::new(batch_vec); + let origin_check = + b256!("8527cdb6f601acf9b483817abd1da92790c92b19000000000000000000000000"); + mock.origin = Some(BlockInfo { + number: 16988980031808077784, + timestamp: 1639845845, + parent_hash: Default::default(), + hash: origin_check, + }); + let origin = mock.origin; + + let parent_check = + b256!("01ddf682e2f8a6f10c2207e02322897e65317196000000000000000000000000"); + let block_nine = L2BlockInfo { + block_info: BlockInfo { + number: 9, + timestamp: 1639845645, + parent_hash: parent_check, + hash: origin_check, + }, + ..Default::default() + }; + let block_seven = L2BlockInfo { + block_info: BlockInfo { + number: 7, + timestamp: 1639845745, + parent_hash: parent_check, + hash: origin_check, + }, + ..Default::default() + }; + let payload = L2ExecutionPayloadEnvelope { + parent_beacon_block_root: None, + execution_payload: L2ExecutionPayload { + block_number: 8, + block_hash: payload_block_hash, + transactions: batch_txs, + ..Default::default() + }, + }; + let second = L2ExecutionPayloadEnvelope { + parent_beacon_block_root: None, + execution_payload: L2ExecutionPayload { + block_number: 9, + block_hash: payload_block_hash, + transactions: second_batch_txs, + ..Default::default() + }, + }; + let fetcher = MockBlockFetcher { + blocks: vec![block_nine, block_seven], + payloads: vec![payload, second], + ..Default::default() + }; + let mut bq = BatchQueue::new(cfg, mock, fetcher); + let parent = L2BlockInfo { + block_info: BlockInfo { + number: 9, + timestamp: 1639845745, + parent_hash: parent_check, + hash: origin_check, + }, + l1_origin: BlockID { number: 16988980031808077784, hash: origin_check }, + ..Default::default() + }; + let res = bq.next_batch(parent).await.unwrap_err(); + let logs = trace_store.get_by_level(Level::INFO); + assert_eq!(logs.len(), 4); + let str = alloc::format!("Advancing batch queue origin: {:?}", origin); + assert!(logs[0].contains(&str)); + assert!(logs[1].contains("need more l1 blocks to check entire origins of span batch")); + assert!(logs[2].contains("Deriving next batch for epoch: 16988980031808077784")); + assert!(logs[3].contains("need more l1 blocks to check entire origins of span batch")); + let warns = trace_store.get_by_level(Level::WARN); + assert_eq!(warns.len(), 0); + assert_eq!(res, StageError::NotEnoughData); + } #[tokio::test] async fn test_batch_queue_empty_bytes() { @@ -475,7 +656,7 @@ mod tests { let fetcher = MockBlockFetcher::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); let parent = L2BlockInfo::default(); - let result = bq.next_batch(parent).await; - assert!(result.is_err()); + let batch = bq.next_batch(parent).await.unwrap(); + assert_eq!(batch, SingleBatch::default()); } } diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index 03027515e..078341fd4 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -3,7 +3,7 @@ use crate::{ params::{ChannelID, MAX_CHANNEL_BANK_SIZE}, stages::ChannelReaderProvider, - traits::{OriginProvider, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{BlockInfo, Channel, Frame, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; @@ -37,7 +37,7 @@ pub trait ChannelBankProvider { #[derive(Debug)] pub struct ChannelBank

where - P: ChannelBankProvider + OriginProvider + Debug, + P: ChannelBankProvider + PreviousStage + Debug, { /// The rollup configuration. cfg: Arc, @@ -51,7 +51,7 @@ where impl

ChannelBank

where - P: ChannelBankProvider + OriginProvider + Debug, + P: ChannelBankProvider + PreviousStage + Debug, { /// Create a new [ChannelBank] stage. pub fn new(cfg: Arc, prev: P) -> Self { @@ -117,8 +117,6 @@ where let first = self.channel_queue[0]; let channel = self.channels.get(&first).ok_or(StageError::ChannelNotFound)?; let origin = self.origin().ok_or(StageError::MissingOrigin)?; - - // Remove all timed out channels from the front of the `channel_queue`. if channel.open_block_number() + self.cfg.channel_timeout < origin.number { warn!("Channel {:?} timed out", first); self.channels.remove(&first); @@ -165,10 +163,20 @@ where } } +#[async_trait] +impl

OriginAdvancer for ChannelBank

+where + P: ChannelBankProvider + PreviousStage + Send + Debug, +{ + async fn advance_origin(&mut self) -> StageResult<()> { + self.prev.advance_origin().await + } +} + #[async_trait] impl

ChannelReaderProvider for ChannelBank

where - P: ChannelBankProvider + OriginProvider + Send + Debug, + P: ChannelBankProvider + PreviousStage + Send + Debug, { async fn next_data(&mut self) -> StageResult> { match self.read() { @@ -190,19 +198,33 @@ where impl

OriginProvider for ChannelBank

where - P: ChannelBankProvider + OriginProvider + Debug, + P: ChannelBankProvider + PreviousStage + Debug, { fn origin(&self) -> Option<&BlockInfo> { self.prev.origin() } } +impl

PreviousStage for ChannelBank

+where + P: ChannelBankProvider + PreviousStage + Debug + Send, +{ + fn previous(&self) -> Option> { + Some(Box::new(&self.prev)) + } +} + #[async_trait] impl

ResettableStage for ChannelBank

where - P: ChannelBankProvider + OriginProvider + Send + Debug, + P: ChannelBankProvider + PreviousStage + Send + Debug, { - async fn reset(&mut self, _: BlockInfo, _: &SystemConfig) -> StageResult<()> { + async fn reset( + &mut self, + block_info: BlockInfo, + system_config: &SystemConfig, + ) -> StageResult<()> { + self.prev.reset(block_info, system_config).await?; self.channels.clear(); self.channel_queue = VecDeque::with_capacity(10); Err(StageError::Eof) diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index e6cf77611..e24bd5c79 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -2,8 +2,8 @@ use crate::{ stages::BatchQueueProvider, - traits::OriginProvider, - types::{Batch, BlockInfo, RollupConfig, StageError, StageResult}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, + types::{Batch, BlockInfo, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, sync::Arc, vec::Vec}; @@ -27,7 +27,7 @@ pub trait ChannelReaderProvider { #[derive(Debug)] pub struct ChannelReader

where - P: ChannelReaderProvider + OriginProvider + Debug, + P: ChannelReaderProvider + PreviousStage + Debug, { /// The previous stage of the derivation pipeline. prev: P, @@ -39,7 +39,7 @@ where impl

ChannelReader

where - P: ChannelReaderProvider + OriginProvider + Debug, + P: ChannelReaderProvider + PreviousStage + Debug, { /// Create a new [ChannelReader] stage. pub fn new(prev: P, cfg: Arc) -> Self { @@ -62,10 +62,20 @@ where } } +#[async_trait] +impl

OriginAdvancer for ChannelReader

+where + P: ChannelReaderProvider + PreviousStage + Send + Debug, +{ + async fn advance_origin(&mut self) -> StageResult<()> { + self.prev.advance_origin().await + } +} + #[async_trait] impl

BatchQueueProvider for ChannelReader

where - P: ChannelReaderProvider + OriginProvider + Send + Debug, + P: ChannelReaderProvider + PreviousStage + Send + Debug, { async fn next_batch(&mut self) -> StageResult { if let Err(e) = self.set_batch_reader().await { @@ -91,13 +101,34 @@ where impl

OriginProvider for ChannelReader

where - P: ChannelReaderProvider + OriginProvider + Debug, + P: ChannelReaderProvider + PreviousStage + Debug, { fn origin(&self) -> Option<&BlockInfo> { self.prev.origin() } } +#[async_trait] +impl

ResettableStage for ChannelReader

+where + P: ChannelReaderProvider + PreviousStage + Debug + Send, +{ + async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()> { + self.prev.reset(base, cfg).await?; + self.next_channel(); + Ok(()) + } +} + +impl

PreviousStage for ChannelReader

+where + P: ChannelReaderProvider + PreviousStage + Send + Debug, +{ + fn previous(&self) -> Option> { + Some(Box::new(&self.prev)) + } +} + /// Batch Reader provides a function that iteratively consumes batches from the reader. /// The L1Inclusion block is also provided at creation time. /// Warning: the batch reader can read every batch-type. @@ -122,11 +153,11 @@ impl BatchReader { } // Decompress and RLP decode the batch data, before finally decoding the batch itself. - let mut decompressed_reader = self.decompressed.as_slice(); + let mut decompressed_reader = self.decompressed.as_slice()[self.cursor..].as_ref(); let batch = Batch::decode(&mut decompressed_reader, cfg).ok()?; // Advance the cursor on the reader. - self.cursor += self.decompressed.len() - decompressed_reader.len(); + self.cursor = self.decompressed.len() - decompressed_reader.len(); Some(batch) } diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index be7ac6bcb..07586ffcb 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -2,7 +2,7 @@ use crate::{ stages::ChannelBankProvider, - traits::{OriginProvider, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{into_frames, BlockInfo, Frame, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, collections::VecDeque}; @@ -31,7 +31,7 @@ pub trait FrameQueueProvider { #[derive(Debug)] pub struct FrameQueue

where - P: FrameQueueProvider + OriginProvider + Debug, + P: FrameQueueProvider + PreviousStage + Debug, { /// The previous stage in the pipeline. pub prev: P, @@ -41,7 +41,7 @@ where impl

FrameQueue

where - P: FrameQueueProvider + OriginProvider + Debug, + P: FrameQueueProvider + PreviousStage + Debug, { /// Create a new [FrameQueue] stage with the given previous [L1Retrieval] stage. /// @@ -51,10 +51,20 @@ where } } +#[async_trait] +impl

OriginAdvancer for FrameQueue

+where + P: FrameQueueProvider + PreviousStage + Send + Debug, +{ + async fn advance_origin(&mut self) -> StageResult<()> { + self.prev.advance_origin().await + } +} + #[async_trait] impl

ChannelBankProvider for FrameQueue

where - P: FrameQueueProvider + OriginProvider + Send + Debug, + P: FrameQueueProvider + PreviousStage + Send + Debug, { async fn next_frame(&mut self) -> StageResult { if self.queue.is_empty() { @@ -87,19 +97,33 @@ where impl

OriginProvider for FrameQueue

where - P: FrameQueueProvider + OriginProvider + Debug, + P: FrameQueueProvider + PreviousStage + Debug, { fn origin(&self) -> Option<&BlockInfo> { self.prev.origin() } } +impl

PreviousStage for FrameQueue

+where + P: FrameQueueProvider + PreviousStage + Send + Debug, +{ + fn previous(&self) -> Option> { + Some(Box::new(&self.prev)) + } +} + #[async_trait] impl

ResettableStage for FrameQueue

where - P: FrameQueueProvider + OriginProvider + Send + Debug, + P: FrameQueueProvider + PreviousStage + Send + Debug, { - async fn reset(&mut self, _: BlockInfo, _: &SystemConfig) -> StageResult<()> { + async fn reset( + &mut self, + block_info: BlockInfo, + system_config: &SystemConfig, + ) -> StageResult<()> { + self.prev.reset(block_info, system_config).await?; self.queue = VecDeque::default(); Err(StageError::Eof) } diff --git a/crates/derive/src/stages/l1_retrieval.rs b/crates/derive/src/stages/l1_retrieval.rs index 8d2391766..801302e44 100644 --- a/crates/derive/src/stages/l1_retrieval.rs +++ b/crates/derive/src/stages/l1_retrieval.rs @@ -2,7 +2,10 @@ use crate::{ stages::FrameQueueProvider, - traits::{AsyncIterator, DataAvailabilityProvider, OriginProvider, ResettableStage}, + traits::{ + AsyncIterator, DataAvailabilityProvider, OriginAdvancer, OriginProvider, PreviousStage, + ResettableStage, + }, types::{BlockInfo, StageError, StageResult, SystemConfig}, }; use alloc::boxed::Box; @@ -12,6 +15,7 @@ use async_trait::async_trait; /// Provides L1 blocks for the [L1Retrieval] stage. /// This is the previous stage in the pipeline. +#[async_trait] pub trait L1RetrievalProvider { /// Returns the next L1 [BlockInfo] in the [L1Traversal] stage, if the stage is not complete. /// This function can only be called once while the stage is in progress, and will return @@ -19,7 +23,7 @@ pub trait L1RetrievalProvider { /// complete and the [BlockInfo] has been consumed, an [StageError::Eof] error is returned. /// /// [L1Traversal]: crate::stages::L1Traversal - fn next_l1_block(&mut self) -> StageResult>; + async fn next_l1_block(&mut self) -> StageResult>; /// Returns the batcher [Address] from the [crate::types::SystemConfig]. fn batcher_addr(&self) -> Address; @@ -36,7 +40,7 @@ pub trait L1RetrievalProvider { pub struct L1Retrieval where DAP: DataAvailabilityProvider, - P: L1RetrievalProvider + OriginProvider, + P: L1RetrievalProvider + PreviousStage, { /// The previous stage in the pipeline. pub prev: P, @@ -49,7 +53,7 @@ where impl L1Retrieval where DAP: DataAvailabilityProvider, - P: L1RetrievalProvider + OriginProvider, + P: L1RetrievalProvider + PreviousStage, { /// Creates a new [L1Retrieval] stage with the previous [L1Traversal] stage and given /// [DataAvailabilityProvider]. @@ -60,11 +64,22 @@ where } } +#[async_trait] +impl OriginAdvancer for L1Retrieval +where + DAP: DataAvailabilityProvider + Send, + P: L1RetrievalProvider + PreviousStage + Send, +{ + async fn advance_origin(&mut self) -> StageResult<()> { + self.prev.advance_origin().await + } +} + #[async_trait] impl FrameQueueProvider for L1Retrieval where DAP: DataAvailabilityProvider + Send, - P: L1RetrievalProvider + OriginProvider + Send, + P: L1RetrievalProvider + PreviousStage + Send, { type Item = DAP::Item; @@ -72,7 +87,8 @@ where if self.data.is_none() { let next = self .prev - .next_l1_block()? + .next_l1_block() + .await? // SAFETY: This question mark bubbles up the Eof error. .ok_or_else(|| anyhow!("No block to retrieve data from"))?; self.data = Some(self.provider.open_data(&next, self.prev.batcher_addr()).await?); } @@ -92,20 +108,31 @@ where impl OriginProvider for L1Retrieval where DAP: DataAvailabilityProvider, - P: L1RetrievalProvider + OriginProvider, + P: L1RetrievalProvider + PreviousStage, { fn origin(&self) -> Option<&BlockInfo> { self.prev.origin() } } +impl PreviousStage for L1Retrieval +where + DAP: DataAvailabilityProvider + Send, + P: L1RetrievalProvider + PreviousStage + Send, +{ + fn previous(&self) -> Option> { + Some(Box::new(&self.prev)) + } +} + #[async_trait] impl ResettableStage for L1Retrieval where DAP: DataAvailabilityProvider + Send, - P: L1RetrievalProvider + OriginProvider + Send, + P: L1RetrievalProvider + PreviousStage + Send, { async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()> { + self.prev.reset(base, cfg).await?; self.data = Some(self.provider.open_data(&base, cfg.batcher_addr).await?); Ok(()) } diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs index 6403291fd..65c975108 100644 --- a/crates/derive/src/stages/l1_traversal.rs +++ b/crates/derive/src/stages/l1_traversal.rs @@ -2,7 +2,7 @@ use crate::{ stages::L1RetrievalProvider, - traits::{ChainProvider, OriginProvider, ResettableStage}, + traits::{ChainProvider, OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{BlockInfo, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, sync::Arc}; @@ -31,12 +31,13 @@ pub struct L1Traversal { pub rollup_config: Arc, } -impl L1RetrievalProvider for L1Traversal { +#[async_trait] +impl L1RetrievalProvider for L1Traversal { fn batcher_addr(&self) -> Address { self.system_config.batcher_addr } - fn next_l1_block(&mut self) -> StageResult> { + async fn next_l1_block(&mut self) -> StageResult> { if !self.done { self.done = true; Ok(self.block) @@ -62,11 +63,14 @@ impl L1Traversal { pub fn data_source(&self) -> &F { &self.data_source } +} +#[async_trait] +impl OriginAdvancer for L1Traversal { /// Advances the internal state of the [L1Traversal] stage to the next L1 block. /// This function fetches the next L1 [BlockInfo] from the data source and updates the /// [SystemConfig] with the receipts from the block. - pub async fn advance_l1_block(&mut self) -> StageResult<()> { + async fn advance_origin(&mut self) -> StageResult<()> { // Pull the next block or return EOF. // StageError::EOF has special handling further up the pipeline. let block = match self.block { @@ -112,6 +116,12 @@ impl OriginProvider for L1Traversal { } } +impl PreviousStage for L1Traversal { + fn previous(&self) -> Option> { + None + } +} + #[async_trait] impl ResettableStage for L1Traversal { async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()> { @@ -193,18 +203,18 @@ pub(crate) mod tests { let blocks = vec![BlockInfo::default(), BlockInfo::default()]; let receipts = new_receipts(); let mut traversal = new_test_traversal(blocks, receipts); - assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default())); - assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof); - assert!(traversal.advance_l1_block().await.is_ok()); + assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default())); + assert_eq!(traversal.next_l1_block().await.unwrap_err(), StageError::Eof); + assert!(traversal.advance_origin().await.is_ok()); } #[tokio::test] async fn test_l1_traversal_missing_receipts() { let blocks = vec![BlockInfo::default(), BlockInfo::default()]; let mut traversal = new_test_traversal(blocks, vec![]); - assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default())); - assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof); - matches!(traversal.advance_l1_block().await.unwrap_err(), StageError::ReceiptFetch(_)); + assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default())); + assert_eq!(traversal.next_l1_block().await.unwrap_err(), StageError::Eof); + matches!(traversal.advance_origin().await.unwrap_err(), StageError::ReceiptFetch(_)); } #[tokio::test] @@ -214,17 +224,17 @@ pub(crate) mod tests { let blocks = vec![block, block]; let receipts = new_receipts(); let mut traversal = new_test_traversal(blocks, receipts); - assert!(traversal.advance_l1_block().await.is_ok()); - let err = traversal.advance_l1_block().await.unwrap_err(); + assert!(traversal.advance_origin().await.is_ok()); + let err = traversal.advance_origin().await.unwrap_err(); assert_eq!(err, StageError::ReorgDetected(block.hash, block.parent_hash)); } #[tokio::test] async fn test_l1_traversal_missing_blocks() { let mut traversal = new_test_traversal(vec![], vec![]); - assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default())); - assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof); - matches!(traversal.advance_l1_block().await.unwrap_err(), StageError::BlockInfoFetch(_)); + assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default())); + assert_eq!(traversal.next_l1_block().await.unwrap_err(), StageError::Eof); + matches!(traversal.advance_origin().await.unwrap_err(), StageError::BlockInfoFetch(_)); } #[tokio::test] @@ -236,10 +246,10 @@ pub(crate) mod tests { let blocks = vec![block1, block2]; let receipts = new_receipts(); let mut traversal = new_test_traversal(blocks, receipts); - assert!(traversal.advance_l1_block().await.is_ok()); + assert!(traversal.advance_origin().await.is_ok()); // Only the second block should fail since the second receipt // contains invalid logs that will error for a system config update. - let err = traversal.advance_l1_block().await.unwrap_err(); + let err = traversal.advance_origin().await.unwrap_err(); matches!(err, StageError::SystemConfigUpdate(_)); } @@ -248,9 +258,9 @@ pub(crate) mod tests { let blocks = vec![BlockInfo::default(), BlockInfo::default()]; let receipts = new_receipts(); let mut traversal = new_test_traversal(blocks, receipts); - assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default())); - assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof); - assert!(traversal.advance_l1_block().await.is_ok()); + assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default())); + assert_eq!(traversal.next_l1_block().await.unwrap_err(), StageError::Eof); + assert!(traversal.advance_origin().await.is_ok()); let expected = address!("000000000000000000000000000000000000bEEF"); assert_eq!(traversal.system_config.batcher_addr, expected); } diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs index a428572e7..2034cf5d7 100644 --- a/crates/derive/src/stages/mod.rs +++ b/crates/derive/src/stages/mod.rs @@ -33,7 +33,8 @@ pub use batch_queue::{BatchQueue, BatchQueueProvider}; mod attributes_queue; pub use attributes_queue::{ - AttributesBuilder, AttributesProvider, AttributesQueue, StatefulAttributesBuilder, + AttributesBuilder, AttributesProvider, AttributesQueue, NextAttributes, + StatefulAttributesBuilder, }; #[cfg(test)] diff --git a/crates/derive/src/stages/test_utils/attributes_queue.rs b/crates/derive/src/stages/test_utils/attributes_queue.rs index a630b9123..1e689ecbf 100644 --- a/crates/derive/src/stages/test_utils/attributes_queue.rs +++ b/crates/derive/src/stages/test_utils/attributes_queue.rs @@ -2,10 +2,10 @@ use crate::{ stages::attributes_queue::{AttributesBuilder, AttributesProvider}, - traits::OriginProvider, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{ BlockID, BlockInfo, BuilderError, L2BlockInfo, L2PayloadAttributes, SingleBatch, - StageError, StageResult, + StageError, StageResult, SystemConfig, }, }; use alloc::{boxed::Box, vec::Vec}; @@ -49,6 +49,26 @@ impl OriginProvider for MockAttributesProvider { } } +#[async_trait] +impl OriginAdvancer for MockAttributesProvider { + async fn advance_origin(&mut self) -> StageResult<()> { + Ok(()) + } +} + +#[async_trait] +impl ResettableStage for MockAttributesProvider { + async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> StageResult<()> { + Ok(()) + } +} + +impl PreviousStage for MockAttributesProvider { + fn previous(&self) -> Option> { + Some(Box::new(self)) + } +} + #[async_trait] impl AttributesProvider for MockAttributesProvider { async fn next_batch(&mut self, _parent: L2BlockInfo) -> StageResult { diff --git a/crates/derive/src/stages/test_utils/batch_queue.rs b/crates/derive/src/stages/test_utils/batch_queue.rs index 0165b452e..896add5ee 100644 --- a/crates/derive/src/stages/test_utils/batch_queue.rs +++ b/crates/derive/src/stages/test_utils/batch_queue.rs @@ -2,8 +2,8 @@ use crate::{ stages::batch_queue::BatchQueueProvider, - traits::OriginProvider, - types::{Batch, BlockInfo, StageError, StageResult}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, + types::{Batch, BlockInfo, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, vec::Vec}; use async_trait::async_trait; @@ -12,9 +12,9 @@ use async_trait::async_trait; #[derive(Debug, Default)] pub struct MockBatchQueueProvider { /// The origin of the L1 block. - origin: Option, + pub origin: Option, /// A list of batches to return. - batches: Vec>, + pub batches: Vec>, } impl MockBatchQueueProvider { @@ -36,3 +36,23 @@ impl BatchQueueProvider for MockBatchQueueProvider { self.batches.pop().ok_or(StageError::Eof)? } } + +#[async_trait] +impl OriginAdvancer for MockBatchQueueProvider { + async fn advance_origin(&mut self) -> StageResult<()> { + Ok(()) + } +} + +#[async_trait] +impl ResettableStage for MockBatchQueueProvider { + async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> StageResult<()> { + Ok(()) + } +} + +impl PreviousStage for MockBatchQueueProvider { + fn previous(&self) -> Option> { + Some(Box::new(self)) + } +} diff --git a/crates/derive/src/stages/test_utils/channel_bank.rs b/crates/derive/src/stages/test_utils/channel_bank.rs index 62db2933a..9da6717ab 100644 --- a/crates/derive/src/stages/test_utils/channel_bank.rs +++ b/crates/derive/src/stages/test_utils/channel_bank.rs @@ -2,14 +2,14 @@ use crate::{ stages::ChannelBankProvider, - traits::OriginProvider, - types::{BlockInfo, Frame, StageError, StageResult}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, + types::{BlockInfo, Frame, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, vec::Vec}; use async_trait::async_trait; /// A mock [ChannelBankProvider] for testing the [ChannelBank] stage. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct MockChannelBankProvider { /// The data to return. pub data: Vec>, @@ -30,9 +30,29 @@ impl OriginProvider for MockChannelBankProvider { } } +#[async_trait] +impl OriginAdvancer for MockChannelBankProvider { + async fn advance_origin(&mut self) -> StageResult<()> { + Ok(()) + } +} + #[async_trait] impl ChannelBankProvider for MockChannelBankProvider { async fn next_frame(&mut self) -> StageResult { self.data.pop().unwrap_or(Err(StageError::Eof)) } } + +#[async_trait] +impl ResettableStage for MockChannelBankProvider { + async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> StageResult<()> { + Ok(()) + } +} + +impl PreviousStage for MockChannelBankProvider { + fn previous(&self) -> Option> { + Some(Box::new(self)) + } +} diff --git a/crates/derive/src/stages/test_utils/channel_reader.rs b/crates/derive/src/stages/test_utils/channel_reader.rs index 23cea6416..d43a4e830 100644 --- a/crates/derive/src/stages/test_utils/channel_reader.rs +++ b/crates/derive/src/stages/test_utils/channel_reader.rs @@ -2,15 +2,15 @@ use crate::{ stages::ChannelReaderProvider, - traits::OriginProvider, - types::{BlockInfo, StageError, StageResult}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, + types::{BlockInfo, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, vec::Vec}; use alloy_primitives::Bytes; use async_trait::async_trait; /// A mock [ChannelReaderProvider] for testing the [ChannelReader] stage. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct MockChannelReaderProvider { /// The data to return. pub data: Vec>>, @@ -31,9 +31,29 @@ impl OriginProvider for MockChannelReaderProvider { } } +#[async_trait] +impl OriginAdvancer for MockChannelReaderProvider { + async fn advance_origin(&mut self) -> StageResult<()> { + Ok(()) + } +} + #[async_trait] impl ChannelReaderProvider for MockChannelReaderProvider { async fn next_data(&mut self) -> StageResult> { self.data.pop().unwrap_or(Err(StageError::Eof)) } } + +#[async_trait] +impl ResettableStage for MockChannelReaderProvider { + async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> StageResult<()> { + Ok(()) + } +} + +impl PreviousStage for MockChannelReaderProvider { + fn previous(&self) -> Option> { + Some(Box::new(self)) + } +} diff --git a/crates/derive/src/stages/test_utils/frame_queue.rs b/crates/derive/src/stages/test_utils/frame_queue.rs index a7cd4f8e5..8349cfbc9 100644 --- a/crates/derive/src/stages/test_utils/frame_queue.rs +++ b/crates/derive/src/stages/test_utils/frame_queue.rs @@ -2,15 +2,15 @@ use crate::{ stages::FrameQueueProvider, - traits::OriginProvider, - types::{BlockInfo, StageError, StageResult}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, + types::{BlockInfo, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, vec::Vec}; use alloy_primitives::Bytes; use async_trait::async_trait; /// A mock [FrameQueueProvider] for testing the [FrameQueue] stage. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct MockFrameQueueProvider { /// The data to return. pub data: Vec>, @@ -29,6 +29,13 @@ impl OriginProvider for MockFrameQueueProvider { } } +#[async_trait] +impl OriginAdvancer for MockFrameQueueProvider { + async fn advance_origin(&mut self) -> StageResult<()> { + Ok(()) + } +} + #[async_trait] impl FrameQueueProvider for MockFrameQueueProvider { type Item = Bytes; @@ -37,3 +44,16 @@ impl FrameQueueProvider for MockFrameQueueProvider { self.data.pop().unwrap_or(Err(StageError::Eof)) } } + +#[async_trait] +impl ResettableStage for MockFrameQueueProvider { + async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> StageResult<()> { + Ok(()) + } +} + +impl PreviousStage for MockFrameQueueProvider { + fn previous(&self) -> Option> { + Some(Box::new(self)) + } +} diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index ccd92b105..f3ff4f547 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -5,7 +5,7 @@ mod data_sources; pub use data_sources::*; mod stages; -pub use stages::{OriginProvider, ResettableStage}; +pub use stages::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}; mod ecrecover; pub use ecrecover::SignedRecoverable; diff --git a/crates/derive/src/traits/stages.rs b/crates/derive/src/traits/stages.rs index 7c4362654..3776f9a19 100644 --- a/crates/derive/src/traits/stages.rs +++ b/crates/derive/src/traits/stages.rs @@ -16,3 +16,17 @@ pub trait OriginProvider { /// Returns the optional L1 [BlockInfo] origin. fn origin(&self) -> Option<&BlockInfo>; } + +/// Defines a trait for advancing the L1 origin of the pipeline. +#[async_trait] +pub trait OriginAdvancer { + /// Advances the internal state of the lowest stage to the next l1 origin. + /// This method is the equivalent of the reference implementation `advance_l1_block`. + async fn advance_origin(&mut self) -> StageResult<()>; +} + +/// Provides a method for accessing a previous stage. +pub trait PreviousStage: ResettableStage + OriginAdvancer + OriginProvider { + /// Returns the previous stage. + fn previous(&self) -> Option>; +} diff --git a/crates/derive/src/types/batch/span_batch/batch.rs b/crates/derive/src/types/batch/span_batch/batch.rs index f68f3b322..0ae4bac48 100644 --- a/crates/derive/src/types/batch/span_batch/batch.rs +++ b/crates/derive/src/types/batch/span_batch/batch.rs @@ -138,8 +138,10 @@ impl SpanBatch { } if !self.check_parent_hash(parent_block.block_info.parent_hash) { warn!( - "parent block number mismatch, expected: {parent_num}, received: {}", - parent_block.block_info.number + "parent block number mismatch, expected: {parent_num}, received: {}, parent hash: {}, self hash: {}", + parent_block.block_info.number, + parent_block.block_info.parent_hash, + self.parent_check, ); return BatchValidity::Drop; } @@ -307,7 +309,10 @@ impl SpanBatch { } }; if safe_block_ref.l1_origin.number != self.batches[i as usize].epoch_num { - warn!("overlapped block's L1 origin number does not match"); + warn!( + "overlapped block's L1 origin number does not match {}, {}", + safe_block_ref.l1_origin.number, self.batches[i as usize].epoch_num + ); return BatchValidity::Drop; } } @@ -330,6 +335,13 @@ impl SpanBatch { if batch.timestamp <= l2_safe_head.block_info.timestamp { continue; } + tracing::info!( + "checking {} l1 origins with first timestamp: {}, batch timestamp: {}, {}", + l1_origins.len(), + l1_origins[0].timestamp, + batch.timestamp, + batch.epoch_num + ); let origin_epoch_hash = l1_origins[origin_index..l1_origins.len()] .iter() .enumerate() diff --git a/crates/derive/src/types/payload.rs b/crates/derive/src/types/payload.rs index 2dda40aed..3c5bea111 100644 --- a/crates/derive/src/types/payload.rs +++ b/crates/derive/src/types/payload.rs @@ -3,7 +3,7 @@ use alloc::vec::Vec; use alloy_primitives::{Address, Bloom, Bytes, B256, U256}; use anyhow::Result; -use op_alloy_consensus::OpTxEnvelope; +use op_alloy_consensus::{OpTxEnvelope, OpTxType}; /// Fixed and variable memory costs for a payload. /// ~1000 bytes per payload, with some margin for overhead like map data. @@ -135,7 +135,12 @@ impl L2ExecutionPayloadEnvelope { execution_payload.block_hash ); } - let tx = OpTxEnvelope::decode(&mut execution_payload.transactions[0].as_ref()) + + let ty = execution_payload.transactions[0][0]; + if ty != OpTxType::Deposit as u8 { + anyhow::bail!("First payload transaction has unexpected type: {:?}", ty); + } + let tx = OpTxEnvelope::decode(&mut execution_payload.transactions[0][1..].as_ref()) .map_err(|e| anyhow::anyhow!(e))?; let OpTxEnvelope::Deposit(tx) = tx else { @@ -175,7 +180,11 @@ impl L2ExecutionPayloadEnvelope { execution_payload.block_hash ); } - let tx = OpTxEnvelope::decode(&mut execution_payload.transactions[0].as_ref()) + let ty = execution_payload.transactions[0][0]; + if ty != OpTxType::Deposit as u8 { + anyhow::bail!("First payload transaction has unexpected type: {:?}", ty); + } + let tx = OpTxEnvelope::decode(&mut execution_payload.transactions[0][1..].as_ref()) .map_err(|e| anyhow::anyhow!(e))?; let OpTxEnvelope::Deposit(tx) = tx else {