Skip to content

Commit 6454aed

Browse files
committed
[broadcast] Use BTreeMap in buffered::Engine
## Overview Swaps the map type in `buffered::Engine` from `HashMap` to `BTreeMap`. This change makes the iteration over values returned by `buffered::Mailbox::get` deterministically ordered.
1 parent b835347 commit 6454aed

File tree

2 files changed

+60
-10
lines changed

2 files changed

+60
-10
lines changed

broadcast/src/buffered/engine.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use futures::{
1515
channel::{mpsc, oneshot},
1616
StreamExt,
1717
};
18-
use std::collections::{HashMap, VecDeque};
18+
use std::collections::{BTreeMap, VecDeque};
1919
use tracing::{debug, error, trace, warn};
2020

2121
/// A responder waiting for a message.
@@ -76,7 +76,7 @@ pub struct Engine<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + D
7676

7777
/// Pending requests from the application.
7878
#[allow(clippy::type_complexity)]
79-
waiters: HashMap<M::Commitment, Vec<Waiter<P, M::Digest, M>>>,
79+
waiters: BTreeMap<M::Commitment, Vec<Waiter<P, M::Digest, M>>>,
8080

8181
////////////////////////////////////////
8282
// Cache
@@ -85,21 +85,21 @@ pub struct Engine<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + D
8585
///
8686
/// We store messages outside of the deques to minimize memory usage
8787
/// when receiving duplicate messages.
88-
items: HashMap<M::Commitment, HashMap<M::Digest, M>>,
88+
items: BTreeMap<M::Commitment, BTreeMap<M::Digest, M>>,
8989

9090
/// A LRU cache of the latest received identities and digests from each peer.
9191
///
9292
/// This is used to limit the number of digests stored per peer.
9393
/// At most `deque_size` digests are stored per peer. This value is expected to be small, so
9494
/// membership checks are done in linear time.
9595
#[allow(clippy::type_complexity)]
96-
deques: HashMap<P, VecDeque<Pair<M::Commitment, M::Digest>>>,
96+
deques: BTreeMap<P, VecDeque<Pair<M::Commitment, M::Digest>>>,
9797

9898
/// The number of times each digest (globally unique) exists in one of the deques.
9999
///
100100
/// Multiple peers can send the same message and we only want to store
101101
/// the message once.
102-
counts: HashMap<M::Digest, usize>,
102+
counts: BTreeMap<M::Digest, usize>,
103103

104104
////////////////////////////////////////
105105
// Metrics
@@ -125,10 +125,10 @@ impl<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + Digestible + C
125125
deque_size: cfg.deque_size,
126126
codec_config: cfg.codec_config,
127127
mailbox_receiver,
128-
waiters: HashMap::new(),
129-
deques: HashMap::new(),
130-
items: HashMap::new(),
131-
counts: HashMap::new(),
128+
waiters: BTreeMap::new(),
129+
deques: BTreeMap::new(),
130+
items: BTreeMap::new(),
131+
counts: BTreeMap::new(),
132132
metrics,
133133
};
134134

broadcast/src/buffered/mod.rs

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ mod tests {
3838
use commonware_codec::RangeCfg;
3939
use commonware_cryptography::{
4040
ed25519::{PrivateKey, PublicKey},
41-
Committable, Digestible, PrivateKeyExt as _, Signer as _,
41+
sha256::Digest as Sha256Digest,
42+
Committable, Digestible, Hasher, PrivateKeyExt as _, Sha256, Signer as _,
4243
};
4344
use commonware_macros::{select, test_traced};
4445
use commonware_p2p::{
@@ -602,6 +603,55 @@ mod tests {
602603
});
603604
}
604605

606+
#[test_traced]
607+
fn test_get_all_for_commitment_deterministic_order() {
608+
let run = |seed: u64| {
609+
let config = deterministic::Config::new()
610+
.with_seed(seed)
611+
.with_timeout(Some(Duration::from_secs(5)));
612+
let runner = deterministic::Runner::new(config);
613+
runner.start(|context| async move {
614+
let (peers, mut registrations, _oracle) =
615+
initialize_simulation(context.clone(), 1, 1.0).await;
616+
let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
617+
618+
let sender1 = peers[0].clone();
619+
let mut mb1 = mailboxes.get(&sender1).unwrap().clone();
620+
621+
// Two messages share commitment but have distinct digests.
622+
let m1 = TestMessage::new(b"id", b"content-1");
623+
let m2 = TestMessage::new(b"id", b"content-2");
624+
let m3 = TestMessage::new(b"id", b"content-3");
625+
mb1.broadcast(Recipients::All, m1.clone())
626+
.await
627+
.await
628+
.unwrap();
629+
mb1.broadcast(Recipients::All, m2.clone())
630+
.await
631+
.await
632+
.unwrap();
633+
mb1.broadcast(Recipients::All, m3.clone())
634+
.await
635+
.await
636+
.unwrap();
637+
638+
let mut hasher = Sha256::default();
639+
let values = mb1.get(None, m1.commitment(), None).await;
640+
for value in values {
641+
hasher.update(&value.content);
642+
}
643+
hasher.finalize()
644+
})
645+
};
646+
647+
for seed in 0..10 {
648+
let h1 = run(seed);
649+
let h2 = run(seed);
650+
651+
assert_eq!(h1, h2, "Messages returned in different order for {seed}");
652+
}
653+
}
654+
605655
#[test_traced]
606656
fn test_ref_count_across_peers() {
607657
let runner = deterministic::Runner::timed(Duration::from_secs(10));

0 commit comments

Comments
 (0)