diff --git a/lib/l1_watcher/src/batch_range_watcher.rs b/lib/l1_watcher/src/batch_range_watcher.rs
index 0471d7bf..2748a7b4 100644
--- a/lib/l1_watcher/src/batch_range_watcher.rs
+++ b/lib/l1_watcher/src/batch_range_watcher.rs
@@ -1,17 +1,13 @@
 use crate::watcher::{L1Watcher, L1WatcherError};
 use crate::{L1WatcherConfig, ProcessL1Event, util};
-use alloy::consensus::Transaction;
 use alloy::eips::BlockId;
-use alloy::primitives::{Address, B256, BlockNumber, TxHash};
+use alloy::primitives::{Address, B256, BlockNumber};
 use alloy::providers::{DynProvider, Provider};
-use alloy::rpc::types::{Filter, Log};
-use alloy::sol_types::SolEvent;
+use alloy::rpc::types::Log;
 use anyhow::Context;
 use tokio::sync::mpsc;
-use zksync_os_batch_types::BatchInfo;
 use zksync_os_contract_interface::IExecutor::ReportCommittedBatchRangeZKsyncOS;
 use zksync_os_contract_interface::ZkChain;
-use zksync_os_contract_interface::calldata::CommitCalldata;
 use zksync_os_contract_interface::models::{CommitBatchInfo, StoredBatchInfo};
 use zksync_os_types::ProtocolSemanticVersion;
 
@@ -76,9 +72,9 @@ impl BatchRangeWatcher {
             last_committed_batch_on_startup: last_committed_batch,
             batch_ranges_sender,
         };
-        let last_executed_batch_data = this
-            .fetch_stored_batch_data(last_l1_block, last_executed_batch)
-            .await?;
+        let last_executed_batch_data =
+            util::fetch_stored_batch_data(&this.zk_chain, last_l1_block, last_executed_batch)
+                .await?;
 
         let l1_watcher = L1Watcher::new(
             provider,
@@ -95,82 +91,6 @@ impl BatchRangeWatcher {
             last_executed_batch_data,
         })
     }
-
-    /// Fetches and decodes batch commit transaction. Fails if transaction does not exist or is not
-    /// a valid commit transaction.
-    async fn fetch_commit_calldata(
-        &self,
-        tx_hash: TxHash,
-    ) -> Result<CommittedBatch, L1WatcherError> {
-        // todo: retry-backoff logic in case tx is missing
-        let tx = self
-            .zk_chain
-            .provider()
-            .get_transaction_by_hash(tx_hash)
-            .await?
-            .expect("tx not found");
-        let CommitCalldata {
-            commit_batch_info, ..
-        } = CommitCalldata::decode(tx.input()).map_err(L1WatcherError::Other)?;
-
-        // L1 block where this batch got committed.
-        let l1_block_id = BlockId::number(
-            tx.block_number
-                .expect("mined transaction has no block number"),
-        );
-        CommittedBatch::fetch(&self.zk_chain, commit_batch_info, l1_block_id).await
-    }
-
-    /// Fetches and decodes stored batch data for batch `batch_number` that is expected to have been
-    /// committed in `l1_block_number`. Returns `None` if requested batch has not been committed in
-    /// the given L1 block.
-    async fn fetch_stored_batch_data(
-        &self,
-        l1_block_number: BlockNumber,
-        batch_number: u64,
-    ) -> anyhow::Result<Option<StoredBatchData>> {
-        let logs = self
-            .zk_chain
-            .provider()
-            .get_logs(
-                &Filter::new()
-                    .address(*self.zk_chain.address())
-                    .event_signature(ReportCommittedBatchRangeZKsyncOS::SIGNATURE_HASH)
-                    .from_block(l1_block_number)
-                    .to_block(l1_block_number),
-            )
-            .await?;
-        let Some((log, tx_hash)) = logs.into_iter().find_map(|log| {
-            let batch_log = ReportCommittedBatchRangeZKsyncOS::decode_log(&log.inner)
-                .expect("unable to decode `ReportCommittedBatchRangeZKsyncOS` log");
-            if batch_log.batchNumber == batch_number {
-                Some((
-                    batch_log,
-                    log.transaction_hash.expect("indexed log without tx hash"),
-                ))
-            } else {
-                None
-            }
-        }) else {
-            return Ok(None);
-        };
-        let committed_batch = self.fetch_commit_calldata(tx_hash).await?;
-
-        // todo: stop using this struct once fully migrated from S3
-        let last_executed_batch_info = BatchInfo {
-            commit_info: committed_batch.commit_info,
-            chain_address: Default::default(),
-            upgrade_tx_hash: committed_batch.upgrade_tx_hash,
-            blob_sidecar: None,
-        };
-        let batch_info = last_executed_batch_info.into_stored(&committed_batch.protocol_version);
-
-        Ok(Some(StoredBatchData {
-            batch_info,
-            first_block_number: log.firstBlockNumber,
-            last_block_number: log.lastBlockNumber,
-        }))
-    }
 }
 
 #[async_trait::async_trait]
@@ -212,7 +132,7 @@ impl ProcessL1Event for BatchRangeWatcher {
             tracing::trace!(batch_number, "batch is outside of range of interest");
         } else {
             let tx_hash = log.transaction_hash.expect("indexed log without tx hash");
-            let committed_batch = self.fetch_commit_calldata(tx_hash).await?;
+            let committed_batch = util::fetch_commit_calldata(&self.zk_chain, tx_hash).await?;
 
             if self.next_batch_number != committed_batch.commit_info.batch_number {
                 return Err(L1WatcherError::Other(anyhow::anyhow!(
@@ -254,7 +174,7 @@ pub struct CommittedBatch {
 impl CommittedBatch {
     /// Fetches extra information that is not available inside `CommitBatchInfo` from L1 to construct
     /// `CommitedBatch`. Requires `l1_block_id` where the batch was committed.
-    async fn fetch(
+    pub async fn fetch(
         zk_chain: &ZkChain,
         commit_batch_info: CommitBatchInfo,
         l1_block_id: BlockId,
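The two private helpers above move out of `BatchRangeWatcher` into free functions in `util` (see the util.rs hunks below), so other watchers and the node binary can reuse them. A hypothetical caller, sketching the extracted helper's contract — `None` means no `ReportCommittedBatchRangeZKsyncOS` log for that batch in the given L1 block; the block-number fields are `u64` per their usage in this diff:

```rust
use zksync_os_contract_interface::ZkChain;
use zksync_os_l1_watcher::util;

/// Illustrative only: resolve the block range of `batch_number`, assuming it
/// was committed in L1 block `l1_block`.
async fn block_range_committed_at(
    zk_chain: &ZkChain,
    l1_block: u64,
    batch_number: u64,
) -> anyhow::Result<Option<(u64, u64)>> {
    Ok(util::fetch_stored_batch_data(zk_chain, l1_block, batch_number)
        .await?
        .map(|data| (data.first_block_number, data.last_block_number)))
}
```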
diff --git a/lib/l1_watcher/src/commit_watcher.rs b/lib/l1_watcher/src/commit_watcher.rs
index 45adae5b..61ced27d 100644
--- a/lib/l1_watcher/src/commit_watcher.rs
+++ b/lib/l1_watcher/src/commit_watcher.rs
@@ -3,23 +3,22 @@
 use crate::{L1WatcherConfig, ProcessL1Event, util};
 use alloy::primitives::Address;
 use alloy::providers::{DynProvider, Provider};
 use alloy::rpc::types::Log;
-use zksync_os_contract_interface::IExecutor::BlockCommit;
+use zksync_os_contract_interface::IExecutor::{BlockCommit, ReportCommittedBatchRangeZKsyncOS};
 use zksync_os_contract_interface::ZkChain;
-use zksync_os_storage_api::{ReadBatch, WriteFinality};
+use zksync_os_storage_api::WriteFinality;
 
-pub struct L1CommitWatcher<Finality, BatchStorage> {
+pub struct L1CommitWatcher<Finality> {
+    provider: DynProvider,
     contract_address: Address,
     next_batch_number: u64,
     finality: Finality,
-    batch_storage: BatchStorage,
 }
 
-impl<Finality: WriteFinality, BatchStorage: ReadBatch> L1CommitWatcher<Finality, BatchStorage> {
+impl<Finality: WriteFinality> L1CommitWatcher<Finality> {
     pub async fn create_watcher(
         config: L1WatcherConfig,
         zk_chain: ZkChain,
         finality: Finality,
-        batch_storage: BatchStorage,
     ) -> anyhow::Result<Self> {
         let current_l1_block = zk_chain.provider().get_block_number().await?;
         let last_committed_batch = finality.get_finality_status().last_committed_batch;
@@ -40,10 +39,10 @@ impl<Finality: WriteFinality, BatchStorage: ReadBatch> L1CommitWatcher<Finality,
 
         Ok(Self {
+            provider: zk_chain.provider().clone(),
             contract_address: *zk_chain.address(),
             next_batch_number: last_committed_batch + 1,
             finality,
-            batch_storage,
         })
     }
 }
 
@@ -56,6 +55,4 @@ impl<Finality: WriteFinality, BatchStorage: ReadBatch> L1CommitWatcher<Finality,
-impl<Finality: WriteFinality, BatchStorage: ReadBatch> ProcessL1Event
-    for L1CommitWatcher<Finality, BatchStorage>
-{
+impl<Finality: WriteFinality> ProcessL1Event for L1CommitWatcher<Finality> {
     const NAME: &'static str = "block_commit";
 
     type SolEvent = BlockCommit;
@@ -75,7 +72,7 @@ impl<Finality: WriteFinality, BatchStorage: ReadBatch> ProcessL1Event
     async fn process_event(
         &mut self,
         batch_commit: BlockCommit,
-        _log: Log,
+        log: Log,
     ) -> Result<(), L1WatcherError> {
         let batch_number = batch_commit.batchNumber.to::<u64>();
         let batch_hash = batch_commit.batchHash;
@@ -94,12 +91,29 @@ impl<Finality: WriteFinality, BatchStorage: ReadBatch> ProcessL1Event
             ?batch_commitment,
             "discovered committed batch"
         );
-        let (_, last_committed_block) = self
-            .batch_storage
-            .get_batch_range_by_number(batch_number)
-            .await
-            .map_err(L1WatcherError::Batch)?
-            .expect("committed batch is missing");
+        let tx_hash = log
+            .transaction_hash
+            .expect("indexed log does not belong to any transaction");
+        // todo: retry-backoff logic in case tx is missing
+        let tx_receipt = self
+            .provider
+            .get_transaction_receipt(tx_hash)
+            .await?
+            .expect("tx not found");
+        let report = tx_receipt
+            .inner
+            .into_logs()
+            .into_iter()
+            .find_map(|log| {
+                let log = log.log_decode::<ReportCommittedBatchRangeZKsyncOS>().ok()?;
+                if log.inner.data.batchNumber == batch_number {
+                    Some(log)
+                } else {
+                    None
+                }
+            })
+            .expect("report range not found");
+        let last_committed_block = report.inner.data.lastBlockNumber;
         self.finality.update_finality_status(|finality| {
             assert!(
                 batch_number > finality.last_committed_batch,
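The commit watcher drops its batch-storage dependency: instead of `get_batch_range_by_number`, it derives the last committed block from the `ReportCommittedBatchRangeZKsyncOS` event in the commit transaction's receipt. A sketch of that lookup in isolation, with error propagation instead of the watcher's `expect` panics (the retry/backoff `todo` still applies):

```rust
use alloy::primitives::TxHash;
use alloy::providers::{DynProvider, Provider};
use anyhow::Context;
use zksync_os_contract_interface::IExecutor::ReportCommittedBatchRangeZKsyncOS;

/// Illustrative only: last block of `batch_number`, read from the commit
/// transaction's receipt logs.
async fn last_block_from_commit_receipt(
    provider: &DynProvider,
    tx_hash: TxHash,
    batch_number: u64,
) -> anyhow::Result<u64> {
    let receipt = provider
        .get_transaction_receipt(tx_hash)
        .await?
        .context("commit tx receipt not found")?;
    let report = receipt
        .inner
        .into_logs()
        .into_iter()
        .find_map(|log| {
            let log = log.log_decode::<ReportCommittedBatchRangeZKsyncOS>().ok()?;
            (log.inner.data.batchNumber == batch_number).then_some(log)
        })
        .context("commit tx did not emit a range report for this batch")?;
    Ok(report.inner.data.lastBlockNumber)
}
```

Note that the watcher's own code reads the decoded event through `.inner.data`, which is why the final line above (and the corresponding line in the hunk) goes through `report.inner.data` rather than `report.inner`.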
+ .expect("tx not found"); + let report = tx_receipt + .inner + .into_logs() + .into_iter() + .find_map(|log| { + let log = log.log_decode::().ok()?; + if log.inner.data.batchNumber == batch_number { + Some(log) + } else { + None + } + }) + .expect("report range not found"); + let last_committed_block = report.inner.lastBlockNumber; self.finality.update_finality_status(|finality| { assert!( batch_number > finality.last_committed_batch, diff --git a/lib/l1_watcher/src/execute_watcher.rs b/lib/l1_watcher/src/execute_watcher.rs index 5c241861..9d7d60d3 100644 --- a/lib/l1_watcher/src/execute_watcher.rs +++ b/lib/l1_watcher/src/execute_watcher.rs @@ -3,23 +3,23 @@ use crate::{L1WatcherConfig, ProcessL1Event, util}; use alloy::primitives::Address; use alloy::providers::{DynProvider, Provider}; use alloy::rpc::types::Log; +use anyhow::Context; use zksync_os_contract_interface::IExecutor::BlockExecution; use zksync_os_contract_interface::ZkChain; -use zksync_os_storage_api::{ReadBatch, WriteFinality}; +use zksync_os_storage_api::WriteFinality; -pub struct L1ExecuteWatcher { - contract_address: Address, +pub struct L1ExecuteWatcher { + zk_chain: ZkChain, next_batch_number: u64, finality: Finality, - batch_storage: BatchStorage, + max_blocks_to_process: u64, } -impl L1ExecuteWatcher { +impl L1ExecuteWatcher { pub async fn create_watcher( config: L1WatcherConfig, zk_chain: ZkChain, finality: Finality, - batch_storage: BatchStorage, ) -> anyhow::Result { let current_l1_block = zk_chain.provider().get_block_number().await?; let last_executed_batch = finality.get_finality_status().last_executed_batch; @@ -37,10 +37,10 @@ impl L1ExecuteWatcher L1ExecuteWatcher ProcessL1Event - for L1ExecuteWatcher -{ +impl ProcessL1Event for L1ExecuteWatcher { const NAME: &'static str = "block_execution"; type SolEvent = BlockExecution; type WatchedEvent = BlockExecution; fn contract_address(&self) -> Address { - self.contract_address + *self.zk_chain.address() } async fn process_event( @@ -85,12 +83,18 @@ impl ProcessL1Event "skipping already processed executed batch", ); } else { - let (_, last_executed_block) = self - .batch_storage - .get_batch_range_by_number(batch_number) - .await - .map_err(L1WatcherError::Batch)? - .expect("executed batch is missing"); + // todo: This can take a while. For the majority of batches we have already pulled the + // range in commit watcher, we should find a way to reuse it. 
diff --git a/lib/l1_watcher/src/util.rs b/lib/l1_watcher/src/util.rs
index 8e109489..bd3173d8 100644
--- a/lib/l1_watcher/src/util.rs
+++ b/lib/l1_watcher/src/util.rs
@@ -1,8 +1,15 @@
-use alloy::primitives::BlockNumber;
+use crate::watcher::L1WatcherError;
+use crate::{CommittedBatch, StoredBatchData};
+use alloy::consensus::Transaction;
+use alloy::eips::BlockId;
+use alloy::primitives::{BlockNumber, TxHash};
 use alloy::providers::{DynProvider, Provider};
 use alloy::rpc::types::Filter;
 use alloy::sol_types::SolEvent;
 use std::sync::Arc;
+use zksync_os_batch_types::BatchInfo;
+use zksync_os_contract_interface::IExecutor::ReportCommittedBatchRangeZKsyncOS;
+use zksync_os_contract_interface::calldata::CommitCalldata;
 use zksync_os_contract_interface::{IExecutor, ZkChain};
 
 pub const ANVIL_L1_CHAIN_ID: u64 = 31337;
@@ -215,3 +222,89 @@ pub async fn find_l1_execute_block_by_batch_number(
     })
     .await
 }
+
+/// Fetches and decodes stored batch data for batch `batch_number` that is expected to have been
+/// committed in `l1_block_number`. Returns `None` if the requested batch has not been committed in
+/// the given L1 block.
+pub async fn fetch_stored_batch_data(
+    zk_chain: &ZkChain,
+    l1_block_number: BlockNumber,
+    batch_number: u64,
+) -> anyhow::Result<Option<StoredBatchData>> {
+    let logs = zk_chain
+        .provider()
+        .get_logs(
+            &Filter::new()
+                .address(*zk_chain.address())
+                .event_signature(ReportCommittedBatchRangeZKsyncOS::SIGNATURE_HASH)
+                .from_block(l1_block_number)
+                .to_block(l1_block_number),
+        )
+        .await?;
+    let Some((log, tx_hash)) = logs.into_iter().find_map(|log| {
+        let batch_log = ReportCommittedBatchRangeZKsyncOS::decode_log(&log.inner)
+            .expect("unable to decode `ReportCommittedBatchRangeZKsyncOS` log");
+        if batch_log.batchNumber == batch_number {
+            Some((
+                batch_log,
+                log.transaction_hash.expect("indexed log without tx hash"),
+            ))
+        } else {
+            None
+        }
+    }) else {
+        return Ok(None);
+    };
+    let committed_batch = fetch_commit_calldata(zk_chain, tx_hash).await?;
+
+    // todo: stop using this struct once fully migrated from S3
+    let last_executed_batch_info = BatchInfo {
+        commit_info: committed_batch.commit_info,
+        chain_address: Default::default(),
+        upgrade_tx_hash: committed_batch.upgrade_tx_hash,
+        blob_sidecar: None,
+    };
+    let batch_info = last_executed_batch_info.into_stored(&committed_batch.protocol_version);
+
+    Ok(Some(StoredBatchData {
+        batch_info,
+        first_block_number: log.firstBlockNumber,
+        last_block_number: log.lastBlockNumber,
+    }))
+}
+
+/// Finds and decodes stored batch data for batch `batch_number` by locating the L1 block where it
+/// was committed. Returns `None` if no such batch has been committed.
+pub async fn find_stored_batch_data_by_batch_number(
+    zk_chain: &ZkChain,
+    batch_number: u64,
+    max_l1_blocks_to_scan: u64,
+) -> anyhow::Result<Option<StoredBatchData>> {
+    let l1_block_with_commit =
+        find_l1_commit_block_by_batch_number(zk_chain.clone(), batch_number, max_l1_blocks_to_scan)
+            .await?;
+    fetch_stored_batch_data(zk_chain, l1_block_with_commit, batch_number).await
+}
+
+/// Fetches and decodes a batch commit transaction. Fails if the transaction does not exist or is
+/// not a valid commit transaction.
+pub async fn fetch_commit_calldata(
+    zk_chain: &ZkChain,
+    tx_hash: TxHash,
+) -> Result<CommittedBatch, L1WatcherError> {
+    // todo: retry-backoff logic in case tx is missing
+    let tx = zk_chain
+        .provider()
+        .get_transaction_by_hash(tx_hash)
+        .await?
+        .expect("tx not found");
+    let CommitCalldata {
+        commit_batch_info, ..
+    } = CommitCalldata::decode(tx.input()).map_err(L1WatcherError::Other)?;
+
+    // L1 block where this batch got committed.
+    let l1_block_id = BlockId::number(
+        tx.block_number
+            .expect("mined transaction has no block number"),
+    );
+    CommittedBatch::fetch(zk_chain, commit_batch_info, l1_block_id).await
+}
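With the helpers public in `util`, other in-crate callers can reproduce the batch-range watcher's commit validation. A sketch of such a caller inside the `l1_watcher` crate, mirroring the mismatch check the watcher performs in `process_event` (names and error variants taken from this diff):

```rust
use crate::util;
use crate::watcher::L1WatcherError;
use alloy::primitives::TxHash;
use zksync_os_contract_interface::ZkChain;

/// Illustrative only: decode a commit tx and verify it commits the batch we
/// expect. `fetch_commit_calldata` still panics if the tx is missing.
async fn check_commit(
    zk_chain: &ZkChain,
    tx_hash: TxHash,
    expected_batch: u64,
) -> Result<(), L1WatcherError> {
    let committed = util::fetch_commit_calldata(zk_chain, tx_hash).await?;
    if committed.commit_info.batch_number != expected_batch {
        return Err(L1WatcherError::Other(anyhow::anyhow!(
            "commit tx is for batch {}, expected {}",
            committed.commit_info.batch_number,
            expected_batch
        )));
    }
    Ok(())
}
```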
diff --git a/node/bin/src/lib.rs b/node/bin/src/lib.rs
index cfaaff04..b0b948fe 100644
--- a/node/bin/src/lib.rs
+++ b/node/bin/src/lib.rs
@@ -254,7 +254,12 @@ pub async fn run(name: &'static str) -> impl Fn(Result
-) -> (u64, u64, u64) {
+    max_l1_blocks_to_scan: u64,
+) -> anyhow::Result<(u64, u64, u64)> {
     let last_committed_block = if l1_state.last_committed_batch == 0 {
         0
     } else {
-        batch_storage
-            .get_batch_with_proof(l1_state.last_committed_batch)
-            .await
-            .expect("Failed to get last committed block from proof storage")
-            .map(|envelope| envelope.batch.last_block_number)
-            .expect("Committed batch is not present in proof storage")
+        zksync_os_l1_watcher::util::find_stored_batch_data_by_batch_number(
+            &l1_state.diamond_proxy,
+            l1_state.last_committed_batch,
+            max_l1_blocks_to_scan,
+        )
+        .await?
+        .context("could not find where last committed batch was committed")?
+        .last_block_number
     };
 
     // only used to log on node startup
     let last_proved_block = if l1_state.last_proved_batch == 0 {
         0
     } else {
-        batch_storage
-            .get_batch_with_proof(l1_state.last_proved_batch)
-            .await
-            .expect("Failed to get last proved block from proof storage")
-            .map(|envelope| envelope.batch.last_block_number)
-            .expect("Proved batch is not present in proof storage")
+        zksync_os_l1_watcher::util::find_stored_batch_data_by_batch_number(
+            &l1_state.diamond_proxy,
+            l1_state.last_proved_batch,
+            max_l1_blocks_to_scan,
+        )
+        .await?
+        .context("could not find where last proved batch was committed")?
+        .last_block_number
     };
 
     let last_executed_block = if l1_state.last_executed_batch == 0 {
         0
     } else {
-        batch_storage
-            .get_batch_with_proof(l1_state.last_executed_batch)
-            .await
-            .expect("Failed to get last proved block from execute storage")
-            .map(|envelope| envelope.batch.last_block_number)
-            .expect("Execute batch is not present in proof storage")
+        zksync_os_l1_watcher::util::find_stored_batch_data_by_batch_number(
+            &l1_state.diamond_proxy,
+            l1_state.last_executed_batch,
+            max_l1_blocks_to_scan,
+        )
+        .await?
+        .context("could not find where last executed batch was committed")?
+        .last_block_number
     };
 
-    (last_committed_block, last_proved_block, last_executed_block)
+    Ok((last_committed_block, last_proved_block, last_executed_block))
 }
 
 fn run_fake_snark_provers(
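Startup now resolves the last committed/proved/executed block numbers from L1 instead of proof storage, propagating an error when the commit cannot be located. The three branches above repeat the same pattern; a hedged sketch of how it could be factored (the helper's real name is cut off in the hunk header, so this standalone form is hypothetical; `l1_state.diamond_proxy` is the `ZkChain` handle per the diff):

```rust
use anyhow::Context;
use zksync_os_contract_interface::ZkChain;

/// Illustrative only: last block of `batch_number`, or 0 if no batch has
/// reached this stage yet.
async fn last_block_or_genesis(
    diamond_proxy: &ZkChain,
    batch_number: u64,
    max_l1_blocks_to_scan: u64,
) -> anyhow::Result<u64> {
    if batch_number == 0 {
        // Genesis state: nothing committed/proved/executed yet.
        return Ok(0);
    }
    let data = zksync_os_l1_watcher::util::find_stored_batch_data_by_batch_number(
        diamond_proxy,
        batch_number,
        max_l1_blocks_to_scan,
    )
    .await?
    .context("could not find where batch was committed on L1")?;
    Ok(data.last_block_number)
}
```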