High performance, configurable and expressive parallel computation library.

Parallel computation is defined using the parallel iterator trait `ParIter`.

The goal is to convert an expressive sequential program into an efficient parallel program simply by replacing `iter` with `par`, and `into_iter` with `into_par`.

The following is a naive traveling salesperson algorithm which randomly generates sequences and picks the one with the minimum duration as the best tour. The example demonstrates chaining of the very common and useful `map`, `filter` and reduce (`min_by_key`) operations. Notice that the only difference between the sequential and parallel programs is the `par()` call.
```rust
use orx_parallel::*;
use rand::prelude::*;

struct Tour(Vec<usize>);

impl Tour {
    fn random(n: usize) -> Self {
        let mut cities: Vec<_> = (0..n).collect();
        cities.shuffle(&mut rand::rng());
        Self(cities)
    }

    fn not_in_standard_order(&self) -> bool {
        self.0.iter().enumerate().any(|(i, c)| i != *c)
    }

    fn duration(&self) -> usize {
        let mut total = 0;
        let links = self.0.iter().zip(self.0.iter().skip(1));
        for (a, b) in links {
            total += (*a as i64 - *b as i64).abs() as usize;
        }
        total
    }
}

let num_tours = 1_000_000;
let num_cities = 10;

// sequential
let best_tour = (0..num_tours)
    .map(|_| Tour::random(num_cities))
    .filter(|t| t.not_in_standard_order())
    .min_by_key(|t| t.duration())
    .unwrap();

// parallel
let best_tour = (0..num_tours)
    .par() // parallelized !!
    .map(|_| Tour::random(num_cities))
    .filter(|t| t.not_in_standard_order())
    .min_by_key(|t| t.duration())
    .unwrap();
```
Inputs that can be used in parallel computations can be categorized into three groups:
- i. directly parallelizable collections
- ii. parallelization of any iterator
- iii. parallelization of any collection
The first group (i) consists of collections which are parallelized by utilizing their specific structure to achieve high performance. The orx-parallel crate provides direct implementations for std collections. Implementations for custom collections belong in their respective crates, as they most likely require access to the internals. The following is the most recent table of direct implementations.
| Type | Over References<br>`ParIter<Item = &T>` | Over Owned Values<br>`ParIter<Item = T>` |
|---|---|---|
| `v: Vec<T>` | `v.par()` | `v.into_par()` |
| `s: &[T]` | `s.par()`<br>`s.into_par()` | |
| `r: Range<usize>` | | `r.par()`<br>`r.into_par()` |
Since these implementations are particularly optimized for the collection type, it is preferable to start defining the parallel computation from the collection whenever available. In other words, for a vector `v`, `v.par().map(...).filter(...).reduce(...)` is a better approach than `v.iter().iter_into_par().map(...).filter(...).reduce(...)`, which will be explained in the next subsection.
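As a hedged illustration (the closures below are arbitrary placeholders picked for the example), both forms compute the same result, but the direct `par()` version is the one to prefer:

```rust
use orx_parallel::*;

let v: Vec<u64> = (0..1024).collect();

// preferred: start from the collection's direct implementation
let a: u64 = v.par().map(|x| x * x).filter(|x| x % 3 == 0).sum();

// possible, but generally slower: parallelize over the regular iterator
let b: u64 = v.iter().iter_into_par().map(|x| x * x).filter(|x| x % 3 == 0).sum();

assert_eq!(a, b);
```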
extensibility: Note that any input collection or generator that implements `IntoConcurrentIter` automatically implements `IntoParIter`. Therefore, a new collection can be directly parallelized provided that its concurrent iterator is implemented.
Any arbitrary sequential `Iterator` implements the `IterIntoParIter` trait and can be converted into a parallel iterator using the `iter_into_par` method.

This is very powerful since it makes it possible to parallelize all iterables, which includes pretty much every collection and more.

On the other hand, being a generic implementation without collection-specific optimizations, the parallelized computation might underperform its sequential counterpart if the work to be done on each input element is insignificant. For instance, with `i` being an arbitrary iterator of numbers, `i.sum()` will most likely be faster than `i.iter_into_par().sum()`.

This being said, `ParIter` takes advantage of certain optimizations, such as buffering and chunk size optimization, in order to improve performance. Therefore, whenever the computation on the iterator elements is more involved than just returning them or adding numbers, we can benefit from parallelization. The respective section of the benchmarks presents significant improvements achieved consistently.
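To illustrate, the sketch below parallelizes an arbitrary iterator chain; the range bounds and closures are placeholders chosen for the example:

```rust
use orx_parallel::*;

// an arbitrary iterator: chained ranges with a filter
let iter = (0..1024).chain(2048..4096).filter(|x| x % 3 == 0);

// since the per-element work below is non-trivial, parallelization can pay off
let sum: usize = iter.iter_into_par().map(|x| (0..x).sum::<usize>()).sum();
```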
Lastly, consider a collection which does not provide a direct concurrent iterator implementation. This might be our custom collection, say `MyCollection`; or an external collection without a concurrent iterator implementation, such as `HashSet<T>`.

There are two methods to parallelize computations over such collections:

- (ii) parallelize using the collection's iterator, or
- (i) collect the elements into a vector and then parallelize the vector.

The following table demonstrates these methods for the `HashSet`; however, they are applicable to any collection with `iter` and `into_iter` methods.
| Type | Method | Over References<br>`ParIter<Item = &T>` | Over Owned Values<br>`ParIter<Item = T>` |
|---|---|---|---|
| `h: HashSet<T>` | ii | `h.iter().iter_into_par()` | `h.into_iter().iter_into_par()` |
| | i | `h.iter().collect::<Vec<_>>().par()` | `h.into_iter().collect::<Vec<_>>().into_par()` |
Note that each approach can be more efficient in different scenarios. As a rule of thumb, the more significant the work to be done per element, the less critical the choice becomes; in such cases, parallelization over the iterator (ii) is preferable since it avoids allocating the vector.
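The following sketch demonstrates both methods on a `HashSet`; the set contents and closures are placeholders for illustration.

```rust
use orx_parallel::*;
use std::collections::HashSet;

let h: HashSet<u64> = (0..1000).collect();

// (ii) parallelize over the collection's iterator
let sum_ii: u64 = h.iter().iter_into_par().map(|x| x * x).sum();

// (i) collect into a vector, then use the vector's direct implementation
let v: Vec<u64> = h.iter().copied().collect();
let sum_i: u64 = v.par().map(|x| x * x).sum();

assert_eq!(sum_i, sum_ii);
```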
You may find some sample parallel programs in the examples directory. These examples make it possible to express parallel computations as compositions of iterator methods and to run quick experiments with different approaches. The examples use `GenericIterator`. As the name suggests, it is a generalization of the sequential iterator, rayon's parallel iterator and orx-parallel's parallel iterator, and hence allows for convenient experiments. You may play with the code, update the tested computations and run these examples by enabling the generic_iterator feature, such as:

```
cargo run --release --features generic_iterator --example benchmark_collect -- --len 123456 --num-repetitions 10
```
Actual benchmark files are located in the benches directory. Tables below report average execution times in microseconds. The numbers in parentheses represent the ratio of execution time to that of the sequential computation, which is used as the baseline (1.00). Parallelized executions of all benchmarks are carried out with default settings.

Computations are separated into three categories with respect to how the iterator is consumed: collect, reduce and early exit. Further, two additional categories are created to test parallelization of arbitrary iterators (ii) and flexibility in the composition of computations.
In this group of benchmarks, outputs of parallel computations are collected into vectors. The arguments of transformations such as `filter`, `filter_map` or `map` are example functions applied to the input. Details of the functions can be found in the respective benchmark files (you may use the link in the file column).

(s) Outputs can also be collected into a `SplitVec`, which can provide further improvements by avoiding memory copies. Note that a split vector provides constant time random access; and despite being split into fragments, it asymptotically inherits the advantages of contiguous vectors.
| file | computation | sequential | rayon | orx-parallel | orx-parallel (s) |
|---|---|---|---|---|---|
| ⇨ | `inputs.into_par().filter(filter).collect()` | 5.92 (1.00) | 12.58 (2.12) | 2.50 (0.42) | 2.47 (0.42) |
| ⇨ | `inputs.into_par().filter_map(filter_map).collect()` | 15.95 (1.00) | 12.62 (0.79) | 6.75 (0.42) | 6.37 (0.40) |
| ⇨ | `inputs.into_par().flat_map(flat_map).collect()` | 187.37 (1.00) | 492.18 (2.63) | 57.00 (0.30) | 50.34 (0.27) |
| ⇨ | `inputs.into_par().map(map).filter(filter).collect()` | 47.97 (1.00) | 14.69 (0.31) | 11.98 (0.25) | 10.29 (0.21) |
| ⇨ | `inputs.into_par().map(map).collect()` | 36.21 (1.00) | 14.36 (0.40) | 11.76 (0.32) | 14.47 (0.40) |
In this group, instead of collecting outputs, the results are reduced to a single value. Some common reductions are `sum`, `count`, `min`, etc.
| file | computation | sequential | rayon | orx-parallel |
|---|---|---|---|---|
| ⇨ | `inputs.into_par().map(map).filter(filter).reduce(reduce)` | 31.92 (1.00) | 16.16 (0.51) | 7.07 (0.22) |
| ⇨ | `inputs.into_par().map(map).reduce(reduce)` | 32.26 (1.00) | 7.85 (0.24) | 6.96 (0.22) |
| ⇨ | `inputs.into_par().reduce(reduce)` | 2.00 (1.00) | 12.18 (6.09) | 1.09 (0.55) |
In the last of the three main categories, computations that allow for early exit or short-circuiting are investigated. As an example, experiments on the `find` method are presented; methods such as `find_any`, `any` or `all` lead to similar results.
| file | computation | sequential | rayon | orx-parallel |
|---|---|---|---|---|
| ⇨ | `inputs.into_par().flat_map(flat_map).find(find)` | 170.80 (1.00) | 120.63 (0.71) | 27.53 (0.16) |
| ⇨ | `inputs.into_par().map(map).filter(filter).find(find)` | 46.28 (1.00) | 11.96 (0.26) | 9.67 (0.21) |
| ⇨ | `inputs.into_par().find(find)` | 2.51 (1.00) | 12.15 (4.85) | 1.24 (0.49) |
As discussed in ii, parallelization of regular iterators is a very powerful feature. The benchmarks in this category demonstrate that significant improvements can be achieved provided that the computation on elements is not insignificant. Note that every computation defined after `iter_into_par()` is parallelized; hence, the work on elements here consists of the `map` and `filter` computations.
| file | computation | sequential | rayon | orx-parallel |
|---|---|---|---|---|
| ⇨ | `inputs.iter().iter_into_par().map(map).filter(filter).collect()` | 48.09 (1.00) | 59.54 (1.24) | 16.84 (0.35) |
| ⇨ | `inputs.iter_into_par().map(map).filter(filter).reduce(reduce)` | 33.07 (1.00) | 213.36 (6.45) | 7.53 (0.23) |
| ⇨ | `inputs.into_iter().iter_into_par().map(map).filter(filter).find(&find)` | 44.53 (1.00) | 55.31 (1.24) | 11.78 (0.26) |
In the final category of benchmarks, the impact of long chains of transformations on computation time is tested. You may see examples of such long chains in the benchmark computations below. Notice that the caller could actually shorten the chains by composing some of the transformations. An obvious one is the `.map(map3).map(map4)` call, which could have been a single call such as `map(map3-then-map4)`.

However, the results suggest that this is not required, as the functions are efficiently composed by the parallel iterator. Therefore, the focus on the caller side can be on the expressiveness of the computation.
| file | computation | sequential | rayon | orx-parallel |
|---|---|---|---|---|
| ⇨ | `inputs.into_par().map(map1).filter(filter1).map(map2).filter(filter2).map(map3).map(map4).filter(filter4).collect()` | 35.89 (1.00) | 10.11 (0.28) | 7.40 (0.21) |
| ⇨ | `inputs.into_par().map(map1).filter(filter1).map(map2).filter(filter2).map(map3).map(map4).filter(filter4).reduce(reduce)` | 32.28 (1.00) | 8.99 (0.28) | 6.63 (0.21) |
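For instance, in the hedged sketch below (`map3` and `map4` are hypothetical stand-ins for the benchmark's transformations), the expressive chain and the manually composed chain produce the same result with similar performance:

```rust
use orx_parallel::*;

// hypothetical transformations for illustration
let map3 = |x: usize| x + 1;
let map4 = |x: usize| x * 2;

// expressive: two separate map calls
let a: Vec<usize> = (0..1024).par().map(map3).map(map4).collect();

// manually composed: a single map call
let b: Vec<usize> = (0..1024).par().map(|x| map4(map3(x))).collect();

assert_eq!(a, b);
```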
Parallel execution is governed by two main straightforward parameters.

- `NumThreads` is the degree of parallelization. This is a capacity parameter used to limit the resources that can be used by the computation.
  - `Auto`: All available threads can be used, but not necessarily.
  - `Max(n)`: The computation can spawn at most n threads.
  - `Max(1)`: Falls back to sequential execution on the main thread.
- `ChunkSize` represents the number of elements a parallel worker pulls and processes every time it becomes idle. This is an optimization parameter that can be tuned to balance the overhead of parallelization against the cost of heterogeneity of tasks.
  - `Auto`: Lets the parallel executor decide dynamically; achieves high performance in general and can be used unless we have useful computation-specific knowledge.
  - `Exact(c)`: Chunks will have exactly c elements; gives complete control to the caller. Useful when we have very good knowledge of the computation or want to tune it for certain data.
  - `Min(c)`: Chunks will have at least c elements; the parallel executor, however, might decide to pull more if each computation is handled very fast.

See also the last parameter, `IterationOrder`, with variants `Ordered` (default) and `Arbitrary`, which is another useful optimization parameter for specific use cases.
When omitted, `NumThreads::Auto` and `ChunkSize::Auto` will be used. Configuring the parallel computation is straightforward; it is specific to each computation rather than being a global setting.
```rust
use orx_parallel::*;
use std::num::NonZeroUsize;

let n = 1024;

_ = (0..n).par().sum(); // NumThreads::Auto & ChunkSize::Auto
_ = (0..n).par().num_threads(4).sum(); // <= 4 threads
_ = (0..n).par().num_threads(1).sum(); // sequential
_ = (0..n).par().num_threads(0).sum(); // shorthand for NumThreads::Auto

_ = (0..n).par().chunk_size(64).sum(); // chunks of exactly 64 elements
let c = ChunkSize::Min(NonZeroUsize::new(16).unwrap());
_ = (0..n).par().chunk_size(c).sum(); // chunks of at least 16 elements

_ = (0..n).par().num_threads(4).chunk_size(16).sum(); // set both params
```
Note that `NumThreads::Max(1)` executes the computation sequentially, without any parallelization overhead and benefiting from the optimizations of regular iterators.

This gives the consumer, who actually executes the defined computation, complete control to:

- execute in parallel with the given configuration,
- execute sequentially, or
- execute in parallel with any number of threads that it decides.

This is guaranteed by the fact that both the consuming computation calls and the configuration methods require ownership (`self`) of the iterator.
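As a hedged sketch of this pattern (assuming `reduce` accepts a binary closure and returns an `Option`, as in the benchmark computations above), the hypothetical `execute` function below owns the computation and therefore decides its degree of parallelism before consuming it:

```rust
use orx_parallel::*;

// hypothetical consumer: it owns the iterator, so it decides the degree of parallelism
fn execute(computation: impl ParIter<Item = usize>, max_num_threads: usize) -> Option<usize> {
    // 0 is the shorthand for NumThreads::Auto; 1 falls back to sequential execution
    computation.num_threads(max_num_threads).reduce(|a, b| a + b)
}

let result = execute((0..1024).par().map(|x| x * 3), 4);
assert_eq!(result, Some(3 * 1023 * 1024 / 2));
```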
This crate defines parallel computation by combining two basic aspects.

- Pulling inputs in parallel is achieved through `ConcurrentIter`. Concurrent iterator implementations are lock-free, efficient and support the pull-by-chunks optimization to reduce parallelization overhead. A thread can pull any number of inputs from the concurrent iterator every time it becomes idle. This provides the means to dynamically decide on chunk sizes.
- Writing outputs in parallel is handled using thread-safe containers such as `ConcurrentBag` and `ConcurrentOrderedBag`. Similarly, these are lock-free collections that aim for high performance collection of results.
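The following is only a rough, std-only sketch of this pull-and-push idea: it mimics `ConcurrentIter` with an atomic cursor and `ConcurrentBag` with a mutex-guarded vector, whereas the crate's actual containers are lock-free.

```rust
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};

let inputs: Vec<usize> = (0..1024).collect();
let cursor = AtomicUsize::new(0); // plays the role of the lock-free ConcurrentIter
let outputs = Mutex::new(Vec::new()); // plays the role of the lock-free ConcurrentBag

std::thread::scope(|s| {
    for _ in 0..4 {
        s.spawn(|| loop {
            // whenever idle, pull the next chunk of inputs
            let begin = cursor.fetch_add(16, Ordering::Relaxed);
            if begin >= inputs.len() {
                break;
            }
            let end = (begin + 16).min(inputs.len());
            // process the chunk and push the outputs
            let processed: Vec<_> = inputs[begin..end].iter().map(|x| x * 2).collect();
            outputs.lock().unwrap().extend(processed);
        });
    }
});

assert_eq!(outputs.lock().unwrap().len(), 1024);
```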
Finally, the `ParallelRunner` trait manages parallelization of the given computation with the desired configuration. The objective of the parallel runner is to optimize chunk sizes to resolve the tradeoff between the impact of heterogeneity of individual computations and the overhead of parallelization.

Since it is a trait, the parallel runner is customizable. It is possible to implement and use your own runner simply by calling the `with_runner` transformation method on the parallel iterator. The default parallel runner aims to be efficient in general. When we have a use case with special characteristics, we can implement a `ParallelRunner` optimized for this scenario and use it with the parallel iterators.
Contributions are welcome!

Please open an issue or create a PR if you:

- notice an error,
- have a question or think something could be improved,
- have an input collection or generator that needs to be parallelized,
- are having trouble representing a particular parallel computation with parallel iterators,
- or anything else :)

Finally, feel free to contact me if you are interested in optimizing the parallel runner to further improve performance, perhaps through dynamic optimization of chunk size decisions with respect to online collection and analysis of metrics.
Dual-licensed under Apache 2.0 or MIT.