Skip to content

Queues explained

johnmcclean-aol edited this page Feb 16, 2015 · 6 revisions

SimpleReact Queues are a way of joining two or more JDK 8 Streams. Producing Streams add to the Queue, and Consuming Streams remove data from the Queue. Consuming Streams will block if there is no data.

SimpleReact Queues can be backed by any BlockingQueue implementation. With unbounded BlockingQueues Producers can limitlessly (until OutOfMemory) add to a Queue. With bounded Blocking Queues Producers will be blocked (using queue.offer() ) until Consumers can remove data.

Simple example :

Stream<String> stream = Stream.of("1", "2", "3");
Queue<String> q = new Queue(new LinkedBlockingQueue());
q.fromStream(stream);
Stream<String> dq = q.stream();
Integer dequeued = q.stream().limit(3).map(it -> Integer.valueOf(it))
			.reduce(0, (acc, next) -> acc + next);

In this example we populate a Queue from a finite Stream, and then build another Steam from the Queue. Both Streams are operating in the same thread.

Streaming across threads

Queues can be very useful when infinite (or large) Producing Streams provide data to infinite (or large) Consuming Streams on separate threads.

Generating an infinite Stream using SimpleReact

Queue<Integer> q = new Queue(new LinkedBlockingQueue());
AtomicInteger count = new AtomicInteger(0);
new SimpleReact().react(() -> q.fromStream(
                                  Stream.generate(() -> count.incrementAndGet())));

In this example, we are simply returning the next number in a sequence, but we could use similar code to read data from file, remote service, or database. Similarly we could populate the Stream on a listening thread triggered by external events (incoming REST Calls, or messages from a message queue).

We can extract a finite set of values from the queue

q.stream().limit(1000)
      .peek(it -> System.out.println(it))
      .collect(Collectors.toList());

Or build an infinite lazy Stream

    SimpleReact.lazy()
	 .fromStream(q.streamCompletableFutures())
	 .then(it -> "*" + it).peek(it -> incrementFound())
	 .peek(it -> System.out.println(it))
	 .run(Executors.newSingleThreadExecutor());

This will move Stream management unto a single thread (not the current thread) - via .run(Executors.newSingleThreadExecutor()), while SimpleReact will react to the Stream in a multithreaded fashion o with SimpleReact's ExecutorService (by default configured to have parallelism equal to the number of cores).

Clone this wiki locally