Skip to content

Commit 278a2b1

Browse files
authored
Handle spurious wakeups (#740)
1 parent e8d7458 commit 278a2b1

File tree

1 file changed

+50
-16
lines changed

1 file changed

+50
-16
lines changed

content/tokio/tutorial/async.md

Lines changed: 50 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -248,19 +248,19 @@ impl MiniTokio {
248248
tasks: VecDeque::new(),
249249
}
250250
}
251-
251+
252252
/// Spawn a future onto the mini-tokio instance.
253253
fn spawn<F>(&mut self, future: F)
254254
where
255255
F: Future<Output = ()> + Send + 'static,
256256
{
257257
self.tasks.push_back(Box::pin(future));
258258
}
259-
259+
260260
fn run(&mut self) {
261261
let waker = task::noop_waker();
262262
let mut cx = Context::from_waker(&waker);
263-
263+
264264
while let Some(mut task) = self.tasks.pop_front() {
265265
if task.as_mut().poll(&mut cx).is_pending() {
266266
self.tasks.push_back(task);
@@ -453,22 +453,33 @@ Wakers are `Sync` and can be cloned. When `wake` is called, the task must be
453453
scheduled for execution. To implement this, we have a channel. When the `wake()`
454454
is called on the waker, the task is pushed into the send half of the channel.
455455
Our `Task` structure will implement the wake logic. To do this, it needs to
456-
contain both the spawned future and the channel send half.
456+
contain both the spawned future and the channel send half. We place the future
457+
in a `TaskFuture` struct alongside a `Poll` enum to keep track of the latest
458+
`Future::poll()` result, which is needed to handle spurious wake-ups. More
459+
details are given in the implementation of the `poll()` method in `TaskFuture`.
457460

458461
```rust
459462
# use std::future::Future;
460463
# use std::pin::Pin;
461464
# use std::sync::mpsc;
465+
# use std::task::Poll;
462466
use std::sync::{Arc, Mutex};
463467

468+
/// A structure holding a future and the result of
469+
/// the latest call to its `poll` method.
470+
struct TaskFuture {
471+
future: Pin<Box<dyn Future<Output = ()> + Send>>,
472+
poll: Poll<()>,
473+
}
474+
464475
struct Task {
465476
// The `Mutex` is to make `Task` implement `Sync`. Only
466-
// one thread accesses `future` at any given time. The
467-
// `Mutex` is not required for correctness. Real Tokio
477+
// one thread accesses `task_future` at any given time.
478+
// The `Mutex` is not required for correctness. Real Tokio
468479
// does not use a mutex here, but real Tokio has
469480
// more lines of code than can fit in a single tutorial
470481
// page.
471-
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
482+
task_future: Mutex<TaskFuture>,
472483
executor: mpsc::Sender<Arc<Task>>,
473484
}
474485

@@ -520,13 +531,17 @@ channel. Next, we implement receiving and executing the tasks in the
520531
# use std::future::Future;
521532
# use std::pin::Pin;
522533
# use std::sync::{Arc, Mutex};
523-
# use std::task::{Context};
534+
# use std::task::{Context, Poll};
524535
# struct MiniTokio {
525536
# scheduled: mpsc::Receiver<Arc<Task>>,
526537
# sender: mpsc::Sender<Arc<Task>>,
527538
# }
539+
# struct TaskFuture {
540+
# future: Pin<Box<dyn Future<Output = ()> + Send>>,
541+
# poll: Poll<()>,
542+
# }
528543
# struct Task {
529-
# future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
544+
# task_future: Mutex<TaskFuture>,
530545
# executor: mpsc::Sender<Arc<Task>>,
531546
# }
532547
# impl ArcWake for Task {
@@ -558,18 +573,38 @@ impl MiniTokio {
558573
}
559574
}
560575

576+
impl TaskFuture {
577+
fn new(future: impl Future<Output = ()> + Send + 'static) -> TaskFuture {
578+
TaskFuture {
579+
future: Box::pin(future),
580+
poll: Poll::Pending,
581+
}
582+
}
583+
584+
fn poll(&mut self, cx: &mut Context<'_>) {
585+
// Spurious wake-ups are allowed, even after a future has
586+
// returned `Ready`. However, polling a future which has
587+
// already returned `Ready` is *not* allowed. For this
588+
// reason we need to check that the future is still pending
589+
// before we call it. Failure to do so can lead to a panic.
590+
if self.poll.is_pending() {
591+
self.poll = self.future.as_mut().poll(cx);
592+
}
593+
}
594+
}
595+
561596
impl Task {
562597
fn poll(self: Arc<Self>) {
563598
// Create a waker from the `Task` instance. This
564599
// uses the `ArcWake` impl from above.
565600
let waker = task::waker(self.clone());
566601
let mut cx = Context::from_waker(&waker);
567602

568-
// No other thread ever tries to lock the future
569-
let mut future = self.future.try_lock().unwrap();
603+
// No other thread ever tries to lock the task_future
604+
let mut task_future = self.task_future.try_lock().unwrap();
570605

571-
// Poll the future
572-
let _ = future.as_mut().poll(&mut cx);
606+
// Poll the inner future
607+
task_future.poll(&mut cx);
573608
}
574609

575610
// Spawns a new task with the given future.
@@ -582,13 +617,12 @@ impl Task {
582617
F: Future<Output = ()> + Send + 'static,
583618
{
584619
let task = Arc::new(Task {
585-
future: Mutex::new(Box::pin(future)),
620+
task_future: Mutex::new(TaskFuture::new(future)),
586621
executor: sender.clone(),
587622
});
588623

589624
let _ = sender.send(task);
590625
}
591-
592626
}
593627
```
594628

@@ -638,7 +672,7 @@ use std::pin::Pin;
638672
# type Output = ();
639673
# fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
640674
# Poll::Pending
641-
# }
675+
# }
642676
# }
643677

644678
#[tokio::main]

0 commit comments

Comments
 (0)