Skip to content

Commit

Permalink
Stream::cycle implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
vertexclique committed Aug 15, 2019
1 parent 487811e commit 4b1c188
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 0 deletions.
25 changes: 25 additions & 0 deletions examples/stream-cycle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
//! Repeats given stream values and sum them
#![feature(async_await)]

use async_std::io;
use async_std::prelude::*;
use async_std::stream;
use async_std::task;

fn main() -> io::Result<()> {
task::block_on(async {
let mut s = stream::cycle(vec![6, 7, 8]);
let mut total = 0;

while let Some(v) = s.next().await {
total += v;
if total == 42 {
println!("Found {} the meaning of life!", total);
break;
}
}

Ok(())
})
}
57 changes: 57 additions & 0 deletions src/stream/cycle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::pin::Pin;
use std::sync::Mutex;

use crate::task::{Context, Poll};

/// Creates a stream that yields the same elements continually.
///
/// # Examples
///
/// ```
/// # #![feature(async_await)]
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use async_std::stream;
///
/// let mut s = stream::cycle(vec![1, 2, 3]);
///
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(2));
/// assert_eq!(s.next().await, Some(3));
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(2));
/// #
/// # }) }
/// ```
pub fn cycle<T>(items: Vec<T>) -> Cycle<T>
where
T: Clone,
{
Cycle {
items,
cursor: Mutex::new(0_usize),
}
}

/// A stream that yields the same elements continually.
///
/// This stream is constructed by the [`cycle`] function.
///
/// [`cycle`]: fn.cycle.html
#[derive(Debug)]
pub struct Cycle<T> {
items: Vec<T>,
cursor: Mutex<usize>,
}

impl<T: Clone> futures::Stream for Cycle<T> {
type Item = T;

fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let cursor = &mut *self.cursor.lock().unwrap();
let p = Poll::Ready(self.items.get(*cursor).map(|x| x.to_owned()));
*cursor = (*cursor + 1_usize) % self.items.len();
p
}
}
2 changes: 2 additions & 0 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
//! # }) }
//! ```
pub use cycle::{cycle, Cycle};
pub use empty::{empty, Empty};
pub use once::{once, Once};
pub use repeat::{repeat, Repeat};
pub use stream::{Stream, Take};

mod cycle;
mod empty;
mod once;
mod repeat;
Expand Down

0 comments on commit 4b1c188

Please sign in to comment.