Skip to content

Commit

Permalink
feat(ethexe): Only handle non-informational events (#4190)
Browse files Browse the repository at this point in the history
  • Loading branch information
breathx authored Sep 9, 2024
1 parent 057da36 commit f22e7df
Show file tree
Hide file tree
Showing 23 changed files with 627 additions and 195 deletions.
2 changes: 1 addition & 1 deletion core/src/message/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ impl MessageContext {
return Err(Error::OutOfBoundsInputSliceOffset);
}

// Check `len` for the current `offset` doesn't refer to the slice out of intput bounds.
// Check `len` for the current `offset` doesn't refer to the slice out of input bounds.
let available_len = input.len() - offset;
if len > available_len {
return Err(Error::OutOfBoundsInputSliceLength);
Expand Down
34 changes: 19 additions & 15 deletions ethexe/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ use crate::{
};
use anyhow::{anyhow, Ok, Result};
use ethexe_common::{
router::{BlockCommitment, CodeCommitment, Event as RouterEvent, StateTransition},
BlockEvent,
router::{
BlockCommitment, CodeCommitment, RequestEvent as RouterRequestEvent, StateTransition,
},
BlockRequestEvent,
};
use ethexe_db::{BlockHeader, BlockMetaStorage, CodesStorage, Database};
use ethexe_ethereum::router::RouterQuery;
use ethexe_network::NetworkReceiverEvent;
use ethexe_observer::{BlockData, Event as ObserverEvent};
use ethexe_observer::{RequestBlockData, RequestEvent};
use ethexe_processor::LocalOutcome;
use ethexe_sequencer::agro::AggregatedCommitments;
use ethexe_signer::{Digest, PublicKey, Signature, Signer};
Expand Down Expand Up @@ -240,17 +242,19 @@ impl Service {
processor: &mut ethexe_processor::Processor,
block_hash: H256,
) -> Result<()> {
let events = query.get_block_events(block_hash).await?;
let events = query.get_block_request_events(block_hash).await?;

for event in events {
match event {
BlockEvent::Router(RouterEvent::CodeValidationRequested {
BlockRequestEvent::Router(RouterRequestEvent::CodeValidationRequested {
code_id,
blob_tx_hash,
}) => {
db.set_code_blob_tx(code_id, blob_tx_hash);
}
BlockEvent::Router(RouterEvent::ProgramCreated { code_id, .. }) => {
BlockRequestEvent::Router(RouterRequestEvent::ProgramCreated {
code_id, ..
}) => {
if db.original_code(code_id).is_some() {
continue;
}
Expand Down Expand Up @@ -286,9 +290,9 @@ impl Service {

Self::process_upload_codes(db, query, processor, block_hash).await?;

let block_events = query.get_block_events(block_hash).await?;
let block_request_events = query.get_block_request_events(block_hash).await?;

let block_outcomes = processor.process_block_events(block_hash, &block_events)?;
let block_outcomes = processor.process_block_events(block_hash, block_request_events)?;

let transition_outcomes: Vec<_> = block_outcomes
.into_iter()
Expand Down Expand Up @@ -327,9 +331,9 @@ impl Service {
db: &Database,
query: &mut ethexe_observer::Query,
processor: &mut ethexe_processor::Processor,
block_data: BlockData,
block_data: RequestBlockData,
) -> Result<Vec<BlockCommitment>> {
db.set_block_events(block_data.block_hash, block_data.events.clone());
db.set_block_events(block_data.block_hash, block_data.events);
db.set_block_header(
block_data.block_hash,
BlockHeader {
Expand Down Expand Up @@ -369,14 +373,14 @@ impl Service {
query: &mut ethexe_observer::Query,
processor: &mut ethexe_processor::Processor,
maybe_sequencer: &mut Option<ethexe_sequencer::Sequencer>,
observer_event: ethexe_observer::Event,
observer_event: RequestEvent,
) -> Result<(Vec<CodeCommitment>, Vec<BlockCommitment>)> {
if let Some(sequencer) = maybe_sequencer {
sequencer.process_observer_event(&observer_event)?;
}

match observer_event {
ObserverEvent::Block(block_data) => {
RequestEvent::Block(block_data) => {
log::info!(
"📦 receive a new block {}, hash {}, parent hash {}",
block_data.block_number,
Expand All @@ -389,7 +393,7 @@ impl Service {

Ok((Vec::new(), commitments))
}
ethexe_observer::Event::CodeLoaded { code_id, code } => {
RequestEvent::CodeLoaded { code_id, code } => {
let outcomes = processor.process_upload_code(code_id, code.as_slice())?;
let commitments: Vec<_> = outcomes
.into_iter()
Expand Down Expand Up @@ -425,7 +429,7 @@ impl Service {
));
}

let observer_events = observer.events();
let observer_events = observer.request_events();
futures::pin_mut!(observer_events);

let (mut network_sender, mut network_receiver, mut network_handle) =
Expand Down Expand Up @@ -467,7 +471,7 @@ impl Service {
break;
};

let is_block_event = matches!(observer_event, ObserverEvent::Block(_));
let is_block_event = matches!(observer_event, RequestEvent::Block(_));

let (code_commitments, block_commitments) = Self::process_observer_event(
&db,
Expand Down
2 changes: 1 addition & 1 deletion ethexe/cli/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Listener {

let (send_subscription_created, receive_subscription_created) = oneshot::channel::<()>();
let _handle = task::spawn(async move {
let observer_events = observer.events();
let observer_events = observer.events_all();
futures::pin_mut!(observer_events);

send_subscription_created.send(()).unwrap();
Expand Down
6 changes: 3 additions & 3 deletions ethexe/common/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

//! ethexe common db types and traits.

use crate::{router::StateTransition, BlockEvent};
use crate::{router::StateTransition, BlockRequestEvent};
use alloc::{
collections::{BTreeMap, BTreeSet, VecDeque},
vec::Vec,
Expand Down Expand Up @@ -65,8 +65,8 @@ pub trait BlockMetaStorage: Send + Sync {
fn block_end_program_states(&self, block_hash: H256) -> Option<BTreeMap<ActorId, H256>>;
fn set_block_end_program_states(&self, block_hash: H256, map: BTreeMap<ActorId, H256>);

fn block_events(&self, block_hash: H256) -> Option<Vec<BlockEvent>>;
fn set_block_events(&self, block_hash: H256, events: Vec<BlockEvent>);
fn block_events(&self, block_hash: H256) -> Option<Vec<BlockRequestEvent>>;
fn set_block_events(&self, block_hash: H256, events: Vec<BlockRequestEvent>);

fn block_outcome(&self, block_hash: H256) -> Option<Vec<StateTransition>>;
fn set_block_outcome(&self, block_hash: H256, outcome: Vec<StateTransition>);
Expand Down
28 changes: 28 additions & 0 deletions ethexe/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,31 @@ impl From<wvara::Event> for BlockEvent {
Self::WVara(value)
}
}

#[derive(Clone, Debug, Encode, Decode)]
pub enum BlockRequestEvent {
Router(router::RequestEvent),
Mirror {
address: ActorId,
event: mirror::RequestEvent,
},
WVara(wvara::RequestEvent),
}

impl BlockRequestEvent {
pub fn mirror(address: ActorId, event: mirror::RequestEvent) -> Self {
Self::Mirror { address, event }
}
}

impl From<router::RequestEvent> for BlockRequestEvent {
fn from(value: router::RequestEvent) -> Self {
Self::Router(value)
}
}

impl From<wvara::RequestEvent> for BlockRequestEvent {
fn from(value: wvara::RequestEvent) -> Self {
Self::WVara(value)
}
}
62 changes: 62 additions & 0 deletions ethexe/common/src/mirror.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,65 @@ pub enum Event {
source: ActorId,
},
}

impl Event {
pub fn as_request(self) -> Option<RequestEvent> {
Some(match self {
Self::ExecutableBalanceTopUpRequested { value } => {
RequestEvent::ExecutableBalanceTopUpRequested { value }
}
Self::MessageQueueingRequested {
id,
source,
payload,
value,
} => RequestEvent::MessageQueueingRequested {
id,
source,
payload,
value,
},
Self::ReplyQueueingRequested {
replied_to,
source,
payload,
value,
} => RequestEvent::ReplyQueueingRequested {
replied_to,
source,
payload,
value,
},
Self::ValueClaimingRequested { claimed_id, source } => {
RequestEvent::ValueClaimingRequested { claimed_id, source }
}
Self::StateChanged { .. }
| Self::ValueClaimed { .. }
| Self::Message { .. }
| Self::Reply { .. } => return None,
})
}
}

#[derive(Clone, Debug, Encode, Decode)]
pub enum RequestEvent {
ExecutableBalanceTopUpRequested {
value: u128,
},
MessageQueueingRequested {
id: MessageId,
source: ActorId,
payload: Vec<u8>,
value: u128,
},
ReplyQueueingRequested {
replied_to: MessageId,
source: ActorId,
payload: Vec<u8>,
value: u128,
},
ValueClaimingRequested {
claimed_id: MessageId,
source: ActorId,
},
}
48 changes: 48 additions & 0 deletions ethexe/common/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,51 @@ pub enum Event {
value_per_weight: u128,
},
}

impl Event {
pub fn as_request(self) -> Option<RequestEvent> {
Some(match self {
Self::BaseWeightChanged { base_weight } => {
RequestEvent::BaseWeightChanged { base_weight }
}
Self::CodeValidationRequested {
code_id,
blob_tx_hash,
} => RequestEvent::CodeValidationRequested {
code_id,
blob_tx_hash,
},
Self::ProgramCreated { actor_id, code_id } => {
RequestEvent::ProgramCreated { actor_id, code_id }
}
Self::StorageSlotChanged => RequestEvent::StorageSlotChanged,
Self::ValidatorsSetChanged => RequestEvent::ValidatorsSetChanged,
Self::ValuePerWeightChanged { value_per_weight } => {
RequestEvent::ValuePerWeightChanged { value_per_weight }
}
Self::BlockCommitted { .. } | Self::CodeGotValidated { .. } => return None,
})
}
}

#[derive(Clone, Debug, Encode, Decode, PartialEq, Eq)]
pub enum RequestEvent {
BaseWeightChanged {
base_weight: u64,
},
CodeValidationRequested {
code_id: CodeId,
// TODO (breathx): replace with `code: Vec<u8>`
/// This field is replaced with tx hash in case of zero.
blob_tx_hash: H256,
},
ProgramCreated {
actor_id: ActorId,
code_id: CodeId,
},
StorageSlotChanged,
ValidatorsSetChanged,
ValuePerWeightChanged {
value_per_weight: u128,
},
}
34 changes: 34 additions & 0 deletions ethexe/common/src/wvara.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,37 @@ pub enum Event {
value: U256,
},
}

impl Event {
pub fn as_request(self) -> Option<RequestEvent> {
Some(match self {
Self::Transfer { from, to, value } => RequestEvent::Transfer { from, to, value },
Self::Approval { .. } => return None,
})
}
}

#[derive(Clone, Debug, Encode, Decode)]
pub enum RequestEvent {
Transfer {
/// Never router, wvara or zero address.
from: ActorId,
/// Never router, wvara or zero address.
to: ActorId,
value: u128,
},
}

impl RequestEvent {
pub fn involves_address(&self, address: &ActorId) -> bool {
match self {
Self::Transfer { from, to, .. } => from == address || to == address,
}
}

pub fn involves_addresses(&self, addresses: &[ActorId]) -> bool {
match self {
Self::Transfer { from, to, .. } => addresses.contains(from) || addresses.contains(to),
}
}
}
12 changes: 6 additions & 6 deletions ethexe/db/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{CASDatabase, KVDatabase};
use ethexe_common::{
db::{BlockHeader, BlockMetaStorage, CodesStorage},
router::StateTransition,
BlockEvent,
BlockRequestEvent,
};
use ethexe_runtime_common::state::{
Allocations, MemoryPages, MessageQueue, ProgramState, Storage, Waitlist,
Expand Down Expand Up @@ -221,16 +221,16 @@ impl BlockMetaStorage for Database {
);
}

fn block_events(&self, block_hash: H256) -> Option<Vec<BlockEvent>> {
fn block_events(&self, block_hash: H256) -> Option<Vec<BlockRequestEvent>> {
self.kv
.get(&KeyPrefix::BlockEvents.two(self.router_address, block_hash))
.map(|data| {
Vec::<BlockEvent>::decode(&mut data.as_slice())
Vec::<BlockRequestEvent>::decode(&mut data.as_slice())
.expect("Failed to decode data into `Vec<BlockEvent>`")
})
}

fn set_block_events(&self, block_hash: H256, events: Vec<BlockEvent>) {
fn set_block_events(&self, block_hash: H256, events: Vec<BlockRequestEvent>) {
self.kv.put(
&KeyPrefix::BlockEvents.two(self.router_address, block_hash),
events.encode(),
Expand Down Expand Up @@ -300,8 +300,8 @@ impl CodesStorage for Database {
self.kv
.iter_prefix(&key_prefix)
.map(|#[allow(unused_variables)] (key, code_id)| {
let (splitted_key_prefix, program_id) = key.split_at(key_prefix.len());
debug_assert_eq!(splitted_key_prefix, key_prefix);
let (split_key_prefix, program_id) = key.split_at(key_prefix.len());
debug_assert_eq!(split_key_prefix, key_prefix);
let program_id =
ProgramId::try_from(program_id).expect("Failed to decode key into `ProgramId`");

Expand Down
Loading

0 comments on commit f22e7df

Please sign in to comment.