test: enable data_with_churn using networkspawner
ermineJose committed Mar 6, 2025
1 parent 2c3ed57 commit b533664
Showing 3 changed files with 208 additions and 67 deletions.
38 changes: 19 additions & 19 deletions .github/workflows/merge.yml
@@ -684,15 +684,6 @@ jobs:
echo "EVM_NETWORK has been set to $EVM_NETWORK"
fi
- name: Chunks data integrity during nodes churn
run: cargo test --release -p ant-node --test data_with_churn -- --nocapture
env:
TEST_DURATION_MINS: 5
TEST_TOTAL_CHURN_CYCLES: 15
ANT_LOG: "all"
CARGO_TARGET_DIR: ${{ matrix.os == 'windows-latest' && './test-target' || '.' }}
timeout-minutes: 30

# Sleep for a while so that restarted nodes can be detected by others
- name: Sleep a while
run: sleep 300
@@ -705,23 +696,32 @@ jobs:
log_file_prefix: safe_test_logs_churn
platform: ${{ matrix.os }}

- name: Chunks data integrity during nodes churn
run: cargo test --release -p ant-node --test data_with_churn -- --nocapture
env:
TEST_DURATION_MINS: 5
TEST_TOTAL_CHURN_CYCLES: 15
ANT_LOG: "all"
CARGO_TARGET_DIR: ${{ matrix.os == 'windows-latest' && './test-target' || '.' }}
timeout-minutes: 30

- name: Get total node count
shell: bash
timeout-minutes: 1
run: |
node_count=$(ls "${{ matrix.node_data_path }}" | wc -l)
echo "Node dir count is $node_count"
- name: Get restart of nodes using rg
shell: bash
timeout-minutes: 1
# get the counts, then the specific line, and then the digit count only
# then check we have an expected level of restarts
# TODO: make this use an env var, or relate to testnet size
run: |
restart_count=$(rg "Node is restarting in" "${{ matrix.node_data_path }}" -c --stats | \
rg "(\d+) matches" | rg "\d+" -o)
echo "Restarted $restart_count nodes"
# - name: Get restart of nodes using rg
# shell: bash
# timeout-minutes: 1
# # get the counts, then the specific line, and then the digit count only
# # then check we have an expected level of restarts
# # TODO: make this use an env var, or relate to testnet size
# run: |
# restart_count=$(rg "Node is restarting in" "${{ matrix.node_data_path }}" -c --stats | \
# rg "(\d+) matches" | rg "\d+" -o)
# echo "Restarted $restart_count nodes"

# `PeerRemovedFromRoutingTable` now only happens when a peer is reported as a `BadNode`.
# Otherwise kad will remove a `dropped out node` directly from the RT.
227 changes: 182 additions & 45 deletions ant-node/tests/data_with_churn.rs
@@ -8,26 +8,34 @@

mod common;

use crate::common::{
client::{get_client_and_funded_wallet, get_node_count},
NodeRestart,
};
use crate::common::client::get_node_count;
use ant_evm::Amount;
use ant_logging::LogBuilder;
use ant_node::{
spawn::{
network_spawner::{NetworkSpawner, RunningNetwork},
node_spawner::NodeSpawner,
},
RunningNode,
};
use ant_protocol::{
storage::{ChunkAddress, GraphEntry, GraphEntryAddress, PointerTarget, ScratchpadAddress},
NetworkAddress,
};
use autonomi::{data::DataAddress, Client, Wallet};
use autonomi::{data::DataAddress, Client, ClientConfig, Wallet};
use bls::{PublicKey, SecretKey};
use bytes::Bytes;
use common::client::transfer_to_new_wallet;
use evmlib::Network;
use eyre::{bail, ErrReport, Result};
use libp2p::Multiaddr;
use rand::Rng;
use self_encryption::MAX_CHUNK_SIZE;
use std::{
collections::{BTreeMap, HashMap, VecDeque},
fmt,
fs::create_dir_all,
net::{IpAddr, Ipv4Addr, SocketAddr},
str::FromStr,
sync::{Arc, LazyLock},
time::{Duration, Instant},
};
Expand Down Expand Up @@ -77,6 +85,34 @@ type ContentErredList = Arc<RwLock<BTreeMap<NetworkAddress, ContentError>>>;

#[tokio::test(flavor = "multi_thread")]
async fn data_availability_during_churn() -> Result<()> {
let evm_testnet = evmlib::testnet::Testnet::new().await;
println!("created evm_testnet");
let evm_network = evm_testnet.to_network();
let evm_sk = evm_testnet.default_wallet_private_key();
println!("created evm_sk");
let funded_wallet =
Wallet::new_from_private_key(evm_network.clone(), &evm_sk).expect("Invalid private key");
println!("created funded_wallet");
let network = NetworkSpawner::new()
.with_evm_network(evm_network.clone())
.with_rewards_address(funded_wallet.address())
.with_local(true)
.with_size(20)
.spawn()
.await
.unwrap();
println!("created network with networkspawner");
let peer = network.bootstrap_peer().await;
println!("got the peer");
let config = ClientConfig {
local: true,
peers: Some(vec![peer]),
evm_network: evm_network.clone(),
strategy: autonomi::ClientOperatingStrategy::default(),
};
println!("created config");
let client = Client::init_with_config(config).await.unwrap();
println!("created client");
let _log_appender_guard = LogBuilder::init_multi_threaded_tokio_test("data_with_churn", false);

let test_duration = if let Ok(str) = std::env::var("TEST_DURATION_MINS") {
Expand Down Expand Up @@ -118,11 +154,9 @@ async fn data_availability_during_churn() -> Result<()> {
if chunks_only { " (Chunks only)" } else { "" }
);

let (client, main_wallet) = get_client_and_funded_wallet().await;

info!(
"Client and wallet created. Main wallet address: {:?}",
main_wallet.address()
funded_wallet.address()
);

// Shared bucket where we keep track of content created/stored on the network
@@ -131,7 +165,17 @@
println!("Uploading some chunks before carry out node churning");
info!("Uploading some chunks before carry out node churning");

let chunk_wallet = transfer_to_new_wallet(&main_wallet, TOKENS_TO_TRANSFER).await?;
let chunk_wallet = Wallet::new_with_random_wallet(evm_network.clone());
funded_wallet
.transfer_tokens(chunk_wallet.address(), Amount::from(TOKENS_TO_TRANSFER))
.await?;
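// Gas top-up: 10^19 wei, i.e. 10 native tokens assuming the usual 18 decimals.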
funded_wallet
.transfer_gas_tokens(
chunk_wallet.address(),
Amount::from_str("10000000000000000000")?,
)
.await?;

// Spawn a task to store Chunks at random locations, at a higher frequency than the churning events
let store_chunks_handle = store_chunks_task(
client.clone(),
@@ -143,7 +187,16 @@
// Spawn a task to create Pointers at random locations,
// at a higher frequency than the churning events
let create_pointer_handle = if !chunks_only {
let pointer_wallet = transfer_to_new_wallet(&main_wallet, TOKENS_TO_TRANSFER).await?;
let pointer_wallet = Wallet::new_with_random_wallet(evm_network.clone());
funded_wallet
.transfer_tokens(pointer_wallet.address(), Amount::from(TOKENS_TO_TRANSFER))
.await?;
funded_wallet
.transfer_gas_tokens(
pointer_wallet.address(),
Amount::from_str("10000000000000000000")?,
)
.await?;
let create_pointer_handle = create_pointers_task(
client.clone(),
pointer_wallet,
@@ -158,7 +211,19 @@
// Spawn a task to create GraphEntry at random locations,
// at a higher frequency than the churning events
let create_graph_entry_handle = if !chunks_only {
let graph_entry_wallet = transfer_to_new_wallet(&main_wallet, TOKENS_TO_TRANSFER).await?;
let graph_entry_wallet = Wallet::new_with_random_wallet(evm_network.clone());
funded_wallet
.transfer_tokens(
graph_entry_wallet.address(),
Amount::from(TOKENS_TO_TRANSFER),
)
.await?;
funded_wallet
.transfer_gas_tokens(
graph_entry_wallet.address(),
Amount::from_str("10000000000000000000")?,
)
.await?;
let create_graph_entry_handle = create_graph_entry_task(
client.clone(),
graph_entry_wallet,
@@ -173,7 +238,19 @@
// Spawn a task to create ScratchPad at random locations,
// at a higher frequency than the churning events
let create_scratchpad_handle = if !chunks_only {
let scratchpad_wallet = transfer_to_new_wallet(&main_wallet, TOKENS_TO_TRANSFER).await?;
let scratchpad_wallet = Wallet::new_with_random_wallet(evm_network.clone());
funded_wallet
.transfer_tokens(
scratchpad_wallet.address(),
Amount::from(TOKENS_TO_TRANSFER),
)
.await?;
funded_wallet
.transfer_gas_tokens(
scratchpad_wallet.address(),
Amount::from_str("10000000000000000000")?,
)
.await?;
let create_scratchpad_handle = create_scratchpad_task(
client.clone(),
scratchpad_wallet,
@@ -186,7 +263,18 @@
};

// Spawn a task to churn nodes
churn_nodes_task(Arc::clone(&churn_count), test_duration, churn_period);
tokio::spawn(async move {
let _ = data_churn_with_network_restart(
network,
&evm_network,
&funded_wallet,
true,
false,
churn_period,
test_duration,
)
.await;
});

// Shared bucket where we keep track of the content which erred when creating/storing/fetching.
// We remove them from this bucket if we are then able to query/fetch them successfully.
@@ -311,6 +399,86 @@ async fn data_availability_during_churn() -> Result<()> {
Ok(())
}

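/// Churns the spawned network: every `churn_period` one node is shut down and
/// replaced with a freshly spawned node, until `total_period` elapses.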
async fn data_churn_with_network_restart(
running_network: RunningNetwork,
evm_network: &Network,
funded_wallet: &Wallet,
local: bool,
upnp: bool,
churn_period: Duration,
total_period: Duration,
) -> Result<()> {
println!("data churning for the network spawner initiated");
info!("data churning for the network spawner initiated");
let start = Instant::now();
let mut churn_count = 1;

let mut running_nodes = running_network.running_nodes().clone();
'outer: loop {
let mut restarted_nodes: Vec<RunningNode> = Vec::new();
let mut initial_peers: Vec<Multiaddr> = vec![];
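// Gather every running node's listen addresses to bootstrap the replacement nodes.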
for peer in running_nodes.iter() {
if let Ok(listen_addrs_with_peer_id) = peer.get_listen_addrs_with_peer_id().await {
initial_peers.extend(listen_addrs_with_peer_id);
}
}
for old_node in running_nodes.clone() {
sleep(churn_period).await;
if start.elapsed() > total_period {
println!("Total period elapsed. Stopping the churn.");
info!("Total period elapsed. Stopping the churn.");
break 'outer;
}
println!(
"Churn #{churn_count}: churning node with peer_id {:?}",
old_node.peer_id()
);
info!(
"Churn #{churn_count}: churning node with peer_id {:?}",
old_node.peer_id()
);
old_node.clone().shutdown();
println!("Restarting the node with peer_id {:?}", old_node.peer_id());

churn_count += 1;
// Drop the restarted node's own addresses from the bootstrap list.
if let Ok(listen_addrs_with_peer_id) = old_node.get_listen_addrs_with_peer_id().await {
initial_peers.retain(|addr| !listen_addrs_with_peer_id.contains(addr));
}

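// Bind the replacement node to an ephemeral localhost port.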
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
let node = NodeSpawner::new()
.with_socket_addr(socket_addr)
.with_evm_network(evm_network.clone())
.with_rewards_address(funded_wallet.address())
.with_initial_peers(initial_peers.clone())
.with_local(local)
.with_upnp(upnp)
.with_root_dir(None)
.spawn()
.await?;
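// Brief pause so the new node starts listening before its addresses are collected.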
sleep(Duration::from_secs(2)).await;
if let Ok(listen_addrs_with_peer_id) = node.get_listen_addrs_with_peer_id().await {
initial_peers.extend(listen_addrs_with_peer_id);
}
println!(
"A new Node joined the network with peer_id {:?}",
node.peer_id()
);
restarted_nodes.push(node);
}
running_nodes = restarted_nodes;
}
Ok(())
}

// Spawns a task which periodically creates ScratchPads at random locations.
fn create_scratchpad_task(
client: Client,
Expand Down Expand Up @@ -765,37 +933,6 @@ fn query_content_task(
});
}

// Spawns a task which periodically picks up a node, and restarts it to cause churn in the network.
fn churn_nodes_task(
churn_count: Arc<RwLock<usize>>,
test_duration: Duration,
churn_period: Duration,
) {
let start = Instant::now();
let _handle: JoinHandle<Result<()>> = tokio::spawn(async move {
let mut node_restart = NodeRestart::new(true, false)?;

loop {
sleep(churn_period).await;

// break out if we've run the duration of churn
if start.elapsed() > test_duration {
debug!("Test duration reached, stopping churn nodes task");
break;
}

if let Err(err) = node_restart.restart_next(true, true).await {
println!("Failed to restart node {err}");
info!("Failed to restart node {err}");
continue;
}

*churn_count.write().await += 1;
}
Ok(())
});
}

// Periodically checks for any content for which an error was reported, either at the moment of its
// creation or in a later query attempt.
fn retry_query_content_task(