From cac8afd6000660a3f2beb63e0b1bd1cc22dbae73 Mon Sep 17 00:00:00 2001 From: Josh McKinney Date: Sun, 8 Dec 2024 13:39:16 -0800 Subject: [PATCH 1/5] stream: add examples to wrapper types --- tokio-stream/src/wrappers/broadcast.rs | 22 ++++++++++++++++ tokio-stream/src/wrappers/interval.rs | 20 +++++++++++++++ tokio-stream/src/wrappers/lines.rs | 19 ++++++++++++++ tokio-stream/src/wrappers/mpsc_bounded.rs | 22 ++++++++++++++++ tokio-stream/src/wrappers/mpsc_unbounded.rs | 21 ++++++++++++++++ tokio-stream/src/wrappers/read_dir.rs | 18 +++++++++++++ tokio-stream/src/wrappers/tcp_listener.rs | 28 +++++++++++++++++++++ 7 files changed, 150 insertions(+) diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs index b3900db8ff6..3349adb2414 100644 --- a/tokio-stream/src/wrappers/broadcast.rs +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -10,6 +10,28 @@ use std::task::{ready, Context, Poll}; /// A wrapper around [`tokio::sync::broadcast::Receiver`] that implements [`Stream`]. /// +/// # Example +/// +/// ``` +/// use tokio::sync::broadcast; +/// use tokio_stream::wrappers::BroadcastStream; +/// use tokio_stream::StreamExt; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box> { +/// let (tx, rx) = broadcast::channel(16); +/// tx.send(10)?; +/// tx.send(20)?; +/// drop(tx); +/// +/// let mut stream = BroadcastStream::new(rx); +/// assert_eq!(stream.next().await, Some(Ok(10))); +/// assert_eq!(stream.next().await, Some(Ok(20))); +/// assert_eq!(stream.next().await, None); +/// Ok(()) +/// } +/// ``` +/// /// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver /// [`Stream`]: trait@futures_core::Stream #[cfg_attr(docsrs, doc(cfg(feature = "sync")))] diff --git a/tokio-stream/src/wrappers/interval.rs b/tokio-stream/src/wrappers/interval.rs index c7a0b1f1e2a..9244ad5986f 100644 --- a/tokio-stream/src/wrappers/interval.rs +++ b/tokio-stream/src/wrappers/interval.rs @@ -5,6 +5,26 @@ use tokio::time::{Instant, Interval}; /// A wrapper around [`Interval`] that implements [`Stream`]. /// +/// # Example +/// +/// ``` +/// use tokio::time::{Duration, Instant}; +/// use tokio_stream::wrappers::IntervalStream; +/// use tokio_stream::StreamExt; +/// +/// #[tokio::main] +/// async fn main() { +/// let interval = tokio::time::interval(Duration::from_millis(10)); +/// let mut stream = IntervalStream::new(interval); +/// let start = Instant::now(); +/// for _ in 0..3 { +/// if let Some(instant) = stream.next().await { +/// println!("elapsed: {:.1?}", instant.duration_since(start)); +/// } +/// } +/// } +/// ``` +/// /// [`Interval`]: struct@tokio::time::Interval /// [`Stream`]: trait@crate::Stream #[derive(Debug)] diff --git a/tokio-stream/src/wrappers/lines.rs b/tokio-stream/src/wrappers/lines.rs index 4850429a72d..acc7cc923f1 100644 --- a/tokio-stream/src/wrappers/lines.rs +++ b/tokio-stream/src/wrappers/lines.rs @@ -8,6 +8,25 @@ use tokio::io::{AsyncBufRead, Lines}; pin_project! { /// A wrapper around [`tokio::io::Lines`] that implements [`Stream`]. /// + /// # Example + /// + /// ``` + /// use tokio::io::AsyncBufReadExt; + /// use tokio_stream::wrappers::LinesStream; + /// use tokio_stream::StreamExt; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let input = b"Hello\nWorld\n"; + /// let lines = input.lines(); // tokio::io::util::Lines + /// let mut stream = LinesStream::new(lines); + /// while let Some(line) = stream.next().await { + /// println!("{}", line?); + /// } + /// Ok(()) + /// } + /// ``` + /// /// [`tokio::io::Lines`]: struct@tokio::io::Lines /// [`Stream`]: trait@crate::Stream #[derive(Debug)] diff --git a/tokio-stream/src/wrappers/mpsc_bounded.rs b/tokio-stream/src/wrappers/mpsc_bounded.rs index 18d799e98b1..1e6f148d7c7 100644 --- a/tokio-stream/src/wrappers/mpsc_bounded.rs +++ b/tokio-stream/src/wrappers/mpsc_bounded.rs @@ -5,6 +5,28 @@ use tokio::sync::mpsc::Receiver; /// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`]. /// +/// # Example +/// +/// ``` +/// use tokio::sync::mpsc; +/// use tokio_stream::wrappers::ReceiverStream; +/// use tokio_stream::StreamExt; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box> { +/// let (tx, rx) = mpsc::channel(2); +/// tx.send(10).await?; +/// tx.send(20).await?; +/// drop(tx); +/// +/// let mut stream = ReceiverStream::new(rx); +/// assert_eq!(stream.next().await, Some(10)); +/// assert_eq!(stream.next().await, Some(20)); +/// assert_eq!(stream.next().await, None); +/// Ok(()) +/// } +/// ``` +/// /// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver /// [`Stream`]: trait@crate::Stream #[derive(Debug)] diff --git a/tokio-stream/src/wrappers/mpsc_unbounded.rs b/tokio-stream/src/wrappers/mpsc_unbounded.rs index 6945b08717c..380b359db38 100644 --- a/tokio-stream/src/wrappers/mpsc_unbounded.rs +++ b/tokio-stream/src/wrappers/mpsc_unbounded.rs @@ -5,6 +5,27 @@ use tokio::sync::mpsc::UnboundedReceiver; /// A wrapper around [`tokio::sync::mpsc::UnboundedReceiver`] that implements [`Stream`]. /// +/// # Example +/// +/// ``` +/// use tokio::sync::mpsc; +/// use tokio_stream::wrappers::UnboundedReceiverStream; +/// use tokio_stream::StreamExt; +/// +/// #[tokio::main] +/// async fn main() { +/// let (tx, rx) = mpsc::unbounded_channel(); +/// tx.send(10).unwrap(); +/// tx.send(20).unwrap(); +/// drop(tx); +/// +/// let mut stream = UnboundedReceiverStream::new(rx); +/// assert_eq!(stream.next().await, Some(10)); +/// assert_eq!(stream.next().await, Some(20)); +/// assert_eq!(stream.next().await, None); +/// } +/// ``` +/// /// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver /// [`Stream`]: trait@crate::Stream #[derive(Debug)] diff --git a/tokio-stream/src/wrappers/read_dir.rs b/tokio-stream/src/wrappers/read_dir.rs index b5cf54f79e1..9e50a2ffc5c 100644 --- a/tokio-stream/src/wrappers/read_dir.rs +++ b/tokio-stream/src/wrappers/read_dir.rs @@ -6,6 +6,24 @@ use tokio::fs::{DirEntry, ReadDir}; /// A wrapper around [`tokio::fs::ReadDir`] that implements [`Stream`]. /// +/// # Example +/// +/// ``` +/// use tokio::fs::read_dir; +/// use tokio_stream::{wrappers::ReadDirStream, StreamExt}; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box> { +/// let dirs = read_dir("examples").await?; +/// let mut dirs = ReadDirStream::new(dirs); +/// while let Some(dir) = dirs.next().await { +/// let dir = dir?; +/// println!("{}", dir.path().display()); +/// } +/// Ok(()) +/// } +/// ``` +/// /// [`tokio::fs::ReadDir`]: struct@tokio::fs::ReadDir /// [`Stream`]: trait@crate::Stream #[derive(Debug)] diff --git a/tokio-stream/src/wrappers/tcp_listener.rs b/tokio-stream/src/wrappers/tcp_listener.rs index ce7cb163507..d5fbc2a2e36 100644 --- a/tokio-stream/src/wrappers/tcp_listener.rs +++ b/tokio-stream/src/wrappers/tcp_listener.rs @@ -6,6 +6,34 @@ use tokio::net::{TcpListener, TcpStream}; /// A wrapper around [`TcpListener`] that implements [`Stream`]. /// +/// # Example +/// +/// Accept connections from both IPv4 and IPv6 listeners in the same loop: +/// +/// ```no_run +/// use std::net::{Ipv4Addr, Ipv6Addr}; +/// +/// use tokio::net::TcpListener; +/// use tokio_stream::{wrappers::TcpListenerStream, StreamExt}; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box> { +/// let ipv4_listener = TcpListener::bind((Ipv6Addr::LOCALHOST, 8080)).await?; +/// let ipv6_listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 8080)).await?; +/// let ipv4_connections = TcpListenerStream::new(ipv4_listener); +/// let ipv6_connections = TcpListenerStream::new(ipv6_listener); +/// let mut connections = ipv4_connections.chain(ipv6_connections); +/// while let Some(tcp_stream) = connections.next().await { +/// let stream = tcp_stream?; +/// println!( +/// "accepted connection; peer address = {:?}", +/// stream.peer_addr()? +/// ); +/// } +/// Ok(()) +/// } +/// ``` +/// /// [`TcpListener`]: struct@tokio::net::TcpListener /// [`Stream`]: trait@crate::Stream #[derive(Debug)] From 3e51766d6dcb60a1f7a6448ce5267066dd7e5783 Mon Sep 17 00:00:00 2001 From: Josh McKinney Date: Mon, 9 Dec 2024 16:04:23 -0800 Subject: [PATCH 2/5] Tweak result types and hide main function --- tokio-stream/src/wrappers/broadcast.rs | 25 +++++++++--------- tokio-stream/src/wrappers/interval.rs | 20 +++++++------- tokio-stream/src/wrappers/lines.rs | 17 ++++++------ tokio-stream/src/wrappers/mpsc_bounded.rs | 25 +++++++++--------- tokio-stream/src/wrappers/mpsc_unbounded.rs | 24 +++++++++-------- tokio-stream/src/wrappers/read_dir.rs | 18 ++++++------- tokio-stream/src/wrappers/tcp_listener.rs | 29 ++++++++++----------- 7 files changed, 80 insertions(+), 78 deletions(-) diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs index 3349adb2414..11a64bac0f0 100644 --- a/tokio-stream/src/wrappers/broadcast.rs +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -17,19 +17,20 @@ use std::task::{ready, Context, Poll}; /// use tokio_stream::wrappers::BroadcastStream; /// use tokio_stream::StreamExt; /// -/// #[tokio::main] -/// async fn main() -> Result<(), Box> { -/// let (tx, rx) = broadcast::channel(16); -/// tx.send(10)?; -/// tx.send(20)?; -/// drop(tx); +/// # #[tokio::main] +/// # async fn main() -> Result<(), tokio::sync::broadcast::error::SendError> { +/// let (tx, rx) = broadcast::channel(16); +/// tx.send(10)?; +/// tx.send(20)?; +/// # // prevent the doc test from hanging +/// drop(tx); /// -/// let mut stream = BroadcastStream::new(rx); -/// assert_eq!(stream.next().await, Some(Ok(10))); -/// assert_eq!(stream.next().await, Some(Ok(20))); -/// assert_eq!(stream.next().await, None); -/// Ok(()) -/// } +/// let mut stream = BroadcastStream::new(rx); +/// assert_eq!(stream.next().await, Some(Ok(10))); +/// assert_eq!(stream.next().await, Some(Ok(20))); +/// assert_eq!(stream.next().await, None); +/// # Ok(()) +/// # } /// ``` /// /// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver diff --git a/tokio-stream/src/wrappers/interval.rs b/tokio-stream/src/wrappers/interval.rs index 9244ad5986f..9deeace2f73 100644 --- a/tokio-stream/src/wrappers/interval.rs +++ b/tokio-stream/src/wrappers/interval.rs @@ -8,21 +8,21 @@ use tokio::time::{Instant, Interval}; /// # Example /// /// ``` -/// use tokio::time::{Duration, Instant}; +/// use tokio::time::{Duration, Instant, interval}; /// use tokio_stream::wrappers::IntervalStream; /// use tokio_stream::StreamExt; /// -/// #[tokio::main] -/// async fn main() { -/// let interval = tokio::time::interval(Duration::from_millis(10)); -/// let mut stream = IntervalStream::new(interval); -/// let start = Instant::now(); -/// for _ in 0..3 { -/// if let Some(instant) = stream.next().await { -/// println!("elapsed: {:.1?}", instant.duration_since(start)); -/// } +/// # #[tokio::main] +/// # async fn main() { +/// let start = Instant::now(); +/// let interval = interval(Duration::from_millis(10)); +/// let mut stream = IntervalStream::new(interval); +/// for _ in 0..3 { +/// if let Some(instant) = stream.next().await { +/// println!("elapsed: {:.1?}", instant.duration_since(start)); /// } /// } +/// # } /// ``` /// /// [`Interval`]: struct@tokio::time::Interval diff --git a/tokio-stream/src/wrappers/lines.rs b/tokio-stream/src/wrappers/lines.rs index acc7cc923f1..254dc4c31fd 100644 --- a/tokio-stream/src/wrappers/lines.rs +++ b/tokio-stream/src/wrappers/lines.rs @@ -15,16 +15,15 @@ pin_project! { /// use tokio_stream::wrappers::LinesStream; /// use tokio_stream::StreamExt; /// - /// #[tokio::main] - /// async fn main() -> Result<(), Box> { - /// let input = b"Hello\nWorld\n"; - /// let lines = input.lines(); // tokio::io::util::Lines - /// let mut stream = LinesStream::new(lines); - /// while let Some(line) = stream.next().await { - /// println!("{}", line?); - /// } - /// Ok(()) + /// # #[tokio::main] + /// # async fn main() -> std::io::Result<()> { + /// let input = b"Hello\nWorld\n"; + /// let mut stream = LinesStream::new(input.lines()); + /// while let Some(line) = stream.next().await { + /// println!("{}", line?); /// } + /// # Ok(()) + /// # } /// ``` /// /// [`tokio::io::Lines`]: struct@tokio::io::Lines diff --git a/tokio-stream/src/wrappers/mpsc_bounded.rs b/tokio-stream/src/wrappers/mpsc_bounded.rs index 1e6f148d7c7..df8134f3606 100644 --- a/tokio-stream/src/wrappers/mpsc_bounded.rs +++ b/tokio-stream/src/wrappers/mpsc_bounded.rs @@ -12,19 +12,20 @@ use tokio::sync::mpsc::Receiver; /// use tokio_stream::wrappers::ReceiverStream; /// use tokio_stream::StreamExt; /// -/// #[tokio::main] -/// async fn main() -> Result<(), Box> { -/// let (tx, rx) = mpsc::channel(2); -/// tx.send(10).await?; -/// tx.send(20).await?; -/// drop(tx); +/// # #[tokio::main] +/// # async fn main() -> Result<(), tokio::sync::mpsc::error::SendError> { +/// let (tx, rx) = mpsc::channel(2); +/// tx.send(10).await?; +/// tx.send(20).await?; +/// # // prevent the doc test from hanging +/// drop(tx); /// -/// let mut stream = ReceiverStream::new(rx); -/// assert_eq!(stream.next().await, Some(10)); -/// assert_eq!(stream.next().await, Some(20)); -/// assert_eq!(stream.next().await, None); -/// Ok(()) -/// } +/// let mut stream = ReceiverStream::new(rx); +/// assert_eq!(stream.next().await, Some(10)); +/// assert_eq!(stream.next().await, Some(20)); +/// assert_eq!(stream.next().await, None); +/// # Ok(()) +/// # } /// ``` /// /// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver diff --git a/tokio-stream/src/wrappers/mpsc_unbounded.rs b/tokio-stream/src/wrappers/mpsc_unbounded.rs index 380b359db38..5c36687c705 100644 --- a/tokio-stream/src/wrappers/mpsc_unbounded.rs +++ b/tokio-stream/src/wrappers/mpsc_unbounded.rs @@ -12,18 +12,20 @@ use tokio::sync::mpsc::UnboundedReceiver; /// use tokio_stream::wrappers::UnboundedReceiverStream; /// use tokio_stream::StreamExt; /// -/// #[tokio::main] -/// async fn main() { -/// let (tx, rx) = mpsc::unbounded_channel(); -/// tx.send(10).unwrap(); -/// tx.send(20).unwrap(); -/// drop(tx); +/// # #[tokio::main] +/// # async fn main() -> Result<(), tokio::sync::mpsc::error::SendError> { +/// let (tx, rx) = mpsc::unbounded_channel(); +/// tx.send(10)?; +/// tx.send(20)?; +/// # // prevent the doc test from hanging +/// drop(tx); /// -/// let mut stream = UnboundedReceiverStream::new(rx); -/// assert_eq!(stream.next().await, Some(10)); -/// assert_eq!(stream.next().await, Some(20)); -/// assert_eq!(stream.next().await, None); -/// } +/// let mut stream = UnboundedReceiverStream::new(rx); +/// assert_eq!(stream.next().await, Some(10)); +/// assert_eq!(stream.next().await, Some(20)); +/// assert_eq!(stream.next().await, None); +/// # Ok(()) +/// # } /// ``` /// /// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver diff --git a/tokio-stream/src/wrappers/read_dir.rs b/tokio-stream/src/wrappers/read_dir.rs index 9e50a2ffc5c..23c6905925d 100644 --- a/tokio-stream/src/wrappers/read_dir.rs +++ b/tokio-stream/src/wrappers/read_dir.rs @@ -12,16 +12,16 @@ use tokio::fs::{DirEntry, ReadDir}; /// use tokio::fs::read_dir; /// use tokio_stream::{wrappers::ReadDirStream, StreamExt}; /// -/// #[tokio::main] -/// async fn main() -> Result<(), Box> { -/// let dirs = read_dir("examples").await?; -/// let mut dirs = ReadDirStream::new(dirs); -/// while let Some(dir) = dirs.next().await { -/// let dir = dir?; -/// println!("{}", dir.path().display()); -/// } -/// Ok(()) +/// # #[tokio::main] +/// # async fn main() -> std::io::Result<()> { +/// let dirs = read_dir(".").await?; +/// let mut dirs = ReadDirStream::new(dirs); +/// while let Some(dir) = dirs.next().await { +/// let dir = dir?; +/// println!("{}", dir.path().display()); /// } +/// # Ok(()) +/// # } /// ``` /// /// [`tokio::fs::ReadDir`]: struct@tokio::fs::ReadDir diff --git a/tokio-stream/src/wrappers/tcp_listener.rs b/tokio-stream/src/wrappers/tcp_listener.rs index d5fbc2a2e36..6bd79659077 100644 --- a/tokio-stream/src/wrappers/tcp_listener.rs +++ b/tokio-stream/src/wrappers/tcp_listener.rs @@ -16,22 +16,21 @@ use tokio::net::{TcpListener, TcpStream}; /// use tokio::net::TcpListener; /// use tokio_stream::{wrappers::TcpListenerStream, StreamExt}; /// -/// #[tokio::main] -/// async fn main() -> Result<(), Box> { -/// let ipv4_listener = TcpListener::bind((Ipv6Addr::LOCALHOST, 8080)).await?; -/// let ipv6_listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 8080)).await?; -/// let ipv4_connections = TcpListenerStream::new(ipv4_listener); -/// let ipv6_connections = TcpListenerStream::new(ipv6_listener); -/// let mut connections = ipv4_connections.chain(ipv6_connections); -/// while let Some(tcp_stream) = connections.next().await { -/// let stream = tcp_stream?; -/// println!( -/// "accepted connection; peer address = {:?}", -/// stream.peer_addr()? -/// ); -/// } -/// Ok(()) +/// # #[tokio::main] +/// # async fn main() -> std::io::Result<()> { +/// let ipv4_listener = TcpListener::bind((Ipv6Addr::LOCALHOST, 8080)).await?; +/// let ipv6_listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 8080)).await?; +/// let ipv4_connections = TcpListenerStream::new(ipv4_listener); +/// let ipv6_connections = TcpListenerStream::new(ipv6_listener); +/// +/// let mut connections = ipv4_connections.chain(ipv6_connections); +/// while let Some(tcp_stream) = connections.next().await { +/// let stream = tcp_stream?; +/// let peer_addr = stream.peer_addr()?; +/// println!("accepted connection; peer address = {peer_addr}"); /// } +/// # Ok(()) +/// # } /// ``` /// /// [`TcpListener`]: struct@tokio::net::TcpListener From 1ae3206fe9f48ccce7aba6b707189fd4f40756a7 Mon Sep 17 00:00:00 2001 From: Josh McKinney Date: Mon, 9 Dec 2024 16:47:42 -0800 Subject: [PATCH 3/5] Add examples for signals and fix formatting --- tokio-stream/src/wrappers/read_dir.rs | 2 +- tokio-stream/src/wrappers/signal_unix.rs | 16 ++++++++++ tokio-stream/src/wrappers/signal_windows.rs | 34 +++++++++++++++++++++ tokio-stream/src/wrappers/split.rs | 18 +++++++++++ tokio-stream/src/wrappers/tcp_listener.rs | 2 +- tokio-stream/src/wrappers/unix_listener.rs | 19 ++++++++++++ 6 files changed, 89 insertions(+), 2 deletions(-) diff --git a/tokio-stream/src/wrappers/read_dir.rs b/tokio-stream/src/wrappers/read_dir.rs index 23c6905925d..b21dee0c957 100644 --- a/tokio-stream/src/wrappers/read_dir.rs +++ b/tokio-stream/src/wrappers/read_dir.rs @@ -10,7 +10,7 @@ use tokio::fs::{DirEntry, ReadDir}; /// /// ``` /// use tokio::fs::read_dir; -/// use tokio_stream::{wrappers::ReadDirStream, StreamExt}; +/// use tokio_stream::{StreamExt, wrappers::ReadDirStream}; /// /// # #[tokio::main] /// # async fn main() -> std::io::Result<()> { diff --git a/tokio-stream/src/wrappers/signal_unix.rs b/tokio-stream/src/wrappers/signal_unix.rs index 6dcdff7fc55..64de12283b8 100644 --- a/tokio-stream/src/wrappers/signal_unix.rs +++ b/tokio-stream/src/wrappers/signal_unix.rs @@ -5,6 +5,22 @@ use tokio::signal::unix::Signal; /// A wrapper around [`Signal`] that implements [`Stream`]. /// +/// # Example +/// +/// ```no_run +/// use tokio::signal::unix::{signal, SignalKind}; +/// use tokio_stream::{StreamExt, wrappers::SignalStream}; +/// +/// # #[tokio::main] +/// # async fn main() -> std::io::Result<()> { +/// let signals = signal(SignalKind::hangup())?; +/// let mut stream = SignalStream::new(signals); +/// while stream.next().await.is_some() { +/// println!("hangup signal received"); +/// } +/// # Ok(()) +/// # } +/// ``` /// [`Signal`]: struct@tokio::signal::unix::Signal /// [`Stream`]: trait@crate::Stream #[derive(Debug)] diff --git a/tokio-stream/src/wrappers/signal_windows.rs b/tokio-stream/src/wrappers/signal_windows.rs index 4631fbad8dc..b041684ecf6 100644 --- a/tokio-stream/src/wrappers/signal_windows.rs +++ b/tokio-stream/src/wrappers/signal_windows.rs @@ -7,6 +7,23 @@ use tokio::signal::windows::{CtrlBreak, CtrlC}; /// /// [`CtrlC`]: struct@tokio::signal::windows::CtrlC /// [`Stream`]: trait@crate::Stream +/// +/// # Example +/// +/// ```no_run +/// use tokio::signal::windows::ctrl_c; +/// use tokio_stream::{StreamExt, wrappers::CtrlCStream}; +/// +/// # #[tokio::main] +/// # async fn main() -> std::io::Result<()> { +/// let signals = ctrl_c()?; +/// let mut stream = CtrlCStream::new(signals); +/// while stream.next().await.is_some() { +/// println!("ctrl-c received"); +/// } +/// # Ok(()) +/// # } +/// ``` #[derive(Debug)] #[cfg_attr(docsrs, doc(cfg(all(windows, feature = "signal"))))] pub struct CtrlCStream { @@ -47,6 +64,23 @@ impl AsMut for CtrlCStream { /// A wrapper around [`CtrlBreak`] that implements [`Stream`]. /// +/// # Example +/// +/// ```no_run +/// use tokio::signal::windows::ctrl_break; +/// use tokio_stream::{StreamExt, wrappers::CtrlBreakStream}; +/// +/// # #[tokio::main] +/// # async fn main() -> std::io::Result<()> { +/// let signals = ctrl_break()?; +/// let mut stream = CtrlBreakStream::new(signals); +/// while stream.next().await.is_some() { +/// println!("ctrl-break received"); +/// } +/// # Ok(()) +/// # } +/// ``` +/// /// [`CtrlBreak`]: struct@tokio::signal::windows::CtrlBreak /// [`Stream`]: trait@crate::Stream #[derive(Debug)] diff --git a/tokio-stream/src/wrappers/split.rs b/tokio-stream/src/wrappers/split.rs index ac46a8ba6ff..b3e87204d2d 100644 --- a/tokio-stream/src/wrappers/split.rs +++ b/tokio-stream/src/wrappers/split.rs @@ -8,6 +8,24 @@ use tokio::io::{AsyncBufRead, Split}; pin_project! { /// A wrapper around [`tokio::io::Split`] that implements [`Stream`]. /// + /// # Example + /// + /// ``` + /// use tokio::io::AsyncBufReadExt; + /// use tokio_stream::{StreamExt, wrappers::SplitStream}; + /// + /// # #[tokio::main] + /// # async fn main() -> std::io::Result<()> { + /// let input = "Hello\nWorld\n".as_bytes(); + /// let lines = AsyncBufReadExt::split(input, b'\n'); + /// + /// let mut stream = SplitStream::new(lines); + /// while let Some(line) = stream.next().await { + /// println!("length = {}", line?.len()) + /// } + /// # Ok(()) + /// # } + /// ``` /// [`tokio::io::Split`]: struct@tokio::io::Split /// [`Stream`]: trait@crate::Stream #[derive(Debug)] diff --git a/tokio-stream/src/wrappers/tcp_listener.rs b/tokio-stream/src/wrappers/tcp_listener.rs index 6bd79659077..43365b03375 100644 --- a/tokio-stream/src/wrappers/tcp_listener.rs +++ b/tokio-stream/src/wrappers/tcp_listener.rs @@ -14,7 +14,7 @@ use tokio::net::{TcpListener, TcpStream}; /// use std::net::{Ipv4Addr, Ipv6Addr}; /// /// use tokio::net::TcpListener; -/// use tokio_stream::{wrappers::TcpListenerStream, StreamExt}; +/// use tokio_stream::{StreamExt, wrappers::TcpListenerStream}; /// /// # #[tokio::main] /// # async fn main() -> std::io::Result<()> { diff --git a/tokio-stream/src/wrappers/unix_listener.rs b/tokio-stream/src/wrappers/unix_listener.rs index 0beba588c20..98e6761e0de 100644 --- a/tokio-stream/src/wrappers/unix_listener.rs +++ b/tokio-stream/src/wrappers/unix_listener.rs @@ -6,6 +6,25 @@ use tokio::net::{UnixListener, UnixStream}; /// A wrapper around [`UnixListener`] that implements [`Stream`]. /// +/// # Example +/// +/// ```no_run +/// use tokio::net::UnixListener; +/// use tokio_stream::{StreamExt, wrappers::UnixListenerStream}; +/// +/// # #[tokio::main] +/// # async fn main() -> std::io::Result<()> { +/// let listener = UnixListener::bind("/tmp/sock")?; +/// let mut incoming = UnixListenerStream::new(listener); +/// +/// while let Some(stream) = incoming.next().await { +/// let stream = stream?; +/// let peer_addr = stream.peer_addr()?; +/// println!("Accepted connection from: {peer_addr:?}"); +/// } +/// # Ok(()) +/// # } +/// ``` /// [`UnixListener`]: struct@tokio::net::UnixListener /// [`Stream`]: trait@crate::Stream #[derive(Debug)] From ae6af875dea2ed702b468429961df47ebfa9696b Mon Sep 17 00:00:00 2001 From: Josh McKinney Date: Tue, 10 Dec 2024 02:02:58 -0800 Subject: [PATCH 4/5] use current thread runtime --- tokio-stream/src/wrappers/broadcast.rs | 2 +- tokio-stream/src/wrappers/interval.rs | 2 +- tokio-stream/src/wrappers/lines.rs | 2 +- tokio-stream/src/wrappers/mpsc_bounded.rs | 2 +- tokio-stream/src/wrappers/mpsc_unbounded.rs | 2 +- tokio-stream/src/wrappers/read_dir.rs | 2 +- tokio-stream/src/wrappers/signal_unix.rs | 2 +- tokio-stream/src/wrappers/signal_windows.rs | 2 +- tokio-stream/src/wrappers/split.rs | 2 +- tokio-stream/src/wrappers/tcp_listener.rs | 2 +- tokio-stream/src/wrappers/unix_listener.rs | 2 +- 11 files changed, 11 insertions(+), 11 deletions(-) diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs index 11a64bac0f0..3474cff7718 100644 --- a/tokio-stream/src/wrappers/broadcast.rs +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -17,7 +17,7 @@ use std::task::{ready, Context, Poll}; /// use tokio_stream::wrappers::BroadcastStream; /// use tokio_stream::StreamExt; /// -/// # #[tokio::main] +/// # #[tokio::main(flavor = "current_thread")] /// # async fn main() -> Result<(), tokio::sync::broadcast::error::SendError> { /// let (tx, rx) = broadcast::channel(16); /// tx.send(10)?; diff --git a/tokio-stream/src/wrappers/interval.rs b/tokio-stream/src/wrappers/interval.rs index 9deeace2f73..faac5e78a3e 100644 --- a/tokio-stream/src/wrappers/interval.rs +++ b/tokio-stream/src/wrappers/interval.rs @@ -12,7 +12,7 @@ use tokio::time::{Instant, Interval}; /// use tokio_stream::wrappers::IntervalStream; /// use tokio_stream::StreamExt; /// -/// # #[tokio::main] +/// # #[tokio::main(flavor = "current_thread")] /// # async fn main() { /// let start = Instant::now(); /// let interval = interval(Duration::from_millis(10)); diff --git a/tokio-stream/src/wrappers/lines.rs b/tokio-stream/src/wrappers/lines.rs index 254dc4c31fd..57b41fbc736 100644 --- a/tokio-stream/src/wrappers/lines.rs +++ b/tokio-stream/src/wrappers/lines.rs @@ -15,7 +15,7 @@ pin_project! { /// use tokio_stream::wrappers::LinesStream; /// use tokio_stream::StreamExt; /// - /// # #[tokio::main] + /// # #[tokio::main(flavor = "current_thread")] /// # async fn main() -> std::io::Result<()> { /// let input = b"Hello\nWorld\n"; /// let mut stream = LinesStream::new(input.lines()); diff --git a/tokio-stream/src/wrappers/mpsc_bounded.rs b/tokio-stream/src/wrappers/mpsc_bounded.rs index df8134f3606..34b2e020d75 100644 --- a/tokio-stream/src/wrappers/mpsc_bounded.rs +++ b/tokio-stream/src/wrappers/mpsc_bounded.rs @@ -12,7 +12,7 @@ use tokio::sync::mpsc::Receiver; /// use tokio_stream::wrappers::ReceiverStream; /// use tokio_stream::StreamExt; /// -/// # #[tokio::main] +/// # #[tokio::main(flavor = "current_thread")] /// # async fn main() -> Result<(), tokio::sync::mpsc::error::SendError> { /// let (tx, rx) = mpsc::channel(2); /// tx.send(10).await?; diff --git a/tokio-stream/src/wrappers/mpsc_unbounded.rs b/tokio-stream/src/wrappers/mpsc_unbounded.rs index 5c36687c705..904c98bef95 100644 --- a/tokio-stream/src/wrappers/mpsc_unbounded.rs +++ b/tokio-stream/src/wrappers/mpsc_unbounded.rs @@ -12,7 +12,7 @@ use tokio::sync::mpsc::UnboundedReceiver; /// use tokio_stream::wrappers::UnboundedReceiverStream; /// use tokio_stream::StreamExt; /// -/// # #[tokio::main] +/// # #[tokio::main(flavor = "current_thread")] /// # async fn main() -> Result<(), tokio::sync::mpsc::error::SendError> { /// let (tx, rx) = mpsc::unbounded_channel(); /// tx.send(10)?; diff --git a/tokio-stream/src/wrappers/read_dir.rs b/tokio-stream/src/wrappers/read_dir.rs index b21dee0c957..21522c03d2d 100644 --- a/tokio-stream/src/wrappers/read_dir.rs +++ b/tokio-stream/src/wrappers/read_dir.rs @@ -12,7 +12,7 @@ use tokio::fs::{DirEntry, ReadDir}; /// use tokio::fs::read_dir; /// use tokio_stream::{StreamExt, wrappers::ReadDirStream}; /// -/// # #[tokio::main] +/// # #[tokio::main(flavor = "current_thread")] /// # async fn main() -> std::io::Result<()> { /// let dirs = read_dir(".").await?; /// let mut dirs = ReadDirStream::new(dirs); diff --git a/tokio-stream/src/wrappers/signal_unix.rs b/tokio-stream/src/wrappers/signal_unix.rs index 64de12283b8..ac873f92b3f 100644 --- a/tokio-stream/src/wrappers/signal_unix.rs +++ b/tokio-stream/src/wrappers/signal_unix.rs @@ -11,7 +11,7 @@ use tokio::signal::unix::Signal; /// use tokio::signal::unix::{signal, SignalKind}; /// use tokio_stream::{StreamExt, wrappers::SignalStream}; /// -/// # #[tokio::main] +/// # #[tokio::main(flavor = "current_thread")] /// # async fn main() -> std::io::Result<()> { /// let signals = signal(SignalKind::hangup())?; /// let mut stream = SignalStream::new(signals); diff --git a/tokio-stream/src/wrappers/signal_windows.rs b/tokio-stream/src/wrappers/signal_windows.rs index b041684ecf6..49f28dc8cd4 100644 --- a/tokio-stream/src/wrappers/signal_windows.rs +++ b/tokio-stream/src/wrappers/signal_windows.rs @@ -14,7 +14,7 @@ use tokio::signal::windows::{CtrlBreak, CtrlC}; /// use tokio::signal::windows::ctrl_c; /// use tokio_stream::{StreamExt, wrappers::CtrlCStream}; /// -/// # #[tokio::main] +/// # #[tokio::main(flavor = "current_thread")] /// # async fn main() -> std::io::Result<()> { /// let signals = ctrl_c()?; /// let mut stream = CtrlCStream::new(signals); diff --git a/tokio-stream/src/wrappers/split.rs b/tokio-stream/src/wrappers/split.rs index b3e87204d2d..5d6d77b6787 100644 --- a/tokio-stream/src/wrappers/split.rs +++ b/tokio-stream/src/wrappers/split.rs @@ -14,7 +14,7 @@ pin_project! { /// use tokio::io::AsyncBufReadExt; /// use tokio_stream::{StreamExt, wrappers::SplitStream}; /// - /// # #[tokio::main] + /// # #[tokio::main(flavor = "current_thread")] /// # async fn main() -> std::io::Result<()> { /// let input = "Hello\nWorld\n".as_bytes(); /// let lines = AsyncBufReadExt::split(input, b'\n'); diff --git a/tokio-stream/src/wrappers/tcp_listener.rs b/tokio-stream/src/wrappers/tcp_listener.rs index 43365b03375..c463ef1426c 100644 --- a/tokio-stream/src/wrappers/tcp_listener.rs +++ b/tokio-stream/src/wrappers/tcp_listener.rs @@ -16,7 +16,7 @@ use tokio::net::{TcpListener, TcpStream}; /// use tokio::net::TcpListener; /// use tokio_stream::{StreamExt, wrappers::TcpListenerStream}; /// -/// # #[tokio::main] +/// # #[tokio::main(flavor = "current_thread")] /// # async fn main() -> std::io::Result<()> { /// let ipv4_listener = TcpListener::bind((Ipv6Addr::LOCALHOST, 8080)).await?; /// let ipv6_listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 8080)).await?; diff --git a/tokio-stream/src/wrappers/unix_listener.rs b/tokio-stream/src/wrappers/unix_listener.rs index 98e6761e0de..6c4cd43eb12 100644 --- a/tokio-stream/src/wrappers/unix_listener.rs +++ b/tokio-stream/src/wrappers/unix_listener.rs @@ -12,7 +12,7 @@ use tokio::net::{UnixListener, UnixStream}; /// use tokio::net::UnixListener; /// use tokio_stream::{StreamExt, wrappers::UnixListenerStream}; /// -/// # #[tokio::main] +/// # #[tokio::main(flavor = "current_thread")] /// # async fn main() -> std::io::Result<()> { /// let listener = UnixListener::bind("/tmp/sock")?; /// let mut incoming = UnixListenerStream::new(listener); From 88647dfe744548a7efd258fbf3157e94f6929a85 Mon Sep 17 00:00:00 2001 From: Josh McKinney Date: Tue, 10 Dec 2024 02:50:13 -0800 Subject: [PATCH 5/5] signal windows example uses current thread runtime --- tokio-stream/src/wrappers/signal_windows.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-stream/src/wrappers/signal_windows.rs b/tokio-stream/src/wrappers/signal_windows.rs index 49f28dc8cd4..168ed181d34 100644 --- a/tokio-stream/src/wrappers/signal_windows.rs +++ b/tokio-stream/src/wrappers/signal_windows.rs @@ -70,7 +70,7 @@ impl AsMut for CtrlCStream { /// use tokio::signal::windows::ctrl_break; /// use tokio_stream::{StreamExt, wrappers::CtrlBreakStream}; /// -/// # #[tokio::main] +/// # #[tokio::main(flavor = "current_thread")] /// # async fn main() -> std::io::Result<()> { /// let signals = ctrl_break()?; /// let mut stream = CtrlBreakStream::new(signals);