Skip to content

Commit 423ecc1

Browse files
authored
io: add copy_buf (#2884)
1 parent fb28caa commit 423ecc1

File tree

3 files changed

+106
-2
lines changed

3 files changed

+106
-2
lines changed

tokio/src/io/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,10 +232,9 @@ cfg_io_util! {
232232
pub use split::{split, ReadHalf, WriteHalf};
233233

234234
pub(crate) mod seek;
235-
236235
pub(crate) mod util;
237236
pub use util::{
238-
copy, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt,
237+
copy, copy_buf, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt,
239238
BufReader, BufStream, BufWriter, DuplexStream, Empty, Lines, Repeat, Sink, Split, Take,
240239
};
241240
}

tokio/src/io/util/copy_buf.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
use crate::io::{AsyncBufRead, AsyncWrite};
2+
use std::future::Future;
3+
use std::io;
4+
use std::pin::Pin;
5+
use std::task::{Context, Poll};
6+
7+
cfg_io_util! {
8+
/// A future that asynchronously copies the entire contents of a reader into a
9+
/// writer.
10+
///
11+
/// This struct is generally created by calling [`copy_buf`][copy_buf]. Please
12+
/// see the documentation of `copy_buf()` for more details.
13+
///
14+
/// [copy_buf]: copy_buf()
15+
#[derive(Debug)]
16+
#[must_use = "futures do nothing unless you `.await` or poll them"]
17+
struct CopyBuf<'a, R: ?Sized, W: ?Sized> {
18+
reader: &'a mut R,
19+
writer: &'a mut W,
20+
amt: u64,
21+
}
22+
23+
/// Asynchronously copies the entire contents of a reader into a writer.
24+
///
25+
/// This function returns a future that will continuously read data from
26+
/// `reader` and then write it into `writer` in a streaming fashion until
27+
/// `reader` returns EOF.
28+
///
29+
/// On success, the total number of bytes that were copied from `reader` to
30+
/// `writer` is returned.
31+
///
32+
///
33+
/// # Errors
34+
///
35+
/// The returned future will finish with an error will return an error
36+
/// immediately if any call to `poll_fill_buf` or `poll_write` returns an
37+
/// error.
38+
///
39+
/// # Examples
40+
///
41+
/// ```
42+
/// use tokio::io;
43+
///
44+
/// # async fn dox() -> std::io::Result<()> {
45+
/// let mut reader: &[u8] = b"hello";
46+
/// let mut writer: Vec<u8> = vec![];
47+
///
48+
/// io::copy_buf(&mut reader, &mut writer).await?;
49+
///
50+
/// assert_eq!(b"hello", &writer[..]);
51+
/// # Ok(())
52+
/// # }
53+
/// ```
54+
pub async fn copy_buf<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> io::Result<u64>
55+
where
56+
R: AsyncBufRead + Unpin + ?Sized,
57+
W: AsyncWrite + Unpin + ?Sized,
58+
{
59+
CopyBuf {
60+
reader,
61+
writer,
62+
amt: 0,
63+
}.await
64+
}
65+
}
66+
67+
impl<R, W> Future for CopyBuf<'_, R, W>
68+
where
69+
R: AsyncBufRead + Unpin + ?Sized,
70+
W: AsyncWrite + Unpin + ?Sized,
71+
{
72+
type Output = io::Result<u64>;
73+
74+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
75+
loop {
76+
let me = &mut *self;
77+
let buffer = ready!(Pin::new(&mut *me.reader).poll_fill_buf(cx))?;
78+
if buffer.is_empty() {
79+
ready!(Pin::new(&mut self.writer).poll_flush(cx))?;
80+
return Poll::Ready(Ok(self.amt));
81+
}
82+
83+
let i = ready!(Pin::new(&mut *me.writer).poll_write(cx, buffer))?;
84+
if i == 0 {
85+
return Poll::Ready(Err(std::io::ErrorKind::WriteZero.into()));
86+
}
87+
self.amt += i as u64;
88+
Pin::new(&mut *self.reader).consume(i);
89+
}
90+
}
91+
}
92+
93+
#[cfg(test)]
94+
mod tests {
95+
use super::*;
96+
97+
#[test]
98+
fn assert_unpin() {
99+
use std::marker::PhantomPinned;
100+
crate::is_unpin::<CopyBuf<'_, PhantomPinned, PhantomPinned>>();
101+
}
102+
}

tokio/src/io/util/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ cfg_io_util! {
2727
mod copy;
2828
pub use copy::copy;
2929

30+
mod copy_buf;
31+
pub use copy_buf::copy_buf;
32+
3033
mod empty;
3134
pub use empty::{empty, Empty};
3235

0 commit comments

Comments
 (0)