diff --git a/bin/client/src/l1/driver.rs b/bin/client/src/l1/driver.rs index 72648e5f1..907a3fb7b 100644 --- a/bin/client/src/l1/driver.rs +++ b/bin/client/src/l1/driver.rs @@ -159,7 +159,7 @@ where l2_chain_provider.clone(), chain_provider.clone(), ); - let dap = EthereumDataSource::new(chain_provider.clone(), blob_provider, &cfg); + let dap = EthereumDataSource::new_from_parts(chain_provider.clone(), blob_provider, &cfg); let pipeline = PipelineBuilder::new() .rollup_config(cfg) diff --git a/book/src/sdk/pipeline/providers.md b/book/src/sdk/pipeline/providers.md index bb2de1c7d..1b47ec268 100644 --- a/book/src/sdk/pipeline/providers.md +++ b/book/src/sdk/pipeline/providers.md @@ -79,22 +79,11 @@ pub struct ExampleAvail { // Place your data in here } -#[async_trait] -impl AsyncIterator for ExampleDataIterator { - type Item = Bytes; - - async fn next(&mut self) -> PipelineResult { - todo!("return the next bytes") - } -} - - #[async_trait] impl DataAvailabilityProvider for ExampleAvail { type Item = Bytes; - type DataIter = ExampleDataIterator; - async fn open_data(&self, block_ref: &BlockInfo) -> PipelineResult { + async fn open_data(&self, block_ref: &BlockInfo) -> PipelineResult { todo!("return an AsyncIterator implementation here") } } diff --git a/crates/derive-alloy/README.md b/crates/derive-alloy/README.md index 34fa5170b..1a6f70120 100644 --- a/crates/derive-alloy/README.md +++ b/crates/derive-alloy/README.md @@ -31,7 +31,7 @@ let beacon_client = OnlineBeaconClient::new_http("http://127.0.0.1:5555".into()) let blob_provider = OnlineBlobProvider::new(beacon_client, None, None); let blob_provider = OnlineBlobProviderWithFallback::new(blob_provider, None); let dap_source = - EthereumDataSource::new(chain_provider.clone(), blob_provider, &rollup_config); + EthereumDataSource::new_from_parts(chain_provider.clone(), blob_provider, &rollup_config); let builder = StatefulAttributesBuilder::new( rollup_config.clone(), l2_chain_provider.clone(), diff --git a/crates/derive-alloy/src/pipeline.rs b/crates/derive-alloy/src/pipeline.rs index d284b0a83..096051815 100644 --- a/crates/derive-alloy/src/pipeline.rs +++ b/crates/derive-alloy/src/pipeline.rs @@ -84,8 +84,11 @@ mod tests { let beacon_client = OnlineBeaconClient::new_http("http://127.0.0.1:5555".into()); let blob_provider = OnlineBlobProvider::new(beacon_client, None, None); let blob_provider = OnlineBlobProviderWithFallback::new(blob_provider, None); - let dap_source = - EthereumDataSource::new(chain_provider.clone(), blob_provider, &rollup_config); + let dap_source = EthereumDataSource::new_from_parts( + chain_provider.clone(), + blob_provider, + &rollup_config, + ); let builder = StatefulAttributesBuilder::new( rollup_config.clone(), l2_chain_provider.clone(), diff --git a/crates/derive/src/sources/blob_data.rs b/crates/derive/src/sources/blob_data.rs new file mode 100644 index 000000000..20d100416 --- /dev/null +++ b/crates/derive/src/sources/blob_data.rs @@ -0,0 +1,287 @@ +//! Contains the `BlobData` struct. + +use crate::errors::BlobDecodingError; +use alloc::{boxed::Box, vec}; +use alloy_eips::eip4844::{Blob, BYTES_PER_BLOB, VERSIONED_HASH_VERSION_KZG}; +use alloy_primitives::Bytes; + +/// The blob encoding version +pub(crate) const BLOB_ENCODING_VERSION: u8 = 0; + +/// Maximum blob data size +pub(crate) const BLOB_MAX_DATA_SIZE: usize = (4 * 31 + 3) * 1024 - 4; // 130044 + +/// Blob Encoding/Decoding Rounds +pub(crate) const BLOB_ENCODING_ROUNDS: usize = 1024; + +/// The Blob Data +#[derive(Default, Clone, Debug)] +pub struct BlobData { + /// The blob data + pub(crate) data: Option, + /// The calldata + pub(crate) calldata: Option, +} + +impl BlobData { + /// Decodes the blob into raw byte data. + /// Returns a [BlobDecodingError] if the blob is invalid. + pub(crate) fn decode(&self) -> Result { + let data = self.data.as_ref().ok_or(BlobDecodingError::MissingData)?; + + // Validate the blob encoding version + if data[VERSIONED_HASH_VERSION_KZG as usize] != BLOB_ENCODING_VERSION { + return Err(BlobDecodingError::InvalidEncodingVersion); + } + + // Decode the 3 byte big endian length value into a 4 byte integer + let length = u32::from_be_bytes([0, data[2], data[3], data[4]]) as usize; + + // Validate the length + if length > BLOB_MAX_DATA_SIZE { + return Err(BlobDecodingError::InvalidLength); + } + + // Round 0 copies the remaining 27 bytes of the first field element + let mut output = vec![0u8; BLOB_MAX_DATA_SIZE]; + output[0..27].copy_from_slice(&data[5..32]); + + // Process the remaining 3 field elements to complete round 0 + let mut output_pos = 28; + let mut input_pos = 32; + let mut encoded_byte = [0u8; 4]; + encoded_byte[0] = data[0]; + + for b in encoded_byte.iter_mut().skip(1) { + let (enc, opos, ipos) = + self.decode_field_element(output_pos, input_pos, &mut output)?; + *b = enc; + output_pos = opos; + input_pos = ipos; + } + + // Reassemble the 4 by 6 bit encoded chunks into 3 bytes of output + output_pos = self.reassemble_bytes(output_pos, &encoded_byte, &mut output); + + // In each remaining round, decode 4 field elements (128 bytes) of the + // input into 127 bytes of output + for _ in 1..BLOB_ENCODING_ROUNDS { + // Break early if the output position is greater than the length + if output_pos >= length { + break; + } + + for d in &mut encoded_byte { + let (enc, opos, ipos) = + self.decode_field_element(output_pos, input_pos, &mut output)?; + *d = enc; + output_pos = opos; + input_pos = ipos; + } + output_pos = self.reassemble_bytes(output_pos, &encoded_byte, &mut output); + } + + // Validate the remaining bytes + for o in output.iter().skip(length) { + if *o != 0u8 { + return Err(BlobDecodingError::InvalidFieldElement); + } + } + + // Validate the remaining bytes + output.truncate(length); + for i in input_pos..BYTES_PER_BLOB { + if data[i] != 0 { + return Err(BlobDecodingError::InvalidFieldElement); + } + } + + Ok(Bytes::from(output)) + } + + /// Decodes the next input field element by writing its lower 31 bytes into its + /// appropriate place in the output and checking the high order byte is valid. + /// Returns a [BlobDecodingError] if a field element is seen with either of its + /// two high order bits set. + pub(crate) fn decode_field_element( + &self, + output_pos: usize, + input_pos: usize, + output: &mut [u8], + ) -> Result<(u8, usize, usize), BlobDecodingError> { + let Some(data) = self.data.as_ref() else { + return Err(BlobDecodingError::MissingData); + }; + + // two highest order bits of the first byte of each field element should always be 0 + if data[input_pos] & 0b1100_0000 != 0 { + return Err(BlobDecodingError::InvalidFieldElement); + } + output[output_pos..output_pos + 31].copy_from_slice(&data[input_pos + 1..input_pos + 32]); + Ok((data[input_pos], output_pos + 32, input_pos + 32)) + } + + /// Reassemble 4 by 6 bit encoded chunks into 3 bytes of output and place them in their + /// appropriate output positions. + pub(crate) fn reassemble_bytes( + &self, + mut output_pos: usize, + encoded_byte: &[u8], + output: &mut [u8], + ) -> usize { + output_pos -= 1; + let x = (encoded_byte[0] & 0b0011_1111) | ((encoded_byte[1] & 0b0011_0000) << 2); + let y = (encoded_byte[1] & 0b0000_1111) | ((encoded_byte[3] & 0b0000_1111) << 4); + let z = (encoded_byte[2] & 0b0011_1111) | ((encoded_byte[3] & 0b0011_0000) << 2); + output[output_pos - 32] = z; + output[output_pos - (32 * 2)] = y; + output[output_pos - (32 * 3)] = x; + output_pos + } + + /// Fills in the pointers to the fetched blob bodies. + /// There should be exactly one placeholder blobOrCalldata + /// element for each blob, otherwise an error is returned. + pub(crate) fn fill( + &mut self, + blobs: &[Box], + index: usize, + ) -> Result { + // Do not fill if there is calldata here + if self.calldata.is_some() { + return Ok(false); + } + + if index >= blobs.len() { + return Err(BlobDecodingError::InvalidLength); + } + + if blobs[index].is_empty() || blobs[index].is_zero() { + return Err(BlobDecodingError::MissingData); + } + + self.data = Some(Bytes::from(*blobs[index])); + Ok(true) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_reassemble_bytes() { + let blob_data = BlobData::default(); + let mut output = vec![0u8; 128]; + let encoded_byte = [0x00, 0x00, 0x00, 0x00]; + let output_pos = blob_data.reassemble_bytes(127, &encoded_byte, &mut output); + assert_eq!(output_pos, 126); + assert_eq!(output, vec![0u8; 128]); + } + + #[test] + fn test_cannot_fill_empty_calldata() { + let mut blob_data = BlobData { calldata: Some(Bytes::new()), ..Default::default() }; + let blobs = vec![Box::new(Blob::with_last_byte(1u8))]; + assert_eq!(blob_data.fill(&blobs, 0), Ok(false)); + } + + #[test] + fn test_fill_oob_index() { + let mut blob_data = BlobData::default(); + let blobs = vec![Box::new(Blob::with_last_byte(1u8))]; + assert_eq!(blob_data.fill(&blobs, 1), Err(BlobDecodingError::InvalidLength)); + } + + #[test] + fn test_fill_zero_blob() { + let mut blob_data = BlobData::default(); + let blobs = vec![Box::new(Blob::ZERO)]; + assert_eq!(blob_data.fill(&blobs, 0), Err(BlobDecodingError::MissingData)); + } + + #[test] + fn test_fill_blob() { + let mut blob_data = BlobData::default(); + let blobs = vec![Box::new(Blob::with_last_byte(1u8))]; + assert_eq!(blob_data.fill(&blobs, 0), Ok(true)); + let expected = Bytes::from([&[0u8; 131071][..], &[1u8]].concat()); + assert_eq!(blob_data.data, Some(expected)); + } + + #[test] + fn test_blob_data_decode_missing_data() { + let blob_data = BlobData::default(); + assert_eq!(blob_data.decode(), Err(BlobDecodingError::MissingData)); + } + + #[test] + fn test_blob_data_decode_invalid_encoding_version() { + let blob_data = BlobData { data: Some(Bytes::from(vec![1u8; 32])), ..Default::default() }; + assert_eq!(blob_data.decode(), Err(BlobDecodingError::InvalidEncodingVersion)); + } + + #[test] + fn test_blob_data_decode_invalid_length() { + let mut data = vec![0u8; 32]; + data[VERSIONED_HASH_VERSION_KZG as usize] = BLOB_ENCODING_VERSION; + data[2] = 0xFF; + data[3] = 0xFF; + data[4] = 0xFF; + let blob_data = BlobData { data: Some(Bytes::from(data)), ..Default::default() }; + assert_eq!(blob_data.decode(), Err(BlobDecodingError::InvalidLength)); + } + + #[test] + fn test_blob_data_decode() { + let mut data = vec![0u8; alloy_eips::eip4844::BYTES_PER_BLOB]; + data[VERSIONED_HASH_VERSION_KZG as usize] = BLOB_ENCODING_VERSION; + data[2] = 0x00; + data[3] = 0x00; + data[4] = 0x01; + let blob_data = BlobData { data: Some(Bytes::from(data)), ..Default::default() }; + assert_eq!(blob_data.decode(), Ok(Bytes::from(vec![0u8; 1]))); + } + + #[test] + fn test_blob_data_decode_invalid_field_element() { + let mut data = vec![0u8; alloy_eips::eip4844::BYTES_PER_BLOB + 10]; + data[VERSIONED_HASH_VERSION_KZG as usize] = BLOB_ENCODING_VERSION; + data[2] = 0x00; + data[3] = 0x00; + data[4] = 0x01; + data[33] = 0x01; + let blob_data = BlobData { data: Some(Bytes::from(data)), ..Default::default() }; + assert_eq!(blob_data.decode(), Err(BlobDecodingError::InvalidFieldElement)); + } + + #[test] + fn test_decode_field_element_missing_data() { + let blob_data = BlobData::default(); + assert_eq!( + blob_data.decode_field_element(0, 0, &mut []), + Err(BlobDecodingError::MissingData) + ); + } + + #[test] + fn test_decode_field_element_invalid_field_element() { + let mut data = vec![0u8; 32]; + data[0] = 0b1100_0000; + let blob_data = BlobData { data: Some(Bytes::from(data)), ..Default::default() }; + assert_eq!( + blob_data.decode_field_element(0, 0, &mut []), + Err(BlobDecodingError::InvalidFieldElement) + ); + } + + #[test] + fn test_decode_field_element() { + let mut data = vec![0u8; 32]; + data[1..32].copy_from_slice(&[1u8; 31]); + let blob_data = BlobData { data: Some(Bytes::from(data)), ..Default::default() }; + let mut output = vec![0u8; 31]; + assert_eq!(blob_data.decode_field_element(0, 0, &mut output), Ok((0, 32, 32))); + assert_eq!(output, vec![1u8; 31]); + } +} diff --git a/crates/derive/src/sources/blob_hash.rs b/crates/derive/src/sources/blob_hash.rs new file mode 100644 index 000000000..18008936b --- /dev/null +++ b/crates/derive/src/sources/blob_hash.rs @@ -0,0 +1,45 @@ +//! Contains the `BlobHash` type and related types. + +use alloy_primitives::B256; + +/// A Blob hash +#[derive(Default, Clone, Debug)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct IndexedBlobHash { + /// The index of the blob + pub index: usize, + /// The hash of the blob + pub hash: B256, +} + +impl PartialEq for IndexedBlobHash { + fn eq(&self, other: &Self) -> bool { + self.index == other.index && self.hash == other.hash + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_indexed_blob_hash() { + let hash = B256::from([1; 32]); + let indexed_blob_hash = IndexedBlobHash { index: 1, hash }; + + assert_eq!(indexed_blob_hash.index, 1); + assert_eq!(indexed_blob_hash.hash, hash); + } + + #[test] + #[cfg(feature = "serde")] + fn test_indexed_blob_hash_serde_roundtrip() { + let hash = B256::from([1; 32]); + let indexed_blob_hash = IndexedBlobHash { index: 1, hash }; + + let serialized = serde_json::to_string(&indexed_blob_hash).unwrap(); + let deserialized: IndexedBlobHash = serde_json::from_str(&serialized).unwrap(); + + assert_eq!(indexed_blob_hash, deserialized); + } +} diff --git a/crates/derive/src/sources/blobs.rs b/crates/derive/src/sources/blobs.rs index c20ddb0a2..46d778fca 100644 --- a/crates/derive/src/sources/blobs.rs +++ b/crates/derive/src/sources/blobs.rs @@ -1,194 +1,18 @@ //! Blob Data Source use crate::{ - errors::{BlobDecodingError, BlobProviderError, PipelineError}, - traits::{AsyncIterator, BlobProvider, ChainProvider}, + errors::{BlobProviderError, PipelineError}, + sources::{BlobData, IndexedBlobHash}, + traits::{BlobProvider, ChainProvider, DataAvailabilityProvider}, types::PipelineResult, }; -use alloc::{boxed::Box, string::ToString, vec, vec::Vec}; +use alloc::{boxed::Box, string::ToString, vec::Vec}; use alloy_consensus::{Transaction, TxEip4844Variant, TxEnvelope, TxType}; -use alloy_eips::eip4844::{Blob, BYTES_PER_BLOB, VERSIONED_HASH_VERSION_KZG}; -use alloy_primitives::{Address, Bytes, B256}; +use alloy_primitives::{Address, Bytes}; use async_trait::async_trait; use op_alloy_protocol::BlockInfo; use tracing::warn; -/// The blob encoding version -pub(crate) const BLOB_ENCODING_VERSION: u8 = 0; - -/// Maximum blob data size -pub(crate) const BLOB_MAX_DATA_SIZE: usize = (4 * 31 + 3) * 1024 - 4; // 130044 - -/// Blob Encoding/Decoding Rounds -pub(crate) const BLOB_ENCODING_ROUNDS: usize = 1024; - -/// A Blob hash -#[derive(Default, Clone, Debug)] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub struct IndexedBlobHash { - /// The index of the blob - pub index: usize, - /// The hash of the blob - pub hash: B256, -} - -impl PartialEq for IndexedBlobHash { - fn eq(&self, other: &Self) -> bool { - self.index == other.index && self.hash == other.hash - } -} - -/// The Blob Data -#[derive(Default, Clone, Debug)] -pub struct BlobData { - /// The blob data - pub(crate) data: Option, - /// The calldata - pub(crate) calldata: Option, -} - -impl BlobData { - /// Decodes the blob into raw byte data. - /// Returns a [BlobDecodingError] if the blob is invalid. - pub(crate) fn decode(&self) -> Result { - let data = self.data.as_ref().ok_or(BlobDecodingError::MissingData)?; - - // Validate the blob encoding version - if data[VERSIONED_HASH_VERSION_KZG as usize] != BLOB_ENCODING_VERSION { - return Err(BlobDecodingError::InvalidEncodingVersion); - } - - // Decode the 3 byte big endian length value into a 4 byte integer - let length = u32::from_be_bytes([0, data[2], data[3], data[4]]) as usize; - - // Validate the length - if length > BLOB_MAX_DATA_SIZE { - return Err(BlobDecodingError::InvalidLength); - } - - // Round 0 copies the remaining 27 bytes of the first field element - let mut output = vec![0u8; BLOB_MAX_DATA_SIZE]; - output[0..27].copy_from_slice(&data[5..32]); - - // Process the remaining 3 field elements to complete round 0 - let mut output_pos = 28; - let mut input_pos = 32; - let mut encoded_byte = [0u8; 4]; - encoded_byte[0] = data[0]; - - for b in encoded_byte.iter_mut().skip(1) { - let (enc, opos, ipos) = - self.decode_field_element(output_pos, input_pos, &mut output)?; - *b = enc; - output_pos = opos; - input_pos = ipos; - } - - // Reassemble the 4 by 6 bit encoded chunks into 3 bytes of output - output_pos = self.reassemble_bytes(output_pos, &encoded_byte, &mut output); - - // In each remaining round, decode 4 field elements (128 bytes) of the - // input into 127 bytes of output - for _ in 1..BLOB_ENCODING_ROUNDS { - // Break early if the output position is greater than the length - if output_pos >= length { - break; - } - - for d in &mut encoded_byte { - let (enc, opos, ipos) = - self.decode_field_element(output_pos, input_pos, &mut output)?; - *d = enc; - output_pos = opos; - input_pos = ipos; - } - output_pos = self.reassemble_bytes(output_pos, &encoded_byte, &mut output); - } - - // Validate the remaining bytes - for o in output.iter().skip(length) { - if *o != 0u8 { - return Err(BlobDecodingError::InvalidFieldElement); - } - } - - // Validate the remaining bytes - output.truncate(length); - for i in input_pos..BYTES_PER_BLOB { - if data[i] != 0 { - return Err(BlobDecodingError::InvalidFieldElement); - } - } - - Ok(Bytes::from(output)) - } - - /// Decodes the next input field element by writing its lower 31 bytes into its - /// appropriate place in the output and checking the high order byte is valid. - /// Returns a [BlobDecodingError] if a field element is seen with either of its - /// two high order bits set. - pub(crate) fn decode_field_element( - &self, - output_pos: usize, - input_pos: usize, - output: &mut [u8], - ) -> Result<(u8, usize, usize), BlobDecodingError> { - let Some(data) = self.data.as_ref() else { - return Err(BlobDecodingError::MissingData); - }; - - // two highest order bits of the first byte of each field element should always be 0 - if data[input_pos] & 0b1100_0000 != 0 { - return Err(BlobDecodingError::InvalidFieldElement); - } - output[output_pos..output_pos + 31].copy_from_slice(&data[input_pos + 1..input_pos + 32]); - Ok((data[input_pos], output_pos + 32, input_pos + 32)) - } - - /// Reassemble 4 by 6 bit encoded chunks into 3 bytes of output and place them in their - /// appropriate output positions. - pub(crate) fn reassemble_bytes( - &self, - mut output_pos: usize, - encoded_byte: &[u8], - output: &mut [u8], - ) -> usize { - output_pos -= 1; - let x = (encoded_byte[0] & 0b0011_1111) | ((encoded_byte[1] & 0b0011_0000) << 2); - let y = (encoded_byte[1] & 0b0000_1111) | ((encoded_byte[3] & 0b0000_1111) << 4); - let z = (encoded_byte[2] & 0b0011_1111) | ((encoded_byte[3] & 0b0011_0000) << 2); - output[output_pos - 32] = z; - output[output_pos - (32 * 2)] = y; - output[output_pos - (32 * 3)] = x; - output_pos - } - - /// Fills in the pointers to the fetched blob bodies. - /// There should be exactly one placeholder blobOrCalldata - /// element for each blob, otherwise an error is returned. - pub(crate) fn fill( - &mut self, - blobs: &[Box], - index: usize, - ) -> Result { - // Do not fill if there is calldata here - if self.calldata.is_some() { - return Ok(false); - } - - if index >= blobs.len() { - return Err(BlobDecodingError::InvalidLength); - } - - if blobs[index].is_empty() { - return Err(BlobDecodingError::MissingData); - } - - self.data = Some(Bytes::from(*blobs[index])); - Ok(true) - } -} - /// A data iterator that reads from a blob. #[derive(Debug, Clone)] pub struct BlobSource @@ -202,8 +26,6 @@ where pub blob_fetcher: B, /// The address of the batcher contract. pub batcher_address: Address, - /// Block Ref - pub block_ref: BlockInfo, /// The L1 Signer. pub signer: Address, /// Data. @@ -222,14 +44,12 @@ where chain_provider: F, blob_fetcher: B, batcher_address: Address, - block_ref: BlockInfo, signer: Address, ) -> Self { Self { chain_provider, blob_fetcher, batcher_address, - block_ref, signer, data: Vec::new(), open: false, @@ -297,14 +117,14 @@ where } /// Loads blob data into the source if it is not open. - async fn load_blobs(&mut self) -> Result<(), BlobProviderError> { + async fn load_blobs(&mut self, block_ref: &BlockInfo) -> Result<(), BlobProviderError> { if self.open { return Ok(()); } let info = self .chain_provider - .block_info_and_transactions_by_hash(self.block_ref.hash) + .block_info_and_transactions_by_hash(block_ref.hash) .await .map_err(|e| BlobProviderError::Backend(e.to_string()))?; @@ -317,11 +137,10 @@ where return Ok(()); } - let blobs = - self.blob_fetcher.get_blobs(&self.block_ref, &blob_hashes).await.map_err(|e| { - warn!(target: "blob-source", "Failed to fetch blobs: {e}"); - BlobProviderError::Backend(e.to_string()) - })?; + let blobs = self.blob_fetcher.get_blobs(block_ref, &blob_hashes).await.map_err(|e| { + warn!(target: "blob-source", "Failed to fetch blobs: {e}"); + BlobProviderError::Backend(e.to_string()) + })?; // Fill the blob pointers. let mut blob_index = 0; @@ -354,15 +173,15 @@ where } #[async_trait] -impl AsyncIterator for BlobSource +impl DataAvailabilityProvider for BlobSource where - F: ChainProvider + Send, - B: BlobProvider + Send, + F: ChainProvider + Sync + Send, + B: BlobProvider + Sync + Send, { type Item = Bytes; - async fn next(&mut self) -> PipelineResult { - self.load_blobs().await?; + async fn next(&mut self, block_ref: &BlockInfo) -> PipelineResult { + self.load_blobs(block_ref).await?; let next_data = match self.next_data() { Ok(d) => d, @@ -378,10 +197,15 @@ where Ok(d) => Ok(d), Err(_) => { warn!(target: "blob-source", "Failed to decode blob data, skipping"); - self.next().await + self.next(block_ref).await } } } + + fn clear(&mut self) { + self.data.clear(); + self.open = false; + } } #[cfg(test)] @@ -393,98 +217,12 @@ pub(crate) mod tests { }; use alloy_rlp::Decodable; - #[test] - fn test_indexed_blob_hash() { - let hash = B256::from([1; 32]); - let indexed_blob_hash = IndexedBlobHash { index: 1, hash }; - - assert_eq!(indexed_blob_hash.index, 1); - assert_eq!(indexed_blob_hash.hash, hash); - } - - #[test] - #[cfg(feature = "serde")] - fn test_indexed_blob_hash_serde_roundtrip() { - let hash = B256::from([1; 32]); - let indexed_blob_hash = IndexedBlobHash { index: 1, hash }; - - let serialized = serde_json::to_string(&indexed_blob_hash).unwrap(); - let deserialized: IndexedBlobHash = serde_json::from_str(&serialized).unwrap(); - - assert_eq!(indexed_blob_hash, deserialized); - } - - #[test] - fn test_reassemble_bytes() { - let blob_data = BlobData::default(); - let mut output = vec![0u8; 128]; - let encoded_byte = [0x00, 0x00, 0x00, 0x00]; - let output_pos = blob_data.reassemble_bytes(127, &encoded_byte, &mut output); - assert_eq!(output_pos, 126); - assert_eq!(output, vec![0u8; 128]); - } - - #[test] - fn test_cannot_fill_empty_calldata() { - let mut blob_data = BlobData { calldata: Some(Bytes::new()), ..Default::default() }; - let blobs = vec![Box::new(Blob::with_last_byte(1u8))]; - assert_eq!(blob_data.fill(&blobs, 0), Ok(false)); - } - - #[test] - fn test_fill_oob_index() { - let mut blob_data = BlobData::default(); - let blobs = vec![Box::new(Blob::with_last_byte(1u8))]; - assert_eq!(blob_data.fill(&blobs, 1), Err(BlobDecodingError::InvalidLength)); - } - - #[test] - #[ignore] - fn test_fill_empty_blob() { - let mut blob_data = BlobData::default(); - let blobs = vec![Box::new(Blob::ZERO)]; - assert_eq!(blob_data.fill(&blobs, 0), Err(BlobDecodingError::MissingData)); - } - - #[test] - fn test_fill_blob() { - let mut blob_data = BlobData::default(); - let blobs = vec![Box::new(Blob::with_last_byte(1u8))]; - assert_eq!(blob_data.fill(&blobs, 0), Ok(true)); - let expected = Bytes::from([&[0u8; 131071][..], &[1u8]].concat()); - assert_eq!(blob_data.data, Some(expected)); - } - - #[test] - fn test_blob_data_decode_missing_data() { - let blob_data = BlobData::default(); - assert_eq!(blob_data.decode(), Err(BlobDecodingError::MissingData)); - } - - #[test] - fn test_blob_data_decode_invalid_encoding_version() { - let blob_data = BlobData { data: Some(Bytes::from(vec![1u8; 32])), ..Default::default() }; - assert_eq!(blob_data.decode(), Err(BlobDecodingError::InvalidEncodingVersion)); - } - - #[test] - fn test_blob_data_decode_invalid_length() { - let mut data = vec![0u8; 32]; - data[VERSIONED_HASH_VERSION_KZG as usize] = BLOB_ENCODING_VERSION; - data[2] = 0xFF; - data[3] = 0xFF; - data[4] = 0xFF; - let blob_data = BlobData { data: Some(Bytes::from(data)), ..Default::default() }; - assert_eq!(blob_data.decode(), Err(BlobDecodingError::InvalidLength)); - } - pub(crate) fn default_test_blob_source() -> BlobSource { let chain_provider = TestChainProvider::default(); let blob_fetcher = TestBlobProvider::default(); let batcher_address = Address::default(); - let block_ref = BlockInfo::default(); let signer = Address::default(); - BlobSource::new(chain_provider, blob_fetcher, batcher_address, block_ref, signer) + BlobSource::new(chain_provider, blob_fetcher, batcher_address, signer) } pub(crate) fn valid_blob_txs() -> Vec { @@ -498,13 +236,16 @@ pub(crate) mod tests { async fn test_load_blobs_open() { let mut source = default_test_blob_source(); source.open = true; - assert!(source.load_blobs().await.is_ok()); + assert!(source.load_blobs(&BlockInfo::default()).await.is_ok()); } #[tokio::test] async fn test_load_blobs_chain_provider_err() { let mut source = default_test_blob_source(); - assert!(matches!(source.load_blobs().await, Err(BlobProviderError::Backend(_)))); + assert!(matches!( + source.load_blobs(&BlockInfo::default()).await, + Err(BlobProviderError::Backend(_)) + )); } #[tokio::test] @@ -513,7 +254,7 @@ pub(crate) mod tests { let block_info = BlockInfo::default(); source.chain_provider.insert_block_with_transactions(0, block_info, Vec::new()); assert!(!source.open); // Source is not open by default. - assert!(source.load_blobs().await.is_ok()); + assert!(source.load_blobs(&BlockInfo::default()).await.is_ok()); assert!(source.data.is_empty()); assert!(source.open); } @@ -528,7 +269,10 @@ pub(crate) mod tests { let txs = valid_blob_txs(); source.blob_fetcher.should_error = true; source.chain_provider.insert_block_with_transactions(1, block_info, txs); - assert!(matches!(source.load_blobs().await, Err(BlobProviderError::Backend(_)))); + assert!(matches!( + source.load_blobs(&BlockInfo::default()).await, + Err(BlobProviderError::Backend(_)) + )); } #[tokio::test] @@ -562,7 +306,7 @@ pub(crate) mod tests { for hash in hashes { source.blob_fetcher.insert_blob(hash, Blob::with_last_byte(1u8)); } - source.load_blobs().await.unwrap(); + source.load_blobs(&BlockInfo::default()).await.unwrap(); assert!(source.open); assert!(!source.data.is_empty()); } @@ -572,7 +316,7 @@ pub(crate) mod tests { let mut source = default_test_blob_source(); source.open = true; - let err = source.next().await.unwrap_err(); + let err = source.next(&BlockInfo::default()).await.unwrap_err(); assert!(matches!(err, PipelineErrorKind::Temporary(PipelineError::Eof))); } @@ -582,7 +326,7 @@ pub(crate) mod tests { source.open = true; source.data.push(BlobData { data: None, calldata: Some(Bytes::default()) }); - let data = source.next().await.unwrap(); + let data = source.next(&BlockInfo::default()).await.unwrap(); assert_eq!(data, Bytes::default()); } @@ -592,15 +336,14 @@ pub(crate) mod tests { source.open = true; source.data.push(BlobData { data: Some(Bytes::from(&[1; 32])), calldata: None }); - let err = source.next().await.unwrap_err(); + let err = source.next(&BlockInfo::default()).await.unwrap_err(); assert!(matches!(err, PipelineErrorKind::Temporary(PipelineError::Eof))); } #[tokio::test] async fn test_blob_source_pipeline_error() { let mut source = default_test_blob_source(); - - let err = source.next().await.unwrap_err(); + let err = source.next(&BlockInfo::default()).await.unwrap_err(); assert!(matches!(err, PipelineErrorKind::Temporary(PipelineError::Provider(_)))); } } diff --git a/crates/derive/src/sources/calldata.rs b/crates/derive/src/sources/calldata.rs index b3af6ccc8..d937d576c 100644 --- a/crates/derive/src/sources/calldata.rs +++ b/crates/derive/src/sources/calldata.rs @@ -2,7 +2,7 @@ use crate::{ errors::PipelineError, - traits::{AsyncIterator, ChainProvider}, + traits::{ChainProvider, DataAvailabilityProvider}, types::PipelineResult, }; use alloc::{boxed::Box, collections::VecDeque}; @@ -21,8 +21,6 @@ where pub chain_provider: CP, /// The batch inbox address. pub batch_inbox_address: Address, - /// Block Ref - pub block_ref: BlockInfo, /// The L1 Signer. pub signer: Address, /// Current calldata. @@ -33,30 +31,18 @@ where impl CalldataSource { /// Creates a new calldata source. - pub const fn new( - chain_provider: CP, - batch_inbox_address: Address, - block_ref: BlockInfo, - signer: Address, - ) -> Self { - Self { - chain_provider, - batch_inbox_address, - block_ref, - signer, - calldata: VecDeque::new(), - open: false, - } + pub const fn new(chain_provider: CP, batch_inbox_address: Address, signer: Address) -> Self { + Self { chain_provider, batch_inbox_address, signer, calldata: VecDeque::new(), open: false } } /// Loads the calldata into the source if it is not open. - async fn load_calldata(&mut self) -> Result<(), CP::Error> { + async fn load_calldata(&mut self, block_ref: &BlockInfo) -> Result<(), CP::Error> { if self.open { return Ok(()); } let (_, txs) = - self.chain_provider.block_info_and_transactions_by_hash(self.block_ref.hash).await?; + self.chain_provider.block_info_and_transactions_by_hash(block_ref.hash).await?; self.calldata = txs .iter() @@ -86,13 +72,18 @@ impl CalldataSource { } #[async_trait] -impl AsyncIterator for CalldataSource { +impl DataAvailabilityProvider for CalldataSource { type Item = Bytes; - async fn next(&mut self) -> PipelineResult { - self.load_calldata().await.map_err(Into::into)?; + async fn next(&mut self, block_ref: &BlockInfo) -> PipelineResult { + self.load_calldata(block_ref).await.map_err(Into::into)?; self.calldata.pop_front().ok_or(PipelineError::Eof.temp()) } + + fn clear(&mut self) { + self.calldata.clear(); + self.open = false; + } } #[cfg(test)] @@ -131,25 +122,30 @@ mod tests { } pub(crate) fn default_test_calldata_source() -> CalldataSource { - CalldataSource::new( - TestChainProvider::default(), - Default::default(), - BlockInfo::default(), - Default::default(), - ) + CalldataSource::new(TestChainProvider::default(), Default::default(), Default::default()) + } + + #[tokio::test] + async fn test_clear_calldata() { + let mut source = default_test_calldata_source(); + source.open = true; + source.calldata.push_back(Bytes::default()); + source.clear(); + assert!(source.calldata.is_empty()); + assert!(!source.open); } #[tokio::test] async fn test_load_calldata_open() { let mut source = default_test_calldata_source(); source.open = true; - assert!(source.load_calldata().await.is_ok()); + assert!(source.load_calldata(&BlockInfo::default()).await.is_ok()); } #[tokio::test] async fn test_load_calldata_provider_err() { let mut source = default_test_calldata_source(); - assert!(source.load_calldata().await.is_err()); + assert!(source.load_calldata(&BlockInfo::default()).await.is_err()); } #[tokio::test] @@ -158,7 +154,7 @@ mod tests { let block_info = BlockInfo::default(); source.chain_provider.insert_block_with_transactions(0, block_info, Vec::new()); assert!(!source.open); // Source is not open by default. - assert!(source.load_calldata().await.is_ok()); + assert!(source.load_calldata(&BlockInfo::default()).await.is_ok()); assert!(source.calldata.is_empty()); assert!(source.open); } @@ -171,7 +167,7 @@ mod tests { let tx = test_legacy_tx(batch_inbox_address); source.chain_provider.insert_block_with_transactions(0, block_info, vec![tx]); assert!(!source.open); // Source is not open by default. - assert!(source.load_calldata().await.is_ok()); + assert!(source.load_calldata(&BlockInfo::default()).await.is_ok()); assert!(source.calldata.is_empty()); assert!(source.open); } @@ -185,7 +181,7 @@ mod tests { let tx = test_legacy_tx(batch_inbox_address); source.chain_provider.insert_block_with_transactions(0, block_info, vec![tx]); assert!(!source.open); // Source is not open by default. - assert!(source.load_calldata().await.is_ok()); + assert!(source.load_calldata(&BlockInfo::default()).await.is_ok()); assert!(source.calldata.is_empty()); assert!(source.open); } @@ -200,7 +196,7 @@ mod tests { let block_info = BlockInfo::default(); source.chain_provider.insert_block_with_transactions(0, block_info, vec![tx]); assert!(!source.open); // Source is not open by default. - assert!(source.load_calldata().await.is_ok()); + assert!(source.load_calldata(&BlockInfo::default()).await.is_ok()); assert!(!source.calldata.is_empty()); // Calldata is NOT empty. assert!(source.open); } @@ -215,7 +211,7 @@ mod tests { let block_info = BlockInfo::default(); source.chain_provider.insert_block_with_transactions(0, block_info, vec![tx]); assert!(!source.open); // Source is not open by default. - assert!(source.load_calldata().await.is_ok()); + assert!(source.load_calldata(&BlockInfo::default()).await.is_ok()); assert!(!source.calldata.is_empty()); // Calldata is NOT empty. assert!(source.open); } @@ -230,7 +226,7 @@ mod tests { let block_info = BlockInfo::default(); source.chain_provider.insert_block_with_transactions(0, block_info, vec![tx]); assert!(!source.open); // Source is not open by default. - assert!(source.load_calldata().await.is_ok()); + assert!(source.load_calldata(&BlockInfo::default()).await.is_ok()); assert!(source.calldata.is_empty()); assert!(source.open); } @@ -238,6 +234,9 @@ mod tests { #[tokio::test] async fn test_next_err_loading_calldata() { let mut source = default_test_calldata_source(); - assert!(matches!(source.next().await, Err(PipelineErrorKind::Temporary(_)))); + assert!(matches!( + source.next(&BlockInfo::default()).await, + Err(PipelineErrorKind::Temporary(_)) + )); } } diff --git a/crates/derive/src/sources/ethereum.rs b/crates/derive/src/sources/ethereum.rs index 307d2c084..22e01bcd5 100644 --- a/crates/derive/src/sources/ethereum.rs +++ b/crates/derive/src/sources/ethereum.rs @@ -2,53 +2,53 @@ //! [DataAvailabilityProvider] trait for the Ethereum protocol. use crate::{ - sources::{BlobSource, CalldataSource, EthereumDataSourceVariant}, + sources::{BlobSource, CalldataSource}, traits::{BlobProvider, ChainProvider, DataAvailabilityProvider}, types::PipelineResult, }; use alloc::{boxed::Box, fmt::Debug}; -use alloy_primitives::{Address, Bytes}; +use alloy_primitives::Bytes; use async_trait::async_trait; use op_alloy_genesis::RollupConfig; use op_alloy_protocol::BlockInfo; /// A factory for creating an Ethereum data source provider. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] pub struct EthereumDataSource where C: ChainProvider + Send + Clone, - B: BlobProvider + Clone, + B: BlobProvider + Send + Clone, { - /// The chain provider to use for the factory. - pub chain_provider: C, - /// The blob provider - pub blob_provider: B, /// The ecotone timestamp. pub ecotone_timestamp: Option, - /// The L1 Signer. - pub signer: Address, - /// The batch inbox address. - pub batch_inbox_address: Address, + /// The blob source. + pub blob_source: BlobSource, + /// The calldata source. + pub calldata_source: CalldataSource, } impl EthereumDataSource where C: ChainProvider + Send + Clone + Debug, - B: BlobProvider + Clone + Debug, + B: BlobProvider + Send + Clone + Debug, { - /// Creates a new factory. - pub fn new(provider: C, blobs: B, cfg: &RollupConfig) -> Self { + /// Instantiates a new [EthereumDataSource]. + pub const fn new( + blob_source: BlobSource, + calldata_source: CalldataSource, + cfg: &RollupConfig, + ) -> Self { + Self { ecotone_timestamp: cfg.ecotone_time, blob_source, calldata_source } + } + + /// Instantiates a new [EthereumDataSource] from parts. + pub fn new_from_parts(provider: C, blobs: B, cfg: &RollupConfig) -> Self { + let signer = + cfg.genesis.system_config.as_ref().map(|sc| sc.batcher_address).unwrap_or_default(); Self { - chain_provider: provider, - blob_provider: blobs, ecotone_timestamp: cfg.ecotone_time, - signer: cfg - .genesis - .system_config - .as_ref() - .map(|sc| sc.batcher_address) - .unwrap_or_default(), - batch_inbox_address: cfg.batch_inbox_address, + blob_source: BlobSource::new(provider.clone(), blobs, cfg.batch_inbox_address, signer), + calldata_source: CalldataSource::new(provider, cfg.batch_inbox_address, signer), } } } @@ -60,69 +60,77 @@ where B: BlobProvider + Send + Sync + Clone + Debug, { type Item = Bytes; - type DataIter = EthereumDataSourceVariant; - async fn open_data(&self, block_ref: &BlockInfo) -> PipelineResult { + async fn next(&mut self, block_ref: &BlockInfo) -> PipelineResult { let ecotone_enabled = self.ecotone_timestamp.map(|e| block_ref.timestamp >= e).unwrap_or(false); if ecotone_enabled { - Ok(EthereumDataSourceVariant::Blob(BlobSource::new( - self.chain_provider.clone(), - self.blob_provider.clone(), - self.batch_inbox_address, - *block_ref, - self.signer, - ))) + self.blob_source.next(block_ref).await } else { - Ok(EthereumDataSourceVariant::Calldata(CalldataSource::new( - self.chain_provider.clone(), - self.batch_inbox_address, - *block_ref, - self.signer, - ))) + self.calldata_source.next(block_ref).await } } + + fn clear(&mut self) { + self.blob_source.clear(); + self.calldata_source.clear(); + } } #[cfg(test)] mod tests { - use crate::test_utils::TestChainProvider; + use super::*; + use crate::{ + sources::BlobData, + test_utils::{TestBlobProvider, TestChainProvider}, + }; use alloy_consensus::TxEnvelope; use alloy_eips::eip2718::Decodable2718; - use alloy_primitives::address; + use alloy_primitives::{address, Address}; use op_alloy_genesis::{RollupConfig, SystemConfig}; use op_alloy_protocol::BlockInfo; - use crate::{ - sources::{EthereumDataSource, EthereumDataSourceVariant}, - test_utils::TestBlobProvider, - traits::{AsyncIterator, DataAvailabilityProvider}, - }; + fn default_test_blob_source() -> BlobSource { + let chain_provider = TestChainProvider::default(); + let blob_fetcher = TestBlobProvider::default(); + let batcher_address = Address::default(); + let signer = Address::default(); + BlobSource::new(chain_provider, blob_fetcher, batcher_address, signer) + } #[tokio::test] - async fn test_validate_ethereum_data_source() { + async fn test_clear_ethereum_data_source() { let chain = TestChainProvider::default(); let blob = TestBlobProvider::default(); - let block_ref = BlockInfo::default(); - - // If the ecotone_timestamp is not set, a Calldata source should be returned. - let cfg = RollupConfig { ecotone_time: None, ..Default::default() }; - let data_source = EthereumDataSource::new(chain.clone(), blob.clone(), &cfg); - let data_iter = data_source.open_data(&block_ref).await.unwrap(); - assert!(matches!(data_iter, EthereumDataSourceVariant::Calldata(_))); - - // If the ecotone_timestamp is set, and the block_ref timestamp is prior to the - // ecotone_timestamp, a calldata source is created. - let cfg = RollupConfig { ecotone_time: Some(100), ..Default::default() }; - let data_source = EthereumDataSource::new(chain, blob, &cfg); - let data_iter = data_source.open_data(&block_ref).await.unwrap(); - assert!(matches!(data_iter, EthereumDataSourceVariant::Calldata(_))); - - // If the ecotone_timestamp is set, and the block_ref timestamp is greater than - // or equal to the ecotone_timestamp, a Blob source is created. - let block_ref = BlockInfo { timestamp: 101, ..Default::default() }; - let data_iter = data_source.open_data(&block_ref).await.unwrap(); - assert!(matches!(data_iter, EthereumDataSourceVariant::Blob(_))); + let cfg = RollupConfig::default(); + let mut calldata = CalldataSource::new(chain.clone(), Address::ZERO, Address::ZERO); + calldata.calldata.insert(0, Default::default()); + calldata.open = true; + let mut blob = BlobSource::new(chain, blob, Address::ZERO, Address::ZERO); + blob.data = vec![Default::default()]; + blob.open = true; + let mut data_source = EthereumDataSource::new(blob, calldata, &cfg); + + data_source.clear(); + assert!(data_source.blob_source.data.is_empty()); + assert!(!data_source.blob_source.open); + assert!(data_source.calldata_source.calldata.is_empty()); + assert!(!data_source.calldata_source.open); + } + + #[tokio::test] + async fn test_open_blob_source() { + let chain = TestChainProvider::default(); + let mut blob = default_test_blob_source(); + blob.open = true; + blob.data.push(BlobData { data: None, calldata: Some(Bytes::default()) }); + let calldata = CalldataSource::new(chain.clone(), Address::ZERO, Address::ZERO); + let cfg = RollupConfig { ecotone_time: Some(0), ..Default::default() }; + + // Should successfully retrieve a blob batch from the block + let mut data_source = EthereumDataSource::new(blob, calldata, &cfg); + let data = data_source.next(&BlockInfo::default()).await.unwrap(); + assert_eq!(data, Bytes::default()); } #[tokio::test] @@ -142,12 +150,9 @@ mod tests { let tx = TxEnvelope::decode_2718(&mut raw_batcher_tx.as_ref()).unwrap(); chain.insert_block_with_transactions(10, block_ref, alloc::vec![tx]); - let data_source = EthereumDataSource::new(chain, blob, &cfg); - let mut data_iter = data_source.open_data(&block_ref).await.unwrap(); - assert!(matches!(data_iter, EthereumDataSourceVariant::Calldata(_))); - // Should successfully retrieve a calldata batch from the block - let calldata_batch = data_iter.next().await.unwrap(); + let mut data_source = EthereumDataSource::new_from_parts(chain, blob, &cfg); + let calldata_batch = data_source.next(&block_ref).await.unwrap(); assert_eq!(calldata_batch.len(), 119823); } } diff --git a/crates/derive/src/sources/mod.rs b/crates/derive/src/sources/mod.rs index 2ae38ee2c..7ddd4b33a 100644 --- a/crates/derive/src/sources/mod.rs +++ b/crates/derive/src/sources/mod.rs @@ -7,14 +7,17 @@ //! [DataAvailabilityProvider]: crate::traits::DataAvailabilityProvider //! [BlockInfo]: op_alloy_protocol::BlockInfo +mod blob_hash; +pub use blob_hash::IndexedBlobHash; + +mod blob_data; +pub use blob_data::BlobData; + mod ethereum; pub use ethereum::EthereumDataSource; mod blobs; -pub use blobs::{BlobData, BlobSource, IndexedBlobHash}; +pub use blobs::BlobSource; mod calldata; pub use calldata::CalldataSource; - -mod variant; -pub use variant::EthereumDataSourceVariant; diff --git a/crates/derive/src/stages/l1_retrieval.rs b/crates/derive/src/stages/l1_retrieval.rs index 41c9db004..9e68c5395 100644 --- a/crates/derive/src/stages/l1_retrieval.rs +++ b/crates/derive/src/stages/l1_retrieval.rs @@ -3,9 +3,7 @@ use crate::{ errors::{PipelineError, PipelineErrorKind}, stages::FrameQueueProvider, - traits::{ - AsyncIterator, DataAvailabilityProvider, OriginAdvancer, OriginProvider, SignalReceiver, - }, + traits::{DataAvailabilityProvider, OriginAdvancer, OriginProvider, SignalReceiver}, types::{ActivationSignal, PipelineResult, ResetSignal, Signal}, }; use alloc::boxed::Box; @@ -32,11 +30,9 @@ pub trait L1RetrievalProvider { /// The [L1Retrieval] stage of the derivation pipeline. /// /// For each L1 [BlockInfo] pulled from the [L1Traversal] stage, [L1Retrieval] fetches the -/// associated data from a specified [DataAvailabilityProvider]. This data is returned as a generic -/// [DataIter] that can be iterated over. +/// associated data from a specified [DataAvailabilityProvider]. /// /// [L1Traversal]: crate::stages::L1Traversal -/// [DataIter]: crate::traits::DataAvailabilityProvider::DataIter #[derive(Debug)] pub struct L1Retrieval where @@ -47,8 +43,8 @@ where pub prev: P, /// The data availability provider to use for the L1 retrieval stage. pub provider: DAP, - /// The current data iterator. - pub(crate) data: Option, + /// The current block ref. + pub next: Option, } impl L1Retrieval @@ -61,7 +57,7 @@ where /// /// [L1Traversal]: crate::stages::L1Traversal pub const fn new(prev: P, provider: DAP) -> Self { - Self { prev, provider, data: None } + Self { prev, provider, next: None } } } @@ -85,20 +81,23 @@ where type Item = DAP::Item; async fn next_data(&mut self) -> PipelineResult { - if self.data.is_none() { - let next = self - .prev - .next_l1_block() - .await? // SAFETY: This question mark bubbles up the Eof error. - .ok_or(PipelineError::MissingL1Data.temp())?; - self.data = Some(self.provider.open_data(&next).await?); + if self.next.is_none() { + self.next = Some( + self.prev + .next_l1_block() + .await? // SAFETY: This question mark bubbles up the Eof error. + .ok_or(PipelineError::MissingL1Data.temp())?, + ); } + // SAFETY: The above check ensures that `next` is not None. + let next = self.next.as_ref().expect("infallible"); - match self.data.as_mut().expect("Cannot be None").next().await { + match self.provider.next(next).await { Ok(data) => Ok(data), Err(e) => { if let PipelineErrorKind::Temporary(PipelineError::Eof) = e { - self.data = None; + self.next = None; + self.provider.clear(); } Err(e) } @@ -127,7 +126,7 @@ where match signal { Signal::Reset(ResetSignal { l1_origin, .. }) | Signal::Activation(ActivationSignal { l1_origin, .. }) => { - self.data = Some(self.provider.open_data(&l1_origin).await?); + self.next = Some(l1_origin); } _ => {} } @@ -138,34 +137,31 @@ where #[cfg(test)] mod tests { use super::*; - use crate::{ - stages::l1_traversal::tests::*, - test_utils::{TestDAP, TestIter}, - }; + use crate::{stages::l1_traversal::tests::*, test_utils::TestDAP}; use alloc::vec; use alloy_primitives::Bytes; #[tokio::test] async fn test_l1_retrieval_flush_channel() { let traversal = new_populated_test_traversal(); - let dap = TestDAP { results: vec![], batch_inbox_address: Address::default() }; + let dap = TestDAP { results: vec![] }; let mut retrieval = L1Retrieval::new(traversal, dap); retrieval.prev.block = None; assert!(retrieval.prev.block.is_none()); - retrieval.data = None; + retrieval.next = None; retrieval.signal(Signal::FlushChannel).await.unwrap(); - assert!(retrieval.data.is_none()); + assert!(retrieval.next.is_none()); assert!(retrieval.prev.block.is_none()); } #[tokio::test] async fn test_l1_retrieval_activation_signal() { let traversal = new_populated_test_traversal(); - let dap = TestDAP { results: vec![], batch_inbox_address: Address::default() }; + let dap = TestDAP { results: vec![] }; let mut retrieval = L1Retrieval::new(traversal, dap); retrieval.prev.block = None; assert!(retrieval.prev.block.is_none()); - retrieval.data = None; + retrieval.next = None; retrieval .signal( ActivationSignal { system_config: Some(Default::default()), ..Default::default() } @@ -173,18 +169,18 @@ mod tests { ) .await .unwrap(); - assert!(retrieval.data.is_some()); + assert!(retrieval.next.is_some()); assert_eq!(retrieval.prev.block, Some(BlockInfo::default())); } #[tokio::test] async fn test_l1_retrieval_reset_signal() { let traversal = new_populated_test_traversal(); - let dap = TestDAP { results: vec![], batch_inbox_address: Address::default() }; + let dap = TestDAP { results: vec![] }; let mut retrieval = L1Retrieval::new(traversal, dap); retrieval.prev.block = None; assert!(retrieval.prev.block.is_none()); - retrieval.data = None; + retrieval.next = None; retrieval .signal( ResetSignal { system_config: Some(Default::default()), ..Default::default() } @@ -192,14 +188,14 @@ mod tests { ) .await .unwrap(); - assert!(retrieval.data.is_some()); + assert!(retrieval.next.is_some()); assert_eq!(retrieval.prev.block, Some(BlockInfo::default())); } #[tokio::test] async fn test_l1_retrieval_origin() { let traversal = new_populated_test_traversal(); - let dap = TestDAP { results: vec![], batch_inbox_address: Address::default() }; + let dap = TestDAP { results: vec![] }; let retrieval = L1Retrieval::new(traversal, dap); let expected = BlockInfo::default(); assert_eq!(retrieval.origin(), Some(expected)); @@ -209,52 +205,49 @@ mod tests { async fn test_l1_retrieval_next_data() { let traversal = new_populated_test_traversal(); let results = vec![Err(PipelineError::Eof.temp()), Ok(Bytes::default())]; - let dap = TestDAP { results, batch_inbox_address: Address::default() }; + let dap = TestDAP { results }; let mut retrieval = L1Retrieval::new(traversal, dap); - assert_eq!(retrieval.data, None); + assert_eq!(retrieval.next, None); let data = retrieval.next_data().await.unwrap(); assert_eq!(data, Bytes::default()); - assert!(retrieval.data.is_some()); - let retrieval_data = retrieval.data.as_ref().unwrap(); - assert_eq!(retrieval_data.open_data_calls.len(), 1); - assert_eq!(retrieval_data.open_data_calls[0].0, BlockInfo::default()); - assert_eq!(retrieval_data.open_data_calls[0].1, Address::default()); - // Data should be reset to none and the error should be bubbled up. - let data = retrieval.next_data().await.unwrap_err(); - assert_eq!(data, PipelineError::Eof.temp()); - assert!(retrieval.data.is_none()); } #[tokio::test] - async fn test_l1_retrieval_existing_data_is_respected() { - let data = TestIter { - open_data_calls: vec![(BlockInfo::default(), Address::default())], - results: vec![Ok(Bytes::default())], - }; - // Create a new traversal with no blocks or receipts. - // This would bubble up an error if the prev stage - // (traversal) is called in the retrieval stage. - let traversal = new_test_traversal(vec![], vec![]); - let dap = TestDAP { results: vec![], batch_inbox_address: Address::default() }; - let mut retrieval = L1Retrieval { prev: traversal, provider: dap, data: Some(data) }; + async fn test_l1_retrieval_next_data_respect_next() { + let mut traversal = new_populated_test_traversal(); + traversal.done = true; + let results = vec![Err(PipelineError::Eof.temp()), Ok(Bytes::default())]; + let dap = TestDAP { results }; + let mut retrieval = L1Retrieval::new(traversal, dap); + retrieval.next = Some(BlockInfo::default()); let data = retrieval.next_data().await.unwrap(); assert_eq!(data, Bytes::default()); - assert!(retrieval.data.is_some()); - let retrieval_data = retrieval.data.as_ref().unwrap(); - assert_eq!(retrieval_data.open_data_calls.len(), 1); + let err = retrieval.next_data().await.unwrap_err(); + assert_eq!(err, PipelineError::Eof.temp()); + assert!(retrieval.next.is_none()); + } + + #[tokio::test] + async fn test_l1_retrieval_next_data_l1_block_errors() { + let mut traversal = new_populated_test_traversal(); + traversal.done = true; + let results = vec![Err(PipelineError::Eof.temp()), Ok(Bytes::default())]; + let dap = TestDAP { results }; + let mut retrieval = L1Retrieval::new(traversal, dap); + assert_eq!(retrieval.next, None); + let err = retrieval.next_data().await.unwrap_err(); + assert_eq!(err, PipelineError::Eof.temp()); + assert!(retrieval.next.is_none()); } #[tokio::test] async fn test_l1_retrieval_existing_data_errors() { - let data = TestIter { - open_data_calls: vec![(BlockInfo::default(), Address::default())], - results: vec![Err(PipelineError::Eof.temp())], - }; let traversal = new_populated_test_traversal(); - let dap = TestDAP { results: vec![], batch_inbox_address: Address::default() }; - let mut retrieval = L1Retrieval { prev: traversal, provider: dap, data: Some(data) }; + let dap = TestDAP { results: vec![Err(PipelineError::Eof.temp())] }; + let mut retrieval = + L1Retrieval { prev: traversal, provider: dap, next: Some(BlockInfo::default()) }; let data = retrieval.next_data().await.unwrap_err(); assert_eq!(data, PipelineError::Eof.temp()); - assert!(retrieval.data.is_none()); + assert!(retrieval.next.is_none()); } } diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs index a19f0e96b..afc60ce31 100644 --- a/crates/derive/src/stages/l1_traversal.rs +++ b/crates/derive/src/stages/l1_traversal.rs @@ -25,9 +25,9 @@ pub struct L1Traversal { /// The current block in the traversal stage. pub block: Option, /// The data source for the traversal stage. - data_source: Provider, + pub data_source: Provider, /// Signals whether or not the traversal stage is complete. - done: bool, + pub done: bool, /// The system config. pub system_config: SystemConfig, /// A reference to the rollup config. diff --git a/crates/derive/src/test_utils/data_availability_provider.rs b/crates/derive/src/test_utils/data_availability_provider.rs index 3628f585b..d17ca43d6 100644 --- a/crates/derive/src/test_utils/data_availability_provider.rs +++ b/crates/derive/src/test_utils/data_availability_provider.rs @@ -1,55 +1,28 @@ //! An implementation of the [DataAvailabilityProvider] trait for tests. -use crate::{ - errors::PipelineError, - traits::{AsyncIterator, DataAvailabilityProvider}, - types::PipelineResult, -}; -use alloc::{boxed::Box, vec, vec::Vec}; -use alloy_primitives::{Address, Bytes}; +use crate::{errors::PipelineError, traits::DataAvailabilityProvider, types::PipelineResult}; +use alloc::{boxed::Box, vec::Vec}; +use alloy_primitives::Bytes; use async_trait::async_trait; use core::fmt::Debug; use op_alloy_protocol::BlockInfo; -/// Mock data iterator -#[derive(Debug, Default, PartialEq)] -pub struct TestIter { - /// Holds open data calls with args for assertions. - pub(crate) open_data_calls: Vec<(BlockInfo, Address)>, - /// A queue of results to return as the next iterated data. - pub(crate) results: Vec>, -} - -#[async_trait] -impl AsyncIterator for TestIter { - type Item = Bytes; - - async fn next(&mut self) -> PipelineResult { - self.results.pop().unwrap_or(Err(PipelineError::Eof.temp())) - } -} - /// Mock data availability provider #[derive(Debug, Default)] pub struct TestDAP { - /// The batch inbox address. - pub batch_inbox_address: Address, - /// Specifies the stage results the test iter returns as data. - pub(crate) results: Vec>, + /// Specifies the stage results. + pub results: Vec>, } #[async_trait] impl DataAvailabilityProvider for TestDAP { type Item = Bytes; - type DataIter = TestIter; - async fn open_data(&self, block_ref: &BlockInfo) -> PipelineResult { - // Construct a new vec of results to return. - let results = self - .results - .iter() - .map(|i| i.as_ref().map_or_else(|_| Err(PipelineError::Eof.temp()), |r| Ok(r.clone()))) - .collect::>>(); - Ok(TestIter { open_data_calls: vec![(*block_ref, self.batch_inbox_address)], results }) + async fn next(&mut self, _: &BlockInfo) -> PipelineResult { + self.results.pop().unwrap_or(Err(PipelineError::Eof.temp())) + } + + fn clear(&mut self) { + self.results.clear(); } } diff --git a/crates/derive/src/test_utils/mod.rs b/crates/derive/src/test_utils/mod.rs index 8be980757..b0f28b5a9 100644 --- a/crates/derive/src/test_utils/mod.rs +++ b/crates/derive/src/test_utils/mod.rs @@ -14,7 +14,7 @@ mod chain_providers; pub use chain_providers::{TestChainProvider, TestL2ChainProvider, TestProviderError}; mod data_availability_provider; -pub use data_availability_provider::{TestDAP, TestIter}; +pub use data_availability_provider::TestDAP; mod batch_provider; pub use batch_provider::TestNextBatchProvider; diff --git a/crates/derive/src/traits/data_sources.rs b/crates/derive/src/traits/data_sources.rs index 22937cc96..027e42186 100644 --- a/crates/derive/src/traits/data_sources.rs +++ b/crates/derive/src/traits/data_sources.rs @@ -28,22 +28,11 @@ pub trait BlobProvider { pub trait DataAvailabilityProvider { /// The item type of the data iterator. type Item: Send + Sync + Debug + Into; - /// An iterator over returned bytes data. - type DataIter: AsyncIterator + Send + Debug; - /// Returns the data availability for the block with the given hash, or an error if the block - /// does not exist in the data source. - async fn open_data(&self, block_ref: &BlockInfo) -> PipelineResult; -} - -/// A simple asynchronous iterator trait. -/// This should be replaced with the `async-iterator` crate -#[async_trait] -pub trait AsyncIterator { - /// The item type of the iterator. - type Item: Send + Sync + Debug + Into; + /// Returns the next data for the given [BlockInfo]. + /// Returns a `PipelineError::Eof` if there is no more data for the given block ref. + async fn next(&mut self, block_ref: &BlockInfo) -> PipelineResult; - /// Returns the next item in the iterator, or [crate::errors::PipelineError::Eof] if the - /// iterator is exhausted. - async fn next(&mut self) -> PipelineResult; + /// Clears the data source for the next block ref. + fn clear(&mut self); } diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index 88c2248b6..91f24c10f 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -11,7 +11,7 @@ mod attributes; pub use attributes::{AttributesBuilder, AttributesProvider, NextAttributes}; mod data_sources; -pub use data_sources::{AsyncIterator, BlobProvider, DataAvailabilityProvider}; +pub use data_sources::{BlobProvider, DataAvailabilityProvider}; mod reset; pub use reset::ResetProvider;