From 8c18dec928d1d4309b52acf6570fcc497c04accd Mon Sep 17 00:00:00 2001 From: clabby Date: Wed, 3 Apr 2024 04:12:20 -0400 Subject: [PATCH] chore(derive): Channel reader tests + fixes, batch type fixes --- crates/derive/src/stages/channel_reader.rs | 52 +++++++++++++++++-- crates/derive/src/types/batch/batch_type.rs | 4 +- crates/derive/src/types/batch/mod.rs | 12 ++--- crates/derive/src/types/batch/single_batch.rs | 31 ++++++++++- .../derive/src/types/batch/span_batch/mod.rs | 3 -- .../derive/src/types/batch/span_batch/raw.rs | 8 +-- crates/derive/src/types/errors.rs | 5 ++ crates/derive/src/types/mod.rs | 1 - 8 files changed, 94 insertions(+), 22 deletions(-) diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 05c2edc78..abebafbda 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -8,7 +8,7 @@ use crate::{ use alloc::vec::Vec; use anyhow::anyhow; use core::fmt::Debug; -use miniz_oxide::inflate::decompress_to_vec; +use miniz_oxide::inflate::decompress_to_vec_zlib; /// [ChannelReader] is a stateful stage that does the following: #[derive(Debug)] @@ -45,7 +45,7 @@ where match self .next_batch .as_mut() - .unwrap() + .expect("Cannot be None") .next_batch() .ok_or(StageError::NotEnoughData) { @@ -88,15 +88,26 @@ pub(crate) struct BatchReader { data: Option>, /// Decompressed data. decompressed: Vec, + /// The current cursor in the `decompressed` data. + cursor: usize, } impl BatchReader { /// Pulls out the next batch from the reader. pub(crate) fn next_batch(&mut self) -> Option { + // If the data is not already decompressed, decompress it. if let Some(data) = self.data.take() { - self.decompressed = decompress_to_vec(&data).ok()?; + let decompressed_data = decompress_to_vec_zlib(&data).ok()?; + self.decompressed = decompressed_data; } - let batch = Batch::decode(&mut self.decompressed.as_ref()).ok()?; + + // Decompress and RLP decode the batch data, before finally decoding the batch itself. + let mut decompressed_reader = self.decompressed.as_slice(); + let batch = Batch::decode(&mut decompressed_reader).ok()?; + + // Advance the cursor on the reader. + self.cursor += self.decompressed.len() - decompressed_reader.len(); + Some(batch) } } @@ -106,6 +117,39 @@ impl From<&[u8]> for BatchReader { Self { data: Some(data.to_vec()), decompressed: Vec::new(), + cursor: 0, + } + } +} + +impl From> for BatchReader { + fn from(data: Vec) -> Self { + Self { + data: Some(data), + decompressed: Vec::new(), + cursor: 0, } } } + +#[cfg(test)] +mod test { + use crate::{stages::channel_reader::BatchReader, types::BatchType}; + use alloc::vec; + use miniz_oxide::deflate::compress_to_vec_zlib; + + // TODO(clabby): More tests here for multiple batches, integration w/ channel bank, etc. + + #[test] + fn test_batch_reader() { + let raw_data = include_bytes!("../../testdata/raw_batch.hex"); + let mut typed_data = vec![BatchType::Span as u8]; + typed_data.extend_from_slice(raw_data.as_slice()); + + let compressed_raw_data = compress_to_vec_zlib(typed_data.as_slice(), 5); + let mut reader = BatchReader::from(compressed_raw_data); + reader.next_batch().unwrap(); + + assert_eq!(reader.cursor, typed_data.len()); + } +} diff --git a/crates/derive/src/types/batch/batch_type.rs b/crates/derive/src/types/batch/batch_type.rs index 5de2c8aad..8fea6f5ff 100644 --- a/crates/derive/src/types/batch/batch_type.rs +++ b/crates/derive/src/types/batch/batch_type.rs @@ -3,10 +3,10 @@ use alloy_rlp::{Decodable, Encodable}; /// The single batch type identifier. -pub(crate) const SINGLE_BATCH_TYPE: u8 = 0x01; +pub(crate) const SINGLE_BATCH_TYPE: u8 = 0x00; /// The span batch type identifier. -pub(crate) const SPAN_BATCH_TYPE: u8 = 0x02; +pub(crate) const SPAN_BATCH_TYPE: u8 = 0x01; /// The Batch Type. #[derive(Debug, Clone, PartialEq, Eq)] diff --git a/crates/derive/src/types/batch/mod.rs b/crates/derive/src/types/batch/mod.rs index 6b95523d6..8af807815 100644 --- a/crates/derive/src/types/batch/mod.rs +++ b/crates/derive/src/types/batch/mod.rs @@ -12,7 +12,7 @@ pub use span_batch::{ RawSpanBatch, SpanBatch, SpanBatchBits, SpanBatchBuilder, SpanBatchEip1559TransactionData, SpanBatchEip2930TransactionData, SpanBatchElement, SpanBatchError, SpanBatchLegacyTransactionData, SpanBatchPayload, SpanBatchPrefix, SpanBatchTransactionData, - SpanBatchTransactions, SpanDecodingError, MAX_SPAN_BATCH_SIZE, SPAN_BATCH_TYPE, + SpanBatchTransactions, SpanDecodingError, MAX_SPAN_BATCH_SIZE, }; mod single_batch; @@ -20,11 +20,12 @@ pub use single_batch::SingleBatch; /// A Batch. #[derive(Debug, Clone, PartialEq, Eq)] +#[allow(clippy::large_enum_variant)] pub enum Batch { /// A single batch Single(SingleBatch), /// Span Batches - Span(SpanBatch), + Span(RawSpanBatch), } impl Batch { @@ -35,9 +36,7 @@ impl Batch { single_batch.encode(w); Ok(()) } - Self::Span(_span_batch) => { - unimplemented!() - } + Self::Span(span_batch) => span_batch.encode(w).map_err(DecodeError::SpanBatchError), } } @@ -57,7 +56,8 @@ impl Batch { Ok(Batch::Single(single_batch)) } BatchType::Span => { - unimplemented!() + let span_batch = RawSpanBatch::decode(r).map_err(DecodeError::SpanBatchError)?; + Ok(Batch::Span(span_batch)) } } } diff --git a/crates/derive/src/types/batch/single_batch.rs b/crates/derive/src/types/batch/single_batch.rs index a83eb6c64..1c9015b0b 100644 --- a/crates/derive/src/types/batch/single_batch.rs +++ b/crates/derive/src/types/batch/single_batch.rs @@ -3,10 +3,10 @@ use crate::types::RawTransaction; use alloc::vec::Vec; use alloy_primitives::BlockHash; -use alloy_rlp::{RlpDecodable, RlpEncodable}; +use alloy_rlp::{Decodable, Encodable}; /// Represents a single batch: a single encoded L2 block -#[derive(Debug, Default, Clone, PartialEq, Eq, RlpEncodable, RlpDecodable)] +#[derive(Debug, Default, Clone, PartialEq, Eq)] pub struct SingleBatch { /// Block hash of the previous L2 block. `B256::ZERO` if it has not been set by the Batch Queue. pub parent_hash: BlockHash, @@ -29,6 +29,33 @@ impl SingleBatch { } } +impl Encodable for SingleBatch { + fn encode(&self, out: &mut dyn alloy_rlp::BufMut) { + self.parent_hash.encode(out); + self.epoch_num.encode(out); + self.epoch_hash.encode(out); + self.timestamp.encode(out); + self.transactions.encode(out); + } +} + +impl Decodable for SingleBatch { + fn decode(rlp: &mut &[u8]) -> alloy_rlp::Result { + let parent_hash = BlockHash::decode(rlp)?; + let epoch_num = u64::decode(rlp)?; + let epoch_hash = BlockHash::decode(rlp)?; + let timestamp = u64::decode(rlp)?; + let transactions = Vec::::decode(rlp)?; + Ok(Self { + parent_hash, + epoch_num, + epoch_hash, + timestamp, + transactions, + }) + } +} + #[cfg(test)] mod test { use super::SingleBatch; diff --git a/crates/derive/src/types/batch/span_batch/mod.rs b/crates/derive/src/types/batch/span_batch/mod.rs index 28d17c5e4..11cf60401 100644 --- a/crates/derive/src/types/batch/span_batch/mod.rs +++ b/crates/derive/src/types/batch/span_batch/mod.rs @@ -12,9 +12,6 @@ use crate::MAX_RLP_BYTES_PER_CHANNEL; -/// The span batch type -pub const SPAN_BATCH_TYPE: u8 = 0x01; - /// The maximum amount of bytes that will be needed to decode every span /// batch field. This value cannot be larger than [MAX_RLP_BYTES_PER_CHANNEL] /// because single batch cannot be larger than channel size. diff --git a/crates/derive/src/types/batch/span_batch/raw.rs b/crates/derive/src/types/batch/span_batch/raw.rs index ffbd7a678..aef202957 100644 --- a/crates/derive/src/types/batch/span_batch/raw.rs +++ b/crates/derive/src/types/batch/span_batch/raw.rs @@ -3,8 +3,8 @@ use alloc::vec::Vec; use crate::types::{ - RawTransaction, SpanBatchElement, SpanBatchPayload, SpanBatchPrefix, SpanDecodingError, - SPAN_BATCH_TYPE, + BatchType, RawTransaction, SpanBatchElement, SpanBatchPayload, SpanBatchPrefix, + SpanDecodingError, }; use super::{SpanBatch, SpanBatchError}; @@ -20,8 +20,8 @@ pub struct RawSpanBatch { impl RawSpanBatch { /// Returns the batch type - pub fn get_batch_type(&self) -> u8 { - SPAN_BATCH_TYPE + pub fn get_batch_type(&self) -> BatchType { + BatchType::Span } /// Encodes the [RawSpanBatch] into a writer. diff --git a/crates/derive/src/types/errors.rs b/crates/derive/src/types/errors.rs index b5659922c..114a94954 100644 --- a/crates/derive/src/types/errors.rs +++ b/crates/derive/src/types/errors.rs @@ -2,6 +2,8 @@ use core::fmt::Display; +use super::SpanBatchError; + /// An error that is thrown within the stages of the derivation pipeline. #[derive(Debug)] pub enum StageError { @@ -51,6 +53,8 @@ pub enum DecodeError { EmptyBuffer, /// Alloy RLP Encoding Error. AlloyRlpError(alloy_rlp::Error), + /// Span Batch Error. + SpanBatchError(SpanBatchError), } impl From for DecodeError { @@ -74,6 +78,7 @@ impl Display for DecodeError { match self { DecodeError::EmptyBuffer => write!(f, "Empty buffer"), DecodeError::AlloyRlpError(e) => write!(f, "Alloy RLP Decoding Error: {}", e), + DecodeError::SpanBatchError(e) => write!(f, "Span Batch Decoding Error: {:?}", e), } } } diff --git a/crates/derive/src/types/mod.rs b/crates/derive/src/types/mod.rs index 97cf28c3a..fe5cbb87e 100644 --- a/crates/derive/src/types/mod.rs +++ b/crates/derive/src/types/mod.rs @@ -19,7 +19,6 @@ pub use batch::{ SpanBatchEip1559TransactionData, SpanBatchEip2930TransactionData, SpanBatchElement, SpanBatchError, SpanBatchLegacyTransactionData, SpanBatchPayload, SpanBatchPrefix, SpanBatchTransactionData, SpanBatchTransactions, SpanDecodingError, MAX_SPAN_BATCH_SIZE, - SPAN_BATCH_TYPE, }; mod alloy;