Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ethexe): use real block header in handling; impl Mailbox timeouts; remove scheduled tasks on actions #4283

Merged
merged 13 commits into from
Oct 9, 2024
2 changes: 1 addition & 1 deletion core-processor/src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1227,7 +1227,7 @@ impl<LP: LazyPagesInterface> Externalities for Ext<LP> {
}

fn system_reserve_gas(&mut self, amount: u64) -> Result<(), Self::FallibleError> {
// TODO: use `NonZeroU64` after issue #1838 is fixed
// TODO: use `NonZero<u64>` after issue #1838 is fixed
if amount == 0 {
return Err(ReservationError::ZeroReservationAmount.into());
}
Expand Down
25 changes: 8 additions & 17 deletions ethexe/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use ethexe_common::{
},
BlockRequestEvent,
};
use ethexe_db::{BlockHeader, BlockMetaStorage, CodesStorage, Database};
use ethexe_db::{BlockMetaStorage, CodesStorage, Database};
use ethexe_ethereum::router::RouterQuery;
use ethexe_network::NetworkReceiverEvent;
use ethexe_observer::{RequestBlockData, RequestEvent};
Expand Down Expand Up @@ -333,20 +333,11 @@ impl Service {
processor: &mut ethexe_processor::Processor,
block_data: RequestBlockData,
) -> Result<Vec<BlockCommitment>> {
db.set_block_events(block_data.block_hash, block_data.events);
db.set_block_header(
block_data.block_hash,
BlockHeader {
height: block_data.block_number.try_into()?,
timestamp: block_data.block_timestamp,
parent_hash: block_data.parent_hash,
},
);
db.set_block_events(block_data.hash, block_data.events);
db.set_block_header(block_data.hash, block_data.header);

let mut commitments = vec![];
let last_committed_chain = query
.get_last_committed_chain(block_data.block_hash)
.await?;
let last_committed_chain = query.get_last_committed_chain(block_data.hash).await?;
for block_hash in last_committed_chain.into_iter().rev() {
let transitions = Self::process_one_block(db, query, processor, block_hash).await?;

Expand All @@ -357,7 +348,7 @@ impl Service {

commitments.push(BlockCommitment {
block_hash,
pred_block_hash: block_data.block_hash,
pred_block_hash: block_data.hash,
prev_commitment_hash: db
.block_prev_commitment(block_hash)
.ok_or_else(|| anyhow!("Prev commitment not found"))?,
Expand All @@ -383,9 +374,9 @@ impl Service {
RequestEvent::Block(block_data) => {
log::info!(
"📦 receive a new block {}, hash {}, parent hash {}",
block_data.block_number,
block_data.block_hash,
block_data.parent_hash
block_data.header.height,
block_data.hash,
block_data.header.parent_hash
);

let commitments =
Expand Down
75 changes: 62 additions & 13 deletions ethexe/cli/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use anyhow::Result;
use ethexe_common::{
db::CodesStorage, mirror::Event as MirrorEvent, router::Event as RouterEvent, BlockEvent,
};
use ethexe_db::{Database, MemDb};
use ethexe_db::{BlockMetaStorage, Database, MemDb, ScheduledTask};
use ethexe_ethereum::{router::RouterQuery, Ethereum};
use ethexe_observer::{Event, MockBlobReader, Observer, Query};
use ethexe_processor::Processor;
Expand All @@ -42,7 +42,11 @@ use gear_core::{
};
use gprimitives::{ActorId, CodeId, MessageId, H160, H256};
use parity_scale_codec::Encode;
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
time::Duration,
};
use tokio::{
sync::oneshot,
task::{self, JoinHandle},
Expand Down Expand Up @@ -183,12 +187,13 @@ async fn mailbox() {
.await
.unwrap();

let mid_expected_message = MessageId::generate_outgoing(res.message_id, 0);
let ping_expected_message = MessageId::generate_outgoing(res.message_id, 1);
let original_mid = res.message_id;
let mid_expected_message = MessageId::generate_outgoing(original_mid, 0);
let ping_expected_message = MessageId::generate_outgoing(original_mid, 1);

let mut listener = env.events_publisher().subscribe().await;
listener
.apply_until_block_event(|event| match event {
let block_data = listener
.apply_until_block_event_with_header(|event, block_data| match event {
BlockEvent::Mirror { address, event } if address == pid => {
if let MirrorEvent::Message {
id,
Expand All @@ -204,7 +209,7 @@ async fn mailbox() {
Ok(None)
} else if id == ping_expected_message {
assert_eq!(payload, b"PING");
Ok(Some(()))
Ok(Some(block_data.clone()))
} else {
unreachable!()
}
Expand All @@ -217,9 +222,37 @@ async fn mailbox() {
.await
.unwrap();

// -1 bcs execution took place in previous block, not the one that emits events.
let wake_expiry = block_data.header.height - 1 + 100; // 100 is default wait for.
let expiry = block_data.header.height - 1 + ethexe_runtime_common::state::MAILBOX_VALIDITY;

let expected_schedule = BTreeMap::from_iter([
(
wake_expiry,
BTreeSet::from_iter([ScheduledTask::WakeMessage(pid, original_mid)]),
),
(
expiry,
BTreeSet::from_iter([
ScheduledTask::RemoveFromMailbox((pid, env.sender_id), mid_expected_message),
ScheduledTask::RemoveFromMailbox((pid, env.sender_id), ping_expected_message),
]),
),
]);

let schedule = node
.db
.block_end_schedule(block_data.header.parent_hash)
.expect("must exist");

assert_eq!(schedule, expected_schedule);

let expected_mailbox = BTreeMap::from_iter([(
env.sender_id,
BTreeMap::from_iter([(mid_expected_message, 0), (ping_expected_message, 0)]),
BTreeMap::from_iter([
(mid_expected_message, (0, expiry)),
(ping_expected_message, (0, expiry)),
]),
)]);
let mirror = env.ethereum.mirror(pid.try_into().unwrap());
let state_hash = mirror.query().state_hash().await.unwrap();
Expand Down Expand Up @@ -255,20 +288,20 @@ async fn mailbox() {

let expected_mailbox = BTreeMap::from_iter([(
env.sender_id,
BTreeMap::from_iter([(mid_expected_message, 0)]),
BTreeMap::from_iter([(mid_expected_message, (0, expiry))]),
)]);

assert_eq!(mailbox, expected_mailbox);

mirror.claim_value(mid_expected_message).await.unwrap();

listener
.apply_until_block_event(|event| match event {
let block_data = listener
.apply_until_block_event_with_header(|event, block_data| match event {
BlockEvent::Mirror { address, event } if address == pid => match event {
MirrorEvent::ValueClaimed { claimed_id, .. }
if claimed_id == mid_expected_message =>
{
Ok(Some(()))
Ok(Some(block_data.clone()))
}
_ => Ok(None),
},
Expand All @@ -281,6 +314,12 @@ async fn mailbox() {

let state = node.db.read_state(state_hash).unwrap();
assert!(state.mailbox_hash.is_empty());

let schedule = node
.db
.block_end_schedule(block_data.header.parent_hash)
.expect("must exist");
assert!(schedule.is_empty(), "{:?}", schedule);
}

#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -646,6 +685,7 @@ async fn multiple_validators() {

mod utils {
use super::*;
use ethexe_observer::SimpleBlockData;
use futures::StreamExt;
use gear_core::message::ReplyCode;
use tokio::sync::{broadcast::Sender, Mutex};
Expand Down Expand Up @@ -1020,6 +1060,13 @@ mod utils {
pub async fn apply_until_block_event<R: Sized>(
&mut self,
mut f: impl FnMut(BlockEvent) -> Result<Option<R>>,
) -> Result<R> {
self.apply_until_block_event_with_header(|e, _h| f(e)).await
}

pub async fn apply_until_block_event_with_header<R: Sized>(
&mut self,
mut f: impl FnMut(BlockEvent, &SimpleBlockData) -> Result<Option<R>>,
) -> Result<R> {
loop {
let event = self.next_event().await?;
Expand All @@ -1028,8 +1075,10 @@ mod utils {
continue;
};

let block_data = block.as_simple();

for event in block.events {
if let Some(res) = f(event)? {
if let Some(res) = f(event, &block_data)? {
return Ok(res);
}
}
Expand Down
18 changes: 16 additions & 2 deletions ethexe/common/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use gear_core::{
use gprimitives::H256;
use parity_scale_codec::{Decode, Encode};

pub type ScheduledTask = gear_core::tasks::ScheduledTask<ActorId>;
/// NOTE: key for actor id is (program_id, user_id). only used for mailbox.
pub type ScheduledTask = gear_core::tasks::ScheduledTask<(ProgramId, ActorId)>;

#[derive(Debug, Clone, Default, Encode, Decode, serde::Serialize)]
pub struct BlockHeader {
Expand All @@ -39,13 +40,26 @@ pub struct BlockHeader {
pub parent_hash: H256,
}

impl BlockHeader {
pub fn dummy(height: u32) -> Self {
let mut parent_hash = [0; 32];
parent_hash[..4].copy_from_slice(&height.to_le_bytes());

Self {
height,
timestamp: height as u64 * 12,
parent_hash: parent_hash.into(),
}
}
}

#[derive(Debug, Clone, Default, Encode, Decode)]
pub struct CodeUploadInfo {
pub origin: ActorId,
pub tx_hash: H256,
}

pub type Schedule = BTreeMap<u32, Vec<ScheduledTask>>;
pub type Schedule = BTreeMap<u32, BTreeSet<ScheduledTask>>;

pub trait BlockMetaStorage: Send + Sync {
fn block_header(&self, block_hash: H256) -> Option<BlockHeader>;
Expand Down
2 changes: 1 addition & 1 deletion ethexe/ethereum/src/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl Router {
}
}

Err(anyhow::anyhow!("Failed to define if code is validated"))
Err(anyhow!("Failed to define if code is validated"))
}

pub async fn create_program(
Expand Down
8 changes: 4 additions & 4 deletions ethexe/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub mod export {
pub use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
}

use anyhow::Context;
use anyhow::{anyhow, Context};
use ethexe_db::Database;
use ethexe_signer::{PublicKey, Signer};
use futures::future::Either;
Expand Down Expand Up @@ -544,18 +544,18 @@ impl Behaviour {
gossipsub::MessageId::from(hasher.finish().to_be_bytes())
})
.build()
.map_err(|e| anyhow::anyhow!("`gossipsub::ConfigBuilder::build()` error: {e}"))?;
.map_err(|e| anyhow!("`gossipsub::ConfigBuilder::build()` error: {e}"))?;
let mut gossipsub = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(keypair.clone()),
gossip_config,
)
.map_err(|e| anyhow::anyhow!("`gossipsub::Behaviour` error: {e}"))?;
.map_err(|e| anyhow!("`gossipsub::Behaviour` error: {e}"))?;
gossipsub
.with_peer_score(
gossipsub::PeerScoreParams::default(),
gossipsub::PeerScoreThresholds::default(),
)
.map_err(|e| anyhow::anyhow!("`gossipsub` scoring parameters error: {e}"))?;
.map_err(|e| anyhow!("`gossipsub` scoring parameters error: {e}"))?;

gossipsub.subscribe(&gpu_commitments_topic())?;

Expand Down
37 changes: 29 additions & 8 deletions ethexe/observer/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use ethexe_common::{BlockEvent, BlockRequestEvent};
use ethexe_db::BlockHeader;
use gprimitives::{CodeId, H256};
use parity_scale_codec::{Decode, Encode};

Expand All @@ -16,18 +17,38 @@ pub enum Event {

#[derive(Debug, Clone, Encode, Decode)]
pub struct RequestBlockData {
pub parent_hash: H256,
pub block_hash: H256,
pub block_number: u64,
pub block_timestamp: u64,
pub hash: H256,
pub header: BlockHeader,
pub events: Vec<BlockRequestEvent>,
}

impl RequestBlockData {
pub fn as_simple(&self) -> SimpleBlockData {
SimpleBlockData {
hash: self.hash,
header: self.header.clone(),
}
}
}

#[derive(Debug, Clone, Encode, Decode)]
pub struct BlockData {
pub parent_hash: H256,
pub block_hash: H256,
pub block_number: u64,
pub block_timestamp: u64,
pub hash: H256,
pub header: BlockHeader,
pub events: Vec<BlockEvent>,
}

impl BlockData {
pub fn as_simple(&self) -> SimpleBlockData {
SimpleBlockData {
hash: self.hash,
header: self.header.clone(),
}
}
}

#[derive(Debug, Clone, Encode, Decode)]
pub struct SimpleBlockData {
pub hash: H256,
pub header: BlockHeader,
}
2 changes: 1 addition & 1 deletion ethexe/observer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ mod observer;
mod query;

pub use blobs::{BlobReader, ConsensusLayerBlobReader, MockBlobReader};
pub use event::{BlockData, Event, RequestBlockData, RequestEvent};
pub use event::{BlockData, Event, RequestBlockData, RequestEvent, SimpleBlockData};
pub use observer::{Observer, ObserverStatus};
pub use query::Query;
Loading
Loading