Add tokio taskname registration for use in tokio-console (#89)
This introduces a `tracing` feature. To enable tokio task name registration,
the feature has to be enabled and the environment variable
`RUSTFLAGS="--cfg tokio_unstable"` has to be set.

See the example program `tokio_console.rs` for usage.
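
For a downstream application, the same setup can be made persistent instead of exporting `RUSTFLAGS` on every command. A minimal sketch, with a placeholder version number (not taken from this commit):

```toml
# Cargo.toml of the consuming application
[dependencies]
tokio-graceful-shutdown = { version = "x.y", features = ["tracing"] }
```

The `tokio_unstable` cfg can then be set once for the whole workspace via cargo's build configuration:

```toml
# .cargo/config.toml
[build]
rustflags = ["--cfg", "tokio_unstable"]
```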

---------

Co-authored-by: Finomnis <[email protected]>
hirschenberger and Finomnis authored Nov 24, 2024
1 parent 8b2d7f1 commit e3cee38
Showing 10 changed files with 183 additions and 62 deletions.
9 changes: 3 additions & 6 deletions .github/workflows/ci.yml
@@ -30,14 +30,14 @@ jobs:
uses: taiki-e/install-action@cross

- name: Build
run: cross build --all-features --release --target=${{ matrix.target }}
run: cross build --release --target=${{ matrix.target }}

build-examples:
name: Build Examples
runs-on: ubuntu-latest
needs: [lints, docs]
env:
RUSTFLAGS: "-D warnings"
RUSTFLAGS: "-D warnings --cfg tokio_unstable"
steps:
- name: Checkout sources
uses: actions/checkout@v4
@@ -112,9 +112,6 @@ jobs:
- name: Check with minimal versions
run: cargo minimal-versions check --workspace --ignore-private

- name: Test with minimal versions
run: cargo minimal-versions test -- --test-threads 1

min-versions-msrv:
name: Minimal Dependency Versions (MSRV)
runs-on: ubuntu-latest
@@ -165,7 +162,7 @@ jobs:
run: cargo fmt --all -- --check

- name: Run cargo clippy
run: cargo clippy --all-features --all-targets -- -D warnings
run: cargo clippy --all-targets -- -D warnings

docs:
name: Documentation
3 changes: 1 addition & 2 deletions .github/workflows/coverage.yml
@@ -23,10 +23,9 @@ jobs:
uses: actions/checkout@v4
- name: Install llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
#- uses: Swatinem/rust-cache@v1
- name: Compute Coverage
run:
cargo llvm-cov --all-features --workspace --ignore-filename-regex tests.rs --codecov --output-path codecov.json
cargo llvm-cov --workspace --ignore-filename-regex tests.rs --codecov --output-path codecov.json
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
env:
1 change: 0 additions & 1 deletion .github/workflows/rust-clippy.yml
@@ -43,7 +43,6 @@ jobs:
- name: Run rust-clippy
run:
cargo clippy
--all-features
--all-targets
--message-format=json | clippy-sarif | tee rust-clippy-results.sarif | sarif-fmt
continue-on-error: true
18 changes: 18 additions & 0 deletions Cargo.toml
@@ -20,6 +20,15 @@ exclude = [
"/UPCOMING_VERSION_CHANGES.txt",
]

[features]
# Enable task naming and task caller location.
tracing = ["tokio/tracing"]

[[example]]
name = "tokio_console"
required-features = ["tracing"]


[dependencies]
tracing = { version = "0.1.37", default-features = false }

@@ -67,10 +76,19 @@ headers = ">= 0.3.5" # Required to fix minimal-versions
serde_urlencoded = ">= 0.7.1" # Required to fix minimal-versions
unicode-linebreak = ">= 0.1.5" # Required to fix minimal-versions

gcc = ">= 0.3.4" # Required to fix minimal-versions

# tokio-console
console-subscriber = "0.2.0"

# For testing unix signals
[target.'cfg(unix)'.dev-dependencies]
nix = { version = "0.29.0", default-features = false, features = ["signal"] }

# Make leak sanitizer more reliable
[profile.dev]
opt-level = 1

# Define `tokio_unstable` config for linter
[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
66 changes: 66 additions & 0 deletions examples/tokio_console.rs
@@ -0,0 +1,66 @@
//! This example demonstrates how to use the tokio-console application for tracing tokio tasks'
//! runtime behaviour. Subsystems will appear under their registration names.
//!
//! Run this example with:
//!
//! ```
//! RUSTFLAGS="--cfg tokio_unstable" cargo run --features "tracing" --example tokio_console
//! ```
//!
//! Then, open the `tokio-console` application (see https://crates.io/crates/tokio-console) to
//! follow the subsystem tasks live.
use miette::Result;
use tokio::time::{sleep, Duration};
use tokio_graceful_shutdown::{FutureExt, SubsystemBuilder, SubsystemHandle, Toplevel};
use tracing::Level;
use tracing_subscriber::{fmt::writer::MakeWriterExt, prelude::*};

async fn child(subsys: SubsystemHandle) -> Result<()> {
sleep(Duration::from_millis(3000))
.cancel_on_shutdown(&subsys)
.await
.ok();
Ok(())
}

async fn parent(subsys: SubsystemHandle) -> Result<()> {
tracing::info!("Parent started.");

let mut iteration = 0;
while !subsys.is_shutdown_requested() {
subsys.start(SubsystemBuilder::new(format!("child{iteration}"), child));
iteration += 1;

sleep(Duration::from_millis(1000))
.cancel_on_shutdown(&subsys)
.await
.ok();
}

tracing::info!("Parent stopped.");
Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
// Init tokio-console server and tracing
let console_layer = console_subscriber::spawn();
tracing_subscriber::registry()
.with(console_layer)
.with(
tracing_subscriber::fmt::layer()
.with_writer(std::io::stdout.with_max_level(Level::DEBUG))
.compact(),
)
.init();

// Setup and execute subsystem tree
Toplevel::new(|s| async move {
s.start(SubsystemBuilder::new("parent", parent));
})
.catch_signals()
.handle_shutdown_requests(Duration::from_millis(1000))
.await
.map_err(Into::into)
}
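
To actually watch the named subsystem tasks, the `tokio-console` TUI has to run alongside the example; it connects to the instrumentation server started by `console_subscriber::spawn()`. A short sketch of the workflow (install once, then run the console while the example is running):

```sh
# Terminal 1: run the example (as in the doc comment above)
RUSTFLAGS="--cfg tokio_unstable" cargo run --features "tracing" --example tokio_console

# Terminal 2: install the console binary once, then start the UI
cargo install tokio-console
tokio-console
```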
1 change: 1 addition & 0 deletions src/lib.rs
@@ -113,6 +113,7 @@ mod into_subsystem;
mod runner;
mod signal_handling;
mod subsystem;
mod tokio_task;
mod toplevel;
mod utils;

101 changes: 54 additions & 47 deletions src/runner.rs
@@ -21,6 +21,7 @@ pub(crate) struct SubsystemRunner {
}

impl SubsystemRunner {
#[track_caller]
pub(crate) fn new<Fut, Subsys, ErrType: ErrTypeTraits, Err>(
name: Arc<str>,
subsystem: Subsys,
@@ -32,8 +32,8 @@ impl SubsystemRunner {
Fut: 'static + Future<Output = Result<(), Err>> + Send,
Err: Into<ErrType>,
{
let future = async { run_subsystem(name, subsystem, subsystem_handle, guard).await };
let aborthandle = tokio::spawn(future).abort_handle();
let future = run_subsystem(name, subsystem, subsystem_handle, guard);
let aborthandle = crate::tokio_task::spawn(future, "subsystem_runner").abort_handle();
SubsystemRunner { aborthandle }
}
}
@@ -44,67 +45,73 @@ impl Drop for SubsystemRunner {
}
}

async fn run_subsystem<Fut, Subsys, ErrType: ErrTypeTraits, Err>(
#[track_caller]
fn run_subsystem<Fut, Subsys, ErrType: ErrTypeTraits, Err>(
name: Arc<str>,
subsystem: Subsys,
mut subsystem_handle: SubsystemHandle<ErrType>,
guard: AliveGuard,
) where
) -> impl Future<Output = ()> + 'static
where
Subsys: 'static + FnOnce(SubsystemHandle<ErrType>) -> Fut + Send,
Fut: 'static + Future<Output = Result<(), Err>> + Send,
Err: Into<ErrType>,
{
let mut redirected_subsystem_handle = subsystem_handle.delayed_clone();

let future = async { subsystem(subsystem_handle).await.map_err(|e| e.into()) };
let join_handle = tokio::spawn(future);
let join_handle = crate::tokio_task::spawn(future, &name);

// Abort on drop
guard.on_cancel({
let abort_handle = join_handle.abort_handle();
let name = Arc::clone(&name);
move || {
if !abort_handle.is_finished() {
tracing::warn!("Subsystem cancelled: '{}'", name);
async move {
// Abort on drop
guard.on_cancel({
let abort_handle = join_handle.abort_handle();
let name = Arc::clone(&name);
move || {
if !abort_handle.is_finished() {
tracing::warn!("Subsystem cancelled: '{}'", name);
}
abort_handle.abort();
}
abort_handle.abort();
}
});
});

let failure = match join_handle.await {
Ok(Ok(())) => None,
Ok(Err(e)) => Some(SubsystemError::Failed(name, SubsystemFailure(e))),
Err(e) => {
// We can assume that this is a panic, because a cancellation
// can never happen as long as we still hold `guard`.
assert!(e.is_panic());
Some(SubsystemError::Panicked(name))
}
};
let failure = match join_handle.await {
Ok(Ok(())) => None,
Ok(Err(e)) => Some(SubsystemError::Failed(name, SubsystemFailure(e))),
Err(e) => {
// We can assume that this is a panic, because a cancellation
// can never happen as long as we still hold `guard`.
assert!(e.is_panic());
Some(SubsystemError::Panicked(name))
}
};

// Retrieve the handle that was passed into the subsystem.
// Originally it was intended to pass the handle as reference, but due
// to complications (https://stackoverflow.com/questions/77172947/async-lifetime-issues-of-pass-by-reference-parameters)
// it was decided to pass ownership instead.
//
// It is still important that the handle does not leak out of the subsystem.
let subsystem_handle = match redirected_subsystem_handle.try_recv() {
Ok(s) => s,
Err(_) => {
tracing::error!("The SubsystemHandle object must not be leaked out of the subsystem!");
panic!("The SubsystemHandle object must not be leaked out of the subsystem!");
// Retrieve the handle that was passed into the subsystem.
// Originally it was intended to pass the handle as reference, but due
// to complications (https://stackoverflow.com/questions/77172947/async-lifetime-issues-of-pass-by-reference-parameters)
// it was decided to pass ownership instead.
//
// It is still important that the handle does not leak out of the subsystem.
let subsystem_handle = match redirected_subsystem_handle.try_recv() {
Ok(s) => s,
Err(_) => {
tracing::error!(
"The SubsystemHandle object must not be leaked out of the subsystem!"
);
panic!("The SubsystemHandle object must not be leaked out of the subsystem!");
}
};

// Raise potential errors
let joiner_token = subsystem_handle.joiner_token;
if let Some(failure) = failure {
joiner_token.raise_failure(failure);
}
};

// Raise potential errors
let joiner_token = subsystem_handle.joiner_token;
if let Some(failure) = failure {
joiner_token.raise_failure(failure);
// Wait for children to finish before we destroy the `SubsystemHandle` object.
// Otherwise the children would be cancelled immediately.
//
// This is the main mechanism that forwards a cancellation to all the children.
joiner_token.downgrade().join().await;
}

// Wait for children to finish before we destroy the `SubsystemHandle` object.
// Otherwise the children would be cancelled immediately.
//
// This is the main mechanism that forwards a cancellation to all the children.
joiner_token.downgrade().join().await;
}
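
A likely reason `run_subsystem` changes from an `async fn` into a plain `fn` returning `impl Future` is caller-location propagation: the named spawn now happens in the synchronous part of the function, so the chain of `#[track_caller]` attributes can hand the user's original call site down to tokio's task builder, which tokio-console displays. A standalone sketch of that pattern (not code from this crate), using `Location::caller()` to stand in for the spawn machinery:

```rust
use std::panic::Location;

// The caller's location is captured in the synchronous part of the function
// and moved into the returned future, so it refers to the call site of
// `make_task()`, not to a line inside the async block.
#[track_caller]
fn make_task() -> impl std::future::Future<Output = ()> {
    let caller: &'static Location<'static> = Location::caller();
    async move {
        println!("task created at {}:{}", caller.file(), caller.line());
    }
}

#[tokio::main]
async fn main() {
    make_task().await;
}
```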
8 changes: 7 additions & 1 deletion src/subsystem/subsystem_handle.rs
@@ -72,6 +72,7 @@ impl<ErrType: ErrTypeTraits> SubsystemHandle<ErrType> {
/// Ok(())
/// }
/// ```
#[track_caller]
pub fn start<Err, Fut, Subsys>(
&self,
builder: SubsystemBuilder<ErrType, Err, Fut, Subsys>,
@@ -82,7 +83,11 @@
Err: Into<ErrType>,
{
self.start_with_abs_name(
Arc::from(format!("{}/{}", self.inner.name, builder.name)),
if self.inner.name.as_ref() == "/" {
Arc::from(format!("/{}", builder.name))
} else {
Arc::from(format!("{}/{}", self.inner.name, builder.name))
},
builder.subsystem,
ErrorActions {
on_failure: Atomic::new(builder.failure_action),
@@ -92,6 +97,7 @@
)
}

#[track_caller]
pub(crate) fn start_with_abs_name<Err, Fut, Subsys>(
&self,
name: Arc<str>,
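
The new branch exists because the root handle is now named "/" (see the toplevel.rs change later in this diff); without it, joining the root name with a child name would produce a doubled slash. A tiny standalone sketch of the resulting naming rule (the helper function is illustrative, not part of the crate):

```rust
// Name-joining rule after this change: the root "/" is treated specially
// so that child names start with a single slash.
fn join_name(parent: &str, child: &str) -> String {
    if parent == "/" {
        format!("/{child}")
    } else {
        format!("{parent}/{child}")
    }
}

fn main() {
    assert_eq!(join_name("/", "parent"), "/parent"); // would be "//parent" without the special case
    assert_eq!(join_name("/parent", "child0"), "/parent/child0");
}
```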
23 changes: 23 additions & 0 deletions src/tokio_task.rs
@@ -0,0 +1,23 @@
use std::future::Future;
use tokio::task::JoinHandle;

#[cfg(not(all(tokio_unstable, feature = "tracing")))]
#[track_caller]
pub(crate) fn spawn<F: Future + Send + 'static>(f: F, _name: &str) -> JoinHandle<F::Output>
where
<F as Future>::Output: Send + 'static,
{
tokio::spawn(f)
}

#[cfg(all(tokio_unstable, feature = "tracing"))]
#[track_caller]
pub(crate) fn spawn<F: Future + Send + 'static>(f: F, name: &str) -> JoinHandle<F::Output>
where
<F as Future>::Output: Send + 'static,
{
tokio::task::Builder::new()
.name(name)
.spawn(f)
.expect("a task should be spawned")
}
15 changes: 10 additions & 5 deletions src/toplevel.rs
@@ -57,6 +57,7 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
/// * `subsystem` - The subsystem that should be spawned as the root node.
/// Usually the job of this subsystem is to spawn further subsystems.
#[allow(clippy::new_without_default)]
#[track_caller]
pub fn new<Fut, Subsys>(subsystem: Subsys) -> Self
where
Subsys: 'static + FnOnce(SubsystemHandle<ErrType>) -> Fut + Send,
@@ -78,7 +79,7 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
});

let toplevel_subsys = root_handle.start_with_abs_name(
Arc::from(""),
Arc::from("/"),
move |s| async move {
subsystem(s).await;
Result::<(), ErrType>::Ok(())
@@ -118,13 +119,17 @@ impl<ErrType: ErrTypeTraits> Toplevel<ErrType> {
///
/// Especially the caveats from [tokio::signal::unix::Signal] are important for Unix targets.
///
#[track_caller]
pub fn catch_signals(self) -> Self {
let shutdown_token = self.root_handle.get_cancellation_token().clone();

tokio::spawn(async move {
wait_for_signal().await;
shutdown_token.cancel();
});
crate::tokio_task::spawn(
async move {
wait_for_signal().await;
shutdown_token.cancel();
},
"catch_signals",
);

self
}
