Skip to content

Commit cac8afd

Browse files
committed
stream: add examples to wrapper types
1 parent eb72ddd commit cac8afd

File tree

7 files changed

+150
-0
lines changed

7 files changed

+150
-0
lines changed

tokio-stream/src/wrappers/broadcast.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,28 @@ use std::task::{ready, Context, Poll};
1010

1111
/// A wrapper around [`tokio::sync::broadcast::Receiver`] that implements [`Stream`].
1212
///
13+
/// # Example
14+
///
15+
/// ```
16+
/// use tokio::sync::broadcast;
17+
/// use tokio_stream::wrappers::BroadcastStream;
18+
/// use tokio_stream::StreamExt;
19+
///
20+
/// #[tokio::main]
21+
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
22+
/// let (tx, rx) = broadcast::channel(16);
23+
/// tx.send(10)?;
24+
/// tx.send(20)?;
25+
/// drop(tx);
26+
///
27+
/// let mut stream = BroadcastStream::new(rx);
28+
/// assert_eq!(stream.next().await, Some(Ok(10)));
29+
/// assert_eq!(stream.next().await, Some(Ok(20)));
30+
/// assert_eq!(stream.next().await, None);
31+
/// Ok(())
32+
/// }
33+
/// ```
34+
///
1335
/// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver
1436
/// [`Stream`]: trait@futures_core::Stream
1537
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]

tokio-stream/src/wrappers/interval.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,26 @@ use tokio::time::{Instant, Interval};
55

66
/// A wrapper around [`Interval`] that implements [`Stream`].
77
///
8+
/// # Example
9+
///
10+
/// ```
11+
/// use tokio::time::{Duration, Instant};
12+
/// use tokio_stream::wrappers::IntervalStream;
13+
/// use tokio_stream::StreamExt;
14+
///
15+
/// #[tokio::main]
16+
/// async fn main() {
17+
/// let interval = tokio::time::interval(Duration::from_millis(10));
18+
/// let mut stream = IntervalStream::new(interval);
19+
/// let start = Instant::now();
20+
/// for _ in 0..3 {
21+
/// if let Some(instant) = stream.next().await {
22+
/// println!("elapsed: {:.1?}", instant.duration_since(start));
23+
/// }
24+
/// }
25+
/// }
26+
/// ```
27+
///
828
/// [`Interval`]: struct@tokio::time::Interval
929
/// [`Stream`]: trait@crate::Stream
1030
#[derive(Debug)]

tokio-stream/src/wrappers/lines.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,25 @@ use tokio::io::{AsyncBufRead, Lines};
88
pin_project! {
99
/// A wrapper around [`tokio::io::Lines`] that implements [`Stream`].
1010
///
11+
/// # Example
12+
///
13+
/// ```
14+
/// use tokio::io::AsyncBufReadExt;
15+
/// use tokio_stream::wrappers::LinesStream;
16+
/// use tokio_stream::StreamExt;
17+
///
18+
/// #[tokio::main]
19+
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
20+
/// let input = b"Hello\nWorld\n";
21+
/// let lines = input.lines(); // tokio::io::util::Lines
22+
/// let mut stream = LinesStream::new(lines);
23+
/// while let Some(line) = stream.next().await {
24+
/// println!("{}", line?);
25+
/// }
26+
/// Ok(())
27+
/// }
28+
/// ```
29+
///
1130
/// [`tokio::io::Lines`]: struct@tokio::io::Lines
1231
/// [`Stream`]: trait@crate::Stream
1332
#[derive(Debug)]

tokio-stream/src/wrappers/mpsc_bounded.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,28 @@ use tokio::sync::mpsc::Receiver;
55

66
/// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`].
77
///
8+
/// # Example
9+
///
10+
/// ```
11+
/// use tokio::sync::mpsc;
12+
/// use tokio_stream::wrappers::ReceiverStream;
13+
/// use tokio_stream::StreamExt;
14+
///
15+
/// #[tokio::main]
16+
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
17+
/// let (tx, rx) = mpsc::channel(2);
18+
/// tx.send(10).await?;
19+
/// tx.send(20).await?;
20+
/// drop(tx);
21+
///
22+
/// let mut stream = ReceiverStream::new(rx);
23+
/// assert_eq!(stream.next().await, Some(10));
24+
/// assert_eq!(stream.next().await, Some(20));
25+
/// assert_eq!(stream.next().await, None);
26+
/// Ok(())
27+
/// }
28+
/// ```
29+
///
830
/// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver
931
/// [`Stream`]: trait@crate::Stream
1032
#[derive(Debug)]

tokio-stream/src/wrappers/mpsc_unbounded.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,27 @@ use tokio::sync::mpsc::UnboundedReceiver;
55

66
/// A wrapper around [`tokio::sync::mpsc::UnboundedReceiver`] that implements [`Stream`].
77
///
8+
/// # Example
9+
///
10+
/// ```
11+
/// use tokio::sync::mpsc;
12+
/// use tokio_stream::wrappers::UnboundedReceiverStream;
13+
/// use tokio_stream::StreamExt;
14+
///
15+
/// #[tokio::main]
16+
/// async fn main() {
17+
/// let (tx, rx) = mpsc::unbounded_channel();
18+
/// tx.send(10).unwrap();
19+
/// tx.send(20).unwrap();
20+
/// drop(tx);
21+
///
22+
/// let mut stream = UnboundedReceiverStream::new(rx);
23+
/// assert_eq!(stream.next().await, Some(10));
24+
/// assert_eq!(stream.next().await, Some(20));
25+
/// assert_eq!(stream.next().await, None);
26+
/// }
27+
/// ```
28+
///
829
/// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver
930
/// [`Stream`]: trait@crate::Stream
1031
#[derive(Debug)]

tokio-stream/src/wrappers/read_dir.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,24 @@ use tokio::fs::{DirEntry, ReadDir};
66

77
/// A wrapper around [`tokio::fs::ReadDir`] that implements [`Stream`].
88
///
9+
/// # Example
10+
///
11+
/// ```
12+
/// use tokio::fs::read_dir;
13+
/// use tokio_stream::{wrappers::ReadDirStream, StreamExt};
14+
///
15+
/// #[tokio::main]
16+
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
17+
/// let dirs = read_dir("examples").await?;
18+
/// let mut dirs = ReadDirStream::new(dirs);
19+
/// while let Some(dir) = dirs.next().await {
20+
/// let dir = dir?;
21+
/// println!("{}", dir.path().display());
22+
/// }
23+
/// Ok(())
24+
/// }
25+
/// ```
26+
///
927
/// [`tokio::fs::ReadDir`]: struct@tokio::fs::ReadDir
1028
/// [`Stream`]: trait@crate::Stream
1129
#[derive(Debug)]

tokio-stream/src/wrappers/tcp_listener.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,34 @@ use tokio::net::{TcpListener, TcpStream};
66

77
/// A wrapper around [`TcpListener`] that implements [`Stream`].
88
///
9+
/// # Example
10+
///
11+
/// Accept connections from both IPv4 and IPv6 listeners in the same loop:
12+
///
13+
/// ```no_run
14+
/// use std::net::{Ipv4Addr, Ipv6Addr};
15+
///
16+
/// use tokio::net::TcpListener;
17+
/// use tokio_stream::{wrappers::TcpListenerStream, StreamExt};
18+
///
19+
/// #[tokio::main]
20+
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
21+
/// let ipv4_listener = TcpListener::bind((Ipv6Addr::LOCALHOST, 8080)).await?;
22+
/// let ipv6_listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 8080)).await?;
23+
/// let ipv4_connections = TcpListenerStream::new(ipv4_listener);
24+
/// let ipv6_connections = TcpListenerStream::new(ipv6_listener);
25+
/// let mut connections = ipv4_connections.chain(ipv6_connections);
26+
/// while let Some(tcp_stream) = connections.next().await {
27+
/// let stream = tcp_stream?;
28+
/// println!(
29+
/// "accepted connection; peer address = {:?}",
30+
/// stream.peer_addr()?
31+
/// );
32+
/// }
33+
/// Ok(())
34+
/// }
35+
/// ```
36+
///
937
/// [`TcpListener`]: struct@tokio::net::TcpListener
1038
/// [`Stream`]: trait@crate::Stream
1139
#[derive(Debug)]

0 commit comments

Comments
 (0)