From d0e92b82340ca056d6f58a5059482f3b68bae691 Mon Sep 17 00:00:00 2001 From: clabby Date: Sun, 25 Feb 2024 18:56:28 -0700 Subject: [PATCH] feat(derive): channel bank (#46) * init channel bank * `Channel` tests * ingest frame * Add rest of channel bank impl * Add custom `StageError` & `StageResult` Need more granularity in errors to distinguish responses for `EOF` & others. * Respect EOF --- Cargo.lock | 49 ++++ README.md | 1 + crates/derive/Cargo.toml | 1 + crates/derive/README.md | 2 + crates/derive/src/params.rs | 5 +- crates/derive/src/stages/channel_bank.rs | 203 ++++++++++++++ crates/derive/src/stages/frame_queue.rs | 27 +- crates/derive/src/stages/l1_retrieval.rs | 37 +-- crates/derive/src/stages/l1_traversal.rs | 24 +- crates/derive/src/traits/data_sources.rs | 12 +- crates/derive/src/traits/stages.rs | 5 +- crates/derive/src/types/channel.rs | 333 +++++++++++++++++++++++ crates/derive/src/types/errors.rs | 34 +++ crates/derive/src/types/frame.rs | 11 +- crates/derive/src/types/mod.rs | 6 + 15 files changed, 693 insertions(+), 57 deletions(-) create mode 100644 crates/derive/src/types/channel.rs create mode 100644 crates/derive/src/types/errors.rs diff --git a/Cargo.lock b/Cargo.lock index 4a39c2c6e..6e3e2fced 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,24 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d713b3834d76b85304d4d525563c1276e2e30dc97cc67bfb4585a4a29fc2c89f" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + +[[package]] +name = "allocator-api2" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" + [[package]] name = "alloy-primitives" version = "0.6.3" @@ -242,6 +260,10 @@ name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +dependencies = [ + "ahash", + "allocator-api2", +] [[package]] name = "heck" @@ -304,6 +326,7 @@ dependencies = [ "alloy-sol-types", "anyhow", "async-trait", + "hashbrown", "serde", ] @@ -410,6 +433,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "once_cell" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" + [[package]] name = "parking_lot" version = "0.12.1" @@ -912,6 +941,26 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" +[[package]] +name = "zerocopy" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "zeroize" version = "1.7.0" diff --git a/README.md b/README.md index cb63b3ef4..1193c1c07 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,7 @@ verify an [L2 output root][g-output-root] from the L1 inputs it was [derived fro - [`common`](./crates/common): A suite of utilities for developing `client` programs to be ran on top of Fault Proof VMs. - [`preimage`](./crates/preimage): High level interfaces to the [`PreimageOracle`][fpp-specs] ABI +- [`derive`](./crates/derive): `no_std` compatible implementation of the [derivation pipeline][g-derivation-pipeline]. ## Book diff --git a/crates/derive/Cargo.toml b/crates/derive/Cargo.toml index d628856c7..3bb3abf75 100644 --- a/crates/derive/Cargo.toml +++ b/crates/derive/Cargo.toml @@ -17,6 +17,7 @@ alloy-primitives = { version = "0.6.3", default-features = false, features = ["r alloy-rlp = { version = "0.3.4", default-features = false, features = ["derive"] } alloy-sol-types = { version = "0.6.3", default-features = false } async-trait = "0.1.77" +hashbrown = "0.14.3" # Optional serde = { version = "1.0.197", default-features = false, features = ["derive"], optional = true } diff --git a/crates/derive/README.md b/crates/derive/README.md index 33dc82728..3957dd801 100644 --- a/crates/derive/README.md +++ b/crates/derive/README.md @@ -1,4 +1,6 @@ # `kona-derive` +> **Notice**: This crate is a WIP. + A `no_std` compatible implementation of the OP Stack's [derivation pipeline](https://specs.optimism.io/protocol/derivation.html#l2-chain-derivation-specification). diff --git a/crates/derive/src/params.rs b/crates/derive/src/params.rs index ffb47801b..f14f60d93 100644 --- a/crates/derive/src/params.rs +++ b/crates/derive/src/params.rs @@ -1,7 +1,7 @@ //! This module contains the parameters and identifying types for the derivation pipeline. /// Count the tagging info as 200 in terms of buffer size. -pub const FRAME_OVERHEAD: u64 = 200; +pub const FRAME_OVERHEAD: usize = 200; /// The version of the derivation pipeline. pub const DERIVATION_VERSION_0: u8 = 0; @@ -15,6 +15,9 @@ pub const MAX_SPAN_BATCH_BYTES: u64 = MAX_RLP_BYTES_PER_CHANNEL; /// a channel. This limit is set when decoding the RLP. pub const MAX_RLP_BYTES_PER_CHANNEL: u64 = 10_000_000; +/// The maximum size of a channel bank. +pub const MAX_CHANNEL_BANK_SIZE: usize = 100_000_000; + /// [CHANNEL_ID_LENGTH] is the length of the channel ID. pub const CHANNEL_ID_LENGTH: usize = 16; diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index 8b1378917..f214efe1c 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -1 +1,204 @@ +//! This module contains the `ChannelBank` struct. +use super::{frame_queue::FrameQueue, l1_retrieval::L1Retrieval}; +use crate::{ + params::{ChannelID, MAX_CHANNEL_BANK_SIZE}, + traits::{ChainProvider, DataAvailabilityProvider, ResettableStage}, + types::{BlockInfo, Channel, Frame, RollupConfig, StageError, StageResult, SystemConfig}, +}; +use alloc::{boxed::Box, collections::VecDeque}; +use alloy_primitives::Bytes; +use anyhow::{anyhow, bail}; +use async_trait::async_trait; +use hashbrown::HashMap; + +/// [ChannelBank] is a stateful stage that does the following: +/// 1. Unmarshalls frames from L1 transaction data +/// 2. Applies those frames to a channel +/// 3. Attempts to read from the channel when it is ready +/// 4. Prunes channels (not frames) when the channel bank is too large. +/// +/// Note: we prune before we ingest data. +/// As we switch between ingesting data & reading, the prune step occurs at an odd point +/// Specifically, the channel bank is not allowed to become too large between successive calls +/// to `IngestData`. This means that we can do an ingest and then do a read while becoming too large. +/// [ChannelBank] buffers channel frames, and emits full channel data +pub struct ChannelBank +where + DAP: DataAvailabilityProvider, + CP: ChainProvider, +{ + /// The rollup configuration. + cfg: RollupConfig, + /// Map of channels by ID. + channels: HashMap, + /// Channels in FIFO order. + channel_queue: VecDeque, + /// The previous stage of the derivation pipeline. + prev: FrameQueue, + /// Chain provider. + chain_provider: CP, +} + +impl ChannelBank +where + DAP: DataAvailabilityProvider, + CP: ChainProvider, +{ + /// Create a new [ChannelBank] stage. + pub fn new(cfg: RollupConfig, prev: FrameQueue, chain_provider: CP) -> Self { + Self { + cfg, + channels: HashMap::new(), + channel_queue: VecDeque::new(), + prev, + chain_provider, + } + } + + /// Returns the L1 origin [BlockInfo]. + pub fn origin(&self) -> Option<&BlockInfo> { + self.prev.origin() + } + + /// Prunes the Channel bank, until it is below [MAX_CHANNEL_BANK_SIZE]. + pub fn prune(&mut self) -> StageResult<()> { + // Check total size + let mut total_size = self.channels.iter().fold(0, |acc, (_, c)| acc + c.size()); + // Prune until it is reasonable again. The high-priority channel failed to be read, + // so we prune from there. + while total_size > MAX_CHANNEL_BANK_SIZE { + let id = self + .channel_queue + .pop_front() + .ok_or(anyhow!("No channel to prune"))?; + let channel = self + .channels + .remove(&id) + .ok_or(anyhow!("Could not find channel"))?; + total_size -= channel.size(); + } + Ok(()) + } + + /// Adds new L1 data to the channel bank. Should only be called after all data has been read. + pub fn ingest_frame(&mut self, frame: Frame) -> StageResult<()> { + let origin = *self.origin().ok_or(anyhow!("No origin"))?; + + let current_channel = self.channels.entry(frame.id).or_insert_with(|| { + // Create a new channel + let channel = Channel::new(frame.id, origin); + self.channel_queue.push_back(frame.id); + channel + }); + + // Check if the channel is not timed out. If it has, ignore the frame. + if current_channel.open_block_number() + self.cfg.channel_timeout < origin.number { + return Ok(()); + } + + // Ingest the frame. If it fails, ignore the frame. + if current_channel.add_frame(frame, origin).is_err() { + return Ok(()); + } + + self.prune() + } + + /// Read the raw data of the first channel, if it's timed-out or closed. + /// + /// Returns an error if there is nothing new to read. + pub fn read(&mut self) -> StageResult> { + // Bail if there are no channels to read from. + if self.channel_queue.is_empty() { + return Err(StageError::Eof); + } + + // Return an `Ok(None)` if the first channel is timed out. There may be more timed + // out channels at the head of the queue and we want to remove them all. + let first = self.channel_queue[0]; + let channel = self + .channels + .get(&first) + .ok_or(anyhow!("Channel not found"))?; + let origin = self.origin().ok_or(anyhow!("No origin present"))?; + + if channel.open_block_number() + self.cfg.channel_timeout < origin.number { + self.channels.remove(&first); + self.channel_queue.pop_front(); + return Ok(None); + } + + // At the point we have removed all timed out channels from the front of the `channel_queue`. + // Pre-Canyon we simply check the first index. + // Post-Canyon we read the entire channelQueue for the first ready channel. If no channel is + // available, we return `nil, io.EOF`. + // Canyon is activated when the first L1 block whose time >= CanyonTime, not on the L2 timestamp. + if !self.cfg.is_canyon_active(origin.timestamp) { + return self.try_read_channel_at_index(0).map(Some); + } + + let channel_data = + (0..self.channel_queue.len()).find_map(|i| self.try_read_channel_at_index(i).ok()); + match channel_data { + Some(data) => Ok(Some(data)), + None => Err(StageError::Eof), + } + } + + /// Pulls the next piece of data from the channel bank. Note that it attempts to pull data out of the channel bank prior to + /// loading data in (unlike most other stages). This is to ensure maintain consistency around channel bank pruning which depends upon the order + /// of operations. + pub async fn next_data(&mut self) -> StageResult> { + match self.read() { + Err(StageError::Eof) => { + // continue - we will attempt to load data into the channel bank + } + Err(e) => { + return Err(anyhow!("Error fetching next data from channel bank: {:?}", e).into()); + } + data => return data, + }; + + // Load the data into the channel bank + let frame = self.prev.next_frame().await?; + self.ingest_frame(frame); + Err(StageError::NotEnoughData) + } + + /// Attempts to read the channel at the specified index. If the channel is not ready or timed out, + /// it will return an error. + /// If the channel read was successful, it will remove the channel from the channel queue. + fn try_read_channel_at_index(&mut self, index: usize) -> StageResult { + let channel_id = self.channel_queue[index]; + let channel = self + .channels + .get(&channel_id) + .ok_or(anyhow!("Channel not found"))?; + let origin = self.origin().ok_or(anyhow!("No origin present"))?; + + let timed_out = channel.open_block_number() + self.cfg.channel_timeout < origin.number; + if timed_out || !channel.is_ready() { + return Err(StageError::Eof); + } + + let frame_data = channel.frame_data(); + self.channels.remove(&channel_id); + self.channel_queue.remove(index); + + frame_data.map_err(StageError::Custom) + } +} + +#[async_trait] +impl ResettableStage for ChannelBank +where + DAP: DataAvailabilityProvider + Send, + CP: ChainProvider + Send, +{ + async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> StageResult<()> { + self.channels.clear(); + self.channel_queue = VecDeque::with_capacity(10); + Err(StageError::Eof) + } +} diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index be8e4b769..5fbe138ff 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -3,33 +3,31 @@ use super::l1_retrieval::L1Retrieval; use crate::{ traits::{ChainProvider, DataAvailabilityProvider, ResettableStage}, - types::{BlockInfo, Frame, SystemConfig}, + types::{BlockInfo, Frame, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, collections::VecDeque}; use alloy_primitives::Bytes; use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; -pub struct FrameQueue +pub struct FrameQueue where - T: Into, DAP: DataAvailabilityProvider, CP: ChainProvider, { /// The previous stage in the pipeline. - pub prev: L1Retrieval, + pub prev: L1Retrieval, /// The current frame queue. queue: VecDeque, } -impl FrameQueue +impl FrameQueue where - T: Into, DAP: DataAvailabilityProvider, CP: ChainProvider, { /// Create a new frame queue stage. - pub fn new(prev: L1Retrieval) -> Self { + pub fn new(prev: L1Retrieval) -> Self { Self { prev, queue: VecDeque::new(), @@ -42,7 +40,7 @@ where } /// Fetches the next frame from the frame queue. - pub async fn next_frame(&mut self) -> Result { + pub async fn next_frame(&mut self) -> StageResult { if self.queue.is_empty() { match self.prev.next_data().await { Ok(data) => { @@ -51,30 +49,29 @@ where } } Err(e) => { - bail!("Error fetching next data: {e}") + return Err(anyhow!("Error fetching next data: {e}").into()); } } } // If we did not add more frames but still have more data, retry this function. if self.queue.is_empty() { - bail!("Not enough data"); + return Err(anyhow!("Not enough data").into()); } self.queue .pop_front() - .ok_or_else(|| anyhow!("Frame queue is impossibly empty.")) + .ok_or_else(|| anyhow!("Frame queue is impossibly empty.").into()) } } #[async_trait] -impl ResettableStage for FrameQueue +impl ResettableStage for FrameQueue where - T: Into, DAP: DataAvailabilityProvider + Send, CP: ChainProvider + Send, { - async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> Result<()> { + async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> StageResult<()> { self.queue = VecDeque::default(); - Ok(()) + Err(StageError::Eof) } } diff --git a/crates/derive/src/stages/l1_retrieval.rs b/crates/derive/src/stages/l1_retrieval.rs index c070cde6c..e73bcb053 100644 --- a/crates/derive/src/stages/l1_retrieval.rs +++ b/crates/derive/src/stages/l1_retrieval.rs @@ -3,16 +3,16 @@ use super::L1Traversal; use crate::{ traits::{ChainProvider, DataAvailabilityProvider, DataIter, ResettableStage}, - types::{BlockInfo, SystemConfig}, + types::{BlockInfo, StageError, StageResult, SystemConfig}, }; -use alloc::boxed::Box; +use alloc::{boxed::Box, vec::Vec}; use alloy_primitives::Bytes; -use anyhow::{anyhow, Result}; +use anyhow::anyhow; use async_trait::async_trait; /// The L1 retrieval stage of the derivation pipeline. #[derive(Debug)] -pub struct L1Retrieval +pub struct L1Retrieval where DAP: DataAvailabilityProvider, CP: ChainProvider, @@ -22,12 +22,11 @@ where /// The data availability provider to use for the L1 retrieval stage. pub provider: DAP, /// The current data iterator. - data: Option>, + data: Option>, } -impl L1Retrieval +impl L1Retrieval where - T: Into, DAP: DataAvailabilityProvider, CP: ChainProvider, { @@ -48,11 +47,11 @@ where /// Retrieves the next data item from the L1 retrieval stage. /// If there is data, it pushes it into the next stage. /// If there is no data, it returns an error. - pub async fn next_data(&mut self) -> Result { + pub async fn next_data(&mut self) -> StageResult { if self.data.is_none() { let next = self .prev - .next_l1_block() + .next_l1_block()? .ok_or_else(|| anyhow!("No block to retrieve data from"))?; self.data = Some( self.provider @@ -61,23 +60,25 @@ where ); } - // Fetch next data item from the iterator. - let data = self.data.as_mut().and_then(|d| d.next()).ok_or_else(|| { - self.data = None; - anyhow!("No more data to retrieve") - })?; - Ok(data.into()) + let data = self.data.as_mut().expect("Cannot be None").next(); + match data { + Ok(data) => Ok(data), + Err(StageError::Eof) => { + self.data = None; + Err(StageError::Eof) + } + Err(e) => Err(e), + } } } #[async_trait] -impl ResettableStage for L1Retrieval +impl ResettableStage for L1Retrieval where - T: Into, DAP: DataAvailabilityProvider + Send, CP: ChainProvider + Send, { - async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> Result<()> { + async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> StageResult<()> { self.data = Some(self.provider.open_data(&base, cfg.batcher_addr).await?); Ok(()) } diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs index 7d8b0d7da..ff83591bc 100644 --- a/crates/derive/src/stages/l1_traversal.rs +++ b/crates/derive/src/stages/l1_traversal.rs @@ -2,10 +2,10 @@ use crate::{ traits::{ChainProvider, ResettableStage}, - types::{BlockInfo, RollupConfig, SystemConfig}, + types::{BlockInfo, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::boxed::Box; -use anyhow::{anyhow, bail, Result}; +use anyhow::{anyhow, bail}; use async_trait::async_trait; /// The L1 traversal stage of the derivation pipeline. @@ -37,12 +37,12 @@ impl L1Traversal { /// Returns the next L1 block in the traversal stage, if the stage has not been completed. This function can only /// be called once, and will return `None` on subsequent calls unless the stage is reset. - pub fn next_l1_block(&mut self) -> Option { + pub fn next_l1_block(&mut self) -> StageResult> { if !self.done { self.done = true; - self.block + Ok(self.block) } else { - None + Err(StageError::Eof) } } @@ -52,7 +52,8 @@ impl L1Traversal { } /// Advances the internal state of the [L1Traversal] stage to the next L1 block. - pub async fn advance_l1_block(&mut self) -> Result<()> { + pub async fn advance_l1_block(&mut self) -> StageResult<()> { + // TODO: Return EOF if the block wasn't found. let block = self.block.ok_or(anyhow!("No block to advance from"))?; let next_l1_origin = self .data_source @@ -61,11 +62,12 @@ impl L1Traversal { // Check for reorgs if block.hash != next_l1_origin.parent_hash { - bail!( + return Err(anyhow!( "Detected L1 reorg from {} to {} with conflicting parent", block.hash, next_l1_origin.hash - ); + ) + .into()); } // Fetch receipts. @@ -87,12 +89,10 @@ impl L1Traversal { #[async_trait] impl ResettableStage for L1Traversal { - async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> Result<()> { + async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> StageResult<()> { self.block = Some(base); self.done = false; self.system_config = cfg; - - // TODO: Do we want to return an error here w/ EOF? - Ok(()) + Err(StageError::Eof) } } diff --git a/crates/derive/src/traits/data_sources.rs b/crates/derive/src/traits/data_sources.rs index a871d7cd2..14f0ffe3e 100644 --- a/crates/derive/src/traits/data_sources.rs +++ b/crates/derive/src/traits/data_sources.rs @@ -1,9 +1,9 @@ //! Contains traits that describe the functionality of various data sources used in the derivation pipeline's stages. // use alloy_rpc_types::Block; -use crate::types::{BlockInfo, Receipt}; +use crate::types::{BlockInfo, Receipt, StageResult}; use alloc::{boxed::Box, vec::Vec}; -use alloy_primitives::{Address, B256}; +use alloy_primitives::{Address, Bytes, B256}; use anyhow::Result; use async_trait::async_trait; @@ -22,11 +22,11 @@ pub trait ChainProvider { #[async_trait] pub trait DataAvailabilityProvider { /// A data iterator for the data source to return. - type DataIter: DataIter + Send; + type DataIter>: DataIter + Send; /// Returns the data availability for the block with the given hash, or an error if the block does not exist in the /// data source. - async fn open_data( + async fn open_data>( &self, block_ref: &BlockInfo, batcher_address: Address, @@ -35,6 +35,6 @@ pub trait DataAvailabilityProvider { /// Describes the behavior of a data iterator. pub trait DataIter { - /// Returns the next item in the iterator, or `None` if the iterator is exhausted. - fn next(&mut self) -> Option; + /// Returns the next item in the iterator, or [crate::types::StageError::Eof] if the iterator is exhausted. + fn next(&mut self) -> StageResult; } diff --git a/crates/derive/src/traits/stages.rs b/crates/derive/src/traits/stages.rs index a039f3172..2c006de3e 100644 --- a/crates/derive/src/traits/stages.rs +++ b/crates/derive/src/traits/stages.rs @@ -1,13 +1,12 @@ //! This module contains common traits for stages within the derivation pipeline. -use crate::types::{BlockInfo, SystemConfig}; +use crate::types::{BlockInfo, StageResult, SystemConfig}; use alloc::boxed::Box; -use anyhow::Result; use async_trait::async_trait; /// Describes the functionality fo a resettable stage within the derivation pipeline. #[async_trait] pub trait ResettableStage { /// Resets the derivation stage to its initial state. - async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> Result<()>; + async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> StageResult<()>; } diff --git a/crates/derive/src/types/channel.rs b/crates/derive/src/types/channel.rs new file mode 100644 index 000000000..bb8316b6b --- /dev/null +++ b/crates/derive/src/types/channel.rs @@ -0,0 +1,333 @@ +//! This module contains the [Channel] struct. + +use crate::{ + params::ChannelID, + types::{BlockInfo, Frame}, +}; +use alloc::vec::Vec; +use alloy_primitives::Bytes; +use anyhow::{anyhow, bail, Result}; +use hashbrown::HashMap; + +/// A Channel is a set of batches that are split into at least one, but possibly multiple frames. +/// Frames are allowed to be ingested out of order. +/// Each frame is ingested one by one. Once a frame with `closed` is added to the channel, the +/// channel may mark itself as ready for reading once all intervening frames have been added +#[derive(Debug, Clone, Default)] +pub struct Channel { + /// The unique identifier for this channel + id: ChannelID, + /// The block that the channel is currently open at + open_block: BlockInfo, + /// Estimated memory size, used to drop the channel if we have too much data + estimated_size: usize, + /// True if the last frame has been buffered + closed: bool, + /// The highest frame number that has been ingested + highest_frame_number: u16, + /// The frame number of the frame where `is_last` is true + /// No other frame number may be higher than this + last_frame_number: u16, + /// Store a map of frame number to frame for constant time ordering + inputs: HashMap, + /// The highest L1 inclusion block that a frame was included in + highest_l1_inclusion_block: BlockInfo, +} + +impl Channel { + /// Create a new [Channel] with the given [ChannelID] and [BlockInfo]. + pub fn new(id: ChannelID, open_block: BlockInfo) -> Self { + Self { + id, + open_block, + inputs: HashMap::new(), + ..Default::default() + } + } + + /// Add a frame to the channel. + /// + /// ## Takes + /// - `frame`: The frame to add to the channel + /// - `l1_inclusion_block`: The block that the frame was included in + /// + /// ## Returns + /// - `Ok(()):` If the frame was successfully buffered + /// - `Err(_):` If the frame was invalid + pub fn add_frame(&mut self, frame: Frame, l1_inclusion_block: BlockInfo) -> Result<()> { + // Ensure that the frame ID is equal to the channel ID. + if frame.id != self.id { + bail!("Frame ID does not match channel ID"); + } + if frame.is_last && self.closed { + bail!( + "Cannot add ending frame to a closed channel. Channel ID: {:?}", + self.id + ); + } + if self.inputs.contains_key(&frame.number) { + bail!( + "Frame number already exists in channel. Channel ID: {:?}", + self.id + ); + } + if self.closed && frame.number >= self.last_frame_number { + bail!( + "frame number {} is greater than or equal to end frame number {}", + frame.number, + self.last_frame_number + ); + } + + // Guaranteed to succeed at this point. Update the channel state. + if frame.is_last { + self.last_frame_number = frame.number; + self.closed = true; + + // Prune frames with a higher number than the last frame number when we receive a closing frame. + if self.last_frame_number < self.highest_frame_number { + self.inputs.retain(|id, frame| { + self.estimated_size -= frame.size(); + *id < self.last_frame_number + }); + self.highest_frame_number = self.last_frame_number; + } + } + + // Update the highest frame number. + if frame.number > self.highest_frame_number { + self.highest_frame_number = frame.number; + } + + if self.highest_l1_inclusion_block.number < l1_inclusion_block.number { + self.highest_l1_inclusion_block = l1_inclusion_block; + } + + self.estimated_size += frame.size(); + self.inputs.insert(frame.number, frame); + Ok(()) + } + + /// Returns the block number of the L1 block that contained the first [Frame] in this channel. + pub fn open_block_number(&self) -> u64 { + self.open_block.number + } + + /// Returns the estimated size of the channel including [Frame] overhead. + pub fn size(&self) -> usize { + self.estimated_size + } + + /// Returns `true` if the channel is ready to be read. + pub fn is_ready(&self) -> bool { + // Must have buffered the last frame before the channel is ready. + if !self.closed { + return false; + } + + // Must have the possibility of contiguous frames. + if self.inputs.len() != (self.last_frame_number + 1) as usize { + return false; + } + + // Check for contiguous frames. + for i in 0..=self.last_frame_number { + if !self.inputs.contains_key(&i) { + return false; + } + } + + true + } + + /// Returns all of the channel's [Frame]s concatenated together. + pub fn frame_data(&self) -> Result { + let mut data = Vec::with_capacity(self.size()); + (0..=self.last_frame_number).try_for_each(|i| { + let frame = self + .inputs + .get(&i) + .ok_or_else(|| anyhow!("Frame not found"))?; + data.extend_from_slice(&frame.data); + Ok(()) + })?; + Ok(data.into()) + } +} + +#[cfg(test)] +mod test { + use super::Channel; + use crate::{ + params::ChannelID, + types::{BlockInfo, Frame}, + }; + use alloc::{ + string::{String, ToString}, + vec, + vec::Vec, + }; + + extern crate std; + + struct FrameValidityTestCase { + name: String, + frames: Vec, + should_error: Vec, + sizes: Vec, + } + + fn run_frame_validity_test(test_case: FrameValidityTestCase) { + let id = [0xFF; 16]; + let block = BlockInfo::default(); + let mut channel = Channel::new(id, block); + + if test_case.frames.len() != test_case.should_error.len() + || test_case.frames.len() != test_case.sizes.len() + { + panic!("Test case length mismatch"); + } + + for (i, frame) in test_case.frames.iter().enumerate() { + let result = channel.add_frame(frame.clone(), block); + if test_case.should_error[i] { + assert!(result.is_err()); + } else { + assert!(result.is_ok()); + } + assert_eq!(channel.size(), test_case.sizes[i] as usize); + } + } + + #[test] + fn test_frame_validity() { + let id = [0xFF; 16]; + let test_cases = [ + FrameValidityTestCase { + name: "wrong channel".to_string(), + frames: vec![Frame { + id: [0xEE; 16], + ..Default::default() + }], + should_error: vec![true], + sizes: vec![0], + }, + FrameValidityTestCase { + name: "double close".to_string(), + frames: vec![ + Frame { + id, + is_last: true, + number: 2, + data: b"four".to_vec(), + }, + Frame { + id, + is_last: true, + number: 1, + ..Default::default() + }, + ], + should_error: vec![false, true], + sizes: vec![204, 204], + }, + FrameValidityTestCase { + name: "duplicate frame".to_string(), + frames: vec![ + Frame { + id, + number: 2, + data: b"four".to_vec(), + ..Default::default() + }, + Frame { + id, + number: 2, + data: b"seven".to_vec(), + ..Default::default() + }, + ], + should_error: vec![false, true], + sizes: vec![204, 204], + }, + FrameValidityTestCase { + name: "duplicate closing frames".to_string(), + frames: vec![ + Frame { + id, + number: 2, + is_last: true, + data: b"four".to_vec(), + }, + Frame { + id, + number: 2, + is_last: true, + data: b"seven".to_vec(), + }, + ], + should_error: vec![false, true], + sizes: vec![204, 204], + }, + FrameValidityTestCase { + name: "frame past closing".to_string(), + frames: vec![ + Frame { + id, + number: 2, + is_last: true, + data: b"four".to_vec(), + }, + Frame { + id, + number: 10, + data: b"seven".to_vec(), + ..Default::default() + }, + ], + should_error: vec![false, true], + sizes: vec![204, 204], + }, + FrameValidityTestCase { + name: "prune after close frame".to_string(), + frames: vec![ + Frame { + id, + number: 10, + is_last: false, + data: b"seven".to_vec(), + }, + Frame { + id, + number: 2, + is_last: true, + data: b"four".to_vec(), + }, + ], + should_error: vec![false, false], + sizes: vec![205, 204], + }, + FrameValidityTestCase { + name: "multiple valid frames".to_string(), + frames: vec![ + Frame { + id, + number: 10, + data: b"seven__".to_vec(), + ..Default::default() + }, + Frame { + id, + number: 2, + data: b"four".to_vec(), + ..Default::default() + }, + ], + should_error: vec![false, false], + sizes: vec![207, 411], + }, + ]; + + test_cases.into_iter().for_each(run_frame_validity_test); + } +} diff --git a/crates/derive/src/types/errors.rs b/crates/derive/src/types/errors.rs new file mode 100644 index 000000000..7d34b8404 --- /dev/null +++ b/crates/derive/src/types/errors.rs @@ -0,0 +1,34 @@ +//! This module contains derivation errors thrown within the pipeline. + +use core::fmt::Display; + +/// An error that is thrown within the stages of the derivation pipeline. +#[derive(Debug)] +pub enum StageError { + /// There is no data to read from the channel bank. + Eof, + /// There is not enough data progress, but if we wait, the stage will eventually return data + /// or produce an EOF error. + NotEnoughData, + /// Other wildcard error. + Custom(anyhow::Error), +} + +/// A result type for the derivation pipeline stages. +pub type StageResult = Result; + +impl From for StageError { + fn from(e: anyhow::Error) -> Self { + StageError::Custom(e) + } +} + +impl Display for StageError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + StageError::Eof => write!(f, "End of file"), + StageError::NotEnoughData => write!(f, "Not enough data"), + StageError::Custom(e) => write!(f, "Custom error: {}", e), + } + } +} diff --git a/crates/derive/src/types/frame.rs b/crates/derive/src/types/frame.rs index 3c7fae171..69428756e 100644 --- a/crates/derive/src/types/frame.rs +++ b/crates/derive/src/types/frame.rs @@ -1,6 +1,6 @@ //! This module contains the [Frame] type used within the derivation pipeline. -use crate::params::{ChannelID, DERIVATION_VERSION_0}; +use crate::params::{ChannelID, DERIVATION_VERSION_0, FRAME_OVERHEAD}; use alloc::vec::Vec; use anyhow::{anyhow, bail, Result}; @@ -18,7 +18,7 @@ const MAX_FRAME_LEN: usize = 1000; /// * frame_data_length = uint32 /// * frame_data = bytes /// * is_last = bool -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Default)] pub struct Frame { /// The unique idetifier for the frame. pub id: ChannelID, @@ -109,6 +109,13 @@ impl Frame { Ok(frames) } + + /// Calculates the size of the frame + overhead for storing the frame. The sum of the frame size of each frame in + /// a channel determines the channel's size. The sum of the channel sizes is used for pruning & compared against + /// the max channel bank size. + pub fn size(&self) -> usize { + self.data.len() + FRAME_OVERHEAD + } } #[cfg(test)] diff --git a/crates/derive/src/types/mod.rs b/crates/derive/src/types/mod.rs index c01263e1c..8a0229f89 100644 --- a/crates/derive/src/types/mod.rs +++ b/crates/derive/src/types/mod.rs @@ -32,3 +32,9 @@ pub use genesis::Genesis; mod frame; pub use frame::Frame; + +mod channel; +pub use channel::Channel; + +mod errors; +pub use errors::{StageError, StageResult};