Agrona Wait Free Queues
How to configure a simple-react LazyFutureStream to use an Agrona wait-free queue with mechanical sympathy.
NB: LazyFutureStream uses Queues internally for some operations. This article explains how and why to configure those internal Queues. The advice on bound sizes does not apply to external Queues you may set up yourself, even if LazyFutureStreams read from those Queues. For external Queues, your own business rules determine the maximum bounds.
Version 0.96 of simple-react adds support for simple-react Queues backed by a ManyToOneConcurrentArrayQueue. This is a Java implementation of an algorithm from Fast Flow, by the leading Java experts in this area (Martin Thompson, Richard Warburton, Todd Montgomery) via their Agrona project, which provides data structures and utilities used in the ultra-low-latency Aeron messaging system.
In simple benchmarking, LazyFutureStreams backed by an Agrona ManyToOneConcurrentArrayQueue performed up to 40% faster than LazyFutureStreams backed by the JDK unbounded wait-free Queue (ConcurrentLinkedQueue). The gap between the two varied from 0 to over 40% across runs. Results for most queue types showed significant variation, but throughput from LazyFutureStreams backed by Agrona ManyToOneConcurrentArrayQueues was much more stable. Non-blocking Queues performed up to twice as well as blocking queues, also with a lot of variation.
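The comparison above can be approximated with a small many-producer, single-consumer harness. This is only a sketch using JDK queues, not the benchmark behind the figures quoted here; the class and method names are hypothetical, and a serious measurement would need warm-up and repeated runs (for example with JMH).

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical harness: several producer threads, one consumer (the calling thread).
final class ManyToOneThroughput {

    /** Moves producers * perProducer items through the queue; returns how many the consumer received. */
    static long transfer(Queue<Integer> queue, int producers, int perProducer) {
        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    // offer() can fail on a bounded queue, so retry until accepted
                    while (!queue.offer(i)) {
                        Thread.yield();
                    }
                }
            });
            threads[p].start();
        }
        long expected = (long) producers * perProducer;
        long received = 0;
        while (received < expected) {
            if (queue.poll() != null) {
                received++;
            } else {
                Thread.yield(); // queue momentarily empty
            }
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return received;
    }

    /** Wall-clock time for one transfer; a single run is noisy, so compare many runs. */
    static long timeNanos(Queue<Integer> queue, int producers, int perProducer) {
        long start = System.nanoTime();
        transfer(queue, producers, perProducer);
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        System.out.println("non-blocking ns: " + timeNanos(new ConcurrentLinkedQueue<Integer>(), 4, 250_000));
        System.out.println("blocking ns:     " + timeNanos(new LinkedBlockingQueue<Integer>(), 4, 250_000));
    }
}
```

Any other java.util.Queue implementation could be passed to the same method to compare it under the same load.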
Creating a bounded queue with a large buffer capacity can be expensive. Agrona ManyToOneConcurrentArrayQueues with a bound size of 200,000 entries were ~10 times slower than Agrona ManyToOneConcurrentArrayQueues with a bound size of 110.
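One way to see why the bound matters is to estimate how many slots an array-backed queue allocates up front. The sketch below assumes the backing array is sized to the next power of two at or above the requested bound; that is an assumption about the queue's internals, not something stated in this article, and the helper name is hypothetical.

```java
// Hypothetical helper: estimates the pre-allocated slot count for a requested bound,
// ASSUMING the array-backed queue rounds capacity up to a power of two.
final class QueueSizing {

    static int nextPowerOfTwo(int requested) {
        if (requested <= 1) {
            return 1;
        }
        // highest set bit of (requested - 1), doubled, is the smallest power of two >= requested
        return Integer.highestOneBit(requested - 1) << 1;
    }

    public static void main(String[] args) {
        System.out.println(nextPowerOfTwo(110));     // 128
        System.out.println(nextPowerOfTwo(200_000)); // 262144
    }
}
```

Under that assumption, a bound of 200,000 reserves roughly two thousand times as many slots as a bound of 110, all of it allocated before a single element is queued.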
Despite the improvements apparent from using the Agrona ManyToOneConcurrentArrayQueue as the backing Queue, we continue to use the JDK ConcurrentLinkedQueue as the default. This is because no single bound size is right for every operation. For most operations, the bound size can be very similar to the concurrency level as determined by the MaxActive settings. But for other operators such as flatMap, it isn't possible to tell how many elements will need to be buffered on the queue. For that reason, the backing queue is configurable per stage via two simple operators, boundedWaitFree and unboundedWaitFree.
The boundedWaitFree operator tells LazyFutureStream to use a bounded wait-free queue for the subsequent stages (until configured otherwise). E.g.
new LazyReact(10,100) //configure a LazyFutureStream builder with a thread pool of 10 threads, that accepts 100 concurrent tasks
.react(toMyData) //define the initial Suppliers to asynchronously react to
.boundedWaitFree(110) //use a bounded queue factory, with max queue size slightly above the max number of concurrent tasks
.limit(400) //take the first 400 elements from this async Stream. The limit operator makes use of an async Queue and can thus benefit from being backed by an Agrona wait free queue.
.toList();
The unboundedWaitFree operator is the default, and it tells LazyFutureStream to use an unbounded wait-free Queue. Although algorithmically wait-free, this Queue is more likely to experience contention at lower levels (e.g. via garbage collector activity, or invalidation of CPU caches). It is, however, unbounded, and as such it
- does not incur significant queue creation costs
- is less likely to cause data loss should flatMap-based expansions turn out to be large.
new LazyReact(10,100) //configure a LazyFutureStream builder with a thread pool of 10 threads, that accepts 100 concurrent tasks
.react(toMyData) //define the initial Suppliers to asynchronously react to
.boundedWaitFree(110) //use a bounded queue factory, with max queue size slightly above the max number of concurrent tasks
.limit(400) //take the first 400 elements from this async Stream. The limit operator makes use of an async Queue and can thus benefit from being backed by an Agrona wait free queue.
.unboundedWaitFree() //switch to unboundedWaitFree before performing a large flatMap operation
.flatMap(this::loadAndStream)
.toList();
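The risk with a flatMap expansion can be illustrated with plain JDK queues. In this sketch an ArrayBlockingQueue stands in for a bounded queue and a ConcurrentLinkedQueue for the unbounded default; it only shows that a full bounded queue refuses further offers, not how simple-react itself handles that case. The class name and the fan-out figures are made up for illustration.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo: each of `inputs` elements expands into `fanOut` elements,
// and nothing drains the queue while the expansion is being buffered.
final class FlatMapOverflow {

    /** Returns how many offers the queue refused. */
    static int rejected(Queue<String> queue, int inputs, int fanOut) {
        int rejected = 0;
        for (int i = 0; i < inputs; i++) {
            for (int j = 0; j < fanOut; j++) {
                if (!queue.offer(i + ":" + j)) {
                    rejected++; // bounded queue is full, element not accepted
                }
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        // bound of 110, 100 inputs each expanding to 5 elements: 500 offers, 110 accepted
        System.out.println(rejected(new ArrayBlockingQueue<String>(110), 100, 5));  // 390
        // unbounded queue accepts everything
        System.out.println(rejected(new ConcurrentLinkedQueue<String>(), 100, 5));  // 0
    }
}
```

A bound sized for 100 concurrent tasks is comfortable until one stage multiplies the element count, which is the case for switching to an unbounded queue before such a stage.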
Use a boundedWaitFree queue where bound sizes can be reasonably asserted to be low, otherwise favor an unbounded wait free queue. Test, test, test your assumptions on performance critical paths.
simple-react works by performing aggregate operations over Streams of CompletableFutures. To perform more complex operations we often need the result that will be asynchronously populated in the Future. To keep data flowing smoothly through your Stream, we 'plumb' multiple different streams together internally, asynchronously populating and extracting data from simple-react async.Queues. While the simple-react async.Queue provides the necessary logic to manage this, it isn't in itself an actual Queue data structure implementation. The backing Queue is pluggable, and simple-react QueueFactories are provided for a range of blocking and non-blocking queue implementations.
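The idea of a pluggable backing queue can be sketched in a few lines of plain Java. This is not simple-react's async.Queue or QueueFactory API; PluggableQueue and its methods are hypothetical names, and the JDK queues are stand-ins for whichever implementation a factory would supply.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Hypothetical wrapper: the management logic lives here, while the actual
// data structure is whatever java.util.Queue the supplied factory creates.
final class PluggableQueue<T> {

    private final Queue<T> backing;

    PluggableQueue(Supplier<Queue<T>> factory) {
        this.backing = factory.get();
    }

    /** Unbounded, non-blocking backing queue (the JDK default discussed above). */
    static <T> PluggableQueue<T> unbounded() {
        return new PluggableQueue<T>(() -> new ConcurrentLinkedQueue<T>());
    }

    /** Bounded backing queue; ArrayBlockingQueue is only a JDK stand-in here. */
    static <T> PluggableQueue<T> bounded(int bound) {
        return new PluggableQueue<T>(() -> new ArrayBlockingQueue<T>(bound));
    }

    /** Non-blocking add; returns false if a bounded backing queue is full. */
    boolean add(T value) {
        return backing.offer(value);
    }

    /** Non-blocking take; returns null if the queue is currently empty. */
    T next() {
        return backing.poll();
    }

    String backingType() {
        return backing.getClass().getSimpleName();
    }
}
```

Because the wrapper depends only on java.util.Queue, swapping the backing implementation is a matter of passing a different supplier, with no change to the code that adds and takes elements.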