Skip to content

Commit

Permalink
refactor(ethexe-observer): simplify events querying (#4185)
Browse files Browse the repository at this point in the history
  • Loading branch information
breathx authored Aug 23, 2024
1 parent 05cc997 commit e0a7e97
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 163 deletions.
2 changes: 1 addition & 1 deletion ethexe/ethereum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub(crate) type ExeFiller = JoinFill<
WalletFiller<EthereumWallet>,
>;

pub(crate) fn decode_log<E: SolEvent>(log: Log) -> Result<E> {
pub(crate) fn decode_log<E: SolEvent>(log: &Log) -> Result<E> {
E::decode_raw_log(log.topics(), &log.data().data, false).map_err(Into::into)
}

Expand Down
2 changes: 1 addition & 1 deletion ethexe/ethereum/src/mirror/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub mod signatures {
];
}

pub fn try_extract_event(log: Log) -> Result<Option<mirror::Event>> {
pub fn try_extract_event(log: &Log) -> Result<Option<mirror::Event>> {
use crate::decode_log;
use signatures::*;

Expand Down
2 changes: 1 addition & 1 deletion ethexe/ethereum/src/mirror/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl Mirror {
if log.topic0().map(|v| v.0)
== Some(signatures::MESSAGE_QUEUEING_REQUESTED.to_fixed_bytes())
{
let event = crate::decode_log::<IMirror::MessageQueueingRequested>(log.clone())?;
let event = crate::decode_log::<IMirror::MessageQueueingRequested>(log)?;

message_id = Some((*event.id).into());

Expand Down
2 changes: 1 addition & 1 deletion ethexe/ethereum/src/router/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub mod signatures {
];
}

pub fn try_extract_event(log: Log) -> Result<Option<router::Event>> {
pub fn try_extract_event(log: &Log) -> Result<Option<router::Event>> {
use crate::decode_log;
use signatures::*;

Expand Down
4 changes: 2 additions & 2 deletions ethexe/ethereum/src/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl Router {
while let Some(log) = router_events.next().await {
match log.topic0().map(|v| H256(v.0)) {
Some(b) if b == signatures::CODE_GOT_VALIDATED => {
let event = crate::decode_log::<IRouter::CodeGotValidated>(log)?;
let event = crate::decode_log::<IRouter::CodeGotValidated>(&log)?;

if event.id == code_id {
return Ok(event.valid);
Expand Down Expand Up @@ -170,7 +170,7 @@ impl Router {

for log in receipt.inner.logs() {
if log.topic0().map(|v| v.0) == Some(signatures::PROGRAM_CREATED.to_fixed_bytes()) {
let event = crate::decode_log::<IRouter::ProgramCreated>(log.clone())?;
let event = crate::decode_log::<IRouter::ProgramCreated>(log)?;

actor_id = Some((*event.actorId.into_word()).into());

Expand Down
2 changes: 1 addition & 1 deletion ethexe/ethereum/src/wvara/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub mod signatures {
pub const ALL: [H256; 2] = [TRANSFER, APPROVAL];
}

pub fn try_extract_event(log: Log) -> Result<Option<wvara::Event>> {
pub fn try_extract_event(log: &Log) -> Result<Option<wvara::Event>> {
use crate::decode_log;
use signatures::*;

Expand Down
233 changes: 77 additions & 156 deletions ethexe/observer/src/observer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{BlobReader, BlockData, Event};
use alloy::{
primitives::{Address as AlloyAddress, B256},
primitives::Address as AlloyAddress,
providers::{Provider, ProviderBuilder, RootProvider},
rpc::types::eth::{Filter, Topic},
transports::BoxTransport,
Expand All @@ -20,7 +20,7 @@ use std::{collections::HashMap, sync::Arc};
use tokio::sync::watch;

/// Max number of blocks to query in alloy.
pub(crate) const MAX_QUERY_BLOCK_RANGE: u32 = 100_000;
pub(crate) const MAX_QUERY_BLOCK_RANGE: u64 = 100_000;

pub(crate) type ObserverProvider = RootProvider<BoxTransport>;

Expand Down Expand Up @@ -188,76 +188,13 @@ pub(crate) async fn read_block_events(
let router_query = RouterQuery::from_provider(router_address, Arc::new(provider.clone()));
let wvara_address = router_query.wvara_address().await?;

let router_events_filter = Filter::new()
.at_block_hash(block_hash.0)
.address(router_address)
.event_signature(Topic::from_iter(
router::events::signatures::ALL
.iter()
.map(|hash| B256::new(hash.to_fixed_bytes())),
));

let router_logs_fut = provider.get_logs(&router_events_filter);

let wvara_events_filter = Filter::new()
.at_block_hash(block_hash.0)
.address(wvara_address)
.event_signature(Topic::from_iter(
wvara::events::signatures::ALL
.iter()
.map(|hash| B256::new(hash.to_fixed_bytes())),
));

let wvara_logs_fut = provider.get_logs(&wvara_events_filter);

let mirrors_events_filter =
Filter::new()
.at_block_hash(block_hash.0)
.event_signature(Topic::from_iter(
mirror::events::signatures::ALL
.iter()
.map(|hash| B256::new(hash.to_fixed_bytes())),
));

let mirrors_logs_fut = provider.get_logs(&mirrors_events_filter);

let (router_logs, wvara_logs, mirrors_logs) =
future::join3(router_logs_fut, wvara_logs_fut, mirrors_logs_fut).await;
let (router_logs, wvara_logs, mirrors_logs) = (router_logs?, wvara_logs?, mirrors_logs?);

let mut block_events =
Vec::with_capacity(router_logs.len() + wvara_logs.len() + mirrors_logs.len());

for router_log in router_logs {
let Some(router_event) = router::events::try_extract_event(router_log)? else {
continue;
};

block_events.push(router_event.into())
}

for wvara_log in wvara_logs {
let Some(wvara_log) = wvara::events::try_extract_event(wvara_log)? else {
continue;
};

block_events.push(wvara_log.into())
}

for mirror_log in mirrors_logs {
let address = (*mirror_log.address().into_word()).into();

let Some(mirror_event) = mirror::events::try_extract_event(mirror_log)? else {
continue;
};

block_events.push(BlockEvent::mirror(address, mirror_event));
}
let filter = Filter::new().at_block_hash(block_hash.to_fixed_bytes());

Ok(block_events)
read_events_impl(router_address, wvara_address, provider, filter)
.await
.map(|v| v.into_values().next().unwrap_or_default())
}

// TODO (breathx): simplify code between two query funcs.
pub(crate) async fn read_block_events_batch(
from_block: u32,
to_block: u32,
Expand All @@ -267,107 +204,91 @@ pub(crate) async fn read_block_events_batch(
let router_query = RouterQuery::from_provider(router_address, Arc::new(provider.clone()));
let wvara_address = router_query.wvara_address().await?;

let mut events_map: HashMap<_, Vec<_>> = HashMap::new();
let mut res = HashMap::new();

let mut start_block = from_block as u64;
let to_block = to_block as u64;

let mut start_block = from_block;
while start_block <= to_block {
let end_block = std::cmp::min(start_block + MAX_QUERY_BLOCK_RANGE - 1, to_block);
let router_events_filter = Filter::new()
.from_block(start_block as u64)
.to_block(end_block as u64)
.address(router_address)
.event_signature(Topic::from_iter(
router::events::signatures::ALL
.iter()
.map(|hash| B256::new(hash.to_fixed_bytes())),
));

let router_logs_fut = provider.get_logs(&router_events_filter);

let wvara_events_filter = Filter::new()
.from_block(start_block as u64)
.to_block(end_block as u64)
.address(wvara_address)
.event_signature(Topic::from_iter(
wvara::events::signatures::ALL
.iter()
.map(|hash| B256::new(hash.to_fixed_bytes())),
));

let wvara_logs_fut = provider.get_logs(&wvara_events_filter);

let mirrors_events_filter = Filter::new()
.from_block(start_block as u64)
.to_block(end_block as u64)
.event_signature(Topic::from_iter(
mirror::events::signatures::ALL
.iter()
.map(|hash| B256::new(hash.to_fixed_bytes())),
));

let mirrors_logs_fut = provider.get_logs(&mirrors_events_filter);

let (router_logs, wvara_logs, mirrors_logs) =
future::join3(router_logs_fut, wvara_logs_fut, mirrors_logs_fut).await;
let (router_logs, wvara_logs, mirrors_logs) = (router_logs?, wvara_logs?, mirrors_logs?);

for router_log in router_logs {
let block_hash = router_log
.block_hash
.ok_or(anyhow!("Block hash is missing"))?
.0
.into();

let Some(router_event) = router::events::try_extract_event(router_log)? else {
continue;
};

events_map
.entry(block_hash)
.or_default()
.push(router_event.into());
}
let end_block = to_block.min(start_block + MAX_QUERY_BLOCK_RANGE - 1);

for wvara_log in wvara_logs {
let block_hash = wvara_log
.block_hash
.ok_or(anyhow!("Block hash is missing"))?
.0
.into();
let filter = Filter::new().from_block(start_block).to_block(end_block);

let Some(wvara_event) = wvara::events::try_extract_event(wvara_log)? else {
continue;
};
let iter_res = read_events_impl(router_address, wvara_address, provider, filter).await?;

events_map
.entry(block_hash)
.or_default()
.push(wvara_event.into());
res.extend(iter_res.into_iter());

start_block = end_block + 1;
}

Ok(res)
}

async fn read_events_impl(
router_address: AlloyAddress,
wvara_address: AlloyAddress,
provider: &ObserverProvider,
filter: Filter,
) -> Result<HashMap<H256, Vec<BlockEvent>>> {
let router_and_wvara_topic = Topic::from_iter(
router::events::signatures::ALL
.into_iter()
.chain(wvara::events::signatures::ALL)
.map(|v| v.to_fixed_bytes().into()),
);

let router_and_wvara_filter = filter
.clone()
.address(vec![router_address, wvara_address])
.event_signature(router_and_wvara_topic);

let mirror_filter = filter.event_signature(Topic::from_iter(
mirror::events::signatures::ALL.map(|v| v.to_fixed_bytes().into()),
));

let (router_and_wvara_logs, mirrors_logs) = future::try_join(
provider.get_logs(&router_and_wvara_filter),
provider.get_logs(&mirror_filter),
)
.await?;

let block_hash_of = |log: &alloy::rpc::types::Log| -> Result<H256> {
log.block_hash
.map(|v| v.0.into())
.ok_or(anyhow!("Block hash is missing"))
};

let mut res: HashMap<_, Vec<_>> = HashMap::new();

for router_or_wvara_log in router_and_wvara_logs {
let block_hash = block_hash_of(&router_or_wvara_log)?;

let maybe_block_event = if router_or_wvara_log.address() == router_address {
router::events::try_extract_event(&router_or_wvara_log)?.map(Into::into)
} else {
wvara::events::try_extract_event(&router_or_wvara_log)?.map(Into::into)
};

if let Some(block_event) = maybe_block_event {
res.entry(block_hash).or_default().push(block_event);
}
}

for mirror_log in mirrors_logs {
let block_hash = mirror_log
.block_hash
.ok_or(anyhow!("Block hash is missing"))?
.0
.into();
for mirror_log in mirrors_logs {
let block_hash = block_hash_of(&mirror_log)?;

let address = (*mirror_log.address().into_word()).into();
let address = (*mirror_log.address().into_word()).into();

let Some(mirror_event) = mirror::events::try_extract_event(mirror_log)? else {
continue;
};
// TODO (breathx): if address is unknown, then continue.

events_map
.entry(block_hash)
if let Some(event) = mirror::events::try_extract_event(&mirror_log)? {
res.entry(block_hash)
.or_default()
.push(BlockEvent::mirror(address, mirror_event));
.push(BlockEvent::mirror(address, event));
}

start_block = end_block + 1;
}

Ok(events_map)
Ok(res)
}

#[cfg(test)]
Expand Down

0 comments on commit e0a7e97

Please sign in to comment.