diff --git a/core/src/message/context.rs b/core/src/message/context.rs index 27359b2411a..3734a26adc6 100644 --- a/core/src/message/context.rs +++ b/core/src/message/context.rs @@ -438,7 +438,7 @@ impl MessageContext { return Err(Error::OutOfBoundsInputSliceOffset); } - // Check `len` for the current `offset` doesn't refer to the slice out of intput bounds. + // Check `len` for the current `offset` doesn't refer to the slice out of input bounds. let available_len = input.len() - offset; if len > available_len { return Err(Error::OutOfBoundsInputSliceLength); diff --git a/ethexe/cli/src/service.rs b/ethexe/cli/src/service.rs index 8bd5d63cbd0..f4e3fb6ccd7 100644 --- a/ethexe/cli/src/service.rs +++ b/ethexe/cli/src/service.rs @@ -24,13 +24,15 @@ use crate::{ }; use anyhow::{anyhow, Ok, Result}; use ethexe_common::{ - router::{BlockCommitment, CodeCommitment, Event as RouterEvent, StateTransition}, - BlockEvent, + router::{ + BlockCommitment, CodeCommitment, RequestEvent as RouterRequestEvent, StateTransition, + }, + BlockRequestEvent, }; use ethexe_db::{BlockHeader, BlockMetaStorage, CodesStorage, Database}; use ethexe_ethereum::router::RouterQuery; use ethexe_network::NetworkReceiverEvent; -use ethexe_observer::{BlockData, Event as ObserverEvent}; +use ethexe_observer::{RequestBlockData, RequestEvent}; use ethexe_processor::LocalOutcome; use ethexe_sequencer::agro::AggregatedCommitments; use ethexe_signer::{Digest, PublicKey, Signature, Signer}; @@ -240,17 +242,19 @@ impl Service { processor: &mut ethexe_processor::Processor, block_hash: H256, ) -> Result<()> { - let events = query.get_block_events(block_hash).await?; + let events = query.get_block_request_events(block_hash).await?; for event in events { match event { - BlockEvent::Router(RouterEvent::CodeValidationRequested { + BlockRequestEvent::Router(RouterRequestEvent::CodeValidationRequested { code_id, blob_tx_hash, }) => { db.set_code_blob_tx(code_id, blob_tx_hash); } - BlockEvent::Router(RouterEvent::ProgramCreated { code_id, .. }) => { + BlockRequestEvent::Router(RouterRequestEvent::ProgramCreated { + code_id, .. + }) => { if db.original_code(code_id).is_some() { continue; } @@ -286,9 +290,9 @@ impl Service { Self::process_upload_codes(db, query, processor, block_hash).await?; - let block_events = query.get_block_events(block_hash).await?; + let block_request_events = query.get_block_request_events(block_hash).await?; - let block_outcomes = processor.process_block_events(block_hash, &block_events)?; + let block_outcomes = processor.process_block_events(block_hash, block_request_events)?; let transition_outcomes: Vec<_> = block_outcomes .into_iter() @@ -327,9 +331,9 @@ impl Service { db: &Database, query: &mut ethexe_observer::Query, processor: &mut ethexe_processor::Processor, - block_data: BlockData, + block_data: RequestBlockData, ) -> Result> { - db.set_block_events(block_data.block_hash, block_data.events.clone()); + db.set_block_events(block_data.block_hash, block_data.events); db.set_block_header( block_data.block_hash, BlockHeader { @@ -369,14 +373,14 @@ impl Service { query: &mut ethexe_observer::Query, processor: &mut ethexe_processor::Processor, maybe_sequencer: &mut Option, - observer_event: ethexe_observer::Event, + observer_event: RequestEvent, ) -> Result<(Vec, Vec)> { if let Some(sequencer) = maybe_sequencer { sequencer.process_observer_event(&observer_event)?; } match observer_event { - ObserverEvent::Block(block_data) => { + RequestEvent::Block(block_data) => { log::info!( "📦 receive a new block {}, hash {}, parent hash {}", block_data.block_number, @@ -389,7 +393,7 @@ impl Service { Ok((Vec::new(), commitments)) } - ethexe_observer::Event::CodeLoaded { code_id, code } => { + RequestEvent::CodeLoaded { code_id, code } => { let outcomes = processor.process_upload_code(code_id, code.as_slice())?; let commitments: Vec<_> = outcomes .into_iter() @@ -425,7 +429,7 @@ impl Service { )); } - let observer_events = observer.events(); + let observer_events = observer.request_events(); futures::pin_mut!(observer_events); let (mut network_sender, mut network_receiver, mut network_handle) = @@ -467,7 +471,7 @@ impl Service { break; }; - let is_block_event = matches!(observer_event, ObserverEvent::Block(_)); + let is_block_event = matches!(observer_event, RequestEvent::Block(_)); let (code_commitments, block_commitments) = Self::process_observer_event( &db, diff --git a/ethexe/cli/src/tests.rs b/ethexe/cli/src/tests.rs index 403e2942afd..06b792a8b40 100644 --- a/ethexe/cli/src/tests.rs +++ b/ethexe/cli/src/tests.rs @@ -58,7 +58,7 @@ impl Listener { let (send_subscription_created, receive_subscription_created) = oneshot::channel::<()>(); let _handle = task::spawn(async move { - let observer_events = observer.events(); + let observer_events = observer.events_all(); futures::pin_mut!(observer_events); send_subscription_created.send(()).unwrap(); diff --git a/ethexe/common/src/db.rs b/ethexe/common/src/db.rs index c0b48b95e40..aad3baa0803 100644 --- a/ethexe/common/src/db.rs +++ b/ethexe/common/src/db.rs @@ -18,7 +18,7 @@ //! ethexe common db types and traits. -use crate::{router::StateTransition, BlockEvent}; +use crate::{router::StateTransition, BlockRequestEvent}; use alloc::{ collections::{BTreeMap, BTreeSet, VecDeque}, vec::Vec, @@ -65,8 +65,8 @@ pub trait BlockMetaStorage: Send + Sync { fn block_end_program_states(&self, block_hash: H256) -> Option>; fn set_block_end_program_states(&self, block_hash: H256, map: BTreeMap); - fn block_events(&self, block_hash: H256) -> Option>; - fn set_block_events(&self, block_hash: H256, events: Vec); + fn block_events(&self, block_hash: H256) -> Option>; + fn set_block_events(&self, block_hash: H256, events: Vec); fn block_outcome(&self, block_hash: H256) -> Option>; fn set_block_outcome(&self, block_hash: H256, outcome: Vec); diff --git a/ethexe/common/src/lib.rs b/ethexe/common/src/lib.rs index d0ed6d1f2b9..94e187d023c 100644 --- a/ethexe/common/src/lib.rs +++ b/ethexe/common/src/lib.rs @@ -60,3 +60,31 @@ impl From for BlockEvent { Self::WVara(value) } } + +#[derive(Clone, Debug, Encode, Decode)] +pub enum BlockRequestEvent { + Router(router::RequestEvent), + Mirror { + address: ActorId, + event: mirror::RequestEvent, + }, + WVara(wvara::RequestEvent), +} + +impl BlockRequestEvent { + pub fn mirror(address: ActorId, event: mirror::RequestEvent) -> Self { + Self::Mirror { address, event } + } +} + +impl From for BlockRequestEvent { + fn from(value: router::RequestEvent) -> Self { + Self::Router(value) + } +} + +impl From for BlockRequestEvent { + fn from(value: wvara::RequestEvent) -> Self { + Self::WVara(value) + } +} diff --git a/ethexe/common/src/mirror.rs b/ethexe/common/src/mirror.rs index 2e9954b9b95..83ab29e55c0 100644 --- a/ethexe/common/src/mirror.rs +++ b/ethexe/common/src/mirror.rs @@ -64,3 +64,65 @@ pub enum Event { source: ActorId, }, } + +impl Event { + pub fn as_request(self) -> Option { + Some(match self { + Self::ExecutableBalanceTopUpRequested { value } => { + RequestEvent::ExecutableBalanceTopUpRequested { value } + } + Self::MessageQueueingRequested { + id, + source, + payload, + value, + } => RequestEvent::MessageQueueingRequested { + id, + source, + payload, + value, + }, + Self::ReplyQueueingRequested { + replied_to, + source, + payload, + value, + } => RequestEvent::ReplyQueueingRequested { + replied_to, + source, + payload, + value, + }, + Self::ValueClaimingRequested { claimed_id, source } => { + RequestEvent::ValueClaimingRequested { claimed_id, source } + } + Self::StateChanged { .. } + | Self::ValueClaimed { .. } + | Self::Message { .. } + | Self::Reply { .. } => return None, + }) + } +} + +#[derive(Clone, Debug, Encode, Decode)] +pub enum RequestEvent { + ExecutableBalanceTopUpRequested { + value: u128, + }, + MessageQueueingRequested { + id: MessageId, + source: ActorId, + payload: Vec, + value: u128, + }, + ReplyQueueingRequested { + replied_to: MessageId, + source: ActorId, + payload: Vec, + value: u128, + }, + ValueClaimingRequested { + claimed_id: MessageId, + source: ActorId, + }, +} diff --git a/ethexe/common/src/router.rs b/ethexe/common/src/router.rs index 69a380a021f..640efaa1d74 100644 --- a/ethexe/common/src/router.rs +++ b/ethexe/common/src/router.rs @@ -101,3 +101,51 @@ pub enum Event { value_per_weight: u128, }, } + +impl Event { + pub fn as_request(self) -> Option { + Some(match self { + Self::BaseWeightChanged { base_weight } => { + RequestEvent::BaseWeightChanged { base_weight } + } + Self::CodeValidationRequested { + code_id, + blob_tx_hash, + } => RequestEvent::CodeValidationRequested { + code_id, + blob_tx_hash, + }, + Self::ProgramCreated { actor_id, code_id } => { + RequestEvent::ProgramCreated { actor_id, code_id } + } + Self::StorageSlotChanged => RequestEvent::StorageSlotChanged, + Self::ValidatorsSetChanged => RequestEvent::ValidatorsSetChanged, + Self::ValuePerWeightChanged { value_per_weight } => { + RequestEvent::ValuePerWeightChanged { value_per_weight } + } + Self::BlockCommitted { .. } | Self::CodeGotValidated { .. } => return None, + }) + } +} + +#[derive(Clone, Debug, Encode, Decode, PartialEq, Eq)] +pub enum RequestEvent { + BaseWeightChanged { + base_weight: u64, + }, + CodeValidationRequested { + code_id: CodeId, + // TODO (breathx): replace with `code: Vec` + /// This field is replaced with tx hash in case of zero. + blob_tx_hash: H256, + }, + ProgramCreated { + actor_id: ActorId, + code_id: CodeId, + }, + StorageSlotChanged, + ValidatorsSetChanged, + ValuePerWeightChanged { + value_per_weight: u128, + }, +} diff --git a/ethexe/common/src/wvara.rs b/ethexe/common/src/wvara.rs index 6e61b1ea3d5..a529e86cb5a 100644 --- a/ethexe/common/src/wvara.rs +++ b/ethexe/common/src/wvara.rs @@ -34,3 +34,37 @@ pub enum Event { value: U256, }, } + +impl Event { + pub fn as_request(self) -> Option { + Some(match self { + Self::Transfer { from, to, value } => RequestEvent::Transfer { from, to, value }, + Self::Approval { .. } => return None, + }) + } +} + +#[derive(Clone, Debug, Encode, Decode)] +pub enum RequestEvent { + Transfer { + /// Never router, wvara or zero address. + from: ActorId, + /// Never router, wvara or zero address. + to: ActorId, + value: u128, + }, +} + +impl RequestEvent { + pub fn involves_address(&self, address: &ActorId) -> bool { + match self { + Self::Transfer { from, to, .. } => from == address || to == address, + } + } + + pub fn involves_addresses(&self, addresses: &[ActorId]) -> bool { + match self { + Self::Transfer { from, to, .. } => addresses.contains(from) || addresses.contains(to), + } + } +} diff --git a/ethexe/db/src/database.rs b/ethexe/db/src/database.rs index ea53ccf06e9..ff0688f7032 100644 --- a/ethexe/db/src/database.rs +++ b/ethexe/db/src/database.rs @@ -24,7 +24,7 @@ use crate::{CASDatabase, KVDatabase}; use ethexe_common::{ db::{BlockHeader, BlockMetaStorage, CodesStorage}, router::StateTransition, - BlockEvent, + BlockRequestEvent, }; use ethexe_runtime_common::state::{ Allocations, MemoryPages, MessageQueue, ProgramState, Storage, Waitlist, @@ -221,16 +221,16 @@ impl BlockMetaStorage for Database { ); } - fn block_events(&self, block_hash: H256) -> Option> { + fn block_events(&self, block_hash: H256) -> Option> { self.kv .get(&KeyPrefix::BlockEvents.two(self.router_address, block_hash)) .map(|data| { - Vec::::decode(&mut data.as_slice()) + Vec::::decode(&mut data.as_slice()) .expect("Failed to decode data into `Vec`") }) } - fn set_block_events(&self, block_hash: H256, events: Vec) { + fn set_block_events(&self, block_hash: H256, events: Vec) { self.kv.put( &KeyPrefix::BlockEvents.two(self.router_address, block_hash), events.encode(), @@ -300,8 +300,8 @@ impl CodesStorage for Database { self.kv .iter_prefix(&key_prefix) .map(|#[allow(unused_variables)] (key, code_id)| { - let (splitted_key_prefix, program_id) = key.split_at(key_prefix.len()); - debug_assert_eq!(splitted_key_prefix, key_prefix); + let (split_key_prefix, program_id) = key.split_at(key_prefix.len()); + debug_assert_eq!(split_key_prefix, key_prefix); let program_id = ProgramId::try_from(program_id).expect("Failed to decode key into `ProgramId`"); diff --git a/ethexe/ethereum/src/lib.rs b/ethexe/ethereum/src/lib.rs index f87b6824128..15bedc1656c 100644 --- a/ethexe/ethereum/src/lib.rs +++ b/ethexe/ethereum/src/lib.rs @@ -60,10 +60,6 @@ type AlloyProvider = pub(crate) type ExeFiller = JoinFill>; -pub(crate) fn decode_log(log: &Log) -> Result { - E::decode_raw_log(log.topics(), &log.data().data, false).map_err(Into::into) -} - pub struct Ethereum { router_address: Address, wvara_address: Address, @@ -308,3 +304,22 @@ impl TryGetReceipt for PendingTransactio )) } } + +pub(crate) fn decode_log(log: &Log) -> Result { + E::decode_raw_log(log.topics(), &log.data().data, false).map_err(Into::into) +} + +macro_rules! signatures_consts { + ( + $type_name:ident; + $( $const_name:ident: $name:ident, )* + ) => { + $( + pub const $const_name: alloy::primitives::B256 = $type_name::$name::SIGNATURE_HASH; + )* + + pub const ALL: &[alloy::primitives::B256] = &[$($const_name,)*]; + }; +} + +pub(crate) use signatures_consts; diff --git a/ethexe/ethereum/src/mirror/events.rs b/ethexe/ethereum/src/mirror/events.rs index 0ab535f76c8..2a9c6dcc0fb 100644 --- a/ethexe/ethereum/src/mirror/events.rs +++ b/ethexe/ethereum/src/mirror/events.rs @@ -16,68 +16,65 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::IMirror; -use alloy::{rpc::types::eth::Log, sol_types::SolEvent}; +use crate::{decode_log, IMirror}; +use alloy::{primitives::B256, rpc::types::eth::Log, sol_types::SolEvent}; use anyhow::Result; use ethexe_common::mirror; -use gprimitives::H256; +use signatures::*; pub mod signatures { - use super::{IMirror, SolEvent, H256}; + use super::*; - pub const EXECUTABLE_BALANCE_TOP_UP_REQUESTED: H256 = - H256(IMirror::ExecutableBalanceTopUpRequested::SIGNATURE_HASH.0); - pub const MESSAGE_QUEUEING_REQUESTED: H256 = - H256(IMirror::MessageQueueingRequested::SIGNATURE_HASH.0); - pub const MESSAGE: H256 = H256(IMirror::Message::SIGNATURE_HASH.0); - pub const REPLY_QUEUEING_REQUESTED: H256 = - H256(IMirror::ReplyQueueingRequested::SIGNATURE_HASH.0); - pub const REPLY: H256 = H256(IMirror::Reply::SIGNATURE_HASH.0); - pub const STATE_CHANGED: H256 = H256(IMirror::StateChanged::SIGNATURE_HASH.0); - pub const VALUE_CLAIMED: H256 = H256(IMirror::ValueClaimed::SIGNATURE_HASH.0); - pub const VALUE_CLAIMING_REQUESTED: H256 = - H256(IMirror::ValueClaimingRequested::SIGNATURE_HASH.0); + crate::signatures_consts! { + IMirror; + EXECUTABLE_BALANCE_TOP_UP_REQUESTED: ExecutableBalanceTopUpRequested, + MESSAGE_QUEUEING_REQUESTED: MessageQueueingRequested, + MESSAGE: Message, + REPLY_QUEUEING_REQUESTED: ReplyQueueingRequested, + REPLY: Reply, + STATE_CHANGED: StateChanged, + VALUE_CLAIMED: ValueClaimed, + VALUE_CLAIMING_REQUESTED: ValueClaimingRequested, + } - pub const ALL: [H256; 8] = [ + pub const REQUESTS: &[B256] = &[ EXECUTABLE_BALANCE_TOP_UP_REQUESTED, MESSAGE_QUEUEING_REQUESTED, - MESSAGE, REPLY_QUEUEING_REQUESTED, - REPLY, - STATE_CHANGED, - VALUE_CLAIMED, VALUE_CLAIMING_REQUESTED, ]; } pub fn try_extract_event(log: &Log) -> Result> { - use crate::decode_log; - use signatures::*; - - let Some(topic0) = log.topic0().map(|v| H256(v.0)) else { + let Some(topic0) = log.topic0().filter(|&v| ALL.contains(v)) else { return Ok(None); }; - // TODO (breathx): pattern matching issue for primitive_types::H256... ???? - let event = match topic0 { - b if b == EXECUTABLE_BALANCE_TOP_UP_REQUESTED => { + let event = match *topic0 { + EXECUTABLE_BALANCE_TOP_UP_REQUESTED => { decode_log::(log)?.into() } - b if b == MESSAGE_QUEUEING_REQUESTED => { - decode_log::(log)?.into() - } - b if b == MESSAGE => decode_log::(log)?.into(), - b if b == REPLY_QUEUEING_REQUESTED => { - decode_log::(log)?.into() - } - b if b == REPLY => decode_log::(log)?.into(), - b if b == STATE_CHANGED => decode_log::(log)?.into(), - b if b == VALUE_CLAIMED => decode_log::(log)?.into(), - b if b == VALUE_CLAIMING_REQUESTED => { - decode_log::(log)?.into() - } - _ => return Ok(None), + MESSAGE_QUEUEING_REQUESTED => decode_log::(log)?.into(), + MESSAGE => decode_log::(log)?.into(), + REPLY_QUEUEING_REQUESTED => decode_log::(log)?.into(), + REPLY => decode_log::(log)?.into(), + STATE_CHANGED => decode_log::(log)?.into(), + VALUE_CLAIMED => decode_log::(log)?.into(), + VALUE_CLAIMING_REQUESTED => decode_log::(log)?.into(), + _ => unreachable!("filtered above"), }; Ok(Some(event)) } + +pub fn try_extract_request_event(log: &Log) -> Result> { + if log.topic0().filter(|&v| REQUESTS.contains(v)).is_none() { + return Ok(None); + } + + let request_event = try_extract_event(log)? + .and_then(|v| v.as_request()) + .expect("filtered above"); + + Ok(Some(request_event)) +} diff --git a/ethexe/ethereum/src/mirror/mod.rs b/ethexe/ethereum/src/mirror/mod.rs index 9c8d7dec4d3..366f37dc31f 100644 --- a/ethexe/ethereum/src/mirror/mod.rs +++ b/ethexe/ethereum/src/mirror/mod.rs @@ -65,9 +65,7 @@ impl Mirror { let mut message_id = None; for log in receipt.inner.logs() { - if log.topic0().map(|v| v.0) - == Some(signatures::MESSAGE_QUEUEING_REQUESTED.to_fixed_bytes()) - { + if log.topic0() == Some(&signatures::MESSAGE_QUEUEING_REQUESTED) { let event = crate::decode_log::(log)?; message_id = Some((*event.id).into()); diff --git a/ethexe/ethereum/src/router/events.rs b/ethexe/ethereum/src/router/events.rs index 83b3c50b535..5dea9c8f4e2 100644 --- a/ethexe/ethereum/src/router/events.rs +++ b/ethexe/ethereum/src/router/events.rs @@ -16,30 +16,29 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::IRouter; -use alloy::{rpc::types::eth::Log, sol_types::SolEvent}; +use crate::{decode_log, IRouter}; +use alloy::{primitives::B256, rpc::types::eth::Log, sol_types::SolEvent}; use anyhow::{anyhow, Result}; use ethexe_common::router; -use gprimitives::H256; +use signatures::*; pub mod signatures { - use super::{IRouter, SolEvent, H256}; + use super::*; - pub const BASE_WEIGHT_CHANGED: H256 = H256(IRouter::BaseWeightChanged::SIGNATURE_HASH.0); - pub const BLOCK_COMMITTED: H256 = H256(IRouter::BlockCommitted::SIGNATURE_HASH.0); - pub const CODE_GOT_VALIDATED: H256 = H256(IRouter::CodeGotValidated::SIGNATURE_HASH.0); - pub const CODE_VALIDATION_REQUESTED: H256 = - H256(IRouter::CodeValidationRequested::SIGNATURE_HASH.0); - pub const PROGRAM_CREATED: H256 = H256(IRouter::ProgramCreated::SIGNATURE_HASH.0); - pub const STORAGE_SLOT_CHANGED: H256 = H256(IRouter::StorageSlotChanged::SIGNATURE_HASH.0); - pub const VALIDATORS_SET_CHANGED: H256 = H256(IRouter::ValidatorsSetChanged::SIGNATURE_HASH.0); - pub const VALUE_PER_WEIGHT_CHANGED: H256 = - H256(IRouter::ValuePerWeightChanged::SIGNATURE_HASH.0); + crate::signatures_consts! { + IRouter; + BASE_WEIGHT_CHANGED: BaseWeightChanged, + BLOCK_COMMITTED: BlockCommitted, + CODE_GOT_VALIDATED: CodeGotValidated, + CODE_VALIDATION_REQUESTED: CodeValidationRequested, + PROGRAM_CREATED: ProgramCreated, + STORAGE_SLOT_CHANGED: StorageSlotChanged, + VALIDATORS_SET_CHANGED: ValidatorsSetChanged, + VALUE_PER_WEIGHT_CHANGED: ValuePerWeightChanged, + } - pub const ALL: [H256; 8] = [ + pub const REQUESTS: &[B256] = &[ BASE_WEIGHT_CHANGED, - BLOCK_COMMITTED, - CODE_GOT_VALIDATED, CODE_VALIDATION_REQUESTED, PROGRAM_CREATED, STORAGE_SLOT_CHANGED, @@ -49,19 +48,15 @@ pub mod signatures { } pub fn try_extract_event(log: &Log) -> Result> { - use crate::decode_log; - use signatures::*; - - let Some(topic0) = log.topic0().map(|v| H256(v.0)) else { + let Some(topic0) = log.topic0().filter(|&v| ALL.contains(v)) else { return Ok(None); }; - // TODO (breathx): pattern matching issue for primitive_types::H256... ???? - let event = match topic0 { - b if b == BASE_WEIGHT_CHANGED => decode_log::(log)?.into(), - b if b == BLOCK_COMMITTED => decode_log::(log)?.into(), - b if b == CODE_GOT_VALIDATED => decode_log::(log)?.into(), - b if b == CODE_VALIDATION_REQUESTED => { + let event = match *topic0 { + BASE_WEIGHT_CHANGED => decode_log::(log)?.into(), + BLOCK_COMMITTED => decode_log::(log)?.into(), + CODE_GOT_VALIDATED => decode_log::(log)?.into(), + CODE_VALIDATION_REQUESTED => { let tx_hash = log .transaction_hash .ok_or_else(|| anyhow!("Tx hash not found"))?; @@ -74,16 +69,24 @@ pub fn try_extract_event(log: &Log) -> Result> { event.into() } - b if b == PROGRAM_CREATED => decode_log::(log)?.into(), - b if b == STORAGE_SLOT_CHANGED => decode_log::(log)?.into(), - b if b == VALIDATORS_SET_CHANGED => { - decode_log::(log)?.into() - } - b if b == VALUE_PER_WEIGHT_CHANGED => { - decode_log::(log)?.into() - } - _ => return Ok(None), + PROGRAM_CREATED => decode_log::(log)?.into(), + STORAGE_SLOT_CHANGED => decode_log::(log)?.into(), + VALIDATORS_SET_CHANGED => decode_log::(log)?.into(), + VALUE_PER_WEIGHT_CHANGED => decode_log::(log)?.into(), + _ => unreachable!("filtered above"), }; Ok(Some(event)) } + +pub fn try_extract_request_event(log: &Log) -> Result> { + if log.topic0().filter(|&v| REQUESTS.contains(v)).is_none() { + return Ok(None); + } + + let request_event = try_extract_event(log)? + .and_then(|v| v.as_request()) + .expect("filtered above"); + + Ok(Some(request_event)) +} diff --git a/ethexe/ethereum/src/router/mod.rs b/ethexe/ethereum/src/router/mod.rs index 9b603010be5..7cc5339f59d 100644 --- a/ethexe/ethereum/src/router/mod.rs +++ b/ethexe/ethereum/src/router/mod.rs @@ -127,15 +127,12 @@ impl Router { let code_id = code_id.into_bytes(); while let Some(log) = router_events.next().await { - match log.topic0().map(|v| H256(v.0)) { - Some(b) if b == signatures::CODE_GOT_VALIDATED => { - let event = crate::decode_log::(&log)?; + if let Some(signatures::CODE_GOT_VALIDATED) = log.topic0().cloned() { + let event = crate::decode_log::(&log)?; - if event.id == code_id { - return Ok(event.valid); - } + if event.id == code_id { + return Ok(event.valid); } - _ => (), } } @@ -161,7 +158,7 @@ impl Router { let mut actor_id = None; for log in receipt.inner.logs() { - if log.topic0().map(|v| v.0) == Some(signatures::PROGRAM_CREATED.to_fixed_bytes()) { + if log.topic0().cloned() == Some(signatures::PROGRAM_CREATED) { let event = crate::decode_log::(log)?; actor_id = Some((*event.actorId.into_word()).into()); diff --git a/ethexe/ethereum/src/wvara/events.rs b/ethexe/ethereum/src/wvara/events.rs index 6d3f3cf941e..e73db0a2689 100644 --- a/ethexe/ethereum/src/wvara/events.rs +++ b/ethexe/ethereum/src/wvara/events.rs @@ -18,35 +18,46 @@ #![allow(unused)] -use crate::IWrappedVara; -use alloy::{rpc::types::eth::Log, sol_types::SolEvent}; +use crate::{decode_log, IWrappedVara}; +use alloy::{primitives::B256, rpc::types::eth::Log, sol_types::SolEvent}; use anyhow::{anyhow, Result}; use ethexe_common::wvara; -use gprimitives::H256; +use signatures::*; pub mod signatures { - use super::{IWrappedVara, SolEvent, H256}; + use super::*; - pub const TRANSFER: H256 = H256(IWrappedVara::Transfer::SIGNATURE_HASH.0); - pub const APPROVAL: H256 = H256(IWrappedVara::Approval::SIGNATURE_HASH.0); + crate::signatures_consts! { + IWrappedVara; + TRANSFER: Transfer, + APPROVAL: Approval, + } - pub const ALL: [H256; 2] = [TRANSFER, APPROVAL]; + pub const REQUESTS: &[B256] = &[TRANSFER]; } pub fn try_extract_event(log: &Log) -> Result> { - use crate::decode_log; - use signatures::*; - - let Some(topic0) = log.topic0().map(|v| H256(v.0)) else { + let Some(topic0) = log.topic0().filter(|&v| ALL.contains(v)) else { return Ok(None); }; - // TODO (breathx): pattern matching issue for primitive_types::H256... ???? - let event = match topic0 { - b if b == TRANSFER => decode_log::(log)?.into(), - b if b == APPROVAL => decode_log::(log)?.into(), - _ => return Ok(None), + let event = match *topic0 { + TRANSFER => decode_log::(log)?.into(), + APPROVAL => decode_log::(log)?.into(), + _ => unreachable!("filtered above"), }; Ok(Some(event)) } + +pub fn try_extract_request_event(log: &Log) -> Result> { + if log.topic0().filter(|&v| REQUESTS.contains(v)).is_none() { + return Ok(None); + } + + let request_event = try_extract_event(log)? + .and_then(|v| v.as_request()) + .expect("filtered above"); + + Ok(Some(request_event)) +} diff --git a/ethexe/observer/src/event.rs b/ethexe/observer/src/event.rs index 74e1f383d8a..3ee5a855c38 100644 --- a/ethexe/observer/src/event.rs +++ b/ethexe/observer/src/event.rs @@ -1,13 +1,28 @@ -use ethexe_common::BlockEvent; +use ethexe_common::{BlockEvent, BlockRequestEvent}; use gprimitives::{CodeId, H256}; use parity_scale_codec::{Decode, Encode}; +#[derive(Debug, Clone, Encode, Decode)] +pub enum RequestEvent { + Block(RequestBlockData), + CodeLoaded { code_id: CodeId, code: Vec }, +} + #[derive(Debug, Clone, Encode, Decode)] pub enum Event { Block(BlockData), CodeLoaded { code_id: CodeId, code: Vec }, } +#[derive(Debug, Clone, Encode, Decode)] +pub struct RequestBlockData { + pub parent_hash: H256, + pub block_hash: H256, + pub block_number: u64, + pub block_timestamp: u64, + pub events: Vec, +} + #[derive(Debug, Clone, Encode, Decode)] pub struct BlockData { pub parent_hash: H256, diff --git a/ethexe/observer/src/lib.rs b/ethexe/observer/src/lib.rs index a58f541b1f6..65a116e825e 100644 --- a/ethexe/observer/src/lib.rs +++ b/ethexe/observer/src/lib.rs @@ -24,6 +24,6 @@ mod observer; mod query; pub use blobs::{BlobReader, ConsensusLayerBlobReader, MockBlobReader}; -pub use event::{BlockData, Event}; +pub use event::{BlockData, Event, RequestBlockData, RequestEvent}; pub use observer::{Observer, ObserverStatus}; pub use query::Query; diff --git a/ethexe/observer/src/observer.rs b/ethexe/observer/src/observer.rs index 313aed11725..26318ba15e3 100644 --- a/ethexe/observer/src/observer.rs +++ b/ethexe/observer/src/observer.rs @@ -1,4 +1,7 @@ -use crate::{BlobReader, BlockData, Event}; +use crate::{ + event::{BlockData, Event, RequestBlockData, RequestEvent}, + BlobReader, +}; use alloy::{ primitives::Address as AlloyAddress, providers::{Provider, ProviderBuilder, RootProvider}, @@ -6,7 +9,10 @@ use alloy::{ transports::BoxTransport, }; use anyhow::{anyhow, Result}; -use ethexe_common::{router::Event as RouterEvent, BlockEvent}; +use ethexe_common::{ + router::{Event as RouterEvent, RequestEvent as RouterRequestEvent}, + BlockEvent, BlockRequestEvent, +}; use ethexe_ethereum::{ mirror, router::{self, RouterQuery}, @@ -15,7 +21,7 @@ use ethexe_ethereum::{ use ethexe_signer::Address; use futures::{future, stream::FuturesUnordered, Stream, StreamExt}; use gear_core::ids::prelude::*; -use gprimitives::{CodeId, H256}; +use gprimitives::{ActorId, CodeId, H256}; use std::{collections::HashMap, sync::Arc}; use tokio::sync::watch; @@ -71,7 +77,7 @@ impl Observer { &self.provider } - pub fn events(&mut self) -> impl Stream + '_ { + pub fn events_all(&mut self) -> impl Stream + '_ { async_stream::stream! { let block_subscription = self .provider @@ -158,6 +164,95 @@ impl Observer { } } } + + pub fn request_events(&mut self) -> impl Stream + '_ { + async_stream::stream! { + let block_subscription = self + .provider + .subscribe_blocks() + .await + .expect("failed to subscribe to blocks"); + let mut block_stream = block_subscription.into_stream(); + let mut futures = FuturesUnordered::new(); + + loop { + tokio::select! { + block = block_stream.next() => { + let Some(block) = block else { + log::info!("Block stream ended"); + break; + }; + + log::trace!("Received block: {:?}", block.header.hash); + + let block_hash = (*block.header.hash).into(); + let parent_hash = (*block.header.parent_hash).into(); + let block_number = block.header.number; + let block_timestamp = block.header.timestamp; + + let events = match read_block_request_events(block_hash, &self.provider, self.router_address).await { + Ok(events) => events, + Err(err) => { + log::error!("failed to read events: {err}"); + continue; + } + }; + + let mut codes_len = 0; + + // Create futures to load codes + // TODO (breathx): remove me from here mb + for event in events.iter() { + if let BlockRequestEvent::Router(RouterRequestEvent::CodeValidationRequested { code_id, blob_tx_hash }) = event { + codes_len += 1; + + let blob_reader = self.blob_reader.clone(); + + let code_id = *code_id; + let blob_tx_hash = *blob_tx_hash; + + futures.push(async move { + let attempts = Some(3); + + read_code_from_tx_hash( + blob_reader, + code_id, + blob_tx_hash, + attempts, + ).await + }); + } + } + + self.update_status(|status| { + status.eth_block_number = block_number; + if codes_len > 0 { + status.last_router_state = block_number; + } + status.pending_upload_code = codes_len as u64; + }); + + let block_data = RequestBlockData { + block_hash, + parent_hash, + block_number, + block_timestamp, + events, + }; + + yield RequestEvent::Block(block_data); + }, + future = futures.next(), if !futures.is_empty() => { + match future { + Some(Ok((code_id, code))) => yield RequestEvent::CodeLoaded { code_id, code }, + Some(Err(err)) => log::error!("failed to handle upload code event: {err}"), + None => continue, + } + } + }; + } + } + } } pub(crate) async fn read_code_from_tx_hash( @@ -180,6 +275,7 @@ pub(crate) async fn read_code_from_tx_hash( // TODO (breathx): only read events that require some activity. // TODO (breathx): don't store not our events. +#[allow(unused)] // TODO (breathx). pub(crate) async fn read_block_events( block_hash: H256, provider: &ObserverProvider, @@ -195,6 +291,7 @@ pub(crate) async fn read_block_events( .map(|v| v.into_values().next().unwrap_or_default()) } +#[allow(unused)] // TODO (breathx) pub(crate) async fn read_block_events_batch( from_block: u32, to_block: u32, @@ -232,9 +329,9 @@ async fn read_events_impl( ) -> Result>> { let router_and_wvara_topic = Topic::from_iter( router::events::signatures::ALL - .into_iter() + .iter() .chain(wvara::events::signatures::ALL) - .map(|v| v.to_fixed_bytes().into()), + .cloned(), ); let router_and_wvara_filter = filter @@ -243,7 +340,7 @@ async fn read_events_impl( .event_signature(router_and_wvara_topic); let mirror_filter = filter.event_signature(Topic::from_iter( - mirror::events::signatures::ALL.map(|v| v.to_fixed_bytes().into()), + mirror::events::signatures::ALL.iter().cloned(), )); let (router_and_wvara_logs, mirrors_logs) = future::try_join( @@ -291,6 +388,128 @@ async fn read_events_impl( Ok(res) } +// TODO (breathx): only read events that require some activity. +// TODO (breathx): don't store not our events. +pub(crate) async fn read_block_request_events( + block_hash: H256, + provider: &ObserverProvider, + router_address: AlloyAddress, +) -> Result> { + let router_query = RouterQuery::from_provider(router_address, Arc::new(provider.clone())); + let wvara_address = router_query.wvara_address().await?; + + let filter = Filter::new().at_block_hash(block_hash.to_fixed_bytes()); + + read_request_events_impl(router_address, wvara_address, provider, filter) + .await + .map(|v| v.into_values().next().unwrap_or_default()) +} + +pub(crate) async fn read_block_request_events_batch( + from_block: u32, + to_block: u32, + provider: &ObserverProvider, + router_address: AlloyAddress, +) -> Result>> { + let router_query = RouterQuery::from_provider(router_address, Arc::new(provider.clone())); + let wvara_address = router_query.wvara_address().await?; + + let mut res = HashMap::new(); + + let mut start_block = from_block as u64; + let to_block = to_block as u64; + + while start_block <= to_block { + let end_block = to_block.min(start_block + MAX_QUERY_BLOCK_RANGE - 1); + + let filter = Filter::new().from_block(start_block).to_block(end_block); + + let iter_res = + read_request_events_impl(router_address, wvara_address, provider, filter).await?; + + res.extend(iter_res.into_iter()); + + start_block = end_block + 1; + } + + Ok(res) +} + +async fn read_request_events_impl( + router_address: AlloyAddress, + wvara_address: AlloyAddress, + provider: &ObserverProvider, + filter: Filter, +) -> Result>> { + let router_and_wvara_topic = Topic::from_iter( + router::events::signatures::REQUESTS + .iter() + .chain(wvara::events::signatures::REQUESTS) + .cloned(), + ); + + let router_and_wvara_filter = filter + .clone() + .address(vec![router_address, wvara_address]) + .event_signature(router_and_wvara_topic); + + let mirror_filter = filter.event_signature(Topic::from_iter( + mirror::events::signatures::REQUESTS.iter().cloned(), + )); + + let (router_and_wvara_logs, mirrors_logs) = future::try_join( + provider.get_logs(&router_and_wvara_filter), + provider.get_logs(&mirror_filter), + ) + .await?; + + let block_hash_of = |log: &alloy::rpc::types::Log| -> Result { + log.block_hash + .map(|v| v.0.into()) + .ok_or(anyhow!("Block hash is missing")) + }; + + let out_of_scope_addresses = [ + (*router_address.into_word()).into(), + (*wvara_address.into_word()).into(), + ActorId::zero(), + ]; + + let mut res: HashMap<_, Vec<_>> = HashMap::new(); + + for router_or_wvara_log in router_and_wvara_logs { + let block_hash = block_hash_of(&router_or_wvara_log)?; + + let maybe_block_request_event = if router_or_wvara_log.address() == router_address { + router::events::try_extract_request_event(&router_or_wvara_log)?.map(Into::into) + } else { + wvara::events::try_extract_request_event(&router_or_wvara_log)? + .filter(|v| !v.involves_addresses(&out_of_scope_addresses)) + .map(Into::into) + }; + + if let Some(block_request_event) = maybe_block_request_event { + res.entry(block_hash).or_default().push(block_request_event); + } + } + + for mirror_log in mirrors_logs { + let block_hash = block_hash_of(&mirror_log)?; + + let address = (*mirror_log.address().into_word()).into(); + + // TODO (breathx): if address is unknown, then continue. + + if let Some(request_event) = mirror::events::try_extract_request_event(&mirror_log)? { + res.entry(block_hash) + .or_default() + .push(BlockRequestEvent::mirror(address, request_event)); + } + } + + Ok(res) +} + #[cfg(test)] mod tests { use std::time::Duration; @@ -343,7 +562,7 @@ mod tests { .await .expect("failed to create observer"); - let observer_events = observer.events(); + let observer_events = observer.events_all(); futures::pin_mut!(observer_events); send_subscription_created.send(()).unwrap(); diff --git a/ethexe/observer/src/query.rs b/ethexe/observer/src/query.rs index db5059a34e6..7f9aa3e3580 100644 --- a/ethexe/observer/src/query.rs +++ b/ethexe/observer/src/query.rs @@ -5,7 +5,8 @@ use std::{ use crate::{ observer::{ - read_block_events, read_block_events_batch, read_code_from_tx_hash, ObserverProvider, + read_block_events, read_block_request_events, read_block_request_events_batch, + read_code_from_tx_hash, ObserverProvider, }, BlobReader, }; @@ -19,7 +20,7 @@ use anyhow::{anyhow, Result}; use ethexe_common::{ db::{BlockHeader, BlockMetaStorage}, router::Event as RouterEvent, - BlockEvent, + BlockEvent, BlockRequestEvent, }; use ethexe_signer::Address; use gprimitives::{CodeId, H256}; @@ -81,6 +82,7 @@ impl Query { } async fn get_committed_blocks(&mut self, block_hash: H256) -> Result> { + // TODO (breathx): optimize me ASAP. Ok(self .get_block_events(block_hash) .await? @@ -130,9 +132,13 @@ impl Query { }); // Fetch events in block range. - let mut blocks_events = - read_block_events_batch(from_block, to_block, &self.provider, self.router_address) - .await?; + let mut blocks_events = read_block_request_events_batch( + from_block, + to_block, + &self.provider, + self.router_address, + ) + .await?; // Collect results let mut block_headers = HashMap::new(); @@ -322,8 +328,9 @@ impl Query { // Populate block events in db. let events = - read_block_events(block_hash, &self.provider, self.router_address).await?; - self.database.set_block_events(block_hash, events.clone()); + read_block_request_events(block_hash, &self.provider, self.router_address) + .await?; + self.database.set_block_events(block_hash, events); Ok(meta) } @@ -335,11 +342,19 @@ impl Query { } pub async fn get_block_events(&mut self, block_hash: H256) -> Result> { + read_block_events(block_hash, &self.provider, self.router_address).await + } + + pub async fn get_block_request_events( + &mut self, + block_hash: H256, + ) -> Result> { if let Some(events) = self.database.block_events(block_hash) { return Ok(events); } - let events = read_block_events(block_hash, &self.provider, self.router_address).await?; + let events = + read_block_request_events(block_hash, &self.provider, self.router_address).await?; self.database.set_block_events(block_hash, events.clone()); Ok(events) diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 88a67444b41..0d91f9413c1 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -20,10 +20,10 @@ use anyhow::Result; use ethexe_common::{ - mirror::Event as MirrorEvent, - router::{Event as RouterEvent, StateTransition}, - wvara::Event as WVaraEvent, - BlockEvent, + mirror::RequestEvent as MirrorEvent, + router::{RequestEvent as RouterEvent, StateTransition}, + wvara::RequestEvent as WVaraEvent, + BlockRequestEvent, }; use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; use ethexe_runtime_common::state::{Dispatch, HashAndLen, MaybeHash, Storage}; @@ -227,8 +227,7 @@ impl Processor { pub fn process_block_events( &mut self, block_hash: H256, - // TODO (breathx): accept not ref? - events: &[BlockEvent], + events: Vec, ) -> Result> { log::debug!("Processing events for {block_hash:?}: {events:#?}"); @@ -239,14 +238,14 @@ impl Processor { for event in events { match event { - BlockEvent::Router(event) => { - self.process_router_event(&mut states, event.clone())?; + BlockRequestEvent::Router(event) => { + self.handle_router_event(&mut states, event)?; } - BlockEvent::Mirror { address, event } => { - self.process_mirror_event(&mut states, *address, event.clone())?; + BlockRequestEvent::Mirror { address, event } => { + self.handle_mirror_event(&mut states, address, event)?; } - BlockEvent::WVara(event) => { - self.process_wvara_event(&mut states, event.clone())?; + BlockRequestEvent::WVara(event) => { + self.handle_wvara_event(&mut states, event)?; } } } @@ -258,7 +257,7 @@ impl Processor { Ok(outcomes) } - fn process_router_event( + fn handle_router_event( &mut self, states: &mut BTreeMap, event: RouterEvent, @@ -277,16 +276,12 @@ impl Processor { log::debug!("Handler not yet implemented: {event:?}"); return Ok(()); } - RouterEvent::BlockCommitted { .. } | RouterEvent::CodeGotValidated { .. } => { - log::debug!("Informational events are noop for processing: {event:?}"); - return Ok(()); - } }; Ok(()) } - fn process_mirror_event( + fn handle_mirror_event( &mut self, states: &mut BTreeMap, actor_id: ProgramId, @@ -338,13 +333,6 @@ impl Processor { log::debug!("Handler not yet implemented: {event:?}"); return Ok(()); } - MirrorEvent::StateChanged { .. } - | MirrorEvent::ValueClaimed { .. } - | MirrorEvent::Message { .. } - | MirrorEvent::Reply { .. } => { - log::debug!("Informational events are noop for processing: {event:?}"); - return Ok(()); - } }; states.insert(actor_id, new_state_hash); @@ -352,7 +340,7 @@ impl Processor { Ok(()) } - fn process_wvara_event( + fn handle_wvara_event( &mut self, _states: &mut BTreeMap, event: WVaraEvent, @@ -362,10 +350,6 @@ impl Processor { log::debug!("Handler not yet implemented: {event:?}"); Ok(()) } - WVaraEvent::Approval { .. } => { - log::debug!("Informational events are noop for processing: {event:?}"); - Ok(()) - } } } } diff --git a/ethexe/processor/src/tests.rs b/ethexe/processor/src/tests.rs index b4b25d09ee5..c7fa00d82a3 100644 --- a/ethexe/processor/src/tests.rs +++ b/ethexe/processor/src/tests.rs @@ -17,7 +17,9 @@ // along with this program. If not, see . use crate::*; -use ethexe_common::{mirror::Event as MirrorEvent, router::Event as RouterEvent, BlockEvent}; +use ethexe_common::{ + mirror::RequestEvent as MirrorEvent, router::RequestEvent as RouterEvent, BlockRequestEvent, +}; use ethexe_db::{BlockHeader, BlockMetaStorage, CodesStorage, MemDb}; use gear_core::{ids::prelude::CodeIdExt, message::DispatchKind}; use gprimitives::{ActorId, MessageId}; @@ -84,14 +86,14 @@ fn process_observer_event() { let actor_id = ActorId::from(42); let create_program_events = vec![ - BlockEvent::Router(RouterEvent::ProgramCreated { actor_id, code_id }), - BlockEvent::mirror( + BlockRequestEvent::Router(RouterEvent::ProgramCreated { actor_id, code_id }), + BlockRequestEvent::mirror( actor_id, MirrorEvent::ExecutableBalanceTopUpRequested { value: 10_000_000_000, }, ), - BlockEvent::mirror( + BlockRequestEvent::mirror( actor_id, MirrorEvent::MessageQueueingRequested { id: H256::random().0.into(), @@ -103,14 +105,14 @@ fn process_observer_event() { ]; let outcomes = processor - .process_block_events(ch1, create_program_events.as_slice()) + .process_block_events(ch1, create_program_events) .expect("failed to process create program"); log::debug!("\n\nCreate program outcomes: {outcomes:?}\n\n"); let ch2 = init_new_block_from_parent(&mut processor, ch1); - let send_message_event = BlockEvent::mirror( + let send_message_event = BlockRequestEvent::mirror( actor_id, MirrorEvent::MessageQueueingRequested { id: H256::random().0.into(), @@ -121,7 +123,7 @@ fn process_observer_event() { ); let outcomes = processor - .process_block_events(ch2, &[send_message_event]) + .process_block_events(ch2, vec![send_message_event]) .expect("failed to process send message"); log::debug!("\n\nSend message outcomes: {outcomes:?}\n\n"); diff --git a/ethexe/sequencer/src/lib.rs b/ethexe/sequencer/src/lib.rs index a3e726c3d3f..b6f5a082153 100644 --- a/ethexe/sequencer/src/lib.rs +++ b/ethexe/sequencer/src/lib.rs @@ -24,7 +24,7 @@ use agro::{AggregatedCommitments, MultisignedCommitmentDigests, MultisignedCommi use anyhow::{anyhow, Result}; use ethexe_common::router::{BlockCommitment, CodeCommitment}; use ethexe_ethereum::Ethereum; -use ethexe_observer::Event; +use ethexe_observer::{RequestBlockData, RequestEvent}; use ethexe_signer::{Address, Digest, PublicKey, Signature, Signer, ToDigest}; use gprimitives::H256; use indexmap::IndexSet; @@ -105,8 +105,8 @@ impl Sequencer { } // This function should never block. - pub fn process_observer_event(&mut self, event: &Event) -> Result<()> { - if let Event::Block(block_data) = event { + pub fn process_observer_event(&mut self, event: &RequestEvent) -> Result<()> { + if let RequestEvent::Block(RequestBlockData { block_hash, .. }) = event { // Reset status, candidates and chain-head each block event self.update_status(|status| { @@ -115,7 +115,7 @@ impl Sequencer { self.codes_candidate = None; self.blocks_candidate = None; - self.chain_head = Some(block_data.block_hash); + self.chain_head = Some(*block_hash); } Ok(()) diff --git a/runtime-interface/src/lib.rs b/runtime-interface/src/lib.rs index 0a52a6cd29c..4c099d6f685 100644 --- a/runtime-interface/src/lib.rs +++ b/runtime-interface/src/lib.rs @@ -185,7 +185,7 @@ pub trait GearRI { lazy_pages_detail::write_accessed_pages() } - /* Bellow goes deprecated runtime interface functions. */ + /* Below goes deprecated runtime interface functions. */ fn pre_process_memory_accesses( reads: &[MemoryInterval], writes: &[MemoryInterval],