Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 7 additions & 87 deletions lib/l1_watcher/src/batch_range_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
use crate::watcher::{L1Watcher, L1WatcherError};
use crate::{L1WatcherConfig, ProcessL1Event, util};
use alloy::consensus::Transaction;
use alloy::eips::BlockId;
use alloy::primitives::{Address, B256, BlockNumber, TxHash};
use alloy::primitives::{Address, B256, BlockNumber};
use alloy::providers::{DynProvider, Provider};
use alloy::rpc::types::{Filter, Log};
use alloy::sol_types::SolEvent;
use alloy::rpc::types::Log;
use anyhow::Context;
use tokio::sync::mpsc;
use zksync_os_batch_types::BatchInfo;
use zksync_os_contract_interface::IExecutor::ReportCommittedBatchRangeZKsyncOS;
use zksync_os_contract_interface::ZkChain;
use zksync_os_contract_interface::calldata::CommitCalldata;
use zksync_os_contract_interface::models::{CommitBatchInfo, StoredBatchInfo};
use zksync_os_types::ProtocolSemanticVersion;

Expand Down Expand Up @@ -76,9 +72,9 @@ impl BatchRangeWatcher {
last_committed_batch_on_startup: last_committed_batch,
batch_ranges_sender,
};
let last_executed_batch_data = this
.fetch_stored_batch_data(last_l1_block, last_executed_batch)
.await?;
let last_executed_batch_data =
util::fetch_stored_batch_data(&this.zk_chain, last_l1_block, last_executed_batch)
.await?;

let l1_watcher = L1Watcher::new(
provider,
Expand All @@ -95,82 +91,6 @@ impl BatchRangeWatcher {
last_executed_batch_data,
})
}

/// Fetches and decodes batch commit transaction. Fails if transaction does not exist or is not
/// a valid commit transaction.
async fn fetch_commit_calldata(
&self,
tx_hash: TxHash,
) -> Result<CommittedBatch, L1WatcherError> {
// todo: retry-backoff logic in case tx is missing
let tx = self
.zk_chain
.provider()
.get_transaction_by_hash(tx_hash)
.await?
.expect("tx not found");
let CommitCalldata {
commit_batch_info, ..
} = CommitCalldata::decode(tx.input()).map_err(L1WatcherError::Other)?;

// L1 block where this batch got committed.
let l1_block_id = BlockId::number(
tx.block_number
.expect("mined transaction has no block number"),
);
CommittedBatch::fetch(&self.zk_chain, commit_batch_info, l1_block_id).await
}

/// Fetches and decodes stored batch data for batch `batch_number` that is expected to have been
/// committed in `l1_block_number`. Returns `None` if requested batch has not been committed in
/// the given L1 block.
async fn fetch_stored_batch_data(
&self,
l1_block_number: BlockNumber,
batch_number: u64,
) -> anyhow::Result<Option<StoredBatchData>> {
let logs = self
.zk_chain
.provider()
.get_logs(
&Filter::new()
.address(*self.zk_chain.address())
.event_signature(ReportCommittedBatchRangeZKsyncOS::SIGNATURE_HASH)
.from_block(l1_block_number)
.to_block(l1_block_number),
)
.await?;
let Some((log, tx_hash)) = logs.into_iter().find_map(|log| {
let batch_log = ReportCommittedBatchRangeZKsyncOS::decode_log(&log.inner)
.expect("unable to decode `ReportCommittedBatchRangeZKsyncOS` log");
if batch_log.batchNumber == batch_number {
Some((
batch_log,
log.transaction_hash.expect("indexed log without tx hash"),
))
} else {
None
}
}) else {
return Ok(None);
};
let committed_batch = self.fetch_commit_calldata(tx_hash).await?;

// todo: stop using this struct once fully migrated from S3
let last_executed_batch_info = BatchInfo {
commit_info: committed_batch.commit_info,
chain_address: Default::default(),
upgrade_tx_hash: committed_batch.upgrade_tx_hash,
blob_sidecar: None,
};
let batch_info = last_executed_batch_info.into_stored(&committed_batch.protocol_version);

Ok(Some(StoredBatchData {
batch_info,
first_block_number: log.firstBlockNumber,
last_block_number: log.lastBlockNumber,
}))
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -212,7 +132,7 @@ impl ProcessL1Event for BatchRangeWatcher {
tracing::trace!(batch_number, "batch is outside of range of interest");
} else {
let tx_hash = log.transaction_hash.expect("indexed log without tx hash");
let committed_batch = self.fetch_commit_calldata(tx_hash).await?;
let committed_batch = util::fetch_commit_calldata(&self.zk_chain, tx_hash).await?;

if self.next_batch_number != committed_batch.commit_info.batch_number {
return Err(L1WatcherError::Other(anyhow::anyhow!(
Expand Down Expand Up @@ -254,7 +174,7 @@ pub struct CommittedBatch {
impl CommittedBatch {
/// Fetches extra information that is not available inside `CommitBatchInfo` from L1 to construct
/// `CommitedBatch`. Requires `l1_block_id` where the batch was committed.
async fn fetch(
pub async fn fetch(
zk_chain: &ZkChain<DynProvider>,
commit_batch_info: CommitBatchInfo,
l1_block_id: BlockId,
Expand Down
48 changes: 31 additions & 17 deletions lib/l1_watcher/src/commit_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,22 @@ use crate::{L1WatcherConfig, ProcessL1Event, util};
use alloy::primitives::Address;
use alloy::providers::{DynProvider, Provider};
use alloy::rpc::types::Log;
use zksync_os_contract_interface::IExecutor::BlockCommit;
use zksync_os_contract_interface::IExecutor::{BlockCommit, ReportCommittedBatchRangeZKsyncOS};
use zksync_os_contract_interface::ZkChain;
use zksync_os_storage_api::{ReadBatch, WriteFinality};
use zksync_os_storage_api::WriteFinality;

pub struct L1CommitWatcher<Finality, BatchStorage> {
pub struct L1CommitWatcher<Finality> {
provider: DynProvider,
contract_address: Address,
next_batch_number: u64,
finality: Finality,
batch_storage: BatchStorage,
}

impl<Finality: WriteFinality, BatchStorage: ReadBatch> L1CommitWatcher<Finality, BatchStorage> {
impl<Finality: WriteFinality> L1CommitWatcher<Finality> {
pub async fn create_watcher(
config: L1WatcherConfig,
zk_chain: ZkChain<DynProvider>,
finality: Finality,
batch_storage: BatchStorage,
) -> anyhow::Result<L1Watcher> {
let current_l1_block = zk_chain.provider().get_block_number().await?;
let last_committed_batch = finality.get_finality_status().last_committed_batch;
Expand All @@ -40,10 +39,10 @@ impl<Finality: WriteFinality, BatchStorage: ReadBatch> L1CommitWatcher<Finality,
tracing::info!(last_l1_block, "resolved on L1");

let this = Self {
provider: zk_chain.provider().clone(),
contract_address: *zk_chain.address(),
next_batch_number: last_committed_batch + 1,
finality,
batch_storage,
};
let l1_watcher = L1Watcher::new(
zk_chain.provider().clone(),
Expand All @@ -60,9 +59,7 @@ impl<Finality: WriteFinality, BatchStorage: ReadBatch> L1CommitWatcher<Finality,
}

#[async_trait::async_trait]
impl<Finality: WriteFinality, BatchStorage: ReadBatch> ProcessL1Event
for L1CommitWatcher<Finality, BatchStorage>
{
impl<Finality: WriteFinality> ProcessL1Event for L1CommitWatcher<Finality> {
const NAME: &'static str = "block_commit";

type SolEvent = BlockCommit;
Expand All @@ -75,7 +72,7 @@ impl<Finality: WriteFinality, BatchStorage: ReadBatch> ProcessL1Event
async fn process_event(
&mut self,
batch_commit: BlockCommit,
_log: Log,
log: Log,
) -> Result<(), L1WatcherError> {
let batch_number = batch_commit.batchNumber.to::<u64>();
let batch_hash = batch_commit.batchHash;
Expand All @@ -94,12 +91,29 @@ impl<Finality: WriteFinality, BatchStorage: ReadBatch> ProcessL1Event
?batch_commitment,
"discovered committed batch"
);
let (_, last_committed_block) = self
.batch_storage
.get_batch_range_by_number(batch_number)
.await
.map_err(L1WatcherError::Batch)?
.expect("committed batch is missing");
let tx_hash = log
.transaction_hash
.expect("indexed log does not belong to any transaction");
// todo: retry-backoff logic in case tx is missing
let tx_receipt = self
.provider
.get_transaction_receipt(tx_hash)
.await?
.expect("tx not found");
let report = tx_receipt
.inner
.into_logs()
.into_iter()
.find_map(|log| {
let log = log.log_decode::<ReportCommittedBatchRangeZKsyncOS>().ok()?;
if log.inner.data.batchNumber == batch_number {
Some(log)
} else {
None
}
})
.expect("report range not found");
let last_committed_block = report.inner.lastBlockNumber;
self.finality.update_finality_status(|finality| {
assert!(
batch_number > finality.last_committed_batch,
Expand Down
40 changes: 22 additions & 18 deletions lib/l1_watcher/src/execute_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,23 @@ use crate::{L1WatcherConfig, ProcessL1Event, util};
use alloy::primitives::Address;
use alloy::providers::{DynProvider, Provider};
use alloy::rpc::types::Log;
use anyhow::Context;
use zksync_os_contract_interface::IExecutor::BlockExecution;
use zksync_os_contract_interface::ZkChain;
use zksync_os_storage_api::{ReadBatch, WriteFinality};
use zksync_os_storage_api::WriteFinality;

pub struct L1ExecuteWatcher<Finality, BatchStorage> {
contract_address: Address,
pub struct L1ExecuteWatcher<Finality> {
zk_chain: ZkChain<DynProvider>,
next_batch_number: u64,
finality: Finality,
batch_storage: BatchStorage,
max_blocks_to_process: u64,
}

impl<Finality: WriteFinality, BatchStorage: ReadBatch> L1ExecuteWatcher<Finality, BatchStorage> {
impl<Finality: WriteFinality> L1ExecuteWatcher<Finality> {
pub async fn create_watcher(
config: L1WatcherConfig,
zk_chain: ZkChain<DynProvider>,
finality: Finality,
batch_storage: BatchStorage,
) -> anyhow::Result<L1Watcher> {
let current_l1_block = zk_chain.provider().get_block_number().await?;
let last_executed_batch = finality.get_finality_status().last_executed_batch;
Expand All @@ -37,10 +37,10 @@ impl<Finality: WriteFinality, BatchStorage: ReadBatch> L1ExecuteWatcher<Finality
tracing::info!(last_l1_block, "resolved on L1");

let this = Self {
contract_address: *zk_chain.address(),
zk_chain: zk_chain.clone(),
next_batch_number: last_executed_batch + 1,
finality,
batch_storage,
max_blocks_to_process: config.max_blocks_to_process,
};
let l1_watcher = L1Watcher::new(
zk_chain.provider().clone(),
Expand All @@ -57,16 +57,14 @@ impl<Finality: WriteFinality, BatchStorage: ReadBatch> L1ExecuteWatcher<Finality
}

#[async_trait::async_trait]
impl<Finality: WriteFinality, BatchStorage: ReadBatch> ProcessL1Event
for L1ExecuteWatcher<Finality, BatchStorage>
{
impl<Finality: WriteFinality> ProcessL1Event for L1ExecuteWatcher<Finality> {
const NAME: &'static str = "block_execution";

type SolEvent = BlockExecution;
type WatchedEvent = BlockExecution;

fn contract_address(&self) -> Address {
self.contract_address
*self.zk_chain.address()
}

async fn process_event(
Expand All @@ -85,12 +83,18 @@ impl<Finality: WriteFinality, BatchStorage: ReadBatch> ProcessL1Event
"skipping already processed executed batch",
);
} else {
let (_, last_executed_block) = self
.batch_storage
.get_batch_range_by_number(batch_number)
.await
.map_err(L1WatcherError::Batch)?
.expect("executed batch is missing");
// todo: This can take a while. For the majority of batches we have already pulled the
// range in commit watcher, we should find a way to reuse it.
let batch = util::find_stored_batch_data_by_batch_number(
&self.zk_chain,
batch_number,
self.max_blocks_to_process,
)
.await
.unwrap()
.context("could not find where batch was committed on L1")
.unwrap();
let last_executed_block = batch.last_block_number;
self.finality.update_finality_status(|finality| {
assert!(
batch_number > finality.last_executed_batch,
Expand Down
Loading
Loading