diff --git a/Cargo.lock b/Cargo.lock
index 94861bff..557eb6e2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9285,8 +9285,8 @@ dependencies = [
  "alloy-transport",
  "arbitrary",
  "async-trait",
- "derive_more 2.0.1",
  "eyre",
+ "itertools 0.14.0",
  "rand 0.9.0",
  "rollup-node-primitives",
  "scroll-alloy-consensus",
@@ -9740,13 +9740,16 @@ dependencies = [
 name = "scroll-derivation-pipeline"
 version = "0.0.1"
 dependencies = [
+ "alloy-eips",
  "alloy-primitives",
  "alloy-rpc-types-engine",
  "eyre",
  "reth-scroll-chainspec",
+ "rollup-node-primitives",
  "scroll-alloy-consensus",
  "scroll-alloy-rpc-types-engine",
  "scroll-codec",
+ "thiserror 2.0.12",
 ]
 
 [[package]]
diff --git a/crates/codec/src/decoding/batch.rs b/crates/codec/src/decoding/batch.rs
index a175ef73..c90b1060 100644
--- a/crates/codec/src/decoding/batch.rs
+++ b/crates/codec/src/decoding/batch.rs
@@ -54,6 +54,7 @@ impl Batch {
                 let num_l1_messages = b.context.num_l1_messages as usize;
                 let block_messages = l1_messages_buf.get(..num_l1_messages).unwrap_or(&[]);
                 *l1_messages_buf = l1_messages_buf.get(num_l1_messages..).unwrap_or(&[]);
+                block_messages
             })
             .collect::<Vec<_>>();
@@ -113,7 +114,7 @@ fn hash_chunk(
 mod tests {
     use crate::decoding::{test_utils::read_to_bytes, v0::decode_v0, v1::decode_v1};
 
-    use alloy_primitives::{address, b256, bytes, U256};
+    use alloy_primitives::b256;
     use scroll_alloy_consensus::TxL1Message;
 
     #[test]
@@ -134,35 +135,10 @@ mod tests {
         //
         let raw_calldata = read_to_bytes("./testdata/calldata_v0_with_l1_messages.bin")?;
         let batch = decode_v0(&raw_calldata)?;
+        let l1_messages: Vec<TxL1Message> =
+            serde_json::from_str(&std::fs::read_to_string("./testdata/l1_messages_v0.json")?)?;
 
-        let hash = batch
-            .try_compute_data_hash(&[
-                TxL1Message {
-                    queue_index: 39,
-                    gas_limit: 180000,
-                    to: address!("781e90f1c8Fc4611c9b7497C3B47F99Ef6969CbC"),
-                    value: U256::ZERO,
-                    sender: address!("7885BcBd5CeCEf1336b5300fb5186A12DDD8c478"),
-                    input: bytes!("8ef1332e000000000000000000000000f1af3b23de0a5ca3cab7261cb0061c0d779a5c7b00000000000000000000000033b60d5dd260d453cac3782b0bdc01ce846721420000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002700000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000000e48431f5c1000000000000000000000000a0b86991c6218b36c1d19d4a2e9eb0ce3606eb4800000000000000000000000006efdbff2a14a7c8e15944d1f4a48f9f95f663a4000000000000000000000000c451b0191351ce308fdfd779d73814c910fc5ecb000000000000000000000000c451b0191351ce308fdfd779d73814c910fc5ecb00000000000000000000000000000000000000000000000000000005d21dba0000000000000000000000000000000000000000000000000000000000000000c0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"),
-                },
-                TxL1Message {
-                    queue_index: 40,
-                    gas_limit: 168000,
-                    to: address!("781e90f1c8Fc4611c9b7497C3B47F99Ef6969CbC"),
-                    value: U256::ZERO,
-                    sender: address!("7885BcBd5CeCEf1336b5300fb5186A12DDD8c478"),
-                    input: bytes!("8ef1332e0000000000000000000000007f2b8c31f88b6006c382775eea88297ec1e3e9050000000000000000000000006ea73e05adc79974b931123675ea8f78ffdacdf00000000000000000000000000000000000000000000000000011c37937e08000000000000000000000000000000000000000000000000000000000000000002800000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000000a4232e8748000000000000000000000000b89db2813541287a4dd1fc6801eec30595ecdc6c000000000000000000000000b89db2813541287a4dd1fc6801eec30595ecdc6c0000000000000000000000000000000000000000000000000011c37937e080000000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"),
-                },
-                TxL1Message {
-                    queue_index: 41,
-                    gas_limit: 168000,
-                    to: address!("781e90f1c8Fc4611c9b7497C3B47F99Ef6969CbC"),
-                    value: U256::ZERO,
-                    sender: address!("7885BcBd5CeCEf1336b5300fb5186A12DDD8c478"),
-                    input: bytes!("8ef1332e0000000000000000000000007f2b8c31f88b6006c382775eea88297ec1e3e9050000000000000000000000006ea73e05adc79974b931123675ea8f78ffdacdf0000000000000000000000000000000000000000000000000002386f26fc10000000000000000000000000000000000000000000000000000000000000000002900000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000000a4232e87480000000000000000000000003219c394111d45757ccb68a4fd353b4f7f9660960000000000000000000000003219c394111d45757ccb68a4fd353b4f7f966096000000000000000000000000000000000000000000000000002386f26fc100000000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"),
-                },
-            ])
-            .unwrap();
+        let hash = batch.try_compute_data_hash(&l1_messages).unwrap();
 
         assert_eq!(hash, b256!("55fd647c58461d910b5bfb4539f2177ba575c9c8d578a344558976a4375cc287"));
diff --git a/crates/codec/src/error.rs b/crates/codec/src/error.rs
index 2ead2503..1b3e6182 100644
--- a/crates/codec/src/error.rs
+++ b/crates/codec/src/error.rs
@@ -1,3 +1,5 @@
+use alloy_primitives::U256;
+
 /// An error occurring during the codec process.
 #[derive(Debug, thiserror::Error)]
 pub enum CodecError {
@@ -13,6 +15,8 @@ pub enum DecodingError {
     MissingCodecVersion,
     #[error("unsupported codec version {0}")]
     UnsupportedCodecVersion(u8),
+    #[error("malformed codec version: {0}")]
+    MalformedCodecVersion(U256),
     #[error("missing blob from data source")]
     MissingBlob,
     #[error("missing chunk data")]
diff --git a/crates/codec/src/lib.rs b/crates/codec/src/lib.rs
index 6d7a520d..a5231e45 100644
--- a/crates/codec/src/lib.rs
+++ b/crates/codec/src/lib.rs
@@ -13,7 +13,7 @@ use crate::decoding::{
 };
 
 use alloy_eips::eip4844::Blob;
-use alloy_primitives::Bytes;
+use alloy_primitives::{ruint::UintTryTo, Bytes, U256};
 
 /// The Codec.
 #[derive(Debug)]
@@ -43,7 +43,7 @@ impl Codec {
     /// Decodes the input data and returns the decoded [`Batch`].
     pub fn decode<T: CommitDataSource>(input: &T) -> Result<Batch, CodecError> {
         let calldata = input.calldata();
-        let version = calldata.first().ok_or(DecodingError::MissingCodecVersion)?;
+        let version = get_codec_version(calldata)?;
 
         let payload = match version {
             0 => decode_v0(calldata)?,
@@ -63,7 +63,7 @@ impl Codec {
                 let blob = input.blob().ok_or(DecodingError::MissingBlob)?;
                 decode_v7(blob.as_ref())?
             }
-            v => return Err(DecodingError::UnsupportedCodecVersion(*v).into()),
+            v => return Err(DecodingError::UnsupportedCodecVersion(v).into()),
         };
 
         Ok(payload)
@@ -77,3 +77,23 @@ pub trait CommitDataSource {
     /// Returns the blob for decoding.
     fn blob(&self) -> Option<&Blob>;
 }
+
+/// Returns the codec version from the calldata.
+fn get_codec_version(calldata: &[u8]) -> Result<u8, DecodingError> {
+    const CODEC_VERSION_OFFSET_START: usize = 4;
+    const CODEC_VERSION_LEN: usize = 32;
+    const CODEC_VERSION_OFFSET_END: usize = CODEC_VERSION_OFFSET_START + CODEC_VERSION_LEN;
+    const HIGH_BYTES_MASK: U256 =
+        U256::from_limbs([0xffffffffffffff00, u64::MAX, u64::MAX, u64::MAX]);
+
+    let version = calldata
+        .get(CODEC_VERSION_OFFSET_START..CODEC_VERSION_OFFSET_END)
+        .ok_or(DecodingError::Eof)?;
+    let version = U256::from_be_slice(version);
+
+    if (version & HIGH_BYTES_MASK) != U256::ZERO {
+        return Err(DecodingError::MalformedCodecVersion(version))
+    }
+
+    Ok(version.uint_try_to().expect("fits in single byte"))
+}
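
The helper above reads the version out of an ABI-encoded commit call: the 4-byte function selector is followed by a 32-byte big-endian word in which a `uint8` is right-aligned, so every byte above the lowest must be zero. A minimal standalone sketch of the same check, without `U256` (illustrative only, not part of the patch):

/// Sketch: extract the codec version, assuming calldata is laid out as
/// `selector (4 bytes) || abi-encoded uint8 version (32-byte word, right-aligned)`.
fn codec_version_sketch(calldata: &[u8]) -> Option<u8> {
    // Skip the 4-byte selector and take the full 32-byte ABI word.
    let word = calldata.get(4..36)?;
    // A canonical uint8 leaves the first 31 bytes zero; anything else is malformed.
    if word[..31].iter().any(|b| *b != 0) {
        return None;
    }
    // The version lives in the word's least significant (last) byte.
    Some(word[31])
}
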
"8ef1332e0000000000000000000000007f2b8c31f88b6006c382775eea88297ec1e3e9050000000000000000000000006ea73e05adc79974b931123675ea8f78ffdacdf0000000000000000000000000000000000000000000000000002386f26fc10000000000000000000000000000000000000000000000000000000000000000002900000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000000a4232e87480000000000000000000000003219c394111d45757ccb68a4fd353b4f7f9660960000000000000000000000003219c394111d45757ccb68a4fd353b4f7f966096000000000000000000000000000000000000000000000000002386f26fc100000000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" + } +] diff --git a/crates/database/db/src/db.rs b/crates/database/db/src/db.rs index 1e27998a..4a6bedea 100644 --- a/crates/database/db/src/db.rs +++ b/crates/database/db/src/db.rs @@ -46,15 +46,14 @@ impl From for Database { #[cfg(test)] mod test { use crate::{operations::DatabaseOperations, test_utils::setup_test_db}; + use arbitrary::{Arbitrary, Unstructured}; use futures::StreamExt; use rand::Rng; - use rollup_node_primitives::{ - BatchInput, BatchInputV1, BatchInputV2, L1MessageWithBlockNumber, - }; + use rollup_node_primitives::{BatchCommitData, L1MessageWithBlockNumber}; #[tokio::test] - async fn test_database_round_trip_batch_input() { + async fn test_database_round_trip_batch_commit() { // Set up the test database. let db = setup_test_db().await; @@ -64,24 +63,13 @@ mod test { let mut u = Unstructured::new(&bytes); // Generate a random BatchInputV1. - let batch_input_v1 = BatchInputV1::arbitrary(&mut u).unwrap(); - let batch_input = BatchInput::BatchInputDataV1(batch_input_v1); - - // Round trip the BatchInput through the database. - db.insert_batch_input(batch_input.clone()).await.unwrap(); - let batch_input_from_db = - db.get_batch_input_by_batch_index(batch_input.batch_index()).await.unwrap().unwrap(); - assert_eq!(batch_input, batch_input_from_db); - - // Generate a random BatchInputV2. - let batch_input_v2 = BatchInputV2::arbitrary(&mut u).unwrap(); - let batch_input = BatchInput::BatchInputDataV2(batch_input_v2); + let batch_commit = BatchCommitData::arbitrary(&mut u).unwrap(); // Round trip the BatchInput through the database. - db.insert_batch_input(batch_input.clone()).await.unwrap(); - let batch_input_from_db = - db.get_batch_input_by_batch_index(batch_input.batch_index()).await.unwrap().unwrap(); - assert_eq!(batch_input, batch_input_from_db); + db.insert_batch(batch_commit.clone()).await.unwrap(); + let batch_commit_from_db = + db.get_batch_by_index(batch_commit.index).await.unwrap().unwrap(); + assert_eq!(batch_commit, batch_commit_from_db); } #[tokio::test] diff --git a/crates/database/db/src/models/batch_commit.rs b/crates/database/db/src/models/batch_commit.rs new file mode 100644 index 00000000..2553e9c9 --- /dev/null +++ b/crates/database/db/src/models/batch_commit.rs @@ -0,0 +1,55 @@ +use std::sync::Arc; + +use rollup_node_primitives::BatchCommitData; +use sea_orm::{entity::prelude::*, ActiveValue}; + +/// A database model that represents a batch input. +#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] +#[sea_orm(table_name = "batch_commit")] +pub struct Model { + #[sea_orm(primary_key)] + index: i64, + hash: Vec, + block_number: i64, + calldata: Vec, + blob_hash: Option>, + finalized_block_number: Option, +} + +/// The relation for the batch input model. 
diff --git a/crates/database/db/src/models/batch_commit.rs b/crates/database/db/src/models/batch_commit.rs
new file mode 100644
index 00000000..2553e9c9
--- /dev/null
+++ b/crates/database/db/src/models/batch_commit.rs
@@ -0,0 +1,55 @@
+use std::sync::Arc;
+
+use rollup_node_primitives::BatchCommitData;
+use sea_orm::{entity::prelude::*, ActiveValue};
+
+/// A database model that represents a batch commit.
+#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
+#[sea_orm(table_name = "batch_commit")]
+pub struct Model {
+    #[sea_orm(primary_key)]
+    index: i64,
+    hash: Vec<u8>,
+    block_number: i64,
+    calldata: Vec<u8>,
+    blob_hash: Option<Vec<u8>>,
+    finalized_block_number: Option<i64>,
+}
+
+/// The relation for the batch commit model.
+#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
+pub enum Relation {}
+
+/// The active model behavior for the batch commit model.
+impl ActiveModelBehavior for ActiveModel {}
+
+impl From<BatchCommitData> for ActiveModel {
+    fn from(batch_commit: BatchCommitData) -> Self {
+        Self {
+            index: ActiveValue::Set(
+                batch_commit.index.try_into().expect("index should fit in i64"),
+            ),
+            hash: ActiveValue::Set(batch_commit.hash.to_vec()),
+            block_number: ActiveValue::Set(
+                batch_commit.block_number.try_into().expect("block number should fit in i64"),
+            ),
+            calldata: ActiveValue::Set(batch_commit.calldata.0.to_vec()),
+            blob_hash: ActiveValue::Set(batch_commit.blob_versioned_hash.map(|b| b.to_vec())),
+            finalized_block_number: ActiveValue::Unchanged(None),
+        }
+    }
+}
+
+impl From<Model> for BatchCommitData {
+    fn from(value: Model) -> Self {
+        Self {
+            hash: value.hash.as_slice().try_into().expect("data persisted in database is valid"),
+            index: value.index as u64,
+            block_number: value.block_number as u64,
+            calldata: Arc::new(value.calldata.into()),
+            blob_versioned_hash: value
+                .blob_hash
+                .map(|b| b.as_slice().try_into().expect("data persisted in database is valid")),
+        }
+    }
+}
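
The conversions above encode a small contract worth spelling out: batch indices and block numbers are `u64` on the rollup side but signed `i64` columns in the database, so writes must fail loudly past `i64::MAX`, while reads can cast back losslessly because the value was validated on insert. A standalone sketch of that rule (hypothetical helper names, illustrative only):

// Sketch of the u64 <-> i64 conversion contract the model relies on.
fn index_to_db(index: u64) -> i64 {
    // Fails loudly for indices the signed column type cannot represent.
    index.try_into().expect("index should fit in i64")
}

fn index_from_db(index: i64) -> u64 {
    // Values read back were validated on insert, so the cast is lossless.
    index as u64
}
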
diff --git a/crates/database/db/src/models/batch_input.rs b/crates/database/db/src/models/batch_input.rs
deleted file mode 100644
index 9ba77d91..00000000
--- a/crates/database/db/src/models/batch_input.rs
+++ /dev/null
@@ -1,105 +0,0 @@
-use std::ops::Deref;
-
-use rollup_node_primitives::{BatchInput as BatchInputPrimitive, BatchInputV1, BatchInputV2};
-use sea_orm::{entity::prelude::*, ActiveValue, FromJsonQueryResult};
-
-/// A database model that represents a batch input.
-#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
-#[sea_orm(table_name = "batch_input")]
-pub struct Model {
-    #[sea_orm(primary_key)]
-    index: i64,
-    version: u8,
-    codec_version: u8,
-    hash: Vec<u8>,
-    block_number: i64,
-    parent_batch_header: Vec<u8>,
-    #[sea_orm(column_type = "JsonBinary")]
-    chunks: Chunks,
-    skipped_l1_message_bitmap: Vec<u8>,
-    blob_hash: Vec<u8>,
-    finalized_block_number: Option<i64>,
-}
-
-/// The relation for the batch input model.
-#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
-pub enum Relation {}
-
-/// The active model behavior for the batch input model.
-impl ActiveModelBehavior for ActiveModel {}
-
-/// A wrapper for a list of chunks.
-#[derive(
-    Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, FromJsonQueryResult,
-)]
-pub struct Chunks(pub Vec<Vec<u8>>);
-
-impl Deref for Chunks {
-    type Target = Vec<Vec<u8>>;
-
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
-}
-
-impl From<BatchInputPrimitive> for ActiveModel {
-    fn from(batch_input: BatchInputPrimitive) -> Self {
-        let (version, batch_input_v1, blob_hash) = match batch_input {
-            BatchInputPrimitive::BatchInputDataV1(batch_input) => (1, batch_input, vec![]),
-            BatchInputPrimitive::BatchInputDataV2(batch_input) => {
-                (2, batch_input.batch_input_base, batch_input.blob_hash.to_vec())
-            }
-        };
-        Self {
-            index: ActiveValue::Set(
-                batch_input_v1.batch_index.try_into().expect("index should fit in i64"),
-            ),
-            version: ActiveValue::Set(version),
-            codec_version: ActiveValue::Set(batch_input_v1.version),
-            hash: ActiveValue::Set(batch_input_v1.batch_hash.to_vec()),
-            block_number: ActiveValue::Set(
-                batch_input_v1.block_number.try_into().expect("block number should fit in i64"),
-            ),
-            parent_batch_header: ActiveValue::Set(batch_input_v1.parent_batch_header),
-            chunks: ActiveValue::Set(Chunks(batch_input_v1.chunks)),
-            skipped_l1_message_bitmap: ActiveValue::Set(batch_input_v1.skipped_l1_message_bitmap),
-            blob_hash: ActiveValue::Set(blob_hash),
-            finalized_block_number: ActiveValue::Unchanged(None),
-        }
-    }
-}
-
-impl From<Model> for BatchInputPrimitive {
-    fn from(value: Model) -> Self {
-        let chunks = value.chunks.0;
-        let batch_input_v1 = BatchInputV1 {
-            version: value.codec_version,
-            batch_index: value.index.try_into().expect("data persisted in database is valid"),
-            batch_hash: value
-                .hash
-                .as_slice()
-                .try_into()
-                .expect("data persisted in database is valid"),
-            block_number: value
-                .block_number
-                .try_into()
-                .expect("data persisted in database is valid"),
-            parent_batch_header: value.parent_batch_header,
-            chunks,
-            skipped_l1_message_bitmap: value.skipped_l1_message_bitmap,
-        };
-
-        if value.version == 1 {
-            Self::BatchInputDataV1(batch_input_v1)
-        } else {
-            Self::BatchInputDataV2(BatchInputV2 {
-                batch_input_base: batch_input_v1,
-                blob_hash: value
-                    .blob_hash
-                    .as_slice()
-                    .try_into()
-                    .expect("data persisted in database is valid"),
-            })
-        }
-    }
-}
diff --git a/crates/database/db/src/models/mod.rs b/crates/database/db/src/models/mod.rs
index 97eb8b6d..a8d63e56 100644
--- a/crates/database/db/src/models/mod.rs
+++ b/crates/database/db/src/models/mod.rs
@@ -1,5 +1,5 @@
-/// This module contains the batch input database model.
-pub mod batch_input;
+/// This module contains the batch commit database model.
+pub mod batch_commit;
 
 /// This module contains the L1 message database model.
 pub mod l1_message;
diff --git a/crates/database/db/src/operations.rs b/crates/database/db/src/operations.rs
index e28d3f3b..d138ee6e 100644
--- a/crates/database/db/src/operations.rs
+++ b/crates/database/db/src/operations.rs
@@ -1,38 +1,39 @@
 use super::{models, DatabaseError};
 use crate::DatabaseConnectionProvider;
+
 use alloy_primitives::B256;
 use futures::{Stream, StreamExt};
-use rollup_node_primitives::{BatchInput, L1MessageWithBlockNumber};
+use rollup_node_primitives::{BatchCommitData, L1MessageWithBlockNumber};
 use sea_orm::{ActiveModelTrait, ColumnTrait, DbErr, EntityTrait, QueryFilter, Set};
 
 /// The [`DatabaseOperations`] trait provides methods for interacting with the database.
 #[async_trait::async_trait]
 pub trait DatabaseOperations: DatabaseConnectionProvider {
-    /// Insert a [`BatchInput`] into the database.
-    async fn insert_batch_input(&self, batch_input: BatchInput) -> Result<(), DatabaseError> {
-        tracing::trace!(target: "scroll::db", batch_hash = ?batch_input.batch_hash(), batch_index = batch_input.batch_index(), "Inserting batch input into database.");
-        let batch_input: models::batch_input::ActiveModel = batch_input.into();
-        batch_input.insert(self.get_connection()).await?;
+    /// Insert a [`BatchCommitData`] into the database.
+    async fn insert_batch(&self, batch_commit: BatchCommitData) -> Result<(), DatabaseError> {
+        tracing::trace!(target: "scroll::db", batch_hash = ?batch_commit.hash, batch_index = batch_commit.index, "Inserting batch commit into database.");
+        let batch_commit: models::batch_commit::ActiveModel = batch_commit.into();
+        batch_commit.insert(self.get_connection()).await?;
         Ok(())
     }
 
-    /// Finalize a [`BatchInput`] with the provided `batch_hash` in the database and set the
+    /// Finalize a [`BatchCommitData`] with the provided `batch_hash` in the database and set the
     /// finalized block number to the provided block number.
     ///
-    /// Errors if the [`BatchInput`] associated with the provided `batch_hash` is not found in the
-    /// database, this method logs and returns an error.
-    async fn finalize_batch_input(
+    /// If the [`BatchCommitData`] associated with the provided `batch_hash` is not found in
+    /// the database, this method logs and returns an error.
+    async fn finalize_batch(
         &self,
         batch_hash: B256,
         block_number: u64,
     ) -> Result<(), DatabaseError> {
-        if let Some(batch) = models::batch_input::Entity::find()
-            .filter(models::batch_input::Column::Hash.eq(batch_hash.to_vec()))
+        if let Some(batch) = models::batch_commit::Entity::find()
+            .filter(models::batch_commit::Column::Hash.eq(batch_hash.to_vec()))
             .one(self.get_connection())
             .await?
         {
             tracing::trace!(target: "scroll::db", batch_hash = ?batch_hash, block_number, "Finalizing batch input in database.");
-            let mut batch: models::batch_input::ActiveModel = batch.into();
+            let mut batch: models::batch_commit::ActiveModel = batch.into();
             batch.finalized_block_number = Set(Some(block_number as i64));
             batch.update(self.get_connection()).await?;
         } else {
@@ -48,12 +49,12 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
         Ok(())
     }
 
-    /// Get a [`BatchInput`] from the database by its batch index.
-    async fn get_batch_input_by_batch_index(
+    /// Get a [`BatchCommitData`] from the database by its batch index.
+    async fn get_batch_by_index(
        &self,
        batch_index: u64,
-    ) -> Result<Option<BatchInput>, DatabaseError> {
-        Ok(models::batch_input::Entity::find_by_id(
+    ) -> Result<Option<BatchCommitData>, DatabaseError> {
+        Ok(models::batch_commit::Entity::find_by_id(
             TryInto::<i64>::try_into(batch_index).expect("index should fit in i64"),
         )
         .one(self.get_connection())
         .await
         .map(|x| x.map(Into::into))?)
     }
 
-    /// Delete all [`BatchInput`]s with a block number greater than the provided block number.
-    async fn delete_batch_inputs_gt(&self, block_number: u64) -> Result<(), DatabaseError> {
+    /// Delete all [`BatchCommitData`]s with a block number greater than the provided block number.
+    async fn delete_batches_gt(&self, block_number: u64) -> Result<(), DatabaseError> {
         tracing::trace!(target: "scroll::db", block_number, "Deleting batch inputs greater than block number.");
-        Ok(models::batch_input::Entity::delete_many()
-            .filter(models::batch_input::Column::BlockNumber.gt(block_number as i64))
+        Ok(models::batch_commit::Entity::delete_many()
+            .filter(models::batch_commit::Column::BlockNumber.gt(block_number as i64))
             .exec(self.get_connection())
             .await
             .map(|_| ())?)
     }
 
-    /// Get an iterator over all [`BatchInput`]s in the database.
-    async fn get_batch_inputs<'a>(
+    /// Get an iterator over all [`BatchCommitData`]s in the database.
+    async fn get_batches<'a>(
         &'a self,
-    ) -> Result<impl Stream<Item = Result<BatchInput, DbErr>> + 'a, DbErr> {
-        Ok(models::batch_input::Entity::find()
+    ) -> Result<impl Stream<Item = Result<BatchCommitData, DbErr>> + 'a, DbErr> {
+        Ok(models::batch_commit::Entity::find()
             .stream(self.get_connection())
             .await?
             .map(|res| res.map(Into::into)))
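
For orientation, a hedged sketch of how a caller might exercise the renamed operations end to end. It assumes some `DatabaseOperations` handle such as the test database used elsewhere in this PR; `db_usage_sketch` is illustrative, not part of the patch:

// Round trip a BatchCommitData through the new trait surface.
async fn db_usage_sketch(
    db: &impl DatabaseOperations,
    commit: BatchCommitData,
) -> Result<(), DatabaseError> {
    db.insert_batch(commit.clone()).await?;
    // Reads are keyed by batch index rather than batch hash.
    let stored = db.get_batch_by_index(commit.index).await?;
    assert_eq!(stored, Some(commit));
    // A reorg below the commit's L1 block would drop it again.
    db.delete_batches_gt(0).await?;
    Ok(())
}
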
diff --git a/crates/database/migration/src/lib.rs b/crates/database/migration/src/lib.rs
index 4485aaf2..92c48bb5 100644
--- a/crates/database/migration/src/lib.rs
+++ b/crates/database/migration/src/lib.rs
@@ -1,6 +1,6 @@
 pub use sea_orm_migration::prelude::*;
 
-mod m20220101_000001_create_batch_input_table;
+mod m20220101_000001_create_batch_commit_table;
 mod m20250304_125946_add_l1_msg_table;
 
 pub struct Migrator;
@@ -9,7 +9,7 @@ pub struct Migrator;
 impl MigratorTrait for Migrator {
     fn migrations() -> Vec<Box<dyn MigrationTrait>> {
         vec![
-            Box::new(m20220101_000001_create_batch_input_table::Migration),
+            Box::new(m20220101_000001_create_batch_commit_table::Migration),
             Box::new(m20250304_125946_add_l1_msg_table::Migration),
         ]
     }
diff --git a/crates/database/migration/src/m20220101_000001_create_batch_commit_table.rs b/crates/database/migration/src/m20220101_000001_create_batch_commit_table.rs
new file mode 100644
index 00000000..226a5b49
--- /dev/null
+++ b/crates/database/migration/src/m20220101_000001_create_batch_commit_table.rs
@@ -0,0 +1,42 @@
+use sea_orm_migration::{prelude::*, schema::*};
+
+// TODO: migrate these to a constants module
+const HASH_LENGTH: u32 = 32;
+
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+#[async_trait::async_trait]
+impl MigrationTrait for Migration {
+    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .create_table(
+                Table::create()
+                    .table(BatchCommit::Table)
+                    .if_not_exists()
+                    .col(pk_auto(BatchCommit::Index))
+                    .col(binary_len(BatchCommit::Hash, HASH_LENGTH))
+                    .col(big_unsigned(BatchCommit::BlockNumber))
+                    .col(binary(BatchCommit::Calldata))
+                    .col(binary_len_null(BatchCommit::BlobHash, HASH_LENGTH))
+                    .col(big_unsigned_null(BatchCommit::FinalizedBlockNumber))
+                    .to_owned(),
+            )
+            .await
+    }
+
+    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager.drop_table(Table::drop().table(BatchCommit::Table).to_owned()).await
+    }
+}
+
+#[derive(DeriveIden)]
+enum BatchCommit {
+    Table,
+    Index,
+    Hash,
+    BlockNumber,
+    Calldata,
+    BlobHash,
+    FinalizedBlockNumber,
+}
diff --git a/crates/database/migration/src/m20220101_000001_create_batch_input_table.rs b/crates/database/migration/src/m20220101_000001_create_batch_input_table.rs
deleted file mode 100644
index a0a22e02..00000000
--- a/crates/database/migration/src/m20220101_000001_create_batch_input_table.rs
+++ /dev/null
@@ -1,57 +0,0 @@
-use sea_orm_migration::{prelude::*, schema::*};
-
-// TODO: migrate these to a constants module
-// CONSTANTS
-const BATCH_HEADER_LENGTH: u32 = 32;
-
-const CHUNKS_LENGTH: u32 = 1024;
-
-const HASH_LENGTH: u32 = 32;
-
-#[derive(DeriveMigrationName)]
-pub struct Migration;
-
-#[async_trait::async_trait]
-impl MigrationTrait for Migration {
-    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
-        manager
-            .create_table(
-                Table::create()
-                    .table(BatchInput::Table)
-                    .if_not_exists()
-                    .col(pk_auto(BatchInput::Index))
-                    .col(tiny_integer(BatchInput::Version))
-                    .col(tiny_integer(BatchInput::CodecVersion))
-                    .col(binary_len(BatchInput::Hash, HASH_LENGTH))
-                    .col(big_unsigned(BatchInput::BlockNumber))
-                    .col(binary_len(BatchInput::ParentBatchHeader, BATCH_HEADER_LENGTH))
-                    .col(var_binary(BatchInput::Chunks, CHUNKS_LENGTH))
-                    .col(var_binary(BatchInput::SkippedL1MessageBitmap, HASH_LENGTH))
-                    // TODO: Set the blob hash as nullable
-                    .col(binary_len(BatchInput::BlobHash, HASH_LENGTH))
-                    .col(boolean_null(BatchInput::FinalizedBlockNumber))
-                    .to_owned(),
-            )
-            .await
-    }
-
-    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
-        manager.drop_table(Table::drop().table(BatchInput::Table).to_owned()).await
-    }
-}
-
-#[derive(DeriveIden)]
-enum BatchInput {
-    Table,
-    Version,
-    Index,
-    CodecVersion,
-    Hash,
-    BlockNumber,
-    ParentBatchHeader,
-    Chunks,
-    SkippedL1MessageBitmap,
-    BlobHash,
-    FinalizedBlockNumber,
-    // TODO: Do we need the blob proof?
-}
diff --git a/crates/derivation-pipeline/Cargo.toml b/crates/derivation-pipeline/Cargo.toml
index 8d716066..d5b82cbe 100644
--- a/crates/derivation-pipeline/Cargo.toml
+++ b/crates/derivation-pipeline/Cargo.toml
@@ -11,6 +11,7 @@ workspace = true
 
 [dependencies]
 # alloy
+alloy-eips.workspace = true
 alloy-primitives = { workspace = true, default-features = false }
 alloy-rpc-types-engine = { workspace = true, default-features = false }
 
@@ -20,8 +21,12 @@
 scroll-alloy-consensus = { workspace = true, default-features = false }
 scroll-alloy-rpc-types-engine.workspace = true
 
 # rollup node
+rollup-node-primitives.workspace = true
 scroll-codec.workspace = true
 
+# misc
+thiserror.workspace = true
+
 [dev-dependencies]
 eyre.workspace = true
 scroll-codec = { workspace = true, features = ["test-utils"] }
@@ -33,5 +38,8 @@
 std = [
     "scroll-alloy-rpc-types-engine/std",
     "alloy-primitives/std",
     "alloy-rpc-types-engine/std",
+    "alloy-eips/std",
     "reth-scroll-chainspec/std",
+    "scroll-alloy-consensus/std",
+    "scroll-alloy-rpc-types-engine/std",
 ]
diff --git a/crates/derivation-pipeline/src/data_source.rs b/crates/derivation-pipeline/src/data_source.rs
new file mode 100644
index 00000000..4b1d4e1a
--- /dev/null
+++ b/crates/derivation-pipeline/src/data_source.rs
@@ -0,0 +1,19 @@
+use alloy_eips::eip4844::Blob;
+use alloy_primitives::Bytes;
+use scroll_codec::CommitDataSource;
+
+/// Holds the data for the codec.
+pub(crate) struct CodecDataSource<'a> {
+    pub(crate) calldata: &'a Bytes,
+    pub(crate) blob: Option<&'a Blob>,
+}
+
+impl<'a> CommitDataSource for CodecDataSource<'a> {
+    fn calldata(&self) -> &Bytes {
+        self.calldata
+    }
+
+    fn blob(&self) -> Option<&Blob> {
+        self.blob
+    }
+}
diff --git a/crates/derivation-pipeline/src/error.rs b/crates/derivation-pipeline/src/error.rs
new file mode 100644
index 00000000..4023e63e
--- /dev/null
+++ b/crates/derivation-pipeline/src/error.rs
@@ -0,0 +1,12 @@
+use scroll_codec::CodecError;
+
+/// An error occurring during the derivation process.
+#[derive(Debug, thiserror::Error)]
+pub enum DerivationPipelineError {
+    /// An error in the codec.
+    #[error(transparent)]
+    Codec(#[from] CodecError),
+    /// Missing L1 message queue cursor.
+ #[error("missing l1 message queue cursor")] + MissingL1MessageQueueCursor, +} diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs index 7dea2def..a4e13896 100644 --- a/crates/derivation-pipeline/src/lib.rs +++ b/crates/derivation-pipeline/src/lib.rs @@ -4,35 +4,75 @@ //! into payload attributes for block building. #![cfg_attr(not(feature = "std"), no_std)] + +mod data_source; +mod error; + #[cfg(not(feature = "std"))] extern crate alloc as std; use std::vec::Vec; +use crate::{data_source::CodecDataSource, error::DerivationPipelineError}; +use alloy_eips::eip4844::Blob; use alloy_primitives::B256; use alloy_rpc_types_engine::PayloadAttributes; use reth_scroll_chainspec::SCROLL_FEE_VAULT_ADDRESS; +use rollup_node_primitives::BatchCommitData; use scroll_alloy_consensus::TxL1Message; use scroll_alloy_rpc_types_engine::ScrollPayloadAttributes; -use scroll_codec::decoding::batch::Batch; +use scroll_codec::Codec; -/// An instance of the trait can be used to provide the next L1 message to be used in the derivation -/// pipeline. +/// An instance of the trait can provide L1 messages using a cursor approach. Set the cursor for the +/// provider using the queue index or hash and then call [`L1MessageProvider::next_l1_message`] to +/// iterate the queue. pub trait L1MessageProvider { - /// Returns the next L1 message. + /// Returns the L1 message at the current cursor and advances the cursor. fn next_l1_message(&self) -> TxL1Message; + /// Set the index cursor for the provider. + fn set_index_cursor(&mut self, index: u64); + /// Set the hash cursor for the provider. + fn set_hash_cursor(&mut self, hash: B256); +} + +/// An instance of the trait can be used to fetch L1 blob data. +pub trait L1BlobProvider { + /// Returns corresponding blob data for the provided hash. + fn blob(&self, hash: B256) -> Option; } -/// Returns an iterator over [`ScrollPayloadAttributes`] from the [`Batch`] and a -/// [`L1MessageProvider`]. -pub fn derive( - batch: Batch, - l1_message_provider: &P, -) -> impl Iterator + use<'_, P> { - batch.data.into_l2_blocks().into_iter().map(|mut block| { +/// An instance of the trait can be used to provide L1 data. +pub trait L1Provider: L1BlobProvider + L1MessageProvider {} +impl L1Provider for T where T: L1BlobProvider + L1MessageProvider {} + +/// Returns an iterator over [`ScrollPayloadAttributes`] from the [`BatchCommitData`] and a +/// [`L1Provider`]. +pub fn derive( + batch: BatchCommitData, + l1_provider: &mut P, +) -> Result + use<'_, P>, DerivationPipelineError> { + // fetch the blob then decode the input batch. + let blob = batch.blob_versioned_hash.and_then(|hash| l1_provider.blob(hash)); + let data = CodecDataSource { calldata: batch.calldata.as_ref(), blob: blob.as_ref() }; + let decoded = Codec::decode(&data)?; + + // set the cursor for the l1 provider. + let data = &decoded.data; + if let Some(index) = data.queue_index_start() { + l1_provider.set_index_cursor(index) + } else if let Some(hash) = data.prev_l1_message_queue_hash() { + l1_provider.set_hash_cursor(*hash); + // we skip the first l1 message, as we are interested in the one starting after + // prev_l1_message_queue_hash. + let _ = l1_provider.next_l1_message(); + } else { + return Err(DerivationPipelineError::MissingL1MessageQueueCursor) + } + + let iter = decoded.data.into_l2_blocks().into_iter().map(|mut block| { // query the appropriate amount of l1 messages. 
         let mut txs = (0..block.context.num_l1_messages)
-            .map(|_| l1_message_provider.next_l1_message())
+            .map(|_| l1_provider.next_l1_message())
             .map(|tx| {
                 let mut bytes = Vec::new();
                 tx.eip2718_encode(&mut bytes);
@@ -56,31 +96,51 @@ pub fn derive<P: L1Provider>(
             transactions: Some(txs),
             no_tx_pool: true,
         }
-    })
+    });
+
+    Ok(iter)
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
 
-    use alloy_primitives::{address, bytes, U256};
-    use scroll_codec::decoding::{test_utils::read_to_bytes, v0::decode_v0};
-    use std::cell::RefCell;
+    use core::cell::RefCell;
+    use std::sync::Arc;
+
+    use alloy_primitives::{address, b256, bytes, U256};
+    use scroll_codec::decoding::test_utils::read_to_bytes;
 
     struct TestL1MessageProvider {
         messages: RefCell<Vec<TxL1Message>>,
     }
 
+    impl L1BlobProvider for TestL1MessageProvider {
+        fn blob(&self, _hash: B256) -> Option<Blob> {
+            None
+        }
+    }
+
     impl L1MessageProvider for TestL1MessageProvider {
         fn next_l1_message(&self) -> TxL1Message {
             self.messages.borrow_mut().remove(0)
         }
+
+        fn set_index_cursor(&mut self, _index: u64) {}
+
+        fn set_hash_cursor(&mut self, _hash: B256) {}
     }
 
     #[test]
     fn test_should_derive_batch() -> eyre::Result<()> {
         // https://etherscan.io/tx/0x8f4f0fcab656aa81589db5b53255094606c4624bfd99702b56b2debaf6211f48
         let raw_calldata = read_to_bytes("./testdata/calldata_v0.bin")?;
-        let batch = decode_v0(&raw_calldata)?;
+        let batch_data = BatchCommitData {
+            hash: b256!("7f26edf8e3decbc1620b4d2ba5f010a6bdd10d6bb16430c4f458134e36ab3961"),
+            index: 12,
+            block_number: 18319648,
+            calldata: Arc::new(raw_calldata),
+            blob_versioned_hash: None,
+        };
 
         let l1_messages = vec![TxL1Message {
             queue_index: 33,
@@ -97,9 +157,9 @@ mod tests {
             sender: address!("7885BcBd5CeCEf1336b5300fb5186A12DDD8c478"),
             input: bytes!("8ef1332e0000000000000000000000007f2b8c31f88b6006c382775eea88297ec1e3e9050000000000000000000000006ea73e05adc79974b931123675ea8f78ffdacdf000000000000000000000000000000000000000000000000000470de4df820000000000000000000000000000000000000000000000000000000000000000002200000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000000a4232e8748000000000000000000000000982fe4a7cbd74bb3422ebe46333c3e8046c12c7f000000000000000000000000982fe4a7cbd74bb3422ebe46333c3e8046c12c7f00000000000000000000000000000000000000000000000000470de4df8200000000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"),
         }];
-        let provider = TestL1MessageProvider { messages: RefCell::new(l1_messages) };
+        let mut provider = TestL1MessageProvider { messages: RefCell::new(l1_messages) };
 
-        let mut attributes = derive(batch, &provider);
+        let mut attributes = derive(batch_data, &mut provider)?;
         let attribute = attributes.find(|a| a.payload_attributes.timestamp == 1696935384).unwrap();
 
         let expected = ScrollPayloadAttributes{
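
For reference, a minimal in-memory `L1Provider` sketch showing the cursor contract that `derive` relies on. It assumes `messages` is pre-sorted by queue index, and the hash-cursor path is stubbed since resolving a queue hash requires external queue data (illustrative only, not part of the patch):

use core::cell::RefCell;

/// Illustrative cursor-based provider over a sorted Vec of L1 messages.
struct VecProvider {
    messages: Vec<TxL1Message>,
    cursor: RefCell<usize>,
}

impl L1MessageProvider for VecProvider {
    fn next_l1_message(&self) -> TxL1Message {
        // Panics past the end; a real provider would fetch or error instead.
        let mut cursor = self.cursor.borrow_mut();
        let msg = self.messages[*cursor].clone();
        *cursor += 1;
        msg
    }

    fn set_index_cursor(&mut self, index: u64) {
        // Position the cursor on the first message at or after `index`.
        *self.cursor.borrow_mut() = self
            .messages
            .iter()
            .position(|m| m.queue_index >= index)
            .unwrap_or(self.messages.len());
    }

    fn set_hash_cursor(&mut self, _hash: B256) {
        // A real provider would resolve the queue hash to an index.
        unimplemented!("hash cursor resolution depends on external queue data")
    }
}

impl L1BlobProvider for VecProvider {
    fn blob(&self, _hash: B256) -> Option<Blob> {
        None // calldata-only batches (pre-EIP-4844 codec versions)
    }
}
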
diff --git a/crates/indexer/src/lib.rs b/crates/indexer/src/lib.rs
index 286ccc8b..57ad62c9 100644
--- a/crates/indexer/src/lib.rs
+++ b/crates/indexer/src/lib.rs
@@ -1,7 +1,7 @@
 //! A library responsible for indexing data relevant to the L1.
 
 use alloy_primitives::B256;
 use futures::Stream;
-use rollup_node_primitives::{BatchInput, L1MessageWithBlockNumber};
+use rollup_node_primitives::{BatchCommitData, L1MessageWithBlockNumber};
 use rollup_node_watcher::L1Notification;
 use scroll_db::{Database, DatabaseOperations};
 use std::{
@@ -44,9 +44,9 @@ impl Indexer {
             )),
             L1Notification::NewBlock(_block_number) | L1Notification::Finalized(_block_number) => return,
-            L1Notification::BatchCommit(batch_input) => IndexerFuture::HandleBatchCommit(
-                Box::pin(Self::handle_batch_commit(self.database.clone(), batch_input)),
-            ),
+            L1Notification::BatchCommit(batch) => IndexerFuture::HandleBatchCommit(Box::pin(
+                Self::handle_batch_commit(self.database.clone(), batch),
+            )),
             L1Notification::L1Message(l1_message) => IndexerFuture::HandleL1Message(Box::pin(
                 Self::handle_l1_message(self.database.clone(), l1_message),
             )),
@@ -70,7 +70,7 @@ impl Indexer {
         let txn = database.tx().await?;
 
         // delete batch inputs and l1 messages
-        txn.delete_batch_inputs_gt(block_number).await?;
+        txn.delete_batches_gt(block_number).await?;
         txn.delete_l1_messages_gt(block_number).await?;
 
         // commit the transaction
@@ -91,10 +91,10 @@ impl Indexer {
     /// Handles a batch input by inserting it into the database.
     async fn handle_batch_commit(
         database: Arc<Database>,
-        batch_input: BatchInput,
+        batch: BatchCommitData,
     ) -> Result<IndexerEvent, IndexerError> {
-        let event = IndexerEvent::BatchCommitIndexed(batch_input.batch_index());
-        database.insert_batch_input(batch_input).await?;
+        let event = IndexerEvent::BatchCommitIndexed(batch.index);
+        database.insert_batch(batch).await?;
         Ok(event)
     }
 
@@ -105,7 +105,7 @@ impl Indexer {
         block_number: u64,
     ) -> Result<IndexerEvent, IndexerError> {
         let event = IndexerEvent::BatchFinalizationIndexed(batch_hash);
-        database.finalize_batch_input(batch_hash, block_number).await?;
+        database.finalize_batch(batch_hash, block_number).await?;
         Ok(event)
     }
 }
@@ -132,10 +132,11 @@ impl Stream for Indexer {
 #[cfg(test)]
 mod test {
     use super::*;
+
     use arbitrary::{Arbitrary, Unstructured};
     use futures::StreamExt;
     use rand::Rng;
-    use rollup_node_primitives::BatchInput;
+    use rollup_node_primitives::BatchCommitData;
     use scroll_db::test_utils::setup_test_db;
 
     async fn setup_test_indexer() -> (Indexer, Arc<Database>) {
@@ -153,15 +154,14 @@ mod test {
         rand::rng().fill(bytes.as_mut_slice());
         let mut u = Unstructured::new(&bytes);
 
-        let batch_input = BatchInput::arbitrary(&mut u).unwrap();
-        indexer.handle_l1_notification(L1Notification::BatchCommit(batch_input.clone()));
+        let batch_commit = BatchCommitData::arbitrary(&mut u).unwrap();
+        indexer.handle_l1_notification(L1Notification::BatchCommit(batch_commit.clone()));
 
         let _ = indexer.next().await;
 
-        let batch_input_result =
-            db.get_batch_input_by_batch_index(batch_input.batch_index()).await.unwrap().unwrap();
+        let batch_commit_result = db.get_batch_by_index(batch_commit.index).await.unwrap().unwrap();
 
-        assert_eq!(batch_input, batch_input_result);
+        assert_eq!(batch_commit, batch_commit_result);
     }
 
     #[tokio::test]
@@ -196,22 +196,22 @@ mod test {
         let mut u = Unstructured::new(&bytes);
 
         // Generate 3 random batch inputs and set their block numbers
-        let mut batch_input_block_1 = BatchInput::arbitrary(&mut u).unwrap();
-        batch_input_block_1.set_block_number(1);
-        let batch_input_block_1 = batch_input_block_1;
+        let mut batch_commit_block_1 = BatchCommitData::arbitrary(&mut u).unwrap();
+        batch_commit_block_1.block_number = 1;
+        let batch_commit_block_1 = batch_commit_block_1;
 
-        let mut batch_input_block_20 = BatchInput::arbitrary(&mut u).unwrap();
-        batch_input_block_20.set_block_number(20);
-        let batch_input_block_20 = batch_input_block_20;
+        let mut batch_commit_block_20 = BatchCommitData::arbitrary(&mut u).unwrap();
+        batch_commit_block_20.block_number = 20;
+        let batch_commit_block_20 = batch_commit_block_20;
 
-        let mut batch_input_block_30 = BatchInput::arbitrary(&mut u).unwrap();
-        batch_input_block_30.set_block_number(30);
-        let batch_input_block_30 = batch_input_block_30;
+        let mut batch_commit_block_30 = BatchCommitData::arbitrary(&mut u).unwrap();
+        batch_commit_block_30.block_number = 30;
+        let batch_commit_block_30 = batch_commit_block_30;
 
         // Index batch inputs
-        indexer.handle_l1_notification(L1Notification::BatchCommit(batch_input_block_1.clone()));
-        indexer.handle_l1_notification(L1Notification::BatchCommit(batch_input_block_20.clone()));
-        indexer.handle_l1_notification(L1Notification::BatchCommit(batch_input_block_30.clone()));
+        indexer.handle_l1_notification(L1Notification::BatchCommit(batch_commit_block_1.clone()));
+        indexer.handle_l1_notification(L1Notification::BatchCommit(batch_commit_block_20.clone()));
+        indexer.handle_l1_notification(L1Notification::BatchCommit(batch_commit_block_30.clone()));
 
         // Generate 3 random L1 messages and set their block numbers
         let mut l1_message_block_1 = L1MessageWithBlockNumber::arbitrary(&mut u).unwrap();
@@ -239,12 +239,12 @@ mod test {
         }
 
         // Check that the batch input at block 30 is deleted
-        let batch_inputs =
-            db.get_batch_inputs().await.unwrap().map(|res| res.unwrap()).collect::<Vec<_>>().await;
+        let batch_commits =
+            db.get_batches().await.unwrap().map(|res| res.unwrap()).collect::<Vec<_>>().await;
 
-        assert_eq!(2, batch_inputs.len());
-        assert!(batch_inputs.contains(&batch_input_block_1));
-        assert!(batch_inputs.contains(&batch_input_block_20));
+        assert_eq!(2, batch_commits.len());
+        assert!(batch_commits.contains(&batch_commit_block_1));
+        assert!(batch_commits.contains(&batch_commit_block_20));
 
         // check that the L1 message at block 30 is deleted
         let l1_messages =
diff --git a/crates/l1/src/abi/logs.rs b/crates/l1/src/abi/logs.rs
index 4befab5b..ea62c127 100644
--- a/crates/l1/src/abi/logs.rs
+++ b/crates/l1/src/abi/logs.rs
@@ -15,12 +15,11 @@ sol! {
 
     #[cfg_attr(feature = "test-utils", derive(arbitrary::Arbitrary))]
     #[derive(Debug)]
-    event CommitBatch(uint256 indexed batchIndex, bytes32 indexed batchHash);
+    event CommitBatch(uint256 indexed batch_index, bytes32 indexed batch_hash);
 
     #[cfg_attr(feature = "test-utils", derive(arbitrary::Arbitrary))]
     #[derive(Debug)]
-    event FinalizeBatch(uint256 indexed batchIndex, bytes32 indexed batchHash, bytes32 stateRoot, bytes32 withdrawRoot);
-
+    event FinalizeBatch(uint256 indexed batch_index, bytes32 indexed batch_hash, bytes32 state_root, bytes32 withdraw_root);
 }
 
 /// Tries to decode the provided log into the type T.
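
Note that the `sol!` parameter renames only change the generated Rust field names; event topics hash the parameter types, so the on-chain signatures are unchanged. A hedged sketch of the decoding path (assuming `try_decode_log` returns an `Option` of a decoded log exposing a `data` field, as the watcher below uses it):

use alloy_primitives::{Log as PrimitiveLog, B256};

/// Illustrative decoding path, mirroring the watcher's consumption of CommitBatch.
fn batch_commit_info(log: &PrimitiveLog) -> Option<(u64, B256)> {
    let decoded = try_decode_log::<CommitBatch>(log)?;
    // Parameter names do not enter the event selector, so renaming
    // `batchIndex` -> `batch_index` leaves the topic hash unchanged.
    Some((decoded.data.batch_index.saturating_to(), decoded.data.batch_hash))
}
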
diff --git a/crates/primitives/src/batch.rs b/crates/primitives/src/batch.rs
index 408bc65c..38465cfc 100644
--- a/crates/primitives/src/batch.rs
+++ b/crates/primitives/src/batch.rs
@@ -1,239 +1,44 @@
-use std::vec::Vec;
+use std::sync::Arc;
 
-use alloy_primitives::{BlockNumber, B256};
+use alloy_primitives::{BlockNumber, Bytes, B256};
 
 /// The input data for a batch.
 ///
 /// This is used as input for the derivation pipeline. All data remains in its raw serialized form.
 /// The data is then deserialized, enriched and processed in the derivation pipeline.
-#[derive(Debug, Clone, PartialEq, Eq, derive_more::From)]
-pub enum BatchInput {
-    /// The input data for a batch.
-    BatchInputDataV1(BatchInputV1),
-    /// The input data for a batch including the L1 blob.
-    BatchInputDataV2(BatchInputV2),
-}
-
-impl BatchInput {
-    /// Returns the coded (protocol) version of the batch input.
-    pub const fn version(&self) -> u8 {
-        match self {
-            Self::BatchInputDataV1(data) => data.version,
-            Self::BatchInputDataV2(data) => data.batch_input_base.version,
-        }
-    }
-
-    /// Returns the index of the batch.
-    pub const fn batch_index(&self) -> u64 {
-        match self {
-            Self::BatchInputDataV1(data) => data.batch_index,
-            Self::BatchInputDataV2(data) => data.batch_input_base.batch_index,
-        }
-    }
-
-    /// Returns the hash of the batch.
-    pub const fn batch_hash(&self) -> &B256 {
-        match self {
-            Self::BatchInputDataV1(data) => &data.batch_hash,
-            Self::BatchInputDataV2(data) => &data.batch_input_base.batch_hash,
-        }
-    }
-
-    /// Sets the block number of the batch.
-    pub fn set_block_number(&mut self, block_number: BlockNumber) {
-        match self {
-            Self::BatchInputDataV1(data) => data.block_number = block_number,
-            Self::BatchInputDataV2(data) => data.batch_input_base.block_number = block_number,
-        }
-    }
-}
-
-/// The input data for a batch.
 #[derive(Debug, Clone, PartialEq, Eq)]
-pub struct BatchInputV1 {
-    /// The version of the batch input data.
-    pub version: u8,
+pub struct BatchCommitData {
+    /// The hash of the committed batch.
+    pub hash: B256,
     /// The index of the batch.
-    pub batch_index: u64,
-    /// The batch hash.
-    pub batch_hash: B256,
-    /// The L1 block number at which the batch was committed.
-    pub block_number: u64,
-    /// The parent batch header.
-    pub parent_batch_header: Vec<u8>,
-    /// The chunks in the batch.
-    pub chunks: Vec<Vec<u8>>,
-    /// The skipped L1 message bitmap.
-    pub skipped_l1_message_bitmap: Vec<u8>,
-}
-
-/// The input data for a batch including the L1 blob hash.
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct BatchInputV2 {
-    /// The base input data for the batch.
-    pub batch_input_base: BatchInputV1,
-    /// The L1 blob hash associated with the batch.
-    pub blob_hash: B256,
-}
-
-/// A builder for the batch input. Determines the batch version based on the passed input.
-#[derive(Debug)]
-pub struct BatchInputBuilder {
-    /// The version of the batch input data.
-    version: u8,
-    /// The index of the batch.
-    batch_index: u64,
-    /// The batch hash.
-    batch_hash: B256,
-    /// The L1 block number at which the batch was committed.
-    block_number: u64,
-    /// The parent batch header.
-    parent_batch_header: Option<Vec<u8>>,
-    /// The chunks in the batch.
-    chunks: Option<Vec<Vec<u8>>>,
-    /// The skipped L1 message bitmap.
-    skipped_l1_message_bitmap: Option<Vec<u8>>,
-    /// The L1 blob hashes for the batch
-    blob_hashes: Option<Vec<B256>>,
-}
-
-impl BatchInputBuilder {
-    /// Returns a new instance of the builder.
-    pub const fn new(version: u8, index: u64, hash: B256, block_number: BlockNumber) -> Self {
-        Self {
-            version,
-            batch_index: index,
-            batch_hash: hash,
-            block_number,
-            parent_batch_header: None,
-            chunks: None,
-            skipped_l1_message_bitmap: None,
-            blob_hashes: None,
-        }
-    }
-
-    /// Adds chunks to the builder.
-    pub fn with_parent_batch_header(mut self, header: Option<Vec<u8>>) -> Self {
-        self.parent_batch_header = header;
-        self
-    }
-
-    /// Adds chunks to the builder.
-    pub fn with_chunks(mut self, chunks: Option<Vec<Vec<u8>>>) -> Self {
-        self.chunks = chunks;
-        self
-    }
-
-    /// Adds skipped l1 message bitmap to the builder.
-    pub fn with_skipped_l1_message_bitmap(
-        mut self,
-        skipped_l1_message_bitmap: Option<Vec<u8>>,
-    ) -> Self {
-        self.skipped_l1_message_bitmap = skipped_l1_message_bitmap;
-        self
-    }
-
-    /// Adds a blob hash to the builder.
-    pub fn with_blob_hashes(mut self, blob_hashes: Option<Vec<B256>>) -> Self {
-        self.blob_hashes = blob_hashes;
-        self
-    }
-
-    /// Build the [`BatchInput`], returning [`None`] if fields haven't been correctly set.
-    pub fn try_build(self) -> Option<BatchInput> {
-        // handle fields required for all batch inputs.
-        let version = self.version;
-        let batch_index = self.batch_index;
-        let batch_hash = self.batch_hash;
-        let block_number = self.block_number;
-
-        match (
-            self.parent_batch_header,
-            self.chunks,
-            self.skipped_l1_message_bitmap,
-            self.blob_hashes,
-        ) {
-            (Some(parent_batch_header), Some(chunks), Some(skipped_l1_message_bitmap), None) => {
-                Some(
-                    BatchInputV1 {
-                        version,
-                        batch_index,
-                        batch_hash,
-                        block_number,
-                        parent_batch_header,
-                        chunks,
-                        skipped_l1_message_bitmap,
-                    }
-                    .into(),
-                )
-            }
-            (
-                Some(parent_batch_header),
-                Some(chunks),
-                Some(skipped_l1_message_bitmap),
-                Some(blob),
-            ) => {
-                let batch_input_data = BatchInputV1 {
-                    version,
-                    batch_index,
-                    batch_hash,
-                    block_number,
-                    parent_batch_header,
-                    chunks,
-                    skipped_l1_message_bitmap,
-                };
-                let blob_hash = blob.first().copied()?;
-                Some(BatchInputV2 { batch_input_base: batch_input_data, blob_hash }.into())
-            }
-            (None, None, None, Some(_blobs)) => {
-                // TODO(greg): for now None but this will be used in Euclid.
-                None
-            }
-            _ => None,
-        }
-    }
+    pub index: u64,
+    /// The block number the batch was committed at.
+    pub block_number: BlockNumber,
+    /// The commit transaction calldata.
+    pub calldata: Arc<Bytes>,
+    /// The optional blob hash for the commit.
+    pub blob_versioned_hash: Option<B256>,
 }
 
 #[cfg(feature = "arbitrary")]
 mod arbitrary_impl {
     use super::*;
 
-    impl arbitrary::Arbitrary<'_> for BatchInput {
+    impl arbitrary::Arbitrary<'_> for BatchCommitData {
         fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
-            let version = u.arbitrary::<u8>()? % 2;
-            match version {
-                0 => Ok(Self::BatchInputDataV1(u.arbitrary()?)),
-                1 => Ok(Self::BatchInputDataV2(u.arbitrary()?)),
-                _ => unreachable!(),
-            }
-        }
-    }
-
-    impl arbitrary::Arbitrary<'_> for BatchInputV1 {
-        fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
-            let version = u.arbitrary::<u8>()? % 8;
             let batch_index = u.arbitrary::<u32>()? as u64;
             let batch_hash = u.arbitrary::<B256>()?;
             let block_number = u.arbitrary::<u32>()? as u64;
-            let parent_batch_header = u.arbitrary::<Vec<u8>>()?;
-            let chunks = u.arbitrary::<Vec<Vec<u8>>>()?;
-            let skipped_l1_message_bitmap = u.arbitrary::<Vec<u8>>()?;
+            let bytes = u.arbitrary::<Bytes>()?;
+            let blob_hash = u.arbitrary::<bool>()?.then_some(u.arbitrary::<B256>()?);
 
             Ok(Self {
-                version,
-                batch_index,
-                batch_hash,
+                hash: batch_hash,
+                index: batch_index,
                 block_number,
-                parent_batch_header,
-                chunks,
-                skipped_l1_message_bitmap,
+                calldata: Arc::new(bytes),
+                blob_versioned_hash: blob_hash,
             })
         }
     }
-
-    impl arbitrary::Arbitrary<'_> for BatchInputV2 {
-        fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
-            Ok(Self { batch_input_base: u.arbitrary()?, blob_hash: u.arbitrary()? })
-        }
-    }
 }
diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs
index a28217ad..9260b167 100644
--- a/crates/primitives/src/lib.rs
+++ b/crates/primitives/src/lib.rs
@@ -7,7 +7,7 @@ extern crate alloc as std;
 pub use block::BlockInfo;
 mod block;
 
-pub use batch::{BatchInput, BatchInputBuilder, BatchInputV1, BatchInputV2};
+pub use batch::BatchCommitData;
 mod batch;
 
 pub use bounded_vec::BoundedVec;
diff --git a/crates/watcher/Cargo.toml b/crates/watcher/Cargo.toml
index 21b2d4d5..fea89c5c 100644
--- a/crates/watcher/Cargo.toml
+++ b/crates/watcher/Cargo.toml
@@ -31,7 +31,7 @@ scroll-alloy-consensus.workspace = true
 
 # misc
 arbitrary = { workspace = true, optional = true }
 async-trait.workspace = true
-derive_more = { workspace = true, features = ["from"] }
+itertools = "0.14"
 rand = { workspace = true, optional = true }
 thiserror.workspace = true
 tokio = { workspace = true, features = ["full"] }
diff --git a/crates/watcher/src/lib.rs b/crates/watcher/src/lib.rs
index 820a3f46..eb2ac88b 100644
--- a/crates/watcher/src/lib.rs
+++ b/crates/watcher/src/lib.rs
@@ -15,16 +15,14 @@ pub mod test_utils;
 use std::{sync::Arc, time::Duration};
 
 use alloy_network::Ethereum;
-use alloy_primitives::{BlockNumber, B256};
+use alloy_primitives::{ruint::UintTryTo, BlockNumber, B256};
 use alloy_provider::{Network, Provider};
 use alloy_rpc_types_eth::{BlockNumberOrTag, Log, TransactionTrait};
 use error::L1WatcherResult;
-use rollup_node_primitives::{BatchInput, BatchInputBuilder, BoundedVec, L1MessageWithBlockNumber};
+use itertools::Itertools;
+use rollup_node_primitives::{BatchCommitData, BoundedVec, L1MessageWithBlockNumber};
 use scroll_alloy_consensus::TxL1Message;
-use scroll_l1::abi::{
-    calls::CommitBatchCall,
-    logs::{try_decode_log, CommitBatch, FinalizeBatch, QueueTransaction},
-};
+use scroll_l1::abi::logs::{try_decode_log, CommitBatch, FinalizeBatch, QueueTransaction};
 use tokio::sync::mpsc;
 
 /// The block range used to fetch L1 logs.
@@ -84,7 +82,7 @@ pub enum L1Notification {
     /// A notification for a reorg of the L1 up to a given block number.
     Reorg(u64),
     /// A new batch has been committed on the L1 rollup contract.
-    BatchCommit(BatchInput),
+    BatchCommit(BatchCommitData),
     /// A new batch has been finalized on the L1 rollup contract.
     BatchFinalization {
         /// The hash of the finalized batch.
@@ -200,7 +198,7 @@ where
         // shortcircuit.
         if self.unfinalized_blocks.is_empty() {
             tracing::trace!(target: "scroll::watcher", "no unfinalized blocks");
-            return
+            return;
         }
 
         let tail_block = self.unfinalized_blocks.last().expect("tail exists");
@@ -208,7 +206,7 @@ where
             // clear, the finalized block is past the tail.
             tracing::trace!(target: "scroll::watcher", tail = ?tail_block.number, finalized = ?finalized.number, "draining all unfinalized blocks");
             self.unfinalized_blocks.clear();
-            return
+            return;
         }
 
         let finalized_block_position =
@@ -235,7 +233,7 @@ where
         let tail = self.unfinalized_blocks.last();
 
         if tail.is_some_and(|h| h.hash == latest.hash) {
-            return Ok(())
+            return Ok(());
         } else if tail.is_some_and(|h| h.hash == latest.parent_hash) {
             // latest block extends the tip.
             tracing::trace!(target: "scroll::watcher", number = ?latest.number, hash = ?latest.hash, "block extends chain");
@@ -299,43 +297,51 @@ where
     #[tracing::instrument(skip_all)]
     async fn handle_batch_commits(&self, logs: &[Log]) -> L1WatcherResult<()> {
         // filter commit logs.
-        let commit_logs_with_tx =
-            logs.iter().map(|l| (l, l.transaction_hash)).filter_map(|(log, tx_hash)| {
+        let mut commit_logs_with_tx = logs
+            .iter()
+            .map(|l| (l, l.transaction_hash))
+            .filter_map(|(log, tx_hash)| {
+                let tx_hash = tx_hash?;
                 try_decode_log::<CommitBatch>(&log.inner)
                     .map(|decoded| (log, decoded.data, tx_hash))
-            });
+            })
+            .collect::<Vec<_>>();
+
+        // sort the commits and group by tx hash.
+        commit_logs_with_tx
+            .sort_by(|(_, data_a, _), (_, data_b, _)| data_a.batch_index.cmp(&data_b.batch_index));
+        let groups = commit_logs_with_tx.into_iter().chunk_by(|(_, _, hash)| *hash);
+        let groups: Vec<_> =
+            groups.into_iter().map(|(hash, group)| (hash, group.collect::<Vec<_>>())).collect();
 
-        for (raw_log, decoded_log, maybe_tx_hash) in commit_logs_with_tx {
+        for (tx_hash, group) in groups {
             // fetch the commit transaction.
-            let tx_hash = maybe_tx_hash.ok_or(FilterLogError::MissingTransactionHash)?;
             let transaction = self
                 .execution_provider
                 .get_transaction_by_hash(tx_hash)
                 .await?
                 .ok_or(EthRequestError::MissingTransactionHash(tx_hash))?;
 
-            // decode the transaction's input into a commit batch call.
-            let commit_info = CommitBatchCall::try_decode(transaction.inner.input());
-            if let Some(info) = commit_info {
-                let batch_index: u64 = decoded_log.batchIndex.saturating_to();
+            // get the optional blobs and calldata.
+            let mut blob_versioned_hashes =
+                transaction.blob_versioned_hashes().unwrap_or(&[]).iter().copied();
+            let input = Arc::new(transaction.input().clone());
+
+            for (raw_log, decoded_log, _) in group {
                 let block_number = raw_log.block_number.ok_or(FilterLogError::MissingBlockNumber)?;
-                let batch_hash = decoded_log.batchHash;
-                let blob_hashes = transaction.blob_versioned_hashes().map(|blobs| blobs.to_vec());
                 tracing::trace!(target: "scroll::watcher", commit_batch = ?decoded_log, ?block_number);
-
-                // feed all batch information to the batch input builder.
-                let batch_builder =
-                    BatchInputBuilder::new(info.version(), batch_index, batch_hash, block_number)
-                        .with_parent_batch_header(info.parent_batch_header())
-                        .with_chunks(info.chunks().map(|c| c.iter().map(|c| c.to_vec()).collect()))
-                        .with_skipped_l1_message_bitmap(info.skipped_l1_message_bitmap())
-                        .with_blob_hashes(blob_hashes);
-
-                // if builder can build a batch input from data, notify via channel.
-                if let Some(batch_input) = batch_builder.try_build() {
-                    self.notify(L1Notification::BatchCommit(batch_input)).await;
-                }
+                let batch_index =
+                    decoded_log.batch_index.uint_try_to().expect("u256 to u64 conversion error");
+
+                // notify via channel.
+                self.notify(L1Notification::BatchCommit(BatchCommitData {
+                    hash: decoded_log.batch_hash,
+                    index: batch_index,
+                    block_number,
+                    calldata: input.clone(),
+                    blob_versioned_hash: blob_versioned_hashes.next(),
+                }))
+                .await;
             }
         }
         Ok(())
@@ -359,7 +365,7 @@ where
                 let _ = self
                     .sender
                     .send(Arc::new(L1Notification::BatchFinalization {
-                        hash: decoded_log.batchHash,
+                        hash: decoded_log.batch_hash,
                         block_number,
                     }))
                    .await;
@@ -472,7 +478,7 @@ mod tests {
     use crate::test_utils::provider::MockProvider;
     use alloy_consensus::{transaction::Recovered, Signed, TxEip1559};
-    use alloy_primitives::Address;
+    use alloy_primitives::{Address, U256};
     use alloy_rpc_types_eth::Transaction;
     use alloy_sol_types::{SolCall, SolEvent};
     use arbitrary::Arbitrary;
@@ -773,7 +779,9 @@
         let mut logs = (0..10).map(|_| random!(Log)).collect::<Vec<_>>();
         let mut batch_commit = random!(Log);
         let mut inner_log = random!(alloy_primitives::Log);
-        inner_log.data = random!(CommitBatch).encode_log_data();
+        inner_log.data =
+            CommitBatch { batch_index: U256::from(random!(u64)), batch_hash: random!(B256) }
+                .encode_log_data();
         batch_commit.inner = inner_log;
         batch_commit.transaction_hash = Some(*tx.inner.tx_hash());
         batch_commit.block_number = Some(random!(u64));
@@ -784,7 +792,7 @@
 
         // Then
         let notification = receiver.recv().await.unwrap();
-        assert!(matches!(*notification, L1Notification::BatchCommit(_)));
+        assert!(matches!(*notification, L1Notification::BatchCommit { .. }));
 
         Ok(())
     }
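
A standalone illustration of the grouping step introduced in `handle_batch_commits`: `Itertools::chunk_by` only groups runs of adjacent equal keys, which is why the commit logs are collected and ordered before being grouped by transaction hash (toy data, illustrative only, not part of the patch):

use itertools::Itertools;

// `chunk_by` groups adjacent equal keys, so entries sharing a tx hash must be
// contiguous (here: pre-sorted) before grouping, mirroring the watcher's logic.
fn main() {
    let commits = vec![(1u64, "tx_a"), (2, "tx_a"), (3, "tx_b")];
    let chunks = commits.into_iter().chunk_by(|(_, hash)| *hash);
    let groups: Vec<_> =
        chunks.into_iter().map(|(hash, group)| (hash, group.collect::<Vec<_>>())).collect();
    assert_eq!(groups.len(), 2); // "tx_a" yields a single group with two commits.
}
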