Skip to content

Commit

Permalink
Simplified distributed interface
Browse files Browse the repository at this point in the history
  • Loading branch information
tbetcke committed Dec 30, 2024
1 parent 7708ae0 commit 8b122d7
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 80 deletions.
67 changes: 32 additions & 35 deletions examples/evaluate_distributed.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
//! Distributed evaluation of sources and targets.
use bempp_distributed_tools::IndexLayoutFromLocalCounts;
use green_kernels::traits::*;
use green_kernels::{laplace_3d::Laplace3dKernel, types::GreenKernelEvalType};
use mpi::traits::Communicator;
use mpi::traits::{Communicator, Root};
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
use rlst::prelude::*;
use rlst::{
assert_array_relative_eq, rlst_dynamic_array1, DistributedVector, RawAccess, RawAccessMut,
};
use rlst::{assert_array_relative_eq, rlst_dynamic_array1, RawAccess, RawAccessMut};

fn main() {
// Create the MPI communicator
Expand All @@ -27,66 +24,66 @@ fn main() {
// Create a Laplace kernel.
let kernel = Laplace3dKernel::<f64>::default();

// We create index layout for sources and targets.
let source_layout = IndexLayoutFromLocalCounts::new(3 * n_sources, &world);
let target_layout = IndexLayoutFromLocalCounts::new(3 * n_targets, &world);
let charge_layout = IndexLayoutFromLocalCounts::new(n_sources, &world);
let result_layout = IndexLayoutFromLocalCounts::new(n_targets, &world);
let mut sources = rlst_dynamic_array1!(f64, [3 * n_sources]);
let mut targets = rlst_dynamic_array1!(f64, [3 * n_targets]);
let mut charges = rlst_dynamic_array1!(f64, [n_sources]);

// Create the sources and charges.
let sources = DistributedVector::<_, f64>::new(&source_layout);
let targets = DistributedVector::<_, f64>::new(&target_layout);

sources.local_mut().fill_from_equally_distributed(&mut rng);
targets.local_mut().fill_from_equally_distributed(&mut rng);

// Create the charges.
let charges = DistributedVector::<_, f64>::new(&charge_layout);
charges.local_mut().fill_from_equally_distributed(&mut rng);
sources.fill_from_equally_distributed(&mut rng);
targets.fill_from_equally_distributed(&mut rng);
charges.fill_from_equally_distributed(&mut rng);

// Create the result vector.
let mut result = DistributedVector::<_, f64>::new(&result_layout);

// Evaluate the kernel.
let mut result = rlst_dynamic_array1!(f64, [n_targets]);

kernel.evaluate_distributed(
GreenKernelEvalType::Value,
&sources,
&targets,
&charges,
&mut result,
sources.data(),
targets.data(),
charges.data(),
result.data_mut(),
false,
&world,
);

// We now check the result with an evaluation only on the first rank.

if world.rank() != 0 {
sources.gather_to_rank(0);
targets.gather_to_rank(0);
charges.gather_to_rank(0);
result.gather_to_rank(0);
let root_process = world.process_at_rank(0);

root_process.gather_into(sources.data());
root_process.gather_into(targets.data());
root_process.gather_into(charges.data());
root_process.gather_into(result.data());
} else {
let sources = {
let mut tmp = rlst_dynamic_array1!(f64, [3 * n_sources * world.size() as usize]);
sources.gather_to_rank_root(tmp.r_mut());
world
.this_process()
.gather_into_root(sources.data(), tmp.data_mut());
tmp
};

let targets = {
let mut tmp = rlst_dynamic_array1!(f64, [3 * n_targets * world.size() as usize]);
targets.gather_to_rank_root(tmp.r_mut());
world
.this_process()
.gather_into_root(targets.data(), tmp.data_mut());
tmp
};

let charges = {
let mut tmp = rlst_dynamic_array1!(f64, [n_sources * world.size() as usize]);
charges.gather_to_rank_root(tmp.r_mut());
world
.this_process()
.gather_into_root(charges.data(), tmp.data_mut());
tmp
};

let result = {
let mut tmp = rlst_dynamic_array1!(f64, [n_targets * world.size() as usize]);
result.gather_to_rank_root(tmp.r_mut());
world
.this_process()
.gather_into_root(result.data(), tmp.data_mut());
tmp
};

Expand Down
83 changes: 38 additions & 45 deletions src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::types::GreenKernelEvalType;
use mpi::traits::{Communicator, Equivalence, Root};
use rlst::RlstScalar;
#[cfg(feature = "mpi")]
use rlst::{rlst_dynamic_array1, DistributedVector, IndexLayout, RawAccess, RawAccessMut};
use rlst::{rlst_dynamic_array1, RawAccess, RawAccessMut};

/// Interface to evaluating Green's functions for given sources and targets.
pub trait Kernel: Sync {
Expand Down Expand Up @@ -126,65 +126,58 @@ pub trait Kernel: Sync {
/// Otherwise, the evaluation on each rank is single-threaded.
#[cfg(feature = "mpi")]
pub trait DistributedKernelEvaluator: Kernel {
fn evaluate_distributed<
SourceLayout: IndexLayout,
TargetLayout: IndexLayout,
ChargeLayout: IndexLayout,
ResultLayout: IndexLayout,
>(
fn evaluate_distributed<C: Communicator>(
&self,
eval_type: GreenKernelEvalType,
sources: &DistributedVector<'_, SourceLayout, <Self::T as RlstScalar>::Real>,
targets: &DistributedVector<'_, TargetLayout, <Self::T as RlstScalar>::Real>,
charges: &DistributedVector<'_, ChargeLayout, Self::T>,
result: &mut DistributedVector<'_, ResultLayout, Self::T>,
sources: &[<Self::T as RlstScalar>::Real],
targets: &[<Self::T as RlstScalar>::Real],
charges: &[Self::T],
result: &mut [Self::T],
use_multithreaded: bool,
comm: &C,
) where
Self::T: Equivalence,
<Self::T as RlstScalar>::Real: Equivalence,
{
// We want that everything has the same communicator
assert!(std::ptr::addr_eq(
sources.index_layout().comm(),
charges.index_layout().comm()
));
assert!(std::ptr::addr_eq(
sources.index_layout().comm(),
targets.index_layout().comm()
));
assert!(std::ptr::addr_eq(
sources.index_layout().comm(),
result.index_layout().comm()
));
// Check that the number of sources and number of charges are compatible.
assert_eq!(sources.len(), 3 * charges.len());

// Check that the output vector has the correct size.
// The result length is multiplied by 3 since each target has 3 components (x, y, z).
assert_eq!(
self.range_component_count(eval_type)
* targets.index_layout().number_of_local_indices(),
3 * result.index_layout().number_of_local_indices()
self.range_component_count(eval_type) * targets.len(),
3 * result.len()
);

let size = sources.index_layout().comm().size();
let size = comm.size();

// We now iterate through each rank and communicate the sources and charges held by that
// rank to all ranks, which evaluate them against their local targets.

for rank in 0..size as usize {
// Communicate the sources and charges from `rank` to all ranks.

let root_process = sources.index_layout().comm().process_at_rank(rank as i32);
let source_range = sources.index_layout().index_range(rank).unwrap();
let charge_range = charges.index_layout().index_range(rank).unwrap();
let nsources = source_range.1 - source_range.0;
let ncharges = charge_range.1 - charge_range.0;
// Make sure that number of sources and charges are compatible.
assert_eq!(nsources, 3 * ncharges);
let mut root_sources = rlst_dynamic_array1!(<Self::T as RlstScalar>::Real, [nsources]);
let mut root_charges = rlst_dynamic_array1!(Self::T, [ncharges]);
// If we are on `rank` fill the sources and charges.
if sources.index_layout().comm().rank() == rank as i32 {
root_sources.fill_from(sources.local().r());
root_charges.fill_from(charges.local().r());
// We first need to tell all ranks how many sources and charges we have.
let root_process = comm.process_at_rank(rank as i32);

let nsources = {
let mut nsources;
if comm.rank() == rank as i32 {
nsources = charges.len();
} else {
nsources = 0;
}
root_process.broadcast_into(&mut nsources);
nsources
};

let mut root_sources =
rlst_dynamic_array1!(<Self::T as RlstScalar>::Real, [3 * nsources]);
let mut root_charges = rlst_dynamic_array1!(Self::T, [nsources]);

if comm.rank() == rank as i32 {
root_sources.data_mut().copy_from_slice(sources);
root_charges.data_mut().copy_from_slice(charges);
}

root_process.broadcast_into(&mut root_sources.data_mut()[..]);
Expand All @@ -196,17 +189,17 @@ pub trait DistributedKernelEvaluator: Kernel {
self.evaluate_mt(
eval_type,
&root_sources.data()[..],
targets.local().data(),
targets,
&root_charges.data()[..],
result.local_mut().data_mut(),
result,
);
} else {
self.evaluate_st(
eval_type,
&root_sources.data()[..],
targets.local().data(),
targets,
&root_charges.data()[..],
result.local_mut().data_mut(),
result,
);
}
}
Expand Down

0 comments on commit 8b122d7

Please sign in to comment.