Skip to content

Commit

Permalink
feat(ethexe): append --block-time arg; make prometheus args optional (
Browse files Browse the repository at this point in the history
  • Loading branch information
grishasobol authored Jul 18, 2024
1 parent 89802fd commit 67ae8d7
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 19 deletions.
7 changes: 6 additions & 1 deletion ethexe/cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ pub struct Args {
#[arg(long = "max-depth")]
pub max_commitment_depth: Option<u32>,

/// Block time in seconds (approximate).
/// Ethexe uses it to estimate inner timeouts.
#[arg(long, default_value = "12")]
pub block_time: u64,

/// Run a temporary node.
///
/// A temporary directory will be created to store the configuration and will be deleted
Expand All @@ -96,7 +101,7 @@ pub struct Args {

#[allow(missing_docs)]
#[clap(flatten)]
pub prometheus_params: PrometheusParams,
pub prometheus_params: Option<PrometheusParams>,

#[command(subcommand)]
pub extra_command: Option<ExtraCommands>,
Expand Down
14 changes: 9 additions & 5 deletions ethexe/cli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use directories::ProjectDirs;
use ethexe_network::NetworkConfiguration;
use ethexe_prometheus_endpoint::Registry;
use ethexe_signer::PublicKey;
use std::{iter, net::SocketAddr, path::PathBuf};
use std::{iter, net::SocketAddr, path::PathBuf, time::Duration};
use tempfile::TempDir;

const DEFAULT_PROMETHEUS_PORT: u16 = 9635;
Expand Down Expand Up @@ -79,9 +79,12 @@ pub struct Config {
/// Address of Ethereum Router contract
pub ethereum_router_address: String,

// Max depth to discover last commitment.
/// Max depth to discover last commitment.
pub max_commitment_depth: u32,

/// Block production time.
pub block_time: Duration,

/// Network path
pub network_path: PathBuf,

Expand Down Expand Up @@ -159,10 +162,11 @@ impl TryFrom<Args> for Config {
.ethereum_router_address
.unwrap_or(chain_spec.ethereum_router_address),
max_commitment_depth: args.max_commitment_depth.unwrap_or(1000),
block_time: Duration::from_secs(args.block_time),
net_config,
prometheus_config: args
.prometheus_params
.prometheus_config(DEFAULT_PROMETHEUS_PORT, "ethexe-dev".to_string()),
prometheus_config: args.prometheus_params.and_then(|params| {
params.prometheus_config(DEFAULT_PROMETHEUS_PORT, "ethexe-dev".to_string())
}),
database_path: base_path.join("db"),
network_path: base_path.join("net"),
key_path: base_path.join("key"),
Expand Down
16 changes: 13 additions & 3 deletions ethexe/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use ethexe_validator::Commitment;
use futures::{future, stream::StreamExt, FutureExt};
use gprimitives::H256;
use parity_scale_codec::Decode;
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use tokio::time;

/// ethexe service.
Expand All @@ -47,6 +47,7 @@ pub struct Service {
validator: Option<ethexe_validator::Validator>,
metrics_service: Option<MetricsService>,
rpc: ethexe_rpc::RpcService,
block_time: Duration,
}

async fn maybe_sleep(maybe_timer: &mut Option<time::Sleep>) {
Expand All @@ -66,6 +67,7 @@ impl Service {
ethexe_observer::ConsensusLayerBlobReader::new(
&config.ethereum_rpc,
&config.ethereum_beacon_rpc,
config.block_time,
)
.await?,
);
Expand Down Expand Up @@ -148,6 +150,7 @@ impl Service {
validator,
metrics_service,
rpc,
block_time: config.block_time,
})
}

Expand All @@ -164,6 +167,7 @@ impl Service {
validator: Option<ethexe_validator::Validator>,
metrics_service: Option<MetricsService>,
rpc: ethexe_rpc::RpcService,
block_time: Duration,
) -> Self {
Self {
db,
Expand All @@ -176,6 +180,7 @@ impl Service {
validator,
metrics_service,
rpc,
block_time,
}
}

Expand Down Expand Up @@ -366,6 +371,7 @@ impl Service {
mut validator,
metrics_service,
rpc,
block_time,
} = self;

let network_service = network.service().clone();
Expand Down Expand Up @@ -440,7 +446,7 @@ impl Service {
}
}

delay = Some(tokio::time::sleep(std::time::Duration::from_secs(3)));
delay = Some(tokio::time::sleep(block_time / 4));
}
message = gossip_stream.next() => {
if let Some(message) = message {
Expand Down Expand Up @@ -486,7 +492,10 @@ impl Service {
mod tests {
use super::Service;
use crate::config::{Config, PrometheusConfig};
use std::net::{Ipv4Addr, SocketAddr};
use std::{
net::{Ipv4Addr, SocketAddr},
time::Duration,
};

#[tokio::test]
async fn basics() {
Expand All @@ -496,6 +505,7 @@ mod tests {
ethereum_beacon_rpc: "http://localhost:5052".into(),
ethereum_router_address: "0x05069E9045Ca0D2B72840c6A21C7bE588E02089A".into(),
max_commitment_depth: 1000,
block_time: Duration::from_secs(1),
key_path: "/tmp/key".into(),
network_path: "/tmp/net".into(),
net_config: ethexe_network::NetworkConfiguration::new_local(),
Expand Down
7 changes: 5 additions & 2 deletions ethexe/cli/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use ethexe_validator::Validator;
use futures::StreamExt;
use gear_core::ids::prelude::*;
use gprimitives::{ActorId, CodeId, H256};
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use tokio::{
sync::mpsc::{self, Receiver},
task::{self, JoinHandle},
Expand Down Expand Up @@ -106,6 +106,8 @@ struct TestEnv {

impl TestEnv {
async fn new(rpc: String) -> Result<TestEnv> {
let block_time = Duration::from_secs(1);

let db = Database::from_one(&MemDb::default());

let net_config = ethexe_network::NetworkConfiguration::new_local();
Expand All @@ -126,7 +128,7 @@ impl TestEnv {
let sender_address = sender_public_key.to_address();
let validators = vec![validator_public_key.to_address()];
let ethereum = Ethereum::deploy(&rpc, validators, signer.clone(), sender_address).await?;
let blob_reader = Arc::new(MockBlobReader::default());
let blob_reader = Arc::new(MockBlobReader::new(block_time));

let router_address = ethereum.router().address();

Expand Down Expand Up @@ -180,6 +182,7 @@ impl TestEnv {
Some(validator),
None,
rpc,
block_time,
);

let env = TestEnv {
Expand Down
27 changes: 20 additions & 7 deletions ethexe/observer/src/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ use tokio::{
time::{self, Duration},
};

const BEACON_BLOCK_TIME: u64 = 12;

#[async_trait]
pub trait BlobReader: Send + Sync {
async fn read_blob_from_tx_hash(&self, tx_hash: H256, attempts: Option<u8>) -> Result<Vec<u8>>;
Expand All @@ -31,14 +29,20 @@ pub struct ConsensusLayerBlobReader {
provider: ObserverProvider,
http_client: Client,
ethereum_beacon_rpc: String,
beacon_block_time: Duration,
}

impl ConsensusLayerBlobReader {
pub async fn new(ethereum_rpc: &str, ethereum_beacon_rpc: &str) -> Result<Self> {
pub async fn new(
ethereum_rpc: &str,
ethereum_beacon_rpc: &str,
beacon_block_time: Duration,
) -> Result<Self> {
Ok(Self {
provider: ProviderBuilder::new().on_builtin(ethereum_rpc).await?,
http_client: Client::new(),
ethereum_beacon_rpc: ethereum_beacon_rpc.into(),
beacon_block_time,
})
}

Expand Down Expand Up @@ -78,7 +82,8 @@ impl BlobReader for ConsensusLayerBlobReader {
.get_block_by_hash(block_hash, BlockTransactionsKind::Hashes)
.await?
.ok_or_else(|| anyhow!("failed to get block"))?;
let slot = (block.header.timestamp - BEACON_GENESIS_BLOCK_TIME) / BEACON_BLOCK_TIME;
let slot =
(block.header.timestamp - BEACON_GENESIS_BLOCK_TIME) / self.beacon_block_time.as_secs();
let blob_bundle_result = match attempts {
Some(attempts) => {
let mut count = 0;
Expand All @@ -88,7 +93,7 @@ impl BlobReader for ConsensusLayerBlobReader {
if blob_bundle_result.is_ok() || count >= attempts {
break blob_bundle_result;
} else {
time::sleep(Duration::from_secs(BEACON_BLOCK_TIME)).await;
time::sleep(self.beacon_block_time).await;
count += 1;
}
}
Expand All @@ -115,12 +120,20 @@ impl BlobReader for ConsensusLayerBlobReader {
}
}

#[derive(Default, Clone)]
#[derive(Clone)]
pub struct MockBlobReader {
transactions: Arc<RwLock<HashMap<H256, Vec<u8>>>>,
block_time: Duration,
}

impl MockBlobReader {
pub fn new(block_time: Duration) -> Self {
Self {
transactions: Arc::new(RwLock::new(HashMap::new())),
block_time,
}
}

pub async fn add_blob_transaction(&self, tx_hash: H256, data: Vec<u8>) {
self.transactions.write().await.insert(tx_hash, data);
}
Expand All @@ -138,7 +151,7 @@ impl BlobReader for MockBlobReader {
if maybe_blob_data.is_some() || count >= attempts {
break maybe_blob_data;
} else {
time::sleep(Duration::from_secs(BEACON_BLOCK_TIME)).await;
time::sleep(self.block_time).await;
count += 1;
}
}
Expand Down
4 changes: 3 additions & 1 deletion ethexe/observer/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ pub(crate) async fn read_block_events(

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::*;
use crate::MockBlobReader;
use alloy::node_bindings::Anvil;
Expand Down Expand Up @@ -244,7 +246,7 @@ mod tests {
let validators = vec!["0x45D6536E3D4AdC8f4e13c5c4aA54bE968C55Abf1".parse()?];

let ethereum = Ethereum::deploy(&ethereum_rpc, validators, signer, sender_address).await?;
let blob_reader = Arc::new(MockBlobReader::default());
let blob_reader = Arc::new(MockBlobReader::new(Duration::from_secs(1)));

let router_address = ethereum.router().address();
let cloned_blob_reader = blob_reader.clone();
Expand Down

0 comments on commit 67ae8d7

Please sign in to comment.