From 2e943f4dd475f513b95f052368a99e9ef2e8410b Mon Sep 17 00:00:00 2001 From: refcell Date: Sun, 10 Nov 2024 20:32:35 -0500 Subject: [PATCH] feat(driver,client): Pipeline Cursor Refactor (#798) * fix(driver,client): pipeline cursor * trigger ci --- bin/client/src/kona.rs | 8 +-- bin/client/src/l1/pipeline.rs | 10 ++- bin/client/src/sync.rs | 107 +++++++++++++----------------- crates/driver/src/core.rs | 25 ++++--- crates/driver/src/cursor.rs | 121 +++++++++++++++++++++++++++------- crates/driver/src/lib.rs | 5 +- crates/driver/src/tip.rs | 42 ++++++++++++ 7 files changed, 211 insertions(+), 107 deletions(-) create mode 100644 crates/driver/src/tip.rs diff --git a/bin/client/src/kona.rs b/bin/client/src/kona.rs index 958dd71d2..434ca1794 100644 --- a/bin/client/src/kona.rs +++ b/bin/client/src/kona.rs @@ -12,7 +12,7 @@ use kona_client::{ executor::KonaExecutorConstructor, l1::{OracleBlobProvider, OracleL1ChainProvider, OraclePipeline}, l2::OracleL2ChainProvider, - sync::SyncStart, + sync::new_pipeline_cursor, BootInfo, CachingOracle, }; use kona_common::io; @@ -72,7 +72,7 @@ fn main() -> Result<(), String> { // Create a new derivation driver with the given boot information and oracle. - let Ok(sync_start) = SyncStart::from( + let Ok(cursor) = new_pipeline_cursor( oracle.clone(), &boot, &mut l1_provider.clone(), @@ -87,7 +87,7 @@ fn main() -> Result<(), String> { let cfg = Arc::new(boot.rollup_config.clone()); let pipeline = OraclePipeline::new( cfg.clone(), - sync_start.clone(), + cursor.clone(), oracle.clone(), beacon, l1_provider.clone(), @@ -99,7 +99,7 @@ fn main() -> Result<(), String> { l2_provider, fpvm_handle_register, ); - let mut driver = Driver::new(sync_start.cursor, executor, pipeline); + let mut driver = Driver::new(cursor, executor, pipeline); // Run the derivation pipeline until we are able to produce the output root of the claimed // L2 block. diff --git a/bin/client/src/l1/pipeline.rs b/bin/client/src/l1/pipeline.rs index e63accd4f..2c60f53df 100644 --- a/bin/client/src/l1/pipeline.rs +++ b/bin/client/src/l1/pipeline.rs @@ -15,15 +15,13 @@ use kona_derive::{ traits::{BlobProvider, OriginProvider, Pipeline, SignalReceiver}, types::{PipelineResult, Signal, StepResult}, }; -use kona_driver::DriverPipeline; +use kona_driver::{DriverPipeline, PipelineCursor}; use kona_preimage::CommsClient; use op_alloy_genesis::{RollupConfig, SystemConfig}; use op_alloy_protocol::{BlockInfo, L2BlockInfo}; use op_alloy_rpc_types_engine::OpAttributesWithParent; -use crate::{ - l1::OracleL1ChainProvider, l2::OracleL2ChainProvider, sync::SyncStart, FlushableCache, -}; +use crate::{l1::OracleL1ChainProvider, l2::OracleL2ChainProvider, FlushableCache}; /// An oracle-backed derivation pipeline. pub type OracleDerivationPipeline = DerivationPipeline< @@ -76,7 +74,7 @@ where /// Constructs a new oracle-backed derivation pipeline. pub fn new( cfg: Arc, - sync_start: SyncStart, + sync_start: PipelineCursor, caching_oracle: Arc, blob_provider: B, chain_provider: OracleL1ChainProvider, @@ -95,7 +93,7 @@ where .l2_chain_provider(l2_chain_provider) .chain_provider(chain_provider) .builder(attributes) - .origin(sync_start.origin) + .origin(sync_start.origin()) .build(); Self { pipeline, caching_oracle } } diff --git a/bin/client/src/sync.rs b/bin/client/src/sync.rs index ced19881c..bd0cb749f 100644 --- a/bin/client/src/sync.rs +++ b/bin/client/src/sync.rs @@ -8,72 +8,55 @@ use alloc::sync::Arc; use alloy_consensus::Sealed; use core::fmt::Debug; use kona_derive::traits::ChainProvider; -use kona_driver::SyncCursor; +use kona_driver::{PipelineCursor, TipCursor}; use kona_mpt::TrieProvider; use kona_preimage::{CommsClient, PreimageKey, PreimageKeyType}; -use op_alloy_protocol::{BatchValidationProvider, BlockInfo}; +use op_alloy_protocol::BatchValidationProvider; -/// Sync Start -#[derive(Debug, Clone)] -pub struct SyncStart { - /// The l1 origin block info. - pub origin: BlockInfo, - /// The sync cursor used for the derivation driver. - pub cursor: SyncCursor, -} - -impl SyncStart { - /// Constructs the [`SyncStart`] from the caching oracle, boot info, and providers. - pub async fn from( - caching_oracle: Arc, - boot_info: &BootInfo, - chain_provider: &mut OracleL1ChainProvider, - l2_chain_provider: &mut OracleL2ChainProvider, - ) -> Result - where - O: CommsClient + FlushableCache + FlushableCache + Send + Sync + Debug, - { - // Find the initial safe head, based off of the starting L2 block number in the boot info. - caching_oracle - .write( - &HintType::StartingL2Output - .encode_with(&[boot_info.agreed_l2_output_root.as_ref()]), - ) - .await - .map_err(OracleProviderError::Preimage)?; - let mut output_preimage = [0u8; 128]; - caching_oracle - .get_exact( - PreimageKey::new(*boot_info.agreed_l2_output_root, PreimageKeyType::Keccak256), - &mut output_preimage, - ) - .await - .map_err(OracleProviderError::Preimage)?; - - let safe_hash = - output_preimage[96..128].try_into().map_err(OracleProviderError::SliceConversion)?; - let safe_header = l2_chain_provider.header_by_hash(safe_hash)?; - let safe_head_info = l2_chain_provider.l2_block_info_by_number(safe_header.number).await?; - let l1_origin = - chain_provider.block_info_by_number(safe_head_info.l1_origin.number).await?; +/// Constructs a [`PipelineCursor`] from the caching oracle, boot info, and providers. +pub async fn new_pipeline_cursor( + caching_oracle: Arc, + boot_info: &BootInfo, + chain_provider: &mut OracleL1ChainProvider, + l2_chain_provider: &mut OracleL2ChainProvider, +) -> Result +where + O: CommsClient + FlushableCache + FlushableCache + Send + Sync + Debug, +{ + // Find the initial safe head, based off of the starting L2 block number in the boot info. + caching_oracle + .write(&HintType::StartingL2Output.encode_with(&[boot_info.agreed_l2_output_root.as_ref()])) + .await + .map_err(OracleProviderError::Preimage)?; + let mut output_preimage = [0u8; 128]; + caching_oracle + .get_exact( + PreimageKey::new(*boot_info.agreed_l2_output_root, PreimageKeyType::Keccak256), + &mut output_preimage, + ) + .await + .map_err(OracleProviderError::Preimage)?; - // Construct the sync cursor for the pipeline driver. - let cursor = SyncCursor::new( - safe_head_info, - Sealed::new_unchecked(safe_header, safe_hash), - boot_info.agreed_l2_output_root, - ); + let safe_hash = + output_preimage[96..128].try_into().map_err(OracleProviderError::SliceConversion)?; + let safe_header = l2_chain_provider.header_by_hash(safe_hash)?; + let safe_head_info = l2_chain_provider.l2_block_info_by_number(safe_header.number).await?; + let l1_origin = chain_provider.block_info_by_number(safe_head_info.l1_origin.number).await?; - // Walk back the starting L1 block by `channel_timeout` to ensure that the full channel is - // captured. - let channel_timeout = - boot_info.rollup_config.channel_timeout(safe_head_info.block_info.timestamp); - let mut l1_origin_number = l1_origin.number.saturating_sub(channel_timeout); - if l1_origin_number < boot_info.rollup_config.genesis.l1.number { - l1_origin_number = boot_info.rollup_config.genesis.l1.number; - } - let origin = chain_provider.block_info_by_number(l1_origin_number).await?; - - Ok(Self { origin, cursor }) + // Walk back the starting L1 block by `channel_timeout` to ensure that the full channel is + // captured. + let channel_timeout = + boot_info.rollup_config.channel_timeout(safe_head_info.block_info.timestamp); + let mut l1_origin_number = l1_origin.number.saturating_sub(channel_timeout); + if l1_origin_number < boot_info.rollup_config.genesis.l1.number { + l1_origin_number = boot_info.rollup_config.genesis.l1.number; } + let origin = chain_provider.block_info_by_number(l1_origin_number).await?; + + // Construct the cursor. + let safe_header = Sealed::new_unchecked(safe_header, safe_hash); + let mut cursor = PipelineCursor::new(channel_timeout, origin); + let tip = TipCursor::new(safe_head_info, safe_header, boot_info.agreed_l2_output_root); + cursor.advance(origin, tip); + Ok(cursor) } diff --git a/crates/driver/src/core.rs b/crates/driver/src/core.rs index c19982b28..329e8f493 100644 --- a/crates/driver/src/core.rs +++ b/crates/driver/src/core.rs @@ -1,4 +1,4 @@ -//! The driver of the Derivation Pipeline. +//! The driver of the kona derivation pipeline. use alloc::vec::Vec; use alloy_consensus::{BlockBody, Sealable}; @@ -16,7 +16,10 @@ use op_alloy_protocol::L2BlockInfo; use op_alloy_rpc_types_engine::OpAttributesWithParent; use tracing::{error, info, warn}; -use crate::{DriverError, DriverPipeline, DriverResult, Executor, ExecutorConstructor, SyncCursor}; +use crate::{ + DriverError, DriverPipeline, DriverResult, Executor, ExecutorConstructor, PipelineCursor, + TipCursor, +}; /// The Rollup Driver entrypoint. #[derive(Debug)] @@ -34,7 +37,7 @@ where /// A pipeline abstraction. pipeline: DP, /// Cursor to keep track of the L2 tip - cursor: SyncCursor, + cursor: PipelineCursor, /// Executor constructor. executor: EC, } @@ -47,7 +50,7 @@ where P: Pipeline + SignalReceiver + Send + Sync + Debug, { /// Creates a new [Driver]. - pub const fn new(cursor: SyncCursor, executor: EC, pipeline: DP) -> Self { + pub const fn new(cursor: PipelineCursor, executor: EC, pipeline: DP) -> Self { Self { _marker: core::marker::PhantomData, _marker2: core::marker::PhantomData, @@ -161,14 +164,18 @@ where }, }; - // Update the safe head. - self.cursor.l2_safe_head = L2BlockInfo::from_block_and_genesis( + // Get the pipeline origin and update the cursor. + let origin = self.pipeline.origin().ok_or(PipelineError::MissingOrigin.crit())?; + let l2_info = L2BlockInfo::from_block_and_genesis( &block, &self.pipeline.rollup_config().genesis, )?; - self.cursor.l2_safe_head_header = header.clone().seal_slow(); - self.cursor.l2_safe_head_output_root = - executor.compute_output_root().map_err(DriverError::Executor)?; + let cursor = TipCursor::new( + l2_info, + header.clone().seal_slow(), + executor.compute_output_root().map_err(DriverError::Executor)?, + ); + self.cursor.advance(origin, cursor); } } } diff --git a/crates/driver/src/cursor.rs b/crates/driver/src/cursor.rs index e3b7fa549..16c0b2f64 100644 --- a/crates/driver/src/cursor.rs +++ b/crates/driver/src/cursor.rs @@ -1,42 +1,113 @@ -//! Contains the cursor for the derivation driver. +//! Contains the cursor for the derivation pipeline. +use crate::TipCursor; +use alloc::collections::{btree_map::BTreeMap, vec_deque::VecDeque}; use alloy_consensus::{Header, Sealed}; -use alloy_primitives::B256; -use op_alloy_protocol::L2BlockInfo; +use alloy_primitives::{map::HashMap, B256}; +use op_alloy_protocol::{BlockInfo, L2BlockInfo}; -/// A cursor that keeps track of the L2 tip block. +/// A cursor that tracks the pipeline tip. #[derive(Debug, Clone)] -pub struct SyncCursor { - /// The current L2 safe head. - pub l2_safe_head: L2BlockInfo, - /// The header of the L2 safe head. - pub l2_safe_head_header: Sealed
, - /// The output root of the L2 safe head. - pub l2_safe_head_output_root: B256, +pub struct PipelineCursor { + /// The block cache capacity before evicting old entries + /// (to avoid unbounded memory growth) + capacity: usize, + /// The channel timeout used to create the cursor. + channel_timeout: u64, + /// The l1 Origin of the pipeline. + origin: BlockInfo, + /// The L1 origin block numbers for which we have an L2 block in the cache. + /// Used to keep track of the order of insertion and evict the oldest entry. + origins: VecDeque, + /// The L1 origin block info for which we have an L2 block in the cache. + origin_infos: HashMap, + /// A map from the l1 origin block number to its L2 tip. + tips: BTreeMap, } -impl SyncCursor { - /// Instantiates a new `SyncCursor`. - pub const fn new( - l2_safe_head: L2BlockInfo, - l2_safe_head_header: Sealed
, - l2_safe_head_output_root: B256, - ) -> Self { - Self { l2_safe_head, l2_safe_head_header, l2_safe_head_output_root } +impl PipelineCursor { + /// Create a new cursor with the default cache capacity + pub fn new(channel_timeout: u64, origin: BlockInfo) -> Self { + // NOTE: capacity must be greater than the `channel_timeout` to allow + // for derivation to proceed through a deep reorg. + // Ref: + let capacity = channel_timeout as usize + 5; + + let mut origins = VecDeque::with_capacity(capacity); + origins.push_back(origin.number); + let mut origin_infos = HashMap::default(); + origin_infos.insert(origin.number, origin); + Self { capacity, channel_timeout, origin, origins, origin_infos, tips: Default::default() } + } + + /// Returns the current origin of the pipeline. + pub const fn origin(&self) -> BlockInfo { + self.origin } /// Returns the current L2 safe head. - pub const fn l2_safe_head(&self) -> &L2BlockInfo { - &self.l2_safe_head + pub fn l2_safe_head(&self) -> &L2BlockInfo { + &self.tip().l2_safe_head } /// Returns the header of the L2 safe head. - pub const fn l2_safe_head_header(&self) -> &Sealed
{ - &self.l2_safe_head_header + pub fn l2_safe_head_header(&self) -> &Sealed
{ + &self.tip().l2_safe_head_header } /// Returns the output root of the L2 safe head. - pub const fn l2_safe_head_output_root(&self) -> &B256 { - &self.l2_safe_head_output_root + pub fn l2_safe_head_output_root(&self) -> &B256 { + &self.tip().l2_safe_head_output_root + } + + /// Get the current L2 tip + pub fn tip(&self) -> &TipCursor { + if let Some((_, l2_tip)) = self.tips.last_key_value() { + l2_tip + } else { + unreachable!("cursor must be initialized with one block before advancing") + } + } + + /// Advance the cursor to the provided L2 block, given the corresponding L1 origin block. + /// + /// If the cache is full, the oldest entry is evicted. + pub fn advance(&mut self, origin: BlockInfo, l2_tip_block: TipCursor) { + if self.tips.len() >= self.capacity { + let key = self.origins.pop_front().unwrap(); + self.tips.remove(&key); + } + + self.origin = origin; + self.origins.push_back(origin.number); + self.origin_infos.insert(origin.number, origin); + self.tips.insert(origin.number, l2_tip_block); + } + + /// When the L1 undergoes a reorg, we need to reset the cursor to the fork block minus + /// the channel timeout, because an L2 block might have started to be derived at the + /// beginning of the channel. + /// + /// Returns the (L2 block info, L1 origin block info) tuple for the new cursor state. + pub fn reset(&mut self, fork_block: u64) -> (TipCursor, BlockInfo) { + let channel_start = fork_block - self.channel_timeout; + + match self.tips.get(&channel_start) { + Some(l2_safe_tip) => { + // The channel start block is in the cache, we can use it to reset the cursor. + (l2_safe_tip.clone(), self.origin_infos[&channel_start]) + } + None => { + // If the channel start block is not in the cache, we reset the cursor + // to the closest known L1 block for which we have a corresponding L2 block. + let (last_l1_known_tip, l2_known_tip) = self + .tips + .range(..=channel_start) + .next_back() + .expect("walked back to genesis without finding anchor origin block"); + + (l2_known_tip.clone(), self.origin_infos[last_l1_known_tip]) + } + } } } diff --git a/crates/driver/src/lib.rs b/crates/driver/src/lib.rs index 0731f63aa..fe3bcf6de 100644 --- a/crates/driver/src/lib.rs +++ b/crates/driver/src/lib.rs @@ -22,4 +22,7 @@ mod core; pub use core::Driver; mod cursor; -pub use cursor::SyncCursor; +pub use cursor::PipelineCursor; + +mod tip; +pub use tip::TipCursor; diff --git a/crates/driver/src/tip.rs b/crates/driver/src/tip.rs new file mode 100644 index 000000000..2d0b31cbc --- /dev/null +++ b/crates/driver/src/tip.rs @@ -0,0 +1,42 @@ +//! Contains the tip for the derivation driver. + +use alloy_consensus::{Header, Sealed}; +use alloy_primitives::B256; +use op_alloy_protocol::L2BlockInfo; + +/// A cursor that keeps track of the L2 tip block. +#[derive(Debug, Clone)] +pub struct TipCursor { + /// The current L2 safe head. + pub l2_safe_head: L2BlockInfo, + /// The header of the L2 safe head. + pub l2_safe_head_header: Sealed
, + /// The output root of the L2 safe head. + pub l2_safe_head_output_root: B256, +} + +impl TipCursor { + /// Instantiates a new `SyncCursor`. + pub const fn new( + l2_safe_head: L2BlockInfo, + l2_safe_head_header: Sealed
, + l2_safe_head_output_root: B256, + ) -> Self { + Self { l2_safe_head, l2_safe_head_header, l2_safe_head_output_root } + } + + /// Returns the current L2 safe head. + pub const fn l2_safe_head(&self) -> &L2BlockInfo { + &self.l2_safe_head + } + + /// Returns the header of the L2 safe head. + pub const fn l2_safe_head_header(&self) -> &Sealed
{ + &self.l2_safe_head_header + } + + /// Returns the output root of the L2 safe head. + pub const fn l2_safe_head_output_root(&self) -> &B256 { + &self.l2_safe_head_output_root + } +}