Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tokio: remove ref of scheduler in owned task #5992

Closed
7 changes: 7 additions & 0 deletions benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ tokio = { version = "1.5.0", path = "../tokio", features = ["full"] }
bencher = "0.1.5"
rand = "0.8"
rand_chacha = "0.3"
criterion = "0.5.1"
num_cpus = "1.16.0"

[dev-dependencies]
tokio-util = { version = "0.7.0", path = "../tokio-util", features = ["full"] }
Expand Down Expand Up @@ -84,3 +86,8 @@ harness = false
name = "time_now"
path = "time_now.rs"
harness = false

[[bench]]
name = "spawn2"
path = "spawn2.rs"
harness = false
93 changes: 93 additions & 0 deletions benches/spawn2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use std::time::Instant;

use criterion::*;

fn spawn_tasks_current_thread(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

c.bench_function("spawn_tasks_current_thread", move |b| {
b.iter_custom(|iters| {
let start = Instant::now();
runtime.block_on(async {
black_box(job(iters as usize, 1).await);
});
start.elapsed()
})
});
}

fn spawn_tasks_current_thread_parallel(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

c.bench_function("spawn_tasks_current_thread_parallel", move |b| {
b.iter_custom(|iters| {
let start = Instant::now();
runtime.block_on(async {
black_box(job(iters as usize, num_cpus::get_physical()).await);
});
start.elapsed()
})
});
}

fn spawn_tasks(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();

c.bench_function("spawn_tasks", move |b| {
b.iter_custom(|iters| {
let start = Instant::now();
runtime.block_on(async {
black_box(job(iters as usize, 1).await);
});
start.elapsed()
})
});
}

fn spawn_tasks_parallel(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
c.bench_function("spawn_tasks_parallel", move |b| {
b.iter_custom(|iters| {
let start = Instant::now();
runtime.block_on(async {
black_box(job(iters as usize, num_cpus::get_physical()).await);
});
start.elapsed()
})
});
}

async fn job(iters: usize, procs: usize) {
for _ in 0..procs {
let mut threads_handles = Vec::with_capacity(procs);
threads_handles.push(tokio::spawn(async move {
let mut thread_handles = Vec::with_capacity(iters / procs);
for _ in 0..iters / procs {
thread_handles.push(tokio::spawn(async {
let val = 1 + 1;
tokio::task::yield_now().await;
black_box(val)
}));
}
for handle in thread_handles {
handle.await.unwrap();
}
}));
for handle in threads_handles {
handle.await.unwrap();
}
}
}

criterion_group!(
benches,
spawn_tasks_current_thread,
spawn_tasks_current_thread_parallel,
spawn_tasks,
spawn_tasks_parallel
);
criterion_main!(benches);
6 changes: 3 additions & 3 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ struct Shared {
}

pub(crate) struct Task {
task: task::UnownedTask<BlockingSchedule>,
task: task::UnownedTask,
mandatory: Mandatory,
}

Expand Down Expand Up @@ -151,7 +151,7 @@ impl From<SpawnError> for io::Error {
}

impl Task {
pub(crate) fn new(task: task::UnownedTask<BlockingSchedule>, mandatory: Mandatory) -> Task {
pub(crate) fn new(task: task::UnownedTask, mandatory: Mandatory) -> Task {
Task { task, mandatory }
}

Expand Down Expand Up @@ -381,7 +381,7 @@ impl Spawner {
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let _ = name;

let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id);
let (task, handle) = task::unowned(fut, Some(BlockingSchedule::new(rt)), id);

let spawned = self.spawn_task(Task::new(task, is_mandatory), rt);
(handle, spawned)
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/blocking/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl BlockingSchedule {
}

impl task::Schedule for BlockingSchedule {
fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
fn release(&self, _task: &Task) -> Option<Task> {
#[cfg(feature = "test-util")]
{
match &self.handle.inner {
Expand All @@ -54,7 +54,7 @@ impl task::Schedule for BlockingSchedule {
None
}

fn schedule(&self, _task: task::Notified<Self>) {
fn schedule(&self, _task: task::Notified) {
unreachable!();
}
}
10 changes: 5 additions & 5 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ struct Core {
/// Scheduler state shared between threads.
struct Shared {
/// Remote run queue
inject: Inject<Arc<Handle>>,
inject: Inject,

/// Collection of all active tasks spawned onto this executor.
owned: OwnedTasks<Arc<Handle>>,
Expand Down Expand Up @@ -104,7 +104,7 @@ pub(crate) struct Context {
pub(crate) defer: Defer,
}

type Notified = task::Notified<Arc<Handle>>;
type Notified = task::Notified;

/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;
Expand Down Expand Up @@ -432,7 +432,7 @@ impl Handle {
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);
let (handle, notified) = me.shared.owned.bind(future, None, id);

if let Some(notified) = notified {
me.schedule(notified);
Expand Down Expand Up @@ -564,11 +564,11 @@ impl fmt::Debug for Handle {
// ===== impl Shared =====

impl Schedule for Arc<Handle> {
fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
fn release(&self, task: &Task) -> Option<Task> {
self.shared.owned.remove(task)
}

fn schedule(&self, task: task::Notified<Self>) {
fn schedule(&self, task: task::Notified) {
use scheduler::Context::CurrentThread;

context::with_scheduler(|maybe_cx| match maybe_cx {
Expand Down
12 changes: 6 additions & 6 deletions tokio/src/runtime/scheduler/inject.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ cfg_metrics! {

/// Growable, MPMC queue used to inject new tasks into the scheduler and as an
/// overflow queue when the local, fixed-size, array queue overflows.
pub(crate) struct Inject<T: 'static> {
shared: Shared<T>,
pub(crate) struct Inject {
shared: Shared,
synced: Mutex<Synced>,
}

impl<T: 'static> Inject<T> {
pub(crate) fn new() -> Inject<T> {
impl Inject {
pub(crate) fn new() -> Inject {
let (shared, synced) = Shared::new();

Inject {
Expand All @@ -54,13 +54,13 @@ impl<T: 'static> Inject<T> {
/// Pushes a value into the queue.
///
/// This does nothing if the queue is closed.
pub(crate) fn push(&self, task: task::Notified<T>) {
pub(crate) fn push(&self, task: task::Notified) {
let mut synced = self.synced.lock();
// safety: passing correct `Synced`
unsafe { self.shared.push(&mut synced, task) }
}

pub(crate) fn pop(&self) -> Option<task::Notified<T>> {
pub(crate) fn pop(&self) -> Option<task::Notified> {
if self.shared.is_empty() {
return None;
}
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/inject/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::Inject;

impl<T: 'static> Inject<T> {
impl Inject {
pub(crate) fn len(&self) -> usize {
self.shared.len()
}
Expand Down
23 changes: 8 additions & 15 deletions tokio/src/runtime/scheduler/inject/pop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,19 @@ use super::Synced;

use crate::runtime::task;

use std::marker::PhantomData;

pub(crate) struct Pop<'a, T: 'static> {
pub(crate) struct Pop<'a> {
len: usize,
synced: &'a mut Synced,
_p: PhantomData<T>,
}

impl<'a, T: 'static> Pop<'a, T> {
pub(super) fn new(len: usize, synced: &'a mut Synced) -> Pop<'a, T> {
Pop {
len,
synced,
_p: PhantomData,
}
impl<'a> Pop<'a> {
pub(super) fn new(len: usize, synced: &'a mut Synced) -> Pop<'a> {
Pop { len, synced }
}
}

impl<'a, T: 'static> Iterator for Pop<'a, T> {
type Item = task::Notified<T>;
impl<'a> Iterator for Pop<'a> {
type Item = task::Notified;

fn next(&mut self) -> Option<Self::Item> {
if self.len == 0 {
Expand All @@ -42,13 +35,13 @@ impl<'a, T: 'static> Iterator for Pop<'a, T> {
}
}

impl<'a, T: 'static> ExactSizeIterator for Pop<'a, T> {
impl<'a> ExactSizeIterator for Pop<'a> {
fn len(&self) -> usize {
self.len
}
}

impl<'a, T: 'static> Drop for Pop<'a, T> {
impl<'a> Drop for Pop<'a> {
fn drop(&mut self) {
for _ in self.by_ref() {}
}
Expand Down
6 changes: 3 additions & 3 deletions tokio/src/runtime/scheduler/inject/rt_multi_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl AsMut<Synced> for Synced {
}
}

impl<T: 'static> Shared<T> {
impl Shared {
/// Pushes several values into the queue.
///
/// # Safety
Expand All @@ -29,7 +29,7 @@ impl<T: 'static> Shared<T> {
pub(crate) unsafe fn push_batch<L, I>(&self, shared: L, mut iter: I)
where
L: Lock<Synced>,
I: Iterator<Item = task::Notified<T>>,
I: Iterator<Item = task::Notified>,
{
let first = match iter.next() {
Some(first) => first.into_raw(),
Expand Down Expand Up @@ -84,7 +84,7 @@ impl<T: 'static> Shared<T> {
while let Some(task) = curr {
curr = task.get_queue_next();

let _ = unsafe { task::Notified::<T>::from_raw(task) };
let _ = unsafe { task::Notified::from_raw(task) };
}

return;
Expand Down
20 changes: 8 additions & 12 deletions tokio/src/runtime/scheduler/inject/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,21 @@ use super::{Pop, Synced};
use crate::loom::sync::atomic::AtomicUsize;
use crate::runtime::task;

use std::marker::PhantomData;
use std::sync::atomic::Ordering::{Acquire, Release};

pub(crate) struct Shared<T: 'static> {
pub(crate) struct Shared {
/// Number of pending tasks in the queue. This helps prevent unnecessary
/// locking in the hot path.
pub(super) len: AtomicUsize,

_p: PhantomData<T>,
}

unsafe impl<T> Send for Shared<T> {}
unsafe impl<T> Sync for Shared<T> {}
unsafe impl Send for Shared {}
unsafe impl Sync for Shared {}

impl<T: 'static> Shared<T> {
pub(crate) fn new() -> (Shared<T>, Synced) {
impl Shared {
pub(crate) fn new() -> (Shared, Synced) {
let inject = Shared {
len: AtomicUsize::new(0),
_p: PhantomData,
};

let synced = Synced {
Expand Down Expand Up @@ -68,7 +64,7 @@ impl<T: 'static> Shared<T> {
/// # Safety
///
/// Must be called with the same `Synced` instance returned by `Inject::new`
pub(crate) unsafe fn push(&self, synced: &mut Synced, task: task::Notified<T>) {
pub(crate) unsafe fn push(&self, synced: &mut Synced, task: task::Notified) {
if synced.is_closed {
return;
}
Expand Down Expand Up @@ -97,7 +93,7 @@ impl<T: 'static> Shared<T> {
/// # Safety
///
/// Must be called with the same `Synced` instance returned by `Inject::new`
pub(crate) unsafe fn pop(&self, synced: &mut Synced) -> Option<task::Notified<T>> {
pub(crate) unsafe fn pop(&self, synced: &mut Synced) -> Option<task::Notified> {
self.pop_n(synced, 1).next()
}

Expand All @@ -106,7 +102,7 @@ impl<T: 'static> Shared<T> {
/// # Safety
///
/// Must be called with the same `Synced` instance returned by `Inject::new`
pub(crate) unsafe fn pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a, T> {
pub(crate) unsafe fn pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a> {
use std::cmp;

debug_assert!(n > 0);
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/inject/synced.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ unsafe impl Send for Synced {}
unsafe impl Sync for Synced {}

impl Synced {
pub(super) fn pop<T: 'static>(&mut self) -> Option<task::Notified<T>> {
pub(super) fn pop(&mut self) -> Option<task::Notified> {
let task = self.head?;

self.head = unsafe { task.get_queue_next() };
Expand Down
Loading