From 8acfb8244f28b57646e6044958f9d2ce85b74d8a Mon Sep 17 00:00:00 2001 From: refcell Date: Sun, 10 Nov 2024 18:42:16 -0500 Subject: [PATCH] feat(driver): Abstract, Default Pipeline (#796) * feat: abstract pipeline * fixes --- bin/client/src/l1/pipeline.rs | 158 +++++++++++---------------- crates/derive/src/pipeline/core.rs | 18 ++- crates/derive/src/traits/pipeline.rs | 14 ++- crates/driver/src/core.rs | 60 +++++----- crates/driver/src/lib.rs | 2 +- crates/driver/src/pipeline.rs | 95 ++++++++++++++-- 6 files changed, 209 insertions(+), 138 deletions(-) diff --git a/bin/client/src/l1/pipeline.rs b/bin/client/src/l1/pipeline.rs index 36cbf02dc..f540905dc 100644 --- a/bin/client/src/l1/pipeline.rs +++ b/bin/client/src/l1/pipeline.rs @@ -6,25 +6,22 @@ use async_trait::async_trait; use core::fmt::Debug; use kona_derive::{ attributes::StatefulAttributesBuilder, - errors::{PipelineError, PipelineErrorKind, ResetError}, + errors::PipelineErrorKind, pipeline::{DerivationPipeline, PipelineBuilder}, sources::EthereumDataSource, stages::{ AttributesQueue, BatchProvider, BatchStream, ChannelProvider, ChannelReader, FrameQueue, L1Retrieval, L1Traversal, }, - traits::{ - BlobProvider, ChainProvider, L2ChainProvider, OriginProvider, Pipeline, SignalReceiver, - }, - types::{ActivationSignal, ResetSignal, Signal, StepResult}, + traits::{BlobProvider, ChainProvider, OriginProvider, Pipeline, SignalReceiver}, + types::{PipelineResult, Signal, StepResult}, }; -use kona_driver::SyncCursor; +use kona_driver::{DriverPipeline, SyncCursor}; use kona_mpt::TrieProvider; use kona_preimage::{CommsClient, PreimageKey, PreimageKeyType}; -use op_alloy_genesis::RollupConfig; +use op_alloy_genesis::{RollupConfig, SystemConfig}; use op_alloy_protocol::{BatchValidationProvider, BlockInfo, L2BlockInfo}; use op_alloy_rpc_types_engine::OpAttributesWithParent; -use tracing::{info, warn}; use crate::{ errors::OracleProviderError, l1::OracleL1ChainProvider, l2::OracleL2ChainProvider, BootInfo, @@ -185,101 +182,78 @@ where } #[async_trait] -impl kona_driver::Pipeline for OraclePipeline +impl DriverPipeline> for OraclePipeline where O: CommsClient + FlushableCache + Send + Sync + Debug, B: BlobProvider + Send + Sync + Debug + Clone, { - /// Produces the disputed [OpAttributesWithParent] payload, directly after the starting L2 - /// output root passed through the [crate::BootInfo]. - async fn produce_payload( - &mut self, - l2_safe_head: L2BlockInfo, - ) -> Result { - // As we start the safe head at the disputed block's parent, we step the pipeline until the - // first attributes are produced. All batches at and before the safe head will be - // dropped, so the first payload will always be the disputed one. - loop { - match self.pipeline.step(l2_safe_head).await { - StepResult::PreparedAttributes => { - info!(target: "client_derivation_driver", "Stepped derivation pipeline") - } - StepResult::AdvancedOrigin => { - info!(target: "client_derivation_driver", "Advanced origin") - } - StepResult::OriginAdvanceErr(e) | StepResult::StepFailed(e) => { - warn!(target: "client_derivation_driver", "Failed to step derivation pipeline: {:?}", e); + /// Flushes the cache on re-org. + fn flush(&self) { + self.caching_oracle.flush(); + } +} - // Break the loop unless the error signifies that there is not enough data to - // complete the current step. In this case, we retry the step to see if other - // stages can make progress. - match e { - PipelineErrorKind::Temporary(_) => continue, - PipelineErrorKind::Reset(e) => { - let system_config = self - .pipeline - .l2_chain_provider - .system_config_by_number( - l2_safe_head.block_info.number, - self.pipeline.rollup_config.clone(), - ) - .await?; +#[async_trait] +impl SignalReceiver for OraclePipeline +where + O: CommsClient + FlushableCache + Send + Sync + Debug, + B: BlobProvider + Send + Sync + Debug + Clone, +{ + /// Receives a signal from the driver. + async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { + self.pipeline.signal(signal).await + } +} - if matches!(e, ResetError::HoloceneActivation) { - self.pipeline - .signal( - ActivationSignal { - l2_safe_head, - l1_origin: self - .pipeline - .origin() - .ok_or(PipelineError::MissingOrigin.crit())?, - system_config: Some(system_config), - } - .signal(), - ) - .await?; - } else { - // Flush the caching oracle if a reorg is detected. - if matches!(e, ResetError::ReorgDetected(_, _)) { - self.caching_oracle.as_ref().flush(); - } +impl OriginProvider for OraclePipeline +where + O: CommsClient + FlushableCache + Send + Sync + Debug, + B: BlobProvider + Send + Sync + Debug + Clone, +{ + /// Returns the optional L1 [BlockInfo] origin. + fn origin(&self) -> Option { + self.pipeline.origin() + } +} - // Reset the pipeline to the initial L2 safe head and L1 origin, - // and try again. - self.pipeline - .signal( - ResetSignal { - l2_safe_head, - l1_origin: self - .pipeline - .origin() - .ok_or(PipelineError::MissingOrigin.crit())?, - system_config: Some(system_config), - } - .signal(), - ) - .await?; - } - } - PipelineErrorKind::Critical(_) => return Err(e), - } - } - } +impl Iterator for OraclePipeline +where + O: CommsClient + FlushableCache + Send + Sync + Debug, + B: BlobProvider + Send + Sync + Debug + Clone, +{ + type Item = OpAttributesWithParent; - if let Some(attrs) = self.pipeline.next() { - return Ok(attrs); - } - } + fn next(&mut self) -> Option { + self.pipeline.next() } +} - /// Signals the derivation pipeline. - async fn signal(&mut self, signal: Signal) -> Result<(), PipelineErrorKind> { - self.pipeline.signal(signal).await +#[async_trait] +impl Pipeline for OraclePipeline +where + O: CommsClient + FlushableCache + Send + Sync + Debug, + B: BlobProvider + Send + Sync + Debug + Clone, +{ + /// Peeks at the next [OpAttributesWithParent] from the pipeline. + fn peek(&self) -> Option<&OpAttributesWithParent> { + self.pipeline.peek() + } + + /// Attempts to progress the pipeline. + async fn step(&mut self, cursor: L2BlockInfo) -> StepResult { + self.pipeline.step(cursor).await + } + + /// Returns the rollup config. + fn rollup_config(&self) -> &RollupConfig { + self.pipeline.rollup_config() } - /// Returns the rollup configuration. - fn rollup_config(&self) -> Arc { - self.pipeline.rollup_config.clone() + /// Returns the [SystemConfig] by L2 number. + async fn system_config_by_number( + &mut self, + number: u64, + ) -> Result { + self.pipeline.system_config_by_number(number).await } } diff --git a/crates/derive/src/pipeline/core.rs b/crates/derive/src/pipeline/core.rs index d96da92b3..28094ef6a 100644 --- a/crates/derive/src/pipeline/core.rs +++ b/crates/derive/src/pipeline/core.rs @@ -10,7 +10,7 @@ use crate::{ use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; use async_trait::async_trait; use core::fmt::Debug; -use op_alloy_genesis::RollupConfig; +use op_alloy_genesis::{RollupConfig, SystemConfig}; use op_alloy_protocol::{BlockInfo, L2BlockInfo}; use op_alloy_rpc_types_engine::OpAttributesWithParent; use tracing::{error, trace, warn}; @@ -134,6 +134,22 @@ where self.prepared.front() } + /// Returns the rollup config. + fn rollup_config(&self) -> &RollupConfig { + &self.rollup_config + } + + /// Returns the [SystemConfig] by L2 number. + async fn system_config_by_number( + &mut self, + number: u64, + ) -> Result { + self.l2_chain_provider + .system_config_by_number(number, self.rollup_config.clone()) + .await + .map_err(Into::into) + } + /// Attempts to progress the pipeline. /// /// ## Returns diff --git a/crates/derive/src/traits/pipeline.rs b/crates/derive/src/traits/pipeline.rs index c5be6f52b..a07f601a4 100644 --- a/crates/derive/src/traits/pipeline.rs +++ b/crates/derive/src/traits/pipeline.rs @@ -1,13 +1,14 @@ //! Defines the interface for the core derivation pipeline. -use super::OriginProvider; -use crate::types::StepResult; use alloc::boxed::Box; use async_trait::async_trait; use core::iter::Iterator; +use op_alloy_genesis::{RollupConfig, SystemConfig}; use op_alloy_protocol::L2BlockInfo; use op_alloy_rpc_types_engine::OpAttributesWithParent; +use crate::{errors::PipelineErrorKind, traits::OriginProvider, types::StepResult}; + /// This trait defines the interface for interacting with the derivation pipeline. #[async_trait] pub trait Pipeline: OriginProvider + Iterator { @@ -16,4 +17,13 @@ pub trait Pipeline: OriginProvider + Iterator { /// Attempts to progress the pipeline. async fn step(&mut self, cursor: L2BlockInfo) -> StepResult; + + /// Returns the rollup config. + fn rollup_config(&self) -> &RollupConfig; + + /// Returns the [SystemConfig] by L2 number. + async fn system_config_by_number( + &mut self, + number: u64, + ) -> Result; } diff --git a/crates/driver/src/core.rs b/crates/driver/src/core.rs index b05104d80..c19982b28 100644 --- a/crates/driver/src/core.rs +++ b/crates/driver/src/core.rs @@ -1,12 +1,13 @@ //! The driver of the Derivation Pipeline. use alloc::vec::Vec; -use alloy_consensus::{BlockBody, Header, Sealable, Sealed}; +use alloy_consensus::{BlockBody, Sealable}; use alloy_primitives::B256; use alloy_rlp::Decodable; use core::fmt::Debug; use kona_derive::{ errors::{PipelineError, PipelineErrorKind}, + traits::{Pipeline, SignalReceiver}, types::Signal, }; use op_alloy_consensus::{OpBlock, OpTxEnvelope, OpTxType}; @@ -15,50 +16,45 @@ use op_alloy_protocol::L2BlockInfo; use op_alloy_rpc_types_engine::OpAttributesWithParent; use tracing::{error, info, warn}; -use crate::{DriverError, DriverResult, Executor, ExecutorConstructor, Pipeline, SyncCursor}; +use crate::{DriverError, DriverPipeline, DriverResult, Executor, ExecutorConstructor, SyncCursor}; /// The Rollup Driver entrypoint. #[derive(Debug)] -pub struct Driver +pub struct Driver where E: Executor + Send + Sync + Debug, EC: ExecutorConstructor + Send + Sync + Debug, - P: Pipeline + Send + Sync + Debug, + DP: DriverPipeline

+ Send + Sync + Debug, + P: Pipeline + SignalReceiver + Send + Sync + Debug, { /// Marker for the executor. _marker: core::marker::PhantomData, + /// Marker for the pipeline. + _marker2: core::marker::PhantomData

, /// A pipeline abstraction. - pipeline: P, + pipeline: DP, /// Cursor to keep track of the L2 tip cursor: SyncCursor, /// Executor constructor. executor: EC, } -impl Driver +impl Driver where E: Executor + Send + Sync + Debug, EC: ExecutorConstructor + Send + Sync + Debug, - P: Pipeline + Send + Sync + Debug, + DP: DriverPipeline

+ Send + Sync + Debug, + P: Pipeline + SignalReceiver + Send + Sync + Debug, { /// Creates a new [Driver]. - pub const fn new(cursor: SyncCursor, executor: EC, pipeline: P) -> Self { - Self { _marker: core::marker::PhantomData, cursor, executor, pipeline } - } - - /// Returns the current L2 safe head. - pub const fn l2_safe_head(&self) -> &L2BlockInfo { - self.cursor.l2_safe_head() - } - - /// Returns the header of the L2 safe head. - pub const fn l2_safe_head_header(&self) -> &Sealed

{ - self.cursor.l2_safe_head_header() - } - - /// Returns the output root of the L2 safe head. - pub const fn l2_safe_head_output_root(&self) -> &B256 { - self.cursor.l2_safe_head_output_root() + pub const fn new(cursor: SyncCursor, executor: EC, pipeline: DP) -> Self { + Self { + _marker: core::marker::PhantomData, + _marker2: core::marker::PhantomData, + pipeline, + cursor, + executor, + } } /// Advances the derivation pipeline to the target block number. @@ -78,17 +74,17 @@ where ) -> DriverResult<(u64, B256), E::Error> { loop { // Check if we have reached the target block number. - if self.l2_safe_head().block_info.number >= target { + if self.cursor.l2_safe_head().block_info.number >= target { info!(target: "client", "Derivation complete, reached L2 safe head."); return Ok(( - self.l2_safe_head().block_info.number, - *self.l2_safe_head_output_root(), + self.cursor.l2_safe_head().block_info.number, + *self.cursor.l2_safe_head_output_root(), )); } let OpAttributesWithParent { mut attributes, .. } = match self .pipeline - .produce_payload(*self.l2_safe_head()) + .produce_payload(*self.cursor.l2_safe_head()) .await { Ok(attrs) => attrs, @@ -97,7 +93,7 @@ where // Adjust the target block number to the current safe head, as no more blocks // can be produced. - target = self.l2_safe_head().block_info.number; + target = self.cursor.l2_safe_head().block_info.number; continue; } Err(e) => { @@ -106,7 +102,8 @@ where } }; - let mut executor = self.executor.new_executor(self.l2_safe_head_header().clone()); + let mut executor = + self.executor.new_executor(self.cursor.l2_safe_head_header().clone()); let header = match executor.execute_payload(attributes.clone()) { Ok(header) => header, Err(e) => { @@ -130,7 +127,8 @@ where }); // Retry the execution. - executor = self.executor.new_executor(self.l2_safe_head_header().clone()); + executor = + self.executor.new_executor(self.cursor.l2_safe_head_header().clone()); match executor.execute_payload(attributes.clone()) { Ok(header) => header, Err(e) => { diff --git a/crates/driver/src/lib.rs b/crates/driver/src/lib.rs index 68124bc05..0731f63aa 100644 --- a/crates/driver/src/lib.rs +++ b/crates/driver/src/lib.rs @@ -13,7 +13,7 @@ mod errors; pub use errors::{DriverError, DriverResult}; mod pipeline; -pub use pipeline::Pipeline; +pub use pipeline::DriverPipeline; mod executor; pub use executor::{Executor, ExecutorConstructor}; diff --git a/crates/driver/src/pipeline.rs b/crates/driver/src/pipeline.rs index dba137db4..523f8057f 100644 --- a/crates/driver/src/pipeline.rs +++ b/crates/driver/src/pipeline.rs @@ -1,26 +1,99 @@ //! Abstracts the derivation pipeline from the driver. -use alloc::{boxed::Box, sync::Arc}; +use alloc::boxed::Box; use async_trait::async_trait; -use kona_derive::{errors::PipelineErrorKind, types::Signal}; -use op_alloy_genesis::RollupConfig; use op_alloy_protocol::L2BlockInfo; use op_alloy_rpc_types_engine::OpAttributesWithParent; -/// Pipeline +use kona_derive::{ + errors::{PipelineError, PipelineErrorKind, ResetError}, + traits::{Pipeline, SignalReceiver}, + types::{ActivationSignal, ResetSignal, StepResult}, +}; +use tracing::{info, warn}; + +/// The Driver's Pipeline /// /// A high-level abstraction for the driver's derivation pipeline. #[async_trait] -pub trait Pipeline { - /// Advance the pipeline to the target block. +pub trait DriverPipeline

: Pipeline + SignalReceiver +where + P: Pipeline + SignalReceiver, +{ + /// Flushes any cache on re-org. + fn flush(&self); + + /// Produces the disputed [OpAttributesWithParent] payload, directly after the given + /// starting l2 safe head. async fn produce_payload( &mut self, l2_safe_head: L2BlockInfo, - ) -> Result; + ) -> Result { + // As we start the safe head at the disputed block's parent, we step the pipeline until the + // first attributes are produced. All batches at and before the safe head will be + // dropped, so the first payload will always be the disputed one. + loop { + match self.step(l2_safe_head).await { + StepResult::PreparedAttributes => { + info!(target: "client_derivation_driver", "Stepped derivation pipeline") + } + StepResult::AdvancedOrigin => { + info!(target: "client_derivation_driver", "Advanced origin") + } + StepResult::OriginAdvanceErr(e) | StepResult::StepFailed(e) => { + warn!(target: "client_derivation_driver", "Failed to step derivation pipeline: {:?}", e); + + // Break the loop unless the error signifies that there is not enough data to + // complete the current step. In this case, we retry the step to see if other + // stages can make progress. + match e { + PipelineErrorKind::Temporary(_) => continue, + PipelineErrorKind::Reset(e) => { + let system_config = self + .system_config_by_number(l2_safe_head.block_info.number) + .await?; + + if matches!(e, ResetError::HoloceneActivation) { + let l1_origin = + self.origin().ok_or(PipelineError::MissingOrigin.crit())?; + self.signal( + ActivationSignal { + l2_safe_head, + l1_origin, + system_config: Some(system_config), + } + .signal(), + ) + .await?; + } else { + // Flushes cache if a reorg is detected. + if matches!(e, ResetError::ReorgDetected(_, _)) { + self.flush(); + } - /// Signal the pipeline. - async fn signal(&mut self, signal: Signal) -> Result<(), PipelineErrorKind>; + // Reset the pipeline to the initial L2 safe head and L1 origin, + // and try again. + let l1_origin = + self.origin().ok_or(PipelineError::MissingOrigin.crit())?; + self.signal( + ResetSignal { + l2_safe_head, + l1_origin, + system_config: Some(system_config), + } + .signal(), + ) + .await?; + } + } + PipelineErrorKind::Critical(_) => return Err(e), + } + } + } - /// Returns the Pipeline's rollup config. - fn rollup_config(&self) -> Arc; + if let Some(attrs) = self.next() { + return Ok(attrs); + } + } + } }