Skip to content

Commit 7b55518

Browse files
xuorigdavidbarsky
andauthored
sync: avoid creating resource spans with curernt parent, use a None parent instead (#6107)
A child span stored on sync primitives can keep the parent span open, unable to be closed by subscribers due to the sync resource referencing it. Fixes: #6106 Co-authored-by: David Barsky <[email protected]>
1 parent 135d7ca commit 7b55518

File tree

7 files changed

+255
-0
lines changed

7 files changed

+255
-0
lines changed

tokio/src/sync/barrier.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ impl Barrier {
7474
let resource_span = {
7575
let location = std::panic::Location::caller();
7676
let resource_span = tracing::trace_span!(
77+
parent: None,
7778
"runtime.resource",
7879
concrete_type = "Barrier",
7980
kind = "Sync",

tokio/src/sync/mutex.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,7 @@ impl<T: ?Sized> Mutex<T> {
340340
let location = std::panic::Location::caller();
341341

342342
tracing::trace_span!(
343+
parent: None,
343344
"runtime.resource",
344345
concrete_type = "Mutex",
345346
kind = "Sync",

tokio/src/sync/oneshot.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,7 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
473473
let location = std::panic::Location::caller();
474474

475475
let resource_span = tracing::trace_span!(
476+
parent: None,
476477
"runtime.resource",
477478
concrete_type = "Sender|Receiver",
478479
kind = "Sync",

tokio/src/sync/rwlock.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ impl<T: ?Sized> RwLock<T> {
209209
let resource_span = {
210210
let location = std::panic::Location::caller();
211211
let resource_span = tracing::trace_span!(
212+
parent: None,
212213
"runtime.resource",
213214
concrete_type = "RwLock",
214215
kind = "Sync",
@@ -282,6 +283,7 @@ impl<T: ?Sized> RwLock<T> {
282283
let location = std::panic::Location::caller();
283284

284285
let resource_span = tracing::trace_span!(
286+
parent: None,
285287
"runtime.resource",
286288
concrete_type = "RwLock",
287289
kind = "Sync",

tokio/src/sync/semaphore.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,7 @@ impl Semaphore {
400400
let location = std::panic::Location::caller();
401401

402402
tracing::trace_span!(
403+
parent: None,
403404
"runtime.resource",
404405
concrete_type = "Semaphore",
405406
kind = "Sync",

tokio/src/time/interval.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ fn internal_interval_at(
122122
let location = location.expect("should have location if tracing");
123123

124124
tracing::trace_span!(
125+
parent: None,
125126
"runtime.resource",
126127
concrete_type = "Interval",
127128
kind = "timer",
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
//! Tests for sync instrumentation.
2+
//!
3+
//! These tests ensure that the instrumentation for tokio
4+
//! synchronization primitives is correct.
5+
6+
use tokio::sync;
7+
use tracing_mock::{expect, subscriber};
8+
9+
#[tokio::test]
10+
async fn test_barrier_creates_span() {
11+
let barrier_span = expect::span()
12+
.named("runtime.resource")
13+
.with_target("tokio::sync::barrier");
14+
15+
let size_event = expect::event()
16+
.with_target("runtime::resource::state_update")
17+
.with_fields(expect::field("size").with_value(&1u64));
18+
19+
let arrived_event = expect::event()
20+
.with_target("runtime::resource::state_update")
21+
.with_fields(expect::field("arrived").with_value(&0));
22+
23+
let (subscriber, handle) = subscriber::mock()
24+
.new_span(barrier_span.clone().with_explicit_parent(None))
25+
.enter(barrier_span.clone())
26+
.event(size_event)
27+
.event(arrived_event)
28+
.exit(barrier_span.clone())
29+
.drop_span(barrier_span)
30+
.run_with_handle();
31+
32+
{
33+
let _guard = tracing::subscriber::set_default(subscriber);
34+
let _ = sync::Barrier::new(1);
35+
}
36+
37+
handle.assert_finished();
38+
}
39+
40+
#[tokio::test]
41+
async fn test_mutex_creates_span() {
42+
let mutex_span = expect::span()
43+
.named("runtime.resource")
44+
.with_target("tokio::sync::mutex");
45+
46+
let locked_event = expect::event()
47+
.with_target("runtime::resource::state_update")
48+
.with_fields(expect::field("locked").with_value(&false));
49+
50+
let batch_semaphore_span = expect::span()
51+
.named("runtime.resource")
52+
.with_target("tokio::sync::batch_semaphore");
53+
54+
let batch_semaphore_permits_event = expect::event()
55+
.with_target("runtime::resource::state_update")
56+
.with_fields(expect::field("permits").with_value(&1u64))
57+
.with_fields(expect::field("permits.op").with_value(&"override"));
58+
59+
let (subscriber, handle) = subscriber::mock()
60+
.new_span(mutex_span.clone().with_explicit_parent(None))
61+
.enter(mutex_span.clone())
62+
.event(locked_event)
63+
.new_span(batch_semaphore_span.clone())
64+
.enter(batch_semaphore_span.clone())
65+
.event(batch_semaphore_permits_event)
66+
.exit(batch_semaphore_span.clone())
67+
.exit(mutex_span.clone())
68+
.drop_span(mutex_span)
69+
.drop_span(batch_semaphore_span)
70+
.run_with_handle();
71+
72+
{
73+
let _guard = tracing::subscriber::set_default(subscriber);
74+
let _ = sync::Mutex::new(true);
75+
}
76+
77+
handle.assert_finished();
78+
}
79+
80+
#[tokio::test]
81+
async fn test_oneshot_creates_span() {
82+
let oneshot_span = expect::span()
83+
.named("runtime.resource")
84+
.with_target("tokio::sync::oneshot");
85+
86+
let initial_tx_dropped_event = expect::event()
87+
.with_target("runtime::resource::state_update")
88+
.with_fields(expect::field("tx_dropped").with_value(&false))
89+
.with_fields(expect::field("tx_dropped.op").with_value(&"override"));
90+
91+
let final_tx_dropped_event = expect::event()
92+
.with_target("runtime::resource::state_update")
93+
.with_fields(expect::field("tx_dropped").with_value(&true))
94+
.with_fields(expect::field("tx_dropped.op").with_value(&"override"));
95+
96+
let initial_rx_dropped_event = expect::event()
97+
.with_target("runtime::resource::state_update")
98+
.with_fields(expect::field("rx_dropped").with_value(&false))
99+
.with_fields(expect::field("rx_dropped.op").with_value(&"override"));
100+
101+
let final_rx_dropped_event = expect::event()
102+
.with_target("runtime::resource::state_update")
103+
.with_fields(expect::field("rx_dropped").with_value(&true))
104+
.with_fields(expect::field("rx_dropped.op").with_value(&"override"));
105+
106+
let value_sent_event = expect::event()
107+
.with_target("runtime::resource::state_update")
108+
.with_fields(expect::field("value_sent").with_value(&false))
109+
.with_fields(expect::field("value_sent.op").with_value(&"override"));
110+
111+
let value_received_event = expect::event()
112+
.with_target("runtime::resource::state_update")
113+
.with_fields(expect::field("value_received").with_value(&false))
114+
.with_fields(expect::field("value_received.op").with_value(&"override"));
115+
116+
let async_op_span = expect::span()
117+
.named("runtime.resource.async_op")
118+
.with_target("tokio::sync::oneshot");
119+
120+
let async_op_poll_span = expect::span()
121+
.named("runtime.resource.async_op.poll")
122+
.with_target("tokio::sync::oneshot");
123+
124+
let (subscriber, handle) = subscriber::mock()
125+
.new_span(oneshot_span.clone().with_explicit_parent(None))
126+
.enter(oneshot_span.clone())
127+
.event(initial_tx_dropped_event)
128+
.exit(oneshot_span.clone())
129+
.enter(oneshot_span.clone())
130+
.event(initial_rx_dropped_event)
131+
.exit(oneshot_span.clone())
132+
.enter(oneshot_span.clone())
133+
.event(value_sent_event)
134+
.exit(oneshot_span.clone())
135+
.enter(oneshot_span.clone())
136+
.event(value_received_event)
137+
.exit(oneshot_span.clone())
138+
.enter(oneshot_span.clone())
139+
.new_span(async_op_span.clone())
140+
.exit(oneshot_span.clone())
141+
.enter(async_op_span.clone())
142+
.new_span(async_op_poll_span.clone())
143+
.exit(async_op_span.clone())
144+
.enter(oneshot_span.clone())
145+
.event(final_tx_dropped_event)
146+
.exit(oneshot_span.clone())
147+
.enter(oneshot_span.clone())
148+
.event(final_rx_dropped_event)
149+
.exit(oneshot_span.clone())
150+
.drop_span(oneshot_span)
151+
.drop_span(async_op_span)
152+
.drop_span(async_op_poll_span)
153+
.run_with_handle();
154+
155+
{
156+
let _guard = tracing::subscriber::set_default(subscriber);
157+
let _ = sync::oneshot::channel::<bool>();
158+
}
159+
160+
handle.assert_finished();
161+
}
162+
163+
#[tokio::test]
164+
async fn test_rwlock_creates_span() {
165+
let rwlock_span = expect::span()
166+
.named("runtime.resource")
167+
.with_target("tokio::sync::rwlock");
168+
169+
let max_readers_event = expect::event()
170+
.with_target("runtime::resource::state_update")
171+
.with_fields(expect::field("max_readers").with_value(&0x1FFFFFFF_u64));
172+
173+
let write_locked_event = expect::event()
174+
.with_target("runtime::resource::state_update")
175+
.with_fields(expect::field("write_locked").with_value(&false));
176+
177+
let current_readers_event = expect::event()
178+
.with_target("runtime::resource::state_update")
179+
.with_fields(expect::field("current_readers").with_value(&0));
180+
181+
let batch_semaphore_span = expect::span()
182+
.named("runtime.resource")
183+
.with_target("tokio::sync::batch_semaphore");
184+
185+
let batch_semaphore_permits_event = expect::event()
186+
.with_target("runtime::resource::state_update")
187+
.with_fields(expect::field("permits").with_value(&1u64))
188+
.with_fields(expect::field("permits.op").with_value(&"override"));
189+
190+
let (subscriber, handle) = subscriber::mock()
191+
.new_span(rwlock_span.clone().with_explicit_parent(None))
192+
.enter(rwlock_span.clone())
193+
.event(max_readers_event)
194+
.event(write_locked_event)
195+
.event(current_readers_event)
196+
.exit(rwlock_span.clone())
197+
.enter(rwlock_span.clone())
198+
.new_span(batch_semaphore_span.clone())
199+
.enter(batch_semaphore_span.clone())
200+
.event(batch_semaphore_permits_event)
201+
.exit(batch_semaphore_span.clone())
202+
.exit(rwlock_span.clone())
203+
.drop_span(rwlock_span)
204+
.drop_span(batch_semaphore_span)
205+
.run_with_handle();
206+
207+
{
208+
let _guard = tracing::subscriber::set_default(subscriber);
209+
let _ = sync::RwLock::new(true);
210+
}
211+
212+
handle.assert_finished();
213+
}
214+
215+
#[tokio::test]
216+
async fn test_semaphore_creates_span() {
217+
let semaphore_span = expect::span()
218+
.named("runtime.resource")
219+
.with_target("tokio::sync::semaphore");
220+
221+
let batch_semaphore_span = expect::span()
222+
.named("runtime.resource")
223+
.with_target("tokio::sync::batch_semaphore");
224+
225+
let batch_semaphore_permits_event = expect::event()
226+
.with_target("runtime::resource::state_update")
227+
.with_fields(expect::field("permits").with_value(&1u64))
228+
.with_fields(expect::field("permits.op").with_value(&"override"));
229+
230+
let (subscriber, handle) = subscriber::mock()
231+
.new_span(semaphore_span.clone().with_explicit_parent(None))
232+
.enter(semaphore_span.clone())
233+
.new_span(batch_semaphore_span.clone())
234+
.enter(batch_semaphore_span.clone())
235+
.event(batch_semaphore_permits_event)
236+
.exit(batch_semaphore_span.clone())
237+
.exit(semaphore_span.clone())
238+
.drop_span(semaphore_span)
239+
.drop_span(batch_semaphore_span)
240+
.run_with_handle();
241+
242+
{
243+
let _guard = tracing::subscriber::set_default(subscriber);
244+
let _ = sync::Semaphore::new(1);
245+
}
246+
247+
handle.assert_finished();
248+
}

0 commit comments

Comments
 (0)