From 0aee00546706c6574531ca56720fc332c2c32814 Mon Sep 17 00:00:00 2001
From: Ian Halim
Date: Wed, 31 Jul 2024 18:16:24 -0600
Subject: [PATCH] Tpetra: TAFC Converted to use Kokkos

Kokkos versions of doPosts(), doPostsAllToAll(), and doPostsNbrAllToAllV()
added to Tpetra_Details_DistributorActor.hpp. Kokkos version of doPosts()
added to Tpetra_Distributor.hpp. Tpetra_CrsMatrix_def.hpp edited to use
these new methods. Some syncs have been removed as they are now superfluous.

Signed-off-by: Ian Halim
---
 .../tpetra/core/src/Tpetra_CrsMatrix_def.hpp  |  90 +--
 .../src/Tpetra_Details_DistributorActor.hpp   | 652 +++++++++++++++++-
 .../tpetra/core/src/Tpetra_Distributor.hpp    |  89 ++-
 3 files changed, 757 insertions(+), 74 deletions(-)

diff --git a/packages/tpetra/core/src/Tpetra_CrsMatrix_def.hpp b/packages/tpetra/core/src/Tpetra_CrsMatrix_def.hpp
index f0eef6b3b32e..a88b5ca649ba 100644
--- a/packages/tpetra/core/src/Tpetra_CrsMatrix_def.hpp
+++ b/packages/tpetra/core/src/Tpetra_CrsMatrix_def.hpp
@@ -47,6 +47,7 @@
 #include "KokkosBlas1_scal.hpp"
 #include "KokkosSparse_getDiagCopy.hpp"
 #include "KokkosSparse_spmv.hpp"
+#include "Kokkos_StdAlgorithms.hpp"
 #include
 #include
@@ -8301,24 +8302,16 @@ CrsMatrix::
            << std::endl;
         std::cerr << os.str ();
       }
-      // Make sure that host has the latest version, since we're
-      // using the version on host.  If host has the latest
-      // version, syncing to host does nothing.
-      destMat->numExportPacketsPerLID_.sync_host ();
-      Teuchos::ArrayView<size_t> numExportPacketsPerLID =
-        getArrayViewFromDualView (destMat->numExportPacketsPerLID_);
-      destMat->numImportPacketsPerLID_.sync_host ();
-      Teuchos::ArrayView<size_t> numImportPacketsPerLID =
-        getArrayViewFromDualView (destMat->numImportPacketsPerLID_);
-
+      destMat->numExportPacketsPerLID_.sync_device();
+      auto numExportPacketsPerLID = destMat->numExportPacketsPerLID_.view_device();
+      auto numImportPacketsPerLID = destMat->numImportPacketsPerLID_.view_device();
       if (verbose) {
         std::ostringstream os;
         os << *verbosePrefix << "Calling 3-arg doReversePostsAndWaits"
            << std::endl;
         std::cerr << os.str ();
       }
-      Distor.doReversePostsAndWaits(destMat->numExportPacketsPerLID_.view_host(), 1,
-                                    destMat->numImportPacketsPerLID_.view_host());
+      Distor.doReversePostsAndWaits(numExportPacketsPerLID, 1, numImportPacketsPerLID);
       if (verbose) {
         std::ostringstream os;
         os << *verbosePrefix << "Finished 3-arg doReversePostsAndWaits"
            << std::endl;
         std::cerr << os.str ();
       }

-      size_t totalImportPackets = 0;
-      for (Array_size_type i = 0; i < numImportPacketsPerLID.size (); ++i) {
-        totalImportPackets += numImportPacketsPerLID[i];
-      }
+      size_t totalImportPackets = Kokkos::Experimental::reduce(typename Node::execution_space(), numImportPacketsPerLID);

       // Reallocation MUST go before setting the modified flag,
       // because it may clear out the flags.
       destMat->reallocImportsIfNeeded (totalImportPackets, verbose,
                                        verbosePrefix.get ());
       destMat->imports_.modify_host ();
-      auto hostImports = destMat->imports_.view_host();
-      // This is a legacy host pack/unpack path, so use the host
-      // version of exports_.
-      destMat->exports_.sync_host ();
-      auto hostExports = destMat->exports_.view_host();
+      auto deviceImports = destMat->imports_.view_device();
+      auto deviceExports = destMat->exports_.view_device();
       if (verbose) {
         std::ostringstream os;
-        os << *verbosePrefix << "Calling 4-arg doReversePostsAndWaits"
+        os << *verbosePrefix << "Calling 4-arg doReversePostsAndWaitsKokkos"
            << std::endl;
         std::cerr << os.str ();
       }
-      Distor.doReversePostsAndWaits (hostExports,
-                                     numExportPacketsPerLID,
-                                     hostImports,
-                                     numImportPacketsPerLID);
+      destMat->imports_.sync_device();
+      Distor.doReversePostsAndWaitsKokkos (deviceExports, numExportPacketsPerLID, deviceImports, numImportPacketsPerLID);
       if (verbose) {
         std::ostringstream os;
-        os << *verbosePrefix << "Finished 4-arg doReversePostsAndWaits"
+        os << *verbosePrefix << "Finished 4-arg doReversePostsAndWaitsKokkos"
            << std::endl;
         std::cerr << os.str ();
       }
@@ -8396,23 +8381,16 @@ CrsMatrix::
            << std::endl;
         std::cerr << os.str ();
       }
-      // Make sure that host has the latest version, since we're
-      // using the version on host.  If host has the latest
-      // version, syncing to host does nothing.
-      destMat->numExportPacketsPerLID_.sync_host ();
-      Teuchos::ArrayView<size_t> numExportPacketsPerLID =
-        getArrayViewFromDualView (destMat->numExportPacketsPerLID_);
-      destMat->numImportPacketsPerLID_.sync_host ();
-      Teuchos::ArrayView<size_t> numImportPacketsPerLID =
-        getArrayViewFromDualView (destMat->numImportPacketsPerLID_);
+      destMat->numExportPacketsPerLID_.sync_device ();
+      auto numExportPacketsPerLID = destMat->numExportPacketsPerLID_.view_device();
+      auto numImportPacketsPerLID = destMat->numImportPacketsPerLID_.view_device();
       if (verbose) {
         std::ostringstream os;
         os << *verbosePrefix << "Calling 3-arg doPostsAndWaits"
            << std::endl;
         std::cerr << os.str ();
       }
-      Distor.doPostsAndWaits(destMat->numExportPacketsPerLID_.view_host(), 1,
-                             destMat->numImportPacketsPerLID_.view_host());
+      Distor.doPostsAndWaits(numExportPacketsPerLID, 1, numImportPacketsPerLID);
       if (verbose) {
         std::ostringstream os;
         os << *verbosePrefix << "Finished 3-arg doPostsAndWaits"
            << std::endl;
         std::cerr << os.str ();
       }

-      size_t totalImportPackets = 0;
-      for (Array_size_type i = 0; i < numImportPacketsPerLID.size (); ++i) {
-        totalImportPackets += numImportPacketsPerLID[i];
-      }
+      size_t totalImportPackets = Kokkos::Experimental::reduce(typename Node::execution_space(), numImportPacketsPerLID);

       // Reallocation MUST go before setting the modified flag,
       // because it may clear out the flags.
       destMat->reallocImportsIfNeeded (totalImportPackets, verbose,
                                        verbosePrefix.get ());
       destMat->imports_.modify_host ();
-      auto hostImports = destMat->imports_.view_host();
-      // This is a legacy host pack/unpack path, so use the host
-      // version of exports_.
-      destMat->exports_.sync_host ();
-      auto hostExports = destMat->exports_.view_host();
+      auto deviceImports = destMat->imports_.view_device();
+      auto deviceExports = destMat->exports_.view_device();
       if (verbose) {
         std::ostringstream os;
-        os << *verbosePrefix << "Calling 4-arg doPostsAndWaits"
+        os << *verbosePrefix << "Calling 4-arg doPostsAndWaitsKokkos"
            << std::endl;
         std::cerr << os.str ();
       }
-      Distor.doPostsAndWaits (hostExports,
-                              numExportPacketsPerLID,
-                              hostImports,
-                              numImportPacketsPerLID);
+      destMat->imports_.sync_device ();
+      Distor.doPostsAndWaitsKokkos (deviceExports, numExportPacketsPerLID, deviceImports, numImportPacketsPerLID);
       if (verbose) {
         std::ostringstream os;
-        os << *verbosePrefix << "Finished 4-arg doPostsAndWaits"
+        os << *verbosePrefix << "Finished 4-arg doPostsAndWaitsKokkos"
            << std::endl;
         std::cerr << os.str ();
       }
@@ -8494,12 +8464,6 @@ CrsMatrix::
     Teuchos::Array<int> RemotePids;
     if (runOnHost) {
       Teuchos::Array<int> TargetPids;
-      // Backwards compatibility measure.  We'll use this again below.
-
-      // TODO JHU Need to track down why numImportPacketsPerLID_ has not been corrently marked as modified on host (which it has been)
-      // TODO JHU somewhere above, e.g., call to Distor.doPostsAndWaits().
-      // TODO JHU This only becomes apparent as we begin to convert TAFC to run on device.
-      destMat->numImportPacketsPerLID_.modify_host(); //FIXME
 # ifdef HAVE_TPETRA_MMM_TIMINGS
       RCP<TimeMonitor> tmCopySPRdata = rcp(new TimeMonitor(*TimeMonitor::getNewTimer(prefix + std::string("TAFC unpack-count-resize + copy same-perm-remote data"))));
 # endif
@@ -8691,14 +8655,6 @@ CrsMatrix::
     }
     else { // run on device
-
-      // Backwards compatibility measure.  We'll use this again below.
-
-      // TODO JHU Need to track down why numImportPacketsPerLID_ has not been corrently marked as modified on host (which it has been)
-      // TODO JHU somewhere above, e.g., call to Distor.doPostsAndWaits().
-      // TODO JHU This only becomes apparent as we begin to convert TAFC to run on device.
- destMat->numImportPacketsPerLID_.modify_host(); //FIXME - # ifdef HAVE_TPETRA_MMM_TIMINGS RCP tmCopySPRdata = rcp(new TimeMonitor(*TimeMonitor::getNewTimer(prefix + std::string("TAFC unpack-count-resize + copy same-perm-remote data")))); # endif diff --git a/packages/tpetra/core/src/Tpetra_Details_DistributorActor.hpp b/packages/tpetra/core/src/Tpetra_Details_DistributorActor.hpp index 9b021ac53e9b..24e8351a6133 100644 --- a/packages/tpetra/core/src/Tpetra_Details_DistributorActor.hpp +++ b/packages/tpetra/core/src/Tpetra_Details_DistributorActor.hpp @@ -22,6 +22,7 @@ #include "Teuchos_Time.hpp" #include "Kokkos_TeuchosCommAdapters.hpp" +#include "Kokkos_StdAlgorithms.hpp" #ifdef HAVE_TPETRA_MPI #include "mpi.h" @@ -53,6 +54,13 @@ class DistributorActor { const ImpView &imports, const Teuchos::ArrayView& numImportPacketsPerLID); + template + void doPostsAndWaitsKokkos(const DistributorPlan& plan, + const ExpView &exports, + const ExpPacketsView &numExportPacketsPerLID, + const ImpView &imports, + const ImpPacketsView &numImportPacketsPerLID); + template void doPosts(const DistributorPlan& plan, const ExpView& exports, @@ -66,6 +74,27 @@ class DistributorActor { const ImpView &imports, const Teuchos::ArrayView& numImportPacketsPerLID); + template + void doPostsKokkos(const DistributorPlan& plan, + const ExpView &exports, + const ExpPacketsView &numExportPacketsPerLID, + const ImpView &imports, + const ImpPacketsView &numImportPacketsPerLID); + + template + void doPostsAllToAllKokkos( + const DistributorPlan &plan, const ExpView &exports, + const ExpPacketsView &numExportPacketsPerLID, + const ImpView &imports, + const ImpPacketsView &numImportPacketsPerLID); + + template + void doPostsNbrAllToAllVKokkos( + const DistributorPlan &plan, const ExpView &exports, + const ExpPacketsView &numExportPacketsPerLID, + const ImpView &imports, + const ImpPacketsView &numImportPacketsPerLID); + void doWaits(const DistributorPlan& plan); bool isReady() const; @@ -147,6 +176,22 @@ void DistributorActor::doPostsAndWaits(const DistributorPlan& plan, doWaits(plan); } + +template +void DistributorActor::doPostsAndWaitsKokkos(const DistributorPlan& plan, + const ExpView &exports, + const ExpPacketsView &numExportPacketsPerLID, + const ImpView &imports, + const ImpPacketsView &numImportPacketsPerLID) +{ + static_assert(areKokkosViews, + "Data arrays for DistributorActor::doPostsAndWaitsKokkos must be Kokkos::Views"); + static_assert(areKokkosViews, + "Num packets arrays for DistributorActor::doPostsAndWaitsKokkos must be Kokkos::Views"); + doPostsKokkos(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID); + doWaits(plan); +} + template using HostAccessibility = Kokkos::SpaceAccessibility; @@ -760,6 +805,140 @@ void DistributorActor::doPostsAllToAll( << "\"."); } +template +void DistributorActor::doPostsAllToAllKokkos( + const DistributorPlan &plan, const ExpView &exports, + const ExpPacketsView &numExportPacketsPerLID, + const ImpView &imports, + const ImpPacketsView &numImportPacketsPerLID) { + TEUCHOS_TEST_FOR_EXCEPTION( + !plan.getIndicesTo().is_null(), std::runtime_error, + "Send Type=\"Alltoall\" only works for fast-path communication."); + + using size_type = Teuchos::Array::size_type; + using ExpExecSpace = typename ExpPacketsView::execution_space; + using ImpExecSpace = typename ImpPacketsView::execution_space; + + auto comm = plan.getComm(); + Kokkos::View sendcounts("sendcounts", comm->getSize()); + Kokkos::View sdispls("sdispls", comm->getSize()); + Kokkos::View 
recvcounts("recvcounts", comm->getSize()); + Kokkos::View rdispls("rdispls", comm->getSize()); + + auto sendcounts_d = Kokkos::create_mirror_view(ExpExecSpace(), sendcounts); + auto sdispls_d = Kokkos::create_mirror_view(ExpExecSpace(), sdispls); + auto recvcounts_d = Kokkos::create_mirror_view(ImpExecSpace(), recvcounts); + auto rdispls_d = Kokkos::create_mirror_view(ImpExecSpace(), rdispls); + + auto getStartsTo = Kokkos::Compat::getKokkosViewDeepCopy(plan.getStartsTo()); + auto getLengthsTo = Kokkos::Compat::getKokkosViewDeepCopy(plan.getLengthsTo()); + auto getProcsTo = Kokkos::Compat::getKokkosViewDeepCopy(plan.getProcsTo()); + + size_t curPKToffset = 0; + Kokkos::parallel_scan(Kokkos::RangePolicy(0, plan.getNumSends()), KOKKOS_LAMBDA(const size_t pp, size_t& offset, bool is_final) { + sdispls_d(getProcsTo(pp)) = offset; + size_t numPackets = 0; + for (size_t j = getStartsTo(pp); j < getStartsTo(pp) + getLengthsTo(pp); ++j) { + numPackets += numExportPacketsPerLID(j); + } + sendcounts_d(getProcsTo(pp)) = static_cast(numPackets); + offset += numPackets; + }, curPKToffset); + + int overflow; + Kokkos::parallel_reduce(Kokkos::RangePolicy(0, plan.getNumSends()), KOKKOS_LAMBDA(const size_t pp, int& index) { + if(sendcounts_d(getProcsTo(pp)) < 0) { + index = pp+1; + } + }, overflow); + + // numPackets is converted down to int, so make sure it can be represented + TEUCHOS_TEST_FOR_EXCEPTION(overflow, std::logic_error, + "Tpetra::Distributor::doPostsKokkos(4 args, Kokkos): " + "Send count for send " + << overflow-1 << " is too large " + "to be represented as int."); + + const size_type actualNumReceives = + Teuchos::as(plan.getNumReceives()) + + Teuchos::as(plan.hasSelfMessage() ? 1 : 0); + + auto getLengthsFrom = Kokkos::Compat::getKokkosViewDeepCopy(plan.getLengthsFrom()); + auto getProcsFrom = Kokkos::Compat::getKokkosViewDeepCopy(plan.getProcsFrom()); + + Kokkos::View curLIDoffset("curLIDoffset", actualNumReceives); + Kokkos::parallel_scan(Kokkos::RangePolicy(0, actualNumReceives), KOKKOS_LAMBDA(const size_type i, size_t& offset, bool is_final) { + if(is_final) curLIDoffset(i) = offset; + offset += getLengthsFrom(i); + }); + + Kokkos::parallel_scan(Kokkos::RangePolicy(0, actualNumReceives), KOKKOS_LAMBDA(const size_type i, size_t& curBufferOffset, bool is_final) { + size_t totalPacketsFrom_i = 0; + for(size_t j = 0; j < getLengthsFrom(i); j++) { + totalPacketsFrom_i += numImportPacketsPerLID(curLIDoffset(i) + j); + } + + if(is_final) rdispls_d(getProcsFrom(i)) = curBufferOffset; + if(is_final) recvcounts_d(getProcsFrom(i)) = static_cast(totalPacketsFrom_i); + curBufferOffset += totalPacketsFrom_i; + }); + + Kokkos::parallel_reduce(Kokkos::RangePolicy(0, actualNumReceives), KOKKOS_LAMBDA(const size_type i, int& index) { + if(recvcounts_d(getProcsFrom(i)) < 0) { + index = i+1; + } + }, overflow); + + // totalPacketsFrom_i is converted down to int, so make sure it can be + // represented + TEUCHOS_TEST_FOR_EXCEPTION(overflow, std::logic_error, + "Tpetra::Distributor::doPostsKokkos(4 args, Kokkos): " + "Recv count for receive " + << overflow-1 << " is too large " + "to be represented as int."); + + Kokkos::deep_copy(sendcounts, sendcounts_d); + Kokkos::deep_copy(sdispls, sdispls_d); + Kokkos::deep_copy(recvcounts, recvcounts_d); + Kokkos::deep_copy(rdispls, rdispls_d); + + Teuchos::RCP> mpiComm = + Teuchos::rcp_dynamic_cast>(comm); + Teuchos::RCP> rawComm = + mpiComm->getRawMpiComm(); + using T = typename ExpView::non_const_value_type; + MPI_Datatype rawType = 
::Tpetra::Details::MpiTypeTraits<T>::getType(T());
+
+#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
+  if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
+    MPIX_Comm *mpixComm = *plan.getMPIXComm();
+    TEUCHOS_TEST_FOR_EXCEPTION(!mpixComm, std::runtime_error,
+                               "MPIX_Comm is null in doPostsAllToAllKokkos \""
+                                   << __FILE__ << ":" << __LINE__);
+
+    const int err = MPIX_Alltoallv(
+        exports.data(), sendcounts.data(), sdispls.data(), rawType,
+        imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
+
+    TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
+                               "MPIX_Alltoallv failed with error \""
+                                   << Teuchos::mpiErrorCodeToString(err)
+                                   << "\".");
+
+    return;
+  }
+#endif // HAVE_TPETRACORE_MPI_ADVANCE
+
+  const int err = MPI_Alltoallv(
+      exports.data(), sendcounts.data(), sdispls.data(), rawType,
+      imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());
+
+  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
+                             "MPI_Alltoallv failed with error \""
+                                 << Teuchos::mpiErrorCodeToString(err)
+                                 << "\".");
+}
+
 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
 template <class ExpView, class ImpView>
 void DistributorActor::doPostsNbrAllToAllV(
@@ -840,6 +1019,117 @@ void DistributorActor::doPostsNbrAllToAllV(
                                  << Teuchos::mpiErrorCodeToString(err)
                                  << "\".");
 }
+
+template <class ExpView, class ExpPacketsView, class ImpView, class ImpPacketsView>
+void DistributorActor::doPostsNbrAllToAllVKokkos(
+    const DistributorPlan &plan, const ExpView &exports,
+    const ExpPacketsView &numExportPacketsPerLID,
+    const ImpView &imports,
+    const ImpPacketsView &numImportPacketsPerLID) {
+  TEUCHOS_TEST_FOR_EXCEPTION(
+      !plan.getIndicesTo().is_null(), std::runtime_error,
+      "Send Type=\"Alltoall\" only works for fast-path communication.");
+
+  const Teuchos_Ordinal numSends = plan.getProcsTo().size();
+  const Teuchos_Ordinal numRecvs = plan.getProcsFrom().size();
+
+  using T = typename ExpView::non_const_value_type;
+  using ExpExecSpace = typename ExpPacketsView::execution_space;
+  using ImpExecSpace = typename ImpPacketsView::execution_space;
+
+  auto comm = plan.getComm();
+  Kokkos::View<int*, Kokkos::HostSpace> sendcounts("sendcounts", comm->getSize());
+  Kokkos::View<int*, Kokkos::HostSpace> sdispls("sdispls", comm->getSize());
+  Kokkos::View<int*, Kokkos::HostSpace> recvcounts("recvcounts", comm->getSize());
+  Kokkos::View<int*, Kokkos::HostSpace> rdispls("rdispls", comm->getSize());
+
+  auto sendcounts_d = Kokkos::create_mirror_view(ExpExecSpace(), sendcounts);
+  auto sdispls_d = Kokkos::create_mirror_view(ExpExecSpace(), sdispls);
+  auto recvcounts_d = Kokkos::create_mirror_view(ImpExecSpace(), recvcounts);
+  auto rdispls_d = Kokkos::create_mirror_view(ImpExecSpace(), rdispls);
+
+  auto getStartsTo = Kokkos::Compat::getKokkosViewDeepCopy<ExpExecSpace>(plan.getStartsTo());
+  auto getLengthsTo = Kokkos::Compat::getKokkosViewDeepCopy<ExpExecSpace>(plan.getLengthsTo());
+
+  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
+      Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
+  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
+      mpiComm->getRawMpiComm();
+  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
+
+  // unlike standard alltoall, entry `i` in sdispls and sendcounts
+  // refers to the ith participating rank, rather than rank i
+  Kokkos::parallel_scan(Kokkos::RangePolicy<ExpExecSpace>(0, numSends), KOKKOS_LAMBDA(const Teuchos_Ordinal pp, size_t& curPKToffset, bool is_final) {
+    sdispls_d(pp) = curPKToffset;
+    size_t numPackets = 0;
+    for (size_t j = getStartsTo(pp); j < getStartsTo(pp) + getLengthsTo(pp); ++j) {
+      numPackets += numExportPacketsPerLID(j);
+    }
+    sendcounts_d(pp) = static_cast<int>(numPackets);
+    curPKToffset += numPackets;
+  });
+
+  int overflow;
+  Kokkos::parallel_reduce(Kokkos::RangePolicy<ExpExecSpace>(0, numSends), KOKKOS_LAMBDA(const Teuchos_Ordinal
pp, int& index) {
+    if(sendcounts_d(pp) < 0) {
+      index = pp+1;
+    }
+  }, overflow);
+
+  // numPackets is converted down to int, so make sure it can be represented
+  TEUCHOS_TEST_FOR_EXCEPTION(overflow, std::logic_error,
+                             "Tpetra::Distributor::doPostsKokkos(4 args, Kokkos): "
+                             "Send count for send "
+                                 << overflow-1 << " is too large "
+                             "to be represented as int.");
+
+  auto getLengthsFrom = Kokkos::Compat::getKokkosViewDeepCopy<ImpExecSpace>(plan.getLengthsFrom());
+
+  Kokkos::View<size_t*, ImpExecSpace> curLIDoffset("curLIDoffset", numRecvs);
+  Kokkos::parallel_scan(Kokkos::RangePolicy<ImpExecSpace>(0, numRecvs), KOKKOS_LAMBDA(const Teuchos_Ordinal i, size_t& offset, bool is_final) {
+    if(is_final) curLIDoffset(i) = offset;
+    offset += getLengthsFrom(i);
+  });
+
+  Kokkos::parallel_scan(Kokkos::RangePolicy<ImpExecSpace>(0, numRecvs), KOKKOS_LAMBDA(const Teuchos_Ordinal i, size_t& curBufferOffset, bool is_final) {
+    rdispls_d(i) = curBufferOffset;
+    size_t totalPacketsFrom_i = 0;
+    for(size_t j = 0; j < getLengthsFrom(i); j++) {
+      totalPacketsFrom_i += numImportPacketsPerLID(curLIDoffset(i) + j);
+    }
+
+    recvcounts_d(i) = static_cast<int>(totalPacketsFrom_i);
+    curBufferOffset += totalPacketsFrom_i;
+  });
+
+  Kokkos::parallel_reduce(Kokkos::RangePolicy<ImpExecSpace>(0, numRecvs), KOKKOS_LAMBDA(const Teuchos_Ordinal i, int& index) {
+    if(recvcounts_d(i) < 0) {
+      index = i+1;
+    }
+  }, overflow);
+
+  // totalPacketsFrom_i is converted down to int, so make sure it can be
+  // represented
+  TEUCHOS_TEST_FOR_EXCEPTION(overflow, std::logic_error,
+                             "Tpetra::Distributor::doPostsKokkos(4 args, Kokkos): "
+                             "Recv count for receive "
+                                 << overflow-1 << " is too large "
+                             "to be represented as int.");
+
+  Kokkos::deep_copy(sendcounts, sendcounts_d);
+  Kokkos::deep_copy(sdispls, sdispls_d);
+  Kokkos::deep_copy(recvcounts, recvcounts_d);
+  Kokkos::deep_copy(rdispls, rdispls_d);
+
+  MPIX_Comm *mpixComm = *plan.getMPIXComm();
+  const int err = MPIX_Neighbor_alltoallv(
+      exports.data(), sendcounts.data(), sdispls.data(), rawType,
+      imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
+
+  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
+                             "MPIX_Neighbor_alltoallv failed with error \""
+                                 << Teuchos::mpiErrorCodeToString(err)
+                                 << "\".");
+}
 #endif // HAVE_TPETRACORE_MPI_ADVANCE
 #endif // HAVE_TPETRA_MPI
 // clang-format off
@@ -1107,16 +1397,16 @@ void DistributorActor::doPosts(const DistributorPlan& plan,

       // This buffer is long enough for only one message at a time.
       // Thus, we use DISTRIBUTOR_SEND always in this case, regardless
-      // of sendType requested by user.
+      // of sendType requested by user.
       // This code path formerly errored out with message:
-      //     Tpetra::Distributor::doPosts(4-arg, Kokkos):
+      //     Tpetra::Distributor::doPosts(4-arg, Kokkos):
       //     The "send buffer" code path
       //     doesn't currently work with nonblocking sends.
       // Now, we opt to just do the communication in a way that works.
 #ifdef HAVE_TPETRA_DEBUG
       if (sendType != Details::DISTRIBUTOR_SEND) {
         if (plan.getComm()->getRank() == 0)
-          std::cout << "The requested Tpetra send type "
+          std::cout << "The requested Tpetra send type "
                     << DistributorSendTypeEnumToString(sendType)
                     << " requires Distributor data to be ordered by"
                     << " the receiving processor rank.
Since these" @@ -1125,7 +1415,7 @@ void DistributorActor::doPosts(const DistributorPlan& plan, } #endif - Kokkos::View sendArray ("sendArray", + Kokkos::View sendArray ("sendArray", maxNumPackets); Array indicesOffsets (numExportPacketsPerLID.size(), 0); @@ -1180,6 +1470,360 @@ void DistributorActor::doPosts(const DistributorPlan& plan, } } +template +void DistributorActor::doPostsKokkos(const DistributorPlan& plan, + const ExpView &exports, + const ExpPacketsView &numExportPacketsPerLID, + const ImpView &imports, + const ImpPacketsView &numImportPacketsPerLID) +{ + static_assert(areKokkosViews, + "Data arrays for DistributorActor::doPostsKokkos must be Kokkos::Views"); + static_assert(areKokkosViews, + "Num packets arrays for DistributorActor::doPostsKokkos must be Kokkos::Views"); + using Teuchos::Array; + using Teuchos::as; + using Teuchos::ireceive; + using Teuchos::isend; + using Teuchos::send; + using Teuchos::TypeNameTraits; + using std::endl; + using Kokkos::Compat::create_const_view; + using Kokkos::Compat::create_view; + using Kokkos::Compat::subview_offset; + using Kokkos::Compat::deep_copy_offset; + using ExpExecSpace = typename ExpPacketsView::execution_space; + using ImpExecSpace = typename ImpPacketsView::execution_space; + typedef Array::size_type size_type; + typedef ExpView exports_view_type; + typedef ImpView imports_view_type; + +#ifdef KOKKOS_ENABLE_CUDA + static_assert (! std::is_same::value && + ! std::is_same::value, + "Please do not use Tpetra::Distributor with UVM " + "allocations. See GitHub issue #1088."); +#endif // KOKKOS_ENABLE_CUDA + +#ifdef KOKKOS_ENABLE_SYCL + static_assert (! std::is_same::value && + ! std::is_same::value, + "Please do not use Tpetra::Distributor with SharedUSM " + "allocations. See GitHub issue #1088 (corresponding to CUDA)."); +#endif // KOKKOS_ENABLE_SYCL + +#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS + Teuchos::TimeMonitor timeMon (*timer_doPosts4KV_); +#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS + + // Run-time configurable parameters that come from the input + // ParameterList set by setParameterList(). + const Details::EDistributorSendType sendType = plan.getSendType(); + +#ifdef HAVE_TPETRA_MPI + // All-to-all communication layout is quite different from + // point-to-point, so we handle it separately. + if (sendType == Details::DISTRIBUTOR_ALLTOALL) { + doPostsAllToAllKokkos(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID); + return; + } +#ifdef HAVE_TPETRACORE_MPI_ADVANCE + else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) + { + doPostsAllToAllKokkos(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID); + return; + } else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) { + doPostsNbrAllToAllVKokkos(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID); + return; + } +#endif + +#else // HAVE_TPETRA_MPI + if (plan.hasSelfMessage()) { + size_t packetsPerSend; + Kokkos::parallel_reduce(Kokkos::RangePolicy(plan.getStartsTo()[0], plan.getStartsTo()[0]+plan.getLengthsTo()[0]), KOKKOS_LAMBDA(const size_t j, size_t& packets) { + packets += numExportPacketsPerLID(j); + }, packetsPerSend); + + deep_copy_offset(imports, exports, (size_t)0, (size_t)0, packetsPerSend); + } +#endif // HAVE_TPETRA_MPI + + const int myProcID = plan.getComm()->getRank (); + size_t selfReceiveOffset = 0; + +#ifdef HAVE_TPETRA_DEBUG + // Different messages may have different numbers of packets. 
+ size_t totalNumImportPackets = Kokkos::Experimental::reduce(ImpExecSpace(), numImportPacketsPerLID); + TEUCHOS_TEST_FOR_EXCEPTION( + imports.extent (0) < totalNumImportPackets, std::runtime_error, + "Tpetra::Distributor::doPostsKokkos(4 args, Kokkos): The 'imports' array must have " + "enough entries to hold the expected number of import packets. " + "imports.extent(0) = " << imports.extent (0) << " < " + "totalNumImportPackets = " << totalNumImportPackets << "."); + TEUCHOS_TEST_FOR_EXCEPTION + (requests_.size () != 0, std::logic_error, "Tpetra::Distributor::" + "doPostsKokkos(4 args, Kokkos): Process " << myProcID << ": requests_.size () = " + << requests_.size () << " != 0."); +#endif // HAVE_TPETRA_DEBUG + // Distributor uses requests_.size() as the number of outstanding + // nonblocking message requests, so we resize to zero to maintain + // this invariant. + // + // getNumReceives() does _not_ include the self message, if there is + // one. Here, we do actually send a message to ourselves, so we + // include any self message in the "actual" number of receives to + // post. + // + // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts() + // doesn't (re)allocate its array of requests. That happens in + // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on + // demand), or Resize_(). + const size_type actualNumReceives = as (plan.getNumReceives()) + + as (plan.hasSelfMessage() ? 1 : 0); + requests_.resize (0); + + // Post the nonblocking receives. It's common MPI wisdom to post + // receives before sends. In MPI terms, this means favoring + // adding to the "posted queue" (of receive requests) over adding + // to the "unexpected queue" (of arrived messages not yet matched + // with a receive). + { +#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS + Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts4KV_recvs_); +#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS + + size_t curBufferOffset = 0; + size_t curLIDoffset = 0; + for (size_type i = 0; i < actualNumReceives; ++i) { + size_t totalPacketsFrom_i = 0; + Kokkos::parallel_reduce(Kokkos::RangePolicy(0, plan.getLengthsFrom()[i]), KOKKOS_LAMBDA(const size_t j, size_t& total) { + total += numImportPacketsPerLID(curLIDoffset+j); + }, totalPacketsFrom_i); + // totalPacketsFrom_i is converted down to int, so make sure it can be represented + TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX), + std::logic_error, "Tpetra::Distributor::doPostsKokkos(3 args, Kokkos): " + "Recv count for receive " << i << " (" << totalPacketsFrom_i << ") is too large " + "to be represented as int."); + curLIDoffset += plan.getLengthsFrom()[i]; + if (plan.getProcsFrom()[i] != myProcID && totalPacketsFrom_i) { + // If my process is receiving these packet(s) from another + // process (not a self-receive), and if there is at least + // one packet to receive: + // + // 1. Set up the persisting view (recvBuf) into the imports + // array, given the offset and size (total number of + // packets from process getProcsFrom()[i]). + // 2. Start the Irecv and save the resulting request. 
+ imports_view_type recvBuf = + subview_offset (imports, curBufferOffset, totalPacketsFrom_i); + requests_.push_back (ireceive (recvBuf, plan.getProcsFrom()[i], + mpiTag_, *plan.getComm())); + } + else { // Receiving these packet(s) from myself + selfReceiveOffset = curBufferOffset; // Remember the offset + } + curBufferOffset += totalPacketsFrom_i; + } + } + +#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS + Teuchos::TimeMonitor timeMonSends (*timer_doPosts4KV_sends_); +#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS + + // setup views containing starting-offsets into exports for each send, + // and num-packets-to-send for each send. + Kokkos::View sendPacketOffsets("sendPacketOffsets", plan.getNumSends()); + Kokkos::View packetsPerSend("packetsPerSend", plan.getNumSends()); + auto sendPacketOffsets_d = Kokkos::create_mirror_view(ExpExecSpace(), sendPacketOffsets); + auto packetsPerSend_d = Kokkos::create_mirror_view(ExpExecSpace(), packetsPerSend); + + auto starts = Kokkos::Compat::getKokkosViewDeepCopy(plan.getStartsTo()); + auto lengths = Kokkos::Compat::getKokkosViewDeepCopy(plan.getLengthsTo()); + + Kokkos::parallel_scan(Kokkos::RangePolicy(0, plan.getNumSends()), KOKKOS_LAMBDA(const size_t pp, size_t& curPKToffset, bool final_pass) { + if(final_pass) sendPacketOffsets_d(pp) = curPKToffset; + size_t numPackets = 0; + for(size_t j = starts(pp); j < starts(pp) + lengths(pp); j++) { + numPackets += numExportPacketsPerLID(j); + } + if(final_pass) packetsPerSend_d(pp) = numPackets; + curPKToffset += numPackets; + }); + + size_t maxNumPackets; + Kokkos::parallel_reduce(Kokkos::RangePolicy(0, plan.getNumSends()), KOKKOS_LAMBDA(const size_t pp, size_t& max) { + if(packetsPerSend_d(pp) > max) { + max = packetsPerSend_d(pp); + } + }, Kokkos::Max(maxNumPackets)); + + // numPackets will be used as a message length, so make sure it can be represented as int + TEUCHOS_TEST_FOR_EXCEPTION(maxNumPackets > size_t(INT_MAX), + std::logic_error, "Tpetra::Distributor::doPostsKokkos(4 args, Kokkos): " + "numPackets = " << maxNumPackets << " is too large " + "to be represented as int."); + + Kokkos::deep_copy(sendPacketOffsets, sendPacketOffsets_d); + Kokkos::deep_copy(packetsPerSend, packetsPerSend_d); + + // setup scan through getProcsTo() list starting with higher numbered procs + // (should help balance message traffic) + size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage(); + size_t procIndex = 0; + while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myProcID)) { + ++procIndex; + } + if (procIndex == numBlocks) { + procIndex = 0; + } + + size_t selfNum = 0; + size_t selfIndex = 0; + if (plan.getIndicesTo().is_null()) { + +#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS + Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_fast_); +#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS + + // Data are already blocked (laid out) by process, so we don't + // need a separate send buffer (besides the exports array). 
+ for (size_t i = 0; i < numBlocks; ++i) { + size_t p = i + procIndex; + if (p > (numBlocks - 1)) { + p -= numBlocks; + } + + if (plan.getProcsTo()[p] != myProcID && packetsPerSend[p] > 0) { + exports_view_type tmpSend = + subview_offset(exports, sendPacketOffsets[p], packetsPerSend[p]); + + if (sendType == Details::DISTRIBUTOR_ISEND) { + exports_view_type tmpSendBuf = + subview_offset (exports, sendPacketOffsets[p], packetsPerSend[p]); + requests_.push_back (isend (tmpSendBuf, plan.getProcsTo()[p], + mpiTag_, *plan.getComm())); + } + else { // DISTRIBUTOR_SEND + send (tmpSend, + as (tmpSend.size ()), + plan.getProcsTo()[p], mpiTag_, *plan.getComm()); + } + } + else { // "Sending" the message to myself + selfNum = p; + } + } + + if (plan.hasSelfMessage()) { + deep_copy_offset(imports, exports, selfReceiveOffset, + sendPacketOffsets[selfNum], packetsPerSend[selfNum]); + } + } + else { // data are not blocked by proc, use send buffer + +#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS + Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_slow_); +#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS + + // FIXME (mfh 05 Mar 2013) This may be broken for Isend. + typedef typename ExpView::non_const_value_type Packet; + typedef typename ExpView::array_layout Layout; + typedef typename ExpView::device_type Device; + typedef typename ExpView::memory_traits Mem; + + // This buffer is long enough for only one message at a time. + // Thus, we use DISTRIBUTOR_SEND always in this case, regardless + // of sendType requested by user. + // This code path formerly errored out with message: + // Tpetra::Distributor::doPostsKokkos(4-arg, Kokkos): + // The "send buffer" code path + // doesn't currently work with nonblocking sends. + // Now, we opt to just do the communication in a way that works. +#ifdef HAVE_TPETRA_DEBUG + if (sendType != Details::DISTRIBUTOR_SEND) { + if (plan.getComm()->getRank() == 0) + std::cout << "The requested Tpetra send type " + << DistributorSendTypeEnumToString(sendType) + << " requires Distributor data to be ordered by" + << " the receiving processor rank. Since these" + << " data are not ordered, Tpetra will use Send" + << " instead." 
<< std::endl; + } +#endif + + Kokkos::View sendArray ("sendArray", + maxNumPackets); + + Kokkos::View indicesOffsets ("indicesOffsets", numExportPacketsPerLID.extent(0)); + size_t ioffset = 0; + Kokkos::parallel_scan(Kokkos::RangePolicy(0, numExportPacketsPerLID.extent(0)), KOKKOS_LAMBDA(const size_t j, size_t& offset, bool is_final) { + if(is_final) indicesOffsets(j) = offset; + offset += numExportPacketsPerLID(j); + }, ioffset); + + for (size_t i = 0; i < numBlocks; ++i) { + size_t p = i + procIndex; + if (p > (numBlocks - 1)) { + p -= numBlocks; + } + + if (plan.getProcsTo()[p] != myProcID) { + size_t j = plan.getStartsTo()[p]; + size_t numPacketsTo_p = 0; + //mirror in case execspaces are different + auto sendArrayMirror = Kokkos::create_mirror_view_and_copy(ExpExecSpace(), sendArray); + auto exportsMirror = Kokkos::create_mirror_view_and_copy(ExpExecSpace(), exports); + Kokkos::parallel_scan(Kokkos::RangePolicy(0, plan.getLengthsTo()[p]), KOKKOS_LAMBDA(const size_t k, size_t& offset, bool is_final) { + if(is_final) { + const size_t dst_end = offset + numExportPacketsPerLID(j + k); + const size_t src_end = indicesOffsets(j + k) + numExportPacketsPerLID(j + k); + auto dst_sub = Kokkos::subview(sendArrayMirror, Kokkos::make_pair(offset, dst_end)); + auto src_sub = Kokkos::subview(exportsMirror, Kokkos::make_pair(indicesOffsets(j + k), src_end)); + Kokkos::Experimental::local_deep_copy(dst_sub, src_sub); + } + offset += numExportPacketsPerLID(j + k); + }, numPacketsTo_p); + Kokkos::deep_copy(sendArray, sendArrayMirror); + typename ExpView::execution_space().fence(); + + if (numPacketsTo_p > 0) { + ImpView tmpSend = + subview_offset(sendArray, size_t(0), numPacketsTo_p); + + send (tmpSend, + as (tmpSend.size ()), + plan.getProcsTo()[p], mpiTag_, *plan.getComm()); + } + } + else { // "Sending" the message to myself + selfNum = p; + selfIndex = plan.getStartsTo()[p]; + } + } + + if (plan.hasSelfMessage()) { + //mirror in case execspaces are different + auto importsMirror = Kokkos::create_mirror_view_and_copy(ExpExecSpace(), imports); + auto exportsMirror = Kokkos::create_mirror_view_and_copy(ExpExecSpace(), exports); + size_t temp; + Kokkos::parallel_scan(Kokkos::RangePolicy(0, plan.getLengthsTo()[selfNum]), KOKKOS_LAMBDA(const size_t k, size_t& offset, bool is_final) { + if(is_final) { + const size_t dst_end = selfReceiveOffset + offset + numExportPacketsPerLID(selfIndex + k); + const size_t src_end = indicesOffsets(selfIndex + k) + numExportPacketsPerLID(selfIndex + k); + auto dst_sub = Kokkos::subview(importsMirror, Kokkos::make_pair(selfReceiveOffset + offset, dst_end)); + auto src_sub = Kokkos::subview(exportsMirror, Kokkos::make_pair(indicesOffsets(selfIndex + k), src_end)); + Kokkos::Experimental::local_deep_copy(dst_sub, src_sub); + } + offset += numExportPacketsPerLID(selfIndex + k); + }, temp); + Kokkos::deep_copy(imports, importsMirror); + selfIndex += plan.getLengthsTo()[selfNum]; + selfReceiveOffset += temp; + } + } +} + } } diff --git a/packages/tpetra/core/src/Tpetra_Distributor.hpp b/packages/tpetra/core/src/Tpetra_Distributor.hpp index c0c31a0f8b54..a8beece8ee9d 100644 --- a/packages/tpetra/core/src/Tpetra_Distributor.hpp +++ b/packages/tpetra/core/src/Tpetra_Distributor.hpp @@ -23,6 +23,7 @@ #include "KokkosCompat_View.hpp" #include "Kokkos_Core.hpp" #include "Kokkos_TeuchosCommAdapters.hpp" +#include "Kokkos_StdAlgorithms.hpp" #include #include #include @@ -426,6 +427,13 @@ namespace Tpetra { const ImpView &imports, const Teuchos::ArrayView& numImportPacketsPerLID); + 
template + typename std::enable_if<(Kokkos::is_view::value && Kokkos::is_view::value)>::type + doPostsAndWaitsKokkos (const ExpView &exports, + const ExpPacketsView &numExportPacketsPerLID, + const ImpView &imports, + const ImpPacketsView &numImportPacketsPerLID); + /// \brief Post the data for a forward plan, but do not execute the waits yet. /// /// Call this overload when you have the same number of Packets @@ -480,6 +488,13 @@ namespace Tpetra { const Teuchos::ArrayView& numExportPacketsPerLID, const ImpView &imports, const Teuchos::ArrayView& numImportPacketsPerLID); + + template + typename std::enable_if<(Kokkos::is_view::value && Kokkos::is_view::value)>::type + doPostsKokkos (const ExpView &exports, + const ExpPacketsView &numExportPacketsPerLID, + const ImpView &imports, + const ImpPacketsView &numImportPacketsPerLID); /// \brief Execute the reverse communication plan. /// @@ -501,7 +516,14 @@ namespace Tpetra { const Teuchos::ArrayView& numExportPacketsPerLID, const ImpView &imports, const Teuchos::ArrayView& numImportPacketsPerLID); - + + template + typename std::enable_if<(Kokkos::is_view::value && Kokkos::is_view::value)>::type + doReversePostsAndWaitsKokkos (const ExpView &exports, + const ExpPacketsView &numExportPacketsPerLID, + const ImpView &imports, + const ImpPacketsView &numImportPacketsPerLID); + /// \brief Post the data for a reverse plan, but do not execute the waits yet. /// /// This method takes the same arguments as the three-argument @@ -522,7 +544,14 @@ namespace Tpetra { const Teuchos::ArrayView& numExportPacketsPerLID, const ImpView &imports, const Teuchos::ArrayView& numImportPacketsPerLID); - + + template + typename std::enable_if<(Kokkos::is_view::value && Kokkos::is_view::value)>::type + doReversePostsKokkos (const ExpView &exports, + const ExpPacketsView &numExportPacketsPerLID, + const ImpView &imports, + const ImpPacketsView &numImportPacketsPerLID); + //@} //! 
@name Implementation of Teuchos::Describable //@{ @@ -640,6 +669,16 @@ namespace Tpetra { actor_.doPostsAndWaits(plan_, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID); } + template + typename std::enable_if<(Kokkos::is_view::value && Kokkos::is_view::value)>::type + Distributor:: + doPostsAndWaitsKokkos (const ExpView &exports, + const ExpPacketsView &numExportPacketsPerLID, + const ImpView &imports, + const ImpPacketsView &numImportPacketsPerLID) + { + actor_.doPostsAndWaitsKokkos(plan_, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID); + } template typename std::enable_if<(Kokkos::is_view::value && Kokkos::is_view::value)>::type @@ -661,6 +700,17 @@ namespace Tpetra { { actor_.doPosts(plan_, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID); } + + template + typename std::enable_if<(Kokkos::is_view::value && Kokkos::is_view::value)>::type + Distributor:: + doPostsKokkos (const ExpView &exports, + const ExpPacketsView &numExportPacketsPerLID, + const ImpView &imports, + const ImpPacketsView &numImportPacketsPerLID) + { + actor_.doPostsKokkos(plan_, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID); + } template typename std::enable_if<(Kokkos::is_view::value && Kokkos::is_view::value)>::type @@ -685,6 +735,19 @@ namespace Tpetra { numImportPacketsPerLID); doReverseWaits (); } + + template + typename std::enable_if<(Kokkos::is_view::value && Kokkos::is_view::value)>::type + Distributor:: + doReversePostsAndWaitsKokkos (const ExpView& exports, + const ExpPacketsView &numExportPacketsPerLID, + const ImpView& imports, + const ImpPacketsView &numImportPacketsPerLID) + { + doReversePostsKokkos (exports, numExportPacketsPerLID, imports, + numImportPacketsPerLID); + doReverseWaits (); + } template typename std::enable_if<(Kokkos::is_view::value && Kokkos::is_view::value)>::type @@ -723,7 +786,27 @@ namespace Tpetra { reverseDistributor_->doPosts (exports, numExportPacketsPerLID, imports, numImportPacketsPerLID); } - + + template + typename std::enable_if<(Kokkos::is_view::value && Kokkos::is_view::value)>::type + Distributor:: + doReversePostsKokkos (const ExpView &exports, + const ExpPacketsView &numExportPacketsPerLID, + const ImpView &imports, + const ImpPacketsView &numImportPacketsPerLID) + { + // FIXME (mfh 29 Mar 2012) WHY? + TEUCHOS_TEST_FOR_EXCEPTION( + ! plan_.getIndicesTo().is_null(), std::runtime_error, + "Tpetra::Distributor::doReversePosts(3 args): Can only do " + "reverse communication when original data are blocked by process."); + if (reverseDistributor_.is_null ()) { + createReverseDistributor (); + } + reverseDistributor_->doPostsKokkos (exports, numExportPacketsPerLID, + imports, numImportPacketsPerLID); + } + template void Distributor:: computeSends(const Teuchos::ArrayView& importGIDs,
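
Reviewer note (not part of the patch): the new doPosts*Kokkos paths lean heavily on Kokkos::parallel_scan with the is_final flag to turn per-LID packet counts into buffer offsets (curLIDoffset, sendPacketOffsets, sdispls). Below is a minimal, self-contained sketch of that idiom under assumed names; numPacketsPerBlock and displs are hypothetical and only illustrate the pattern, not the Tpetra code itself.

// Minimal sketch (not Tpetra code): the parallel_scan + is_final idiom used
// in the patch to turn per-block packet counts into starting offsets.
#include <Kokkos_Core.hpp>
#include <cstdio>

int main(int argc, char* argv[]) {
  Kokkos::initialize(argc, argv);
  {
    const int numBlocks = 5;
    Kokkos::View<int*> numPacketsPerBlock("numPacketsPerBlock", numBlocks);
    Kokkos::View<int*> displs("displs", numBlocks);
    Kokkos::deep_copy(numPacketsPerBlock, 3); // pretend every block sends 3 packets

    // Exclusive prefix sum. The functor may be invoked in a non-final pass
    // first, so results are stored only when is_final is true -- the same
    // guard the patch uses when filling curLIDoffset and sendPacketOffsets.
    Kokkos::parallel_scan("build_displs", numBlocks,
      KOKKOS_LAMBDA(const int i, int& offset, const bool is_final) {
        if (is_final) displs(i) = offset;
        offset += numPacketsPerBlock(i);
      });

    auto displs_h = Kokkos::create_mirror_view_and_copy(Kokkos::HostSpace(), displs);
    for (int i = 0; i < numBlocks; ++i)
      std::printf("block %d starts at offset %d\n", i, displs_h(i));
  }
  Kokkos::finalize();
  return 0;
}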
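
Reviewer note (not part of the patch): the patch replaces the host-side summation loop over numImportPacketsPerLID with Kokkos::Experimental::reduce from Kokkos_StdAlgorithms.hpp, which sums the view's entries in the given execution space. A minimal sketch, assuming the default execution space rather than Node::execution_space:

// Minimal sketch (not Tpetra code) of the Kokkos::Experimental::reduce call
// that replaces the removed host loop computing totalImportPackets.
#include <Kokkos_Core.hpp>
#include <Kokkos_StdAlgorithms.hpp>
#include <cstdio>

int main(int argc, char* argv[]) {
  Kokkos::initialize(argc, argv);
  {
    Kokkos::View<size_t*> numImportPacketsPerLID("numImportPacketsPerLID", 4);
    Kokkos::deep_copy(numImportPacketsPerLID, size_t(2)); // two packets per LID

    // Sums all entries on the device, equivalent to the old
    // "for (i ...) totalImportPackets += numImportPacketsPerLID[i]" loop.
    const size_t totalImportPackets = Kokkos::Experimental::reduce(
        Kokkos::DefaultExecutionSpace(), numImportPacketsPerLID);

    std::printf("totalImportPackets = %zu\n", totalImportPackets); // prints 8
  }
  Kokkos::finalize();
  return 0;
}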
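
Reviewer note (not part of the patch): doPostsAllToAllKokkos ultimately hands plain host arrays to MPI_Alltoallv, so the device-built sendcounts/sdispls/recvcounts/rdispls are deep-copied to host first. A minimal sketch of that counts/displacements contract, where every rank sends exactly one int to every rank:

// Minimal sketch (not Tpetra code) of the MPI_Alltoallv contract the new
// all-to-all path satisfies: per-rank counts and displacements.
#include <mpi.h>
#include <cstdio>
#include <vector>

int main(int argc, char* argv[]) {
  MPI_Init(&argc, &argv);
  int rank = 0, size = 0;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  // counts/displs are indexed by destination (or source) rank, exactly the
  // layout the patch fills in before calling MPI_Alltoallv.
  std::vector<int> sendbuf(size, rank), recvbuf(size, -1);
  std::vector<int> counts(size, 1), displs(size, 0);
  for (int i = 0; i < size; ++i) displs[i] = i;

  MPI_Alltoallv(sendbuf.data(), counts.data(), displs.data(), MPI_INT,
                recvbuf.data(), counts.data(), displs.data(), MPI_INT,
                MPI_COMM_WORLD);

  std::printf("rank %d got %d from rank 0\n", rank, recvbuf[0]);
  MPI_Finalize();
  return 0;
}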
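
Reviewer note (not part of the patch): the new Distributor methods (doPostsKokkos, doPostsAndWaitsKokkos, and the reverse variants) are gated with std::enable_if on Kokkos::is_view so they only participate in overload resolution when all view arguments really are Kokkos::Views. A minimal sketch of that gating; MyDistributor and doStuffKokkos are hypothetical names:

// Minimal sketch (not Tpetra code) of the enable_if/is_view gating used for
// the doPostsKokkos-style overloads in Tpetra_Distributor.hpp.
#include <Kokkos_Core.hpp>
#include <type_traits>

struct MyDistributor {
  template <class DataView, class PacketsView>
  typename std::enable_if<(Kokkos::is_view<DataView>::value &&
                           Kokkos::is_view<PacketsView>::value)>::type
  doStuffKokkos(const DataView& exports, const PacketsView& numPacketsPerLID) {
    // A real implementation would post sends/receives here using the
    // device-resident packet counts.
    (void)exports;
    (void)numPacketsPerLID;
  }
};

int main(int argc, char* argv[]) {
  Kokkos::initialize(argc, argv);
  {
    Kokkos::View<double*> exports("exports", 10);
    Kokkos::View<size_t*> counts("counts", 10);
    MyDistributor d;
    d.doStuffKokkos(exports, counts); // OK: both arguments are Views
    // d.doStuffKokkos(1, 2);         // would not compile: removed by SFINAE
  }
  Kokkos::finalize();
  return 0;
}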