Skip to content

Commit

Permalink
Draft multi node error handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
Robadob committed Jul 24, 2023
1 parent dc399c0 commit 2780638
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 35 deletions.
9 changes: 2 additions & 7 deletions include/flamegpu/simulation/detail/MPISimRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ namespace detail {
* There may be multiple instances per GPU, if running small models on large GPUs.
*/
class MPISimRunner : public AbstractSimRunner {
std::vector<int>& err_detail_rank;
const int world_rank;

public:
enum Signal : unsigned int {
// MPISimRunner sets this to notify manager that it wants a new job
Expand All @@ -56,7 +53,7 @@ class MPISimRunner : public AbstractSimRunner {
* @param err_detail Structure to store error details on fast failure for main thread rethrow
* @param err_detail_rank Structure to world rank tied to err_detail
* @param world_rank MPI world rank
* @param err_detail Structure to store error details on fast failure for main thread rethrow
 * @param err_detail_local Structure to store error details on failure for main thread to handle
* @param _total_runners Total number of runners executing
* @param _isSWIG Flag denoting whether it's a Python build of FLAMEGPU
*/
Expand All @@ -73,9 +70,7 @@ class MPISimRunner : public AbstractSimRunner {
std::queue<unsigned int> &log_export_queue,
std::mutex &log_export_queue_mutex,
std::condition_variable &log_export_queue_cdn,
std::vector<ErrorDetail> &err_detail,
std::vector<int> &err_detail_rank,
int world_rank,
std::vector<ErrorDetail> &err_detail_local,
unsigned int _total_runners,
bool _isSWIG);
/**
Expand Down
64 changes: 43 additions & 21 deletions src/flamegpu/simulation/CUDAEnsemble.cu
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
std::queue<unsigned int> log_export_queue;
std::mutex log_export_queue_mutex;
std::condition_variable log_export_queue_cdn;
// In OMP mode, Rank 0 will collect errors from all ranks
std::vector<detail::AbstractSimRunner::ErrorDetail> err_detail = {};
#ifdef FLAMEGPU_ENABLE_MPI
std::vector<int> err_detail_rank = {};
// In MPI mode, Rank 0 will collect errors from all ranks
std::multimap<int, detail::AbstractSimRunner::ErrorDetail> err_detail = {};
#endif
std::vector<detail::AbstractSimRunner::ErrorDetail> err_detail_local = {};

// Init log worker
detail::SimLogger* log_worker = nullptr;
Expand Down Expand Up @@ -236,7 +236,7 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
step_log_config, exit_log_config,
d, j,
config.verbosity,
run_logs, log_export_queue, log_export_queue_mutex, log_export_queue_cdn, err_detail, err_detail_rank, world_rank, TOTAL_RUNNERS, isSWIG);
run_logs, log_export_queue, log_export_queue_mutex, log_export_queue_cdn, err_detail_local, TOTAL_RUNNERS, isSWIG);
runners[i]->start();
++i;
}
Expand Down Expand Up @@ -272,12 +272,7 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
EnvelopeTag::RequestJob, // int tag
MPI_COMM_WORLD, // MPI_Comm communicator
&status); // MPI_Status*
{
// log_export_mutex is treated as our protection for race conditions on err_detail
std::lock_guard<std::mutex> lck(log_export_queue_mutex);
err_detail.push_back(e_detail);
err_detail_rank.push_back(status.MPI_SOURCE);
}
err_detail.emplace(status.MPI_SOURCE, e_detail);
++err_count;
if (config.error_level == EnsembleConfig::Fast) {
// Skip to end to kill workers
Expand All @@ -294,15 +289,29 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
}
}
// Check whether local runners require a job assignment
for (auto &r : next_runs) {
for (unsigned int i = 0; i < next_runs.size(); ++i) {
auto &r = next_runs[i];
unsigned int run_id = r.load();
if (run_id == detail::MPISimRunner::Signal::RunFailed) {
// Fetch error detail
detail::AbstractSimRunner::ErrorDetail e_detail;
{
// log_export_mutex is treated as our protection for race conditions on err_detail
std::lock_guard<std::mutex> lck(log_export_queue_mutex);
e_detail = err_detail[failed_run_id];
// Fetch corresponding error detail
bool success = false;
for (auto it = err_detail_local.begin(); it != err_detail_local.end(); ++it) {
if (it->runner_id == i) {
e_detail = *it;
err_detail.emplace(world_rank, e_detail);
err_detail_local.erase(it);
success = true;
break;
}
}
if (!success) {
THROW exception::UnknownInternalError("Management thread failed to locate reported error from runner %u, in CUDAEnsemble::simulate()", i);
}
}
++err_count;
if (config.error_level == EnsembleConfig::Fast) {
Expand All @@ -316,7 +325,7 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
}
if (config.verbosity > Verbosity::Quiet)
fprintf(stderr, "Warning: Run %u failed on rank %d, device %d, thread %u with exception: \n%s\n",
e_detail.run_id, e_detail.world_rank, e_detail.device_id, e_detail.runner_id, e_detail.exception_string);
e_detail.run_id, world_rank, e_detail.device_id, e_detail.runner_id, e_detail.exception_string);
}
run_id = detail::MPISimRunner::Signal::RequestJob;
}
Expand Down Expand Up @@ -377,13 +386,25 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
if (runner_status == detail::MPISimRunner::Signal::RunFailed) {
// Fetch the job id, increment local error counter
const unsigned int failed_run_id = err_cts[i].exchange(UINT_MAX);
++error_total;
++err_count;
// Fetch error detail
AbstractSimRunner::ErrorDetail e_detail;
detail::AbstractSimRunner::ErrorDetail e_detail;
{
// log_export_mutex is treated as our protection for race conditions on err_detail
std::lock_guard<std::mutex> lck(log_export_queue_mutex);
e_detail = err_detail[failed_run_id];
// Fetch corresponding error detail
bool success = false;
for (auto it = err_detail_local.begin(); it != err_detail_local.end(); ++it) {
if (it->runner_id == i) {
e_detail = *it;
err_detail_local.erase(it);
success = true;
break;
}
}
if (!success) {
THROW exception::UnknownInternalError("Management thread failed to locate reported error from runner %u, in CUDAEnsemble::simulate()", i);
}
}
// Notify 0 that an error occurred, with the error detail
MPI_Send(
Expand All @@ -396,7 +417,6 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
runner_status = detail::MPISimRunner::Signal::RequestJob;
}
if (runner_status == detail::MPISimRunner::Signal::RequestJob) {
mpi_request_job:
// Send a job request to 0, these have no data
MPI_Send(
nullptr, // void* data
Expand Down Expand Up @@ -450,7 +470,7 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
step_log_config, exit_log_config,
d, j,
config.verbosity, config.error_level == EnsembleConfig::Fast,
run_logs, log_export_queue, log_export_queue_mutex, log_export_queue_cdn, err_detail, TOTAL_RUNNERS, isSWIG);
run_logs, log_export_queue, log_export_queue_mutex, log_export_queue_cdn, err_detail_local, TOTAL_RUNNERS, isSWIG);
runners[i++]->start();
}
}
Expand Down Expand Up @@ -525,12 +545,14 @@ unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
if (config.error_level == EnsembleConfig::Fast && err_count) {
if (config.mpi) {
#ifdef FLAMEGPU_ENABLE_MPI
THROW exception::EnsembleError("Run %u failed on rank %d, device %d, thread %u with exception: \n%s\n",
err_detail[0].run_id, err_detail_rank[0], err_detail[0].device_id, err_detail[0].runner_id, err_detail[0].exception_string);
for (const auto &e : err_detail) {
THROW exception::EnsembleError("Run %u failed on rank %d, device %d, thread %u with exception: \n%s\n",
e.second.run_id, e.first, e.second.device_id, e.second.runner_id, e.second.exception_string);
}
#endif
} else {
THROW exception::EnsembleError("Run %u failed on device %d, thread %u with exception: \n%s\n",
err_detail[0].run_id, err_detail[0].device_id, err_detail[0].runner_id, err_detail[0].exception_string);
err_detail_local[0].run_id, err_detail_local[0].device_id, err_detail_local[0].runner_id, err_detail_local[0].exception_string);
}
} else if (config.error_level == EnsembleConfig::Slow && err_count) {
THROW exception::EnsembleError("%u/%u runs failed!\n.", err_count, static_cast<unsigned int>(plans.size()));
Expand Down
10 changes: 3 additions & 7 deletions src/flamegpu/simulation/detail/MPISimRunner.cu
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ MPISimRunner::MPISimRunner(const std::shared_ptr<const ModelData> _model,
std::queue<unsigned int>& _log_export_queue,
std::mutex& _log_export_queue_mutex,
std::condition_variable& _log_export_queue_cdn,
std::vector<ErrorDetail>& _err_detail,
std::vector<int> &_err_detail_rank,
int _world_rank,
std::vector<ErrorDetail>& _err_detail_local,
const unsigned int _total_runners,
bool _isSWIG)
: AbstractSimRunner(
Expand All @@ -47,11 +45,10 @@ MPISimRunner::MPISimRunner(const std::shared_ptr<const ModelData> _model,
_log_export_queue,
_log_export_queue_mutex,
_log_export_queue_cdn,
_err_detail,
_err_detail_local,
_total_runners,
_isSWIG)
, err_detail_rank(_err_detail_rank)
, world_rank(_world_rank) { }
{ }


void MPISimRunner::main() {
Expand All @@ -71,7 +68,6 @@ void MPISimRunner::main() {
std::lock_guard<std::mutex> lck(log_export_queue_mutex);
log_export_queue.push(UINT_MAX);
// Build the error detail (fixed len char array for string)
err_detail_rank.push_back(world_rank);
err_detail.push_back(ErrorDetail{run_id, static_cast<unsigned int>(device_id), runner_id, });
strncpy(err_detail.back().exception_string, e.what(), sizeof(ErrorDetail::exception_string));
err_detail.back().exception_string[sizeof(ErrorDetail::exception_string) - 1] = '\0';
Expand Down

0 comments on commit 2780638

Please sign in to comment.