Add support for zerocopy write to the TCP socket #237

Open: 1 commit to be merged into base: master
60 changes: 49 additions & 11 deletions src/net/tcp/stream.rs
@@ -103,6 +103,22 @@ impl TcpStream {
pub async fn write<T: BoundedBuf>(&self, buf: T) -> crate::BufResult<usize, T> {
self.inner.write(buf).await
}
/// Write some data to the stream from the buffer. Will attempt to do so without intermediate copies.
///
/// On success, returns the number of bytes written.
///
/// See the linux [kernel docs](https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html)
/// for a discussion on when this might be appropriate. In particular:
///
/// > Copy avoidance is not a free lunch. As implemented, with page pinning,
/// > it replaces per byte copy cost with page accounting and completion
/// > notification overhead. As a result, zero copy is generally only effective
/// > at writes over around 10 KB.
///
/// Note: Using fixed buffers [#54](https://github.com/tokio-rs/tokio-uring/pull/54), avoids the page-pinning overhead
pub async fn write_zc<T: BoundedBuf>(&self, buf: T) -> crate::BufResult<usize, T> {
Collaborator

What's the purpose of calling this write_zc? Where does that name come from? Do the std or tokio libraries have a name like that for a TCP connection? If not, at first glance it seems better to stick with the names used by either the io_uring interface or the io-uring crate API.

Contributor Author

write_zc is the zero-copy version of the existing write, similar to how UdpSocket::send_zc is the zero-copy version of UdpSocket::send.

Reply

Tokio doesn't have this, but Linux sockets do, and it's a very useful optimisation. Offloading the data copy from your CPU to your NIC can vastly reduce CPU overhead.
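
The kernel documentation quoted in the doc comment puts the break-even point for zero copy at writes of around 10 KB. A minimal sketch of how a caller might act on that figure is shown here. The `ZEROCOPY_THRESHOLD` constant, the `WritePath` enum, and the `choose_write_path` helper are all hypothetical names made up for illustration; none of them is part of this PR or of tokio-uring, and the real cut-off should be measured per workload.

```rust
/// Hypothetical cut-off, taken from the "around 10 KB" figure in the
/// kernel's MSG_ZEROCOPY documentation. This is an assumption, not a
/// value defined by tokio-uring.
pub const ZEROCOPY_THRESHOLD: usize = 10 * 1024;

/// Which write call a caller would pick for a given payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WritePath {
    /// Ordinary copying write (the existing `TcpStream::write`).
    Copy,
    /// Zero-copy write (the `write_zc` proposed in this PR).
    ZeroCopy,
}

/// Pick a write path from the payload length alone.
/// Illustrative helper only; it is not part of tokio-uring.
pub fn choose_write_path(len: usize) -> WritePath {
    if len >= ZEROCOPY_THRESHOLD {
        WritePath::ZeroCopy
    } else {
        WritePath::Copy
    }
}
```

Under this rule a small payload such as the 12-byte `b"hello world!"` used in the doc example would take the copying path, while a 64 KiB buffer would take the zero-copy path. A length check is only one possible heuristic; whether the buffer is a registered fixed buffer also matters, since the doc comment notes that fixed buffers avoid the page-pinning overhead.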

self.inner.send_zc(buf).await
}

/// Attempts to write an entire buffer to the stream.
///
@@ -118,39 +134,61 @@ impl TcpStream {
///
/// # Examples
///
/// ```no_run
/// use std::net::SocketAddr;
/// use tokio_uring::net::TcpListener;
/// ```
Collaborator

I'll let you figure out why your test and the CI test are different. But this seems to hijack a documentation example for write_all with one for write_zc. That doesn't seem to fly.

/// use std::net::{Shutdown, SocketAddr};
/// use tokio_uring::net::{TcpListener, TcpStream};
/// use tokio_uring::buf::BoundedBuf;
/// use tokio::task::JoinSet;
///
/// let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
///
/// let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
///
/// tokio_uring::start(async {
/// tokio_uring::start(async move {
/// let listener = TcpListener::bind(addr).unwrap();
///
/// println!("Listening on {}", listener.local_addr().unwrap());
/// let addr = listener.local_addr().unwrap();
/// println!("Listening on {}", addr);
///
/// loop {
/// let local = tokio::task::LocalSet::new();
///
/// let srv = tokio_uring::spawn(async move {
/// let (stream, _) = listener.accept().await.unwrap();
/// tokio_uring::spawn(async move {
/// let mut n = 0;
/// let mut buf = vec![0u8; 4096];
/// loop {
/// let (result, nbuf) = stream.read(buf).await;
/// buf = nbuf;
/// let read = result.unwrap();
/// if read == 0 {
/// break;
/// return;
/// }
///
/// let (res, slice) = stream.write_all(buf.slice(..read)).await;
/// let _ = res.unwrap();
/// buf = slice.into_inner();
/// n += read;
/// }
/// });
/// }
/// });
///
/// // Connect to a peer
/// let mut stream = TcpStream::connect(addr).await.unwrap();
///
/// // Write some data.
/// let (result, _) = stream.write_zc(b"hello world!".as_slice()).await;
/// let w_bytes = result.unwrap();
///
/// let mut buf = vec![0u8; 4096];
/// // Read back data.
/// let (result, buf) = stream.read(buf).await;
/// let r_bytes = result.unwrap();
/// assert_eq!(w_bytes, r_bytes);
///
/// assert_eq!(b"hello world!", &buf[..r_bytes]);
/// stream.shutdown(Shutdown::Both);
///
/// srv.await.unwrap();
/// });
///
/// ```
///
/// [`write`]: Self::write