From d373c1767454ea28b29a9d7cc41a0c1da12a52f9 Mon Sep 17 00:00:00 2001 From: clabby Date: Sat, 24 Feb 2024 22:31:36 -0700 Subject: [PATCH] init channel bank --- Cargo.lock | 49 ++++++++ README.md | 1 + crates/derive/Cargo.toml | 1 + crates/derive/README.md | 3 + crates/derive/src/params.rs | 2 +- crates/derive/src/stages/channel_bank.rs | 48 ++++++++ crates/derive/src/stages/frame_queue.rs | 13 +-- crates/derive/src/stages/l1_retrieval.rs | 14 +-- crates/derive/src/traits/data_sources.rs | 6 +- crates/derive/src/types/channel.rs | 140 +++++++++++++++++++++++ crates/derive/src/types/frame.rs | 9 +- crates/derive/src/types/mod.rs | 3 + 12 files changed, 268 insertions(+), 21 deletions(-) create mode 100644 crates/derive/src/types/channel.rs diff --git a/Cargo.lock b/Cargo.lock index 4a39c2c6e..6e3e2fced 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,24 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d713b3834d76b85304d4d525563c1276e2e30dc97cc67bfb4585a4a29fc2c89f" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + +[[package]] +name = "allocator-api2" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" + [[package]] name = "alloy-primitives" version = "0.6.3" @@ -242,6 +260,10 @@ name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +dependencies = [ + "ahash", + "allocator-api2", +] [[package]] name = "heck" @@ -304,6 +326,7 @@ dependencies = [ "alloy-sol-types", "anyhow", "async-trait", + "hashbrown", "serde", ] @@ -410,6 +433,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "once_cell" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" + [[package]] name = "parking_lot" version = "0.12.1" @@ -912,6 +941,26 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" +[[package]] +name = "zerocopy" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "zeroize" version = "1.7.0" diff --git a/README.md b/README.md index cb63b3ef4..1193c1c07 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,7 @@ verify an [L2 output root][g-output-root] from the L1 inputs it was [derived fro - [`common`](./crates/common): A suite of utilities for developing `client` programs to be ran on top of Fault Proof VMs. - [`preimage`](./crates/preimage): High level interfaces to the [`PreimageOracle`][fpp-specs] ABI +- [`derive`](./crates/derive): `no_std` compatible implementation of the [derivation pipeline][g-derivation-pipeline]. ## Book diff --git a/crates/derive/Cargo.toml b/crates/derive/Cargo.toml index d628856c7..3bb3abf75 100644 --- a/crates/derive/Cargo.toml +++ b/crates/derive/Cargo.toml @@ -17,6 +17,7 @@ alloy-primitives = { version = "0.6.3", default-features = false, features = ["r alloy-rlp = { version = "0.3.4", default-features = false, features = ["derive"] } alloy-sol-types = { version = "0.6.3", default-features = false } async-trait = "0.1.77" +hashbrown = "0.14.3" # Optional serde = { version = "1.0.197", default-features = false, features = ["derive"], optional = true } diff --git a/crates/derive/README.md b/crates/derive/README.md index 33dc82728..77430eeb7 100644 --- a/crates/derive/README.md +++ b/crates/derive/README.md @@ -1,4 +1,7 @@ # `kona-derive` +> [!WARNING] +> WIP + A `no_std` compatible implementation of the OP Stack's [derivation pipeline](https://specs.optimism.io/protocol/derivation.html#l2-chain-derivation-specification). diff --git a/crates/derive/src/params.rs b/crates/derive/src/params.rs index ffb47801b..21281e820 100644 --- a/crates/derive/src/params.rs +++ b/crates/derive/src/params.rs @@ -1,7 +1,7 @@ //! This module contains the parameters and identifying types for the derivation pipeline. /// Count the tagging info as 200 in terms of buffer size. -pub const FRAME_OVERHEAD: u64 = 200; +pub const FRAME_OVERHEAD: usize = 200; /// The version of the derivation pipeline. pub const DERIVATION_VERSION_0: u8 = 0; diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index 8b1378917..0fe5e7e2f 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -1 +1,49 @@ +//! This module contains the `ChannelBank` struct. +use alloc::collections::VecDeque; +use alloy_primitives::Bytes; +use hashbrown::HashMap; + +use crate::{ + params::ChannelID, + traits::{ChainProvider, DataAvailabilityProvider}, + types::{Channel, RollupConfig}, +}; + +use super::l1_retrieval::L1Retrieval; + +/// [ChannelBank] is a stateful stage that does the following: +/// 1. Unmarshalls frames from L1 transaction data +/// 2. Applies those frames to a channel +/// 3. Attempts to read from the channel when it is ready +/// 4. Prunes channels (not frames) when the channel bank is too large. +/// +/// Note: we prune before we ingest data. +/// As we switch between ingesting data & reading, the prune step occurs at an odd point +/// Specifically, the channel bank is not allowed to become too large between successive calls +/// to `IngestData`. This means that we can do an ingest and then do a read while becoming too large. +/// [ChannelBank] buffers channel frames, and emits full channel data +pub struct ChannelBank +where + DAP: DataAvailabilityProvider, + CP: ChainProvider, +{ + /// The rollup configuration. + cfg: RollupConfig, + /// Map of channels by ID. + channels: HashMap, + /// Channels in FIFO order. + channel_queue: VecDeque, + /// The previous stage of the derivation pipeline. + prev: L1Retrieval, + /// Chain provider. + chain_provider: CP, +} + +impl ChannelBank +where + DAP: DataAvailabilityProvider, + CP: ChainProvider, +{ + // TODO +} diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index be8e4b769..351d07d74 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -10,26 +10,24 @@ use alloy_primitives::Bytes; use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; -pub struct FrameQueue +pub struct FrameQueue where - T: Into, DAP: DataAvailabilityProvider, CP: ChainProvider, { /// The previous stage in the pipeline. - pub prev: L1Retrieval, + pub prev: L1Retrieval, /// The current frame queue. queue: VecDeque, } -impl FrameQueue +impl FrameQueue where - T: Into, DAP: DataAvailabilityProvider, CP: ChainProvider, { /// Create a new frame queue stage. - pub fn new(prev: L1Retrieval) -> Self { + pub fn new(prev: L1Retrieval) -> Self { Self { prev, queue: VecDeque::new(), @@ -67,9 +65,8 @@ where } #[async_trait] -impl ResettableStage for FrameQueue +impl ResettableStage for FrameQueue where - T: Into, DAP: DataAvailabilityProvider + Send, CP: ChainProvider + Send, { diff --git a/crates/derive/src/stages/l1_retrieval.rs b/crates/derive/src/stages/l1_retrieval.rs index c070cde6c..b0b85c030 100644 --- a/crates/derive/src/stages/l1_retrieval.rs +++ b/crates/derive/src/stages/l1_retrieval.rs @@ -5,14 +5,14 @@ use crate::{ traits::{ChainProvider, DataAvailabilityProvider, DataIter, ResettableStage}, types::{BlockInfo, SystemConfig}, }; -use alloc::boxed::Box; +use alloc::{boxed::Box, vec::Vec}; use alloy_primitives::Bytes; use anyhow::{anyhow, Result}; use async_trait::async_trait; /// The L1 retrieval stage of the derivation pipeline. #[derive(Debug)] -pub struct L1Retrieval +pub struct L1Retrieval where DAP: DataAvailabilityProvider, CP: ChainProvider, @@ -22,12 +22,11 @@ where /// The data availability provider to use for the L1 retrieval stage. pub provider: DAP, /// The current data iterator. - data: Option>, + data: Option>, } -impl L1Retrieval +impl L1Retrieval where - T: Into, DAP: DataAvailabilityProvider, CP: ChainProvider, { @@ -66,14 +65,13 @@ where self.data = None; anyhow!("No more data to retrieve") })?; - Ok(data.into()) + Ok(data) } } #[async_trait] -impl ResettableStage for L1Retrieval +impl ResettableStage for L1Retrieval where - T: Into, DAP: DataAvailabilityProvider + Send, CP: ChainProvider + Send, { diff --git a/crates/derive/src/traits/data_sources.rs b/crates/derive/src/traits/data_sources.rs index a871d7cd2..e329f07a1 100644 --- a/crates/derive/src/traits/data_sources.rs +++ b/crates/derive/src/traits/data_sources.rs @@ -3,7 +3,7 @@ // use alloy_rpc_types::Block; use crate::types::{BlockInfo, Receipt}; use alloc::{boxed::Box, vec::Vec}; -use alloy_primitives::{Address, B256}; +use alloy_primitives::{Address, Bytes, B256}; use anyhow::Result; use async_trait::async_trait; @@ -22,11 +22,11 @@ pub trait ChainProvider { #[async_trait] pub trait DataAvailabilityProvider { /// A data iterator for the data source to return. - type DataIter: DataIter + Send; + type DataIter>: DataIter + Send; /// Returns the data availability for the block with the given hash, or an error if the block does not exist in the /// data source. - async fn open_data( + async fn open_data>( &self, block_ref: &BlockInfo, batcher_address: Address, diff --git a/crates/derive/src/types/channel.rs b/crates/derive/src/types/channel.rs new file mode 100644 index 000000000..d6499baca --- /dev/null +++ b/crates/derive/src/types/channel.rs @@ -0,0 +1,140 @@ +//! This module contains the [Channel] struct. + +use crate::{ + params::ChannelID, + types::{BlockInfo, Frame}, +}; +use anyhow::{bail, Result}; +use hashbrown::HashMap; + +/// A Channel is a set of batches that are split into at least one, but possibly multiple frames. +/// Frames are allowed to be ingested out of order. +/// Each frame is ingested one by one. Once a frame with `closed` is added to the channel, the +/// channel may mark itself as ready for reading once all intervening frames have been added +#[derive(Debug, Clone, Default)] +pub struct Channel { + /// The unique identifier for this channel + id: ChannelID, + /// The block that the channel is currently open at + open_block: BlockInfo, + /// Estimated memory size, used to drop the channel if we have too much data + estimated_size: usize, + /// True if the last frame has been buffered + closed: bool, + /// The highest frame number that has been ingested + highest_frame_number: u16, + /// The frame number of the frame where `is_last` is true + /// No other frame number may be higher than this + last_frame_number: u16, + /// Store a map of frame number to frame for constant time ordering + inputs: HashMap, + /// The highest L1 inclusion block that a frame was included in + highest_l1_inclusion_block: BlockInfo, +} + +impl Channel { + /// Create a new [Channel] with the given [ChannelID] and [BlockInfo]. + pub fn new(id: ChannelID, open_block: BlockInfo) -> Self { + Self { + id, + open_block, + inputs: HashMap::new(), + ..Default::default() + } + } + + /// Add a frame to the channel. + /// + /// ## Takes + /// - `frame`: The frame to add to the channel + /// - `l1_inclusion_block`: The block that the frame was included in + /// + /// ## Returns + /// - `Ok(()):` If the frame was successfully buffered + /// - `Err(_):` If the frame was invalid + pub fn add_frame(&mut self, frame: Frame, l1_inclusion_block: BlockInfo) -> Result<()> { + // Ensure that the frame ID is equal to the channel ID. + if frame.id != self.id { + bail!("Frame ID does not match channel ID"); + } + if frame.is_last && self.closed { + bail!( + "Cannot add ending frame to a closed channel. Channel ID: {:?}", + self.id + ); + } + if !self.inputs.contains_key(&frame.number) { + bail!( + "Frame number already exists in channel. Channel ID: {:?}", + self.id + ); + } + if self.closed && frame.number >= self.last_frame_number { + bail!( + "frame number {} is greater than or equal to end frame number {}", + frame.number, + self.last_frame_number + ); + } + + // Guaranteed to succeed at this point. Update the channel state. + if frame.is_last { + self.last_frame_number = frame.number; + self.closed = true; + + // Prune frames with a higher number than the last frame number when we receive a closing frame. + if self.last_frame_number < self.highest_frame_number { + self.inputs.retain(|id, frame| { + self.estimated_size -= frame.size(); + *id < self.last_frame_number + }); + self.highest_frame_number = self.last_frame_number; + } + } + + // Update the highest frame number. + if frame.number > self.highest_frame_number { + self.highest_frame_number = frame.number; + } + + if self.highest_l1_inclusion_block.number < l1_inclusion_block.number { + self.highest_l1_inclusion_block = l1_inclusion_block; + } + + self.estimated_size += frame.size(); + self.inputs.insert(frame.number, frame); + Ok(()) + } + + /// Returns the block number of the L1 block that contained the first [Frame] in this channel. + pub fn open_block_number(&self) -> u64 { + self.open_block.number + } + + /// Returns the estimated size of the channel including [Frame] overhead. + pub fn size(&self) -> usize { + self.estimated_size + } + + /// Returns `true` if the channel is ready to be read. + pub fn is_ready(&self) -> bool { + // Must have buffered the last frame before the channel is ready. + if !self.closed { + return false; + } + + // Must have the possibility of contiguous frames. + if self.inputs.len() != (self.last_frame_number + 1) as usize { + return false; + } + + // Check for contiguous frames. + for i in 0..=self.last_frame_number { + if !self.inputs.contains_key(&i) { + return false; + } + } + + true + } +} diff --git a/crates/derive/src/types/frame.rs b/crates/derive/src/types/frame.rs index 3c7fae171..23962eac7 100644 --- a/crates/derive/src/types/frame.rs +++ b/crates/derive/src/types/frame.rs @@ -1,6 +1,6 @@ //! This module contains the [Frame] type used within the derivation pipeline. -use crate::params::{ChannelID, DERIVATION_VERSION_0}; +use crate::params::{ChannelID, DERIVATION_VERSION_0, FRAME_OVERHEAD}; use alloc::vec::Vec; use anyhow::{anyhow, bail, Result}; @@ -109,6 +109,13 @@ impl Frame { Ok(frames) } + + /// Calculates the size of the frame + overhead for storing the frame. The sum of the frame size of each frame in + /// a channel determines the channel's size. The sum of the channel sizes is used for pruning & compared against + /// the max channel bank size. + pub fn size(&self) -> usize { + self.data.len() + FRAME_OVERHEAD + } } #[cfg(test)] diff --git a/crates/derive/src/types/mod.rs b/crates/derive/src/types/mod.rs index c01263e1c..bdcc02627 100644 --- a/crates/derive/src/types/mod.rs +++ b/crates/derive/src/types/mod.rs @@ -32,3 +32,6 @@ pub use genesis::Genesis; mod frame; pub use frame::Frame; + +mod channel; +pub use channel::Channel;