Skip to content

Commit

Permalink
Merge pull request #831 from wagnerf42/blocks
Browse files Browse the repository at this point in the history
added by_blocks method
  • Loading branch information
cuviper authored Feb 27, 2024
2 parents 89f885d + 4d2f0b6 commit bacd468
Show file tree
Hide file tree
Showing 7 changed files with 414 additions and 9 deletions.
199 changes: 190 additions & 9 deletions rayon-demo/src/find/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/// Simple benchmarks of `find_any()` performance
/// Simple benchmarks of `find_any()` and `find_first` performance

macro_rules! make_tests {
($n:expr, $m:ident) => {
Expand All @@ -21,56 +21,237 @@ macro_rules! make_tests {
.collect()
});

// this is a very dumb find_first algorithm.
// no early aborts so we have a linear best case cost.
fn find_dumb<I: ParallelIterator, P: Fn(&I::Item) -> bool + Send + Sync>(
iter: I,
cond: P,
) -> Option<I::Item> {
iter.map(|e| if cond(&e) { Some(e) } else { None })
.reduce(|| None, |left, right| left.or(right))
}

#[bench]
fn parallel_find_any_start(b: &mut Bencher) {
let needle = HAYSTACK[0][0];
b.iter(|| assert!(HAYSTACK.par_iter().find_any(|&&x| x[0] == needle).is_some()));
}

#[bench]
fn parallel_find_first(b: &mut Bencher) {
fn parallel_find_first_start(b: &mut Bencher) {
let needle = HAYSTACK[0][0];
b.iter(|| assert!(HAYSTACK.par_iter().find_any(|&&x| x[0] == needle).is_some()));
}

#[bench]
fn serial_find_first(b: &mut Bencher) {
fn parallel_find_first_blocks_start(b: &mut Bencher) {
let needle = HAYSTACK[0][0];
b.iter(|| {
assert!(HAYSTACK
.par_iter()
.by_exponential_blocks()
.find_first(|&&x| x[0] == needle)
.is_some())
});
}

#[bench]
fn serial_find_start(b: &mut Bencher) {
let needle = HAYSTACK[0][0];
b.iter(|| assert!(HAYSTACK.iter().find(|&&x| x[0] == needle).is_some()));
}

#[bench]
fn parallel_find_last(b: &mut Bencher) {
fn parallel_find_any_end(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() - 1][0];
b.iter(|| assert!(HAYSTACK.par_iter().find_any(|&&x| x[0] == needle).is_some()));
}
#[bench]
fn parallel_find_first_end(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() - 1][0];
b.iter(|| {
assert!(HAYSTACK
.par_iter()
.find_first(|&&x| x[0] == needle)
.is_some())
});
}
#[bench]
fn parallel_find_first_blocks_end(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() - 1][0];
b.iter(|| {
assert!(HAYSTACK
.par_iter()
.by_exponential_blocks()
.find_first(|&&x| x[0] == needle)
.is_some())
});
}

#[bench]
fn serial_find_last(b: &mut Bencher) {
fn serial_find_end(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() - 1][0];
b.iter(|| assert!(HAYSTACK.iter().find(|&&x| x[0] == needle).is_some()));
}

#[bench]
fn parallel_find_middle(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() / 3 * 2][0];
fn parallel_find_any_third(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() / 3][0];
b.iter(|| assert!(HAYSTACK.par_iter().find_any(|&&x| x[0] == needle).is_some()));
}

#[bench]
fn parallel_find_first_third(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() / 3][0];
b.iter(|| {
assert!(HAYSTACK
.par_iter()
.find_first(|&&x| x[0] == needle)
.is_some())
});
}

#[bench]
fn parallel_find_dumb_third(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() / 3][0];
b.iter(
|| assert!(find_dumb(HAYSTACK.par_iter(), (|&&x| x[0] == needle)).is_some()),
);
}

#[bench]
fn parallel_find_first_blocks_third(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() / 3][0];
b.iter(|| {
assert!(HAYSTACK
.par_iter()
.by_exponential_blocks()
.find_first(|&&x| x[0] == needle)
.is_some())
});
}

#[bench]
fn serial_find_third(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() / 3][0];
b.iter(|| assert!(HAYSTACK.iter().find(|&&x| x[0] == needle).is_some()));
}

#[bench]
fn parallel_find_any_middle(b: &mut Bencher) {
let needle = HAYSTACK[(HAYSTACK.len() / 2).saturating_sub(1)][0];
b.iter(|| assert!(HAYSTACK.par_iter().find_any(|&&x| x[0] == needle).is_some()));
}

#[bench]
fn parallel_find_first_middle(b: &mut Bencher) {
let needle = HAYSTACK[(HAYSTACK.len() / 2).saturating_sub(1)][0];
b.iter(|| {
assert!(HAYSTACK
.par_iter()
.find_first(|&&x| x[0] == needle)
.is_some())
});
}

#[bench]
fn parallel_find_dumb_middle(b: &mut Bencher) {
let needle = HAYSTACK[(HAYSTACK.len() / 2).saturating_sub(1)][0];
b.iter(
|| assert!(find_dumb(HAYSTACK.par_iter(), (|&&x| x[0] == needle)).is_some()),
);
}

#[bench]
fn parallel_find_first_blocks_middle(b: &mut Bencher) {
let needle = HAYSTACK[(HAYSTACK.len() / 2).saturating_sub(1)][0];
b.iter(|| {
assert!(HAYSTACK
.par_iter()
.by_exponential_blocks()
.find_first(|&&x| x[0] == needle)
.is_some())
});
}

#[bench]
fn serial_find_middle(b: &mut Bencher) {
let needle = HAYSTACK[(HAYSTACK.len() / 2).saturating_sub(1)][0];
b.iter(|| assert!(HAYSTACK.iter().find(|&&x| x[0] == needle).is_some()));
}

#[bench]
fn parallel_find_any_two_thirds(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() / 3 * 2][0];
b.iter(|| assert!(HAYSTACK.par_iter().find_any(|&&x| x[0] == needle).is_some()));
}

#[bench]
fn parallel_find_first_two_thirds(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() / 3 * 2][0];
b.iter(|| {
assert!(HAYSTACK
.par_iter()
.find_first(|&&x| x[0] == needle)
.is_some())
});
}

#[bench]
fn parallel_find_first_blocks_two_thirds(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() / 3 * 2][0];
b.iter(|| {
assert!(HAYSTACK
.par_iter()
.by_exponential_blocks()
.find_first(|&&x| x[0] == needle)
.is_some())
});
}

#[bench]
fn serial_find_two_thirds(b: &mut Bencher) {
let needle = HAYSTACK[HAYSTACK.len() / 3 * 2][0];
b.iter(|| assert!(HAYSTACK.iter().find(|&&x| x[0] == needle).is_some()));
}

#[bench]
fn parallel_find_missing(b: &mut Bencher) {
fn parallel_find_any_missing(b: &mut Bencher) {
let needle = HAYSTACK.iter().map(|v| v[0]).max().unwrap() + 1;
b.iter(|| assert!(HAYSTACK.par_iter().find_any(|&&x| x[0] == needle).is_none()));
}

#[bench]
fn parallel_find_first_missing(b: &mut Bencher) {
let needle = HAYSTACK.iter().map(|v| v[0]).max().unwrap() + 1;
b.iter(|| {
assert!(HAYSTACK
.par_iter()
.find_first(|&&x| x[0] == needle)
.is_none())
});
}

#[bench]
fn parallel_find_first_blocks_missing(b: &mut Bencher) {
let needle = HAYSTACK.iter().map(|v| v[0]).max().unwrap() + 1;
b.iter(|| {
assert!(HAYSTACK
.par_iter()
.by_exponential_blocks()
.find_first(|&&x| x[0] == needle)
.is_none())
});
}

#[bench]
fn serial_find_missing(b: &mut Bencher) {
let needle = HAYSTACK.iter().map(|v| v[0]).max().unwrap() + 1;
b.iter(|| assert!(HAYSTACK.iter().find(|&&x| x[0] == needle).is_none()));
}

#[bench]
fn parallel_find_common(b: &mut Bencher) {
fn parallel_find_any_common(b: &mut Bencher) {
b.iter(|| {
assert!(HAYSTACK
.par_iter()
Expand Down
2 changes: 2 additions & 0 deletions src/compile_fail/must_use.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ macro_rules! must_use {
}

must_use! {
by_exponential_blocks /** v.par_iter().by_exponential_blocks(); */
by_uniform_blocks /** v.par_iter().by_uniform_blocks(2); */
step_by /** v.par_iter().step_by(2); */
chain /** v.par_iter().chain(&v); */
chunks /** v.par_iter().chunks(2); */
Expand Down
131 changes: 131 additions & 0 deletions src/iter/blocks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
use super::plumbing::*;
use super::*;

struct BlocksCallback<S, C> {
sizes: S,
consumer: C,
len: usize,
}

impl<T, S, C> ProducerCallback<T> for BlocksCallback<S, C>
where
C: UnindexedConsumer<T>,
S: Iterator<Item = usize>,
{
type Output = C::Result;

fn callback<P: Producer<Item = T>>(mut self, mut producer: P) -> Self::Output {
let mut remaining_len = self.len;
let mut consumer = self.consumer;

// we need a local variable for the accumulated results
// we call the reducer's identity by splitting at 0
let (left_consumer, right_consumer, _) = consumer.split_at(0);
let mut leftmost_res = left_consumer.into_folder().complete();
consumer = right_consumer;

// now we loop on each block size
while remaining_len > 0 && !consumer.full() {
// we compute the next block's size
let size = self.sizes.next().unwrap_or(std::usize::MAX);
let capped_size = remaining_len.min(size);
remaining_len -= capped_size;

// split the producer
let (left_producer, right_producer) = producer.split_at(capped_size);
producer = right_producer;

// split the consumer
let (left_consumer, right_consumer, _) = consumer.split_at(capped_size);
consumer = right_consumer;

leftmost_res = consumer.to_reducer().reduce(
leftmost_res,
bridge_producer_consumer(capped_size, left_producer, left_consumer),
);
}
leftmost_res
}
}

/// `ExponentialBlocks` is a parallel iterator that consumes itself as a sequence
/// of parallel blocks of increasing sizes (exponentially).
///
/// This struct is created by the [`by_exponential_blocks()`] method on [`IndexedParallelIterator`]
///
/// [`by_exponential_blocks()`]: trait.IndexedParallelIterator.html#method.by_exponential_blocks
/// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Debug, Clone)]
pub struct ExponentialBlocks<I> {
base: I,
}

impl<I> ExponentialBlocks<I> {
pub(super) fn new(base: I) -> Self {
Self { base }
}
}

impl<I> ParallelIterator for ExponentialBlocks<I>
where
I: IndexedParallelIterator,
{
type Item = I::Item;

fn drive_unindexed<C>(self, consumer: C) -> C::Result
where
C: UnindexedConsumer<Self::Item>,
{
let first = crate::current_num_threads();
let callback = BlocksCallback {
consumer,
sizes: std::iter::successors(Some(first), exponential_size),
len: self.base.len(),
};
self.base.with_producer(callback)
}
}

fn exponential_size(size: &usize) -> Option<usize> {
Some(size.saturating_mul(2))
}

/// `UniformBlocks` is a parallel iterator that consumes itself as a sequence
/// of parallel blocks of constant sizes.
///
/// This struct is created by the [`by_uniform_blocks()`] method on [`IndexedParallelIterator`]
///
/// [`by_uniform_blocks()`]: trait.IndexedParallelIterator.html#method.by_uniform_blocks
/// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Debug, Clone)]
pub struct UniformBlocks<I> {
base: I,
block_size: usize,
}

impl<I> UniformBlocks<I> {
pub(super) fn new(base: I, block_size: usize) -> Self {
Self { base, block_size }
}
}

impl<I> ParallelIterator for UniformBlocks<I>
where
I: IndexedParallelIterator,
{
type Item = I::Item;

fn drive_unindexed<C>(self, consumer: C) -> C::Result
where
C: UnindexedConsumer<Self::Item>,
{
let callback = BlocksCallback {
consumer,
sizes: std::iter::repeat(self.block_size),
len: self.base.len(),
};
self.base.with_producer(callback)
}
}
Loading

0 comments on commit bacd468

Please sign in to comment.