Skip to content

Commit

Permalink
tests: handle spurious EWOULDBLOCK in io_async_fd (#6776)
Browse files Browse the repository at this point in the history
* tests: handle spurious EWOULDBLOCK in io_async_fd

## Motivation

The `io_async_fd.rs` tests contain a `drain()` function, which
currently performs synchronous reads from a UDS socket until it returns
`io::ErrorKind::WouldBlock` (i.e., errno `EWOULDBLOCK`/`EAGAIN`). The
*intent* behind this function is to ensure that all data has been
drained from the UDS socket's buffer...which is what it appears to
do...on Linux. On other systems, it appears that an `EWOULDBLOCK` or
`EAGAIN` may be returned before enough data has been read from the UDS
socket to result in the other end being notified that the socket is now
writable. In particular, this appears to be the case on illumos, where
the tests using this function hang forever (see [this comment][1] on PR
#6769).

To my knowledge, this behavior is still POSIX-compliant --- the
reader will still be notified that the socket is readable, and if it
were actually doing non-blocking IO, it would continue reading upon
receipt of that notification. So, relying on `EWOULDBLOCK` to indicate
that the socket has been sufficiently drained appears to rely on
Linux/FreeBSD behavior that isn't necessarily portable to other Unices.

## Solution

This commit changes the `drain()` function to take an argument for the
number of bytes *written* to the socket previously, and continue looping
until it has read that many bytes, regardless of whether `EWOULDBLOCK`
is returned. This should ensure that the socket is drained on all
POSIX-compliant systems, and indeed, the `io_async_fd::reset_writable`
and `io_async_fd::poll_fns` tests no longer hang forever on illumos.

I think making this change is an appropriate solution to the
test failure here, as the `drain()` function is part of the test, rather
than the code in Tokio *being* tested, and (as I mentioned above) the
use of blocking reads on a non-blocking socket without a mechanism to
continue reading when the socket becomes readable again is not really
something a real life program seems likely to do. Ensuring that all the
written bytes have been read by passing in a byte count seems more
faithful to what the test is actually *trying* to do here, anyway.

Thanks to @jclulow for debugging what was going on here!

This change was cherry-picked from commit
f18d6ed from PR #6769, so that the fix
can be merged separately.

[1]: #6769 (comment)

Signed-off-by: Eliza Weisman <[email protected]>
  • Loading branch information
hawkw authored Aug 15, 2024
1 parent 39c3c19 commit 2d697fc
Showing 1 changed file with 14 additions and 12 deletions.
26 changes: 14 additions & 12 deletions tokio/tests/io_async_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,14 @@ fn socketpair() -> (FileDescriptor, FileDescriptor) {
fds
}

fn drain(mut fd: &FileDescriptor) {
fn drain(mut fd: &FileDescriptor, mut amt: usize) {
let mut buf = [0u8; 512];
#[allow(clippy::unused_io_amount)]
loop {
while amt > 0 {
match fd.read(&mut buf[..]) {
Err(e) if e.kind() == ErrorKind::WouldBlock => break,
Err(e) if e.kind() == ErrorKind::WouldBlock => {}
Ok(0) => panic!("unexpected EOF"),
Err(e) => panic!("unexpected error: {:?}", e),
Ok(_) => continue,
Ok(x) => amt -= x,
}
}
}
Expand Down Expand Up @@ -219,10 +218,10 @@ async fn reset_writable() {
let mut guard = afd_a.writable().await.unwrap();

// Write until we get a WouldBlock. This also clears the ready state.
while guard
.try_io(|_| afd_a.get_ref().write(&[0; 512][..]))
.is_ok()
{}
let mut bytes = 0;
while let Ok(Ok(amt)) = guard.try_io(|_| afd_a.get_ref().write(&[0; 512][..])) {
bytes += amt;
}

// Writable state should be cleared now.
let writable = afd_a.writable();
Expand All @@ -234,7 +233,7 @@ async fn reset_writable() {
}

// Read from the other side; we should become writable now.
drain(&b);
drain(&b, bytes);

let _ = writable.await.unwrap();
}
Expand Down Expand Up @@ -386,7 +385,10 @@ async fn poll_fns() {
let afd_b = Arc::new(AsyncFd::new(b).unwrap());

// Fill up the write side of A
while afd_a.get_ref().write(&[0; 512]).is_ok() {}
let mut bytes = 0;
while let Ok(amt) = afd_a.get_ref().write(&[0; 512]) {
bytes += amt;
}

let waker = TestWaker::new();

Expand Down Expand Up @@ -446,7 +448,7 @@ async fn poll_fns() {
}

// Make it writable now
drain(afd_b.get_ref());
drain(afd_b.get_ref(), bytes);

// now we should be writable (ie - the waker for poll_write should still be registered after we wake the read side)
let _ = write_fut.await;
Expand Down

0 comments on commit 2d697fc

Please sign in to comment.