Skip to content

Commit

Permalink
Fix a bug in JoinHandle: We need two Wakers.
Browse files Browse the repository at this point in the history
  • Loading branch information
matheus23 committed Jan 17, 2025
1 parent 18a9ba5 commit 2b9f228
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 27 deletions.
2 changes: 1 addition & 1 deletion iroh-net-report/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
// Based on <https://github.com/tailscale/tailscale/blob/main/net/netcheck/netcheck.go>

#![cfg_attr(iroh_docsrs, feature(doc_auto_cfg))]
#![deny(missing_docs, rustdoc::broken_intra_doc_links)]
// #![deny(missing_docs, rustdoc::broken_intra_doc_links)]
#![cfg_attr(not(test), deny(clippy::unwrap_used))]

use std::{
Expand Down
4 changes: 2 additions & 2 deletions iroh-net-report/src/reportgen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ impl Actor {
delay=?timeout,
"Have enough probe reports, aborting further probes soon",
);
tokio::spawn(
task::spawn(
async move {
time::sleep(timeout).await;
// Because we do this after a timeout it is entirely normal that the
Expand Down Expand Up @@ -844,7 +844,7 @@ async fn run_probe(
.await
{
Ok((latency, ip)) => {
debug!(?latency, url = %node.url, "https latency measurement succeeded");
debug!(?latency, "latency");
result.latency = Some(latency);
// We set these IPv4 and IPv6 but they're not really used
// and we don't necessarily set them both. If UDP is blocked
Expand Down
62 changes: 42 additions & 20 deletions iroh-net-report/src/task/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ impl<T> JoinSet<T> {
pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> {
futures_lite::future::poll_fn(|cx| {
let ret = self.handles.poll_next(cx);
match &ret {
Poll::Pending => tracing::info!("polled JoinSet::join_next (pending)"),
Poll::Ready(None) => tracing::info!("polled JoinSet::join_next: None"),
Poll::Ready(Some(Ok(_))) => {
tracing::info!("polled JoinSet::join_next: Some(Ok(_))")
}
Poll::Ready(Some(Err(e))) => {
tracing::info!("polled JoinSet::join_next: Some(Err({e:?}))")
}
}
// clean up handles that are either cancelled or have finished
self.to_cancel.retain(JoinHandle::is_running);
ret
Expand Down Expand Up @@ -87,7 +97,8 @@ pub struct JoinHandle<T> {
struct Task<T> {
cancelled: bool,
completed: bool,
waker: Option<Waker>,
waker_handler: Option<Waker>,
waker_spawn_fn: Option<Waker>,
result: Option<T>,
}

Expand All @@ -106,16 +117,27 @@ impl<T> Task<T> {
}

fn wake(&mut self) {
if let Some(waker) = self.waker.take() {
if let Some(waker) = self.waker_handler.take() {
waker.wake();
}
if let Some(waker) = self.waker_spawn_fn.take() {
waker.wake();
}
}

fn register(&mut self, cx: &mut Context<'_>) {
match self.waker {
fn register_handler(&mut self, cx: &mut Context<'_>) {
match self.waker_handler {
// clone_from can be marginally faster in some cases
Some(ref mut waker) => waker.clone_from(cx.waker()),
None => self.waker_handler = Some(cx.waker().clone()),
}
}

fn register_spawn_fn(&mut self, cx: &mut Context<'_>) {
match self.waker_spawn_fn {
// clone_from can be marginally faster in some cases
Some(ref mut waker) => waker.clone_from(cx.waker()),
None => self.waker = Some(cx.waker().clone()),
None => self.waker_spawn_fn = Some(cx.waker().clone()),
}
}
}
Expand All @@ -126,7 +148,8 @@ impl<T> JoinHandle<T> {
task: Rc::new(RefCell::new(Task {
cancelled: false,
completed: false,
waker: None,
waker_handler: None,
waker_spawn_fn: None,
result: None,
})),
}
Expand All @@ -136,14 +159,6 @@ impl<T> JoinHandle<T> {
self.task.borrow_mut().cancel();
}

fn register(&self, cx: &mut Context<'_>) {
self.task.borrow_mut().register(cx);
}

fn complete(&self, value: T) {
self.task.borrow_mut().complete(value);
}

fn is_running(&self) -> bool {
let task = self.task.borrow();
!task.cancelled && !task.completed
Expand All @@ -160,15 +175,20 @@ impl<T> Future for JoinHandle<T> {
type Output = Result<T, JoinError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.task.borrow().cancelled {
tracing::info!("JoinHandle::poll");
let mut task = self.task.borrow_mut();
if task.cancelled {
tracing::info!("JoinHandle::poll: cancelled");
return Poll::Ready(Err(JoinError::Cancelled));
}

if let Some(result) = self.task.borrow_mut().result.take() {
if let Some(result) = task.result.take() {
tracing::info!("JoinHandle::poll: Ready(Ok(_))");
return Poll::Ready(Ok(result));
}

self.register(cx);
tracing::info!("JoinHandle::poll: Pending");
task.register_handler(cx);
Poll::Pending
}
}
Expand All @@ -185,18 +205,20 @@ impl<Fut: Future<Output = T>, T> Future for SpawnFuture<Fut, T> {

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let mut task = this.handle.task.borrow_mut();

if this.handle.task.borrow().cancelled {
if task.cancelled {
return Poll::Ready(());
}

match this.fut.poll(cx) {
Poll::Ready(value) => {
this.handle.complete(value);
tracing::info!("waking up");
task.complete(value);
Poll::Ready(())
}
Poll::Pending => {
this.handle.register(cx);
task.register_spawn_fn(cx);
Poll::Pending
}
}
Expand Down
10 changes: 6 additions & 4 deletions iroh-net-report/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,15 @@ mod wasm {
}

pub fn reset(mut self: Pin<&mut Self>, deadline: Instant) {
let duration = deadline
.checked_duration_since(Instant::now())
.unwrap_or_default();
let duration = deadline.duration_since(Instant::now());
let triggered = Flag::new();

let closure = Closure::once({
let triggered = triggered.clone();
move || triggered.signal()
move || {
tracing::trace!("timeout triggered");
triggered.signal()
}
});

let timeout_id = SendWrapper::new(
Expand Down Expand Up @@ -231,6 +232,7 @@ mod wasm {
}

fn set_timeout(handler: js_sys::Function, timeout: i32) -> Result<JsValue, JsValue> {
tracing::trace!(?timeout, "setting timeout");
let global_this = js_sys::global();
let global_scope = global_this.unchecked_ref::<GlobalScope>();
global_scope.set_timeout_with_callback_and_timeout_and_arguments_0(handler, timeout)
Expand Down

0 comments on commit 2b9f228

Please sign in to comment.