Skip to content

Commit

Permalink
Merge branch 'master' into sem-token-bucket
Browse files Browse the repository at this point in the history
  • Loading branch information
maminrayej authored Sep 10, 2023
2 parents 84f5eea + 61f095f commit 3ce7a52
Show file tree
Hide file tree
Showing 17 changed files with 809 additions and 638 deletions.
2 changes: 1 addition & 1 deletion benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ test-util = ["tokio/test-util"]

[dependencies]
tokio = { version = "1.5.0", path = "../tokio", features = ["full"] }
bencher = "0.1.5"
criterion = "0.5.1"
rand = "0.8"
rand_chacha = "0.3"

Expand Down
98 changes: 55 additions & 43 deletions benches/copy.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use bencher::{benchmark_group, benchmark_main, Bencher};
use criterion::{criterion_group, criterion_main, Criterion};

use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha20Rng;
Expand Down Expand Up @@ -174,65 +174,77 @@ fn rt() -> tokio::runtime::Runtime {
.unwrap()
}

fn copy_mem_to_mem(b: &mut Bencher) {
fn copy_mem_to_mem(c: &mut Criterion) {
let rt = rt();

b.iter(|| {
let task = || async {
let mut source = repeat(0).take(SOURCE_SIZE);
let mut dest = Vec::new();
copy(&mut source, &mut dest).await.unwrap();
};

rt.block_on(task());
})
c.bench_function("copy_mem_to_mem", |b| {
b.iter(|| {
let task = || async {
let mut source = repeat(0).take(SOURCE_SIZE);
let mut dest = Vec::new();
copy(&mut source, &mut dest).await.unwrap();
};

rt.block_on(task());
})
});
}

fn copy_mem_to_slow_hdd(b: &mut Bencher) {
fn copy_mem_to_slow_hdd(c: &mut Criterion) {
let rt = rt();

b.iter(|| {
let task = || async {
let mut source = repeat(0).take(SOURCE_SIZE);
let mut dest = SlowHddWriter::new(WRITE_SERVICE_PERIOD, WRITE_BUFFER);
copy(&mut source, &mut dest).await.unwrap();
};

rt.block_on(task());
})
c.bench_function("copy_mem_to_slow_hdd", |b| {
b.iter(|| {
let task = || async {
let mut source = repeat(0).take(SOURCE_SIZE);
let mut dest = SlowHddWriter::new(WRITE_SERVICE_PERIOD, WRITE_BUFFER);
copy(&mut source, &mut dest).await.unwrap();
};

rt.block_on(task());
})
});
}

fn copy_chunk_to_mem(b: &mut Bencher) {
fn copy_chunk_to_mem(c: &mut Criterion) {
let rt = rt();
b.iter(|| {
let task = || async {
let mut source = ChunkReader::new(CHUNK_SIZE, READ_SERVICE_PERIOD).take(SOURCE_SIZE);
let mut dest = Vec::new();
copy(&mut source, &mut dest).await.unwrap();
};

rt.block_on(task());
})

c.bench_function("copy_chunk_to_mem", |b| {
b.iter(|| {
let task = || async {
let mut source =
ChunkReader::new(CHUNK_SIZE, READ_SERVICE_PERIOD).take(SOURCE_SIZE);
let mut dest = Vec::new();
copy(&mut source, &mut dest).await.unwrap();
};

rt.block_on(task());
})
});
}

fn copy_chunk_to_slow_hdd(b: &mut Bencher) {
fn copy_chunk_to_slow_hdd(c: &mut Criterion) {
let rt = rt();
b.iter(|| {
let task = || async {
let mut source = ChunkReader::new(CHUNK_SIZE, READ_SERVICE_PERIOD).take(SOURCE_SIZE);
let mut dest = SlowHddWriter::new(WRITE_SERVICE_PERIOD, WRITE_BUFFER);
copy(&mut source, &mut dest).await.unwrap();
};

rt.block_on(task());
})

c.bench_function("copy_chunk_to_slow_hdd", |b| {
b.iter(|| {
let task = || async {
let mut source =
ChunkReader::new(CHUNK_SIZE, READ_SERVICE_PERIOD).take(SOURCE_SIZE);
let mut dest = SlowHddWriter::new(WRITE_SERVICE_PERIOD, WRITE_BUFFER);
copy(&mut source, &mut dest).await.unwrap();
};

rt.block_on(task());
})
});
}

benchmark_group!(
criterion_group!(
copy_bench,
copy_mem_to_mem,
copy_mem_to_slow_hdd,
copy_chunk_to_mem,
copy_chunk_to_slow_hdd,
);
benchmark_main!(copy_bench);
criterion_main!(copy_bench);
103 changes: 56 additions & 47 deletions benches/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio_util::codec::{BytesCodec, FramedRead /*FramedWrite*/};

use bencher::{benchmark_group, benchmark_main, Bencher};
use criterion::{criterion_group, criterion_main, Criterion};

use std::fs::File as StdFile;
use std::io::Read as StdRead;
Expand All @@ -23,81 +23,90 @@ const BLOCK_COUNT: usize = 1_000;
const BUFFER_SIZE: usize = 4096;
const DEV_ZERO: &str = "/dev/zero";

fn async_read_codec(b: &mut Bencher) {
fn async_read_codec(c: &mut Criterion) {
let rt = rt();

b.iter(|| {
let task = || async {
let file = File::open(DEV_ZERO).await.unwrap();
let mut input_stream = FramedRead::with_capacity(file, BytesCodec::new(), BUFFER_SIZE);
c.bench_function("async_read_codec", |b| {
b.iter(|| {
let task = || async {
let file = File::open(DEV_ZERO).await.unwrap();
let mut input_stream =
FramedRead::with_capacity(file, BytesCodec::new(), BUFFER_SIZE);

for _i in 0..BLOCK_COUNT {
let _bytes = input_stream.next().await.unwrap();
}
};
for _i in 0..BLOCK_COUNT {
let _bytes = input_stream.next().await.unwrap();
}
};

rt.block_on(task());
rt.block_on(task());
})
});
}

fn async_read_buf(b: &mut Bencher) {
fn async_read_buf(c: &mut Criterion) {
let rt = rt();

b.iter(|| {
let task = || async {
let mut file = File::open(DEV_ZERO).await.unwrap();
let mut buffer = [0u8; BUFFER_SIZE];

for _i in 0..BLOCK_COUNT {
let count = file.read(&mut buffer).await.unwrap();
if count == 0 {
break;
c.bench_function("async_read_buf", |b| {
b.iter(|| {
let task = || async {
let mut file = File::open(DEV_ZERO).await.unwrap();
let mut buffer = [0u8; BUFFER_SIZE];

for _i in 0..BLOCK_COUNT {
let count = file.read(&mut buffer).await.unwrap();
if count == 0 {
break;
}
}
}
};
};

rt.block_on(task());
rt.block_on(task());
});
});
}

fn async_read_std_file(b: &mut Bencher) {
fn async_read_std_file(c: &mut Criterion) {
let rt = rt();

let task = || async {
let mut file = tokio::task::block_in_place(|| Box::pin(StdFile::open(DEV_ZERO).unwrap()));
c.bench_function("async_read_std_file", |b| {
b.iter(|| {
let task = || async {
let mut file =
tokio::task::block_in_place(|| Box::pin(StdFile::open(DEV_ZERO).unwrap()));

for _i in 0..BLOCK_COUNT {
let mut buffer = [0u8; BUFFER_SIZE];
let mut file_ref = file.as_mut();
for _i in 0..BLOCK_COUNT {
let mut buffer = [0u8; BUFFER_SIZE];
let mut file_ref = file.as_mut();

tokio::task::block_in_place(move || {
file_ref.read_exact(&mut buffer).unwrap();
});
}
};
tokio::task::block_in_place(move || {
file_ref.read_exact(&mut buffer).unwrap();
});
}
};

b.iter(|| {
rt.block_on(task());
rt.block_on(task());
});
});
}

fn sync_read(b: &mut Bencher) {
b.iter(|| {
let mut file = StdFile::open(DEV_ZERO).unwrap();
let mut buffer = [0u8; BUFFER_SIZE];
fn sync_read(c: &mut Criterion) {
c.bench_function("sync_read", |b| {
b.iter(|| {
let mut file = StdFile::open(DEV_ZERO).unwrap();
let mut buffer = [0u8; BUFFER_SIZE];

for _i in 0..BLOCK_COUNT {
file.read_exact(&mut buffer).unwrap();
}
for _i in 0..BLOCK_COUNT {
file.read_exact(&mut buffer).unwrap();
}
})
});
}

benchmark_group!(
criterion_group!(
file,
async_read_std_file,
async_read_buf,
async_read_codec,
sync_read
);

benchmark_main!(file);
criterion_main!(file);
74 changes: 40 additions & 34 deletions benches/rt_current_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,50 @@
use tokio::runtime::{self, Runtime};

use bencher::{benchmark_group, benchmark_main, Bencher};
use criterion::{criterion_group, criterion_main, Criterion};

const NUM_SPAWN: usize = 1_000;

fn spawn_many_local(b: &mut Bencher) {
fn spawn_many_local(c: &mut Criterion) {
let rt = rt();
let mut handles = Vec::with_capacity(NUM_SPAWN);

b.iter(|| {
rt.block_on(async {
for _ in 0..NUM_SPAWN {
handles.push(tokio::spawn(async move {}));
}

for handle in handles.drain(..) {
handle.await.unwrap();
}
});
c.bench_function("spawn_many_local", |b| {
b.iter(|| {
rt.block_on(async {
for _ in 0..NUM_SPAWN {
handles.push(tokio::spawn(async move {}));
}

for handle in handles.drain(..) {
handle.await.unwrap();
}
});
})
});
}

fn spawn_many_remote_idle(b: &mut Bencher) {
fn spawn_many_remote_idle(c: &mut Criterion) {
let rt = rt();
let rt_handle = rt.handle();
let mut handles = Vec::with_capacity(NUM_SPAWN);

b.iter(|| {
for _ in 0..NUM_SPAWN {
handles.push(rt_handle.spawn(async {}));
}

rt.block_on(async {
for handle in handles.drain(..) {
handle.await.unwrap();
c.bench_function("spawn_many_remote_idle", |b| {
b.iter(|| {
for _ in 0..NUM_SPAWN {
handles.push(rt_handle.spawn(async {}));
}
});

rt.block_on(async {
for handle in handles.drain(..) {
handle.await.unwrap();
}
});
})
});
}

fn spawn_many_remote_busy(b: &mut Bencher) {
fn spawn_many_remote_busy(c: &mut Criterion) {
let rt = rt();
let rt_handle = rt.handle();
let mut handles = Vec::with_capacity(NUM_SPAWN);
Expand All @@ -56,28 +60,30 @@ fn spawn_many_remote_busy(b: &mut Bencher) {
iter()
});

b.iter(|| {
for _ in 0..NUM_SPAWN {
handles.push(rt_handle.spawn(async {}));
}

rt.block_on(async {
for handle in handles.drain(..) {
handle.await.unwrap();
c.bench_function("spawn_many_remote_busy", |b| {
b.iter(|| {
for _ in 0..NUM_SPAWN {
handles.push(rt_handle.spawn(async {}));
}
});

rt.block_on(async {
for handle in handles.drain(..) {
handle.await.unwrap();
}
});
})
});
}

fn rt() -> Runtime {
runtime::Builder::new_current_thread().build().unwrap()
}

benchmark_group!(
criterion_group!(
scheduler,
spawn_many_local,
spawn_many_remote_idle,
spawn_many_remote_busy
);

benchmark_main!(scheduler);
criterion_main!(scheduler);
Loading

0 comments on commit 3ce7a52

Please sign in to comment.