Skip to content

Commit 3e51766

Browse files
committed
Tweak result types and hide main function
1 parent cac8afd commit 3e51766

File tree

7 files changed

+80
-78
lines changed

7 files changed

+80
-78
lines changed

tokio-stream/src/wrappers/broadcast.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,20 @@ use std::task::{ready, Context, Poll};
1717
/// use tokio_stream::wrappers::BroadcastStream;
1818
/// use tokio_stream::StreamExt;
1919
///
20-
/// #[tokio::main]
21-
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
22-
/// let (tx, rx) = broadcast::channel(16);
23-
/// tx.send(10)?;
24-
/// tx.send(20)?;
25-
/// drop(tx);
20+
/// # #[tokio::main]
21+
/// # async fn main() -> Result<(), tokio::sync::broadcast::error::SendError<u8>> {
22+
/// let (tx, rx) = broadcast::channel(16);
23+
/// tx.send(10)?;
24+
/// tx.send(20)?;
25+
/// # // prevent the doc test from hanging
26+
/// drop(tx);
2627
///
27-
/// let mut stream = BroadcastStream::new(rx);
28-
/// assert_eq!(stream.next().await, Some(Ok(10)));
29-
/// assert_eq!(stream.next().await, Some(Ok(20)));
30-
/// assert_eq!(stream.next().await, None);
31-
/// Ok(())
32-
/// }
28+
/// let mut stream = BroadcastStream::new(rx);
29+
/// assert_eq!(stream.next().await, Some(Ok(10)));
30+
/// assert_eq!(stream.next().await, Some(Ok(20)));
31+
/// assert_eq!(stream.next().await, None);
32+
/// # Ok(())
33+
/// # }
3334
/// ```
3435
///
3536
/// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver

tokio-stream/src/wrappers/interval.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,21 @@ use tokio::time::{Instant, Interval};
88
/// # Example
99
///
1010
/// ```
11-
/// use tokio::time::{Duration, Instant};
11+
/// use tokio::time::{Duration, Instant, interval};
1212
/// use tokio_stream::wrappers::IntervalStream;
1313
/// use tokio_stream::StreamExt;
1414
///
15-
/// #[tokio::main]
16-
/// async fn main() {
17-
/// let interval = tokio::time::interval(Duration::from_millis(10));
18-
/// let mut stream = IntervalStream::new(interval);
19-
/// let start = Instant::now();
20-
/// for _ in 0..3 {
21-
/// if let Some(instant) = stream.next().await {
22-
/// println!("elapsed: {:.1?}", instant.duration_since(start));
23-
/// }
15+
/// # #[tokio::main]
16+
/// # async fn main() {
17+
/// let start = Instant::now();
18+
/// let interval = interval(Duration::from_millis(10));
19+
/// let mut stream = IntervalStream::new(interval);
20+
/// for _ in 0..3 {
21+
/// if let Some(instant) = stream.next().await {
22+
/// println!("elapsed: {:.1?}", instant.duration_since(start));
2423
/// }
2524
/// }
25+
/// # }
2626
/// ```
2727
///
2828
/// [`Interval`]: struct@tokio::time::Interval

tokio-stream/src/wrappers/lines.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,15 @@ pin_project! {
1515
/// use tokio_stream::wrappers::LinesStream;
1616
/// use tokio_stream::StreamExt;
1717
///
18-
/// #[tokio::main]
19-
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
20-
/// let input = b"Hello\nWorld\n";
21-
/// let lines = input.lines(); // tokio::io::util::Lines
22-
/// let mut stream = LinesStream::new(lines);
23-
/// while let Some(line) = stream.next().await {
24-
/// println!("{}", line?);
25-
/// }
26-
/// Ok(())
18+
/// # #[tokio::main]
19+
/// # async fn main() -> std::io::Result<()> {
20+
/// let input = b"Hello\nWorld\n";
21+
/// let mut stream = LinesStream::new(input.lines());
22+
/// while let Some(line) = stream.next().await {
23+
/// println!("{}", line?);
2724
/// }
25+
/// # Ok(())
26+
/// # }
2827
/// ```
2928
///
3029
/// [`tokio::io::Lines`]: struct@tokio::io::Lines

tokio-stream/src/wrappers/mpsc_bounded.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,20 @@ use tokio::sync::mpsc::Receiver;
1212
/// use tokio_stream::wrappers::ReceiverStream;
1313
/// use tokio_stream::StreamExt;
1414
///
15-
/// #[tokio::main]
16-
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
17-
/// let (tx, rx) = mpsc::channel(2);
18-
/// tx.send(10).await?;
19-
/// tx.send(20).await?;
20-
/// drop(tx);
15+
/// # #[tokio::main]
16+
/// # async fn main() -> Result<(), tokio::sync::mpsc::error::SendError<u8>> {
17+
/// let (tx, rx) = mpsc::channel(2);
18+
/// tx.send(10).await?;
19+
/// tx.send(20).await?;
20+
/// # // prevent the doc test from hanging
21+
/// drop(tx);
2122
///
22-
/// let mut stream = ReceiverStream::new(rx);
23-
/// assert_eq!(stream.next().await, Some(10));
24-
/// assert_eq!(stream.next().await, Some(20));
25-
/// assert_eq!(stream.next().await, None);
26-
/// Ok(())
27-
/// }
23+
/// let mut stream = ReceiverStream::new(rx);
24+
/// assert_eq!(stream.next().await, Some(10));
25+
/// assert_eq!(stream.next().await, Some(20));
26+
/// assert_eq!(stream.next().await, None);
27+
/// # Ok(())
28+
/// # }
2829
/// ```
2930
///
3031
/// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver

tokio-stream/src/wrappers/mpsc_unbounded.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,20 @@ use tokio::sync::mpsc::UnboundedReceiver;
1212
/// use tokio_stream::wrappers::UnboundedReceiverStream;
1313
/// use tokio_stream::StreamExt;
1414
///
15-
/// #[tokio::main]
16-
/// async fn main() {
17-
/// let (tx, rx) = mpsc::unbounded_channel();
18-
/// tx.send(10).unwrap();
19-
/// tx.send(20).unwrap();
20-
/// drop(tx);
15+
/// # #[tokio::main]
16+
/// # async fn main() -> Result<(), tokio::sync::mpsc::error::SendError<u8>> {
17+
/// let (tx, rx) = mpsc::unbounded_channel();
18+
/// tx.send(10)?;
19+
/// tx.send(20)?;
20+
/// # // prevent the doc test from hanging
21+
/// drop(tx);
2122
///
22-
/// let mut stream = UnboundedReceiverStream::new(rx);
23-
/// assert_eq!(stream.next().await, Some(10));
24-
/// assert_eq!(stream.next().await, Some(20));
25-
/// assert_eq!(stream.next().await, None);
26-
/// }
23+
/// let mut stream = UnboundedReceiverStream::new(rx);
24+
/// assert_eq!(stream.next().await, Some(10));
25+
/// assert_eq!(stream.next().await, Some(20));
26+
/// assert_eq!(stream.next().await, None);
27+
/// # Ok(())
28+
/// # }
2729
/// ```
2830
///
2931
/// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver

tokio-stream/src/wrappers/read_dir.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,16 @@ use tokio::fs::{DirEntry, ReadDir};
1212
/// use tokio::fs::read_dir;
1313
/// use tokio_stream::{wrappers::ReadDirStream, StreamExt};
1414
///
15-
/// #[tokio::main]
16-
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
17-
/// let dirs = read_dir("examples").await?;
18-
/// let mut dirs = ReadDirStream::new(dirs);
19-
/// while let Some(dir) = dirs.next().await {
20-
/// let dir = dir?;
21-
/// println!("{}", dir.path().display());
22-
/// }
23-
/// Ok(())
15+
/// # #[tokio::main]
16+
/// # async fn main() -> std::io::Result<()> {
17+
/// let dirs = read_dir(".").await?;
18+
/// let mut dirs = ReadDirStream::new(dirs);
19+
/// while let Some(dir) = dirs.next().await {
20+
/// let dir = dir?;
21+
/// println!("{}", dir.path().display());
2422
/// }
23+
/// # Ok(())
24+
/// # }
2525
/// ```
2626
///
2727
/// [`tokio::fs::ReadDir`]: struct@tokio::fs::ReadDir

tokio-stream/src/wrappers/tcp_listener.rs

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,21 @@ use tokio::net::{TcpListener, TcpStream};
1616
/// use tokio::net::TcpListener;
1717
/// use tokio_stream::{wrappers::TcpListenerStream, StreamExt};
1818
///
19-
/// #[tokio::main]
20-
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
21-
/// let ipv4_listener = TcpListener::bind((Ipv6Addr::LOCALHOST, 8080)).await?;
22-
/// let ipv6_listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 8080)).await?;
23-
/// let ipv4_connections = TcpListenerStream::new(ipv4_listener);
24-
/// let ipv6_connections = TcpListenerStream::new(ipv6_listener);
25-
/// let mut connections = ipv4_connections.chain(ipv6_connections);
26-
/// while let Some(tcp_stream) = connections.next().await {
27-
/// let stream = tcp_stream?;
28-
/// println!(
29-
/// "accepted connection; peer address = {:?}",
30-
/// stream.peer_addr()?
31-
/// );
32-
/// }
33-
/// Ok(())
19+
/// # #[tokio::main]
20+
/// # async fn main() -> std::io::Result<()> {
21+
/// let ipv4_listener = TcpListener::bind((Ipv6Addr::LOCALHOST, 8080)).await?;
22+
/// let ipv6_listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 8080)).await?;
23+
/// let ipv4_connections = TcpListenerStream::new(ipv4_listener);
24+
/// let ipv6_connections = TcpListenerStream::new(ipv6_listener);
25+
///
26+
/// let mut connections = ipv4_connections.chain(ipv6_connections);
27+
/// while let Some(tcp_stream) = connections.next().await {
28+
/// let stream = tcp_stream?;
29+
/// let peer_addr = stream.peer_addr()?;
30+
/// println!("accepted connection; peer address = {peer_addr}");
3431
/// }
32+
/// # Ok(())
33+
/// # }
3534
/// ```
3635
///
3736
/// [`TcpListener`]: struct@tokio::net::TcpListener

0 commit comments

Comments
 (0)