Skip to content

Commit

Permalink
Merge branch 'master' into feat-mpsc-many-permit
Browse files Browse the repository at this point in the history
  • Loading branch information
Totodore authored Dec 21, 2023
2 parents 6a1ffbf + e7214e3 commit d585434
Show file tree
Hide file tree
Showing 10 changed files with 380 additions and 8 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml:

```toml
[dependencies]
tokio = { version = "1.35.0", features = ["full"] }
tokio = { version = "1.35.1", features = ["full"] }
```
Then, on your main.rs:

Expand Down
27 changes: 27 additions & 0 deletions tokio/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
# 1.35.1 (December 19, 2023)

This is a forward part of a change that was backported to 1.25.3.

### Fixed

- io: add budgeting to `tokio::runtime::io::registration::async_io` ([#6221])

[#6221]: https://github.com/tokio-rs/tokio/pull/6221

# 1.35.0 (December 8th, 2023)

### Added
Expand Down Expand Up @@ -153,6 +163,16 @@
[#6056]: https://github.com/tokio-rs/tokio/pull/6056
[#6058]: https://github.com/tokio-rs/tokio/pull/6058

# 1.32.1 (December 19, 2023)

This is a forward part of a change that was backported to 1.25.3.

### Fixed

- io: add budgeting to `tokio::runtime::io::registration::async_io` ([#6221])

[#6221]: https://github.com/tokio-rs/tokio/pull/6221

# 1.32.0 (August 16, 2023)

### Fixed
Expand Down Expand Up @@ -515,6 +535,13 @@ This release bumps the MSRV of Tokio to 1.56. ([#5559])
[#5513]: https://github.com/tokio-rs/tokio/pull/5513
[#5517]: https://github.com/tokio-rs/tokio/pull/5517

# 1.25.3 (December 17th, 2023)

### Fixed
- io: add budgeting to `tokio::runtime::io::registration::async_io` ([#6221])

[#6221]: https://github.com/tokio-rs/tokio/pull/6221

# 1.25.2 (September 22, 2023)

Forward ports 1.20.6 changes.
Expand Down
2 changes: 1 addition & 1 deletion tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ name = "tokio"
# - README.md
# - Update CHANGELOG.md.
# - Create "v1.x.y" git tag.
version = "1.35.0"
version = "1.35.1"
edition = "2021"
rust-version = "1.63"
authors = ["Tokio Contributors <[email protected]>"]
Expand Down
2 changes: 1 addition & 1 deletion tokio/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml:

```toml
[dependencies]
tokio = { version = "1.35.0", features = ["full"] }
tokio = { version = "1.35.1", features = ["full"] }
```
Then, on your main.rs:

Expand Down
117 changes: 117 additions & 0 deletions tokio/src/io/join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
//! Join two values implementing `AsyncRead` and `AsyncWrite` into a single one.
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};

use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Join two values implementing `AsyncRead` and `AsyncWrite` into a
/// single handle.
pub fn join<R, W>(reader: R, writer: W) -> Join<R, W>
where
R: AsyncRead,
W: AsyncWrite,
{
Join { reader, writer }
}

pin_project_lite::pin_project! {
/// Joins two values implementing `AsyncRead` and `AsyncWrite` into a
/// single handle.
#[derive(Debug)]
pub struct Join<R, W> {
#[pin]
reader: R,
#[pin]
writer: W,
}
}

impl<R, W> Join<R, W>
where
R: AsyncRead,
W: AsyncWrite,
{
/// Splits this `Join` back into its `AsyncRead` and `AsyncWrite`
/// components.
pub fn into_inner(self) -> (R, W) {
(self.reader, self.writer)
}

/// Returns a reference to the inner reader.
pub fn reader(&self) -> &R {
&self.reader
}

/// Returns a reference to the inner writer.
pub fn writer(&self) -> &W {
&self.writer
}

/// Returns a mutable reference to the inner reader.
pub fn reader_mut(&mut self) -> &mut R {
&mut self.reader
}

/// Returns a mutable reference to the inner writer.
pub fn writer_mut(&mut self) -> &mut W {
&mut self.writer
}

/// Returns a pinned mutable reference to the inner reader.
pub fn reader_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
self.project().reader
}

/// Returns a pinned mutable reference to the inner writer.
pub fn writer_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
self.project().writer
}
}

impl<R, W> AsyncRead for Join<R, W>
where
R: AsyncRead,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<Result<(), io::Error>> {
self.project().reader.poll_read(cx, buf)
}
}

impl<R, W> AsyncWrite for Join<R, W>
where
W: AsyncWrite,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
self.project().writer.poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.project().writer.poll_flush(cx)
}

fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.project().writer.poll_shutdown(cx)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<Result<usize, io::Error>> {
self.project().writer.poll_write_vectored(cx, bufs)
}

fn is_write_vectored(&self) -> bool {
self.writer.is_write_vectored()
}
}
2 changes: 2 additions & 0 deletions tokio/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ cfg_io_std! {
cfg_io_util! {
mod split;
pub use split::{split, ReadHalf, WriteHalf};
mod join;
pub use join::{join, Join};

pub(crate) mod seek;
pub(crate) mod util;
Expand Down
69 changes: 65 additions & 4 deletions tokio/src/io/util/empty.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::io::{AsyncBufRead, AsyncRead, ReadBuf};
use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};

use std::fmt;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

cfg_io_util! {
/// An async reader which is always at EOF.
/// `Empty` ignores any data written via [`AsyncWrite`], and will always be empty
/// (returning zero bytes) when read via [`AsyncRead`].
///
/// This struct is generally created by calling [`empty`]. Please see
/// the documentation of [`empty()`][`empty`] for more details.
Expand All @@ -19,9 +20,12 @@ cfg_io_util! {
_p: (),
}

/// Creates a new empty async reader.
/// Creates a value that is always at EOF for reads, and ignores all data written.
///
/// All reads from the returned reader will return `Poll::Ready(Ok(0))`.
/// All writes on the returned instance will return `Poll::Ready(Ok(buf.len()))`
/// and the contents of the buffer will not be inspected.
///
/// All reads from the returned instance will return `Poll::Ready(Ok(0))`.
///
/// This is an asynchronous version of [`std::io::empty`][std].
///
Expand All @@ -41,6 +45,19 @@ cfg_io_util! {
/// assert!(buffer.is_empty());
/// }
/// ```
///
/// A convoluted way of getting the length of a buffer:
///
/// ```
/// use tokio::io::{self, AsyncWriteExt};
///
/// #[tokio::main]
/// async fn main() {
/// let buffer = vec![1, 2, 3, 5, 8];
/// let num_bytes = io::empty().write(&buffer).await.unwrap();
/// assert_eq!(num_bytes, 5);
/// }
/// ```
pub fn empty() -> Empty {
Empty { _p: () }
}
Expand Down Expand Up @@ -71,6 +88,50 @@ impl AsyncBufRead for Empty {
fn consume(self: Pin<&mut Self>, _: usize) {}
}

impl AsyncWrite for Empty {
#[inline]
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
ready!(crate::trace::trace_leaf(cx));
ready!(poll_proceed_and_make_progress(cx));
Poll::Ready(Ok(buf.len()))
}

#[inline]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
ready!(crate::trace::trace_leaf(cx));
ready!(poll_proceed_and_make_progress(cx));
Poll::Ready(Ok(()))
}

#[inline]
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
ready!(crate::trace::trace_leaf(cx));
ready!(poll_proceed_and_make_progress(cx));
Poll::Ready(Ok(()))
}

#[inline]
fn is_write_vectored(&self) -> bool {
true
}

#[inline]
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<Result<usize, io::Error>> {
ready!(crate::trace::trace_leaf(cx));
ready!(poll_proceed_and_make_progress(cx));
let num_bytes = bufs.iter().map(|b| b.len()).sum();
Poll::Ready(Ok(num_bytes))
}
}

impl fmt::Debug for Empty {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Empty { .. }")
Expand Down
7 changes: 6 additions & 1 deletion tokio/src/runtime/io/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,16 @@ impl Registration {
loop {
let event = self.readiness(interest).await?;

let coop = crate::future::poll_fn(crate::runtime::coop::poll_proceed).await;

match f() {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.clear_readiness(event);
}
x => return x,
x => {
coop.made_progress();
return x;
}
}
}
}
Expand Down
77 changes: 77 additions & 0 deletions tokio/tests/coop_budget.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", target_os = "linux"))]

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::net::UdpSocket;

/// Ensure that UDP sockets have functional budgeting
///
/// # Design
/// Two sockets communicate by spamming packets from one to the other.
///
/// In Linux, this packet will be slammed through the entire network stack and into the receiver's buffer during the
/// send system call because we are using the loopback interface.
/// This happens because the softirq chain invoked on send when using the loopback interface covers virtually the
/// entirety of the lifecycle of a packet within the kernel network stack.
///
/// As a result, neither socket will ever encounter an EWOULDBLOCK, and the only way for these to yield during the loop
/// is through budgeting.
///
/// A second task runs in the background and increments a counter before yielding, allowing us to know how many times sockets yielded.
/// Since we are both sending and receiving, that should happen once per 64 packets, because budgets are of size 128
/// and there are two budget events per packet, a send and a recv.
#[tokio::test]
async fn coop_budget_udp_send_recv() {
const BUDGET: usize = 128;
const N_ITERATIONS: usize = 1024;

const PACKET: &[u8] = b"Hello, world";
const PACKET_LEN: usize = 12;

assert_eq!(
PACKET_LEN,
PACKET.len(),
"Defect in test, programmer can't do math"
);

// bind each socket to a dynamic port, forcing IPv4 addressing on the localhost interface
let tx = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let rx = UdpSocket::bind("127.0.0.1:0").await.unwrap();

tx.connect(rx.local_addr().unwrap()).await.unwrap();
rx.connect(tx.local_addr().unwrap()).await.unwrap();

let tracker = Arc::new(AtomicUsize::default());

let tracker_clone = Arc::clone(&tracker);

tokio::task::yield_now().await;

tokio::spawn(async move {
loop {
tracker_clone.fetch_add(1, Ordering::SeqCst);

tokio::task::yield_now().await;
}
});

for _ in 0..N_ITERATIONS {
tx.send(PACKET).await.unwrap();

let mut tmp = [0; PACKET_LEN];

// ensure that we aren't somehow accumulating other
assert_eq!(
PACKET_LEN,
rx.recv(&mut tmp).await.unwrap(),
"Defect in test case, received unexpected result from socket"
);
assert_eq!(
PACKET, &tmp,
"Defect in test case, received unexpected result from socket"
);
}

assert_eq!(N_ITERATIONS / (BUDGET / 2), tracker.load(Ordering::SeqCst));
}
Loading

0 comments on commit d585434

Please sign in to comment.