forked from tokio-rs/tokio
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sync_watch.rs
64 lines (57 loc) · 1.75 KB
/
sync_watch.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
use bencher::{black_box, Bencher};
use rand::prelude::*;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::{watch, Notify};
/// Builds the Tokio runtime shared by the benchmark in this file.
///
/// The runtime uses the multi-threaded scheduler with six worker threads.
///
/// # Panics
///
/// Panics if the runtime cannot be built.
fn rt() -> tokio::runtime::Runtime {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.worker_threads(6);
    builder.build().unwrap()
}
/// Performs a small, fixed amount of CPU-bound busy work driven by `rng`.
///
/// Ten random `f64` values are formatted into a single string as
/// ` <index>=<value>` pairs, and the wrapping sum of the string's bytes is
/// returned so the caller has a result to feed into `black_box`.
fn do_work(rng: &mut impl RngCore) -> u32 {
    use std::fmt::Write;

    let mut text = String::new();
    (1..=10).for_each(|i| {
        // Formatting into a `String` does not fail; the result is ignored.
        let _ = write!(&mut text, " {i}={}", rng.gen::<f64>());
    });

    text.bytes().map(u32::from).fold(0, u32::wrapping_add)
}
fn contention_resubscribe(b: &mut Bencher) {
const NTASK: u64 = 1000;
let rt = rt();
let (snd, rcv) = watch::channel(0i32);
let wg = Arc::new((AtomicU64::new(0), Notify::new()));
for n in 0..NTASK {
let mut rcv = rcv.clone();
let wg = wg.clone();
let mut rng = rand::rngs::StdRng::seed_from_u64(n);
rt.spawn(async move {
while rcv.changed().await.is_ok() {
let _ = *rcv.borrow(); // contend on rwlock
let r = do_work(&mut rng);
let _ = black_box(r);
if wg.0.fetch_sub(1, Ordering::Release) == 1 {
wg.1.notify_one();
}
}
});
}
b.iter(|| {
rt.block_on(async {
for _ in 0..100 {
assert_eq!(wg.0.fetch_add(NTASK, Ordering::Relaxed), 0);
let _ = snd.send(black_box(42));
while wg.0.load(Ordering::Acquire) > 0 {
wg.1.notified().await;
}
}
});
});
}
// Collect `contention_resubscribe` into a benchmark group named `contention`.
bencher::benchmark_group!(contention, contention_resubscribe);
// Emit the benchmark binary's entry point, which runs the `contention` group.
bencher::benchmark_main!(contention);