diff --git a/tokio/src/runtime/metrics/mock.rs b/tokio/src/runtime/metrics/mock.rs index f4dc116539b..777c13d8a83 100644 --- a/tokio/src/runtime/metrics/mock.rs +++ b/tokio/src/runtime/metrics/mock.rs @@ -1,5 +1,7 @@ //! This file contains mocks of the types in src/runtime/metrics +use std::thread::ThreadId; + pub(crate) struct SchedulerMetrics {} pub(crate) struct WorkerMetrics {} @@ -30,6 +32,7 @@ impl WorkerMetrics { } pub(crate) fn set_queue_depth(&self, _len: usize) {} + pub(crate) fn set_thread_id(&self, _thread_id: ThreadId) {} } impl MetricsBatch { diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 5bb79927a82..f01a720200a 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -2,6 +2,7 @@ use crate::runtime::Handle; cfg_unstable_metrics! { use std::ops::Range; + use std::thread::ThreadId; cfg_64bit_metrics! { use std::sync::atomic::Ordering::Relaxed; } @@ -127,6 +128,49 @@ impl RuntimeMetrics { self.handle.inner.num_idle_blocking_threads() } + /// Returns the thread id of the given worker thread. + /// + /// The returned value is `None` if the worker thread has not yet finished + /// starting up. + /// + /// If additional information about the thread, such as its native id, are + /// required, those can be collected in [`on_thread_start`] and correlated + /// using the thread id. + /// + /// [`on_thread_start`]: crate::runtime::Builder::on_thread_start + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let id = metrics.worker_thread_id(0); + /// println!("worker 0 has id {:?}", id); + /// } + /// ``` + pub fn worker_thread_id(&self, worker: usize) -> Option { + self.handle + .inner + .worker_metrics(worker) + .thread_id() + } + cfg_64bit_metrics! { /// Returns the number of tasks spawned in this runtime since it was created. /// diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs index 02dddc85247..29804a08798 100644 --- a/tokio/src/runtime/metrics/worker.rs +++ b/tokio/src/runtime/metrics/worker.rs @@ -2,6 +2,8 @@ use crate::runtime::metrics::Histogram; use crate::runtime::Config; use crate::util::metric_atomics::{MetricAtomicU64, MetricAtomicUsize}; use std::sync::atomic::Ordering::Relaxed; +use std::sync::Mutex; +use std::thread::ThreadId; /// Retrieve runtime worker metrics. /// @@ -49,6 +51,9 @@ pub(crate) struct WorkerMetrics { /// If `Some`, tracks the number of polls by duration range. pub(super) poll_count_histogram: Option, + + /// Thread id of worker thread. + thread_id: Mutex>, } impl WorkerMetrics { @@ -72,4 +77,12 @@ impl WorkerMetrics { pub(crate) fn set_queue_depth(&self, len: usize) { self.queue_depth.store(len, Relaxed); } + + pub(crate) fn thread_id(&self) -> Option { + *self.thread_id.lock().unwrap() + } + + pub(crate) fn set_thread_id(&self, thread_id: ThreadId) { + *self.thread_id.lock().unwrap() = Some(thread_id); + } } diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index 4a3d849d264..5c952061e72 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -11,12 +11,12 @@ use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef}; use std::cell::RefCell; use std::collections::VecDeque; -use std::fmt; use std::future::Future; use std::sync::atomic::Ordering::{AcqRel, Release}; use std::task::Poll::{Pending, Ready}; use std::task::Waker; use std::time::Duration; +use std::{fmt, thread}; /// Executes tasks on the current thread pub(crate) struct CurrentThread { @@ -123,6 +123,7 @@ impl CurrentThread { config: Config, ) -> (CurrentThread, Arc) { let worker_metrics = WorkerMetrics::from_config(&config); + worker_metrics.set_thread_id(thread::current().id()); // Get the configured global queue interval, or use the default. let global_queue_interval = config @@ -172,6 +173,10 @@ impl CurrentThread { // available or the future is complete. loop { if let Some(core) = self.take_core(handle) { + handle + .shared + .worker_metrics + .set_thread_id(thread::current().id()); return core.block_on(future); } else { let notified = self.notify.notified(); diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index d71e62df53a..9b521efc9b1 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -72,6 +72,7 @@ use crate::util::rand::{FastRand, RngSeedGenerator}; use std::cell::RefCell; use std::task::Waker; +use std::thread; use std::time::Duration; cfg_unstable_metrics! { @@ -334,6 +335,12 @@ where if let Some(cx) = maybe_cx { if self.take_core { let core = cx.worker.core.take(); + + if core.is_some() { + cx.worker.handle.shared.worker_metrics[cx.worker.index] + .set_thread_id(thread::current().id()); + } + let mut cx_core = cx.core.borrow_mut(); assert!(cx_core.is_none()); *cx_core = core; @@ -482,6 +489,8 @@ fn run(worker: Arc) { None => return, }; + worker.handle.shared.worker_metrics[worker.index].set_thread_id(thread::current().id()); + let handle = scheduler::Handle::MultiThread(worker.handle.clone()); crate::runtime::context::enter_runtime(&handle, true, |_| { diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs index 8f07b84297a..9f6af9d4d7a 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs @@ -70,9 +70,9 @@ use crate::util::atomic_cell::AtomicCell; use crate::util::rand::{FastRand, RngSeedGenerator}; use std::cell::{Cell, RefCell}; -use std::cmp; use std::task::Waker; use std::time::Duration; +use std::{cmp, thread}; cfg_unstable_metrics! { mod metrics; @@ -569,6 +569,7 @@ impl Worker { } }; + cx.shared().worker_metrics[core.index].set_thread_id(thread::current().id()); core.stats.start_processing_scheduled_tasks(&mut self.stats); if let Some(task) = maybe_task { diff --git a/tokio/tests/rt_unstable_metrics.rs b/tokio/tests/rt_unstable_metrics.rs index 2e51edee4d9..a6438fb1b3d 100644 --- a/tokio/tests/rt_unstable_metrics.rs +++ b/tokio/tests/rt_unstable_metrics.rs @@ -10,6 +10,7 @@ use std::future::Future; use std::sync::{Arc, Barrier, Mutex}; use std::task::Poll; +use std::thread; use tokio::macros::support::poll_fn; use tokio::runtime::Runtime; @@ -150,6 +151,68 @@ fn remote_schedule_count() { assert_eq!(1, rt.metrics().remote_schedule_count()); } +#[test] +fn worker_thread_id_current_thread() { + let rt = current_thread(); + let metrics = rt.metrics(); + + // Check that runtime is on this thread. + rt.block_on(async {}); + assert_eq!(Some(thread::current().id()), metrics.worker_thread_id(0)); + + // Move runtime to another thread. + let thread_id = std::thread::scope(|scope| { + let join_handle = scope.spawn(|| { + rt.block_on(async {}); + }); + join_handle.thread().id() + }); + assert_eq!(Some(thread_id), metrics.worker_thread_id(0)); + + // Move runtime back to this thread. + rt.block_on(async {}); + assert_eq!(Some(thread::current().id()), metrics.worker_thread_id(0)); +} + +#[test] +fn worker_thread_id_threaded() { + let rt = threaded(); + let metrics = rt.metrics(); + + rt.block_on(rt.spawn(async move { + // Check that we are running on a worker thread and determine + // the index of our worker. + let thread_id = std::thread::current().id(); + let this_worker = (0..2) + .position(|w| metrics.worker_thread_id(w) == Some(thread_id)) + .expect("task not running on any worker thread"); + + // Force worker to another thread. + let moved_thread_id = tokio::task::block_in_place(|| { + assert_eq!(thread_id, std::thread::current().id()); + + // Wait for worker to move to another thread. + for _ in 0..100 { + let new_id = metrics.worker_thread_id(this_worker).unwrap(); + if thread_id != new_id { + return new_id; + } + std::thread::sleep(Duration::from_millis(100)); + } + + panic!("worker did not move to new thread"); + }); + + // After blocking task worker either stays on new thread or + // is moved back to current thread. + assert!( + metrics.worker_thread_id(this_worker) == Some(moved_thread_id) + || metrics.worker_thread_id(this_worker) == Some(thread_id) + ); + })) + .unwrap() +} + #[test] fn worker_park_count() { let rt = current_thread();