Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stabilize worker_total_busy_duration #6899

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 57 additions & 15 deletions tokio/src/runtime/metrics/mock.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,32 @@
//! This file contains mocks of the types in src/runtime/metrics

use crate::runtime::WorkerMetrics;
use std::sync::atomic::Ordering::Relaxed;
use std::time::{Duration, Instant};

pub(crate) struct SchedulerMetrics {}

/// The `MetricsBatch` struct in this mock implementation provides a minimal,
/// simplified version of `batch::MetricsBatch`. It contains only the basic fields
/// required to track the total busy duration (`busy_duration_total`) .
///
/// This mock is used to stabilize the API `worker_total_busy_duration`
/// without relying on the full metrics collection logic. In the real implementation,
/// additional fields provide more detailed tracking of worker activity.
///
/// This mock can be further enriched when stabailzing other worker metrics, such as
/// `worker_thread_id`, `worker_park_count` and so on
pub(crate) struct MetricsBatch {
/// The total busy duration in nanoseconds.
busy_duration_total: u64,

/// Instant at which work last resumed (continued after park).
processing_scheduled_tasks_started_at: Instant,
}

#[derive(Clone, Default)]
pub(crate) struct HistogramBuilder {}

impl SchedulerMetrics {
pub(crate) fn new() -> Self {
Self {}
Expand All @@ -11,26 +36,43 @@ impl SchedulerMetrics {
pub(crate) fn inc_remote_schedule_count(&self) {}
}

#[derive(Debug)]
pub(crate) struct Histogram {}

pub(crate) struct HistogramBatch {}
impl MetricsBatch {
pub(crate) fn new(_: &WorkerMetrics) -> Self {
let now = Instant::now();

#[derive(Debug, Clone, Default)]
pub(crate) struct HistogramBuilder {}
MetricsBatch {
busy_duration_total: 0,
processing_scheduled_tasks_started_at: now,
}
}

impl HistogramBuilder {
pub(crate) fn build(&self) -> Histogram {
Histogram {}
pub(crate) fn submit(&mut self, worker: &WorkerMetrics, _mean_poll_time: u64) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I understand correctly that this function duplicates part of the submit function in batch::MetricsBatch?

I think this is a problematic way of gradually stabilizing metrics, as it opens the possibility of having divirging implementations if a change is made to the "real" MetricsBatch by someone who doesn't realise that there is another one.

This is additionally confusing because this effectively becomes the "stable" implementation, but it lives in a module called mock.

I would propose that we instead split the metrics::MetricsBatch implementation into stable (always compiles) and unstable (gated by cfg option), the same way we've done elsewhere in this PR. The same as with another comment, we would group all the unstable functions into a single cfg_unstable_metrics! block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed spliting metrics::MetricsBatch is a much viable way of stabilising. I've adopted your suggestion and split it into stable & unstable (and group unstable functions into a single unstable block. Thanks a lot for reviewing!

worker
.busy_duration_total
.store(self.busy_duration_total, Relaxed);
}
pub(crate) fn about_to_park(&mut self) {}
pub(crate) fn unparked(&mut self) {}
pub(crate) fn inc_local_schedule_count(&mut self) {}
pub(crate) fn start_processing_scheduled_tasks(&mut self) {
self.processing_scheduled_tasks_started_at = Instant::now();
}
pub(crate) fn end_processing_scheduled_tasks(&mut self) {
let busy_duration = self.processing_scheduled_tasks_started_at.elapsed();
self.busy_duration_total += duration_as_u64(busy_duration);
}
pub(crate) fn start_poll(&mut self) {}
pub(crate) fn end_poll(&mut self) {}
}

impl HistogramBatch {
pub(crate) fn from_histogram(_histogram: &Histogram) -> HistogramBatch {
HistogramBatch {}
cfg_rt_multi_thread! {
impl MetricsBatch {
pub(crate) fn incr_steal_count(&mut self, _by: u16) {}
pub(crate) fn incr_steal_operations(&mut self) {}
pub(crate) fn incr_overflow_count(&mut self) {}
}
}

pub(crate) fn submit(&self, _histogram: &Histogram) {}

pub(crate) fn measure(&mut self, _value: u64, _count: u64) {}
fn duration_as_u64(dur: Duration) -> u64 {
u64::try_from(dur.as_nanos()).unwrap_or(u64::MAX)
}
23 changes: 12 additions & 11 deletions tokio/src/runtime/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,33 @@
mod runtime;
pub use runtime::RuntimeMetrics;

mod worker;
pub(crate) use worker::WorkerMetrics;
cfg_unstable_metrics! {
mod batch;
pub(crate) use batch::MetricsBatch;

mod batch;
pub(crate) use batch::MetricsBatch;
mod histogram;
pub(crate) use histogram::{Histogram, HistogramBatch, HistogramBuilder};

cfg_unstable_metrics! {
#[allow(unreachable_pub)] // rust-lang/rust#57411
pub use histogram::HistogramScale;


mod scheduler;
pub(crate) use scheduler::SchedulerMetrics;

mod worker;
pub(crate) use worker::WorkerMetrics;

cfg_net! {
mod io;
pub(crate) use io::IoDriverMetrics;
}

mod histogram;
pub(crate) use histogram::{Histogram, HistogramBatch, HistogramBuilder};
}

cfg_not_unstable_metrics! {
mod mock;

pub(crate) use mock::{SchedulerMetrics, Histogram, HistogramBatch, HistogramBuilder};
mod worker;
pub(crate) use worker::WorkerMetrics;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move the modules and imports that are in both the macro blocks out above them, we don't need to gate them at all.


mod mock;
pub(crate) use mock::{SchedulerMetrics, MetricsBatch, HistogramBuilder};
}
47 changes: 39 additions & 8 deletions tokio/src/runtime/metrics/worker.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crate::runtime::metrics::Histogram;
use crate::runtime::Config;
use crate::util::metric_atomics::{MetricAtomicU64, MetricAtomicUsize};
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Mutex;
use std::thread::ThreadId;

cfg_unstable_metrics! {
use crate::runtime::metrics::Histogram;
}

/// Retrieve runtime worker metrics.
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
Expand All @@ -15,40 +18,60 @@ use std::thread::ThreadId;
#[derive(Debug, Default)]
#[repr(align(128))]
pub(crate) struct WorkerMetrics {
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
/// Number of times the worker parked.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary? Since this isn't a public method, it won't appear in the documentation.

pub(crate) park_count: MetricAtomicU64,

#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
/// Number of times the worker parked and unparked.
pub(crate) park_unpark_count: MetricAtomicU64,

#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
/// Number of times the worker woke then parked again without doing work.
pub(crate) noop_count: MetricAtomicU64,

#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
/// Number of tasks the worker stole.
pub(crate) steal_count: MetricAtomicU64,

#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
/// Number of times the worker stole
pub(crate) steal_operations: MetricAtomicU64,

#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
/// Number of tasks the worker polled.
pub(crate) poll_count: MetricAtomicU64,

#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
/// EWMA task poll time, in nanoseconds.
pub(crate) mean_poll_time: MetricAtomicU64,

/// Amount of time the worker spent doing work vs. parking.
pub(crate) busy_duration_total: MetricAtomicU64,

#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
/// Number of tasks scheduled for execution on the worker's local queue.
pub(crate) local_schedule_count: MetricAtomicU64,

#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
/// Number of tasks moved from the local queue to the global queue to free space.
pub(crate) overflow_count: MetricAtomicU64,

/// Number of tasks currently in the local queue. Used only by the
/// current-thread scheduler.
pub(crate) queue_depth: MetricAtomicUsize,

#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
/// If `Some`, tracks the number of polls by duration range.
pub(super) poll_count_histogram: Option<Histogram>,

Expand All @@ -57,13 +80,21 @@ pub(crate) struct WorkerMetrics {
}

impl WorkerMetrics {
pub(crate) fn from_config(config: &Config) -> WorkerMetrics {
let mut worker_metrics = WorkerMetrics::new();
worker_metrics.poll_count_histogram = config
.metrics_poll_count_histogram
.as_ref()
.map(|histogram_builder| histogram_builder.build());
worker_metrics
cfg_unstable_metrics! {
pub(crate) fn from_config(config: &Config) -> WorkerMetrics {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better if we grouped all the unstable functions together at the bottom of the impl block, instead of spreading them out.

let mut worker_metrics = WorkerMetrics::new();
worker_metrics.poll_count_histogram = config
.metrics_poll_count_histogram
.as_ref()
.map(|histogram_builder| histogram_builder.build());
worker_metrics
}
}

cfg_not_unstable_metrics! {
pub(crate) fn from_config(_: &Config) -> WorkerMetrics {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this down to be right above the cfg_unstable_metrics! block so that we keep the conditionally compiled implementations together.

WorkerMetrics::new()
}
}

pub(crate) fn new() -> WorkerMetrics {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ impl Handle {
self.shared.inject.len()
}

#[allow(dead_code)]
// #[allow(dead_code)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// #[allow(dead_code)]

pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
assert_eq!(0, worker);
&self.shared.worker_metrics
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl Handle {
self.shared.injection_queue_depth()
}

#[allow(dead_code)]
// #[allow(dead_code)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// #[allow(dead_code)]

pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
&self.shared.worker_metrics[worker]
}
Expand Down
Loading