diff --git a/README.md b/README.md index a9e348756fe..00979b4dfa3 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml: ```toml [dependencies] -tokio = { version = "1.35.0", features = ["full"] } +tokio = { version = "1.35.1", features = ["full"] } ``` Then, on your main.rs: diff --git a/tokio/CHANGELOG.md b/tokio/CHANGELOG.md index 147b62d332d..1b04986af8b 100644 --- a/tokio/CHANGELOG.md +++ b/tokio/CHANGELOG.md @@ -1,3 +1,13 @@ +# 1.35.1 (December 19, 2023) + +This is a forward part of a change that was backported to 1.25.3. + +### Fixed + +- io: add budgeting to `tokio::runtime::io::registration::async_io` ([#6221]) + +[#6221]: https://github.com/tokio-rs/tokio/pull/6221 + # 1.35.0 (December 8th, 2023) ### Added @@ -153,6 +163,16 @@ [#6056]: https://github.com/tokio-rs/tokio/pull/6056 [#6058]: https://github.com/tokio-rs/tokio/pull/6058 +# 1.32.1 (December 19, 2023) + +This is a forward part of a change that was backported to 1.25.3. + +### Fixed + +- io: add budgeting to `tokio::runtime::io::registration::async_io` ([#6221]) + +[#6221]: https://github.com/tokio-rs/tokio/pull/6221 + # 1.32.0 (August 16, 2023) ### Fixed @@ -515,6 +535,13 @@ This release bumps the MSRV of Tokio to 1.56. ([#5559]) [#5513]: https://github.com/tokio-rs/tokio/pull/5513 [#5517]: https://github.com/tokio-rs/tokio/pull/5517 +# 1.25.3 (December 17th, 2023) + +### Fixed +- io: add budgeting to `tokio::runtime::io::registration::async_io` ([#6221]) + +[#6221]: https://github.com/tokio-rs/tokio/pull/6221 + # 1.25.2 (September 22, 2023) Forward ports 1.20.6 changes. diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 17d5673ae37..05157cdb5a4 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -6,7 +6,7 @@ name = "tokio" # - README.md # - Update CHANGELOG.md. # - Create "v1.x.y" git tag. -version = "1.35.0" +version = "1.35.1" edition = "2021" rust-version = "1.63" authors = ["Tokio Contributors "] diff --git a/tokio/README.md b/tokio/README.md index a9e348756fe..00979b4dfa3 100644 --- a/tokio/README.md +++ b/tokio/README.md @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml: ```toml [dependencies] -tokio = { version = "1.35.0", features = ["full"] } +tokio = { version = "1.35.1", features = ["full"] } ``` Then, on your main.rs: diff --git a/tokio/src/io/join.rs b/tokio/src/io/join.rs new file mode 100644 index 00000000000..dbc7043b67e --- /dev/null +++ b/tokio/src/io/join.rs @@ -0,0 +1,117 @@ +//! Join two values implementing `AsyncRead` and `AsyncWrite` into a single one. + +use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; + +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Join two values implementing `AsyncRead` and `AsyncWrite` into a +/// single handle. +pub fn join(reader: R, writer: W) -> Join +where + R: AsyncRead, + W: AsyncWrite, +{ + Join { reader, writer } +} + +pin_project_lite::pin_project! { + /// Joins two values implementing `AsyncRead` and `AsyncWrite` into a + /// single handle. + #[derive(Debug)] + pub struct Join { + #[pin] + reader: R, + #[pin] + writer: W, + } +} + +impl Join +where + R: AsyncRead, + W: AsyncWrite, +{ + /// Splits this `Join` back into its `AsyncRead` and `AsyncWrite` + /// components. + pub fn into_inner(self) -> (R, W) { + (self.reader, self.writer) + } + + /// Returns a reference to the inner reader. + pub fn reader(&self) -> &R { + &self.reader + } + + /// Returns a reference to the inner writer. + pub fn writer(&self) -> &W { + &self.writer + } + + /// Returns a mutable reference to the inner reader. + pub fn reader_mut(&mut self) -> &mut R { + &mut self.reader + } + + /// Returns a mutable reference to the inner writer. + pub fn writer_mut(&mut self) -> &mut W { + &mut self.writer + } + + /// Returns a pinned mutable reference to the inner reader. + pub fn reader_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { + self.project().reader + } + + /// Returns a pinned mutable reference to the inner writer. + pub fn writer_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { + self.project().writer + } +} + +impl AsyncRead for Join +where + R: AsyncRead, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + self.project().reader.poll_read(cx, buf) + } +} + +impl AsyncWrite for Join +where + W: AsyncWrite, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.project().writer.poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().writer.poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().writer.poll_shutdown(cx) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll> { + self.project().writer.poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + self.writer.is_write_vectored() + } +} diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 0fd6cc2c5cb..ff35a0e0f7e 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -265,6 +265,8 @@ cfg_io_std! { cfg_io_util! { mod split; pub use split::{split, ReadHalf, WriteHalf}; + mod join; + pub use join::{join, Join}; pub(crate) mod seek; pub(crate) mod util; diff --git a/tokio/src/io/util/empty.rs b/tokio/src/io/util/empty.rs index b96fabbaabe..06be4ff3073 100644 --- a/tokio/src/io/util/empty.rs +++ b/tokio/src/io/util/empty.rs @@ -1,4 +1,4 @@ -use crate::io::{AsyncBufRead, AsyncRead, ReadBuf}; +use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; use std::fmt; use std::io; @@ -6,7 +6,8 @@ use std::pin::Pin; use std::task::{Context, Poll}; cfg_io_util! { - /// An async reader which is always at EOF. + /// `Empty` ignores any data written via [`AsyncWrite`], and will always be empty + /// (returning zero bytes) when read via [`AsyncRead`]. /// /// This struct is generally created by calling [`empty`]. Please see /// the documentation of [`empty()`][`empty`] for more details. @@ -19,9 +20,12 @@ cfg_io_util! { _p: (), } - /// Creates a new empty async reader. + /// Creates a value that is always at EOF for reads, and ignores all data written. /// - /// All reads from the returned reader will return `Poll::Ready(Ok(0))`. + /// All writes on the returned instance will return `Poll::Ready(Ok(buf.len()))` + /// and the contents of the buffer will not be inspected. + /// + /// All reads from the returned instance will return `Poll::Ready(Ok(0))`. /// /// This is an asynchronous version of [`std::io::empty`][std]. /// @@ -41,6 +45,19 @@ cfg_io_util! { /// assert!(buffer.is_empty()); /// } /// ``` + /// + /// A convoluted way of getting the length of a buffer: + /// + /// ``` + /// use tokio::io::{self, AsyncWriteExt}; + /// + /// #[tokio::main] + /// async fn main() { + /// let buffer = vec![1, 2, 3, 5, 8]; + /// let num_bytes = io::empty().write(&buffer).await.unwrap(); + /// assert_eq!(num_bytes, 5); + /// } + /// ``` pub fn empty() -> Empty { Empty { _p: () } } @@ -71,6 +88,50 @@ impl AsyncBufRead for Empty { fn consume(self: Pin<&mut Self>, _: usize) {} } +impl AsyncWrite for Empty { + #[inline] + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); + ready!(poll_proceed_and_make_progress(cx)); + Poll::Ready(Ok(buf.len())) + } + + #[inline] + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); + ready!(poll_proceed_and_make_progress(cx)); + Poll::Ready(Ok(())) + } + + #[inline] + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); + ready!(poll_proceed_and_make_progress(cx)); + Poll::Ready(Ok(())) + } + + #[inline] + fn is_write_vectored(&self) -> bool { + true + } + + #[inline] + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll> { + ready!(crate::trace::trace_leaf(cx)); + ready!(poll_proceed_and_make_progress(cx)); + let num_bytes = bufs.iter().map(|b| b.len()).sum(); + Poll::Ready(Ok(num_bytes)) + } +} + impl fmt::Debug for Empty { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.pad("Empty { .. }") diff --git a/tokio/src/runtime/io/registration.rs b/tokio/src/runtime/io/registration.rs index 759589863eb..dc5961086f7 100644 --- a/tokio/src/runtime/io/registration.rs +++ b/tokio/src/runtime/io/registration.rs @@ -219,11 +219,16 @@ impl Registration { loop { let event = self.readiness(interest).await?; + let coop = crate::future::poll_fn(crate::runtime::coop::poll_proceed).await; + match f() { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { self.clear_readiness(event); } - x => return x, + x => { + coop.made_progress(); + return x; + } } } } diff --git a/tokio/tests/coop_budget.rs b/tokio/tests/coop_budget.rs new file mode 100644 index 00000000000..0c4cc7e6497 --- /dev/null +++ b/tokio/tests/coop_budget.rs @@ -0,0 +1,77 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(feature = "full", target_os = "linux"))] + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use tokio::net::UdpSocket; + +/// Ensure that UDP sockets have functional budgeting +/// +/// # Design +/// Two sockets communicate by spamming packets from one to the other. +/// +/// In Linux, this packet will be slammed through the entire network stack and into the receiver's buffer during the +/// send system call because we are using the loopback interface. +/// This happens because the softirq chain invoked on send when using the loopback interface covers virtually the +/// entirety of the lifecycle of a packet within the kernel network stack. +/// +/// As a result, neither socket will ever encounter an EWOULDBLOCK, and the only way for these to yield during the loop +/// is through budgeting. +/// +/// A second task runs in the background and increments a counter before yielding, allowing us to know how many times sockets yielded. +/// Since we are both sending and receiving, that should happen once per 64 packets, because budgets are of size 128 +/// and there are two budget events per packet, a send and a recv. +#[tokio::test] +async fn coop_budget_udp_send_recv() { + const BUDGET: usize = 128; + const N_ITERATIONS: usize = 1024; + + const PACKET: &[u8] = b"Hello, world"; + const PACKET_LEN: usize = 12; + + assert_eq!( + PACKET_LEN, + PACKET.len(), + "Defect in test, programmer can't do math" + ); + + // bind each socket to a dynamic port, forcing IPv4 addressing on the localhost interface + let tx = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let rx = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + + tx.connect(rx.local_addr().unwrap()).await.unwrap(); + rx.connect(tx.local_addr().unwrap()).await.unwrap(); + + let tracker = Arc::new(AtomicUsize::default()); + + let tracker_clone = Arc::clone(&tracker); + + tokio::task::yield_now().await; + + tokio::spawn(async move { + loop { + tracker_clone.fetch_add(1, Ordering::SeqCst); + + tokio::task::yield_now().await; + } + }); + + for _ in 0..N_ITERATIONS { + tx.send(PACKET).await.unwrap(); + + let mut tmp = [0; PACKET_LEN]; + + // ensure that we aren't somehow accumulating other + assert_eq!( + PACKET_LEN, + rx.recv(&mut tmp).await.unwrap(), + "Defect in test case, received unexpected result from socket" + ); + assert_eq!( + PACKET, &tmp, + "Defect in test case, received unexpected result from socket" + ); + } + + assert_eq!(N_ITERATIONS / (BUDGET / 2), tracker.load(Ordering::SeqCst)); +} diff --git a/tokio/tests/io_join.rs b/tokio/tests/io_join.rs new file mode 100644 index 00000000000..69b09393311 --- /dev/null +++ b/tokio/tests/io_join.rs @@ -0,0 +1,83 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::{join, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Join, ReadBuf}; + +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +struct R; + +impl AsyncRead for R { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + buf.put_slice(&[b'z']); + Poll::Ready(Ok(())) + } +} + +struct W; + +impl AsyncWrite for W { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: &[u8], + ) -> Poll> { + Poll::Ready(Ok(1)) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _bufs: &[io::IoSlice<'_>], + ) -> Poll> { + Poll::Ready(Ok(2)) + } + + fn is_write_vectored(&self) -> bool { + true + } +} + +#[test] +fn is_send_and_sync() { + fn assert_bound() {} + + assert_bound::>(); +} + +#[test] +fn method_delegation() { + let mut rw = join(R, W); + let mut buf = [0; 1]; + + tokio_test::block_on(async move { + assert_eq!(1, rw.read(&mut buf).await.unwrap()); + assert_eq!(b'z', buf[0]); + + assert_eq!(1, rw.write(&[b'x']).await.unwrap()); + assert_eq!( + 2, + rw.write_vectored(&[io::IoSlice::new(&[b'x'])]) + .await + .unwrap() + ); + assert!(rw.is_write_vectored()); + + assert!(rw.flush().await.is_ok()); + assert!(rw.shutdown().await.is_ok()); + }); +}