Skip to content

Commit

Permalink
feat(derive): Channel Reader Implementation (#65)
Browse files Browse the repository at this point in the history
* feat(derive): batch type for the channel reader

* fix(derive): batch type lints

* feat(derive): channel reader implementation with batch reader

* fix(derive): channel bank impl

* Update crates/derive/src/types/batch_type.rs

Co-authored-by: clabby <[email protected]>

* fix(derive): channel reader fixes

* fix(derive): revert unfurrling change

* fix(derive): batch decoding

---------

Co-authored-by: clabby <[email protected]>
  • Loading branch information
refcell and clabby authored Apr 3, 2024
1 parent 7684308 commit 13ddfb9
Show file tree
Hide file tree
Showing 9 changed files with 264 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ alloy-sol-types = { version = "0.6.3", default-features = false }
async-trait = "0.1.77"
hashbrown = "0.14.3"
unsigned-varint = "0.8.0"
miniz_oxide = { version = "0.7.2" }

# Optional
serde = { version = "1.0.197", default-features = false, features = ["derive"], optional = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/derive/src/stages/channel_bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ where
pub fn ingest_frame(&mut self, frame: Frame) -> StageResult<()> {
let origin = *self.origin().ok_or(anyhow!("No origin"))?;

// Get the channel for the frame, or create a new one if it doesn't exist.
let current_channel = self.channels.entry(frame.id).or_insert_with(|| {
// Create a new channel
let channel = Channel::new(frame.id, origin);
self.channel_queue.push_back(frame.id);
channel
Expand Down
110 changes: 110 additions & 0 deletions crates/derive/src/stages/channel_reader.rs
Original file line number Diff line number Diff line change
@@ -1 +1,111 @@
//! This module contains the `ChannelReader` struct.
use super::channel_bank::ChannelBank;
use crate::{
traits::{ChainProvider, DataAvailabilityProvider},
types::{Batch, BlockInfo, StageError, StageResult},
};
use alloc::vec::Vec;
use anyhow::anyhow;
use core::fmt::Debug;
use miniz_oxide::inflate::decompress_to_vec;

/// [ChannelReader] is a stateful stage that does the following:
#[derive(Debug)]
pub struct ChannelReader<DAP, CP>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
{
/// The previous stage of the derivation pipeline.
prev: ChannelBank<DAP, CP>,
/// The batch reader.
next_batch: Option<BatchReader>,
}

impl<DAP, CP> ChannelReader<DAP, CP>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
{
/// Create a new [ChannelReader] stage.
pub fn new(prev: ChannelBank<DAP, CP>) -> Self {
Self {
prev,
next_batch: None,
}
}

/// Pulls out the next Batch from the available channel.
pub async fn next_batch(&mut self) -> StageResult<Batch> {
if let Err(e) = self.set_batch_reader().await {
self.next_channel();
return Err(e);
}
match self
.next_batch
.as_mut()
.unwrap()
.next_batch()
.ok_or(StageError::NotEnoughData)
{
Ok(batch) => Ok(batch),
Err(e) => {
self.next_channel();
Err(e)
}
}
}

/// Creates the batch reader from available channel data.
async fn set_batch_reader(&mut self) -> StageResult<()> {
if self.next_batch.is_none() {
let channel = self.prev.next_data().await?.ok_or(anyhow!("no channel"))?;
self.next_batch = Some(BatchReader::from(&channel[..]));
}
Ok(())
}

/// Returns the L1 origin [BlockInfo].
pub fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}

/// Forces the read to continue with the next channel, resetting any
/// decoding / decompression state to a fresh start.
pub fn next_channel(&mut self) {
self.next_batch = None;
}
}

/// Batch Reader provides a function that iteratively consumes batches from the reader.
/// The L1Inclusion block is also provided at creation time.
/// Warning: the batch reader can read every batch-type.
/// The caller of the batch-reader should filter the results.
#[derive(Debug)]
pub(crate) struct BatchReader {
/// The raw data to decode.
data: Option<Vec<u8>>,
/// Decompressed data.
decompressed: Vec<u8>,
}

impl BatchReader {
/// Pulls out the next batch from the reader.
pub(crate) fn next_batch(&mut self) -> Option<Batch> {
if let Some(data) = self.data.take() {
self.decompressed = decompress_to_vec(&data).ok()?;
}
let batch = Batch::decode(&mut self.decompressed.as_ref()).ok()?;
Some(batch)
}
}

impl From<&[u8]> for BatchReader {
fn from(data: &[u8]) -> Self {
Self {
data: Some(data.to_vec()),
decompressed: Vec::new(),
}
}
}
4 changes: 3 additions & 1 deletion crates/derive/src/stages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ pub use frame_queue::FrameQueue;
mod channel_bank;
pub use channel_bank::ChannelBank;

mod batch_queue;
mod channel_reader;
pub use channel_reader::ChannelReader;

mod batch_queue;
mod engine_queue;
mod payload_derivation;
40 changes: 40 additions & 0 deletions crates/derive/src/types/batch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//! This module contains the enumerable [Batch].
use super::batch_type::BatchType;
use super::single_batch::SingleBatch;
use crate::types::errors::DecodeError;

use alloy_rlp::Decodable;

// TODO: replace this with a span batch
/// Span Batch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpanBatch {}

/// A Batch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Batch {
/// A single batch
Single(SingleBatch),
/// Span Batches
Span(SpanBatch),
}

impl Batch {
/// Attempts to decode a batch from a byte slice.
pub fn decode(r: &mut &[u8]) -> Result<Self, DecodeError> {
if r.is_empty() {
return Err(DecodeError::EmptyBuffer);
}
match BatchType::from(r[0]) {
BatchType::Single => {
let single_batch = SingleBatch::decode(r)?;
Ok(Batch::Single(single_batch))
}
BatchType::Span => {
// TODO: implement span batch decoding
unimplemented!()
}
}
}
}
67 changes: 67 additions & 0 deletions crates/derive/src/types/batch_type.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! Contains the [BatchType] and its encodings.
use alloy_rlp::{Decodable, Encodable};

/// The single batch type identifier.
pub(crate) const SINGLE_BATCH_TYPE: u8 = 0x01;

/// The span batch type identifier.
pub(crate) const SPAN_BATCH_TYPE: u8 = 0x02;

/// The Batch Type.
#[derive(Debug, Clone, PartialEq, Eq)]
#[repr(u8)]
pub enum BatchType {
/// Single Batch.
Single = SINGLE_BATCH_TYPE,
/// Span Batch.
Span = SPAN_BATCH_TYPE,
}

impl From<u8> for BatchType {
fn from(val: u8) -> Self {
match val {
SINGLE_BATCH_TYPE => BatchType::Single,
SPAN_BATCH_TYPE => BatchType::Span,
_ => panic!("Invalid batch type"),
}
}
}

impl From<&[u8]> for BatchType {
fn from(buf: &[u8]) -> Self {
BatchType::from(buf[0])
}
}

impl Encodable for BatchType {
fn encode(&self, out: &mut dyn alloy_rlp::BufMut) {
let val = match self {
BatchType::Single => SINGLE_BATCH_TYPE,
BatchType::Span => SPAN_BATCH_TYPE,
};
val.encode(out);
}
}

impl Decodable for BatchType {
fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
let val = u8::decode(buf)?;
Ok(BatchType::from(val))
}
}

#[cfg(test)]
mod test {
use super::*;
use alloc::vec::Vec;

#[test]
fn test_batch_type() {
let batch_type = BatchType::Single;
let mut buf = Vec::new();
batch_type.encode(&mut buf);
let decoded = BatchType::decode(&mut buf.as_slice()).unwrap();
assert_eq!(batch_type, decoded);
}
}
34 changes: 34 additions & 0 deletions crates/derive/src/types/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,37 @@ impl Display for StageError {
}
}
}

/// A decoding error.
#[derive(Debug)]
pub enum DecodeError {
/// The buffer is empty.
EmptyBuffer,
/// Alloy RLP Encoding Error.
AlloyRlpError(alloy_rlp::Error),
}

impl From<alloy_rlp::Error> for DecodeError {
fn from(e: alloy_rlp::Error) -> Self {
DecodeError::AlloyRlpError(e)
}
}

impl PartialEq<DecodeError> for DecodeError {
fn eq(&self, other: &DecodeError) -> bool {
matches!(
(self, other),
(DecodeError::EmptyBuffer, DecodeError::EmptyBuffer)
| (DecodeError::AlloyRlpError(_), DecodeError::AlloyRlpError(_))
)
}
}

impl Display for DecodeError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
DecodeError::EmptyBuffer => write!(f, "Empty buffer"),
DecodeError::AlloyRlpError(e) => write!(f, "Alloy RLP Decoding Error: {}", e),
}
}
}
8 changes: 7 additions & 1 deletion crates/derive/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
use alloc::vec::Vec;
use alloy_rlp::{Decodable, Encodable};

mod batch;
pub use batch::Batch;

mod batch_type;
pub use batch_type::BatchType;

mod system_config;
pub use system_config::{
SystemAccounts, SystemConfig, SystemConfigUpdateType, CONFIG_UPDATE_EVENT_VERSION_0,
Expand Down Expand Up @@ -43,7 +49,7 @@ mod channel;
pub use channel::Channel;

mod errors;
pub use errors::{StageError, StageResult};
pub use errors::{DecodeError, StageError, StageResult};

mod single_batch;
pub use single_batch::SingleBatch;
Expand Down

0 comments on commit 13ddfb9

Please sign in to comment.