Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Frame queue stage #45

Merged
merged 2 commits into from
Feb 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@

extern crate alloc;

pub mod params;
pub mod stages;
pub mod traits;
pub mod types;

/// The derivation pipeline is responsible for deriving L2 inputs from L1 data.
#[derive(Debug, Clone, Copy)]
pub struct DerivationPipeline;
22 changes: 22 additions & 0 deletions crates/derive/src/params.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//! This module contains the parameters and identifying types for the derivation pipeline.

/// Count the tagging info as 200 in terms of buffer size.
pub const FRAME_OVERHEAD: u64 = 200;

/// The version of the derivation pipeline.
pub const DERIVATION_VERSION_0: u8 = 0;

/// [MAX_SPAN_BATCH_BYTES] is the maximum amount of bytes that will be needed
/// to decode every span batch field. This value cannot be larger than
/// MaxRLPBytesPerChannel because single batch cannot be larger than channel size.
pub const MAX_SPAN_BATCH_BYTES: u64 = MAX_RLP_BYTES_PER_CHANNEL;

/// [MAX_RLP_BYTES_PER_CHANNEL] is the maximum amount of bytes that will be read from
/// a channel. This limit is set when decoding the RLP.
pub const MAX_RLP_BYTES_PER_CHANNEL: u64 = 10_000_000;

/// [CHANNEL_ID_LENGTH] is the length of the channel ID.
pub const CHANNEL_ID_LENGTH: usize = 16;

/// [ChannelID] is an opaque identifier for a channel.
pub type ChannelID = [u8; CHANNEL_ID_LENGTH];
79 changes: 79 additions & 0 deletions crates/derive/src/stages/frame_queue.rs
Original file line number Diff line number Diff line change
@@ -1 +1,80 @@
//! This module contains the [FrameQueue] stage of the derivation pipeline.

use super::l1_retrieval::L1Retrieval;
use crate::{
traits::{ChainProvider, DataAvailabilityProvider, ResettableStage},
types::{BlockInfo, Frame, SystemConfig},
};
use alloc::{boxed::Box, collections::VecDeque};
use alloy_primitives::Bytes;
use anyhow::{anyhow, bail, Result};
use async_trait::async_trait;

pub struct FrameQueue<T, DAP, CP>
where
T: Into<Bytes>,
DAP: DataAvailabilityProvider,
CP: ChainProvider,
{
/// The previous stage in the pipeline.
pub prev: L1Retrieval<T, DAP, CP>,
/// The current frame queue.
queue: VecDeque<Frame>,
}

impl<T, DAP, CP> FrameQueue<T, DAP, CP>
where
T: Into<Bytes>,
DAP: DataAvailabilityProvider,
CP: ChainProvider,
{
/// Create a new frame queue stage.
pub fn new(prev: L1Retrieval<T, DAP, CP>) -> Self {
Self {
prev,
queue: VecDeque::new(),
}
}

/// Returns the L1 origin [BlockInfo].
pub fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}

/// Fetches the next frame from the frame queue.
pub async fn next_frame(&mut self) -> Result<Frame> {
if self.queue.is_empty() {
match self.prev.next_data().await {
Ok(data) => {
if let Ok(frames) = Frame::parse_frames(data.as_ref()) {
self.queue.extend(frames);
}
}
Err(e) => {
bail!("Error fetching next data: {e}")
}
}
}
// If we did not add more frames but still have more data, retry this function.
if self.queue.is_empty() {
bail!("Not enough data");
}

self.queue
.pop_front()
.ok_or_else(|| anyhow!("Frame queue is impossibly empty."))
}
}

#[async_trait]
impl<T, DAP, CP> ResettableStage for FrameQueue<T, DAP, CP>
where
T: Into<Bytes>,
DAP: DataAvailabilityProvider + Send,
CP: ChainProvider + Send,
{
async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> Result<()> {
self.queue = VecDeque::default();
Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/derive/src/traits/data_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ pub trait DataAvailabilityProvider {
/// Describes the behavior of a data iterator.
pub trait DataIter<T> {
/// Returns the next item in the iterator, or `None` if the iterator is exhausted.
fn next(&self) -> Option<T>;
fn next(&mut self) -> Option<T>;
}
153 changes: 153 additions & 0 deletions crates/derive/src/types/frame.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
//! This module contains the [Frame] type used within the derivation pipeline.

use crate::params::{ChannelID, DERIVATION_VERSION_0};
use alloc::vec::Vec;
use anyhow::{anyhow, bail, Result};

/// Frames cannot be larger than 1MB.
/// Data transactions that carry frames are generally not larger than 128 KB due to L1 network conditions,
/// but we leave space to grow larger anyway (gas limit allows for more data).
const MAX_FRAME_LEN: usize = 1000;

/// A channel frame is a segment of a channel's data.
///
/// *Encoding*
/// frame = `channel_id ++ frame_number ++ frame_data_length ++ frame_data ++ is_last`
/// * channel_id = bytes16
/// * frame_number = uint16
/// * frame_data_length = uint32
/// * frame_data = bytes
/// * is_last = bool
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Frame {
/// The unique idetifier for the frame.
pub id: ChannelID,
/// The number of the frame.
pub number: u16,
/// The data within the frame.
pub data: Vec<u8>,
/// Whether or not the frame is the last in the sequence.
pub is_last: bool,
}

impl Frame {
/// Encode the frame into a byte vector.
pub fn encode(&self) -> Vec<u8> {
let mut encoded = Vec::with_capacity(16 + 2 + 4 + self.data.len() + 1);
encoded.extend_from_slice(&self.id);
encoded.extend_from_slice(&self.number.to_be_bytes());
encoded.extend_from_slice(&(self.data.len() as u32).to_be_bytes());
encoded.extend_from_slice(&self.data);
encoded.push(self.is_last as u8);
encoded
}

/// Decode a frame from a byte vector.
pub fn decode(encoded: &[u8]) -> Result<(usize, Self)> {
const BASE_FRAME_LEN: usize = 16 + 2 + 4 + 1;

if encoded.len() < BASE_FRAME_LEN {
bail!("Frame too short to decode");
}

let id = encoded[..16]
.try_into()
.map_err(|e| anyhow!("Error: {e}"))?;
let number = u16::from_be_bytes(
encoded[16..18]
.try_into()
.map_err(|e| anyhow!("Error: {e}"))?,
);
let data_len = u32::from_be_bytes(
encoded[18..22]
.try_into()
.map_err(|e| anyhow!("Error: {e}"))?,
) as usize;
let data = encoded[22..22 + data_len].to_vec();
let is_last = encoded[22 + data_len] != 0;
Ok((
BASE_FRAME_LEN + data_len,
Self {
id,
number,
data,
is_last,
},
))
}

/// ParseFrames parse the on chain serialization of frame(s) in an L1 transaction. Currently only version 0 of the
/// serialization format is supported. All frames must be parsed without error and there must not be any left over
/// data and there must be at least one frame.
///
/// Frames are stored in L1 transactions with the following format:
/// * `data = DerivationVersion0 ++ Frame(s)`
/// Where there is one or more frames concatenated together.
pub fn parse_frames(encoded: &[u8]) -> Result<Vec<Self>> {
if encoded.is_empty() {
bail!("No frames to parse");
}
if encoded[0] != DERIVATION_VERSION_0 {
bail!("Unsupported derivation version");
}

let data = &encoded[1..];
let mut frames = Vec::new();
let mut offset = 0;
while offset < data.len() {
let (frame_length, frame) = Self::decode(&data[offset..])?;
frames.push(frame);
offset += frame_length;
}

if offset != data.len() {
bail!("Frame data length mismatch");
}
if frames.is_empty() {
bail!("No frames decoded");
}

Ok(frames)
}
}

#[cfg(test)]
mod test {
extern crate std;

use super::*;

#[test]
fn test_encode_frame_roundtrip() {
let frame = Frame {
id: [0xFF; 16],
number: 0xEE,
data: std::vec![0xDD; 50],
is_last: true,
};

let (_, frame_decoded) = Frame::decode(&frame.encode()).unwrap();
assert_eq!(frame, frame_decoded);
}

#[test]
fn test_decode_many() {
let frame = Frame {
id: [0xFF; 16],
number: 0xEE,
data: std::vec![0xDD; 50],
is_last: true,
};
let mut bytes = Vec::new();
bytes.extend_from_slice(&[DERIVATION_VERSION_0]);
(0..5).for_each(|_| {
bytes.extend_from_slice(&frame.encode());
});

let frames = Frame::parse_frames(bytes.as_slice()).unwrap();
assert_eq!(frames.len(), 5);
(0..5).for_each(|i| {
assert_eq!(frames[i], frame);
});
}
}
3 changes: 3 additions & 0 deletions crates/derive/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,6 @@ pub use eips::{

mod genesis;
pub use genesis::Genesis;

mod frame;
pub use frame::Frame;
5 changes: 3 additions & 2 deletions crates/derive/src/types/system_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,16 @@ impl SystemConfig {
continue;
}

for log in receipt.logs.iter() {
receipt.logs.iter().try_for_each(|log| {
let topics = log.topics();
if log.address == rollup_config.l1_system_config_address
&& !topics.is_empty()
&& topics[0] == CONFIG_UPDATE_TOPIC
{
self.process_config_update_log(log, rollup_config, l1_time)?;
}
}
Ok(())
})?;
}
Ok(())
}
Expand Down
Loading