From 8f1fcb404a917022ef3e804f301e97b0f0b02864 Mon Sep 17 00:00:00 2001 From: Owen Leung Date: Fri, 11 Oct 2024 12:52:35 +0800 Subject: [PATCH 01/12] stabilize worker total busy duration, bring WorkerMetrics, MetricsBatch and Histogram out of unstable flag --- tokio/src/runtime/metrics/histogram.rs | 27 +++--- tokio/src/runtime/metrics/mock.rs | 46 --------- tokio/src/runtime/metrics/mod.rs | 19 ++-- tokio/src/runtime/metrics/runtime.rs | 97 +++++++++---------- tokio/src/runtime/metrics/worker.rs | 2 + .../runtime/scheduler/current_thread/mod.rs | 10 +- tokio/src/runtime/scheduler/mod.rs | 14 +-- .../scheduler/multi_thread/handle/metrics.rs | 11 ++- tokio/tests/rt_metrics.rs | 44 +++++++++ tokio/tests/rt_unstable_metrics.rs | 43 -------- 10 files changed, 136 insertions(+), 177 deletions(-) diff --git a/tokio/src/runtime/metrics/histogram.rs b/tokio/src/runtime/metrics/histogram.rs index 4cfd769a94e..ec34806c18e 100644 --- a/tokio/src/runtime/metrics/histogram.rs +++ b/tokio/src/runtime/metrics/histogram.rs @@ -35,31 +35,34 @@ pub(crate) struct HistogramBatch { resolution: u64, } -cfg_unstable! { - /// Whether the histogram used to aggregate a metric uses a linear or - /// logarithmic scale. - #[derive(Debug, Copy, Clone, Eq, PartialEq)] - #[non_exhaustive] - pub enum HistogramScale { - /// Linear bucket scale - Linear, - - /// Logarithmic bucket scale - Log, - } +/// Whether the histogram used to aggregate a metric uses a linear or +/// logarithmic scale. +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +#[non_exhaustive] +#[allow(unreachable_pub)] +pub enum HistogramScale { + /// Linear bucket scale + Linear, + + /// Logarithmic bucket scale + #[allow(dead_code)] + Log, } impl Histogram { + #[allow(dead_code)] pub(crate) fn num_buckets(&self) -> usize { self.buckets.len() } cfg_64bit_metrics! { + #[allow(dead_code)] pub(crate) fn get(&self, bucket: usize) -> u64 { self.buckets[bucket].load(Relaxed) } } + #[allow(dead_code)] pub(crate) fn bucket_range(&self, bucket: usize) -> Range { match self.scale { HistogramScale::Log => Range { diff --git a/tokio/src/runtime/metrics/mock.rs b/tokio/src/runtime/metrics/mock.rs index 777c13d8a83..1cfbd538d6f 100644 --- a/tokio/src/runtime/metrics/mock.rs +++ b/tokio/src/runtime/metrics/mock.rs @@ -1,16 +1,7 @@ //! This file contains mocks of the types in src/runtime/metrics -use std::thread::ThreadId; - pub(crate) struct SchedulerMetrics {} -pub(crate) struct WorkerMetrics {} - -pub(crate) struct MetricsBatch {} - -#[derive(Clone, Default)] -pub(crate) struct HistogramBuilder {} - impl SchedulerMetrics { pub(crate) fn new() -> Self { Self {} @@ -20,40 +11,3 @@ impl SchedulerMetrics { pub(crate) fn inc_remote_schedule_count(&self) {} } -impl WorkerMetrics { - pub(crate) fn new() -> Self { - Self {} - } - - pub(crate) fn from_config(config: &crate::runtime::Config) -> Self { - // Prevent the dead-code warning from being triggered - let _ = &config.metrics_poll_count_histogram; - Self::new() - } - - pub(crate) fn set_queue_depth(&self, _len: usize) {} - pub(crate) fn set_thread_id(&self, _thread_id: ThreadId) {} -} - -impl MetricsBatch { - pub(crate) fn new(_: &WorkerMetrics) -> Self { - Self {} - } - - pub(crate) fn submit(&mut self, _to: &WorkerMetrics, _mean_poll_time: u64) {} - pub(crate) fn about_to_park(&mut self) {} - pub(crate) fn unparked(&mut self) {} - pub(crate) fn inc_local_schedule_count(&mut self) {} - pub(crate) fn start_processing_scheduled_tasks(&mut self) {} - pub(crate) fn end_processing_scheduled_tasks(&mut self) {} - pub(crate) fn start_poll(&mut self) {} - pub(crate) fn end_poll(&mut self) {} -} - -cfg_rt_multi_thread! { - impl MetricsBatch { - pub(crate) fn incr_steal_count(&mut self, _by: u16) {} - pub(crate) fn incr_steal_operations(&mut self) {} - pub(crate) fn incr_overflow_count(&mut self) {} - } -} diff --git a/tokio/src/runtime/metrics/mod.rs b/tokio/src/runtime/metrics/mod.rs index 295c97cce88..8ed2e8e6d82 100644 --- a/tokio/src/runtime/metrics/mod.rs +++ b/tokio/src/runtime/metrics/mod.rs @@ -11,12 +11,16 @@ mod runtime; pub use runtime::RuntimeMetrics; -cfg_unstable_metrics! { - mod batch; - pub(crate) use batch::MetricsBatch; +mod worker; +pub(crate) use worker::WorkerMetrics; + +mod batch; +pub(crate) use batch::MetricsBatch; + +mod histogram; +pub(crate) use histogram::{Histogram, HistogramBatch, HistogramBuilder}; - mod histogram; - pub(crate) use histogram::{Histogram, HistogramBatch, HistogramBuilder}; +cfg_unstable_metrics! { #[allow(unreachable_pub)] // rust-lang/rust#57411 pub use histogram::HistogramScale; @@ -24,9 +28,6 @@ cfg_unstable_metrics! { mod scheduler; pub(crate) use scheduler::SchedulerMetrics; - mod worker; - pub(crate) use worker::WorkerMetrics; - cfg_net! { mod io; pub(crate) use io::IoDriverMetrics; @@ -36,5 +37,5 @@ cfg_unstable_metrics! { cfg_not_unstable_metrics! { mod mock; - pub(crate) use mock::{SchedulerMetrics, WorkerMetrics, MetricsBatch, HistogramBuilder}; + pub(crate) use mock::{SchedulerMetrics}; } diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 008245ecc36..541aeb8fdd5 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -1,12 +1,10 @@ +use std::sync::atomic::Ordering::Relaxed; +use std::time::Duration; use crate::runtime::Handle; cfg_unstable_metrics! { use std::ops::Range; use std::thread::ThreadId; - cfg_64bit_metrics! { - use std::sync::atomic::Ordering::Relaxed; - } - use std::time::Duration; } /// Handle to the runtime's metrics. @@ -96,6 +94,51 @@ impl RuntimeMetrics { self.handle.inner.injection_queue_depth() } + /// Returns the amount of time the given worker thread has been busy. + /// + /// The worker busy duration starts at zero when the runtime is created and + /// increases whenever the worker is spending time processing work. Using + /// this value can indicate the load of the given worker. If a lot of time + /// is spent busy, then the worker is under load and will check for inbound + /// events less often. + /// + /// The timer is monotonically increasing. It is never decremented or reset + /// to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_total_busy_duration(0); + /// println!("worker 0 was busy for a total of {:?}", n); + /// } + /// ``` + pub fn worker_total_busy_duration(&self, worker: usize) -> Duration { + let nanos = self + .handle + .inner + .worker_metrics(worker) + .busy_duration_total + .load(Relaxed); + Duration::from_nanos(nanos) + } cfg_unstable_metrics! { /// Returns the number of additional threads spawned by the runtime. @@ -543,52 +586,6 @@ impl RuntimeMetrics { .load(Relaxed) } - /// Returns the amount of time the given worker thread has been busy. - /// - /// The worker busy duration starts at zero when the runtime is created and - /// increases whenever the worker is spending time processing work. Using - /// this value can indicate the load of the given worker. If a lot of time - /// is spent busy, then the worker is under load and will check for inbound - /// events less often. - /// - /// The timer is monotonically increasing. It is never decremented or reset - /// to zero. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.worker_total_busy_duration(0); - /// println!("worker 0 was busy for a total of {:?}", n); - /// } - /// ``` - pub fn worker_total_busy_duration(&self, worker: usize) -> Duration { - let nanos = self - .handle - .inner - .worker_metrics(worker) - .busy_duration_total - .load(Relaxed); - Duration::from_nanos(nanos) - } - /// Returns the number of tasks scheduled from **within** the runtime on the /// given worker's local queue. /// diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs index 29804a08798..f690df44fdd 100644 --- a/tokio/src/runtime/metrics/worker.rs +++ b/tokio/src/runtime/metrics/worker.rs @@ -70,6 +70,7 @@ impl WorkerMetrics { WorkerMetrics::default() } + #[allow(dead_code)] pub(crate) fn queue_depth(&self) -> usize { self.queue_depth.load(Relaxed) } @@ -78,6 +79,7 @@ impl WorkerMetrics { self.queue_depth.store(len, Relaxed); } + #[allow(dead_code)] pub(crate) fn thread_id(&self) -> Option { *self.thread_id.lock().unwrap() } diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index b8cb5b46ca5..1483df73312 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -532,6 +532,11 @@ impl Handle { pub(crate) fn injection_queue_depth(&self) -> usize { self.shared.inject.len() } + + pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { + assert_eq!(0, worker); + &self.shared.worker_metrics + } } cfg_unstable_metrics! { @@ -540,11 +545,6 @@ cfg_unstable_metrics! { &self.shared.scheduler_metrics } - pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { - assert_eq!(0, worker); - &self.shared.worker_metrics - } - pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { self.worker_metrics(worker).queue_depth() } diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index ada8efbad63..1172a7b029a 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -11,6 +11,8 @@ cfg_rt! { use crate::runtime::TaskHooks; } +use crate::runtime::{driver, WorkerMetrics}; + cfg_rt_multi_thread! { mod block_in_place; pub(crate) use block_in_place::block_in_place; @@ -27,8 +29,6 @@ cfg_rt_multi_thread! { } } -use crate::runtime::driver; - #[derive(Debug, Clone)] pub(crate) enum Handle { #[cfg(feature = "rt")] @@ -193,10 +193,14 @@ cfg_rt! { pub(crate) fn injection_queue_depth(&self) -> usize { match_flavor!(self, Handle(handle) => handle.injection_queue_depth()) } + + pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { + match_flavor!(self, Handle(handle) => handle.worker_metrics(worker)) + } } cfg_unstable_metrics! { - use crate::runtime::{SchedulerMetrics, WorkerMetrics}; + use crate::runtime::{SchedulerMetrics}; impl Handle { cfg_64bit_metrics! { @@ -217,10 +221,6 @@ cfg_rt! { match_flavor!(self, Handle(handle) => handle.scheduler_metrics()) } - pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { - match_flavor!(self, Handle(handle) => handle.worker_metrics(worker)) - } - pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { match_flavor!(self, Handle(handle) => handle.worker_local_queue_depth(worker)) } diff --git a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs index 21795dbbc2f..35fae9818fa 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs @@ -1,7 +1,8 @@ +use crate::runtime::WorkerMetrics; use super::Handle; cfg_unstable_metrics! { - use crate::runtime::{SchedulerMetrics, WorkerMetrics}; + use crate::runtime::{SchedulerMetrics}; } impl Handle { @@ -17,6 +18,10 @@ impl Handle { self.shared.injection_queue_depth() } + pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { + &self.shared.worker_metrics[worker] + } + cfg_unstable_metrics! { cfg_64bit_metrics! { pub(crate) fn spawned_tasks_count(&self) -> u64 { @@ -39,10 +44,6 @@ impl Handle { &self.shared.scheduler_metrics } - pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { - &self.shared.worker_metrics[worker] - } - pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { self.shared.worker_local_queue_depth(worker) } diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index e2494cf5e64..12a43669599 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -3,6 +3,7 @@ #![cfg(all(feature = "full", not(target_os = "wasi"), target_has_atomic = "64"))] use std::sync::{Arc, Barrier}; +use std::time::Duration; use tokio::runtime::Runtime; #[test] @@ -114,3 +115,46 @@ fn threaded() -> Runtime { .build() .unwrap() } + +#[test] +fn worker_total_busy_duration() { + const N: usize = 5; + + let zero = Duration::from_millis(0); + + let rt = current_thread(); + let metrics = rt.metrics(); + + rt.block_on(async { + for _ in 0..N { + tokio::spawn(async { + tokio::task::yield_now().await; + }) + .await + .unwrap(); + } + }); + + drop(rt); + + assert!(zero < metrics.worker_total_busy_duration(0)); + + let rt = threaded(); + let metrics = rt.metrics(); + + rt.block_on(async { + for _ in 0..N { + tokio::spawn(async { + tokio::task::yield_now().await; + }) + .await + .unwrap(); + } + }); + + drop(rt); + + for i in 0..metrics.num_workers() { + assert!(zero < metrics.worker_total_busy_duration(i)); + } +} \ No newline at end of file diff --git a/tokio/tests/rt_unstable_metrics.rs b/tokio/tests/rt_unstable_metrics.rs index 5503a9a5b03..603d474aacf 100644 --- a/tokio/tests/rt_unstable_metrics.rs +++ b/tokio/tests/rt_unstable_metrics.rs @@ -502,49 +502,6 @@ fn worker_poll_count_histogram_disabled_without_explicit_enable() { } } -#[test] -fn worker_total_busy_duration() { - const N: usize = 5; - - let zero = Duration::from_millis(0); - - let rt = current_thread(); - let metrics = rt.metrics(); - - rt.block_on(async { - for _ in 0..N { - tokio::spawn(async { - tokio::task::yield_now().await; - }) - .await - .unwrap(); - } - }); - - drop(rt); - - assert!(zero < metrics.worker_total_busy_duration(0)); - - let rt = threaded(); - let metrics = rt.metrics(); - - rt.block_on(async { - for _ in 0..N { - tokio::spawn(async { - tokio::task::yield_now().await; - }) - .await - .unwrap(); - } - }); - - drop(rt); - - for i in 0..metrics.num_workers() { - assert!(zero < metrics.worker_total_busy_duration(i)); - } -} - #[test] fn worker_local_schedule_count() { let rt = current_thread(); From 9b47cf92ecfbd7f6e2295d6ecde2d246ed94e78d Mon Sep 17 00:00:00 2001 From: Owen Leung Date: Fri, 11 Oct 2024 12:57:21 +0800 Subject: [PATCH 02/12] Fix rustfmt ci job --- tokio/src/runtime/metrics/mock.rs | 1 - tokio/src/runtime/metrics/runtime.rs | 2 +- .../runtime/scheduler/multi_thread/handle/metrics.rs | 2 +- tokio/tests/rt_metrics.rs | 10 +++++----- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/tokio/src/runtime/metrics/mock.rs b/tokio/src/runtime/metrics/mock.rs index 1cfbd538d6f..8256e430c8b 100644 --- a/tokio/src/runtime/metrics/mock.rs +++ b/tokio/src/runtime/metrics/mock.rs @@ -10,4 +10,3 @@ impl SchedulerMetrics { /// Increment the number of tasks scheduled externally pub(crate) fn inc_remote_schedule_count(&self) {} } - diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 541aeb8fdd5..6b8e823eb54 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -1,6 +1,6 @@ +use crate::runtime::Handle; use std::sync::atomic::Ordering::Relaxed; use std::time::Duration; -use crate::runtime::Handle; cfg_unstable_metrics! { use std::ops::Range; diff --git a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs index 35fae9818fa..94c527bb31f 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs @@ -1,5 +1,5 @@ -use crate::runtime::WorkerMetrics; use super::Handle; +use crate::runtime::WorkerMetrics; cfg_unstable_metrics! { use crate::runtime::{SchedulerMetrics}; diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index 12a43669599..ce181cd9fd0 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -130,8 +130,8 @@ fn worker_total_busy_duration() { tokio::spawn(async { tokio::task::yield_now().await; }) - .await - .unwrap(); + .await + .unwrap(); } }); @@ -147,8 +147,8 @@ fn worker_total_busy_duration() { tokio::spawn(async { tokio::task::yield_now().await; }) - .await - .unwrap(); + .await + .unwrap(); } }); @@ -157,4 +157,4 @@ fn worker_total_busy_duration() { for i in 0..metrics.num_workers() { assert!(zero < metrics.worker_total_busy_duration(i)); } -} \ No newline at end of file +} From e2f4f33b447cfebe88b033a5afcdf17473c27c30 Mon Sep 17 00:00:00 2001 From: Owen Leung Date: Fri, 11 Oct 2024 15:52:32 +0800 Subject: [PATCH 03/12] Fix various failing CI jobs by adding cfg(target_has_atomic = "64") to WorkerMetrics --- tokio/src/runtime/metrics/runtime.rs | 3 +++ tokio/src/runtime/scheduler/current_thread/mod.rs | 1 + tokio/src/runtime/scheduler/mod.rs | 8 ++++++-- .../src/runtime/scheduler/multi_thread/handle/metrics.rs | 2 ++ 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 6b8e823eb54..eebad51c98b 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -1,5 +1,7 @@ use crate::runtime::Handle; +#[cfg(target_has_atomic = "64")] use std::sync::atomic::Ordering::Relaxed; +#[cfg(target_has_atomic = "64")] use std::time::Duration; cfg_unstable_metrics! { @@ -130,6 +132,7 @@ impl RuntimeMetrics { /// println!("worker 0 was busy for a total of {:?}", n); /// } /// ``` + #[cfg(target_has_atomic = "64")] pub fn worker_total_busy_duration(&self, worker: usize) -> Duration { let nanos = self .handle diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index 1483df73312..417e6aeed14 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -533,6 +533,7 @@ impl Handle { self.shared.inject.len() } + #[cfg(target_has_atomic = "64")] pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { assert_eq!(0, worker); &self.shared.worker_metrics diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index 1172a7b029a..88b3398991e 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -9,9 +9,10 @@ cfg_rt! { pub(crate) use inject::Inject; use crate::runtime::TaskHooks; -} -use crate::runtime::{driver, WorkerMetrics}; + #[cfg(target_has_atomic = "64")] + use crate::runtime::{WorkerMetrics}; +} cfg_rt_multi_thread! { mod block_in_place; @@ -29,6 +30,8 @@ cfg_rt_multi_thread! { } } +use crate::runtime::driver; + #[derive(Debug, Clone)] pub(crate) enum Handle { #[cfg(feature = "rt")] @@ -194,6 +197,7 @@ cfg_rt! { match_flavor!(self, Handle(handle) => handle.injection_queue_depth()) } + #[cfg(target_has_atomic = "64")] pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { match_flavor!(self, Handle(handle) => handle.worker_metrics(worker)) } diff --git a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs index 94c527bb31f..34ac1d2c5ea 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs @@ -1,4 +1,5 @@ use super::Handle; +#[cfg(target_has_atomic = "64")] use crate::runtime::WorkerMetrics; cfg_unstable_metrics! { @@ -18,6 +19,7 @@ impl Handle { self.shared.injection_queue_depth() } + #[cfg(target_has_atomic = "64")] pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { &self.shared.worker_metrics[worker] } From b6974bac12d8a0e90ce45c8988f274c9cca61f83 Mon Sep 17 00:00:00 2001 From: Owen Leung Date: Fri, 11 Oct 2024 21:21:54 +0800 Subject: [PATCH 04/12] Use cfg_64bit_metrics instead --- tokio/src/runtime/metrics/runtime.rs | 87 ++++++++++--------- .../runtime/scheduler/current_thread/mod.rs | 2 +- tokio/src/runtime/scheduler/mod.rs | 3 +- .../scheduler/multi_thread/handle/metrics.rs | 3 +- 4 files changed, 49 insertions(+), 46 deletions(-) diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index eebad51c98b..d670774f8ce 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -1,12 +1,15 @@ use crate::runtime::Handle; -#[cfg(target_has_atomic = "64")] -use std::sync::atomic::Ordering::Relaxed; -#[cfg(target_has_atomic = "64")] +#[allow(unused_imports)] use std::time::Duration; +cfg_64bit_metrics! { + use std::sync::atomic::Ordering::Relaxed; +} + cfg_unstable_metrics! { use std::ops::Range; use std::thread::ThreadId; + // use std::time::Duration; } /// Handle to the runtime's metrics. @@ -96,44 +99,44 @@ impl RuntimeMetrics { self.handle.inner.injection_queue_depth() } - /// Returns the amount of time the given worker thread has been busy. - /// - /// The worker busy duration starts at zero when the runtime is created and - /// increases whenever the worker is spending time processing work. Using - /// this value can indicate the load of the given worker. If a lot of time - /// is spent busy, then the worker is under load and will check for inbound - /// events less often. - /// - /// The timer is monotonically increasing. It is never decremented or reset - /// to zero. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.worker_total_busy_duration(0); - /// println!("worker 0 was busy for a total of {:?}", n); - /// } - /// ``` - #[cfg(target_has_atomic = "64")] - pub fn worker_total_busy_duration(&self, worker: usize) -> Duration { + cfg_64bit_metrics! { + /// Returns the amount of time the given worker thread has been busy. + /// + /// The worker busy duration starts at zero when the runtime is created and + /// increases whenever the worker is spending time processing work. Using + /// this value can indicate the load of the given worker. If a lot of time + /// is spent busy, then the worker is under load and will check for inbound + /// events less often. + /// + /// The timer is monotonically increasing. It is never decremented or reset + /// to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_total_busy_duration(0); + /// println!("worker 0 was busy for a total of {:?}", n); + /// } + /// ``` + pub fn worker_total_busy_duration(&self, worker: usize) -> Duration { let nanos = self .handle .inner @@ -141,7 +144,9 @@ impl RuntimeMetrics { .busy_duration_total .load(Relaxed); Duration::from_nanos(nanos) + } } + cfg_unstable_metrics! { /// Returns the number of additional threads spawned by the runtime. diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index 417e6aeed14..a6f106f1029 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -533,7 +533,7 @@ impl Handle { self.shared.inject.len() } - #[cfg(target_has_atomic = "64")] + #[allow(dead_code)] pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { assert_eq!(0, worker); &self.shared.worker_metrics diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index 88b3398991e..3b97eea25da 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -10,7 +10,6 @@ cfg_rt! { use crate::runtime::TaskHooks; - #[cfg(target_has_atomic = "64")] use crate::runtime::{WorkerMetrics}; } @@ -197,7 +196,7 @@ cfg_rt! { match_flavor!(self, Handle(handle) => handle.injection_queue_depth()) } - #[cfg(target_has_atomic = "64")] + #[allow(dead_code)] pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { match_flavor!(self, Handle(handle) => handle.worker_metrics(worker)) } diff --git a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs index 34ac1d2c5ea..5cc3e160135 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs @@ -1,5 +1,4 @@ use super::Handle; -#[cfg(target_has_atomic = "64")] use crate::runtime::WorkerMetrics; cfg_unstable_metrics! { @@ -19,7 +18,7 @@ impl Handle { self.shared.injection_queue_depth() } - #[cfg(target_has_atomic = "64")] + #[allow(dead_code)] pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { &self.shared.worker_metrics[worker] } From 86f019f8d391292487477d005e036ab6fb6ba9f4 Mon Sep 17 00:00:00 2001 From: Owen Leung Date: Fri, 11 Oct 2024 22:00:27 +0800 Subject: [PATCH 05/12] Fix formatting and remove brackets --- tokio/src/runtime/metrics/mod.rs | 2 +- tokio/src/runtime/metrics/runtime.rs | 15 +++++++-------- tokio/src/runtime/scheduler/mod.rs | 4 ++-- .../scheduler/multi_thread/handle/metrics.rs | 2 +- 4 files changed, 11 insertions(+), 12 deletions(-) diff --git a/tokio/src/runtime/metrics/mod.rs b/tokio/src/runtime/metrics/mod.rs index 8ed2e8e6d82..dbe7693faa5 100644 --- a/tokio/src/runtime/metrics/mod.rs +++ b/tokio/src/runtime/metrics/mod.rs @@ -37,5 +37,5 @@ cfg_unstable_metrics! { cfg_not_unstable_metrics! { mod mock; - pub(crate) use mock::{SchedulerMetrics}; + pub(crate) use mock::SchedulerMetrics; } diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index d670774f8ce..69fcf3877e2 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -9,7 +9,6 @@ cfg_64bit_metrics! { cfg_unstable_metrics! { use std::ops::Range; use std::thread::ThreadId; - // use std::time::Duration; } /// Handle to the runtime's metrics. @@ -137,13 +136,13 @@ impl RuntimeMetrics { /// } /// ``` pub fn worker_total_busy_duration(&self, worker: usize) -> Duration { - let nanos = self - .handle - .inner - .worker_metrics(worker) - .busy_duration_total - .load(Relaxed); - Duration::from_nanos(nanos) + let nanos = self + .handle + .inner + .worker_metrics(worker) + .busy_duration_total + .load(Relaxed); + Duration::from_nanos(nanos) } } diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index 3b97eea25da..c0759419ff0 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -10,7 +10,7 @@ cfg_rt! { use crate::runtime::TaskHooks; - use crate::runtime::{WorkerMetrics}; + use crate::runtime::WorkerMetrics; } cfg_rt_multi_thread! { @@ -203,7 +203,7 @@ cfg_rt! { } cfg_unstable_metrics! { - use crate::runtime::{SchedulerMetrics}; + use crate::runtime::SchedulerMetrics; impl Handle { cfg_64bit_metrics! { diff --git a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs index 5cc3e160135..f010810c59b 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs @@ -2,7 +2,7 @@ use super::Handle; use crate::runtime::WorkerMetrics; cfg_unstable_metrics! { - use crate::runtime::{SchedulerMetrics}; + use crate::runtime::SchedulerMetrics; } impl Handle { From 64f626ddd2d87867842e2cc1e7fbd2287ee290bf Mon Sep 17 00:00:00 2001 From: Owen Leung Date: Wed, 16 Oct 2024 22:34:07 +0800 Subject: [PATCH 06/12] Creat Mock Histogram, HistogramBatch and HistogramBuilder. Revert changes on histogram.rs and guard it behind unstable flag --- tokio/src/runtime/metrics/histogram.rs | 27 ++++++++++++-------------- tokio/src/runtime/metrics/mock.rs | 24 +++++++++++++++++++++++ tokio/src/runtime/metrics/mod.rs | 9 +++++---- 3 files changed, 41 insertions(+), 19 deletions(-) diff --git a/tokio/src/runtime/metrics/histogram.rs b/tokio/src/runtime/metrics/histogram.rs index ec34806c18e..4cfd769a94e 100644 --- a/tokio/src/runtime/metrics/histogram.rs +++ b/tokio/src/runtime/metrics/histogram.rs @@ -35,34 +35,31 @@ pub(crate) struct HistogramBatch { resolution: u64, } -/// Whether the histogram used to aggregate a metric uses a linear or -/// logarithmic scale. -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -#[non_exhaustive] -#[allow(unreachable_pub)] -pub enum HistogramScale { - /// Linear bucket scale - Linear, - - /// Logarithmic bucket scale - #[allow(dead_code)] - Log, +cfg_unstable! { + /// Whether the histogram used to aggregate a metric uses a linear or + /// logarithmic scale. + #[derive(Debug, Copy, Clone, Eq, PartialEq)] + #[non_exhaustive] + pub enum HistogramScale { + /// Linear bucket scale + Linear, + + /// Logarithmic bucket scale + Log, + } } impl Histogram { - #[allow(dead_code)] pub(crate) fn num_buckets(&self) -> usize { self.buckets.len() } cfg_64bit_metrics! { - #[allow(dead_code)] pub(crate) fn get(&self, bucket: usize) -> u64 { self.buckets[bucket].load(Relaxed) } } - #[allow(dead_code)] pub(crate) fn bucket_range(&self, bucket: usize) -> Range { match self.scale { HistogramScale::Log => Range { diff --git a/tokio/src/runtime/metrics/mock.rs b/tokio/src/runtime/metrics/mock.rs index 8256e430c8b..d142b0dcfe8 100644 --- a/tokio/src/runtime/metrics/mock.rs +++ b/tokio/src/runtime/metrics/mock.rs @@ -10,3 +10,27 @@ impl SchedulerMetrics { /// Increment the number of tasks scheduled externally pub(crate) fn inc_remote_schedule_count(&self) {} } + +#[derive(Debug)] +pub(crate) struct Histogram {} + +pub(crate) struct HistogramBatch {} + +#[derive(Debug, Clone, Default)] +pub(crate) struct HistogramBuilder {} + +impl HistogramBuilder { + pub(crate) fn build(&self) -> Histogram { + Histogram {} + } +} + +impl HistogramBatch { + pub(crate) fn from_histogram(_histogram: &Histogram) -> HistogramBatch { + HistogramBatch {} + } + + pub(crate) fn submit(&self, _histogram: &Histogram) {} + + pub(crate) fn measure(&mut self, _value: u64, _count: u64) {} +} diff --git a/tokio/src/runtime/metrics/mod.rs b/tokio/src/runtime/metrics/mod.rs index dbe7693faa5..896656c4987 100644 --- a/tokio/src/runtime/metrics/mod.rs +++ b/tokio/src/runtime/metrics/mod.rs @@ -17,9 +17,6 @@ pub(crate) use worker::WorkerMetrics; mod batch; pub(crate) use batch::MetricsBatch; -mod histogram; -pub(crate) use histogram::{Histogram, HistogramBatch, HistogramBuilder}; - cfg_unstable_metrics! { #[allow(unreachable_pub)] // rust-lang/rust#57411 pub use histogram::HistogramScale; @@ -32,10 +29,14 @@ cfg_unstable_metrics! { mod io; pub(crate) use io::IoDriverMetrics; } + + mod histogram; + pub(crate) use histogram::{Histogram, HistogramBatch, HistogramBuilder}; } cfg_not_unstable_metrics! { mod mock; - pub(crate) use mock::SchedulerMetrics; + pub(crate) use mock::{SchedulerMetrics, Histogram, HistogramBatch, HistogramBuilder}; + } From 489003c9b7c4319d709bb050ca06df3c382e22b2 Mon Sep 17 00:00:00 2001 From: Owen Leung Date: Wed, 16 Oct 2024 22:53:45 +0800 Subject: [PATCH 07/12] Hide queue_depth and thread_id behind unstable flag --- tokio/src/runtime/metrics/worker.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs index f690df44fdd..9eb30d91f96 100644 --- a/tokio/src/runtime/metrics/worker.rs +++ b/tokio/src/runtime/metrics/worker.rs @@ -70,18 +70,20 @@ impl WorkerMetrics { WorkerMetrics::default() } - #[allow(dead_code)] - pub(crate) fn queue_depth(&self) -> usize { - self.queue_depth.load(Relaxed) + cfg_unstable_metrics! { + pub(crate) fn queue_depth(&self) -> usize { + self.queue_depth.load(Relaxed) + } } pub(crate) fn set_queue_depth(&self, len: usize) { self.queue_depth.store(len, Relaxed); } - #[allow(dead_code)] - pub(crate) fn thread_id(&self) -> Option { - *self.thread_id.lock().unwrap() + cfg_unstable_metrics! { + pub(crate) fn thread_id(&self) -> Option { + *self.thread_id.lock().unwrap() + } } pub(crate) fn set_thread_id(&self, thread_id: ThreadId) { From 8a134a29c64675f35131fe82fc9224880aa1219d Mon Sep 17 00:00:00 2001 From: Owen Leung Date: Sat, 19 Oct 2024 12:00:14 +0800 Subject: [PATCH 08/12] Mark most fields of WorkerMetrics as unstable, except for busy_duration_total, queue_depth and thread_id --- tokio/src/runtime/metrics/mock.rs | 72 +++++++++++++++---- tokio/src/runtime/metrics/mod.rs | 23 +++--- tokio/src/runtime/metrics/worker.rs | 47 +++++++++--- .../runtime/scheduler/current_thread/mod.rs | 2 +- .../scheduler/multi_thread/handle/metrics.rs | 2 +- 5 files changed, 110 insertions(+), 36 deletions(-) diff --git a/tokio/src/runtime/metrics/mock.rs b/tokio/src/runtime/metrics/mock.rs index d142b0dcfe8..64c7f1c6b0d 100644 --- a/tokio/src/runtime/metrics/mock.rs +++ b/tokio/src/runtime/metrics/mock.rs @@ -1,7 +1,32 @@ //! This file contains mocks of the types in src/runtime/metrics +use crate::runtime::WorkerMetrics; +use std::sync::atomic::Ordering::Relaxed; +use std::time::{Duration, Instant}; + pub(crate) struct SchedulerMetrics {} +/// The `MetricsBatch` struct in this mock implementation provides a minimal, +/// simplified version of `batch::MetricsBatch`. It contains only the basic fields +/// required to track the total busy duration (`busy_duration_total`) . +/// +/// This mock is used to stabilize the API `worker_total_busy_duration` +/// without relying on the full metrics collection logic. In the real implementation, +/// additional fields provide more detailed tracking of worker activity. +/// +/// This mock can be further enriched when stabailzing other worker metrics, such as +/// `worker_thread_id`, `worker_park_count` and so on +pub(crate) struct MetricsBatch { + /// The total busy duration in nanoseconds. + busy_duration_total: u64, + + /// Instant at which work last resumed (continued after park). + processing_scheduled_tasks_started_at: Instant, +} + +#[derive(Clone, Default)] +pub(crate) struct HistogramBuilder {} + impl SchedulerMetrics { pub(crate) fn new() -> Self { Self {} @@ -11,26 +36,43 @@ impl SchedulerMetrics { pub(crate) fn inc_remote_schedule_count(&self) {} } -#[derive(Debug)] -pub(crate) struct Histogram {} - -pub(crate) struct HistogramBatch {} +impl MetricsBatch { + pub(crate) fn new(_: &WorkerMetrics) -> Self { + let now = Instant::now(); -#[derive(Debug, Clone, Default)] -pub(crate) struct HistogramBuilder {} + MetricsBatch { + busy_duration_total: 0, + processing_scheduled_tasks_started_at: now, + } + } -impl HistogramBuilder { - pub(crate) fn build(&self) -> Histogram { - Histogram {} + pub(crate) fn submit(&mut self, worker: &WorkerMetrics, _mean_poll_time: u64) { + worker + .busy_duration_total + .store(self.busy_duration_total, Relaxed); + } + pub(crate) fn about_to_park(&mut self) {} + pub(crate) fn unparked(&mut self) {} + pub(crate) fn inc_local_schedule_count(&mut self) {} + pub(crate) fn start_processing_scheduled_tasks(&mut self) { + self.processing_scheduled_tasks_started_at = Instant::now(); } + pub(crate) fn end_processing_scheduled_tasks(&mut self) { + let busy_duration = self.processing_scheduled_tasks_started_at.elapsed(); + self.busy_duration_total += duration_as_u64(busy_duration); + } + pub(crate) fn start_poll(&mut self) {} + pub(crate) fn end_poll(&mut self) {} } -impl HistogramBatch { - pub(crate) fn from_histogram(_histogram: &Histogram) -> HistogramBatch { - HistogramBatch {} +cfg_rt_multi_thread! { + impl MetricsBatch { + pub(crate) fn incr_steal_count(&mut self, _by: u16) {} + pub(crate) fn incr_steal_operations(&mut self) {} + pub(crate) fn incr_overflow_count(&mut self) {} } +} - pub(crate) fn submit(&self, _histogram: &Histogram) {} - - pub(crate) fn measure(&mut self, _value: u64, _count: u64) {} +fn duration_as_u64(dur: Duration) -> u64 { + u64::try_from(dur.as_nanos()).unwrap_or(u64::MAX) } diff --git a/tokio/src/runtime/metrics/mod.rs b/tokio/src/runtime/metrics/mod.rs index 896656c4987..231226b17f8 100644 --- a/tokio/src/runtime/metrics/mod.rs +++ b/tokio/src/runtime/metrics/mod.rs @@ -11,13 +11,13 @@ mod runtime; pub use runtime::RuntimeMetrics; -mod worker; -pub(crate) use worker::WorkerMetrics; +cfg_unstable_metrics! { + mod batch; + pub(crate) use batch::MetricsBatch; -mod batch; -pub(crate) use batch::MetricsBatch; + mod histogram; + pub(crate) use histogram::{Histogram, HistogramBatch, HistogramBuilder}; -cfg_unstable_metrics! { #[allow(unreachable_pub)] // rust-lang/rust#57411 pub use histogram::HistogramScale; @@ -25,18 +25,19 @@ cfg_unstable_metrics! { mod scheduler; pub(crate) use scheduler::SchedulerMetrics; + mod worker; + pub(crate) use worker::WorkerMetrics; + cfg_net! { mod io; pub(crate) use io::IoDriverMetrics; } - - mod histogram; - pub(crate) use histogram::{Histogram, HistogramBatch, HistogramBuilder}; } cfg_not_unstable_metrics! { - mod mock; - - pub(crate) use mock::{SchedulerMetrics, Histogram, HistogramBatch, HistogramBuilder}; + mod worker; + pub(crate) use worker::WorkerMetrics; + mod mock; + pub(crate) use mock::{SchedulerMetrics, MetricsBatch, HistogramBuilder}; } diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs index 9eb30d91f96..948511895a8 100644 --- a/tokio/src/runtime/metrics/worker.rs +++ b/tokio/src/runtime/metrics/worker.rs @@ -1,10 +1,13 @@ -use crate::runtime::metrics::Histogram; use crate::runtime::Config; use crate::util::metric_atomics::{MetricAtomicU64, MetricAtomicUsize}; use std::sync::atomic::Ordering::Relaxed; use std::sync::Mutex; use std::thread::ThreadId; +cfg_unstable_metrics! { + use crate::runtime::metrics::Histogram; +} + /// Retrieve runtime worker metrics. /// /// **Note**: This is an [unstable API][unstable]. The public API of this type @@ -15,33 +18,51 @@ use std::thread::ThreadId; #[derive(Debug, Default)] #[repr(align(128))] pub(crate) struct WorkerMetrics { + #[cfg(tokio_unstable)] + #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] /// Number of times the worker parked. pub(crate) park_count: MetricAtomicU64, + #[cfg(tokio_unstable)] + #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] /// Number of times the worker parked and unparked. pub(crate) park_unpark_count: MetricAtomicU64, + #[cfg(tokio_unstable)] + #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] /// Number of times the worker woke then parked again without doing work. pub(crate) noop_count: MetricAtomicU64, + #[cfg(tokio_unstable)] + #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] /// Number of tasks the worker stole. pub(crate) steal_count: MetricAtomicU64, + #[cfg(tokio_unstable)] + #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] /// Number of times the worker stole pub(crate) steal_operations: MetricAtomicU64, + #[cfg(tokio_unstable)] + #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] /// Number of tasks the worker polled. pub(crate) poll_count: MetricAtomicU64, + #[cfg(tokio_unstable)] + #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] /// EWMA task poll time, in nanoseconds. pub(crate) mean_poll_time: MetricAtomicU64, /// Amount of time the worker spent doing work vs. parking. pub(crate) busy_duration_total: MetricAtomicU64, + #[cfg(tokio_unstable)] + #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] /// Number of tasks scheduled for execution on the worker's local queue. pub(crate) local_schedule_count: MetricAtomicU64, + #[cfg(tokio_unstable)] + #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] /// Number of tasks moved from the local queue to the global queue to free space. pub(crate) overflow_count: MetricAtomicU64, @@ -49,6 +70,8 @@ pub(crate) struct WorkerMetrics { /// current-thread scheduler. pub(crate) queue_depth: MetricAtomicUsize, + #[cfg(tokio_unstable)] + #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] /// If `Some`, tracks the number of polls by duration range. pub(super) poll_count_histogram: Option, @@ -57,13 +80,21 @@ pub(crate) struct WorkerMetrics { } impl WorkerMetrics { - pub(crate) fn from_config(config: &Config) -> WorkerMetrics { - let mut worker_metrics = WorkerMetrics::new(); - worker_metrics.poll_count_histogram = config - .metrics_poll_count_histogram - .as_ref() - .map(|histogram_builder| histogram_builder.build()); - worker_metrics + cfg_unstable_metrics! { + pub(crate) fn from_config(config: &Config) -> WorkerMetrics { + let mut worker_metrics = WorkerMetrics::new(); + worker_metrics.poll_count_histogram = config + .metrics_poll_count_histogram + .as_ref() + .map(|histogram_builder| histogram_builder.build()); + worker_metrics + } + } + + cfg_not_unstable_metrics! { + pub(crate) fn from_config(_: &Config) -> WorkerMetrics { + WorkerMetrics::new() + } } pub(crate) fn new() -> WorkerMetrics { diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index a6f106f1029..aea85505fe3 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -533,7 +533,7 @@ impl Handle { self.shared.inject.len() } - #[allow(dead_code)] + // #[allow(dead_code)] pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { assert_eq!(0, worker); &self.shared.worker_metrics diff --git a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs index f010810c59b..22d920ce482 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs @@ -18,7 +18,7 @@ impl Handle { self.shared.injection_queue_depth() } - #[allow(dead_code)] + // #[allow(dead_code)] pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { &self.shared.worker_metrics[worker] } From 7eb6b97e23532264a7b648b66af67f49ed11b4ed Mon Sep 17 00:00:00 2001 From: Owen Leung Date: Sat, 19 Oct 2024 12:06:55 +0800 Subject: [PATCH 09/12] Remove allow dead_code, merge master & fix spellcheck --- tokio/src/runtime/metrics/mock.rs | 5 ++++- tokio/src/runtime/scheduler/current_thread/mod.rs | 1 - tokio/src/runtime/scheduler/mod.rs | 1 - tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs | 1 - 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tokio/src/runtime/metrics/mock.rs b/tokio/src/runtime/metrics/mock.rs index 64c7f1c6b0d..fd6c3b31155 100644 --- a/tokio/src/runtime/metrics/mock.rs +++ b/tokio/src/runtime/metrics/mock.rs @@ -14,8 +14,11 @@ pub(crate) struct SchedulerMetrics {} /// without relying on the full metrics collection logic. In the real implementation, /// additional fields provide more detailed tracking of worker activity. /// -/// This mock can be further enriched when stabailzing other worker metrics, such as +/// This mock can be further enriched when stabilizing other worker metrics, such as /// `worker_thread_id`, `worker_park_count` and so on +/// +/// When more worker metrics are stabilized, we can remove this mock and switch back +/// to `batch::MetricsBatch` pub(crate) struct MetricsBatch { /// The total busy duration in nanoseconds. busy_duration_total: u64, diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index 449c58d9958..c3310c81084 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -567,7 +567,6 @@ impl Handle { self.shared.inject.len() } - // #[allow(dead_code)] pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { assert_eq!(0, worker); &self.shared.worker_metrics diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index a0d0127065c..3d19300a61a 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -239,7 +239,6 @@ cfg_rt! { match_flavor!(self, Handle(handle) => handle.injection_queue_depth()) } - #[allow(dead_code)] pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { match_flavor!(self, Handle(handle) => handle.worker_metrics(worker)) } diff --git a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs index 22d920ce482..985495c7561 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs @@ -18,7 +18,6 @@ impl Handle { self.shared.injection_queue_depth() } - // #[allow(dead_code)] pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { &self.shared.worker_metrics[worker] } From 57f6b9b1d99772c9ffeb7226cef152b56fe643ee Mon Sep 17 00:00:00 2001 From: Owen Leung Date: Mon, 21 Oct 2024 21:48:31 +0800 Subject: [PATCH 10/12] Add back worker_total_busy_duration test --- tokio/tests/rt_metrics.rs | 43 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index 7adee28f917..f964c0d754b 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -83,6 +83,49 @@ fn global_queue_depth_multi_thread() { panic!("exhausted every try to block the runtime"); } +#[test] +fn worker_total_busy_duration() { + const N: usize = 5; + + let zero = Duration::from_millis(0); + + let rt = current_thread(); + let metrics = rt.metrics(); + + rt.block_on(async { + for _ in 0..N { + tokio::spawn(async { + tokio::task::yield_now().await; + }) + .await + .unwrap(); + } + }); + + drop(rt); + + assert!(zero < metrics.worker_total_busy_duration(0)); + + let rt = threaded(); + let metrics = rt.metrics(); + + rt.block_on(async { + for _ in 0..N { + tokio::spawn(async { + tokio::task::yield_now().await; + }) + .await + .unwrap(); + } + }); + + drop(rt); + + for i in 0..metrics.num_workers() { + assert!(zero < metrics.worker_total_busy_duration(i)); + } +} + fn try_block_threaded(rt: &Runtime) -> Result>, mpsc::RecvTimeoutError> { let (tx, rx) = mpsc::channel(); From c8b2c7db2c0935b3f22d3b38adcc0e7713a67b83 Mon Sep 17 00:00:00 2001 From: Owen Leung Date: Tue, 22 Oct 2024 19:30:31 +0800 Subject: [PATCH 11/12] Remove mock metricBatch, split MetricBatch implementation into stable & unstable --- tokio/src/runtime/metrics/batch.rs | 177 +++++++++++++++++----------- tokio/src/runtime/metrics/mock.rs | 66 ----------- tokio/src/runtime/metrics/mod.rs | 5 +- tokio/src/runtime/metrics/worker.rs | 48 +++----- 4 files changed, 132 insertions(+), 164 deletions(-) diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs index aa6b78db779..43d234362b4 100644 --- a/tokio/src/runtime/metrics/batch.rs +++ b/tokio/src/runtime/metrics/batch.rs @@ -1,34 +1,47 @@ -use crate::runtime::metrics::{HistogramBatch, WorkerMetrics}; +use crate::runtime::metrics::WorkerMetrics; + +cfg_unstable_metrics! { + use crate::runtime::metrics::HistogramBatch; +} use std::sync::atomic::Ordering::Relaxed; use std::time::{Duration, Instant}; pub(crate) struct MetricsBatch { + #[cfg(tokio_unstable)] /// Number of times the worker parked. park_count: u64, + #[cfg(tokio_unstable)] /// Number of times the worker parked and unparked. park_unpark_count: u64, + #[cfg(tokio_unstable)] /// Number of times the worker woke w/o doing work. noop_count: u64, + #[cfg(tokio_unstable)] /// Number of tasks stolen. steal_count: u64, + #[cfg(tokio_unstable)] /// Number of times tasks where stolen. steal_operations: u64, + #[cfg(tokio_unstable)] /// Number of tasks that were polled by the worker. poll_count: u64, + #[cfg(tokio_unstable)] /// Number of tasks polled when the worker entered park. This is used to /// track the noop count. poll_count_on_last_park: u64, + #[cfg(tokio_unstable)] /// Number of tasks that were scheduled locally on this worker. local_schedule_count: u64, + #[cfg(tokio_unstable)] /// Number of tasks moved to the global queue to make space in the local /// queue overflow_count: u64, @@ -39,87 +52,107 @@ pub(crate) struct MetricsBatch { /// Instant at which work last resumed (continued after park). processing_scheduled_tasks_started_at: Instant, + #[cfg(tokio_unstable)] /// If `Some`, tracks poll times in nanoseconds poll_timer: Option, } -struct PollTimer { - /// Histogram of poll counts within each band. - poll_counts: HistogramBatch, +cfg_unstable_metrics! { + struct PollTimer { + /// Histogram of poll counts within each band. + poll_counts: HistogramBatch, - /// Instant when the most recent task started polling. - poll_started_at: Instant, + /// Instant when the most recent task started polling. + poll_started_at: Instant, + } } impl MetricsBatch { - pub(crate) fn new(worker_metrics: &WorkerMetrics) -> MetricsBatch { + pub(crate) fn new(_worker_metrics: &WorkerMetrics) -> MetricsBatch { let now = Instant::now(); + #[cfg(not(tokio_unstable))] + { + MetricsBatch { + busy_duration_total: 0, + processing_scheduled_tasks_started_at: now, + } + } - MetricsBatch { - park_count: 0, - park_unpark_count: 0, - noop_count: 0, - steal_count: 0, - steal_operations: 0, - poll_count: 0, - poll_count_on_last_park: 0, - local_schedule_count: 0, - overflow_count: 0, - busy_duration_total: 0, - processing_scheduled_tasks_started_at: now, - poll_timer: worker_metrics - .poll_count_histogram - .as_ref() - .map(|worker_poll_counts| PollTimer { - poll_counts: HistogramBatch::from_histogram(worker_poll_counts), - poll_started_at: now, - }), + #[cfg(tokio_unstable)] + { + MetricsBatch { + park_count: 0, + park_unpark_count: 0, + noop_count: 0, + steal_count: 0, + steal_operations: 0, + poll_count: 0, + poll_count_on_last_park: 0, + local_schedule_count: 0, + overflow_count: 0, + busy_duration_total: 0, + processing_scheduled_tasks_started_at: now, + poll_timer: _worker_metrics.poll_count_histogram.as_ref().map( + |worker_poll_counts| PollTimer { + poll_counts: HistogramBatch::from_histogram(worker_poll_counts), + poll_started_at: now, + }, + ), + } } } - pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) { - worker.mean_poll_time.store(mean_poll_time, Relaxed); - worker.park_count.store(self.park_count, Relaxed); - worker - .park_unpark_count - .store(self.park_unpark_count, Relaxed); - worker.noop_count.store(self.noop_count, Relaxed); - worker.steal_count.store(self.steal_count, Relaxed); - worker - .steal_operations - .store(self.steal_operations, Relaxed); - worker.poll_count.store(self.poll_count, Relaxed); - + pub(crate) fn submit(&mut self, worker: &WorkerMetrics, _mean_poll_time: u64) { + #[cfg(tokio_unstable)] + { + worker.mean_poll_time.store(_mean_poll_time, Relaxed); + worker.park_count.store(self.park_count, Relaxed); + worker + .park_unpark_count + .store(self.park_unpark_count, Relaxed); + worker.noop_count.store(self.noop_count, Relaxed); + worker.steal_count.store(self.steal_count, Relaxed); + worker + .steal_operations + .store(self.steal_operations, Relaxed); + worker.poll_count.store(self.poll_count, Relaxed); + + worker + .local_schedule_count + .store(self.local_schedule_count, Relaxed); + worker.overflow_count.store(self.overflow_count, Relaxed); + + if let Some(poll_timer) = &self.poll_timer { + let dst = worker.poll_count_histogram.as_ref().unwrap(); + poll_timer.poll_counts.submit(dst); + } + } worker .busy_duration_total .store(self.busy_duration_total, Relaxed); - - worker - .local_schedule_count - .store(self.local_schedule_count, Relaxed); - worker.overflow_count.store(self.overflow_count, Relaxed); - - if let Some(poll_timer) = &self.poll_timer { - let dst = worker.poll_count_histogram.as_ref().unwrap(); - poll_timer.poll_counts.submit(dst); - } } /// The worker is about to park. pub(crate) fn about_to_park(&mut self) { - self.park_count += 1; - self.park_unpark_count += 1; - - if self.poll_count_on_last_park == self.poll_count { - self.noop_count += 1; - } else { - self.poll_count_on_last_park = self.poll_count; + #[cfg(tokio_unstable)] + { + self.park_count += 1; + self.park_unpark_count += 1; + + if self.poll_count_on_last_park == self.poll_count { + self.noop_count += 1; + } else { + self.poll_count_on_last_park = self.poll_count; + } } } /// The worker was unparked. pub(crate) fn unparked(&mut self) { - self.park_unpark_count += 1; + #[cfg(tokio_unstable)] + { + self.park_unpark_count += 1; + } } /// Start processing a batch of tasks @@ -135,15 +168,18 @@ impl MetricsBatch { /// Start polling an individual task pub(crate) fn start_poll(&mut self) { - self.poll_count += 1; - - if let Some(poll_timer) = &mut self.poll_timer { - poll_timer.poll_started_at = Instant::now(); + #[cfg(tokio_unstable)] + { + self.poll_count += 1; + if let Some(poll_timer) = &mut self.poll_timer { + poll_timer.poll_started_at = Instant::now(); + } } } /// Stop polling an individual task pub(crate) fn end_poll(&mut self) { + #[cfg(tokio_unstable)] if let Some(poll_timer) = &mut self.poll_timer { let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed()); poll_timer.poll_counts.measure(elapsed, 1); @@ -151,22 +187,31 @@ impl MetricsBatch { } pub(crate) fn inc_local_schedule_count(&mut self) { - self.local_schedule_count += 1; + #[cfg(tokio_unstable)] + { + self.local_schedule_count += 1; + } } } cfg_rt_multi_thread! { impl MetricsBatch { - pub(crate) fn incr_steal_count(&mut self, by: u16) { - self.steal_count += by as u64; + pub(crate) fn incr_steal_count(&mut self, _by: u16) { + #[cfg(tokio_unstable)] { + self.steal_count += _by as u64; + } } pub(crate) fn incr_steal_operations(&mut self) { - self.steal_operations += 1; + #[cfg(tokio_unstable)] { + self.steal_operations += 1; + } } pub(crate) fn incr_overflow_count(&mut self) { - self.overflow_count += 1; + #[cfg(tokio_unstable)] { + self.overflow_count += 1; + } } } } diff --git a/tokio/src/runtime/metrics/mock.rs b/tokio/src/runtime/metrics/mock.rs index fd6c3b31155..6d70b2e1b2d 100644 --- a/tokio/src/runtime/metrics/mock.rs +++ b/tokio/src/runtime/metrics/mock.rs @@ -1,32 +1,7 @@ //! This file contains mocks of the types in src/runtime/metrics -use crate::runtime::WorkerMetrics; -use std::sync::atomic::Ordering::Relaxed; -use std::time::{Duration, Instant}; - pub(crate) struct SchedulerMetrics {} -/// The `MetricsBatch` struct in this mock implementation provides a minimal, -/// simplified version of `batch::MetricsBatch`. It contains only the basic fields -/// required to track the total busy duration (`busy_duration_total`) . -/// -/// This mock is used to stabilize the API `worker_total_busy_duration` -/// without relying on the full metrics collection logic. In the real implementation, -/// additional fields provide more detailed tracking of worker activity. -/// -/// This mock can be further enriched when stabilizing other worker metrics, such as -/// `worker_thread_id`, `worker_park_count` and so on -/// -/// When more worker metrics are stabilized, we can remove this mock and switch back -/// to `batch::MetricsBatch` -pub(crate) struct MetricsBatch { - /// The total busy duration in nanoseconds. - busy_duration_total: u64, - - /// Instant at which work last resumed (continued after park). - processing_scheduled_tasks_started_at: Instant, -} - #[derive(Clone, Default)] pub(crate) struct HistogramBuilder {} @@ -38,44 +13,3 @@ impl SchedulerMetrics { /// Increment the number of tasks scheduled externally pub(crate) fn inc_remote_schedule_count(&self) {} } - -impl MetricsBatch { - pub(crate) fn new(_: &WorkerMetrics) -> Self { - let now = Instant::now(); - - MetricsBatch { - busy_duration_total: 0, - processing_scheduled_tasks_started_at: now, - } - } - - pub(crate) fn submit(&mut self, worker: &WorkerMetrics, _mean_poll_time: u64) { - worker - .busy_duration_total - .store(self.busy_duration_total, Relaxed); - } - pub(crate) fn about_to_park(&mut self) {} - pub(crate) fn unparked(&mut self) {} - pub(crate) fn inc_local_schedule_count(&mut self) {} - pub(crate) fn start_processing_scheduled_tasks(&mut self) { - self.processing_scheduled_tasks_started_at = Instant::now(); - } - pub(crate) fn end_processing_scheduled_tasks(&mut self) { - let busy_duration = self.processing_scheduled_tasks_started_at.elapsed(); - self.busy_duration_total += duration_as_u64(busy_duration); - } - pub(crate) fn start_poll(&mut self) {} - pub(crate) fn end_poll(&mut self) {} -} - -cfg_rt_multi_thread! { - impl MetricsBatch { - pub(crate) fn incr_steal_count(&mut self, _by: u16) {} - pub(crate) fn incr_steal_operations(&mut self) {} - pub(crate) fn incr_overflow_count(&mut self) {} - } -} - -fn duration_as_u64(dur: Duration) -> u64 { - u64::try_from(dur.as_nanos()).unwrap_or(u64::MAX) -} diff --git a/tokio/src/runtime/metrics/mod.rs b/tokio/src/runtime/metrics/mod.rs index 03c27048454..1e15217a6d6 100644 --- a/tokio/src/runtime/metrics/mod.rs +++ b/tokio/src/runtime/metrics/mod.rs @@ -38,6 +38,9 @@ cfg_not_unstable_metrics! { mod worker; pub(crate) use worker::WorkerMetrics; + mod batch; + pub(crate) use batch::MetricsBatch; + mod mock; - pub(crate) use mock::{SchedulerMetrics, MetricsBatch, HistogramBuilder}; + pub(crate) use mock::{SchedulerMetrics, HistogramBuilder}; } diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs index 948511895a8..231b7f324a7 100644 --- a/tokio/src/runtime/metrics/worker.rs +++ b/tokio/src/runtime/metrics/worker.rs @@ -19,37 +19,30 @@ cfg_unstable_metrics! { #[repr(align(128))] pub(crate) struct WorkerMetrics { #[cfg(tokio_unstable)] - #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] /// Number of times the worker parked. pub(crate) park_count: MetricAtomicU64, #[cfg(tokio_unstable)] - #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] /// Number of times the worker parked and unparked. pub(crate) park_unpark_count: MetricAtomicU64, #[cfg(tokio_unstable)] - #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] /// Number of times the worker woke then parked again without doing work. pub(crate) noop_count: MetricAtomicU64, #[cfg(tokio_unstable)] - #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] /// Number of tasks the worker stole. pub(crate) steal_count: MetricAtomicU64, #[cfg(tokio_unstable)] - #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] /// Number of times the worker stole pub(crate) steal_operations: MetricAtomicU64, #[cfg(tokio_unstable)] - #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] /// Number of tasks the worker polled. pub(crate) poll_count: MetricAtomicU64, #[cfg(tokio_unstable)] - #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] /// EWMA task poll time, in nanoseconds. pub(crate) mean_poll_time: MetricAtomicU64, @@ -57,12 +50,10 @@ pub(crate) struct WorkerMetrics { pub(crate) busy_duration_total: MetricAtomicU64, #[cfg(tokio_unstable)] - #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] /// Number of tasks scheduled for execution on the worker's local queue. pub(crate) local_schedule_count: MetricAtomicU64, #[cfg(tokio_unstable)] - #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] /// Number of tasks moved from the local queue to the global queue to free space. pub(crate) overflow_count: MetricAtomicU64, @@ -71,7 +62,6 @@ pub(crate) struct WorkerMetrics { pub(crate) queue_depth: MetricAtomicUsize, #[cfg(tokio_unstable)] - #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] /// If `Some`, tracks the number of polls by duration range. pub(super) poll_count_histogram: Option, @@ -80,17 +70,6 @@ pub(crate) struct WorkerMetrics { } impl WorkerMetrics { - cfg_unstable_metrics! { - pub(crate) fn from_config(config: &Config) -> WorkerMetrics { - let mut worker_metrics = WorkerMetrics::new(); - worker_metrics.poll_count_histogram = config - .metrics_poll_count_histogram - .as_ref() - .map(|histogram_builder| histogram_builder.build()); - worker_metrics - } - } - cfg_not_unstable_metrics! { pub(crate) fn from_config(_: &Config) -> WorkerMetrics { WorkerMetrics::new() @@ -101,23 +80,30 @@ impl WorkerMetrics { WorkerMetrics::default() } - cfg_unstable_metrics! { - pub(crate) fn queue_depth(&self) -> usize { - self.queue_depth.load(Relaxed) - } - } - pub(crate) fn set_queue_depth(&self, len: usize) { self.queue_depth.store(len, Relaxed); } + pub(crate) fn set_thread_id(&self, thread_id: ThreadId) { + *self.thread_id.lock().unwrap() = Some(thread_id); + } + cfg_unstable_metrics! { + pub(crate) fn from_config(config: &Config) -> WorkerMetrics { + let mut worker_metrics = WorkerMetrics::new(); + worker_metrics.poll_count_histogram = config + .metrics_poll_count_histogram + .as_ref() + .map(|histogram_builder| histogram_builder.build()); + worker_metrics + } + + pub(crate) fn queue_depth(&self) -> usize { + self.queue_depth.load(Relaxed) + } + pub(crate) fn thread_id(&self) -> Option { *self.thread_id.lock().unwrap() } } - - pub(crate) fn set_thread_id(&self, thread_id: ThreadId) { - *self.thread_id.lock().unwrap() = Some(thread_id); - } } From 14543de657e6423a44bc6b698afe2a13247aa27d Mon Sep 17 00:00:00 2001 From: Owen Leung Date: Sat, 23 Nov 2024 11:15:51 +0800 Subject: [PATCH 12/12] Gate code based on cfg_unstable_metrics and cfg_not_unstable_metrics --- tokio/src/runtime/metrics/batch.rs | 123 ++++++++++++++++++---------- tokio/src/runtime/metrics/mod.rs | 18 ++-- tokio/src/runtime/metrics/worker.rs | 12 +-- 3 files changed, 94 insertions(+), 59 deletions(-) diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs index 43d234362b4..197ef29acfe 100644 --- a/tokio/src/runtime/metrics/batch.rs +++ b/tokio/src/runtime/metrics/batch.rs @@ -68,18 +68,24 @@ cfg_unstable_metrics! { } impl MetricsBatch { - pub(crate) fn new(_worker_metrics: &WorkerMetrics) -> MetricsBatch { + pub(crate) fn new(worker_metrics: &WorkerMetrics) -> MetricsBatch { let now = Instant::now(); - #[cfg(not(tokio_unstable))] - { + Self::new_unstable(worker_metrics, now) + } + + cfg_not_unstable_metrics! { + #[inline(always)] + fn new_unstable(_worker_metrics: &WorkerMetrics, now: Instant) -> MetricsBatch { MetricsBatch { busy_duration_total: 0, processing_scheduled_tasks_started_at: now, } } + } - #[cfg(tokio_unstable)] - { + cfg_unstable_metrics! { + #[inline(always)] + fn new_unstable(worker_metrics: &WorkerMetrics, now: Instant) -> MetricsBatch { MetricsBatch { park_count: 0, park_unpark_count: 0, @@ -92,7 +98,7 @@ impl MetricsBatch { overflow_count: 0, busy_duration_total: 0, processing_scheduled_tasks_started_at: now, - poll_timer: _worker_metrics.poll_count_histogram.as_ref().map( + poll_timer: worker_metrics.poll_count_histogram.as_ref().map( |worker_poll_counts| PollTimer { poll_counts: HistogramBatch::from_histogram(worker_poll_counts), poll_started_at: now, @@ -102,10 +108,23 @@ impl MetricsBatch { } } - pub(crate) fn submit(&mut self, worker: &WorkerMetrics, _mean_poll_time: u64) { - #[cfg(tokio_unstable)] - { - worker.mean_poll_time.store(_mean_poll_time, Relaxed); + pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) { + worker + .busy_duration_total + .store(self.busy_duration_total, Relaxed); + + self.submit_unstable(worker, mean_poll_time); + } + + cfg_not_unstable_metrics! { + #[inline(always)] + fn submit_unstable(&mut self, _worker: &WorkerMetrics, _mean_poll_time: u64) {} + } + + cfg_unstable_metrics! { + #[inline(always)] + fn submit_unstable(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) { + worker.mean_poll_time.store(mean_poll_time, Relaxed); worker.park_count.store(self.park_count, Relaxed); worker .park_unpark_count @@ -127,34 +146,42 @@ impl MetricsBatch { poll_timer.poll_counts.submit(dst); } } - worker - .busy_duration_total - .store(self.busy_duration_total, Relaxed); } - /// The worker is about to park. - pub(crate) fn about_to_park(&mut self) { - #[cfg(tokio_unstable)] - { - self.park_count += 1; - self.park_unpark_count += 1; - - if self.poll_count_on_last_park == self.poll_count { - self.noop_count += 1; - } else { - self.poll_count_on_last_park = self.poll_count; + cfg_unstable_metrics! { + /// The worker is about to park. + pub(crate) fn about_to_park(&mut self) { + #[cfg(tokio_unstable)] + { + self.park_count += 1; + self.park_unpark_count += 1; + + if self.poll_count_on_last_park == self.poll_count { + self.noop_count += 1; + } else { + self.poll_count_on_last_park = self.poll_count; + } } } } - /// The worker was unparked. - pub(crate) fn unparked(&mut self) { - #[cfg(tokio_unstable)] - { + cfg_not_unstable_metrics! { + /// The worker is about to park. + pub(crate) fn about_to_park(&mut self) {} + } + + cfg_unstable_metrics! { + /// The worker was unparked. + pub(crate) fn unparked(&mut self) { self.park_unpark_count += 1; } } + cfg_not_unstable_metrics! { + /// The worker was unparked. + pub(crate) fn unparked(&mut self) {} + } + /// Start processing a batch of tasks pub(crate) fn start_processing_scheduled_tasks(&mut self) { self.processing_scheduled_tasks_started_at = Instant::now(); @@ -166,10 +193,9 @@ impl MetricsBatch { self.busy_duration_total += duration_as_u64(busy_duration); } - /// Start polling an individual task - pub(crate) fn start_poll(&mut self) { - #[cfg(tokio_unstable)] - { + cfg_unstable_metrics! { + /// Start polling an individual task + pub(crate) fn start_poll(&mut self) { self.poll_count += 1; if let Some(poll_timer) = &mut self.poll_timer { poll_timer.poll_started_at = Instant::now(); @@ -177,21 +203,36 @@ impl MetricsBatch { } } - /// Stop polling an individual task - pub(crate) fn end_poll(&mut self) { - #[cfg(tokio_unstable)] - if let Some(poll_timer) = &mut self.poll_timer { - let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed()); - poll_timer.poll_counts.measure(elapsed, 1); + cfg_not_unstable_metrics! { + /// Start polling an individual task + pub(crate) fn start_poll(&mut self) {} + } + + cfg_unstable_metrics! { + /// Stop polling an individual task + pub(crate) fn end_poll(&mut self) { + #[cfg(tokio_unstable)] + if let Some(poll_timer) = &mut self.poll_timer { + let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed()); + poll_timer.poll_counts.measure(elapsed, 1); + } } } - pub(crate) fn inc_local_schedule_count(&mut self) { - #[cfg(tokio_unstable)] - { + cfg_not_unstable_metrics! { + /// Stop polling an individual task + pub(crate) fn end_poll(&mut self) {} + } + + cfg_unstable_metrics! { + pub(crate) fn inc_local_schedule_count(&mut self) { self.local_schedule_count += 1; } } + + cfg_not_unstable_metrics! { + pub(crate) fn inc_local_schedule_count(&mut self) {} + } } cfg_rt_multi_thread! { diff --git a/tokio/src/runtime/metrics/mod.rs b/tokio/src/runtime/metrics/mod.rs index 1e15217a6d6..b2772f1842a 100644 --- a/tokio/src/runtime/metrics/mod.rs +++ b/tokio/src/runtime/metrics/mod.rs @@ -11,9 +11,13 @@ mod runtime; pub use runtime::RuntimeMetrics; +mod batch; +pub(crate) use batch::MetricsBatch; + +mod worker; +pub(crate) use worker::WorkerMetrics; + cfg_unstable_metrics! { - mod batch; - pub(crate) use batch::MetricsBatch; mod histogram; pub(crate) use histogram::{Histogram, HistogramBatch, HistogramBuilder}; @@ -21,13 +25,9 @@ cfg_unstable_metrics! { #[allow(unreachable_pub)] // rust-lang/rust#57411 pub use histogram::{HistogramScale, HistogramConfiguration, LogHistogram, LogHistogramBuilder, InvalidHistogramConfiguration}; - mod scheduler; pub(crate) use scheduler::SchedulerMetrics; - mod worker; - pub(crate) use worker::WorkerMetrics; - cfg_net! { mod io; pub(crate) use io::IoDriverMetrics; @@ -35,12 +35,6 @@ cfg_unstable_metrics! { } cfg_not_unstable_metrics! { - mod worker; - pub(crate) use worker::WorkerMetrics; - - mod batch; - pub(crate) use batch::MetricsBatch; - mod mock; pub(crate) use mock::{SchedulerMetrics, HistogramBuilder}; } diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs index 231b7f324a7..c931aca32e1 100644 --- a/tokio/src/runtime/metrics/worker.rs +++ b/tokio/src/runtime/metrics/worker.rs @@ -70,12 +70,6 @@ pub(crate) struct WorkerMetrics { } impl WorkerMetrics { - cfg_not_unstable_metrics! { - pub(crate) fn from_config(_: &Config) -> WorkerMetrics { - WorkerMetrics::new() - } - } - pub(crate) fn new() -> WorkerMetrics { WorkerMetrics::default() } @@ -88,6 +82,12 @@ impl WorkerMetrics { *self.thread_id.lock().unwrap() = Some(thread_id); } + cfg_not_unstable_metrics! { + pub(crate) fn from_config(_: &Config) -> WorkerMetrics { + WorkerMetrics::new() + } + } + cfg_unstable_metrics! { pub(crate) fn from_config(config: &Config) -> WorkerMetrics { let mut worker_metrics = WorkerMetrics::new();