Skip to content

Commit

Permalink
metrics: add worker thread id (#6695)
Browse files Browse the repository at this point in the history
  • Loading branch information
surban authored Jul 23, 2024
1 parent b69f16a commit 90b23a9
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 2 deletions.
3 changes: 3 additions & 0 deletions tokio/src/runtime/metrics/mock.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! This file contains mocks of the types in src/runtime/metrics
use std::thread::ThreadId;

/// Mock stand-in for the scheduler metrics type; it holds no data.
pub(crate) struct SchedulerMetrics {}

/// Mock stand-in for the worker metrics type; it holds no data.
pub(crate) struct WorkerMetrics {}
Expand Down Expand Up @@ -30,6 +32,7 @@ impl WorkerMetrics {
}

/// Mock no-op: the queue depth passed in is ignored.
pub(crate) fn set_queue_depth(&self, _len: usize) {}
/// Mock no-op: the thread id passed in is ignored.
pub(crate) fn set_thread_id(&self, _thread_id: ThreadId) {}
}

impl MetricsBatch {
Expand Down
44 changes: 44 additions & 0 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::runtime::Handle;

cfg_unstable_metrics! {
use std::ops::Range;
use std::thread::ThreadId;
cfg_64bit_metrics! {
use std::sync::atomic::Ordering::Relaxed;
}
Expand Down Expand Up @@ -127,6 +128,49 @@ impl RuntimeMetrics {
self.handle.inner.num_idle_blocking_threads()
}

/// Returns the id of the thread that the given worker is running on.
///
/// `None` is returned while the worker thread has not yet finished
/// starting up.
///
/// If additional information about the thread, such as its native id, is
/// required, it can be collected in [`on_thread_start`] and correlated
/// with the value returned here.
///
/// [`on_thread_start`]: crate::runtime::Builder::on_thread_start
///
/// # Arguments
///
/// `worker` is the index of the worker being queried. It must be at least
/// 0 and strictly less than `num_workers()`. The index uniquely identifies
/// a single worker and keeps identifying that worker for the whole
/// lifetime of the runtime instance.
///
/// # Panics
///
/// The method panics when `worker` represents an invalid worker, i.e. is
/// greater than or equal to `num_workers()`.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
///     let metrics = Handle::current().metrics();
///
///     let id = metrics.worker_thread_id(0);
///     println!("worker 0 has id {:?}", id);
/// }
/// ```
pub fn worker_thread_id(&self, worker: usize) -> Option<ThreadId> {
    // Look up the per-worker metrics first, then read the recorded id.
    let worker_metrics = self.handle.inner.worker_metrics(worker);
    worker_metrics.thread_id()
}

cfg_64bit_metrics! {
/// Returns the number of tasks spawned in this runtime since it was created.
///
Expand Down
13 changes: 13 additions & 0 deletions tokio/src/runtime/metrics/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use crate::runtime::metrics::Histogram;
use crate::runtime::Config;
use crate::util::metric_atomics::{MetricAtomicU64, MetricAtomicUsize};
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Mutex;
use std::thread::ThreadId;

/// Retrieve runtime worker metrics.
///
Expand Down Expand Up @@ -49,6 +51,9 @@ pub(crate) struct WorkerMetrics {

/// If `Some`, tracks the number of polls by duration range.
pub(super) poll_count_histogram: Option<Histogram>,

/// Thread id of worker thread.
thread_id: Mutex<Option<ThreadId>>,
}

impl WorkerMetrics {
Expand All @@ -72,4 +77,12 @@ impl WorkerMetrics {
/// Records `len` as the current queue depth using a relaxed atomic store.
pub(crate) fn set_queue_depth(&self, len: usize) {
self.queue_depth.store(len, Relaxed);
}

/// Returns the thread id currently stored for this worker, or `None` if
/// no id has been stored yet.
///
/// Panics if the internal mutex is poisoned.
pub(crate) fn thread_id(&self) -> Option<ThreadId> {
    let guard = self.thread_id.lock().unwrap();
    *guard
}

/// Stores `thread_id` as this worker's thread id, overwriting any
/// previously stored value.
///
/// Panics if the internal mutex is poisoned.
pub(crate) fn set_thread_id(&self, thread_id: ThreadId) {
    let mut slot = self.thread_id.lock().unwrap();
    *slot = Some(thread_id);
}
}
7 changes: 6 additions & 1 deletion tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};

use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::sync::atomic::Ordering::{AcqRel, Release};
use std::task::Poll::{Pending, Ready};
use std::task::Waker;
use std::time::Duration;
use std::{fmt, thread};

/// Executes tasks on the current thread
pub(crate) struct CurrentThread {
Expand Down Expand Up @@ -123,6 +123,7 @@ impl CurrentThread {
config: Config,
) -> (CurrentThread, Arc<Handle>) {
let worker_metrics = WorkerMetrics::from_config(&config);
worker_metrics.set_thread_id(thread::current().id());

// Get the configured global queue interval, or use the default.
let global_queue_interval = config
Expand Down Expand Up @@ -172,6 +173,10 @@ impl CurrentThread {
// available or the future is complete.
loop {
if let Some(core) = self.take_core(handle) {
handle
.shared
.worker_metrics
.set_thread_id(thread::current().id());
return core.block_on(future);
} else {
let notified = self.notify.notified();
Expand Down
9 changes: 9 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ use crate::util::rand::{FastRand, RngSeedGenerator};

use std::cell::RefCell;
use std::task::Waker;
use std::thread;
use std::time::Duration;

cfg_unstable_metrics! {
Expand Down Expand Up @@ -334,6 +335,12 @@ where
if let Some(cx) = maybe_cx {
if self.take_core {
let core = cx.worker.core.take();

if core.is_some() {
cx.worker.handle.shared.worker_metrics[cx.worker.index]
.set_thread_id(thread::current().id());
}

let mut cx_core = cx.core.borrow_mut();
assert!(cx_core.is_none());
*cx_core = core;
Expand Down Expand Up @@ -482,6 +489,8 @@ fn run(worker: Arc<Worker>) {
None => return,
};

worker.handle.shared.worker_metrics[worker.index].set_thread_id(thread::current().id());

let handle = scheduler::Handle::MultiThread(worker.handle.clone());

crate::runtime::context::enter_runtime(&handle, true, |_| {
Expand Down
3 changes: 2 additions & 1 deletion tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ use crate::util::atomic_cell::AtomicCell;
use crate::util::rand::{FastRand, RngSeedGenerator};

use std::cell::{Cell, RefCell};
use std::cmp;
use std::task::Waker;
use std::time::Duration;
use std::{cmp, thread};

cfg_unstable_metrics! {
mod metrics;
Expand Down Expand Up @@ -569,6 +569,7 @@ impl Worker {
}
};

cx.shared().worker_metrics[core.index].set_thread_id(thread::current().id());
core.stats.start_processing_scheduled_tasks(&mut self.stats);

if let Some(task) = maybe_task {
Expand Down
63 changes: 63 additions & 0 deletions tokio/tests/rt_unstable_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use std::future::Future;
use std::sync::{Arc, Barrier, Mutex};
use std::task::Poll;
use std::thread;
use tokio::macros::support::poll_fn;

use tokio::runtime::Runtime;
Expand Down Expand Up @@ -150,6 +151,68 @@ fn remote_schedule_count() {
assert_eq!(1, rt.metrics().remote_schedule_count());
}

/// Verifies that worker 0 of the runtime built by `current_thread()`
/// reports the id of whichever thread most recently called `block_on`.
#[test]
fn worker_thread_id_current_thread() {
    let rt = current_thread();
    let metrics = rt.metrics();

    // Drive the runtime from the test thread first.
    rt.block_on(async {});
    let here = thread::current().id();
    assert_eq!(Some(here), metrics.worker_thread_id(0));

    // Drive the runtime from a scoped helper thread and remember its id.
    // The scope joins the helper before returning.
    let thread_id = std::thread::scope(|scope| {
        let spawned = scope.spawn(|| {
            rt.block_on(async {});
        });
        spawned.thread().id()
    });
    assert_eq!(Some(thread_id), metrics.worker_thread_id(0));

    // Drive the runtime from the test thread once more.
    rt.block_on(async {});
    let here_again = thread::current().id();
    assert_eq!(Some(here_again), metrics.worker_thread_id(0));
}

/// Verifies that, on the runtime built by `threaded()`, the reported
/// worker thread id follows the worker when `block_in_place` hands the
/// worker off to another thread.
#[test]
fn worker_thread_id_threaded() {
    let rt = threaded();
    let metrics = rt.metrics();

    rt.block_on(rt.spawn(async move {
        // Check that we are running on a worker thread and determine
        // the index of our worker. The worker count is taken from the
        // runtime itself so the test does not silently depend on how
        // `threaded()` is configured.
        let thread_id = std::thread::current().id();
        let this_worker = (0..metrics.num_workers())
            .position(|w| metrics.worker_thread_id(w) == Some(thread_id))
            .expect("task not running on any worker thread");

        // Force worker to another thread.
        let moved_thread_id = tokio::task::block_in_place(|| {
            assert_eq!(thread_id, std::thread::current().id());

            // Wait for worker to move to another thread.
            for _ in 0..100 {
                let new_id = metrics.worker_thread_id(this_worker).unwrap();
                if thread_id != new_id {
                    return new_id;
                }
                std::thread::sleep(Duration::from_millis(100));
            }

            panic!("worker did not move to new thread");
        });

        // After blocking task worker either stays on new thread or
        // is moved back to current thread. Read the metric once so both
        // comparisons are made against the same snapshot.
        let final_id = metrics.worker_thread_id(this_worker);
        assert!(
            final_id == Some(moved_thread_id) || final_id == Some(thread_id),
            "worker {} reported unexpected thread id {:?}",
            this_worker,
            final_id
        );
    }))
    .unwrap()
}

#[test]
fn worker_park_count() {
let rt = current_thread();
Expand Down

0 comments on commit 90b23a9

Please sign in to comment.