From 49b3dc14c02f81a5d3be5298b0bae4339dc676e5 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 28 Aug 2024 17:57:07 +0400 Subject: [PATCH 01/13] rpc placeholder for calculate reply --- Cargo.lock | 1 + ethexe/cli/src/service.rs | 3 +- ethexe/common/src/db.rs | 1 + ethexe/observer/src/query.rs | 2 ++ ethexe/processor/src/lib.rs | 6 ++++ ethexe/processor/src/tests.rs | 3 +- ethexe/rpc/Cargo.toml | 1 + ethexe/rpc/src/lib.rs | 55 ++++++++++++++++++++++++++--------- 8 files changed, 57 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bad745bd46e..763917298df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5197,6 +5197,7 @@ dependencies = [ "anyhow", "ethexe-common", "ethexe-db", + "ethexe-processor", "futures", "gprimitives", "hex", diff --git a/ethexe/cli/src/service.rs b/ethexe/cli/src/service.rs index d2edd57115e..f0ddd315ea1 100644 --- a/ethexe/cli/src/service.rs +++ b/ethexe/cli/src/service.rs @@ -144,7 +144,7 @@ impl Service { let rpc = config .rpc_port - .map(|port| ethexe_rpc::RpcService::new(port, db.clone())); + .map(|port| ethexe_rpc::RpcService::new(port, db.clone(), processor.clone())); Ok(Self { db, @@ -303,6 +303,7 @@ impl Service { height: block_data.block_number.try_into()?, timestamp: block_data.block_timestamp, parent_hash: block_data.parent_hash, + hash: block_data.block_hash, }, ); diff --git a/ethexe/common/src/db.rs b/ethexe/common/src/db.rs index 160132b7268..7dce8a3b658 100644 --- a/ethexe/common/src/db.rs +++ b/ethexe/common/src/db.rs @@ -35,6 +35,7 @@ pub struct BlockHeader { pub height: u32, pub timestamp: u64, pub parent_hash: H256, + pub hash: H256, } #[derive(Debug, Clone, Default, Encode, Decode)] diff --git a/ethexe/observer/src/query.rs b/ethexe/observer/src/query.rs index 0b19f922978..0a9c862b96e 100644 --- a/ethexe/observer/src/query.rs +++ b/ethexe/observer/src/query.rs @@ -131,6 +131,7 @@ impl Query { height, timestamp, parent_hash, + hash: block_hash, }; database.set_block_header(block_hash, header.clone()); @@ -328,6 +329,7 @@ impl Query { height, timestamp, parent_hash, + hash: block_hash, }; self.database.set_block_header(block_hash, meta.clone()); diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 141555e7993..f30a7bdd2ab 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -42,6 +42,7 @@ mod run; #[cfg(test)] mod tests; +#[derive(Clone)] pub struct Processor { db: Database, creator: InstanceCreator, @@ -109,6 +110,11 @@ impl Processor { Ok(true) } + pub fn execute_for_reply(&mut self, block_hash: H256) -> Result> { + self.creator.set_chain_head(block_hash); + Ok(Default::default()) + } + pub fn handle_new_program(&mut self, program_id: ProgramId, code_id: CodeId) -> Result<()> { if self.db.original_code(code_id).is_none() { anyhow::bail!("code existence should be checked on smart contract side"); diff --git a/ethexe/processor/src/tests.rs b/ethexe/processor/src/tests.rs index b4b25d09ee5..b139fd25607 100644 --- a/ethexe/processor/src/tests.rs +++ b/ethexe/processor/src/tests.rs @@ -26,7 +26,7 @@ use utils::*; use wabt::wat2wasm; fn init_new_block(processor: &mut Processor, meta: BlockHeader) -> H256 { - let chain_head = H256::random(); + let chain_head = meta.hash; processor.db.set_block_header(chain_head, meta); processor.creator.set_chain_head(chain_head); chain_head @@ -42,6 +42,7 @@ fn init_new_block_from_parent(processor: &mut Processor, parent_hash: H256) -> H height, timestamp, parent_hash, + hash: H256::random(), }, ); let parent_out_program_hashes = processor diff --git a/ethexe/rpc/Cargo.toml b/ethexe/rpc/Cargo.toml index 40c566ceb11..00a23e6331b 100644 --- a/ethexe/rpc/Cargo.toml +++ b/ethexe/rpc/Cargo.toml @@ -15,6 +15,7 @@ anyhow.workspace = true futures.workspace = true gprimitives.workspace = true ethexe-db.workspace = true +ethexe-processor.workspace = true jsonrpsee = { version = "0.24", features = ["server", "macros"] } tower = { version = "0.4.13", features = ["full"] } hyper = { version = "1.4.1", features = ["server"] } diff --git a/ethexe/rpc/src/lib.rs b/ethexe/rpc/src/lib.rs index 0fa3282a04e..7432110a6fc 100644 --- a/ethexe/rpc/src/lib.rs +++ b/ethexe/rpc/src/lib.rs @@ -1,4 +1,5 @@ use ethexe_db::{BlockHeader, BlockMetaStorage, Database}; +use ethexe_processor::Processor; use futures::FutureExt; use gprimitives::H256; use jsonrpsee::{ @@ -8,7 +9,7 @@ use jsonrpsee::{ serve_with_graceful_shutdown, stop_channel, Server, ServerHandle, StopHandle, TowerServiceBuilder, }, - types::{ErrorCode, ErrorObject}, + types::ErrorObject, Methods, }; use std::net::SocketAddr; @@ -25,31 +26,57 @@ struct PerConnection { #[rpc(server)] pub trait RpcApi { #[method(name = "blockHeader")] - async fn block_header(&self, hash: H256) -> RpcResult; + async fn block_header(&self, hash: Option) -> RpcResult; + + #[method(name = "calculateReplyForHandle")] + async fn calculate_reply_for_handle(&self, at: Option) -> RpcResult>; } pub struct RpcModule { db: Database, + processor: Processor, } impl RpcModule { - pub fn new(db: Database) -> Self { - Self { db } + pub fn new(db: Database, processor: Processor) -> Self { + Self { db, processor } + } + + pub fn block_header_at_or_latest(&self, at: impl Into>) -> RpcResult { + if let Some(hash) = at.into() { + self.db + .block_header(hash) + .ok_or_else(|| db_err("Block header for requested hash wasn't found")) + } else { + // TODO (breathx): latest valid hash. + Some(Default::default()).ok_or_else(|| db_err("Latest block header wasn't found")) + } } } #[async_trait] impl RpcApiServer for RpcModule { - async fn block_header(&self, hash: H256) -> RpcResult { - // let db = db.lock().await; - self.db.block_header(hash).ok_or(ErrorObject::borrowed( - ErrorCode::InvalidParams.code(), - "Block not found", - None, - )) + async fn block_header(&self, hash: Option) -> RpcResult { + self.block_header_at_or_latest(hash) + } + + async fn calculate_reply_for_handle(&self, at: Option) -> RpcResult> { + let block_hash = self.block_header_at_or_latest(at)?.hash; + + let mut processor = self.processor.clone(); + + processor.execute_for_reply(block_hash).map_err(runtime_err) } } +fn db_err(err: &'static str) -> ErrorObject<'static> { + ErrorObject::owned(8000, "Database error", Some(err)) +} + +fn runtime_err(err: anyhow::Error) -> ErrorObject<'static> { + ErrorObject::owned(8000, "Runtime error", Some(format!("{err}"))) +} + pub struct RpcConfig { port: u16, db: Database, @@ -57,12 +84,14 @@ pub struct RpcConfig { pub struct RpcService { config: RpcConfig, + processor: Processor, } impl RpcService { - pub fn new(port: u16, db: Database) -> Self { + pub fn new(port: u16, db: Database, processor: Processor) -> Self { Self { config: RpcConfig { port, db }, + processor, } } @@ -71,7 +100,7 @@ impl RpcService { TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], self.config.port))).await?; let service_builder = Server::builder().to_service_builder(); - let module = RpcApiServer::into_rpc(RpcModule::new(self.config.db)); + let module = RpcApiServer::into_rpc(RpcModule::new(self.config.db, self.processor)); let (stop_handle, server_handle) = stop_channel(); From 92498c9b6b5f4c4ae2d8a06171b1c98c6a4a69ad Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 28 Aug 2024 20:15:48 +0400 Subject: [PATCH 02/13] replace latest valid height with header in storage --- ethexe/cli/src/service.rs | 3 +-- ethexe/common/src/db.rs | 5 ++--- ethexe/db/src/database.rs | 11 ++++++----- ethexe/observer/src/query.rs | 21 ++++++++++++--------- ethexe/processor/src/tests.rs | 3 +-- ethexe/rpc/src/lib.rs | 17 +++++++++++------ 6 files changed, 33 insertions(+), 27 deletions(-) diff --git a/ethexe/cli/src/service.rs b/ethexe/cli/src/service.rs index f0ddd315ea1..441e78cf253 100644 --- a/ethexe/cli/src/service.rs +++ b/ethexe/cli/src/service.rs @@ -285,7 +285,7 @@ impl Service { db.set_block_end_state_is_valid(block_hash, true); let header = db.block_header(block_hash).expect("must be set; qed"); - db.set_latest_valid_block_height(header.height); + db.set_latest_valid_block(block_hash, header); Ok(transition_outcomes) } @@ -303,7 +303,6 @@ impl Service { height: block_data.block_number.try_into()?, timestamp: block_data.block_timestamp, parent_hash: block_data.parent_hash, - hash: block_data.block_hash, }, ); diff --git a/ethexe/common/src/db.rs b/ethexe/common/src/db.rs index 7dce8a3b658..025387bf305 100644 --- a/ethexe/common/src/db.rs +++ b/ethexe/common/src/db.rs @@ -35,7 +35,6 @@ pub struct BlockHeader { pub height: u32, pub timestamp: u64, pub parent_hash: H256, - pub hash: H256, } #[derive(Debug, Clone, Default, Encode, Decode)] @@ -72,8 +71,8 @@ pub trait BlockMetaStorage: Send + Sync { fn block_outcome(&self, block_hash: H256) -> Option>; fn set_block_outcome(&self, block_hash: H256, outcome: Vec); - fn latest_valid_block_height(&self) -> Option; - fn set_latest_valid_block_height(&self, block_height: u32); + fn latest_valid_block(&self) -> Option<(H256, BlockHeader)>; + fn set_latest_valid_block(&self, block_hash: H256, header: BlockHeader); } pub trait CodesStorage: Send + Sync { diff --git a/ethexe/db/src/database.rs b/ethexe/db/src/database.rs index b4d76730aa7..d4af42eac0f 100644 --- a/ethexe/db/src/database.rs +++ b/ethexe/db/src/database.rs @@ -252,18 +252,19 @@ impl BlockMetaStorage for Database { ); } - fn latest_valid_block_height(&self) -> Option { + fn latest_valid_block(&self) -> Option<(H256, BlockHeader)> { self.kv .get(&KeyPrefix::LatestValidBlock.one(self.router_address)) - .map(|block_height| { - u32::from_le_bytes(block_height.try_into().expect("must be correct; qed")) + .map(|data| { + <(H256, BlockHeader)>::decode(&mut data.as_slice()) + .expect("Failed to decode data into `(H256, BlockHeader)`") }) } - fn set_latest_valid_block_height(&self, block_height: u32) { + fn set_latest_valid_block(&self, block_hash: H256, header: BlockHeader) { self.kv.put( &KeyPrefix::LatestValidBlock.one(self.router_address), - block_height.to_le_bytes().to_vec(), + (block_hash, header).encode(), ); } } diff --git a/ethexe/observer/src/query.rs b/ethexe/observer/src/query.rs index 0a9c862b96e..a655fdcc5a8 100644 --- a/ethexe/observer/src/query.rs +++ b/ethexe/observer/src/query.rs @@ -71,10 +71,9 @@ impl Query { .set_block_end_program_states(hash, Default::default()); // set latest valid if empty. - if self.database.latest_valid_block_height().is_none() { + if self.database.latest_valid_block().is_none() { let genesis_header = self.get_block_header_meta(hash).await?; - self.database - .set_latest_valid_block_height(genesis_header.height); + self.database.set_latest_valid_block(hash, genesis_header); } Ok(()) @@ -131,7 +130,6 @@ impl Query { height, timestamp, parent_hash, - hash: block_hash, }; database.set_block_header(block_hash, header.clone()); @@ -165,7 +163,8 @@ impl Query { let current_block = self.get_block_header_meta(block_hash).await?; let latest_valid_block_height = self .database - .latest_valid_block_height() + .latest_valid_block() + .map(|(_, header)| header.height) .expect("genesis by default; qed"); if current_block.height >= latest_valid_block_height @@ -201,7 +200,7 @@ impl Query { // Continue loading chain by parent hashes from the current block to the latest valid block. let mut hash = block_hash; - let mut height = current_block.height; + while hash != self.genesis_block_hash { // If the block's end state is valid, set it as the latest valid block if self @@ -209,7 +208,13 @@ impl Query { .block_end_state_is_valid(hash) .unwrap_or(false) { - self.database.set_latest_valid_block_height(height); + let header = match headers_map.get(&hash) { + Some(header) => header.clone(), + None => self.get_block_header_meta(hash).await?, + }; + + self.database.set_latest_valid_block(hash, header); + log::trace!("Nearest valid in db block found: {hash}"); break; } @@ -223,7 +228,6 @@ impl Query { Some(header) => header.parent_hash, None => self.get_block_parent_hash(hash).await?, }; - height -= 1; } let mut actual_commitment_queue: VecDeque = self @@ -329,7 +333,6 @@ impl Query { height, timestamp, parent_hash, - hash: block_hash, }; self.database.set_block_header(block_hash, meta.clone()); diff --git a/ethexe/processor/src/tests.rs b/ethexe/processor/src/tests.rs index b139fd25607..b4b25d09ee5 100644 --- a/ethexe/processor/src/tests.rs +++ b/ethexe/processor/src/tests.rs @@ -26,7 +26,7 @@ use utils::*; use wabt::wat2wasm; fn init_new_block(processor: &mut Processor, meta: BlockHeader) -> H256 { - let chain_head = meta.hash; + let chain_head = H256::random(); processor.db.set_block_header(chain_head, meta); processor.creator.set_chain_head(chain_head); chain_head @@ -42,7 +42,6 @@ fn init_new_block_from_parent(processor: &mut Processor, parent_hash: H256) -> H height, timestamp, parent_hash, - hash: H256::random(), }, ); let parent_out_program_hashes = processor diff --git a/ethexe/rpc/src/lib.rs b/ethexe/rpc/src/lib.rs index 7432110a6fc..54667b2c468 100644 --- a/ethexe/rpc/src/lib.rs +++ b/ethexe/rpc/src/lib.rs @@ -26,7 +26,7 @@ struct PerConnection { #[rpc(server)] pub trait RpcApi { #[method(name = "blockHeader")] - async fn block_header(&self, hash: Option) -> RpcResult; + async fn block_header(&self, hash: Option) -> RpcResult<(H256, BlockHeader)>; #[method(name = "calculateReplyForHandle")] async fn calculate_reply_for_handle(&self, at: Option) -> RpcResult>; @@ -42,26 +42,31 @@ impl RpcModule { Self { db, processor } } - pub fn block_header_at_or_latest(&self, at: impl Into>) -> RpcResult { + pub fn block_header_at_or_latest( + &self, + at: impl Into>, + ) -> RpcResult<(H256, BlockHeader)> { if let Some(hash) = at.into() { self.db .block_header(hash) + .map(|header| (hash, header)) .ok_or_else(|| db_err("Block header for requested hash wasn't found")) } else { - // TODO (breathx): latest valid hash. - Some(Default::default()).ok_or_else(|| db_err("Latest block header wasn't found")) + self.db + .latest_valid_block() + .ok_or_else(|| db_err("Latest block header wasn't found")) } } } #[async_trait] impl RpcApiServer for RpcModule { - async fn block_header(&self, hash: Option) -> RpcResult { + async fn block_header(&self, hash: Option) -> RpcResult<(H256, BlockHeader)> { self.block_header_at_or_latest(hash) } async fn calculate_reply_for_handle(&self, at: Option) -> RpcResult> { - let block_hash = self.block_header_at_or_latest(at)?.hash; + let block_hash = self.block_header_at_or_latest(at)?.0; let mut processor = self.processor.clone(); From 0c883a4b1d1c1f279b3d23aacb51a65733037bd9 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Wed, 28 Aug 2024 20:16:08 +0400 Subject: [PATCH 03/13] impl runtime interface --- ethexe/runtime/src/wasm/api/calculate.rs | 83 ++++++++++++++++++++++++ ethexe/runtime/src/wasm/api/mod.rs | 17 +++++ 2 files changed, 100 insertions(+) create mode 100644 ethexe/runtime/src/wasm/api/calculate.rs diff --git a/ethexe/runtime/src/wasm/api/calculate.rs b/ethexe/runtime/src/wasm/api/calculate.rs new file mode 100644 index 00000000000..3c75bad2d29 --- /dev/null +++ b/ethexe/runtime/src/wasm/api/calculate.rs @@ -0,0 +1,83 @@ +// This file is part of Gear. +// +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::wasm::{ + interface::database_ri, + storage::{NativeRuntimeInterface, RuntimeInterfaceStorage}, +}; +use alloc::vec::Vec; +use core_processor::{configs::BlockInfo, Ext}; +use ethexe_runtime_common::{ + state::{ActiveProgram, Program, ProgramState, Storage}, + RuntimeInterface, +}; +use gear_core::{code::InstrumentedCode, ids::ProgramId}; +use gprimitives::H256; + +pub fn reply_for_handle( + program_id: ProgramId, + state_root: H256, + instrumented_code: InstrumentedCode, + payload: Vec, +) -> Result, String> { + log::debug!("You're calling 'calculate::reply_for_handle(..)'"); + + let block_info = BlockInfo { + height: database_ri::get_block_height(), + timestamp: database_ri::get_block_timestamp(), + }; + + let ri = NativeRuntimeInterface { + block_info, + storage: RuntimeInterfaceStorage, + }; + + let gas_allowance = 1_000_000_000; + + let ProgramState { + program: + Program::Active(ActiveProgram { + allocations_hash, + memory_infix, + initialized: true, + .. + }), + .. + } = ri.storage().read_state(state_root).unwrap() + else { + return Err(String::from("Program is not active and/or initialized")); + }; + + let allocations = + allocations_hash.with_hash_or_default(|hash| ri.storage().read_allocations(hash)); + + let program_info = Some((program_id, memory_infix)); + + core_processor::informational::execute_for_reply::< + Ext<>::LazyPages>, + String, + >( + String::from("handle"), + instrumented_code, + allocations, + program_info, + payload, + gas_allowance, + block_info, + ) +} diff --git a/ethexe/runtime/src/wasm/api/mod.rs b/ethexe/runtime/src/wasm/api/mod.rs index 2e82667a0ba..c2314dcb25f 100644 --- a/ethexe/runtime/src/wasm/api/mod.rs +++ b/ethexe/runtime/src/wasm/api/mod.rs @@ -19,6 +19,7 @@ use alloc::{boxed::Box, vec::Vec}; use parity_scale_codec::{Decode, Encode}; +mod calculate; mod instrument; mod run; @@ -56,6 +57,22 @@ fn _run(arg_ptr: i32, arg_len: i32) -> i64 { return_val(res) } +#[cfg(target_arch = "wasm32")] +#[no_mangle] +extern "C" fn calculate_reply_for_handle(arg_ptr: i32, arg_len: i32) -> i64 { + _calculate_reply_for_handle(arg_ptr, arg_len) +} + +#[cfg_attr(not(target_arch = "wasm32"), allow(unused))] +fn _calculate_reply_for_handle(arg_ptr: i32, arg_len: i32) -> i64 { + let (program_id, state_root, instrumented_code, payload) = + Decode::decode(&mut get_slice(arg_ptr, arg_len)).unwrap(); + + let res = calculate::reply_for_handle(program_id, state_root, instrumented_code, payload); + + return_val(res) +} + #[cfg(target_arch = "wasm32")] #[no_mangle] extern "C" fn wake_messages(arg_ptr: i32, arg_len: i32) -> i64 { From 03fe66823fb322f5185ff147c327e548a191c013 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Mon, 16 Sep 2024 15:17:46 +0400 Subject: [PATCH 04/13] impl overlay --- ethexe/db/src/database.rs | 15 +++++- ethexe/db/src/lib.rs | 1 + ethexe/db/src/overlay.rs | 105 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 120 insertions(+), 1 deletion(-) create mode 100644 ethexe/db/src/overlay.rs diff --git a/ethexe/db/src/database.rs b/ethexe/db/src/database.rs index da230f4646a..7c5ff42e78a 100644 --- a/ethexe/db/src/database.rs +++ b/ethexe/db/src/database.rs @@ -20,7 +20,10 @@ use std::collections::{BTreeMap, BTreeSet, VecDeque}; -use crate::{CASDatabase, KVDatabase}; +use crate::{ + overlay::{CASOverlay, KVOverlay}, + CASDatabase, KVDatabase, +}; use ethexe_common::{ db::{BlockHeader, BlockMetaStorage, CodesStorage}, router::StateTransition, @@ -384,6 +387,16 @@ impl Database { } } + /// # Safety + /// Not ready for using in prod. Intended to be for rpc calls only. + pub unsafe fn overlaid(self) -> Self { + Self { + cas: Box::new(CASOverlay::new(self.cas)), + kv: Box::new(KVOverlay::new(self.kv)), + router_address: self.router_address, + } + } + // TODO: temporary solution for MVP runtime-interfaces db access. pub fn read_by_hash(&self, hash: H256) -> Option> { self.cas.read(&hash) diff --git a/ethexe/db/src/lib.rs b/ethexe/db/src/lib.rs index a5da5d0ff87..7a7d93bc44a 100644 --- a/ethexe/db/src/lib.rs +++ b/ethexe/db/src/lib.rs @@ -23,6 +23,7 @@ use gprimitives::H256; mod database; mod mem; +mod overlay; mod rocks; pub use database::Database; diff --git a/ethexe/db/src/overlay.rs b/ethexe/db/src/overlay.rs new file mode 100644 index 00000000000..58c24315aad --- /dev/null +++ b/ethexe/db/src/overlay.rs @@ -0,0 +1,105 @@ +// This file is part of Gear. +// +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::{CASDatabase, KVDatabase, MemDb}; +use gear_core::ids::hash; +use gprimitives::H256; +use std::collections::HashSet; + +pub struct CASOverlay { + db: Box, + mem: MemDb, +} + +impl CASOverlay { + pub fn new(db: Box) -> Self { + Self { + db, + mem: MemDb::default(), + } + } +} + +impl CASDatabase for CASOverlay { + fn clone_boxed(&self) -> Box { + Box::new(Self { + db: self.db.clone_boxed(), + mem: self.mem.clone(), + }) + } + + fn read(&self, hash: &H256) -> Option> { + self.mem.read(hash).or_else(|| self.db.read(hash)) + } + + fn write_by_hash(&self, hash: &H256, data: &[u8]) { + self.mem.write_by_hash(hash, data) + } +} + +pub struct KVOverlay { + db: Box, + mem: MemDb, +} + +impl KVOverlay { + pub fn new(db: Box) -> Self { + Self { + db, + mem: MemDb::default(), + } + } +} + +impl KVDatabase for KVOverlay { + fn clone_boxed_kv(&self) -> Box { + Box::new(Self { + db: self.db.clone_boxed_kv(), + mem: self.mem.clone(), + }) + } + + fn get(&self, key: &[u8]) -> Option> { + self.mem.get(key).or_else(|| self.db.get(key)) + } + + fn take(&self, _key: &[u8]) -> Option> { + unimplemented!() + } + + fn put(&self, key: &[u8], value: Vec) { + self.mem.put(key, value) + } + + fn iter_prefix<'a>( + &'a self, + prefix: &'a [u8], + ) -> Box, Vec)> + '_> { + let mem_iter = self.mem.iter_prefix(prefix); + let db_iter = self.db.iter_prefix(prefix); + + let full_iter = mem_iter.chain(db_iter); + + let mut known_keys = HashSet::new(); + + let filtered_iter = + full_iter.filter_map(move |(k, v)| known_keys.insert(hash(&k)).then_some((k, v))); + + Box::new(filtered_iter) + } +} From 7033fa5d812f2f207d3d7b3fc812b2c763bcecd4 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Mon, 16 Sep 2024 15:53:36 +0400 Subject: [PATCH 05/13] remove db from intance wrapper/creator --- ethexe/processor/src/host/mod.rs | 26 +++++++++--------------- ethexe/processor/src/lib.rs | 4 ++-- ethexe/processor/src/run.rs | 33 ++++++++++++++++-------------- ethexe/processor/src/tests.rs | 35 +++++++++++++++++++++++++++----- 4 files changed, 60 insertions(+), 38 deletions(-) diff --git a/ethexe/processor/src/host/mod.rs b/ethexe/processor/src/host/mod.rs index 1d3d8fb1315..846ada121cc 100644 --- a/ethexe/processor/src/host/mod.rs +++ b/ethexe/processor/src/host/mod.rs @@ -43,7 +43,6 @@ pub type Store = wasmtime::Store; #[derive(Clone)] pub(crate) struct InstanceCreator { - db: Database, engine: wasmtime::Engine, instance_pre: Arc>, @@ -54,7 +53,7 @@ pub(crate) struct InstanceCreator { } impl InstanceCreator { - pub fn new(db: Database, runtime: Vec) -> Result { + pub fn new(runtime: Vec) -> Result { gear_runtime_interface::sandbox_init(); let engine = wasmtime::Engine::default(); @@ -72,7 +71,6 @@ impl InstanceCreator { let instance_pre = Arc::new(instance_pre); Ok(Self { - db, engine, instance_pre, chain_head: None, @@ -87,7 +85,6 @@ impl InstanceCreator { let mut instance_wrapper = InstanceWrapper { instance, store, - db: self.db().clone(), chain_head: self.chain_head, }; @@ -100,10 +97,6 @@ impl InstanceCreator { Ok(instance_wrapper) } - pub fn db(&self) -> &Database { - &self.db - } - pub fn set_chain_head(&mut self, chain_head: H256) { self.chain_head = Some(chain_head); } @@ -112,15 +105,10 @@ impl InstanceCreator { pub(crate) struct InstanceWrapper { instance: wasmtime::Instance, store: Store, - db: Database, chain_head: Option, } impl InstanceWrapper { - pub fn db(&self) -> &Database { - &self.db - } - #[allow(unused)] pub fn data(&self) -> &StoreData { self.store.data() @@ -139,13 +127,14 @@ impl InstanceWrapper { pub fn run( &mut self, + db: Database, program_id: ProgramId, original_code_id: CodeId, state_hash: H256, maybe_instrumented_code: Option, ) -> Result> { let chain_head = self.chain_head.expect("chain head must be set before run"); - threads::set(self.db.clone(), chain_head, state_hash); + threads::set(db, chain_head, state_hash); let arg = ( program_id, @@ -157,9 +146,14 @@ impl InstanceWrapper { self.call("run", arg.encode()) } - pub fn wake_messages(&mut self, program_id: ProgramId, state_hash: H256) -> Result { + pub fn wake_messages( + &mut self, + db: Database, + program_id: ProgramId, + state_hash: H256, + ) -> Result { let chain_head = self.chain_head.expect("chain head must be set before wake"); - threads::set(self.db.clone(), chain_head, state_hash); + threads::set(db, chain_head, state_hash); self.call("wake_messages", (program_id, state_hash).encode()) } diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 578f79a0ab1..60f206dc2cd 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -64,7 +64,7 @@ pub enum LocalOutcome { /// Maybe impl `struct EventProcessor`. impl Processor { pub fn new(db: Database) -> Result { - let creator = InstanceCreator::new(db.clone(), host::runtime())?; + let creator = InstanceCreator::new(host::runtime())?; Ok(Self { db, creator }) } @@ -212,7 +212,7 @@ impl Processor { log::debug!("{programs:?}"); - let messages_and_outcomes = run::run(8, self.creator.clone(), programs); + let messages_and_outcomes = run::run(8, self.db.clone(), self.creator.clone(), programs); Ok(messages_and_outcomes.1) } diff --git a/ethexe/processor/src/run.rs b/ethexe/processor/src/run.rs index 325d6742ab5..c54cc32809b 100644 --- a/ethexe/processor/src/run.rs +++ b/ethexe/processor/src/run.rs @@ -22,7 +22,7 @@ use crate::{ }; use core_processor::common::JournalNote; use ethexe_common::router::{OutgoingMessage, StateTransition}; -use ethexe_db::CodesStorage; +use ethexe_db::{CodesStorage, Database}; use ethexe_runtime_common::Handler; use gear_core::{ ids::{ActorId, ProgramId}, @@ -47,6 +47,7 @@ enum Task { pub fn run( threads_amount: usize, + db: Database, instance_creator: InstanceCreator, programs: &mut BTreeMap, ) -> (Vec, Vec) { @@ -57,13 +58,14 @@ pub fn run( .build() .unwrap(); - rt.block_on(async { run_in_async(instance_creator, programs).await }) + rt.block_on(async { run_in_async(db, instance_creator, programs).await }) }) } // TODO: Returning Vec is a temporary solution. // In future need to send all messages to users and all state hashes changes to sequencer. async fn run_in_async( + db: Database, instance_creator: InstanceCreator, programs: &mut BTreeMap, ) -> (Vec, Vec) { @@ -79,7 +81,12 @@ async fn run_in_async( for id in 0..num_workers { let (task_sender, task_receiver) = mpsc::channel(100); task_senders.push(task_sender); - let handle = tokio::spawn(worker(id, instance_creator.clone(), task_receiver)); + let handle = tokio::spawn(worker( + id, + db.clone(), + instance_creator.clone(), + task_receiver, + )); handles.push(handle); } @@ -105,7 +112,7 @@ async fn run_in_async( let mut handler = Handler { program_id, program_states: programs, - storage: instance_creator.db(), + storage: &db, block_info: Default::default(), results: Default::default(), to_users_messages: Default::default(), @@ -175,24 +182,19 @@ async fn run_in_async( (to_users_messages, outcomes) } -async fn run_task(executor: &mut InstanceWrapper, task: Task) { +async fn run_task(db: Database, executor: &mut InstanceWrapper, task: Task) { match task { Task::Run { program_id, state_hash, result_sender, } => { - let code_id = executor - .db() - .program_code_id(program_id) - .expect("Code ID must be set"); + let code_id = db.program_code_id(program_id).expect("Code ID must be set"); - let instrumented_code = executor - .db() - .instrumented_code(ethexe_runtime::VERSION, code_id); + let instrumented_code = db.instrumented_code(ethexe_runtime::VERSION, code_id); let journal = executor - .run(program_id, code_id, state_hash, instrumented_code) + .run(db, program_id, code_id, state_hash, instrumented_code) .expect("Some error occurs while running program in instance"); result_sender.send(journal).unwrap(); @@ -203,7 +205,7 @@ async fn run_task(executor: &mut InstanceWrapper, task: Task) { result_sender, } => { let new_state_hash = executor - .wake_messages(program_id, state_hash) + .wake_messages(db, program_id, state_hash) .expect("Some error occurs while waking messages"); result_sender.send(new_state_hash).unwrap(); } @@ -212,6 +214,7 @@ async fn run_task(executor: &mut InstanceWrapper, task: Task) { async fn worker( id: usize, + db: Database, instance_creator: InstanceCreator, mut task_receiver: mpsc::Receiver, ) { @@ -222,7 +225,7 @@ async fn worker( .expect("Failed to instantiate executor"); while let Some(task) = task_receiver.recv().await { - run_task(&mut executor, task).await; + run_task(db.clone(), &mut executor, task).await; } } diff --git a/ethexe/processor/src/tests.rs b/ethexe/processor/src/tests.rs index c7fa00d82a3..49258d70168 100644 --- a/ethexe/processor/src/tests.rs +++ b/ethexe/processor/src/tests.rs @@ -250,7 +250,12 @@ fn ping_pong() { let mut programs = BTreeMap::from_iter([(program_id, state_hash)]); - let (to_users, _) = run::run(8, processor.creator.clone(), &mut programs); + let (to_users, _) = run::run( + 8, + processor.db.clone(), + processor.creator.clone(), + &mut programs, + ); assert_eq!(to_users.len(), 2); @@ -376,7 +381,12 @@ fn async_and_ping() { let mut programs = BTreeMap::from_iter([(ping_id, ping_state_hash), (async_id, async_state_hash)]); - let (to_users, _) = run::run(8, processor.creator.clone(), &mut programs); + let (to_users, _) = run::run( + 8, + processor.db.clone(), + processor.creator.clone(), + &mut programs, + ); assert_eq!(to_users.len(), 3); @@ -457,7 +467,12 @@ fn many_waits() { programs.insert(program_id, state_hash); } - let (to_users, _) = run::run(threads_amount, processor.creator.clone(), &mut programs); + let (to_users, _) = run::run( + threads_amount, + processor.db.clone(), + processor.creator.clone(), + &mut programs, + ); assert_eq!(to_users.len(), amount as usize); for (_pid, state_hash) in programs.iter_mut() { @@ -468,7 +483,12 @@ fn many_waits() { *state_hash = new_state_hash; } - let (to_users, _) = run::run(threads_amount, processor.creator.clone(), &mut programs); + let (to_users, _) = run::run( + threads_amount, + processor.db.clone(), + processor.creator.clone(), + &mut programs, + ); assert_eq!(to_users.len(), 0); init_new_block( @@ -480,7 +500,12 @@ fn many_waits() { }, ); - let (to_users, _) = run::run(threads_amount, processor.creator.clone(), &mut programs); + let (to_users, _) = run::run( + threads_amount, + processor.db.clone(), + processor.creator.clone(), + &mut programs, + ); assert_eq!(to_users.len(), amount as usize); From f47e713b7013f10a9292c1c4205624eb0cd577e7 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Mon, 16 Sep 2024 16:00:37 +0400 Subject: [PATCH 06/13] remove processor from rpc server --- ethexe/cli/src/service.rs | 2 +- ethexe/rpc/src/lib.rs | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/ethexe/cli/src/service.rs b/ethexe/cli/src/service.rs index 2ea0de26ef8..669cfcda158 100644 --- a/ethexe/cli/src/service.rs +++ b/ethexe/cli/src/service.rs @@ -179,7 +179,7 @@ impl Service { let rpc = config .rpc_port - .map(|port| ethexe_rpc::RpcService::new(port, db.clone(), processor.clone())); + .map(|port| ethexe_rpc::RpcService::new(port, db.clone())); Ok(Self { db, diff --git a/ethexe/rpc/src/lib.rs b/ethexe/rpc/src/lib.rs index 54667b2c468..e4c516382c7 100644 --- a/ethexe/rpc/src/lib.rs +++ b/ethexe/rpc/src/lib.rs @@ -34,12 +34,11 @@ pub trait RpcApi { pub struct RpcModule { db: Database, - processor: Processor, } impl RpcModule { - pub fn new(db: Database, processor: Processor) -> Self { - Self { db, processor } + pub fn new(db: Database) -> Self { + Self { db } } pub fn block_header_at_or_latest( @@ -68,7 +67,10 @@ impl RpcApiServer for RpcModule { async fn calculate_reply_for_handle(&self, at: Option) -> RpcResult> { let block_hash = self.block_header_at_or_latest(at)?.0; - let mut processor = self.processor.clone(); + let db_overlay = unsafe { self.db.clone().overlaid() }; + + // TODO (breathx): optimize here instantiation if matches actual runtime. + let mut processor = Processor::new(db_overlay).map_err(runtime_err)?; processor.execute_for_reply(block_hash).map_err(runtime_err) } @@ -89,14 +91,12 @@ pub struct RpcConfig { pub struct RpcService { config: RpcConfig, - processor: Processor, } impl RpcService { - pub fn new(port: u16, db: Database, processor: Processor) -> Self { + pub fn new(port: u16, db: Database) -> Self { Self { config: RpcConfig { port, db }, - processor, } } @@ -105,7 +105,7 @@ impl RpcService { TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], self.config.port))).await?; let service_builder = Server::builder().to_service_builder(); - let module = RpcApiServer::into_rpc(RpcModule::new(self.config.db, self.processor)); + let module = RpcApiServer::into_rpc(RpcModule::new(self.config.db)); let (stop_handle, server_handle) = stop_channel(); From 7fb7eb18107f1e4ae2e8eb17a54f043f203ba5b4 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Mon, 16 Sep 2024 16:34:49 +0400 Subject: [PATCH 07/13] refine order of calls to processor; impl OverlaidProcessor --- ethexe/processor/src/lib.rs | 23 +++++++++++++++++------ ethexe/rpc/src/lib.rs | 28 +++++++++++++++++++++------- 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 60f206dc2cd..1d8dcc3362e 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -31,7 +31,7 @@ use gear_core::{ ids::{prelude::CodeIdExt, ProgramId}, message::{DispatchKind, Payload}, }; -use gprimitives::{CodeId, H256}; +use gprimitives::{ActorId, CodeId, H256}; use host::InstanceCreator; use parity_scale_codec::{Decode, Encode}; use std::collections::{BTreeMap, VecDeque}; @@ -48,6 +48,16 @@ pub struct Processor { creator: InstanceCreator, } +#[derive(Clone)] +pub struct OverlaidProcessor(Processor); + +impl OverlaidProcessor { + pub fn execute_for_reply(&mut self, block_hash: H256, _program_id: ActorId) -> Result> { + self.0.creator.set_chain_head(block_hash); + Ok(Default::default()) + } +} + /// Local changes that can be committed to the network or local signer. #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub enum LocalOutcome { @@ -68,6 +78,12 @@ impl Processor { Ok(Self { db, creator }) } + pub fn overlaid(mut self) -> OverlaidProcessor { + self.db = unsafe { self.db.overlaid() }; + + OverlaidProcessor(self) + } + /// Returns some CodeId in case of settlement and new code accepting. pub fn handle_new_code(&mut self, original_code: impl AsRef<[u8]>) -> Result> { let mut executor = self.creator.instantiate()?; @@ -110,11 +126,6 @@ impl Processor { Ok(true) } - pub fn execute_for_reply(&mut self, block_hash: H256) -> Result> { - self.creator.set_chain_head(block_hash); - Ok(Default::default()) - } - pub fn handle_new_program(&mut self, program_id: ProgramId, code_id: CodeId) -> Result<()> { if self.db.original_code(code_id).is_none() { anyhow::bail!("code existence should be checked on smart contract side"); diff --git a/ethexe/rpc/src/lib.rs b/ethexe/rpc/src/lib.rs index e4c516382c7..9b7b28c52a0 100644 --- a/ethexe/rpc/src/lib.rs +++ b/ethexe/rpc/src/lib.rs @@ -1,7 +1,7 @@ use ethexe_db::{BlockHeader, BlockMetaStorage, Database}; use ethexe_processor::Processor; use futures::FutureExt; -use gprimitives::H256; +use gprimitives::{ActorId, H256}; use jsonrpsee::{ core::{async_trait, RpcResult}, proc_macros::rpc, @@ -29,7 +29,11 @@ pub trait RpcApi { async fn block_header(&self, hash: Option) -> RpcResult<(H256, BlockHeader)>; #[method(name = "calculateReplyForHandle")] - async fn calculate_reply_for_handle(&self, at: Option) -> RpcResult>; + async fn calculate_reply_for_handle( + &self, + at: Option, + program_id: ActorId, + ) -> RpcResult>; } pub struct RpcModule { @@ -64,15 +68,21 @@ impl RpcApiServer for RpcModule { self.block_header_at_or_latest(hash) } - async fn calculate_reply_for_handle(&self, at: Option) -> RpcResult> { + async fn calculate_reply_for_handle( + &self, + at: Option, + program_id: ActorId, + ) -> RpcResult> { let block_hash = self.block_header_at_or_latest(at)?.0; - let db_overlay = unsafe { self.db.clone().overlaid() }; - // TODO (breathx): optimize here instantiation if matches actual runtime. - let mut processor = Processor::new(db_overlay).map_err(runtime_err)?; + let mut overlaid_processor = Processor::new(self.db.clone()) + .map_err(|_| internal())? + .overlaid(); - processor.execute_for_reply(block_hash).map_err(runtime_err) + overlaid_processor + .execute_for_reply(block_hash, program_id) + .map_err(runtime_err) } } @@ -84,6 +94,10 @@ fn runtime_err(err: anyhow::Error) -> ErrorObject<'static> { ErrorObject::owned(8000, "Runtime error", Some(format!("{err}"))) } +fn internal() -> ErrorObject<'static> { + ErrorObject::owned(8000, "Internal error", None::<&str>) +} + pub struct RpcConfig { port: u16, db: Database, From c1f0e261ef1683235baaeb7f0492fa6c2c2bec1c Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Mon, 16 Sep 2024 17:10:22 +0400 Subject: [PATCH 08/13] impl logic for execute_for_reply() --- ethexe/processor/src/lib.rs | 57 ++++++++++++++++++++++++++++++++++--- ethexe/rpc/src/lib.rs | 15 +++++++--- 2 files changed, 64 insertions(+), 8 deletions(-) diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 1d8dcc3362e..743fdd94642 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -31,7 +31,7 @@ use gear_core::{ ids::{prelude::CodeIdExt, ProgramId}, message::{DispatchKind, Payload}, }; -use gprimitives::{ActorId, CodeId, H256}; +use gprimitives::{ActorId, CodeId, MessageId, H256}; use host::InstanceCreator; use parity_scale_codec::{Decode, Encode}; use std::collections::{BTreeMap, VecDeque}; @@ -52,9 +52,58 @@ pub struct Processor { pub struct OverlaidProcessor(Processor); impl OverlaidProcessor { - pub fn execute_for_reply(&mut self, block_hash: H256, _program_id: ActorId) -> Result> { - self.0.creator.set_chain_head(block_hash); - Ok(Default::default()) + // TODO (breathx): optimize for one single program. + pub fn execute_for_reply( + &mut self, + block_hash: H256, + source: ActorId, + program_id: ActorId, + payload: Vec, + value: u128, + ) -> Result> { + let mut states = self + .0 + .db + .block_start_program_states(block_hash) + .unwrap_or_default(); + + let Some(&state_hash) = states.get(&program_id) else { + return Err(anyhow::anyhow!("unknown program at specified block hash")); + }; + + let state = + self.0.db.read_state(state_hash).ok_or_else(|| { + anyhow::anyhow!("unreachable: state partially presents in storage") + })?; + + anyhow::ensure!( + !state.requires_init_message(), + "program isn't yet initialized" + ); + + self.0.handle_mirror_event( + &mut states, + program_id, + MirrorEvent::MessageQueueingRequested { + id: MessageId::zero(), + source, + payload, + value, + }, + )?; + + let (messages, _) = run::run(8, self.0.db.clone(), self.0.creator.clone(), &mut states); + + let res = messages + .into_iter() + .find_map(|v| { + v.reply_details().and_then(|d| { + (d.to_message_id() == MessageId::zero()).then_some(v.into_parts().3.into_vec()) + }) + }) + .ok_or_else(|| anyhow::anyhow!("reply wasn't found"))?; + + Ok(res) } } diff --git a/ethexe/rpc/src/lib.rs b/ethexe/rpc/src/lib.rs index 9b7b28c52a0..d363eac7b46 100644 --- a/ethexe/rpc/src/lib.rs +++ b/ethexe/rpc/src/lib.rs @@ -32,7 +32,10 @@ pub trait RpcApi { async fn calculate_reply_for_handle( &self, at: Option, + source: ActorId, program_id: ActorId, + payload: Vec, + value: u128, ) -> RpcResult>; } @@ -71,17 +74,21 @@ impl RpcApiServer for RpcModule { async fn calculate_reply_for_handle( &self, at: Option, + source: ActorId, program_id: ActorId, + payload: Vec, + value: u128, ) -> RpcResult> { let block_hash = self.block_header_at_or_latest(at)?.0; + // TODO (breathx): spawn in a new thread and catch panics. (?) Generally catch runtime panics (?). // TODO (breathx): optimize here instantiation if matches actual runtime. - let mut overlaid_processor = Processor::new(self.db.clone()) - .map_err(|_| internal())? - .overlaid(); + let processor = Processor::new(self.db.clone()).map_err(|_| internal())?; + + let mut overlaid_processor = processor.overlaid(); overlaid_processor - .execute_for_reply(block_hash, program_id) + .execute_for_reply(block_hash, source, program_id, payload, value) .map_err(runtime_err) } } From 5d98be6960d7dad09a30e4043fcb198d35e90b29 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Mon, 16 Sep 2024 17:14:52 +0400 Subject: [PATCH 09/13] remove unnecessary api --- ethexe/runtime/src/wasm/api/calculate.rs | 83 ------------------------ ethexe/runtime/src/wasm/api/mod.rs | 17 ----- 2 files changed, 100 deletions(-) delete mode 100644 ethexe/runtime/src/wasm/api/calculate.rs diff --git a/ethexe/runtime/src/wasm/api/calculate.rs b/ethexe/runtime/src/wasm/api/calculate.rs deleted file mode 100644 index 3c75bad2d29..00000000000 --- a/ethexe/runtime/src/wasm/api/calculate.rs +++ /dev/null @@ -1,83 +0,0 @@ -// This file is part of Gear. -// -// Copyright (C) 2024 Gear Technologies Inc. -// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -use crate::wasm::{ - interface::database_ri, - storage::{NativeRuntimeInterface, RuntimeInterfaceStorage}, -}; -use alloc::vec::Vec; -use core_processor::{configs::BlockInfo, Ext}; -use ethexe_runtime_common::{ - state::{ActiveProgram, Program, ProgramState, Storage}, - RuntimeInterface, -}; -use gear_core::{code::InstrumentedCode, ids::ProgramId}; -use gprimitives::H256; - -pub fn reply_for_handle( - program_id: ProgramId, - state_root: H256, - instrumented_code: InstrumentedCode, - payload: Vec, -) -> Result, String> { - log::debug!("You're calling 'calculate::reply_for_handle(..)'"); - - let block_info = BlockInfo { - height: database_ri::get_block_height(), - timestamp: database_ri::get_block_timestamp(), - }; - - let ri = NativeRuntimeInterface { - block_info, - storage: RuntimeInterfaceStorage, - }; - - let gas_allowance = 1_000_000_000; - - let ProgramState { - program: - Program::Active(ActiveProgram { - allocations_hash, - memory_infix, - initialized: true, - .. - }), - .. - } = ri.storage().read_state(state_root).unwrap() - else { - return Err(String::from("Program is not active and/or initialized")); - }; - - let allocations = - allocations_hash.with_hash_or_default(|hash| ri.storage().read_allocations(hash)); - - let program_info = Some((program_id, memory_infix)); - - core_processor::informational::execute_for_reply::< - Ext<>::LazyPages>, - String, - >( - String::from("handle"), - instrumented_code, - allocations, - program_info, - payload, - gas_allowance, - block_info, - ) -} diff --git a/ethexe/runtime/src/wasm/api/mod.rs b/ethexe/runtime/src/wasm/api/mod.rs index c2314dcb25f..2e82667a0ba 100644 --- a/ethexe/runtime/src/wasm/api/mod.rs +++ b/ethexe/runtime/src/wasm/api/mod.rs @@ -19,7 +19,6 @@ use alloc::{boxed::Box, vec::Vec}; use parity_scale_codec::{Decode, Encode}; -mod calculate; mod instrument; mod run; @@ -57,22 +56,6 @@ fn _run(arg_ptr: i32, arg_len: i32) -> i64 { return_val(res) } -#[cfg(target_arch = "wasm32")] -#[no_mangle] -extern "C" fn calculate_reply_for_handle(arg_ptr: i32, arg_len: i32) -> i64 { - _calculate_reply_for_handle(arg_ptr, arg_len) -} - -#[cfg_attr(not(target_arch = "wasm32"), allow(unused))] -fn _calculate_reply_for_handle(arg_ptr: i32, arg_len: i32) -> i64 { - let (program_id, state_root, instrumented_code, payload) = - Decode::decode(&mut get_slice(arg_ptr, arg_len)).unwrap(); - - let res = calculate::reply_for_handle(program_id, state_root, instrumented_code, payload); - - return_val(res) -} - #[cfg(target_arch = "wasm32")] #[no_mangle] extern "C" fn wake_messages(arg_ptr: i32, arg_len: i32) -> i64 { From 987ad544244fda969e8ff35c50faa8fc601e63f6 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Tue, 17 Sep 2024 18:36:01 +0400 Subject: [PATCH 10/13] review remarks: set chain head for execute for reply --- ethexe/processor/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 743fdd94642..b5d5414b3ff 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -61,6 +61,8 @@ impl OverlaidProcessor { payload: Vec, value: u128, ) -> Result> { + self.0.creator.set_chain_head(block_hash); + let mut states = self .0 .db From d7180b97de3a9def7a04f8005cb9a27ee972ed22 Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Tue, 17 Sep 2024 18:37:52 +0400 Subject: [PATCH 11/13] serialize reply info payload bytes as hex --- Cargo.lock | 1 + Cargo.toml | 1 + core/Cargo.toml | 3 ++- core/src/message/mod.rs | 1 + 4 files changed, 5 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 94f432f30f8..7d7c64deafc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6534,6 +6534,7 @@ dependencies = [ "gwasm-instrument", "hashbrown 0.14.5", "hex", + "impl-serde", "log", "num-traits", "numerated", diff --git a/Cargo.toml b/Cargo.toml index a12c7c8740d..e5d6d7ab9fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -143,6 +143,7 @@ hashbrown = "0.14.5" hex = { version = "0.4.3", default-features = false } hex-literal = "0.4.1" impl-trait-for-tuples = "0.2.2" +impl-serde = "0.4.0" jsonrpsee = { version = "^0.16" } libc = { version = "0.2", default-features = false } log = { version = "0.4.22", default-features = false } diff --git a/core/Cargo.toml b/core/Cargo.toml index cf07180c2ed..d38b4e2a262 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -38,6 +38,7 @@ primitive-types = { workspace = true, features = ["scale-info"] } # Optional dependencies serde = { workspace = true, features = ["derive"], optional = true } +impl-serde = { workspace = true, optional = true } [dev-dependencies] wabt.workspace = true @@ -49,4 +50,4 @@ numerated = { workspace = true, features = ["mock"] } [features] default = [] strict = [] -std = ["serde/std", "wasmparser/std", "gear-core-errors/serde"] +std = ["serde/std", "dep:impl-serde", "wasmparser/std", "gear-core-errors/serde"] diff --git a/core/src/message/mod.rs b/core/src/message/mod.rs index dd1285f8760..52d40f28225 100644 --- a/core/src/message/mod.rs +++ b/core/src/message/mod.rs @@ -239,6 +239,7 @@ pub trait Packet { #[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))] pub struct ReplyInfo { /// Payload of the reply. + #[cfg_attr(feature = "std", serde(with = "impl_serde::serialize"))] pub payload: Vec, /// Value sent with reply. pub value: u128, From c251545b4ca0f5f8cae537cd2e5e161e392c469c Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Tue, 17 Sep 2024 18:50:05 +0400 Subject: [PATCH 12/13] review remarks: change inputs/outputs to properly serializible; return ReplyInfo from RPC call --- Cargo.lock | 2 ++ ethexe/processor/src/lib.rs | 14 +++++++++++--- ethexe/rpc/Cargo.toml | 2 ++ ethexe/rpc/src/lib.rs | 28 ++++++++++++++++++---------- 4 files changed, 33 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7d7c64deafc..7cf2497d903 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5278,12 +5278,14 @@ dependencies = [ "ethexe-db", "ethexe-processor", "futures", + "gear-core", "gprimitives", "hex", "hyper 1.4.1", "jsonrpsee 0.24.0", "log", "parity-scale-codec", + "sp-core", "tokio", "tower", ] diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index b5d5414b3ff..105d54c03b2 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -29,7 +29,7 @@ use ethexe_db::{BlockMetaStorage, CodesStorage, Database}; use ethexe_runtime_common::state::{Dispatch, HashAndLen, MaybeHash, Storage}; use gear_core::{ ids::{prelude::CodeIdExt, ProgramId}, - message::{DispatchKind, Payload}, + message::{DispatchKind, Payload, ReplyInfo}, }; use gprimitives::{ActorId, CodeId, MessageId, H256}; use host::InstanceCreator; @@ -60,7 +60,7 @@ impl OverlaidProcessor { program_id: ActorId, payload: Vec, value: u128, - ) -> Result> { + ) -> Result { self.0.creator.set_chain_head(block_hash); let mut states = self @@ -100,7 +100,15 @@ impl OverlaidProcessor { .into_iter() .find_map(|v| { v.reply_details().and_then(|d| { - (d.to_message_id() == MessageId::zero()).then_some(v.into_parts().3.into_vec()) + (d.to_message_id() == MessageId::zero()).then(|| { + let parts = v.into_parts(); + + ReplyInfo { + payload: parts.3.into_vec(), + value: parts.5, + code: d.to_reply_code(), + } + }) }) }) .ok_or_else(|| anyhow::anyhow!("reply wasn't found"))?; diff --git a/ethexe/rpc/Cargo.toml b/ethexe/rpc/Cargo.toml index 00a23e6331b..94a8625296c 100644 --- a/ethexe/rpc/Cargo.toml +++ b/ethexe/rpc/Cargo.toml @@ -23,3 +23,5 @@ log.workspace = true parity-scale-codec.workspace = true hex.workspace = true ethexe-common.workspace = true +sp-core = { workspace = true, features = ["serde"] } +gear-core = { workspace = true, features = ["std"] } diff --git a/ethexe/rpc/src/lib.rs b/ethexe/rpc/src/lib.rs index d363eac7b46..ea91cdd8955 100644 --- a/ethexe/rpc/src/lib.rs +++ b/ethexe/rpc/src/lib.rs @@ -1,7 +1,8 @@ use ethexe_db::{BlockHeader, BlockMetaStorage, Database}; use ethexe_processor::Processor; use futures::FutureExt; -use gprimitives::{ActorId, H256}; +use gear_core::message::ReplyInfo; +use gprimitives::{H160, H256}; use jsonrpsee::{ core::{async_trait, RpcResult}, proc_macros::rpc, @@ -12,6 +13,7 @@ use jsonrpsee::{ types::ErrorObject, Methods, }; +use sp_core::Bytes; use std::net::SocketAddr; use tokio::net::TcpListener; use tower::Service; @@ -32,11 +34,11 @@ pub trait RpcApi { async fn calculate_reply_for_handle( &self, at: Option, - source: ActorId, - program_id: ActorId, - payload: Vec, + source: H160, + program_id: H160, + payload: Bytes, value: u128, - ) -> RpcResult>; + ) -> RpcResult; } pub struct RpcModule { @@ -74,11 +76,11 @@ impl RpcApiServer for RpcModule { async fn calculate_reply_for_handle( &self, at: Option, - source: ActorId, - program_id: ActorId, - payload: Vec, + source: H160, + program_id: H160, + payload: Bytes, value: u128, - ) -> RpcResult> { + ) -> RpcResult { let block_hash = self.block_header_at_or_latest(at)?.0; // TODO (breathx): spawn in a new thread and catch panics. (?) Generally catch runtime panics (?). @@ -88,7 +90,13 @@ impl RpcApiServer for RpcModule { let mut overlaid_processor = processor.overlaid(); overlaid_processor - .execute_for_reply(block_hash, source, program_id, payload, value) + .execute_for_reply( + block_hash, + source.into(), + program_id.into(), + payload.0, + value, + ) .map_err(runtime_err) } } From 1eb8fd412e934227718926591ca4db45bd336c5f Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Mon, 23 Sep 2024 11:45:59 +0400 Subject: [PATCH 13/13] review remarks: extend local naming --- ethexe/processor/src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 105d54c03b2..73672b3cec8 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -98,15 +98,15 @@ impl OverlaidProcessor { let res = messages .into_iter() - .find_map(|v| { - v.reply_details().and_then(|d| { - (d.to_message_id() == MessageId::zero()).then(|| { - let parts = v.into_parts(); + .find_map(|message| { + message.reply_details().and_then(|details| { + (details.to_message_id() == MessageId::zero()).then(|| { + let parts = message.into_parts(); ReplyInfo { payload: parts.3.into_vec(), value: parts.5, - code: d.to_reply_code(), + code: details.to_reply_code(), } }) })