Skip to content

Commit

Permalink
reduce the lock contention in task spawn.
Browse files Browse the repository at this point in the history
The origin version:

spawn_tasks_current_thread
                        time:   [654.87 ns 664.26 ns 672.13 ns]
                        change: [-4.5814% -1.2511% +2.1604%] (p = 0.48 > 0.05)
                        No change in performance detected.

spawn_tasks_current_thread_parallel
                        time:   [537.55 ns 540.00 ns 542.20 ns]
                        change: [-1.0603% -0.2591% +0.6283%] (p = 0.58 > 0.05)
                        No change in performance detected.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high severe

spawn_tasks             time:   [1.0447 µs 1.0533 µs 1.0610 µs]
                        change: [+0.9379% +2.2768% +3.7191%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 3 outliers among 100 measurements (3.00%)
  1 (1.00%) low severe
  2 (2.00%) low mild

spawn_tasks_parallel    time:   [992.38 ns 1.0002 µs 1.0073 µs]
                        change: [+0.1973% +1.4194% +2.5819%] (p = 0.02 < 0.05)
                        Change within noise threshold.
Found 6 outliers among 100 measurements (6.00%)
  1 (1.00%) low severe
  2 (2.00%) low mild
  1 (1.00%) high mild
  2 (2.00%) high severe

This version:

spawn_tasks_current_thread
                        time:   [705.92 ns 724.31 ns 739.78 ns]

spawn_tasks_current_thread_parallel
                        time:   [529.33 ns 531.00 ns 532.61 ns]
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe

spawn_tasks             time:   [881.56 ns 892.21 ns 902.10 ns]
Found 3 outliers among 100 measurements (3.00%)
  2 (2.00%) low mild
  1 (1.00%) high mild

spawn_tasks_parallel    time:   [815.00 ns 819.87 ns 824.60 ns]
Found 4 outliers among 100 measurements (4.00%)
  2 (2.00%) high mild
  2 (2.00%) high severe
  • Loading branch information
wathenjiang committed Sep 12, 2023
1 parent a6be73e commit 895958f
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 108 deletions.
9 changes: 9 additions & 0 deletions benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ tokio = { version = "1.5.0", path = "../tokio", features = ["full"] }
bencher = "0.1.5"
rand = "0.8"
rand_chacha = "0.3"
criterion = "0.5.1"
num_cpus = "1.16.0"

[dev-dependencies]
tokio-util = { version = "0.7.0", path = "../tokio-util", features = ["full"] }
Expand Down Expand Up @@ -84,3 +86,10 @@ harness = false
name = "time_now"
path = "time_now.rs"
harness = false

[[bench]]
name = "spawn_concurrent"
path = "spawn_concurrent.rs"
harness = false


93 changes: 93 additions & 0 deletions benches/spawn_concurrent.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use std::time::Instant;

use criterion::*;

fn spawn_tasks_current_thread(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

c.bench_function("spawn_tasks_current_thread", move |b| {
b.iter_custom(|iters| {
let start = Instant::now();
runtime.block_on(async {
black_box(job(iters as usize, 1).await);
});
start.elapsed()
})
});
}

fn spawn_tasks_current_thread_parallel(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

c.bench_function("spawn_tasks_current_thread_parallel", move |b| {
b.iter_custom(|iters| {
let start = Instant::now();
runtime.block_on(async {
black_box(job(iters as usize, num_cpus::get_physical() * 2).await);
});
start.elapsed()
})
});
}

fn spawn_tasks(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();

c.bench_function("spawn_tasks", move |b| {
b.iter_custom(|iters| {
let start = Instant::now();
runtime.block_on(async {
black_box(job(iters as usize, 1).await);
});
start.elapsed()
})
});
}

fn spawn_tasks_parallel(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
c.bench_function("spawn_tasks_parallel", move |b| {
b.iter_custom(|iters| {
let start = Instant::now();
runtime.block_on(async {
black_box(job(iters as usize, num_cpus::get_physical()).await);
});
start.elapsed()
})
});
}

async fn job(iters: usize, procs: usize) {
for _ in 0..procs {
let mut threads_handles = Vec::with_capacity(procs);
threads_handles.push(tokio::spawn(async move {
let mut thread_handles = Vec::with_capacity(iters / procs);
for _ in 0..iters / procs {
thread_handles.push(tokio::spawn(async {
let val = 1 + 1;
tokio::task::yield_now().await;
black_box(val)
}));
}
for handle in thread_handles {
handle.await.unwrap();
}
}));
for handle in threads_handles {
handle.await.unwrap();
}
}
}

criterion_group!(
benches,
spawn_tasks_current_thread,
spawn_tasks_current_thread_parallel,
spawn_tasks,
spawn_tasks_parallel
);
criterion_main!(benches);
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl CurrentThread {
let handle = Arc::new(Handle {
shared: Shared {
inject: Inject::new(),
owned: OwnedTasks::new(),
owned: OwnedTasks::new(1),
woken: AtomicBool::new(false),
config,
scheduler_metrics: SchedulerMetrics::new(),
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ pub(super) fn create(
remotes: remotes.into_boxed_slice(),
inject,
idle,
owned: OwnedTasks::new(),
owned: OwnedTasks::new(16),
synced: Mutex::new(Synced {
idle: idle_synced,
inject: inject_synced,
Expand Down
6 changes: 6 additions & 0 deletions tokio/src/runtime/task/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ pub(crate) struct Header {
/// The tracing ID for this instrumented task.
#[cfg(all(tokio_unstable, feature = "tracing"))]
pub(super) tracing_id: Option<tracing::Id>,

/// The task's ID, is used for deciding which list to put it in.
pub(super) task_id: Id,
}

unsafe impl Send for Header {}
Expand Down Expand Up @@ -216,12 +219,14 @@ impl<T: Future, S: Schedule> Cell<T, S> {
fn new_header(
state: State,
vtable: &'static Vtable,
task_id: Id,
#[cfg(all(tokio_unstable, feature = "tracing"))] tracing_id: Option<tracing::Id>,
) -> Header {
Header {
state,
queue_next: UnsafeCell::new(None),
vtable,
task_id,
owner_id: UnsafeCell::new(None),
#[cfg(all(tokio_unstable, feature = "tracing"))]
tracing_id,
Expand All @@ -235,6 +240,7 @@ impl<T: Future, S: Schedule> Cell<T, S> {
header: new_header(
state,
vtable,
task_id,
#[cfg(all(tokio_unstable, feature = "tracing"))]
tracing_id,
),
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/task/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::fmt;
#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
pub struct Id(u64);
pub struct Id(pub(crate) u64);

/// Returns the [`Id`] of the currently running task.
///
Expand Down
114 changes: 76 additions & 38 deletions tokio/src/runtime/task/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
use crate::future::Future;
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::runtime::task::{JoinHandle, LocalNotified, Notified, Schedule, Task};
use crate::util::linked_list::{CountedLinkedList, Link, LinkedList};
use crate::util::linked_list::{Link, LinkedList};

use std::marker::PhantomData;
use std::num::NonZeroU64;
use std::sync::atomic::AtomicBool;

// The id from the module below is used to verify whether a given task is stored
// in this OwnedTasks, or some other task. The counter starts at one so we can
Expand Down Expand Up @@ -55,12 +57,14 @@ cfg_not_has_atomic_u64! {
}

pub(crate) struct OwnedTasks<S: 'static> {
inner: Mutex<CountedOwnedTasksInner<S>>,
lists: Vec<Mutex<CountedOwnedTasksInner<S>>>,
pub(crate) id: NonZeroU64,
closed: AtomicBool,
grain: usize,
count: AtomicUsize,
}
struct CountedOwnedTasksInner<S: 'static> {
list: CountedLinkedList<Task<S>, <Task<S> as Link>::Target>,
closed: bool,
list: LinkedList<Task<S>, <Task<S> as Link>::Target>,
}
pub(crate) struct LocalOwnedTasks<S: 'static> {
inner: UnsafeCell<OwnedTasksInner<S>>,
Expand All @@ -73,13 +77,25 @@ struct OwnedTasksInner<S: 'static> {
}

impl<S: 'static> OwnedTasks<S> {
pub(crate) fn new() -> Self {
/// grain must be an integer power of 2
pub(crate) fn new(grain: usize) -> Self {
assert_eq!(
grain & (grain - 1),
0,
"grain must be an integer power of 2"
);
let mut lists = Vec::with_capacity(grain);
for _ in 0..grain {
lists.push(Mutex::new(CountedOwnedTasksInner {
list: LinkedList::new(),
}))
}
Self {
inner: Mutex::new(CountedOwnedTasksInner {
list: CountedLinkedList::new(),
closed: false,
}),
lists,
closed: AtomicBool::new(false),
id: get_next_id(),
grain,
count: AtomicUsize::new(0),
}
}

Expand All @@ -97,12 +113,17 @@ impl<S: 'static> OwnedTasks<S> {
T::Output: Send + 'static,
{
let (task, notified, join) = super::new_task(task, scheduler, id);
let notified = unsafe { self.bind_inner(task, notified) };
let notified = unsafe { self.bind_inner(task, notified, id) };
(join, notified)
}

/// The part of `bind` that's the same for every type of future.
unsafe fn bind_inner(&self, task: Task<S>, notified: Notified<S>) -> Option<Notified<S>>
unsafe fn bind_inner(
&self,
task: Task<S>,
notified: Notified<S>,
id: super::Id,
) -> Option<Notified<S>>
where
S: Schedule,
{
Expand All @@ -111,25 +132,23 @@ impl<S: 'static> OwnedTasks<S> {
// to the field.
task.header().set_owner_id(self.id);
}

let mut lock = self.inner.lock();
if lock.closed {
drop(lock);
drop(notified);
// check close flag
if self.closed.load(Ordering::Relaxed) {
task.shutdown();
None
} else {
lock.list.push_front(task);
Some(notified)
return None;
}

let mut lock = self.lists[id.0 as usize & (self.grain - 1)].lock();
lock.list.push_front(task);
self.count.fetch_add(1, Ordering::Relaxed);
Some(notified)
}

/// Asserts that the given task is owned by this OwnedTasks and convert it to
/// a LocalNotified, giving the thread permission to poll this task.
#[inline]
pub(crate) fn assert_owner(&self, task: Notified<S>) -> LocalNotified<S> {
debug_assert_eq!(task.header().get_owner_id(), Some(self.id));

// safety: All tasks bound to this OwnedTasks are Send, so it is safe
// to poll it on this thread no matter what thread we are on.
LocalNotified {
Expand All @@ -146,28 +165,35 @@ impl<S: 'static> OwnedTasks<S> {
{
// The first iteration of the loop was unrolled so it can set the
// closed bool.
let first_task = {
let mut lock = self.inner.lock();
lock.closed = true;
lock.list.pop_back()
};
match first_task {
Some(task) => task.shutdown(),
None => return,
}
self.closed.fetch_and(true, Ordering::SeqCst);

loop {
let task = match self.inner.lock().list.pop_back() {
Some(task) => task,
None => return,
for inner in &self.lists {
let first_task = {
let mut lock = inner.lock();
lock.list.pop_back()
};
match first_task {
Some(task) => {
self.count.fetch_sub(1, Ordering::Relaxed);
task.shutdown();
}
None => return,
}

task.shutdown();
loop {
match inner.lock().list.pop_back() {
Some(task) => {
self.count.fetch_sub(1, Ordering::Relaxed);
task.shutdown();
}
None => return,
};
}
}
}

pub(crate) fn active_tasks_count(&self) -> usize {
self.inner.lock().list.count()
self.count.load(Ordering::Relaxed)
}

pub(crate) fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
Expand All @@ -179,11 +205,23 @@ impl<S: 'static> OwnedTasks<S> {

// safety: We just checked that the provided task is not in some other
// linked list.
unsafe { self.inner.lock().list.remove(task.header_ptr()) }
unsafe {
match self.lists[(task.header().task_id.0) as usize & (self.grain - 1)]
.lock()
.list
.remove(task.header_ptr())
{
Some(t) => {
self.count.fetch_sub(1, Ordering::Relaxed);
Some(t)
}
None => None,
}
}
}

pub(crate) fn is_empty(&self) -> bool {
self.inner.lock().list.is_empty()
self.count.load(Ordering::SeqCst) == 0
}
}

Expand Down
Loading

0 comments on commit 895958f

Please sign in to comment.