diff --git a/crates/derive/src/lib.rs b/crates/derive/src/lib.rs index 79206508f..aea6a7c9e 100644 --- a/crates/derive/src/lib.rs +++ b/crates/derive/src/lib.rs @@ -13,6 +13,11 @@ extern crate alloc; +pub mod params; pub mod stages; pub mod traits; pub mod types; + +/// The derivation pipeline is responsible for deriving L2 inputs from L1 data. +#[derive(Debug, Clone, Copy)] +pub struct DerivationPipeline; diff --git a/crates/derive/src/params.rs b/crates/derive/src/params.rs new file mode 100644 index 000000000..ffb47801b --- /dev/null +++ b/crates/derive/src/params.rs @@ -0,0 +1,22 @@ +//! This module contains the parameters and identifying types for the derivation pipeline. + +/// Count the tagging info as 200 in terms of buffer size. +pub const FRAME_OVERHEAD: u64 = 200; + +/// The version of the derivation pipeline. +pub const DERIVATION_VERSION_0: u8 = 0; + +/// [MAX_SPAN_BATCH_BYTES] is the maximum amount of bytes that will be needed +/// to decode every span batch field. This value cannot be larger than +/// MaxRLPBytesPerChannel because single batch cannot be larger than channel size. +pub const MAX_SPAN_BATCH_BYTES: u64 = MAX_RLP_BYTES_PER_CHANNEL; + +/// [MAX_RLP_BYTES_PER_CHANNEL] is the maximum amount of bytes that will be read from +/// a channel. This limit is set when decoding the RLP. +pub const MAX_RLP_BYTES_PER_CHANNEL: u64 = 10_000_000; + +/// [CHANNEL_ID_LENGTH] is the length of the channel ID. +pub const CHANNEL_ID_LENGTH: usize = 16; + +/// [ChannelID] is an opaque identifier for a channel. +pub type ChannelID = [u8; CHANNEL_ID_LENGTH]; diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index 8b1378917..be8e4b769 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -1 +1,80 @@ +//! This module contains the [FrameQueue] stage of the derivation pipeline. +use super::l1_retrieval::L1Retrieval; +use crate::{ + traits::{ChainProvider, DataAvailabilityProvider, ResettableStage}, + types::{BlockInfo, Frame, SystemConfig}, +}; +use alloc::{boxed::Box, collections::VecDeque}; +use alloy_primitives::Bytes; +use anyhow::{anyhow, bail, Result}; +use async_trait::async_trait; + +pub struct FrameQueue +where + T: Into, + DAP: DataAvailabilityProvider, + CP: ChainProvider, +{ + /// The previous stage in the pipeline. + pub prev: L1Retrieval, + /// The current frame queue. + queue: VecDeque, +} + +impl FrameQueue +where + T: Into, + DAP: DataAvailabilityProvider, + CP: ChainProvider, +{ + /// Create a new frame queue stage. + pub fn new(prev: L1Retrieval) -> Self { + Self { + prev, + queue: VecDeque::new(), + } + } + + /// Returns the L1 origin [BlockInfo]. + pub fn origin(&self) -> Option<&BlockInfo> { + self.prev.origin() + } + + /// Fetches the next frame from the frame queue. + pub async fn next_frame(&mut self) -> Result { + if self.queue.is_empty() { + match self.prev.next_data().await { + Ok(data) => { + if let Ok(frames) = Frame::parse_frames(data.as_ref()) { + self.queue.extend(frames); + } + } + Err(e) => { + bail!("Error fetching next data: {e}") + } + } + } + // If we did not add more frames but still have more data, retry this function. + if self.queue.is_empty() { + bail!("Not enough data"); + } + + self.queue + .pop_front() + .ok_or_else(|| anyhow!("Frame queue is impossibly empty.")) + } +} + +#[async_trait] +impl ResettableStage for FrameQueue +where + T: Into, + DAP: DataAvailabilityProvider + Send, + CP: ChainProvider + Send, +{ + async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> Result<()> { + self.queue = VecDeque::default(); + Ok(()) + } +} diff --git a/crates/derive/src/traits/data_sources.rs b/crates/derive/src/traits/data_sources.rs index 2a8b85c07..a871d7cd2 100644 --- a/crates/derive/src/traits/data_sources.rs +++ b/crates/derive/src/traits/data_sources.rs @@ -36,5 +36,5 @@ pub trait DataAvailabilityProvider { /// Describes the behavior of a data iterator. pub trait DataIter { /// Returns the next item in the iterator, or `None` if the iterator is exhausted. - fn next(&self) -> Option; + fn next(&mut self) -> Option; } diff --git a/crates/derive/src/types/frame.rs b/crates/derive/src/types/frame.rs new file mode 100644 index 000000000..3c7fae171 --- /dev/null +++ b/crates/derive/src/types/frame.rs @@ -0,0 +1,153 @@ +//! This module contains the [Frame] type used within the derivation pipeline. + +use crate::params::{ChannelID, DERIVATION_VERSION_0}; +use alloc::vec::Vec; +use anyhow::{anyhow, bail, Result}; + +/// Frames cannot be larger than 1MB. +/// Data transactions that carry frames are generally not larger than 128 KB due to L1 network conditions, +/// but we leave space to grow larger anyway (gas limit allows for more data). +const MAX_FRAME_LEN: usize = 1000; + +/// A channel frame is a segment of a channel's data. +/// +/// *Encoding* +/// frame = `channel_id ++ frame_number ++ frame_data_length ++ frame_data ++ is_last` +/// * channel_id = bytes16 +/// * frame_number = uint16 +/// * frame_data_length = uint32 +/// * frame_data = bytes +/// * is_last = bool +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Frame { + /// The unique idetifier for the frame. + pub id: ChannelID, + /// The number of the frame. + pub number: u16, + /// The data within the frame. + pub data: Vec, + /// Whether or not the frame is the last in the sequence. + pub is_last: bool, +} + +impl Frame { + /// Encode the frame into a byte vector. + pub fn encode(&self) -> Vec { + let mut encoded = Vec::with_capacity(16 + 2 + 4 + self.data.len() + 1); + encoded.extend_from_slice(&self.id); + encoded.extend_from_slice(&self.number.to_be_bytes()); + encoded.extend_from_slice(&(self.data.len() as u32).to_be_bytes()); + encoded.extend_from_slice(&self.data); + encoded.push(self.is_last as u8); + encoded + } + + /// Decode a frame from a byte vector. + pub fn decode(encoded: &[u8]) -> Result<(usize, Self)> { + const BASE_FRAME_LEN: usize = 16 + 2 + 4 + 1; + + if encoded.len() < BASE_FRAME_LEN { + bail!("Frame too short to decode"); + } + + let id = encoded[..16] + .try_into() + .map_err(|e| anyhow!("Error: {e}"))?; + let number = u16::from_be_bytes( + encoded[16..18] + .try_into() + .map_err(|e| anyhow!("Error: {e}"))?, + ); + let data_len = u32::from_be_bytes( + encoded[18..22] + .try_into() + .map_err(|e| anyhow!("Error: {e}"))?, + ) as usize; + let data = encoded[22..22 + data_len].to_vec(); + let is_last = encoded[22 + data_len] != 0; + Ok(( + BASE_FRAME_LEN + data_len, + Self { + id, + number, + data, + is_last, + }, + )) + } + + /// ParseFrames parse the on chain serialization of frame(s) in an L1 transaction. Currently only version 0 of the + /// serialization format is supported. All frames must be parsed without error and there must not be any left over + /// data and there must be at least one frame. + /// + /// Frames are stored in L1 transactions with the following format: + /// * `data = DerivationVersion0 ++ Frame(s)` + /// Where there is one or more frames concatenated together. + pub fn parse_frames(encoded: &[u8]) -> Result> { + if encoded.is_empty() { + bail!("No frames to parse"); + } + if encoded[0] != DERIVATION_VERSION_0 { + bail!("Unsupported derivation version"); + } + + let data = &encoded[1..]; + let mut frames = Vec::new(); + let mut offset = 0; + while offset < data.len() { + let (frame_length, frame) = Self::decode(&data[offset..])?; + frames.push(frame); + offset += frame_length; + } + + if offset != data.len() { + bail!("Frame data length mismatch"); + } + if frames.is_empty() { + bail!("No frames decoded"); + } + + Ok(frames) + } +} + +#[cfg(test)] +mod test { + extern crate std; + + use super::*; + + #[test] + fn test_encode_frame_roundtrip() { + let frame = Frame { + id: [0xFF; 16], + number: 0xEE, + data: std::vec![0xDD; 50], + is_last: true, + }; + + let (_, frame_decoded) = Frame::decode(&frame.encode()).unwrap(); + assert_eq!(frame, frame_decoded); + } + + #[test] + fn test_decode_many() { + let frame = Frame { + id: [0xFF; 16], + number: 0xEE, + data: std::vec![0xDD; 50], + is_last: true, + }; + let mut bytes = Vec::new(); + bytes.extend_from_slice(&[DERIVATION_VERSION_0]); + (0..5).for_each(|_| { + bytes.extend_from_slice(&frame.encode()); + }); + + let frames = Frame::parse_frames(bytes.as_slice()).unwrap(); + assert_eq!(frames.len(), 5); + (0..5).for_each(|i| { + assert_eq!(frames[i], frame); + }); + } +} diff --git a/crates/derive/src/types/mod.rs b/crates/derive/src/types/mod.rs index 52fc6be11..c01263e1c 100644 --- a/crates/derive/src/types/mod.rs +++ b/crates/derive/src/types/mod.rs @@ -29,3 +29,6 @@ pub use eips::{ mod genesis; pub use genesis::Genesis; + +mod frame; +pub use frame::Frame; diff --git a/crates/derive/src/types/system_config.rs b/crates/derive/src/types/system_config.rs index 188cb21f5..dc5e79b0c 100644 --- a/crates/derive/src/types/system_config.rs +++ b/crates/derive/src/types/system_config.rs @@ -68,7 +68,7 @@ impl SystemConfig { continue; } - for log in receipt.logs.iter() { + receipt.logs.iter().try_for_each(|log| { let topics = log.topics(); if log.address == rollup_config.l1_system_config_address && !topics.is_empty() @@ -76,7 +76,8 @@ impl SystemConfig { { self.process_config_update_log(log, rollup_config, l1_time)?; } - } + Ok(()) + })?; } Ok(()) }