diff --git a/ethexe/ethereum/src/lib.rs b/ethexe/ethereum/src/lib.rs index 8a504058951..0f84723d8c1 100644 --- a/ethexe/ethereum/src/lib.rs +++ b/ethexe/ethereum/src/lib.rs @@ -65,7 +65,7 @@ pub(crate) type ExeFiller = JoinFill< WalletFiller, >; -pub(crate) fn decode_log(log: Log) -> Result { +pub(crate) fn decode_log(log: &Log) -> Result { E::decode_raw_log(log.topics(), &log.data().data, false).map_err(Into::into) } diff --git a/ethexe/ethereum/src/mirror/events.rs b/ethexe/ethereum/src/mirror/events.rs index d9dd2d63f92..0ab535f76c8 100644 --- a/ethexe/ethereum/src/mirror/events.rs +++ b/ethexe/ethereum/src/mirror/events.rs @@ -50,7 +50,7 @@ pub mod signatures { ]; } -pub fn try_extract_event(log: Log) -> Result> { +pub fn try_extract_event(log: &Log) -> Result> { use crate::decode_log; use signatures::*; diff --git a/ethexe/ethereum/src/mirror/mod.rs b/ethexe/ethereum/src/mirror/mod.rs index e1152674f63..dc9b5e60c42 100644 --- a/ethexe/ethereum/src/mirror/mod.rs +++ b/ethexe/ethereum/src/mirror/mod.rs @@ -70,7 +70,7 @@ impl Mirror { if log.topic0().map(|v| v.0) == Some(signatures::MESSAGE_QUEUEING_REQUESTED.to_fixed_bytes()) { - let event = crate::decode_log::(log.clone())?; + let event = crate::decode_log::(log)?; message_id = Some((*event.id).into()); diff --git a/ethexe/ethereum/src/router/events.rs b/ethexe/ethereum/src/router/events.rs index 9d37f019a5a..441b7d4c08b 100644 --- a/ethexe/ethereum/src/router/events.rs +++ b/ethexe/ethereum/src/router/events.rs @@ -48,7 +48,7 @@ pub mod signatures { ]; } -pub fn try_extract_event(log: Log) -> Result> { +pub fn try_extract_event(log: &Log) -> Result> { use crate::decode_log; use signatures::*; diff --git a/ethexe/ethereum/src/router/mod.rs b/ethexe/ethereum/src/router/mod.rs index 4503c96ca98..c6941db6112 100644 --- a/ethexe/ethereum/src/router/mod.rs +++ b/ethexe/ethereum/src/router/mod.rs @@ -135,7 +135,7 @@ impl Router { while let Some(log) = router_events.next().await { match log.topic0().map(|v| H256(v.0)) { Some(b) if b == signatures::CODE_GOT_VALIDATED => { - let event = crate::decode_log::(log)?; + let event = crate::decode_log::(&log)?; if event.id == code_id { return Ok(event.valid); @@ -170,7 +170,7 @@ impl Router { for log in receipt.inner.logs() { if log.topic0().map(|v| v.0) == Some(signatures::PROGRAM_CREATED.to_fixed_bytes()) { - let event = crate::decode_log::(log.clone())?; + let event = crate::decode_log::(log)?; actor_id = Some((*event.actorId.into_word()).into()); diff --git a/ethexe/ethereum/src/wvara/events.rs b/ethexe/ethereum/src/wvara/events.rs index 0e72a470c89..6d3f3cf941e 100644 --- a/ethexe/ethereum/src/wvara/events.rs +++ b/ethexe/ethereum/src/wvara/events.rs @@ -33,7 +33,7 @@ pub mod signatures { pub const ALL: [H256; 2] = [TRANSFER, APPROVAL]; } -pub fn try_extract_event(log: Log) -> Result> { +pub fn try_extract_event(log: &Log) -> Result> { use crate::decode_log; use signatures::*; diff --git a/ethexe/observer/src/observer.rs b/ethexe/observer/src/observer.rs index dd24a2b7cef..e0ac96532e0 100644 --- a/ethexe/observer/src/observer.rs +++ b/ethexe/observer/src/observer.rs @@ -1,6 +1,6 @@ use crate::{BlobReader, BlockData, Event}; use alloy::{ - primitives::{Address as AlloyAddress, B256}, + primitives::Address as AlloyAddress, providers::{Provider, ProviderBuilder, RootProvider}, rpc::types::eth::{Filter, Topic}, transports::BoxTransport, @@ -20,7 +20,7 @@ use std::{collections::HashMap, sync::Arc}; use tokio::sync::watch; /// Max number of blocks to query in alloy. -pub(crate) const MAX_QUERY_BLOCK_RANGE: u32 = 100_000; +pub(crate) const MAX_QUERY_BLOCK_RANGE: u64 = 100_000; pub(crate) type ObserverProvider = RootProvider; @@ -188,76 +188,13 @@ pub(crate) async fn read_block_events( let router_query = RouterQuery::from_provider(router_address, Arc::new(provider.clone())); let wvara_address = router_query.wvara_address().await?; - let router_events_filter = Filter::new() - .at_block_hash(block_hash.0) - .address(router_address) - .event_signature(Topic::from_iter( - router::events::signatures::ALL - .iter() - .map(|hash| B256::new(hash.to_fixed_bytes())), - )); - - let router_logs_fut = provider.get_logs(&router_events_filter); - - let wvara_events_filter = Filter::new() - .at_block_hash(block_hash.0) - .address(wvara_address) - .event_signature(Topic::from_iter( - wvara::events::signatures::ALL - .iter() - .map(|hash| B256::new(hash.to_fixed_bytes())), - )); - - let wvara_logs_fut = provider.get_logs(&wvara_events_filter); - - let mirrors_events_filter = - Filter::new() - .at_block_hash(block_hash.0) - .event_signature(Topic::from_iter( - mirror::events::signatures::ALL - .iter() - .map(|hash| B256::new(hash.to_fixed_bytes())), - )); - - let mirrors_logs_fut = provider.get_logs(&mirrors_events_filter); - - let (router_logs, wvara_logs, mirrors_logs) = - future::join3(router_logs_fut, wvara_logs_fut, mirrors_logs_fut).await; - let (router_logs, wvara_logs, mirrors_logs) = (router_logs?, wvara_logs?, mirrors_logs?); - - let mut block_events = - Vec::with_capacity(router_logs.len() + wvara_logs.len() + mirrors_logs.len()); - - for router_log in router_logs { - let Some(router_event) = router::events::try_extract_event(router_log)? else { - continue; - }; - - block_events.push(router_event.into()) - } - - for wvara_log in wvara_logs { - let Some(wvara_log) = wvara::events::try_extract_event(wvara_log)? else { - continue; - }; - - block_events.push(wvara_log.into()) - } - - for mirror_log in mirrors_logs { - let address = (*mirror_log.address().into_word()).into(); - - let Some(mirror_event) = mirror::events::try_extract_event(mirror_log)? else { - continue; - }; - - block_events.push(BlockEvent::mirror(address, mirror_event)); - } + let filter = Filter::new().at_block_hash(block_hash.to_fixed_bytes()); - Ok(block_events) + read_events_impl(router_address, wvara_address, provider, filter) + .await + .map(|v| v.into_values().next().unwrap_or_default()) } -// TODO (breathx): simplify code between two query funcs. pub(crate) async fn read_block_events_batch( from_block: u32, to_block: u32, @@ -267,107 +204,91 @@ pub(crate) async fn read_block_events_batch( let router_query = RouterQuery::from_provider(router_address, Arc::new(provider.clone())); let wvara_address = router_query.wvara_address().await?; - let mut events_map: HashMap<_, Vec<_>> = HashMap::new(); + let mut res = HashMap::new(); + + let mut start_block = from_block as u64; + let to_block = to_block as u64; - let mut start_block = from_block; while start_block <= to_block { - let end_block = std::cmp::min(start_block + MAX_QUERY_BLOCK_RANGE - 1, to_block); - let router_events_filter = Filter::new() - .from_block(start_block as u64) - .to_block(end_block as u64) - .address(router_address) - .event_signature(Topic::from_iter( - router::events::signatures::ALL - .iter() - .map(|hash| B256::new(hash.to_fixed_bytes())), - )); - - let router_logs_fut = provider.get_logs(&router_events_filter); - - let wvara_events_filter = Filter::new() - .from_block(start_block as u64) - .to_block(end_block as u64) - .address(wvara_address) - .event_signature(Topic::from_iter( - wvara::events::signatures::ALL - .iter() - .map(|hash| B256::new(hash.to_fixed_bytes())), - )); - - let wvara_logs_fut = provider.get_logs(&wvara_events_filter); - - let mirrors_events_filter = Filter::new() - .from_block(start_block as u64) - .to_block(end_block as u64) - .event_signature(Topic::from_iter( - mirror::events::signatures::ALL - .iter() - .map(|hash| B256::new(hash.to_fixed_bytes())), - )); - - let mirrors_logs_fut = provider.get_logs(&mirrors_events_filter); - - let (router_logs, wvara_logs, mirrors_logs) = - future::join3(router_logs_fut, wvara_logs_fut, mirrors_logs_fut).await; - let (router_logs, wvara_logs, mirrors_logs) = (router_logs?, wvara_logs?, mirrors_logs?); - - for router_log in router_logs { - let block_hash = router_log - .block_hash - .ok_or(anyhow!("Block hash is missing"))? - .0 - .into(); - - let Some(router_event) = router::events::try_extract_event(router_log)? else { - continue; - }; - - events_map - .entry(block_hash) - .or_default() - .push(router_event.into()); - } + let end_block = to_block.min(start_block + MAX_QUERY_BLOCK_RANGE - 1); - for wvara_log in wvara_logs { - let block_hash = wvara_log - .block_hash - .ok_or(anyhow!("Block hash is missing"))? - .0 - .into(); + let filter = Filter::new().from_block(start_block).to_block(end_block); - let Some(wvara_event) = wvara::events::try_extract_event(wvara_log)? else { - continue; - }; + let iter_res = read_events_impl(router_address, wvara_address, provider, filter).await?; - events_map - .entry(block_hash) - .or_default() - .push(wvara_event.into()); + res.extend(iter_res.into_iter()); + + start_block = end_block + 1; + } + + Ok(res) +} + +async fn read_events_impl( + router_address: AlloyAddress, + wvara_address: AlloyAddress, + provider: &ObserverProvider, + filter: Filter, +) -> Result>> { + let router_and_wvara_topic = Topic::from_iter( + router::events::signatures::ALL + .into_iter() + .chain(wvara::events::signatures::ALL) + .map(|v| v.to_fixed_bytes().into()), + ); + + let router_and_wvara_filter = filter + .clone() + .address(vec![router_address, wvara_address]) + .event_signature(router_and_wvara_topic); + + let mirror_filter = filter.event_signature(Topic::from_iter( + mirror::events::signatures::ALL.map(|v| v.to_fixed_bytes().into()), + )); + + let (router_and_wvara_logs, mirrors_logs) = future::try_join( + provider.get_logs(&router_and_wvara_filter), + provider.get_logs(&mirror_filter), + ) + .await?; + + let block_hash_of = |log: &alloy::rpc::types::Log| -> Result { + log.block_hash + .map(|v| v.0.into()) + .ok_or(anyhow!("Block hash is missing")) + }; + + let mut res: HashMap<_, Vec<_>> = HashMap::new(); + + for router_or_wvara_log in router_and_wvara_logs { + let block_hash = block_hash_of(&router_or_wvara_log)?; + + let maybe_block_event = if router_or_wvara_log.address() == router_address { + router::events::try_extract_event(&router_or_wvara_log)?.map(Into::into) + } else { + wvara::events::try_extract_event(&router_or_wvara_log)?.map(Into::into) + }; + + if let Some(block_event) = maybe_block_event { + res.entry(block_hash).or_default().push(block_event); } + } - for mirror_log in mirrors_logs { - let block_hash = mirror_log - .block_hash - .ok_or(anyhow!("Block hash is missing"))? - .0 - .into(); + for mirror_log in mirrors_logs { + let block_hash = block_hash_of(&mirror_log)?; - let address = (*mirror_log.address().into_word()).into(); + let address = (*mirror_log.address().into_word()).into(); - let Some(mirror_event) = mirror::events::try_extract_event(mirror_log)? else { - continue; - }; + // TODO (breathx): if address is unknown, then continue. - events_map - .entry(block_hash) + if let Some(event) = mirror::events::try_extract_event(&mirror_log)? { + res.entry(block_hash) .or_default() - .push(BlockEvent::mirror(address, mirror_event)); + .push(BlockEvent::mirror(address, event)); } - - start_block = end_block + 1; } - Ok(events_map) + Ok(res) } #[cfg(test)]