
cyclops streams: Scheduling Streams (ReactiveSeq, jOOλ, Javaslang, JDK)

johnmcclean-aol edited this page Nov 21, 2016 · 1 revision

Scheduling Streams

Since cyclops 7.1.0 it is possible to schedule Stream events, where each event sends the next element through the pipeline. Events can be scheduled with a cron expression, at a fixed rate, or with a fixed delay.

Supported Stream types are ReactiveSeq, jOOλ, Javaslang and JDK Streams.

Coming Soon

ReactiveSeq equivalent for Javaslang Streams

Examples

Cron

ReactiveSeq

Send one element of a Stream through every second.

// ex is a java.util.concurrent.ScheduledExecutorService
ReactiveSeq.of(1,2,3,4)
	 .peek(System.out::println)
	 .schedule("* * * * * ?", ex)

This will print 1, 2, 3 and 4, one line per second.
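
The expression "* * * * * ?" has six fields rather than the five of classic Unix cron. The sketch below assumes a Quartz-style layout (seconds first, with `?` standing for "no specific value" in a day field), which the trailing `?` suggests. `CronFields` and `describe` are hypothetical helpers written for illustration and are not part of cyclops.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: labels the fields of a six-field, Quartz-style cron expression.
// The field order is an assumption about the cron dialect used by the examples.
final class CronFields {
    private static final String[] NAMES = {
        "second", "minute", "hour", "day-of-month", "month", "day-of-week"
    };

    // Splits the expression on whitespace and pairs each part with its field name.
    static Map<String, String> describe(String cron) {
        String[] parts = cron.trim().split("\\s+");
        if (parts.length != NAMES.length) {
            throw new IllegalArgumentException("expected 6 fields but got " + parts.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < NAMES.length; i++) {
            fields.put(NAMES[i], parts[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        // Every field is a wildcard, so the schedule fires on every second.
        describe("* * * * * ?")
            .forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```

Read this way, every field of "* * * * * ?" is a wildcard, which matches the one-element-per-second behaviour described above.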

We can connect to the output of this stream

HotStream<Integer> connectable = ReactiveSeq.of(1,2,3,4)
				.peek(System.out::println)
				.schedule("* * * * * ?", ex);

Stream<Integer> connected = connectable.connect();
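
Connecting can be pictured as subscribing a queue to a producer that is already running. The following is a JDK-only sketch of that idea and not the cyclops implementation: `HotSourceSketch`, its `connect` method, the `demo` helper and the 10 ms period are all assumptions made for illustration.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical model of a "hot" source: it emits on a schedule whether or not
// anyone is connected, and each connected Stream reads from its own queue.
final class HotSourceSketch<T> {
    private final List<BlockingQueue<T>> subscribers = new CopyOnWriteArrayList<>();

    HotSourceSketch(Iterator<T> source, long periodMillis, ScheduledExecutorService ex) {
        // One element is pulled from the source per tick and offered to every subscriber.
        ex.scheduleAtFixedRate(() -> {
            if (source.hasNext()) {
                T next = source.next();
                for (BlockingQueue<T> queue : subscribers) {
                    queue.offer(next);
                }
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Returns a Stream that blocks until the producer emits the next element.
    // Elements emitted before connect() was called are not replayed.
    Stream<T> connect() {
        BlockingQueue<T> queue = new LinkedBlockingQueue<>();
        subscribers.add(queue);
        return Stream.generate(() -> {
            try {
                return queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        });
    }

    // Connects to an endless counter and collects the first three elements received.
    static List<Integer> demo() {
        ScheduledExecutorService ex = Executors.newScheduledThreadPool(1);
        try {
            Iterator<Integer> counter = Stream.iterate(1, i -> i + 1).iterator();
            HotSourceSketch<Integer> hot = new HotSourceSketch<>(counter, 10, ex);
            return hot.connect().limit(3).collect(Collectors.toList());
        } finally {
            ex.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the producer runs independently of its consumers, which elements a consumer sees depends on when it connects.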
				
				

We can then process the connected Stream further. Here the debounce operator lets only one element per day through to writeToDB.

ReactiveSeq.of(1,2,3,4)
	 .peek(System.out::println)
	 .schedule("* * * * * ?", ex)
	 .connect()
	 .debounce(1,TimeUnit.DAYS)
	 .peek(this::writeToDB)
	 .toList()
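
To see why only one element per day reaches writeToDB, it can help to model debounce as "let one element through, then drop everything until the time window has elapsed". The sketch below assumes those semantics and works on explicit timestamps rather than a live Stream, so it is deterministic. `DebounceSketch` is a hypothetical class for illustration, not the cyclops operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical model of debounce: the first element passes, and later elements
// pass only if at least windowMillis has elapsed since the last one that passed.
final class DebounceSketch {
    static <T> List<T> debounce(List<T> values, long[] timestampsMillis, long windowMillis) {
        if (values.size() != timestampsMillis.length) {
            throw new IllegalArgumentException("one timestamp is required per value");
        }
        List<T> passed = new ArrayList<>();
        boolean seenAny = false;
        long lastPassed = 0L;
        for (int i = 0; i < values.size(); i++) {
            if (!seenAny || timestampsMillis[i] - lastPassed >= windowMillis) {
                passed.add(values.get(i));
                lastPassed = timestampsMillis[i];
                seenAny = true;
            }
        }
        return passed;
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4);
        long[] oncePerSecond = {0, 1000, 2000, 3000};
        // A one-day window lets only the first of four one-per-second elements through.
        System.out.println(debounce(values, oncePerSecond, TimeUnit.DAYS.toMillis(1))); // [1]
        // A 1.5 second window lets every other element through.
        System.out.println(debounce(values, oncePerSecond, 1500)); // [1, 3]
    }
}
```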

JDK 8

The same example again, this time with a JDK 8 Stream via the static methods in StreamUtils.

StreamUtils.debounce(
        StreamUtils.schedule(
                Stream.of(1,2,3,4)
                      .peek(i->count.incrementAndGet())
                      .peek(System.out::println),
                "* * * * * ?", ex)
            .connect(),
        1, TimeUnit.DAYS)
    .peek(this::writeToDB)
    // a JDK 8 Stream has no toList(), so collect explicitly
    .collect(Collectors.toList())

Javaslang

The same example again, this time with a Javaslang Stream via the static methods in cyclops-javaslang StreamUtils.

StreamUtils.debounce(
        StreamUtils.schedule(
                Stream.ofAll(1,2,3,4)
                      .peek(System.out::println),
                "* * * * * ?", ex)
            .connect(),
        1, TimeUnit.DAYS)
    .peek(this::writeToDB)
    .toList()

Fixed Rate

ReactiveSeq

This time an element is sent through the Stream every second, using a fixed rate of 1000 milliseconds.

ReactiveSeq.of(1,2,3,4)
	 .peek(System.out::println)
	 .scheduleFixedRate(1000, ex)
	 .connect()
	 .debounce(1,TimeUnit.DAYS)
	 .peek(this::writeToDB)
	 .toList()
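
The `ex` argument used throughout these examples is never shown being created. The sketch below assumes it is a `java.util.concurrent.ScheduledExecutorService` and uses only the JDK to show the underlying idea of releasing one element per tick at a fixed rate. `FixedRateSketch` and `emitAtFixedRate` are hypothetical names, and this is not how cyclops implements `scheduleFixedRate`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only illustration: one element of the source is released per tick.
final class FixedRateSketch {
    static <T> List<T> emitAtFixedRate(List<T> source, long periodMillis) {
        ScheduledExecutorService ex = Executors.newScheduledThreadPool(1);
        Iterator<T> it = source.iterator();
        List<T> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        // Each tick moves exactly one element; the first tick after the source
        // is exhausted signals completion.
        ex.scheduleAtFixedRate(() -> {
            if (it.hasNext()) {
                received.add(it.next());
            } else {
                done.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            ex.shutdownNow(); // stop the periodic task so the JVM can exit
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        // Takes roughly four seconds: one element per second, then a completion tick.
        System.out.println(emitAtFixedRate(Arrays.asList(1, 2, 3, 4), 1000)); // [1, 2, 3, 4]
    }
}
```

A single-threaded scheduler is enough here because a periodic task never runs concurrently with itself.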

JDK 8

StreamUtils.debounce(
        StreamUtils.scheduleFixedRate(
                Stream.of(1,2,3,4)
                      .peek(i->count.incrementAndGet())
                      .peek(System.out::println),
                1000, ex)
            .connect(),
        1, TimeUnit.DAYS)
    .peek(this::writeToDB)
    // a JDK 8 Stream has no toList(), so collect explicitly
    .collect(Collectors.toList())

Javaslang

StreamUtils.debounce(
        StreamUtils.scheduleFixedRate(
                Stream.ofAll(1,2,3,4)
                      .peek(i->count.incrementAndGet())
                      .peek(System.out::println),
                1000, ex)
            .connect(),
        1, TimeUnit.DAYS)
    .peek(System.out::println)
    .toList()

Fixed Delay

And now the same again, with a fixed delay of 2 seconds (2000 milliseconds) between events.

ReactiveSeq

ReactiveSeq.of(1,2,3,4)
	 .peek(System.out::println)
	 .scheduleFixedDelay(2000, ex)
	 .connect()
	 .debounce(1,TimeUnit.DAYS)
	 .peek(this::writeToDB)
	 .toList()
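
The difference between a fixed rate and a fixed delay only shows up when handling an element takes a noticeable amount of time. The sketch below models the two policies as they are defined for `java.util.concurrent.ScheduledExecutorService`: a fixed rate measures from the first start, while a fixed delay measures from the end of the previous run. It assumes the cyclops operators follow the same rules. `ScheduleModel` and its methods are hypothetical and compute start times arithmetically instead of running real tasks.

```java
import java.util.Arrays;

// Hypothetical arithmetic model of the two scheduling policies (times in milliseconds).
final class ScheduleModel {
    // Fixed rate: run i is due at i * period after the first start, but never
    // begins before the previous run has finished.
    static long[] fixedRateStarts(long[] durationsMillis, long periodMillis) {
        long[] starts = new long[durationsMillis.length];
        long previousEnd = 0L;
        for (int i = 0; i < starts.length; i++) {
            starts[i] = Math.max(i * periodMillis, previousEnd);
            previousEnd = starts[i] + durationsMillis[i];
        }
        return starts;
    }

    // Fixed delay: each run begins a full delay after the previous run finished.
    static long[] fixedDelayStarts(long[] durationsMillis, long delayMillis) {
        long[] starts = new long[durationsMillis.length];
        long previousEnd = 0L;
        for (int i = 0; i < starts.length; i++) {
            starts[i] = (i == 0) ? 0L : previousEnd + delayMillis;
            previousEnd = starts[i] + durationsMillis[i];
        }
        return starts;
    }

    public static void main(String[] args) {
        long[] work = {500, 500, 500, 500}; // each element takes 500 ms to handle
        System.out.println(Arrays.toString(fixedRateStarts(work, 2000)));  // [0, 2000, 4000, 6000]
        System.out.println(Arrays.toString(fixedDelayStarts(work, 2000))); // [0, 2500, 5000, 7500]
    }
}
```

With a fixed rate the start times stay on a regular grid as long as the work fits inside the period. With a fixed delay the processing time is added to every gap, so the schedule drifts.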

JDK 8

StreamUtils.debounce(
        StreamUtils.scheduleFixedDelay(
                Stream.of(1,2,3,4)
                      .peek(System.out::println),
                2000, ex)
            .connect(),
        1, TimeUnit.DAYS)
    .peek(this::writeToDB)
    // a JDK 8 Stream has no toList(), so collect explicitly
    .collect(Collectors.toList())

Javaslang

StreamUtils.debounce(
        StreamUtils.scheduleFixedDelay(
                Stream.ofAll(1,2,3,4)
                      .peek(System.out::println),
                2000, ex)
            .connect(),
        1, TimeUnit.DAYS)
    .peek(this::writeToDB)
    .toList()