cyclops streams : Scheduling Streams (ReactiveSeq, jOOλ, Javaslang, JDK)
Since cyclops 7.1.0 it is possible to schedule Stream events, where each event sends the next element through the pipeline, using a cron expression, a fixed rate delimiter or a fixed delay delimiter.
Supported Stream types are:
- JDK 8 Stream (java.util.stream.Stream) via StreamUtils
- Javaslang Stream (via cyclops-javaslang StreamUtils)
- ReactiveSeq (java.util.stream.Stream and jOOλ Seq extension).
ReactiveSeq equivalent for Javaslang Streams
Send one element of a Stream through every second.
ReactiveSeq.of(1,2,3,4)
.peek(System.out::println)
.schedule("* * * * * ?", ex)
This will print 1, 2, 3 and 4, each on a new line, at a rate of one element per second.
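To make the idea of "one element per scheduled event" concrete, here is a minimal plain-JDK sketch. It is not the cyclops implementation: `emitPerTick` and `ScheduledEmitSketch` are hypothetical names, and the sketch assumes that `ex` in the examples is a `ScheduledExecutorService`, such as one created with `Executors.newScheduledThreadPool(1)`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

public class ScheduledEmitSketch {

    // Hypothetical helper (not part of cyclops): pulls one element from the
    // Stream on every tick of a scheduler and records it in arrival order.
    static <T> List<T> emitPerTick(Stream<T> stream, long periodMillis) {
        // Assumption: this plays the role of "ex" in the examples.
        ScheduledExecutorService ex = Executors.newScheduledThreadPool(1);
        Iterator<T> it = stream.iterator();
        List<T> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);

        ex.scheduleAtFixedRate(() -> {
            if (it.hasNext()) {
                T next = it.next();       // one element per scheduled event
                System.out.println(next);
                seen.add(next);
            } else {
                done.countDown();         // Stream exhausted, release the caller
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);

        try {
            done.await();                 // block until every element was emitted
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            ex.shutdownNow();             // stop the periodic task
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        // Emits 1, 2, 3, 4 roughly one second apart, then prints the collected list.
        System.out.println(emitPerTick(Stream.of(1, 2, 3, 4), 1000));
    }
}
```

The sketch uses a single scheduler thread so that the iterator is only ever touched by one thread at a time.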
We can connect to the output of this Stream:
HotStream<Integer> connectable = ReactiveSeq.of(1,2,3,4)
.peek(System.out::println)
.schedule("* * * * * ?", ex);
Stream<Integer> connected = connectable.connect();
We can then further process the connected Stream, in this case processing only one element per day via the debounce operator:
ReactiveSeq.of(1,2,3,4)
.peek(System.out::println)
.schedule("* * * * * ?", ex)
.connect()
.debounce(1,TimeUnit.DAYS)
.peek(this::writeToDB)
.toList()
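The effect of debounce in the example above can be sketched in plain Java. This is an illustrative model only, assuming debounce lets one element through per time window and drops the rest; `DebounceSketch` and its explicit arrival timestamps are hypothetical and are used so the behaviour can be checked without waiting for real time to pass.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class DebounceSketch {

    // Hypothetical model of debounce: keep an element only when at least
    // windowNanos have elapsed since the last element that was kept.
    static <T> List<T> debounce(List<T> items, long[] arrivalNanos, long windowNanos) {
        List<T> kept = new ArrayList<>();
        boolean keptAny = false;
        long lastKept = 0L;
        for (int i = 0; i < items.size(); i++) {
            if (!keptAny || arrivalNanos[i] - lastKept >= windowNanos) {
                kept.add(items.get(i));
                lastKept = arrivalNanos[i];
                keptAny = true;
            }
            // otherwise the element arrived inside the current window and is dropped
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3, 4);
        long second = TimeUnit.SECONDS.toNanos(1);
        // Elements arrive one second apart, as in the scheduled examples.
        long[] arrivals = {0, second, 2 * second, 3 * second};

        // One-day window: only the first element gets through.
        System.out.println(debounce(items, arrivals, TimeUnit.DAYS.toNanos(1)));    // [1]
        // Two-second window: every other element gets through.
        System.out.println(debounce(items, arrivals, TimeUnit.SECONDS.toNanos(2))); // [1, 3]
    }
}
```

Under this model, a four-element Stream emitted once per second and debounced to one element per day would pass only its first element on to `writeToDB`.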
The same example again with a JDK 8 Stream via the static methods in StreamUtils.
StreamUtils.debounce(StreamUtils.schedule(Stream.of(1,2,3,4)
.peek(i->count.incrementAndGet())
.peek(System.out::println)
,"* * * * * ?", ex)
.connect()
,1,TimeUnit.DAYS)
.peek(this::writeToDB)
.collect(Collectors.toList())
The same example again with a Javaslang Stream via the static methods in cyclops-javaslang StreamUtils.
StreamUtils.debounce(StreamUtils.schedule(Stream.ofAll(1,2,3,4)
.peek(System.out::println)
,"* * * * * ?", ex)
.connect()
,1,TimeUnit.DAYS)
.peek(this::writeToDB)
.toList()
This time we will send one element through the Stream every second using a fixed rate delimiter:
ReactiveSeq.of(1,2,3,4)
.peek(System.out::println)
.scheduleFixedRate(1000, ex)
.connect()
.debounce(1,TimeUnit.DAYS)
.peek(this::writeToDB)
.toList()
StreamUtils.debounce(StreamUtils.scheduleFixedRate(Stream.of(1,2,3,4)
.peek(i->count.incrementAndGet())
.peek(System.out::println)
,1000, ex)
.connect()
,1,TimeUnit.DAYS)
.peek(this::writeToDB)
.collect(Collectors.toList())
StreamUtils.debounce(StreamUtils.scheduleFixedRate(Stream.ofAll(1,2,3,4)
.peek(i->count.incrementAndGet())
.peek(System.out::println)
,1000, ex)
.connect()
,1,TimeUnit.DAYS)
.peek(System.out::println)
.toList()
And now the same again with a fixed delay of 2 seconds between events:
ReactiveSeq.of(1,2,3,4)
.peek(System.out::println)
.scheduleFixedDelay(2000, ex)
.connect()
.debounce(1,TimeUnit.DAYS)
.peek(this::writeToDB)
.toList()
StreamUtils.debounce(StreamUtils.scheduleFixedDelay(Stream.of(1,2,3,4)
.peek(System.out::println)
,2000, ex)
.connect()
,1,TimeUnit.DAYS)
.peek(this::writeToDB)
.collect(Collectors.toList())
StreamUtils.debounce(StreamUtils.scheduleFixedDelay(Stream.ofAll(1,2,3,4)
.peek(System.out::println)
,2000, ex)
.connect()
,1,TimeUnit.DAYS)
.peek(this::writeToDB)
.toList()
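The difference between the fixed rate and fixed delay examples above can be illustrated with a small timing model. This sketch assumes the two operators follow the semantics of the JDK's `ScheduledExecutorService.scheduleAtFixedRate` and `scheduleWithFixedDelay`; that mapping is an assumption, not something stated here, and `RateVsDelaySketch` with its `taskMillis` parameter (the time spent processing one element) is hypothetical.

```java
import java.util.Arrays;

public class RateVsDelaySketch {

    // Fixed rate: run i is scheduled at i * period, measured from the first start.
    // If the previous run overran, the next run starts as soon as it finishes
    // (runs never overlap).
    static long[] fixedRateStarts(int n, long periodMillis, long taskMillis) {
        long[] starts = new long[n];
        for (int i = 1; i < n; i++) {
            long scheduled = i * periodMillis;
            long previousEnd = starts[i - 1] + taskMillis;
            starts[i] = Math.max(scheduled, previousEnd);
        }
        return starts;
    }

    // Fixed delay: the delay is measured from the END of the previous run,
    // so the processing time is added on top of the delay.
    static long[] fixedDelayStarts(int n, long delayMillis, long taskMillis) {
        long[] starts = new long[n];
        for (int i = 1; i < n; i++) {
            starts[i] = starts[i - 1] + taskMillis + delayMillis;
        }
        return starts;
    }

    public static void main(String[] args) {
        // Four elements, each taking 300 ms to process.
        System.out.println(Arrays.toString(fixedRateStarts(4, 1000, 300)));  // [0, 1000, 2000, 3000]
        System.out.println(Arrays.toString(fixedDelayStarts(4, 2000, 300))); // [0, 2300, 4600, 6900]
    }
}
```

In this model a fixed rate keeps the start times evenly spaced as long as processing is faster than the period, while a fixed delay drifts by the processing time of each element.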