runtime: only mitigate false sharing for multi-threaded runtimes
Darksonn committed Dec 22, 2023
1 parent e7214e3 commit 3b9535b
Showing 8 changed files with 194 additions and 98 deletions.
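In short: instead of unconditionally forcing every task allocation to cache-line alignment via `repr(align)` on `Cell`, the commit adds a `Schedule::min_align` hook and raises the allocation's alignment at runtime only when the scheduler has more than one worker. A minimal standalone sketch of the mechanism (the `Sched`, `MultiThread`, and `task_layout` names are illustrative, not tokio API):

use std::alloc::Layout;
use std::mem;

// Stand-in for tokio's crate::util::cacheline::CachePadded on x86_64.
#[repr(align(128))]
struct CachePadded<T>(T);

trait Sched {
    // Minimum alignment for task allocations; 1 means "no extra padding".
    fn min_align(&self) -> usize {
        1
    }
}

struct MultiThread {
    workers: usize,
}

impl Sched for MultiThread {
    fn min_align(&self) -> usize {
        match self.workers {
            // A single worker cannot false-share with anyone.
            1 => 1,
            // Otherwise pad to the (conservative) cache-line size.
            _ => mem::align_of::<CachePadded<()>>(),
        }
    }
}

// Raise the natural alignment of T to the scheduler's minimum.
fn task_layout<T>(s: &impl Sched) -> Layout {
    let align = mem::align_of::<T>().max(s.min_align());
    Layout::from_size_align(mem::size_of::<T>(), align).expect("valid layout")
}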
11 changes: 11 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -1014,6 +1014,17 @@ impl task::Schedule for Arc<Handle> {
fn yield_now(&self, task: Notified) {
self.schedule_task(task, true);
}

+    fn min_align(&self) -> usize {
+        use crate::util::cacheline::CachePadded;
+
+        // One for single-threaded runtime, otherwise use a high value to avoid
+        // false sharing.
+        match self.shared.remotes.len() {
+            1 => 1,
+            _ => std::mem::align_of::<CachePadded<()>>(),
+        }
+    }
}

impl Handle {
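For context, `crate::util::cacheline::CachePadded` supplies the per-architecture alignment used above. Assuming it follows the crossbeam-utils rules quoted in the comments removed from core.rs further down, a condensed sketch looks like:

// Condensed sketch (assumption, not the actual tokio definition):
// 128 bytes on x86_64/aarch64/powerpc64, 64 bytes on most other targets.
#[cfg_attr(
    any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
    ),
    repr(align(128))
)]
#[cfg_attr(
    not(any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
    )),
    repr(align(64))
)]
pub(crate) struct CachePadded<T> {
    value: T,
}

So on x86_64, `std::mem::align_of::<CachePadded<()>>()` is 128, and `min_align` returns 128 for a runtime with more than one worker.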
11 changes: 11 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
@@ -1553,6 +1553,17 @@ impl task::Schedule for Arc<Handle> {
fn yield_now(&self, task: Notified) {
self.shared.schedule_task(task, true);
}

+    fn min_align(&self) -> usize {
+        use crate::util::cacheline::CachePadded;
+
+        // One for single-threaded runtime, otherwise use a high value to avoid
+        // false sharing.
+        match self.shared.remotes.len() {
+            1 => 1,
+            _ => std::mem::align_of::<CachePadded<()>>(),
+        }
+    }
}

impl AsMut<Synced> for Synced {
96 changes: 7 additions & 89 deletions tokio/src/runtime/task/core.rs
@@ -14,7 +14,7 @@ use crate::loom::cell::UnsafeCell;
use crate::runtime::context;
use crate::runtime::task::raw::{self, Vtable};
use crate::runtime::task::state::State;
-use crate::runtime::task::{Id, Schedule};
+use crate::runtime::task::{Id, Schedule, TaskBox};
use crate::util::linked_list;

use std::num::NonZeroU64;
@@ -30,87 +30,6 @@ use std::task::{Context, Poll, Waker};
/// Any changes to the layout of this struct _must_ also be reflected in the
/// const fns in raw.rs.
///
-// # This struct should be cache padded to avoid false sharing. The cache padding rules are copied
-// from crossbeam-utils/src/cache_padded.rs
-//
-// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache
-// lines at a time, so we have to align to 128 bytes rather than 64.
-//
-// Sources:
-// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
-// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
-//
-// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
-//
-// Sources:
-// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
-//
-// powerpc64 has 128-byte cache line size.
-//
-// Sources:
-// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
-#[cfg_attr(
-    any(
-        target_arch = "x86_64",
-        target_arch = "aarch64",
-        target_arch = "powerpc64",
-    ),
-    repr(align(128))
-)]
-// arm, mips, mips64, sparc, and hexagon have 32-byte cache line size.
-//
-// Sources:
-// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
-// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
-// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
-// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
-// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17
-// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12
-#[cfg_attr(
-    any(
-        target_arch = "arm",
-        target_arch = "mips",
-        target_arch = "mips64",
-        target_arch = "sparc",
-        target_arch = "hexagon",
-    ),
-    repr(align(32))
-)]
-// m68k has 16-byte cache line size.
-//
-// Sources:
-// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/m68k/include/asm/cache.h#L9
-#[cfg_attr(target_arch = "m68k", repr(align(16)))]
-// s390x has 256-byte cache line size.
-//
-// Sources:
-// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
-// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13
-#[cfg_attr(target_arch = "s390x", repr(align(256)))]
-// x86, riscv, wasm, and sparc64 have 64-byte cache line size.
-//
-// Sources:
-// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
-// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
-// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19
-// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/riscv/include/asm/cache.h#L10
-//
-// All others are assumed to have 64-byte cache line size.
-#[cfg_attr(
-    not(any(
-        target_arch = "x86_64",
-        target_arch = "aarch64",
-        target_arch = "powerpc64",
-        target_arch = "arm",
-        target_arch = "mips",
-        target_arch = "mips64",
-        target_arch = "sparc",
-        target_arch = "hexagon",
-        target_arch = "m68k",
-        target_arch = "s390x",
-    )),
-    repr(align(64))
-)]
#[repr(C)]
pub(super) struct Cell<T: Future, S> {
/// Hot task state data
@@ -205,7 +124,7 @@ pub(super) enum Stage<T: Future> {
impl<T: Future, S: Schedule> Cell<T, S> {
/// Allocates a new task cell, containing the header, trailer, and core
/// structures.
-    pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> Box<Cell<T, S>> {
+    pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> TaskBox<T, S> {
// Separated into a non-generic function to reduce LLVM codegen
fn new_header(
state: State,
@@ -225,22 +144,21 @@ impl<T: Future, S: Schedule> Cell<T, S> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let tracing_id = future.id();
let vtable = raw::vtable::<T, S>();
-        let result = Box::new(Cell {
-            header: new_header(
+        let result = TaskBox::new(
+            new_header(
                state,
                vtable,
                #[cfg(all(tokio_unstable, feature = "tracing"))]
                tracing_id,
            ),
-            core: Core {
+            Core {
                scheduler,
                stage: CoreStage {
                    stage: UnsafeCell::new(Stage::Running(future)),
                },
                task_id,
            },
-            trailer: Trailer::new(),
-        });
+        );

#[cfg(debug_assertions)]
{
@@ -459,7 +377,7 @@ impl Header {
}

impl Trailer {
-    fn new() -> Self {
+    pub(super) fn new() -> Self {
Trailer {
waker: UnsafeCell::new(None),
owned: linked_list::Pointers::new(),
4 changes: 2 additions & 2 deletions tokio/src/runtime/task/harness.rs
@@ -2,7 +2,7 @@ use crate::future::Future;
use crate::runtime::task::core::{Cell, Core, Header, Trailer};
use crate::runtime::task::state::{Snapshot, State};
use crate::runtime::task::waker::waker_ref;
-use crate::runtime::task::{Id, JoinError, Notified, RawTask, Schedule, Task};
+use crate::runtime::task::{Id, JoinError, Notified, RawTask, Schedule, Task, TaskBox};

use std::any::Any;
use std::mem;
@@ -269,7 +269,7 @@
// are allowed to be dangling after their last use, even if the
// reference has not yet gone out of scope.
unsafe {
-            drop(Box::from_raw(self.cell.as_ptr()));
+            drop(TaskBox::from_raw(self.cell));
}
}

12 changes: 12 additions & 0 deletions tokio/src/runtime/task/mod.rs
@@ -200,6 +200,9 @@ pub(crate) use self::raw::RawTask;
mod state;
use self::state::State;

+mod task_box;
+use self::task_box::TaskBox;

mod waker;

cfg_taskdump! {
@@ -272,6 +275,15 @@ pub(crate) trait Schedule: Sync + Sized + 'static {
self.schedule(task);
}

+    /// The minimum alignment for tasks spawned on this runtime.
+    ///
+    /// This is used by the multi-threaded runtime to avoid false sharing.
+    ///
+    /// The same scheduler must always return the same value.
+    fn min_align(&self) -> usize {
+        1
+    }

/// Polling the task resulted in a panic. Should the runtime shutdown?
fn unhandled_panic(&self) {
// By default, do nothing. This maintains the 1.0 behavior.
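The "must always return the same value" requirement exists because the layout is recomputed on drop: `TaskBox::new` allocates with an alignment derived from `min_align`, and `TaskBox::drop` must pass the exact same `Layout` to `dealloc`, or the deallocation is undefined behavior. A hedged sketch of the invariant (illustrative, not tokio code):

use std::alloc::{alloc, dealloc, Layout};

// alloc and dealloc must agree on the layout, so deriving it from
// `min_align` on both sides is only sound if `min_align` is stable.
// (`min_align` must also be a power of two for the layout to be valid.)
unsafe fn alloc_dealloc_roundtrip(min_align: usize) {
    let layout = Layout::from_size_align(64, min_align).expect("valid layout");
    let ptr = alloc(layout);
    if !ptr.is_null() {
        // Same layout on both sides: required by the GlobalAlloc contract.
        dealloc(ptr, layout);
    }
}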
7 changes: 3 additions & 4 deletions tokio/src/runtime/task/raw.rs
@@ -1,6 +1,6 @@
use crate::future::Future;
use crate::runtime::task::core::{Core, Trailer};
-use crate::runtime::task::{Cell, Harness, Header, Id, Schedule, State};
+use crate::runtime::task::{Cell, Harness, Header, Id, Schedule, State, TaskBox};

use std::ptr::NonNull;
use std::task::{Poll, Waker};
@@ -162,10 +162,9 @@ impl RawTask {
T: Future,
S: Schedule,
{
-        let ptr = Box::into_raw(Cell::<_, S>::new(task, scheduler, State::new(), id));
-        let ptr = unsafe { NonNull::new_unchecked(ptr.cast()) };
+        let ptr = TaskBox::into_raw(Cell::<_, S>::new(task, scheduler, State::new(), id));

-        RawTask { ptr }
+        RawTask { ptr: ptr.cast() }
}

pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> RawTask {
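The `ptr.cast()` from `NonNull<Cell<T, S>>` to `NonNull<Header>` is sound because `Cell` is `#[repr(C)]` with `header` as its first field, so a pointer to the struct is also a valid pointer to that field. A minimal illustration of the pattern (`Demo` and `first_field` are made-up names):

use std::ptr::NonNull;

#[repr(C)]
struct Demo {
    first: u64, // at offset 0, like Cell's header
    rest: u32,
}

// With repr(C), the struct and its first field share an address.
fn first_field(ptr: NonNull<Demo>) -> NonNull<u64> {
    ptr.cast()
}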
124 changes: 124 additions & 0 deletions tokio/src/runtime/task/task_box.rs
@@ -0,0 +1,124 @@
//! Helper module for allocating and deallocating tasks.
use crate::runtime::task::core::{Cell, Core, Header, Trailer};
use crate::runtime::task::Schedule;

use std::alloc::{alloc, dealloc, handle_alloc_error, Layout};
use std::future::Future;
use std::marker::PhantomData;
use std::mem::{align_of, size_of, ManuallyDrop};
use std::ptr::{drop_in_place, NonNull};

fn layout_of<T: Future, S: Schedule>(scheduler: &S) -> Layout {
let size = std::mem::size_of::<Cell<T, S>>();
let mut align = std::mem::align_of::<Cell<T, S>>();
let min_align = scheduler.min_align();
if align < min_align {
align = min_align;
}
match Layout::from_size_align(size, align) {
Ok(layout) => layout,
Err(_) => panic!("Failed to build layout of type."),
}
}

/// A `Box<Cell<T, S>>` with an alignment of at least the scheduler's `min_align()`.
pub(super) struct TaskBox<T: Future, S: Schedule> {
ptr: NonNull<Cell<T, S>>,
_phantom: PhantomData<Cell<T, S>>,
}

impl<T: Future, S: Schedule> TaskBox<T, S> {
/// Creates a new task allocation.
pub(super) fn new(header: Header, core: Core<T, S>) -> Self {
let layout = layout_of::<T, S>(&core.scheduler);

assert_eq!(size_of::<Cell<T, S>>(), layout.size());
assert_ne!(size_of::<Cell<T, S>>(), 0);
assert!(align_of::<Cell<T, S>>() <= layout.align());

// SAFETY: The size of `layout` is non-zero as checked above.
let ptr = unsafe { alloc(layout) } as *mut Cell<T, S>;

let ptr = match NonNull::new(ptr) {
Some(ptr) => ptr,
None => handle_alloc_error(layout),
};

// SAFETY: We just allocated memory with the same size and a compatible
// alignment for `Cell<T, S>`.
unsafe {
ptr.as_ptr().write(Cell {
header,
core,
trailer: Trailer::new(),
});
};

Self {
ptr,
_phantom: PhantomData,
}
}

/// Convert this allocation into a raw pointer.
pub(super) fn into_raw(self) -> NonNull<Cell<T, S>> {
let me = ManuallyDrop::new(self);
me.ptr
}

/// Convert this allocation back into a `TaskBox`.
///
/// # Safety
///
/// The provided pointer must originate from a previous call to `into_raw`,
/// and the raw pointer must not be used again after this call.
pub(super) unsafe fn from_raw(ptr: NonNull<Cell<T, S>>) -> Self {
Self {
ptr,
_phantom: PhantomData,
}
}
}

impl<T: Future, S: Schedule> std::ops::Deref for TaskBox<T, S> {
type Target = Cell<T, S>;

fn deref(&self) -> &Cell<T, S> {
// SAFETY: This box always points at a valid cell.
unsafe { &*self.ptr.as_ptr() }
}
}

impl<T: Future, S: Schedule> Drop for TaskBox<T, S> {
fn drop(&mut self) {
let ptr = self.ptr.as_ptr();

// SAFETY: The task is still valid, so we can dereference the pointer.
let layout = layout_of::<T, S>(unsafe { &(*ptr).core.scheduler });

// SAFETY: The pointer was allocated with this layout. (The return value
// of `min_align` doesn't change.)
let _drop_helper = DropHelper {
layout,
ptr: ptr as *mut u8,
};

// SAFETY: A task box contains a pointer to a valid cell, and we have
// not dropped the allocation yet.
unsafe { drop_in_place(self.ptr.as_ptr()) };
}
}

struct DropHelper {
ptr: *mut u8,
layout: Layout,
}

impl Drop for DropHelper {
#[inline]
fn drop(&mut self) {
// SAFETY: See `TaskBox::drop`.
unsafe { dealloc(self.ptr, self.layout) };
}
}
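Taken together, a runnable demonstration of the allocation pattern `TaskBox` relies on (the 48-byte size and 128-byte alignment are assumptions for illustration):

use std::alloc::{alloc, dealloc, handle_alloc_error, Layout};

fn main() {
    // Pretend the task cell is 48 bytes with natural alignment 8, and the
    // scheduler asked for min_align = 128.
    let layout = Layout::from_size_align(48, 128).expect("valid layout");

    // SAFETY: the layout has non-zero size.
    let ptr = unsafe { alloc(layout) };
    if ptr.is_null() {
        handle_alloc_error(layout);
    }

    // The allocation starts on its own 128-byte boundary, so its hot state
    // cannot share a prefetched cache-line pair with a neighboring task.
    assert_eq!(ptr as usize % 128, 0);

    // SAFETY: allocated above with this exact layout.
    unsafe { dealloc(ptr, layout) };
}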