diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index a73eec8799a..12f70be862e 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -42,7 +42,7 @@ futures-io = { version = "0.3.0", optional = true } futures-util = { version = "0.3.0", optional = true } pin-project-lite = "0.2.11" slab = { version = "0.4.4", optional = true } # Backs `DelayQueue` -tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true } +tracing = { version = "0.1.29", default-features = false, features = ["std"], optional = true } [target.'cfg(tokio_unstable)'.dependencies] hashbrown = { version = "0.14.0", default-features = false, optional = true } diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 4c5f7d46acb..b07f50150b7 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -101,7 +101,7 @@ socket2 = { version = "0.5.5", optional = true, features = [ "all" ] } # Currently unstable. The API exposed by these features may be broken at any time. # Requires `--cfg tokio_unstable` to enable. [target.'cfg(tokio_unstable)'.dependencies] -tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true } # Not in full +tracing = { version = "0.1.29", default-features = false, features = ["std"], optional = true } # Not in full # Currently unstable. The API exposed by these features may be broken at any time. # Requires `--cfg tokio_unstable` to enable. diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index f8466a19bd9..7eec91d23d9 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -8,6 +8,7 @@ use crate::runtime::builder::ThreadNameFn; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, Handle, BOX_FUTURE_THRESHOLD}; use crate::util::metric_atomics::MetricAtomicUsize; +use crate::util::trace::{blocking_task, SpawnMeta}; use std::collections::{HashMap, VecDeque}; use std::fmt; @@ -299,10 +300,21 @@ impl Spawner { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let (join_handle, spawn_result) = if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD { - self.spawn_blocking_inner(Box::new(func), Mandatory::NonMandatory, None, rt) + let fn_size = std::mem::size_of::<F>(); + let (join_handle, spawn_result) = if fn_size > BOX_FUTURE_THRESHOLD { + self.spawn_blocking_inner( + Box::new(func), + Mandatory::NonMandatory, + SpawnMeta::new_unnamed(fn_size), + rt, + ) } else { - self.spawn_blocking_inner(func, Mandatory::NonMandatory, None, rt) + self.spawn_blocking_inner( + func, + Mandatory::NonMandatory, + SpawnMeta::new_unnamed(fn_size), + rt, + ) }; match spawn_result { @@ -326,18 +338,19 @@ impl Spawner { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let (join_handle, spawn_result) = if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD { + let fn_size = std::mem::size_of::<F>(); + let (join_handle, spawn_result) = if fn_size > BOX_FUTURE_THRESHOLD { self.spawn_blocking_inner( Box::new(func), Mandatory::Mandatory, - None, + SpawnMeta::new_unnamed(fn_size), rt, ) } else { self.spawn_blocking_inner( func, Mandatory::Mandatory, - None, + SpawnMeta::new_unnamed(fn_size), rt, ) }; @@ -355,35 +368,16 @@ impl Spawner { &self, func: F, is_mandatory: Mandatory, - name: Option<&str>, + spawn_meta: SpawnMeta<'_>, rt: &Handle, ) -> (JoinHandle<R>, Result<(), SpawnError>) where F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let fut = BlockingTask::new(func); let id = task::Id::next(); - #[cfg(all(tokio_unstable, feature = "tracing"))] - let fut = { - use tracing::Instrument; - let location = std::panic::Location::caller(); - let span = tracing::trace_span!( - target: "tokio::task::blocking", - "runtime.spawn", - kind = %"blocking", - task.name = %name.unwrap_or_default(), - task.id = id.as_u64(), - "fn" = %std::any::type_name::<F>(), - loc.file = location.file(), - loc.line = location.line(), - loc.col = location.column(), - ); - fut.instrument(span) - }; - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - let _ = name; + let fut = + blocking_task::<F, BlockingTask<F>>(BlockingTask::new(func), spawn_meta, id.as_u64()); let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id); diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 7e3cd1504e5..9026e8773a0 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -18,10 +18,11 @@ pub struct Handle { use crate::runtime::task::JoinHandle; use crate::runtime::BOX_FUTURE_THRESHOLD; use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR}; +use crate::util::trace::SpawnMeta; use std::future::Future; use std::marker::PhantomData; -use std::{error, fmt}; +use std::{error, fmt, mem}; /// Runtime context guard. /// @@ -189,10 +190,11 @@ impl Handle { F: Future + Send + 'static, F::Output: Send + 'static, { - if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD { - self.spawn_named(Box::pin(future), None) + let fut_size = mem::size_of::<F>(); + if fut_size > BOX_FUTURE_THRESHOLD { + self.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) } else { - self.spawn_named(future, None) + self.spawn_named(future, SpawnMeta::new_unnamed(fut_size)) } } @@ -296,15 +298,16 @@ impl Handle { /// [`tokio::time`]: crate::time #[track_caller] pub fn block_on<F: Future>(&self, future: F) -> F::Output { - if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD { - self.block_on_inner(Box::pin(future)) + let fut_size = mem::size_of::<F>(); + if fut_size > BOX_FUTURE_THRESHOLD { + self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) } else { - self.block_on_inner(future) + self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size)) } } #[track_caller] - fn block_on_inner<F: Future>(&self, future: F) -> F::Output { + fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output { #[cfg(all( tokio_unstable, tokio_taskdump, @@ -316,7 +319,7 @@ impl Handle { #[cfg(all(tokio_unstable, feature = "tracing"))] let future = - crate::util::trace::task(future, "block_on", None, super::task::Id::next().as_u64()); + crate::util::trace::task(future, "block_on", _meta, super::task::Id::next().as_u64()); // Enter the runtime context. This sets the current driver handles and // prevents blocking an existing runtime. @@ -326,7 +329,7 @@ impl Handle { } #[track_caller] - pub(crate) fn spawn_named<F>(&self, future: F, _name: Option<&str>) -> JoinHandle<F::Output> + pub(crate) fn spawn_named<F>(&self, future: F, _meta: SpawnMeta<'_>) -> JoinHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static, @@ -341,7 +344,7 @@ impl Handle { ))] let future = super::task::trace::Trace::root(future); #[cfg(all(tokio_unstable, feature = "tracing"))] - let future = crate::util::trace::task(future, "task", _name, id.as_u64()); + let future = crate::util::trace::task(future, "task", _meta, id.as_u64()); self.inner.spawn(future, id) } diff --git a/tokio/src/runtime/runtime.rs b/tokio/src/runtime/runtime.rs index 74061d24ce8..4b22ae9747c 100644 --- a/tokio/src/runtime/runtime.rs +++ b/tokio/src/runtime/runtime.rs @@ -3,8 +3,10 @@ use crate::runtime::blocking::BlockingPool; use crate::runtime::scheduler::CurrentThread; use crate::runtime::{context, EnterGuard, Handle}; use crate::task::JoinHandle; +use crate::util::trace::SpawnMeta; use std::future::Future; +use std::mem; use std::time::Duration; cfg_rt_multi_thread! { @@ -241,10 +243,13 @@ impl Runtime { F: Future + Send + 'static, F::Output: Send + 'static, { - if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD { - self.handle.spawn_named(Box::pin(future), None) + let fut_size = mem::size_of::<F>(); + if fut_size > BOX_FUTURE_THRESHOLD { + self.handle + .spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) } else { - self.handle.spawn_named(future, None) + self.handle + .spawn_named(future, SpawnMeta::new_unnamed(fut_size)) } } @@ -329,15 +334,16 @@ impl Runtime { /// [handle]: fn@Handle::block_on #[track_caller] pub fn block_on<F: Future>(&self, future: F) -> F::Output { - if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD { - self.block_on_inner(Box::pin(future)) + let fut_size = mem::size_of::<F>(); + if fut_size > BOX_FUTURE_THRESHOLD { + self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) } else { - self.block_on_inner(future) + self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size)) } } #[track_caller] - fn block_on_inner<F: Future>(&self, future: F) -> F::Output { + fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output { #[cfg(all( tokio_unstable, tokio_taskdump, @@ -351,7 +357,7 @@ impl Runtime { let future = crate::util::trace::task( future, "block_on", - None, + _meta, crate::runtime::task::Id::next().as_u64(), ); diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index c98849b2746..6053352a01c 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -2,8 +2,9 @@ use crate::{ runtime::{Handle, BOX_FUTURE_THRESHOLD}, task::{JoinHandle, LocalSet}, + util::trace::SpawnMeta, }; -use std::{future::Future, io}; +use std::{future::Future, io, mem}; /// Factory which is used to configure the properties of a new task. /// @@ -88,10 +89,11 @@ impl<'a> Builder<'a> { Fut: Future + Send + 'static, Fut::Output: Send + 'static, { - Ok(if std::mem::size_of::<Fut>() > BOX_FUTURE_THRESHOLD { - super::spawn::spawn_inner(Box::pin(future), self.name) + let fut_size = mem::size_of::<Fut>(); + Ok(if fut_size > BOX_FUTURE_THRESHOLD { + super::spawn::spawn_inner(Box::pin(future), SpawnMeta::new(self.name, fut_size)) } else { - super::spawn::spawn_inner(future, self.name) + super::spawn::spawn_inner(future, SpawnMeta::new(self.name, fut_size)) }) } @@ -108,10 +110,11 @@ impl<'a> Builder<'a> { Fut: Future + Send + 'static, Fut::Output: Send + 'static, { - Ok(if std::mem::size_of::<Fut>() > BOX_FUTURE_THRESHOLD { - handle.spawn_named(Box::pin(future), self.name) + let fut_size = mem::size_of::<Fut>(); + Ok(if fut_size > BOX_FUTURE_THRESHOLD { + handle.spawn_named(Box::pin(future), SpawnMeta::new(self.name, fut_size)) } else { - handle.spawn_named(future, self.name) + handle.spawn_named(future, SpawnMeta::new(self.name, fut_size)) }) } @@ -135,10 +138,11 @@ impl<'a> Builder<'a> { Fut: Future + 'static, Fut::Output: 'static, { - Ok(if std::mem::size_of::<Fut>() > BOX_FUTURE_THRESHOLD { - super::local::spawn_local_inner(Box::pin(future), self.name) + let fut_size = mem::size_of::<Fut>(); + Ok(if fut_size > BOX_FUTURE_THRESHOLD { + super::local::spawn_local_inner(Box::pin(future), SpawnMeta::new(self.name, fut_size)) } else { - super::local::spawn_local_inner(future, self.name) + super::local::spawn_local_inner(future, SpawnMeta::new(self.name, fut_size)) }) } @@ -159,7 +163,12 @@ impl<'a> Builder<'a> { Fut: Future + 'static, Fut::Output: 'static, { - Ok(local_set.spawn_named(future, self.name)) + let fut_size = mem::size_of::<Fut>(); + Ok(if fut_size > BOX_FUTURE_THRESHOLD { + local_set.spawn_named(Box::pin(future), SpawnMeta::new(self.name, fut_size)) + } else { + local_set.spawn_named(future, SpawnMeta::new(self.name, fut_size)) + }) } /// Spawns blocking code on the blocking threadpool. @@ -200,19 +209,19 @@ impl<'a> Builder<'a> { Output: Send + 'static, { use crate::runtime::Mandatory; - let (join_handle, spawn_result) = if std::mem::size_of::<Function>() > BOX_FUTURE_THRESHOLD - { + let fn_size = mem::size_of::<Function>(); + let (join_handle, spawn_result) = if fn_size > BOX_FUTURE_THRESHOLD { handle.inner.blocking_spawner().spawn_blocking_inner( Box::new(function), Mandatory::NonMandatory, - self.name, + SpawnMeta::new(self.name, fn_size), handle, ) } else { handle.inner.blocking_spawner().spawn_blocking_inner( function, Mandatory::NonMandatory, - self.name, + SpawnMeta::new(self.name, fn_size), handle, ) }; diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 90d4d3612e8..d5341937893 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -6,6 +6,7 @@ use crate::runtime; use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task, TaskHarnessScheduleHooks}; use crate::runtime::{context, ThreadId, BOX_FUTURE_THRESHOLD}; use crate::sync::AtomicWaker; +use crate::util::trace::SpawnMeta; use crate::util::RcCell; use std::cell::Cell; @@ -13,6 +14,7 @@ use std::collections::VecDeque; use std::fmt; use std::future::Future; use std::marker::PhantomData; +use std::mem; use std::pin::Pin; use std::rc::Rc; use std::task::Poll; @@ -367,22 +369,23 @@ cfg_rt! { F: Future + 'static, F::Output: 'static, { - if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD { - spawn_local_inner(Box::pin(future), None) + let fut_size = std::mem::size_of::<F>(); + if fut_size > BOX_FUTURE_THRESHOLD { + spawn_local_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) } else { - spawn_local_inner(future, None) + spawn_local_inner(future, SpawnMeta::new_unnamed(fut_size)) } } #[track_caller] - pub(super) fn spawn_local_inner<F>(future: F, name: Option<&str>) -> JoinHandle<F::Output> + pub(super) fn spawn_local_inner<F>(future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output> where F: Future + 'static, F::Output: 'static { match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) { None => panic!("`spawn_local` called from outside of a `task::LocalSet`"), - Some(cx) => cx.spawn(future, name) + Some(cx) => cx.spawn(future, meta) } } } @@ -521,7 +524,12 @@ impl LocalSet { F: Future + 'static, F::Output: 'static, { - self.spawn_named(future, None) + let fut_size = mem::size_of::<F>(); + if fut_size > BOX_FUTURE_THRESHOLD { + self.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) + } else { + self.spawn_named(future, SpawnMeta::new_unnamed(fut_size)) + } } /// Runs a future to completion on the provided runtime, driving any local @@ -643,26 +651,22 @@ impl LocalSet { pub(in crate::task) fn spawn_named<F>( &self, future: F, - name: Option<&str>, + meta: SpawnMeta<'_>, ) -> JoinHandle<F::Output> where F: Future + 'static, F::Output: 'static, { - if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD { - self.spawn_named_inner(Box::pin(future), name) - } else { - self.spawn_named_inner(future, name) - } + self.spawn_named_inner(future, meta) } #[track_caller] - fn spawn_named_inner<F>(&self, future: F, name: Option<&str>) -> JoinHandle<F::Output> + fn spawn_named_inner<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output> where F: Future + 'static, F::Output: 'static, { - let handle = self.context.spawn(future, name); + let handle = self.context.spawn(future, meta); // Because a task was spawned from *outside* the `LocalSet`, wake the // `LocalSet` future to execute the new task, if it hasn't been woken. @@ -949,13 +953,13 @@ impl Drop for LocalSet { impl Context { #[track_caller] - fn spawn<F>(&self, future: F, name: Option<&str>) -> JoinHandle<F::Output> + fn spawn<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output> where F: Future + 'static, F::Output: 'static, { let id = crate::runtime::task::Id::next(); - let future = crate::util::trace::task(future, "local", name, id.as_u64()); + let future = crate::util::trace::task(future, "local", meta, id.as_u64()); // Safety: called from the thread that owns the `LocalSet` let (handle, notified) = { diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs index 4208ac6e0c6..7c748226121 100644 --- a/tokio/src/task/spawn.rs +++ b/tokio/src/task/spawn.rs @@ -1,5 +1,6 @@ use crate::runtime::BOX_FUTURE_THRESHOLD; use crate::task::JoinHandle; +use crate::util::trace::SpawnMeta; use std::future::Future; @@ -167,17 +168,16 @@ cfg_rt! { F: Future + Send + 'static, F::Output: Send + 'static, { - // preventing stack overflows on debug mode, by quickly sending the - // task to the heap. - if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD { - spawn_inner(Box::pin(future), None) + let fut_size = std::mem::size_of::<F>(); + if fut_size > BOX_FUTURE_THRESHOLD { + spawn_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) } else { - spawn_inner(future, None) + spawn_inner(future, SpawnMeta::new_unnamed(fut_size)) } } #[track_caller] - pub(super) fn spawn_inner<T>(future: T, name: Option<&str>) -> JoinHandle<T::Output> + pub(super) fn spawn_inner<T>(future: T, meta: SpawnMeta<'_>) -> JoinHandle<T::Output> where T: Future + Send + 'static, T::Output: Send + 'static, @@ -197,7 +197,7 @@ cfg_rt! { ))] let future = task::trace::Trace::root(future); let id = task::Id::next(); - let task = crate::util::trace::task(future, "task", name, id.as_u64()); + let task = crate::util::trace::task(future, "task", meta, id.as_u64()); match context::with_current(|handle| handle.spawn(task, id)) { Ok(join_handle) => join_handle, diff --git a/tokio/src/util/trace.rs b/tokio/src/util/trace.rs index e1827686ca9..97006df474e 100644 --- a/tokio/src/util/trace.rs +++ b/tokio/src/util/trace.rs @@ -1,34 +1,110 @@ -cfg_trace! { - cfg_rt! { +cfg_rt! { + use std::marker::PhantomData; + + pub(crate) struct SpawnMeta<'a> { + /// The name of the task + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(crate) name: Option<&'a str>, + /// The original size of the future or function being spawned + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(crate) original_size: usize, + _pd: PhantomData<&'a ()>, + } + + impl<'a> SpawnMeta<'a> { + /// Create new spawn meta with a name and original size (before possible auto-boxing) + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(crate) fn new(name: Option<&'a str>, original_size: usize) -> Self { + Self { + name, + original_size, + _pd: PhantomData, + } + } + + /// Create a new unnamed spawn meta with the original size (before possible auto-boxing) + pub(crate) fn new_unnamed(original_size: usize) -> Self { + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let _original_size = original_size; + + Self { + #[cfg(all(tokio_unstable, feature = "tracing"))] + name: None, + #[cfg(all(tokio_unstable, feature = "tracing"))] + original_size, + _pd: PhantomData, + } + } + } + + cfg_trace! { use core::{ pin::Pin, task::{Context, Poll}, }; use pin_project_lite::pin_project; + use std::mem; use std::future::Future; + use tracing::instrument::Instrument; pub(crate) use tracing::instrument::Instrumented; #[inline] #[track_caller] - pub(crate) fn task<F>(task: F, kind: &'static str, name: Option<&str>, id: u64) -> Instrumented<F> { + pub(crate) fn task<F>(task: F, kind: &'static str, meta: SpawnMeta<'_>, id: u64) -> Instrumented<F> { #[track_caller] - fn get_span(kind: &'static str, name: Option<&str>, id: u64) -> tracing::Span { + fn get_span(kind: &'static str, spawn_meta: SpawnMeta<'_>, id: u64, task_size: usize) -> tracing::Span { let location = std::panic::Location::caller(); + let original_size = if spawn_meta.original_size != task_size { + Some(spawn_meta.original_size) + } else { + None + }; tracing::trace_span!( target: "tokio::task", parent: None, "runtime.spawn", %kind, - task.name = %name.unwrap_or_default(), + task.name = %spawn_meta.name.unwrap_or_default(), task.id = id, + original_size.bytes = original_size, + size.bytes = task_size, loc.file = location.file(), loc.line = location.line(), loc.col = location.column(), ) } use tracing::instrument::Instrument; - let span = get_span(kind, name, id); + let span = get_span(kind, meta, id, mem::size_of::<F>()); + task.instrument(span) + } + + #[inline] + #[track_caller] + pub(crate) fn blocking_task<Fn, Fut>(task: Fut, spawn_meta: SpawnMeta<'_>, id: u64) -> Instrumented<Fut> { + let location = std::panic::Location::caller(); + + let fn_size = mem::size_of::<Fn>(); + let original_size = if spawn_meta.original_size != fn_size { + Some(spawn_meta.original_size) + } else { + None + }; + + let span = tracing::trace_span!( + target: "tokio::task::blocking", + "runtime.spawn", + kind = %"blocking", + task.name = %spawn_meta.name.unwrap_or_default(), + task.id = id, + "fn" = %std::any::type_name::<Fn>(), + original_size.bytes = original_size, + size.bytes = fn_size, + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ); task.instrument(span) + } pub(crate) fn async_op<P,F>(inner: P, resource_span: tracing::Span, source: &str, poll_op_name: &'static str, inherits_child_attrs: bool) -> InstrumentedAsyncOp<F> @@ -83,7 +159,23 @@ cfg_trace! { } } } + + cfg_not_trace! { + #[inline] + pub(crate) fn task<F>(task: F, _kind: &'static str, _meta: SpawnMeta<'_>, _id: u64) -> F { + // nop + task + } + + #[inline] + pub(crate) fn blocking_task<Fn, Fut>(task: Fut, _spawn_meta: SpawnMeta<'_>, _id: u64) -> Fut { + let _ = PhantomData::<&Fn>; + // nop + task + } + } } + cfg_time! { #[track_caller] pub(crate) fn caller_location() -> Option<&'static std::panic::Location<'static>> { @@ -93,13 +185,3 @@ cfg_time! { None } } - -cfg_not_trace! { - cfg_rt! { - #[inline] - pub(crate) fn task<F>(task: F, _: &'static str, _name: Option<&str>, _: u64) -> F { - // nop - task - } - } -} diff --git a/tokio/tests/tracing-instrumentation/tests/task.rs b/tokio/tests/tracing-instrumentation/tests/task.rs index 7bdb078e32c..fb215ca7ce0 100644 --- a/tokio/tests/tracing-instrumentation/tests/task.rs +++ b/tokio/tests/tracing-instrumentation/tests/task.rs @@ -3,6 +3,8 @@ //! These tests ensure that the instrumentation for task spawning and task //! lifecycles is correct. +use std::{mem, time::Duration}; + use tokio::task; use tracing_mock::{expect, span::NewSpan, subscriber}; @@ -93,6 +95,81 @@ async fn task_builder_loc_file_recorded() { handle.assert_finished(); } +#[tokio::test] +async fn task_spawn_sizes_recorded() { + let future = futures::future::ready(()); + let size = mem::size_of_val(&future) as u64; + + let task_span = expect::span() + .named("runtime.spawn") + .with_target("tokio::task") + // TODO(hds): check that original_size.bytes is NOT recorded when this can be done in + // tracing-mock without listing every other field. + .with_field(expect::field("size.bytes").with_value(&size)); + + let (subscriber, handle) = subscriber::mock().new_span(task_span).run_with_handle(); + + { + let _guard = tracing::subscriber::set_default(subscriber); + + task::Builder::new() + .spawn(future) + .unwrap() + .await + .expect("failed to await join handle"); + } + + handle.assert_finished(); +} + +#[tokio::test] +async fn task_big_spawn_sizes_recorded() { + let future = { + async fn big<const N: usize>() { + let mut a = [0_u8; N]; + for (idx, item) in a.iter_mut().enumerate() { + *item = (idx % 256) as u8; + } + tokio::time::sleep(Duration::from_millis(10)).await; + for (idx, item) in a.iter_mut().enumerate() { + assert_eq!(*item, (idx % 256) as u8); + } + } + + // This is larger than the release auto-boxing threshold + big::<20_000>() + }; + + fn boxed_size<T>(_: &T) -> usize { + mem::size_of::<Box<T>>() + } + let size = mem::size_of_val(&future) as u64; + let boxed_size = boxed_size(&future); + + let task_span = expect::span() + .named("runtime.spawn") + .with_target("tokio::task") + .with_field( + expect::field("size.bytes") + .with_value(&boxed_size) + .and(expect::field("original_size.bytes").with_value(&size)), + ); + + let (subscriber, handle) = subscriber::mock().new_span(task_span).run_with_handle(); + + { + let _guard = tracing::subscriber::set_default(subscriber); + + task::Builder::new() + .spawn(future) + .unwrap() + .await + .expect("failed to await join handle"); + } + + handle.assert_finished(); +} + /// Expect a task with name /// /// This is a convenience function to create the expectation for a new task