Skip to content

Commit

Permalink
feat(derive): channel bank (#46)
Browse files Browse the repository at this point in the history
* init channel bank

* `Channel` tests

* ingest frame

* Add rest of channel bank impl

* Add custom `StageError` & `StageResult`

Need more granularity in errors to distinguish responses for `EOF` & others.

* Respect EOF
  • Loading branch information
clabby authored Feb 26, 2024
1 parent 68bb004 commit d0e92b8
Show file tree
Hide file tree
Showing 15 changed files with 693 additions and 57 deletions.
49 changes: 49 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ verify an [L2 output root][g-output-root] from the L1 inputs it was [derived fro

- [`common`](./crates/common): A suite of utilities for developing `client` programs to be ran on top of Fault Proof VMs.
- [`preimage`](./crates/preimage): High level interfaces to the [`PreimageOracle`][fpp-specs] ABI
- [`derive`](./crates/derive): `no_std` compatible implementation of the [derivation pipeline][g-derivation-pipeline].

## Book

Expand Down
1 change: 1 addition & 0 deletions crates/derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ alloy-primitives = { version = "0.6.3", default-features = false, features = ["r
alloy-rlp = { version = "0.3.4", default-features = false, features = ["derive"] }
alloy-sol-types = { version = "0.6.3", default-features = false }
async-trait = "0.1.77"
hashbrown = "0.14.3"

# Optional
serde = { version = "1.0.197", default-features = false, features = ["derive"], optional = true }
Expand Down
2 changes: 2 additions & 0 deletions crates/derive/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# `kona-derive`

> **Notice**: This crate is a WIP.
A `no_std` compatible implementation of the OP Stack's
[derivation pipeline](https://specs.optimism.io/protocol/derivation.html#l2-chain-derivation-specification).
5 changes: 4 additions & 1 deletion crates/derive/src/params.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! This module contains the parameters and identifying types for the derivation pipeline.
/// Count the tagging info as 200 in terms of buffer size.
pub const FRAME_OVERHEAD: u64 = 200;
pub const FRAME_OVERHEAD: usize = 200;

/// The version of the derivation pipeline.
pub const DERIVATION_VERSION_0: u8 = 0;
Expand All @@ -15,6 +15,9 @@ pub const MAX_SPAN_BATCH_BYTES: u64 = MAX_RLP_BYTES_PER_CHANNEL;
/// a channel. This limit is set when decoding the RLP.
pub const MAX_RLP_BYTES_PER_CHANNEL: u64 = 10_000_000;

/// The maximum size of a channel bank.
pub const MAX_CHANNEL_BANK_SIZE: usize = 100_000_000;

/// [CHANNEL_ID_LENGTH] is the length of the channel ID.
pub const CHANNEL_ID_LENGTH: usize = 16;

Expand Down
203 changes: 203 additions & 0 deletions crates/derive/src/stages/channel_bank.rs
Original file line number Diff line number Diff line change
@@ -1 +1,204 @@
//! This module contains the `ChannelBank` struct.
use super::{frame_queue::FrameQueue, l1_retrieval::L1Retrieval};
use crate::{
params::{ChannelID, MAX_CHANNEL_BANK_SIZE},
traits::{ChainProvider, DataAvailabilityProvider, ResettableStage},
types::{BlockInfo, Channel, Frame, RollupConfig, StageError, StageResult, SystemConfig},
};
use alloc::{boxed::Box, collections::VecDeque};
use alloy_primitives::Bytes;
use anyhow::{anyhow, bail};
use async_trait::async_trait;
use hashbrown::HashMap;

/// [ChannelBank] is a stateful stage that does the following:
/// 1. Unmarshalls frames from L1 transaction data
/// 2. Applies those frames to a channel
/// 3. Attempts to read from the channel when it is ready
/// 4. Prunes channels (not frames) when the channel bank is too large.
///
/// Note: we prune before we ingest data.
/// As we switch between ingesting data & reading, the prune step occurs at an odd point
/// Specifically, the channel bank is not allowed to become too large between successive calls
/// to `IngestData`. This means that we can do an ingest and then do a read while becoming too large.
/// [ChannelBank] buffers channel frames, and emits full channel data
pub struct ChannelBank<DAP, CP>
where
DAP: DataAvailabilityProvider,
CP: ChainProvider,
{
/// The rollup configuration.
cfg: RollupConfig,
/// Map of channels by ID.
channels: HashMap<ChannelID, Channel>,
/// Channels in FIFO order.
channel_queue: VecDeque<ChannelID>,
/// The previous stage of the derivation pipeline.
prev: FrameQueue<DAP, CP>,
/// Chain provider.
chain_provider: CP,
}

impl<DAP, CP> ChannelBank<DAP, CP>
where
DAP: DataAvailabilityProvider,
CP: ChainProvider,
{
/// Create a new [ChannelBank] stage.
pub fn new(cfg: RollupConfig, prev: FrameQueue<DAP, CP>, chain_provider: CP) -> Self {
Self {
cfg,
channels: HashMap::new(),
channel_queue: VecDeque::new(),
prev,
chain_provider,
}
}

/// Returns the L1 origin [BlockInfo].
pub fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}

/// Prunes the Channel bank, until it is below [MAX_CHANNEL_BANK_SIZE].
pub fn prune(&mut self) -> StageResult<()> {
// Check total size
let mut total_size = self.channels.iter().fold(0, |acc, (_, c)| acc + c.size());
// Prune until it is reasonable again. The high-priority channel failed to be read,
// so we prune from there.
while total_size > MAX_CHANNEL_BANK_SIZE {
let id = self
.channel_queue
.pop_front()
.ok_or(anyhow!("No channel to prune"))?;
let channel = self
.channels
.remove(&id)
.ok_or(anyhow!("Could not find channel"))?;
total_size -= channel.size();
}
Ok(())
}

/// Adds new L1 data to the channel bank. Should only be called after all data has been read.
pub fn ingest_frame(&mut self, frame: Frame) -> StageResult<()> {
let origin = *self.origin().ok_or(anyhow!("No origin"))?;

let current_channel = self.channels.entry(frame.id).or_insert_with(|| {
// Create a new channel
let channel = Channel::new(frame.id, origin);
self.channel_queue.push_back(frame.id);
channel
});

// Check if the channel is not timed out. If it has, ignore the frame.
if current_channel.open_block_number() + self.cfg.channel_timeout < origin.number {
return Ok(());
}

// Ingest the frame. If it fails, ignore the frame.
if current_channel.add_frame(frame, origin).is_err() {
return Ok(());
}

self.prune()
}

/// Read the raw data of the first channel, if it's timed-out or closed.
///
/// Returns an error if there is nothing new to read.
pub fn read(&mut self) -> StageResult<Option<Bytes>> {
// Bail if there are no channels to read from.
if self.channel_queue.is_empty() {
return Err(StageError::Eof);
}

// Return an `Ok(None)` if the first channel is timed out. There may be more timed
// out channels at the head of the queue and we want to remove them all.
let first = self.channel_queue[0];
let channel = self
.channels
.get(&first)
.ok_or(anyhow!("Channel not found"))?;
let origin = self.origin().ok_or(anyhow!("No origin present"))?;

if channel.open_block_number() + self.cfg.channel_timeout < origin.number {
self.channels.remove(&first);
self.channel_queue.pop_front();
return Ok(None);
}

// At the point we have removed all timed out channels from the front of the `channel_queue`.
// Pre-Canyon we simply check the first index.
// Post-Canyon we read the entire channelQueue for the first ready channel. If no channel is
// available, we return `nil, io.EOF`.
// Canyon is activated when the first L1 block whose time >= CanyonTime, not on the L2 timestamp.
if !self.cfg.is_canyon_active(origin.timestamp) {
return self.try_read_channel_at_index(0).map(Some);
}

let channel_data =
(0..self.channel_queue.len()).find_map(|i| self.try_read_channel_at_index(i).ok());
match channel_data {
Some(data) => Ok(Some(data)),
None => Err(StageError::Eof),
}
}

/// Pulls the next piece of data from the channel bank. Note that it attempts to pull data out of the channel bank prior to
/// loading data in (unlike most other stages). This is to ensure maintain consistency around channel bank pruning which depends upon the order
/// of operations.
pub async fn next_data(&mut self) -> StageResult<Option<Bytes>> {
match self.read() {
Err(StageError::Eof) => {
// continue - we will attempt to load data into the channel bank
}
Err(e) => {
return Err(anyhow!("Error fetching next data from channel bank: {:?}", e).into());
}
data => return data,
};

// Load the data into the channel bank
let frame = self.prev.next_frame().await?;
self.ingest_frame(frame);
Err(StageError::NotEnoughData)
}

/// Attempts to read the channel at the specified index. If the channel is not ready or timed out,
/// it will return an error.
/// If the channel read was successful, it will remove the channel from the channel queue.
fn try_read_channel_at_index(&mut self, index: usize) -> StageResult<Bytes> {
let channel_id = self.channel_queue[index];
let channel = self
.channels
.get(&channel_id)
.ok_or(anyhow!("Channel not found"))?;
let origin = self.origin().ok_or(anyhow!("No origin present"))?;

let timed_out = channel.open_block_number() + self.cfg.channel_timeout < origin.number;
if timed_out || !channel.is_ready() {
return Err(StageError::Eof);
}

let frame_data = channel.frame_data();
self.channels.remove(&channel_id);
self.channel_queue.remove(index);

frame_data.map_err(StageError::Custom)
}
}

#[async_trait]
impl<DAP, CP> ResettableStage for ChannelBank<DAP, CP>
where
DAP: DataAvailabilityProvider + Send,
CP: ChainProvider + Send,
{
async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> StageResult<()> {
self.channels.clear();
self.channel_queue = VecDeque::with_capacity(10);
Err(StageError::Eof)
}
}
Loading

0 comments on commit d0e92b8

Please sign in to comment.