Skip to content

Commit 8c809f4

Browse files
committed
Update
1 parent b50ddc8 commit 8c809f4

File tree

1 file changed

+64
-24
lines changed

1 file changed

+64
-24
lines changed

cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store_bench.rs

Lines changed: 64 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use zombienet_sdk::{
1616
};
1717

1818
const GROUP_SIZE: u32 = 6;
19-
const PARTICIPANT_SIZE: u32 = GROUP_SIZE * 8333;
19+
const PARTICIPANT_SIZE: u32 = GROUP_SIZE * 8333; // Target ~50,000 total
2020
const MESSAGE_SIZE: usize = 5 * 1024; // 5KiB
2121
const MESSAGE_COUNT: usize = 2;
2222
const MAX_RETRIES: u32 = 100;
@@ -33,7 +33,7 @@ async fn statement_store_one_node_bench() -> Result<(), anyhow::Error> {
3333

3434
info!("Starting statement store benchmark with {} participants", PARTICIPANT_SIZE);
3535

36-
let target_node = "charlie";
36+
let target_node = "alice";
3737
let node = network.get_node(target_node)?;
3838
let rpc_client = node.rpc().await?;
3939
info!("Created single RPC client for target node: {}", target_node);
@@ -45,13 +45,54 @@ async fn statement_store_one_node_bench() -> Result<(), anyhow::Error> {
4545

4646
let handles: Vec<_> = participants
4747
.into_iter()
48-
.map(|mut participant| {
49-
tokio::spawn(async move {
50-
// Add staggered start delay to reduce initial network congestion
51-
tokio::time::sleep(Duration::from_millis(participant.idx as u64 * 1)).await;
52-
participant.run().await
53-
})
54-
})
48+
.map(|mut p| tokio::spawn(async move { p.run().await }))
49+
.collect();
50+
51+
let mut all_stats = Vec::new();
52+
for handle in handles {
53+
let stats = handle.await??;
54+
all_stats.push(stats);
55+
}
56+
57+
let aggregated_stats = ParticipantStats::aggregate(all_stats);
58+
aggregated_stats.log_summary();
59+
60+
Ok(())
61+
}
62+
63+
#[tokio::test(flavor = "multi_thread")]
64+
async fn statement_store_many_nodes_bench() -> Result<(), anyhow::Error> {
65+
let _ = env_logger::try_init_from_env(
66+
env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"),
67+
);
68+
69+
let network = spawn_network().await?;
70+
71+
info!("Starting statement store benchmark with {} participants", PARTICIPANT_SIZE);
72+
73+
let collator_names = ["alice", "bob", "charlie", "dave", "eve", "ferdie"];
74+
let mut rpc_clients = Vec::new();
75+
for &name in &collator_names {
76+
let node = network.get_node(name)?;
77+
let rpc_client = node.rpc().await?;
78+
rpc_clients.push(rpc_client);
79+
}
80+
info!("Created RPC clients for {} collator nodes", rpc_clients.len());
81+
82+
let mut participants = Vec::with_capacity(PARTICIPANT_SIZE as usize);
83+
for i in 0..(PARTICIPANT_SIZE) as usize {
84+
let client_idx = i % GROUP_SIZE as usize;
85+
participants.push(Participant::new(i as u32, rpc_clients[client_idx].clone()));
86+
}
87+
info!(
88+
"Participants distributed across nodes: {} participants per group of {} nodes",
89+
PARTICIPANT_SIZE / GROUP_SIZE,
90+
GROUP_SIZE
91+
);
92+
93+
let handles: Vec<_> = participants
94+
.into_iter()
95+
.map(|mut participant| tokio::spawn(async move { participant.run().await }))
5596
.collect();
5697

5798
let mut all_stats = Vec::new();
@@ -232,9 +273,9 @@ impl AggregatedStats {
232273
}
233274

234275
struct Participant {
276+
idx: u32,
235277
keyring: sr25519::Pair,
236278
session_key: sr25519::Pair,
237-
idx: u32,
238279
group_members: Vec<u32>,
239280
session_keys: HashMap<u32, sr25519::Public>,
240281
symmetric_keys: HashMap<u32, sr25519::Public>,
@@ -303,9 +344,7 @@ impl Participant {
303344
if self.retry_count % 10 == 0 {
304345
debug!(target: &self.log_target(), "[{}] Retry attempt {}", self.idx, self.retry_count);
305346
}
306-
let delay_ms =
307-
std::cmp::min(RETRY_DELAY_MS * (1 << std::cmp::min(self.retry_count / 5, 4)), 5000);
308-
tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
347+
tokio::time::sleep(tokio::time::Duration::from_millis(RETRY_DELAY_MS)).await;
309348

310349
Ok(())
311350
}
@@ -314,11 +353,6 @@ impl Participant {
314353
tokio::time::sleep(tokio::time::Duration::from_millis(RECEIVE_DELAY_MS)).await;
315354
}
316355

317-
async fn send_session_key(&mut self) -> Result<(), anyhow::Error> {
318-
let statement = self.public_key_statement();
319-
self.statement_submit(statement).await
320-
}
321-
322356
async fn receive_statements_with_retry<T, F, R>(
323357
&mut self,
324358
mut pending: Vec<T>,
@@ -335,7 +369,7 @@ impl Participant {
335369

336370
for item in &pending {
337371
match timeout(
338-
Duration::from_millis(RECEIVE_DELAY_MS * 3), // Increase timeout 3x
372+
Duration::from_millis(RETRY_DELAY_MS),
339373
self.statement_broadcasts_statement(topic_generator(item)),
340374
)
341375
.await
@@ -351,11 +385,9 @@ impl Participant {
351385
},
352386
Ok(Ok(statements)) if statements.is_empty() => {
353387
debug!(target: &self.log_target(), "[{}] No statements received for item {:?}", self.idx, item);
354-
self.receive_sleep().await;
355388
},
356389
err => {
357390
debug!(target: &self.log_target(), "[{}] Cannot receive statements for item {:?}, err: {:?}", self.idx, item, err);
358-
self.receive_sleep().await;
359391
},
360392
}
361393
}
@@ -372,6 +404,11 @@ impl Participant {
372404
Ok(())
373405
}
374406

407+
async fn send_session_key(&mut self) -> Result<(), anyhow::Error> {
408+
let statement = self.public_key_statement();
409+
self.statement_submit(statement).await
410+
}
411+
375412
async fn receive_session_keys(&mut self) -> Result<(), anyhow::Error> {
376413
let group_members = self.group_members.clone();
377414

@@ -956,27 +993,30 @@ impl Participant {
956993
self.receive_sleep().await;
957994
self.receive_symmetric_keys().await?;
958995

959-
debug!(target: &self.log_target(), "[{}] Symmetric key acknowledgments", self.idx);
996+
debug!(target: &self.log_target(), "[{}] Symmetric key acknowledgments exchange", self.idx);
960997
self.send_symmetric_key_acknowledgments().await?;
961998
self.receive_sleep().await;
962999
self.receive_symmetric_key_acknowledgments().await?;
9631000

9641001
debug!(target: &self.log_target(), "[{}] Preparation finished", self.idx);
965-
for round in 0..MESSAGE_COUNT {
966-
debug!(target: &self.log_target(), "[{}] Req/res exchange round {}", self.idx, round + 1);
9671002

1003+
for round in 0..MESSAGE_COUNT {
1004+
debug!(target: &self.log_target(), "[{}] Requests exchange, round {}", self.idx, round + 1);
9681005
self.send_requests(round).await?;
9691006
self.receive_sleep().await;
9701007
self.receive_requests(round).await?;
9711008

1009+
debug!(target: &self.log_target(), "[{}] Request acknowledgments exchange, round {}", self.idx, round + 1);
9721010
self.send_request_acknowledgments().await?;
9731011
self.receive_sleep().await;
9741012
self.receive_request_acknowledgments().await?;
9751013

1014+
debug!(target: &self.log_target(), "[{}] Responses exchange, round {}", self.idx, round + 1);
9761015
self.send_responses(round).await?;
9771016
self.receive_sleep().await;
9781017
self.receive_responses(round).await?;
9791018

1019+
debug!(target: &self.log_target(), "[{}] Response acknowledgments exchange, round {}", self.idx, round + 1);
9801020
self.send_response_acknowledgments().await?;
9811021
self.receive_sleep().await;
9821022
self.receive_response_acknowledgments().await?;

0 commit comments

Comments
 (0)