diff --git a/ethexe/cli/src/args.rs b/ethexe/cli/src/args.rs index 573f67cd631..2870dbfed76 100644 --- a/ethexe/cli/src/args.rs +++ b/ethexe/cli/src/args.rs @@ -80,6 +80,11 @@ pub struct Args { #[arg(long = "max-depth")] pub max_commitment_depth: Option, + /// Block time in seconds (approximate). + /// Ethexe uses it to estimate inner timeouts. + #[arg(long, default_value = "12")] + pub block_time: u64, + /// Run a temporary node. /// /// A temporary directory will be created to store the configuration and will be deleted @@ -96,7 +101,7 @@ pub struct Args { #[allow(missing_docs)] #[clap(flatten)] - pub prometheus_params: PrometheusParams, + pub prometheus_params: Option, #[command(subcommand)] pub extra_command: Option, diff --git a/ethexe/cli/src/config.rs b/ethexe/cli/src/config.rs index 28b90e7b6ec..257a3648d33 100644 --- a/ethexe/cli/src/config.rs +++ b/ethexe/cli/src/config.rs @@ -25,7 +25,7 @@ use directories::ProjectDirs; use ethexe_network::NetworkConfiguration; use ethexe_prometheus_endpoint::Registry; use ethexe_signer::PublicKey; -use std::{iter, net::SocketAddr, path::PathBuf}; +use std::{iter, net::SocketAddr, path::PathBuf, time::Duration}; use tempfile::TempDir; const DEFAULT_PROMETHEUS_PORT: u16 = 9635; @@ -79,9 +79,12 @@ pub struct Config { /// Address of Ethereum Router contract pub ethereum_router_address: String, - // Max depth to discover last commitment. + /// Max depth to discover last commitment. pub max_commitment_depth: u32, + /// Block production time. + pub block_time: Duration, + /// Network path pub network_path: PathBuf, @@ -159,10 +162,11 @@ impl TryFrom for Config { .ethereum_router_address .unwrap_or(chain_spec.ethereum_router_address), max_commitment_depth: args.max_commitment_depth.unwrap_or(1000), + block_time: Duration::from_secs(args.block_time), net_config, - prometheus_config: args - .prometheus_params - .prometheus_config(DEFAULT_PROMETHEUS_PORT, "ethexe-dev".to_string()), + prometheus_config: args.prometheus_params.and_then(|params| { + params.prometheus_config(DEFAULT_PROMETHEUS_PORT, "ethexe-dev".to_string()) + }), database_path: base_path.join("db"), network_path: base_path.join("net"), key_path: base_path.join("key"), diff --git a/ethexe/cli/src/service.rs b/ethexe/cli/src/service.rs index c4e0c5e2892..3b767397e62 100644 --- a/ethexe/cli/src/service.rs +++ b/ethexe/cli/src/service.rs @@ -32,7 +32,7 @@ use ethexe_validator::Commitment; use futures::{future, stream::StreamExt, FutureExt}; use gprimitives::H256; use parity_scale_codec::Decode; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use tokio::time; /// ethexe service. @@ -47,6 +47,7 @@ pub struct Service { validator: Option, metrics_service: Option, rpc: ethexe_rpc::RpcService, + block_time: Duration, } async fn maybe_sleep(maybe_timer: &mut Option) { @@ -66,6 +67,7 @@ impl Service { ethexe_observer::ConsensusLayerBlobReader::new( &config.ethereum_rpc, &config.ethereum_beacon_rpc, + config.block_time, ) .await?, ); @@ -148,6 +150,7 @@ impl Service { validator, metrics_service, rpc, + block_time: config.block_time, }) } @@ -164,6 +167,7 @@ impl Service { validator: Option, metrics_service: Option, rpc: ethexe_rpc::RpcService, + block_time: Duration, ) -> Self { Self { db, @@ -176,6 +180,7 @@ impl Service { validator, metrics_service, rpc, + block_time, } } @@ -366,6 +371,7 @@ impl Service { mut validator, metrics_service, rpc, + block_time, } = self; let network_service = network.service().clone(); @@ -440,7 +446,7 @@ impl Service { } } - delay = Some(tokio::time::sleep(std::time::Duration::from_secs(3))); + delay = Some(tokio::time::sleep(block_time / 4)); } message = gossip_stream.next() => { if let Some(message) = message { @@ -486,7 +492,10 @@ impl Service { mod tests { use super::Service; use crate::config::{Config, PrometheusConfig}; - use std::net::{Ipv4Addr, SocketAddr}; + use std::{ + net::{Ipv4Addr, SocketAddr}, + time::Duration, + }; #[tokio::test] async fn basics() { @@ -496,6 +505,7 @@ mod tests { ethereum_beacon_rpc: "http://localhost:5052".into(), ethereum_router_address: "0x05069E9045Ca0D2B72840c6A21C7bE588E02089A".into(), max_commitment_depth: 1000, + block_time: Duration::from_secs(1), key_path: "/tmp/key".into(), network_path: "/tmp/net".into(), net_config: ethexe_network::NetworkConfiguration::new_local(), diff --git a/ethexe/cli/src/tests.rs b/ethexe/cli/src/tests.rs index 4956715c8c3..faefcde1925 100644 --- a/ethexe/cli/src/tests.rs +++ b/ethexe/cli/src/tests.rs @@ -32,7 +32,7 @@ use ethexe_validator::Validator; use futures::StreamExt; use gear_core::ids::prelude::*; use gprimitives::{ActorId, CodeId, H256}; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use tokio::{ sync::mpsc::{self, Receiver}, task::{self, JoinHandle}, @@ -106,6 +106,8 @@ struct TestEnv { impl TestEnv { async fn new(rpc: String) -> Result { + let block_time = Duration::from_secs(1); + let db = Database::from_one(&MemDb::default()); let net_config = ethexe_network::NetworkConfiguration::new_local(); @@ -126,7 +128,7 @@ impl TestEnv { let sender_address = sender_public_key.to_address(); let validators = vec![validator_public_key.to_address()]; let ethereum = Ethereum::deploy(&rpc, validators, signer.clone(), sender_address).await?; - let blob_reader = Arc::new(MockBlobReader::default()); + let blob_reader = Arc::new(MockBlobReader::new(block_time)); let router_address = ethereum.router().address(); @@ -180,6 +182,7 @@ impl TestEnv { Some(validator), None, rpc, + block_time, ); let env = TestEnv { diff --git a/ethexe/observer/src/blobs.rs b/ethexe/observer/src/blobs.rs index 87528db7d90..a1e0e548bf2 100644 --- a/ethexe/observer/src/blobs.rs +++ b/ethexe/observer/src/blobs.rs @@ -19,8 +19,6 @@ use tokio::{ time::{self, Duration}, }; -const BEACON_BLOCK_TIME: u64 = 12; - #[async_trait] pub trait BlobReader: Send + Sync { async fn read_blob_from_tx_hash(&self, tx_hash: H256, attempts: Option) -> Result>; @@ -31,14 +29,20 @@ pub struct ConsensusLayerBlobReader { provider: ObserverProvider, http_client: Client, ethereum_beacon_rpc: String, + beacon_block_time: Duration, } impl ConsensusLayerBlobReader { - pub async fn new(ethereum_rpc: &str, ethereum_beacon_rpc: &str) -> Result { + pub async fn new( + ethereum_rpc: &str, + ethereum_beacon_rpc: &str, + beacon_block_time: Duration, + ) -> Result { Ok(Self { provider: ProviderBuilder::new().on_builtin(ethereum_rpc).await?, http_client: Client::new(), ethereum_beacon_rpc: ethereum_beacon_rpc.into(), + beacon_block_time, }) } @@ -78,7 +82,8 @@ impl BlobReader for ConsensusLayerBlobReader { .get_block_by_hash(block_hash, BlockTransactionsKind::Hashes) .await? .ok_or_else(|| anyhow!("failed to get block"))?; - let slot = (block.header.timestamp - BEACON_GENESIS_BLOCK_TIME) / BEACON_BLOCK_TIME; + let slot = + (block.header.timestamp - BEACON_GENESIS_BLOCK_TIME) / self.beacon_block_time.as_secs(); let blob_bundle_result = match attempts { Some(attempts) => { let mut count = 0; @@ -88,7 +93,7 @@ impl BlobReader for ConsensusLayerBlobReader { if blob_bundle_result.is_ok() || count >= attempts { break blob_bundle_result; } else { - time::sleep(Duration::from_secs(BEACON_BLOCK_TIME)).await; + time::sleep(self.beacon_block_time).await; count += 1; } } @@ -115,12 +120,20 @@ impl BlobReader for ConsensusLayerBlobReader { } } -#[derive(Default, Clone)] +#[derive(Clone)] pub struct MockBlobReader { transactions: Arc>>>, + block_time: Duration, } impl MockBlobReader { + pub fn new(block_time: Duration) -> Self { + Self { + transactions: Arc::new(RwLock::new(HashMap::new())), + block_time, + } + } + pub async fn add_blob_transaction(&self, tx_hash: H256, data: Vec) { self.transactions.write().await.insert(tx_hash, data); } @@ -138,7 +151,7 @@ impl BlobReader for MockBlobReader { if maybe_blob_data.is_some() || count >= attempts { break maybe_blob_data; } else { - time::sleep(Duration::from_secs(BEACON_BLOCK_TIME)).await; + time::sleep(self.block_time).await; count += 1; } } diff --git a/ethexe/observer/src/observer.rs b/ethexe/observer/src/observer.rs index 010be1794a2..c90038b0849 100644 --- a/ethexe/observer/src/observer.rs +++ b/ethexe/observer/src/observer.rs @@ -208,6 +208,8 @@ pub(crate) async fn read_block_events( #[cfg(test)] mod tests { + use std::time::Duration; + use super::*; use crate::MockBlobReader; use alloy::node_bindings::Anvil; @@ -244,7 +246,7 @@ mod tests { let validators = vec!["0x45D6536E3D4AdC8f4e13c5c4aA54bE968C55Abf1".parse()?]; let ethereum = Ethereum::deploy(ðereum_rpc, validators, signer, sender_address).await?; - let blob_reader = Arc::new(MockBlobReader::default()); + let blob_reader = Arc::new(MockBlobReader::new(Duration::from_secs(1))); let router_address = ethereum.router().address(); let cloned_blob_reader = blob_reader.clone();