diff --git a/benches/Cargo.toml b/benches/Cargo.toml index 47a830416ab..4daeb9da56c 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -12,6 +12,8 @@ tokio = { version = "1.5.0", path = "../tokio", features = ["full"] } bencher = "0.1.5" rand = "0.8" rand_chacha = "0.3" +criterion = "0.5.1" +num_cpus = "1.16.0" [dev-dependencies] tokio-util = { version = "0.7.0", path = "../tokio-util", features = ["full"] } @@ -84,3 +86,10 @@ harness = false name = "time_now" path = "time_now.rs" harness = false + +[[bench]] +name = "spawn_concurrent" +path = "spawn_concurrent.rs" +harness = false + + diff --git a/benches/spawn_concurrent.rs b/benches/spawn_concurrent.rs new file mode 100644 index 00000000000..ca8f586703e --- /dev/null +++ b/benches/spawn_concurrent.rs @@ -0,0 +1,93 @@ +use std::time::Instant; + +use criterion::*; + +fn spawn_tasks_current_thread(c: &mut Criterion) { + let runtime = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + + c.bench_function("spawn_tasks_current_thread", move |b| { + b.iter_custom(|iters| { + let start = Instant::now(); + runtime.block_on(async { + black_box(job(iters as usize, 1).await); + }); + start.elapsed() + }) + }); +} + +fn spawn_tasks_current_thread_parallel(c: &mut Criterion) { + let runtime = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + + c.bench_function("spawn_tasks_current_thread_parallel", move |b| { + b.iter_custom(|iters| { + let start = Instant::now(); + runtime.block_on(async { + black_box(job(iters as usize, num_cpus::get_physical() * 2).await); + }); + start.elapsed() + }) + }); +} + +fn spawn_tasks(c: &mut Criterion) { + let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); + + c.bench_function("spawn_tasks", move |b| { + b.iter_custom(|iters| { + let start = Instant::now(); + runtime.block_on(async { + black_box(job(iters as usize, 1).await); + }); + start.elapsed() + }) + }); +} + +fn spawn_tasks_parallel(c: &mut Criterion) { + let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); + c.bench_function("spawn_tasks_parallel", move |b| { + b.iter_custom(|iters| { + let start = Instant::now(); + runtime.block_on(async { + black_box(job(iters as usize, num_cpus::get_physical()).await); + }); + start.elapsed() + }) + }); +} + +async fn job(iters: usize, procs: usize) { + for _ in 0..procs { + let mut threads_handles = Vec::with_capacity(procs); + threads_handles.push(tokio::spawn(async move { + let mut thread_handles = Vec::with_capacity(iters / procs); + for _ in 0..iters / procs { + thread_handles.push(tokio::spawn(async { + let val = 1 + 1; + tokio::task::yield_now().await; + black_box(val) + })); + } + for handle in thread_handles { + handle.await.unwrap(); + } + })); + for handle in threads_handles { + handle.await.unwrap(); + } + } +} + +criterion_group!( + benches, + spawn_tasks_current_thread, + spawn_tasks_current_thread_parallel, + spawn_tasks, + spawn_tasks_parallel +); +criterion_main!(benches); diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index 30b17c0e8ed..3e57aa8a374 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -132,7 +132,7 @@ impl CurrentThread { let handle = Arc::new(Handle { shared: Shared { inject: Inject::new(), - owned: OwnedTasks::new(), + owned: OwnedTasks::new(1), woken: AtomicBool::new(false), config, scheduler_metrics: SchedulerMetrics::new(), diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 8f3181ffcc3..418f6c9ad5a 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -287,7 +287,7 @@ pub(super) fn create( remotes: remotes.into_boxed_slice(), inject, idle, - owned: OwnedTasks::new(), + owned: OwnedTasks::new(16), synced: Mutex::new(Synced { idle: idle_synced, inject: inject_synced, diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index d62ea965659..12d5dbc0360 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -179,6 +179,9 @@ pub(crate) struct Header { /// The tracing ID for this instrumented task. #[cfg(all(tokio_unstable, feature = "tracing"))] pub(super) tracing_id: Option, + + /// The task's ID, is used for deciding which list to put it in. + pub(super) task_id: Id, } unsafe impl Send for Header {} @@ -216,12 +219,14 @@ impl Cell { fn new_header( state: State, vtable: &'static Vtable, + task_id: Id, #[cfg(all(tokio_unstable, feature = "tracing"))] tracing_id: Option, ) -> Header { Header { state, queue_next: UnsafeCell::new(None), vtable, + task_id, owner_id: UnsafeCell::new(None), #[cfg(all(tokio_unstable, feature = "tracing"))] tracing_id, @@ -235,6 +240,7 @@ impl Cell { header: new_header( state, vtable, + task_id, #[cfg(all(tokio_unstable, feature = "tracing"))] tracing_id, ), diff --git a/tokio/src/runtime/task/id.rs b/tokio/src/runtime/task/id.rs index 2b0d95c0243..dd5ae504582 100644 --- a/tokio/src/runtime/task/id.rs +++ b/tokio/src/runtime/task/id.rs @@ -24,7 +24,7 @@ use std::fmt; #[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))] #[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] #[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)] -pub struct Id(u64); +pub struct Id(pub(crate) u64); /// Returns the [`Id`] of the currently running task. /// diff --git a/tokio/src/runtime/task/list.rs b/tokio/src/runtime/task/list.rs index 3a1fcce2ec4..373a47de8e0 100644 --- a/tokio/src/runtime/task/list.rs +++ b/tokio/src/runtime/task/list.rs @@ -8,12 +8,14 @@ use crate::future::Future; use crate::loom::cell::UnsafeCell; +use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Mutex; use crate::runtime::task::{JoinHandle, LocalNotified, Notified, Schedule, Task}; -use crate::util::linked_list::{CountedLinkedList, Link, LinkedList}; +use crate::util::linked_list::{Link, LinkedList}; use std::marker::PhantomData; use std::num::NonZeroU64; +use std::sync::atomic::AtomicBool; // The id from the module below is used to verify whether a given task is stored // in this OwnedTasks, or some other task. The counter starts at one so we can @@ -55,12 +57,14 @@ cfg_not_has_atomic_u64! { } pub(crate) struct OwnedTasks { - inner: Mutex>, + lists: Vec>>, pub(crate) id: NonZeroU64, + closed: AtomicBool, + grain: usize, + count: AtomicUsize, } struct CountedOwnedTasksInner { - list: CountedLinkedList, as Link>::Target>, - closed: bool, + list: LinkedList, as Link>::Target>, } pub(crate) struct LocalOwnedTasks { inner: UnsafeCell>, @@ -73,13 +77,25 @@ struct OwnedTasksInner { } impl OwnedTasks { - pub(crate) fn new() -> Self { + /// grain must be an integer power of 2 + pub(crate) fn new(grain: usize) -> Self { + assert_eq!( + grain & (grain - 1), + 0, + "grain must be an integer power of 2" + ); + let mut lists = Vec::with_capacity(grain); + for _ in 0..grain { + lists.push(Mutex::new(CountedOwnedTasksInner { + list: LinkedList::new(), + })) + } Self { - inner: Mutex::new(CountedOwnedTasksInner { - list: CountedLinkedList::new(), - closed: false, - }), + lists, + closed: AtomicBool::new(false), id: get_next_id(), + grain, + count: AtomicUsize::new(0), } } @@ -97,12 +113,17 @@ impl OwnedTasks { T::Output: Send + 'static, { let (task, notified, join) = super::new_task(task, scheduler, id); - let notified = unsafe { self.bind_inner(task, notified) }; + let notified = unsafe { self.bind_inner(task, notified, id) }; (join, notified) } /// The part of `bind` that's the same for every type of future. - unsafe fn bind_inner(&self, task: Task, notified: Notified) -> Option> + unsafe fn bind_inner( + &self, + task: Task, + notified: Notified, + id: super::Id, + ) -> Option> where S: Schedule, { @@ -111,17 +132,16 @@ impl OwnedTasks { // to the field. task.header().set_owner_id(self.id); } - - let mut lock = self.inner.lock(); - if lock.closed { - drop(lock); - drop(notified); + // check close flag + if self.closed.load(Ordering::Relaxed) { task.shutdown(); - None - } else { - lock.list.push_front(task); - Some(notified) + return None; } + + let mut lock = self.lists[id.0 as usize & (self.grain - 1)].lock(); + lock.list.push_front(task); + self.count.fetch_add(1, Ordering::Relaxed); + Some(notified) } /// Asserts that the given task is owned by this OwnedTasks and convert it to @@ -129,7 +149,6 @@ impl OwnedTasks { #[inline] pub(crate) fn assert_owner(&self, task: Notified) -> LocalNotified { debug_assert_eq!(task.header().get_owner_id(), Some(self.id)); - // safety: All tasks bound to this OwnedTasks are Send, so it is safe // to poll it on this thread no matter what thread we are on. LocalNotified { @@ -146,28 +165,35 @@ impl OwnedTasks { { // The first iteration of the loop was unrolled so it can set the // closed bool. - let first_task = { - let mut lock = self.inner.lock(); - lock.closed = true; - lock.list.pop_back() - }; - match first_task { - Some(task) => task.shutdown(), - None => return, - } + self.closed.fetch_and(true, Ordering::SeqCst); - loop { - let task = match self.inner.lock().list.pop_back() { - Some(task) => task, - None => return, + for inner in &self.lists { + let first_task = { + let mut lock = inner.lock(); + lock.list.pop_back() }; + match first_task { + Some(task) => { + self.count.fetch_sub(1, Ordering::Relaxed); + task.shutdown(); + } + None => return, + } - task.shutdown(); + loop { + match inner.lock().list.pop_back() { + Some(task) => { + self.count.fetch_sub(1, Ordering::Relaxed); + task.shutdown(); + } + None => return, + }; + } } } pub(crate) fn active_tasks_count(&self) -> usize { - self.inner.lock().list.count() + self.count.load(Ordering::Relaxed) } pub(crate) fn remove(&self, task: &Task) -> Option> { @@ -179,11 +205,23 @@ impl OwnedTasks { // safety: We just checked that the provided task is not in some other // linked list. - unsafe { self.inner.lock().list.remove(task.header_ptr()) } + unsafe { + match self.lists[(task.header().task_id.0) as usize & (self.grain - 1)] + .lock() + .list + .remove(task.header_ptr()) + { + Some(t) => { + self.count.fetch_sub(1, Ordering::Relaxed); + Some(t) + } + None => None, + } + } } pub(crate) fn is_empty(&self) -> bool { - self.inner.lock().list.is_empty() + self.count.load(Ordering::SeqCst) == 0 } } diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index 1f9bdf4b811..a4a1a42af9d 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -228,53 +228,6 @@ impl fmt::Debug for LinkedList { } } -// ===== impl CountedLinkedList ==== - -// Delegates operations to the base LinkedList implementation, and adds a counter to the elements -// in the list. -pub(crate) struct CountedLinkedList { - list: LinkedList, - count: usize, -} - -impl CountedLinkedList { - pub(crate) fn new() -> CountedLinkedList { - CountedLinkedList { - list: LinkedList::new(), - count: 0, - } - } - - pub(crate) fn push_front(&mut self, val: L::Handle) { - self.list.push_front(val); - self.count += 1; - } - - pub(crate) fn pop_back(&mut self) -> Option { - let val = self.list.pop_back(); - if val.is_some() { - self.count -= 1; - } - val - } - - pub(crate) fn is_empty(&self) -> bool { - self.list.is_empty() - } - - pub(crate) unsafe fn remove(&mut self, node: NonNull) -> Option { - let val = self.list.remove(node); - if val.is_some() { - self.count -= 1; - } - val - } - - pub(crate) fn count(&self) -> usize { - self.count - } -} - #[cfg(any( feature = "fs", feature = "rt", @@ -796,26 +749,6 @@ pub(crate) mod tests { } } - #[test] - fn count() { - let mut list = CountedLinkedList::<&Entry, <&Entry as Link>::Target>::new(); - assert_eq!(0, list.count()); - - let a = entry(5); - let b = entry(7); - list.push_front(a.as_ref()); - list.push_front(b.as_ref()); - assert_eq!(2, list.count()); - - list.pop_back(); - assert_eq!(1, list.count()); - - unsafe { - list.remove(ptr(&b)); - } - assert_eq!(0, list.count()); - } - /// This is a fuzz test. You run it by entering `cargo fuzz run fuzz_linked_list` in CLI in `/tokio/` module. #[cfg(fuzzing)] pub fn fuzz_linked_list(ops: &[u8]) {