Skip to content

Commit 4733c9f

Browse files
committed
Added new test to drop join handles druing schedule and moved existing
ref cycle test into a non-loom test (leaks will be detected by miri)
1 parent cea78d0 commit 4733c9f

File tree

3 files changed

+94
-33
lines changed

3 files changed

+94
-33
lines changed

tokio/src/runtime/tests/loom_current_thread.rs

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
mod yield_now;
22

3-
use crate::loom::sync::atomic::AtomicUsize;
3+
use crate::loom::sync::atomic::{AtomicUsize, Ordering};
44
use crate::loom::sync::Arc;
55
use crate::loom::thread;
66
use crate::runtime::{Builder, Runtime};
@@ -9,7 +9,7 @@ use crate::task;
99
use std::future::Future;
1010
use std::pin::Pin;
1111
use std::sync::atomic::Ordering::{Acquire, Release};
12-
use std::task::{Context, Poll};
12+
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
1313

1414
fn assert_at_most_num_polls(rt: Arc<Runtime>, at_most_polls: usize) {
1515
let (tx, rx) = oneshot::channel();
@@ -106,6 +106,60 @@ fn assert_no_unnecessary_polls() {
106106
});
107107
}
108108

109+
#[test]
110+
fn drop_jh_during_schedule() {
111+
unsafe fn waker_clone(ptr: *const ()) -> RawWaker {
112+
let atomic = unsafe { &*(ptr as *const AtomicUsize) };
113+
atomic.fetch_add(1, Ordering::Relaxed);
114+
RawWaker::new(ptr, &VTABLE)
115+
}
116+
unsafe fn waker_drop(ptr: *const ()) {
117+
let atomic = unsafe { &*(ptr as *const AtomicUsize) };
118+
atomic.fetch_sub(1, Ordering::Relaxed);
119+
}
120+
unsafe fn waker_nop(_ptr: *const ()) {}
121+
122+
static VTABLE: RawWakerVTable =
123+
RawWakerVTable::new(waker_clone, waker_drop, waker_nop, waker_drop);
124+
125+
loom::model(|| {
126+
let rt = Builder::new_current_thread().build().unwrap();
127+
128+
let mut jh = rt.spawn(async {});
129+
// Using AbortHandle to increment task refcount. This ensures that the waker is not
130+
// destroyed due to the refcount hitting zero.
131+
let task_refcnt = jh.abort_handle();
132+
133+
let waker_refcnt = AtomicUsize::new(1);
134+
{
135+
// Set up the join waker.
136+
use std::future::Future;
137+
use std::pin::Pin;
138+
139+
// SAFETY: Before `waker_refcnt` goes out of scope, this test asserts that the refcnt
140+
// has dropped to zero.
141+
let join_waker = unsafe {
142+
Waker::from_raw(RawWaker::new(
143+
(&waker_refcnt) as *const AtomicUsize as *const (),
144+
&VTABLE,
145+
))
146+
};
147+
148+
assert!(Pin::new(&mut jh)
149+
.poll(&mut Context::from_waker(&join_waker))
150+
.is_pending());
151+
}
152+
assert_eq!(waker_refcnt.load(Ordering::Relaxed), 1);
153+
154+
let bg_thread = loom::thread::spawn(move || drop(jh));
155+
rt.block_on(crate::task::yield_now());
156+
bg_thread.join().unwrap();
157+
158+
assert_eq!(waker_refcnt.load(Ordering::Relaxed), 0);
159+
drop(task_refcnt);
160+
});
161+
}
162+
109163
struct BlockedFuture {
110164
rx: Receiver<()>,
111165
num_polls: Arc<AtomicUsize>,

tokio/src/runtime/tests/loom_multi_thread.rs

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ mod yield_now;
1010
/// In order to speed up the C
1111
use crate::runtime::tests::loom_oneshot as oneshot;
1212
use crate::runtime::{self, Runtime};
13-
use crate::sync::mpsc::channel;
1413
use crate::{spawn, task};
1514
use tokio_test::assert_ok;
1615

@@ -460,32 +459,3 @@ impl<T: Future> Future for Track<T> {
460459
})
461460
}
462461
}
463-
464-
#[test]
465-
fn drop_tasks_with_reference_cycle() {
466-
loom::model(|| {
467-
let pool = mk_pool(2);
468-
469-
pool.block_on(async move {
470-
let (tx, mut rx) = channel(1);
471-
472-
let (a_closer, mut wait_for_close_a) = channel::<()>(1);
473-
let (b_closer, mut wait_for_close_b) = channel::<()>(1);
474-
475-
let a = spawn(async move {
476-
let b = rx.recv().await.unwrap();
477-
478-
futures::future::select(std::pin::pin!(b), std::pin::pin!(a_closer.send(()))).await;
479-
});
480-
481-
let b = spawn(async move {
482-
let _ = a.await;
483-
let _ = b_closer.send(()).await;
484-
});
485-
486-
tx.send(b).await.unwrap();
487-
488-
futures::future::join(wait_for_close_a.recv(), wait_for_close_b.recv()).await;
489-
});
490-
});
491-
}

tokio/src/runtime/tests/task.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use crate::runtime::task::{
22
self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task, TaskHarnessScheduleHooks,
33
};
4-
use crate::runtime::tests::NoopSchedule;
4+
use crate::runtime::{self, tests::NoopSchedule};
5+
use crate::spawn;
6+
use crate::sync::{mpsc, Barrier};
57

68
use std::collections::VecDeque;
79
use std::future::Future;
@@ -45,6 +47,41 @@ impl Drop for AssertDrop {
4547
}
4648
}
4749

50+
#[test]
51+
fn drop_tasks_with_reference_cycle() {
52+
let rt = runtime::Builder::new_current_thread().build().unwrap();
53+
54+
rt.block_on(async {
55+
let (tx, mut rx) = mpsc::channel(1);
56+
57+
let barrier = Arc::new(Barrier::new(3));
58+
let barrier_a = barrier.clone();
59+
let barrier_b = barrier.clone();
60+
61+
let a = spawn(async move {
62+
let b = rx.recv().await.unwrap();
63+
64+
// Poll the JoinHandle once. This registers the waker.
65+
// The other task cannot have finished at this point due to the barrier below.
66+
futures::future::select(b, std::future::ready(())).await;
67+
68+
barrier_a.wait().await;
69+
});
70+
71+
let b = spawn(async move {
72+
// Poll the JoinHandle once. This registers the waker.
73+
// The other task cannot have finished at this point due to the barrier below.
74+
futures::future::select(a, std::future::ready(())).await;
75+
76+
barrier_b.wait().await;
77+
});
78+
79+
tx.send(b).await.unwrap();
80+
81+
barrier.wait().await;
82+
});
83+
}
84+
4885
// A Notified does not shut down on drop, but it is dropped once the ref-count
4986
// hits zero.
5087
#[test]

0 commit comments

Comments
 (0)