Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ethexe): impl scheduled program wakes; restore many-waits test #4273

Merged
merged 11 commits into from
Oct 2, 2024
10 changes: 6 additions & 4 deletions ethexe/common/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ pub struct CodeUploadInfo {
pub tx_hash: H256,
}

pub type Schedule = BTreeMap<u32, Vec<ScheduledTask>>;

pub trait BlockMetaStorage: Send + Sync {
fn block_header(&self, block_hash: H256) -> Option<BlockHeader>;
fn set_block_header(&self, block_hash: H256, header: BlockHeader);
Expand Down Expand Up @@ -76,11 +78,11 @@ pub trait BlockMetaStorage: Send + Sync {
fn latest_valid_block(&self) -> Option<(H256, BlockHeader)>;
fn set_latest_valid_block(&self, block_hash: H256, header: BlockHeader);

fn block_start_schedule(&self, block_hash: H256) -> Option<BTreeMap<u32, Vec<ScheduledTask>>>;
fn set_block_start_schedule(&self, block_hash: H256, map: BTreeMap<u32, Vec<ScheduledTask>>);
fn block_start_schedule(&self, block_hash: H256) -> Option<Schedule>;
fn set_block_start_schedule(&self, block_hash: H256, map: Schedule);

fn block_end_schedule(&self, block_hash: H256) -> Option<BTreeMap<u32, Vec<ScheduledTask>>>;
fn set_block_end_schedule(&self, block_hash: H256, map: BTreeMap<u32, Vec<ScheduledTask>>);
fn block_end_schedule(&self, block_hash: H256) -> Option<Schedule>;
fn set_block_end_schedule(&self, block_hash: H256, map: Schedule);
}

pub trait CodesStorage: Send + Sync {
Expand Down
24 changes: 22 additions & 2 deletions ethexe/processor/src/handling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,34 @@
use crate::Processor;
use anyhow::Result;
use ethexe_db::CodesStorage;
use ethexe_runtime_common::state::{ComplexStorage as _, Dispatch};
use ethexe_runtime_common::{
state::{ComplexStorage as _, Dispatch},
InBlockTransitions, ScheduleHandler,
};
use gprimitives::{CodeId, H256};

pub(crate) mod events;
pub(crate) mod run;
pub(crate) mod tasks;

impl Processor {
pub fn run_schedule(&mut self, in_block_transitions: &mut InBlockTransitions) {
let tasks = in_block_transitions.take_actual_tasks();

log::debug!(
"Running schedule for #{}: tasks are {tasks:?}",
in_block_transitions.block_number()
);

let mut handler = ScheduleHandler {
in_block_transitions,
storage: &self.db,
};

for task in tasks {
let _gas = task.process_with(&mut handler);
}
}

pub(crate) fn handle_message_queueing(
&mut self,
state_hash: H256,
Expand Down
23 changes: 2 additions & 21 deletions ethexe/processor/src/handling/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use crate::host::{InstanceCreator, InstanceWrapper};
use core_processor::common::JournalNote;
use ethexe_db::{CodesStorage, Database};
use ethexe_runtime_common::{Handler, InBlockTransitions};
use ethexe_runtime_common::{InBlockTransitions, JournalHandler};
use gear_core::ids::ProgramId;
use gprimitives::H256;
use std::collections::BTreeMap;
Expand All @@ -31,12 +31,6 @@ enum Task {
state_hash: H256,
result_sender: oneshot::Sender<Vec<JournalNote>>,
},
#[allow(unused)] // TODO (breathx)
WakeMessages {
program_id: ProgramId,
state_hash: H256,
result_sender: oneshot::Sender<H256>,
},
}

pub fn run(
Expand Down Expand Up @@ -81,9 +75,6 @@ async fn run_in_async(
handles.push(handle);
}

// TODO (breathx): fix me ASAP.
// wake_messages(&task_senders, programs).await;

loop {
// Send tasks to process programs in workers, until all queues are empty.

Expand All @@ -101,7 +92,7 @@ async fn run_in_async(
}

for (program_id, journal) in super_journal {
let mut handler = Handler {
let mut handler = JournalHandler {
program_id,
in_block_transitions,
storage: &db,
Expand Down Expand Up @@ -138,16 +129,6 @@ async fn run_task(db: Database, executor: &mut InstanceWrapper, task: Task) {

result_sender.send(journal).unwrap();
}
Task::WakeMessages {
program_id,
state_hash,
result_sender,
} => {
let new_state_hash = executor
.wake_messages(db, program_id, state_hash)
.expect("Some error occurs while waking messages");
result_sender.send(new_state_hash).unwrap();
}
}
}

Expand Down
95 changes: 0 additions & 95 deletions ethexe/processor/src/handling/tasks.rs

This file was deleted.

12 changes: 0 additions & 12 deletions ethexe/processor/src/host/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,18 +146,6 @@ impl InstanceWrapper {
self.call("run", arg.encode())
}

pub fn wake_messages(
&mut self,
db: Database,
program_id: ProgramId,
state_hash: H256,
) -> Result<H256> {
let chain_head = self.chain_head.expect("chain head must be set before wake");
threads::set(db, chain_head, state_hash);

self.call("wake_messages", (program_id, state_hash).encode())
}

fn call<D: Decode>(&mut self, name: &'static str, input: impl AsRef<[u8]>) -> Result<D> {
self.with_host_state(|instance_wrapper| {
let func = instance_wrapper
Expand Down
20 changes: 15 additions & 5 deletions ethexe/processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,18 @@ impl Processor {
) -> Result<Vec<LocalOutcome>> {
log::debug!("Processing events for {block_hash:?}: {events:#?}");

let header = self.db.block_header(block_hash).ok_or_else(|| {
anyhow::anyhow!("failed to get block header for under-processing block")
})?;

let states = self
.db
.block_start_program_states(block_hash)
.unwrap_or_default(); // TODO (breathx): shouldn't it be a panic?

let mut in_block_transitions = InBlockTransitions::new(states);
let schedule = self.db.block_start_schedule(block_hash).unwrap_or_default(); // TODO (breathx): shouldn't it be a panic?

let mut schedule = self.db.block_start_schedule(block_hash).unwrap_or_default(); // TODO (breathx): shouldn't it be a panic?
let mut in_block_transitions = InBlockTransitions::new(header.height, states, schedule);

// TODO (breathx): handle resulting addresses that were changed (e.g. top up balance wont be dumped as outcome).
for event in events {
Expand All @@ -101,10 +105,10 @@ impl Processor {
}
}

self.run_tasks(block_hash, &mut in_block_transitions, &mut schedule)?;
self.run_schedule(&mut in_block_transitions);
self.run(block_hash, &mut in_block_transitions);

let (transitions, states) = in_block_transitions.finalize();
let (transitions, states, schedule) = in_block_transitions.finalize();

self.db.set_block_end_program_states(block_hash, states);
self.db.set_block_end_schedule(block_hash, schedule);
Expand Down Expand Up @@ -144,13 +148,19 @@ impl OverlaidProcessor {
) -> Result<ReplyInfo> {
self.0.creator.set_chain_head(block_hash);

let header =
self.0.db.block_header(block_hash).ok_or_else(|| {
anyhow::anyhow!("failed to find block header for given block hash")
})?;

let states = self
.0
.db
.block_start_program_states(block_hash)
.unwrap_or_default();

let mut in_block_transitions = InBlockTransitions::new(states);
let mut in_block_transitions =
InBlockTransitions::new(header.height, states, Default::default());

let state_hash = in_block_transitions
.state_of(&program_id)
Expand Down
Loading
Loading