Skip to content

Runtime independent broadcast, which only polls it's underlying stream if no pending data is available

Notifications You must be signed in to change notification settings

mineichen/stream-broadcast-rs

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

19 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Runtime independent broadcast, which only polls it's underlying stream if no pending data is available.

use futures::StreamExt;
use stream_broadcast::StreamBroadcastExt;

#[tokio::main]
async fn main() {
    let broadcast = futures::stream::iter('a'..='d').fuse().broadcast(3);
    let broadcast2 = broadcast.clone();
    assert_eq!(4, broadcast.count().await);
    // Letter 'a' wasn't available anymore due to `broadcast(3)`, which limits the buffer to 3 items
    // Left side of tuple represents number of missed items
    assert_eq!(vec![(1, 'b'), (0, 'c'), (0, 'd')], broadcast2.collect::<Vec<_>>().await);
}

Uses #![forbid(unsafe_code)]

Difference to other libraries:

shared_stream:

  • Caches the entire stream from start, which is not practical for big datasets. This crate streams from the same position where the clone-origin is currently at
  • shared_stream never skips an entry. This library only provides information about missing data
  • High risk of leaking memory

tokio::sync::broadcast:

  • Broadcasts don't implement Stream directly, but tokio_stream provides a wrapper.
  • Entries are pushed actively to the sender (No Lazy evaluation when stream is paused). This requires a subroutine, which has to be managed somehow.
  • Instead of returning missing frames in the ErrorVariant (tokio_stream), this library returns a tuple (missing_frames_since_last_frame, TData) to mitigate errors when doing stuff like stream.count()

About

Runtime independent broadcast, which only polls it's underlying stream if no pending data is available

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages