Skip to content

Commit

Permalink
Add more variants of LCI parcelport (mainly for research purposes).
Browse files Browse the repository at this point in the history
Add support for the zero-copy receive optimization.

- hpx.parcel.lci.protocol: Determine the communication primitives to use.
    - sendrecv: Only use LCI send and recv.
    - putsendrecv: Use LCI put, send, and recv.
    - putva: Use LCI put iovec.
- hpx.parcel.lci.comp_type: Determine the completion mechanism to use.
    - sync: Use LCI synchronizer.
    - queue: Use LCI completion queue.
- hpx.parcel.lci.progress_type: Determine the way to call LCI_progress (make progress on the LCI background works).
    - worker: Use worker threads to call LCI_progress.
    - pthread: Use a dedicated pthread to call LCI_progress.
    - rp: Use HPX resource partitioner to create a dedicate thread to call LCI_progress.
- hpx.parcel.lci.sendimm: Boolean. Whether to bypass the connection cache and parcel queues.
- hpx.parcel.lci.backlog_queue: Boolean. Whether to enable the backlog queue.
- hpx.parcel.lci.use_two_device: Boolean. Whether to use two LCI devices for the putva protocol.
- hpx.parcel.lci.prepost_recv_num: Int. Number of receives to post for the header messages. Only applies to the sendrecv protocol.
  • Loading branch information
JiakunYan authored and hkaiser committed May 22, 2023
1 parent f4ae7e9 commit e6d0ded
Show file tree
Hide file tree
Showing 32 changed files with 2,526 additions and 545 deletions.
64 changes: 11 additions & 53 deletions libs/core/lci_base/include/hpx/lci_base/lci_environment.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,75 +30,33 @@ namespace hpx { namespace util {

static void init_config(runtime_configuration& cfg);
static void init(int* argc, char*** argv, runtime_configuration& cfg);
static void setup(runtime_configuration& cfg);
static void finalize();

static void join_prg_thread_if_running();
static void progress_fn(LCI_device_t device);
static bool do_progress(LCI_device_t device);
static bool do_progress();

static bool enabled();

static int rank();
static int size();

static LCI_device_t& get_device_eager();
static LCI_device_t& get_device_iovec();

static LCI_endpoint_t& get_endpoint_eager();
static LCI_endpoint_t& get_endpoint_iovec();

static LCI_comp_t& get_scq();

static LCI_comp_t& get_rcq();

static std::string get_processor_name();

struct HPX_EXPORT scoped_lock
// Configurations:
// Log level
enum class log_level_t
{
scoped_lock();
scoped_lock(scoped_lock const&) = delete;
scoped_lock& operator=(scoped_lock const&) = delete;
~scoped_lock();
void unlock();
none,
profile,
debug
};

struct HPX_EXPORT scoped_try_lock
{
scoped_try_lock();
scoped_try_lock(scoped_try_lock const&) = delete;
scoped_try_lock& operator=(scoped_try_lock const&) = delete;
~scoped_try_lock();
void unlock();
bool locked;
};

typedef hpx::spinlock mutex_type;
static log_level_t log_level;
// Output filename of log
static FILE* log_outfile;
static void log(log_level_t level, const char* format, ...);

private:
static mutex_type mtx_;
static bool enabled_;
static bool setuped_;
static LCI_device_t device_eager;
static LCI_device_t device_iovec;
static LCI_endpoint_t ep_eager;
static LCI_endpoint_t ep_iovec;
static LCI_comp_t scq;
static LCI_comp_t rcq;
static std::unique_ptr<std::thread> prg_thread_eager_p;
static std::unique_ptr<std::thread> prg_thread_iovec_p;
static std::atomic<bool> prg_thread_flag;

public:
// configurations:
// whether to use separate devices/progress threads for eager and iovec messages.
static bool use_two_device;
// whether to bypass the parcel queue and connection cache.
static bool enable_send_immediate;
// whether to use HPX resource partitioner to run the LCI progress function.
static bool enable_lci_progress_pool;
// whether to enable the backlog queue and eager message aggregation
static bool enable_lci_backlog_queue;
};
}} // namespace hpx::util

Expand Down
255 changes: 55 additions & 200 deletions libs/core/lci_base/src/lci_environment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <cstddef>
#include <cstdlib>
#include <memory>
#include <stdarg.h>
#include <string>

///////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -91,70 +92,56 @@ namespace hpx { namespace util {

namespace hpx { namespace util {

lci_environment::mutex_type lci_environment::mtx_;
bool lci_environment::enabled_ = false;
bool lci_environment::setuped_ = false;
LCI_device_t lci_environment::device_eager;
LCI_device_t lci_environment::device_iovec;
LCI_endpoint_t lci_environment::ep_eager;
LCI_endpoint_t lci_environment::ep_iovec;
LCI_comp_t lci_environment::scq;
LCI_comp_t lci_environment::rcq;
// We need this progress thread to send early parcels
std::unique_ptr<std::thread> lci_environment::prg_thread_eager_p = nullptr;
std::unique_ptr<std::thread> lci_environment::prg_thread_iovec_p = nullptr;
std::atomic<bool> lci_environment::prg_thread_flag = false;

bool lci_environment::use_two_device = false;
bool lci_environment::enable_send_immediate = false;
bool lci_environment::enable_lci_progress_pool = false;
bool lci_environment::enable_lci_backlog_queue = false;
lci_environment::log_level_t lci_environment::log_level = log_level_t::none;
FILE* lci_environment::log_outfile = nullptr;

///////////////////////////////////////////////////////////////////////////
void lci_environment::init_config(util::runtime_configuration& rtcfg)
{
enable_lci_progress_pool = hpx::util::get_entry_as<bool>(
rtcfg, "hpx.parcel.lci.rp_prg_pool", false /* Does not matter*/);
// The default value here does not matter here
// the key "hpx.parcel.lci.sendimm" is guaranteed to exist
enable_send_immediate = hpx::util::get_entry_as<bool>(
rtcfg, "hpx.parcel.lci.sendimm", false /* Does not matter*/);
enable_lci_backlog_queue = hpx::util::get_entry_as<bool>(
rtcfg, "hpx.parcel.lci.backlog_queue", false /* Does not matter*/);
use_two_device =
get_entry_as(rtcfg, "hpx.parcel.lci.use_two_device", false);

if (!enable_send_immediate && enable_lci_backlog_queue)
{
throw std::runtime_error(
"Backlog queue must be used with send_immediate enabled");
}
std::size_t num_threads =
hpx::util::get_entry_as<size_t>(rtcfg, "hpx.os_threads", 1);
if (enable_lci_progress_pool && num_threads <= 1)
{
enable_lci_progress_pool = false;
fprintf(
stderr, "WARNING: set enable_lci_progress_pool to false!\n");
}
if (use_two_device && enable_lci_progress_pool && num_threads <= 2)
{
use_two_device = false;
fprintf(stderr, "WARNING: set use_two_device to false!\n");
}
}

void set_affinity(pthread_t pthread_handler, size_t target)
{
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(target, &cpuset);
int rv =
pthread_setaffinity_np(pthread_handler, sizeof(cpuset), &cpuset);
if (rv != 0)
std::string log_level_str = get_entry_as<std::string>(
rtcfg, "hpx.parcel.lci.log_level", "" /* Does not matter*/);
if (log_level_str == "none")
log_level = log_level_t::none;
else if (log_level_str == "profile")
log_level = log_level_t::profile;
else if (log_level_str == "debug")
log_level = log_level_t::debug;
else
throw std::runtime_error("Unknown log level " + log_level_str);
std::string log_filename = get_entry_as<std::string>(
rtcfg, "hpx.parcel.lci.log_outfile", "" /* Does not matter*/);
if (log_filename == "stderr")
log_outfile = stderr;
else if (log_filename == "stdout")
log_outfile = stdout;
else
{
fprintf(stderr, "ERROR %d thread affinity didn't work.\n", rv);
exit(1);
const int filename_max = 256;
char filename[filename_max];
char* p0_old = log_filename.data();
char* p0_new = strchr(log_filename.data(), '%');
char* p1 = filename;
while (p0_new)
{
long nbytes = p0_new - p0_old;
HPX_ASSERT(p1 + nbytes < filename + filename_max);
memcpy(p1, p0_old, nbytes);
p1 += nbytes;
nbytes =
snprintf(p1, filename + filename_max - p1, "%d", LCI_RANK);
p1 += nbytes;
p0_old = p0_new + 1;
p0_new = strchr(p0_old, '%');
}
strncat(p1, p0_old, filename + filename_max - p1 - 1);
log_outfile = fopen(filename, "w+");
if (log_outfile == nullptr)
{
throw std::runtime_error(
"Cannot open the logfile " + std::string(filename));
}
}
}

Expand Down Expand Up @@ -199,52 +186,8 @@ namespace hpx { namespace util {

rtcfg.add_entry("hpx.parcel.bootstrap", "lci");
rtcfg.add_entry("hpx.parcel.lci.rank", std::to_string(this_rank));
enabled_ = true;
}

void lci_environment::setup(util::runtime_configuration& rtcfg)
{
if (!enabled_)
return;
if (setuped_)
return; // don't call twice

init_config(rtcfg);
// create ep, scq, rcq
device_eager = LCI_UR_DEVICE;
if (use_two_device)
LCI_device_init(&device_iovec);
else
device_iovec = LCI_UR_DEVICE;
LCI_queue_create(LCI_UR_DEVICE /* no use */, &scq);
LCI_queue_create(LCI_UR_DEVICE /* no use */, &rcq);
LCI_plist_t plist_;
LCI_plist_create(&plist_);
LCI_plist_set_default_comp(plist_, rcq);
LCI_plist_set_comp_type(plist_, LCI_PORT_COMMAND, LCI_COMPLETION_QUEUE);
LCI_plist_set_comp_type(plist_, LCI_PORT_MESSAGE, LCI_COMPLETION_QUEUE);
LCI_endpoint_init(&ep_eager, device_eager, plist_);
LCI_endpoint_init(&ep_iovec, device_iovec, plist_);
LCI_plist_free(&plist_);

// create progress thread
HPX_ASSERT(prg_thread_flag == false);
HPX_ASSERT(prg_thread_eager_p == nullptr);
HPX_ASSERT(prg_thread_iovec_p == nullptr);
prg_thread_flag = true;
prg_thread_eager_p =
std::make_unique<std::thread>(progress_fn, get_device_eager());
if (use_two_device)
prg_thread_iovec_p =
std::make_unique<std::thread>(progress_fn, get_device_iovec());
int target = get_entry_as(rtcfg, "hpx.parcel.lci.prg_thread_core", -1);
if (target >= 0)
{
set_affinity(prg_thread_eager_p->native_handle(), target);
if (use_two_device)
set_affinity(prg_thread_iovec_p->native_handle(), target + 1);
}
setuped_ = true;
enabled_ = true;
}

std::string lci_environment::get_processor_name()
Expand All @@ -256,55 +199,21 @@ namespace hpx { namespace util {
{
if (enabled())
{
enabled_ = false;
// for some reasons, this code block can be entered twice when HPX exits
int lci_init = 0;
LCI_initialized(&lci_init);
if (lci_init)
{
if (setuped_)
{
join_prg_thread_if_running();
// create ep, scq, rcq
LCI_endpoint_free(&ep_iovec);
LCI_endpoint_free(&ep_eager);
LCI_queue_free(&scq);
LCI_queue_free(&rcq);
if (use_two_device)
LCI_device_free(&device_iovec);
}
LCI_finalize();
}
}
}

void lci_environment::join_prg_thread_if_running()
{
if (prg_thread_eager_p || prg_thread_iovec_p)
{
prg_thread_flag = false;
if (prg_thread_eager_p)
{
prg_thread_eager_p->join();
prg_thread_eager_p.reset();
}
if (prg_thread_iovec_p)
{
prg_thread_iovec_p->join();
prg_thread_iovec_p.reset();
}
}
}

void lci_environment::progress_fn(LCI_device_t device)
{
while (prg_thread_flag)
{
LCI_progress(device);
}
}

bool lci_environment::do_progress(LCI_device_t device)
{
if (!device)
return false;
LCI_error_t ret = LCI_progress(device);
HPX_ASSERT(ret == LCI_OK || ret == LCI_ERR_RETRY);
return ret == LCI_OK;
Expand All @@ -331,70 +240,16 @@ namespace hpx { namespace util {
return res;
}

LCI_device_t& lci_environment::get_device_eager()
{
return device_eager;
}

LCI_device_t& lci_environment::get_device_iovec()
{
return device_iovec;
}

LCI_endpoint_t& lci_environment::get_endpoint_eager()
{
return ep_eager;
}

LCI_endpoint_t& lci_environment::get_endpoint_iovec()
void lci_environment::log(
lci_environment::log_level_t level, const char* format, ...)
{
return ep_iovec;
}
va_list args;
va_start(args, format);

LCI_comp_t& lci_environment::get_scq()
{
return scq;
}
if (level <= log_level)
vfprintf(log_outfile, format, args);

LCI_comp_t& lci_environment::get_rcq()
{
return rcq;
}

lci_environment::scoped_lock::scoped_lock()
{
mtx_.lock();
}

lci_environment::scoped_lock::~scoped_lock()
{
mtx_.unlock();
}

void lci_environment::scoped_lock::unlock()
{
mtx_.unlock();
}

lci_environment::scoped_try_lock::scoped_try_lock()
: locked(true)
{
locked = mtx_.try_lock();
}

lci_environment::scoped_try_lock::~scoped_try_lock()
{
if (locked)
mtx_.unlock();
}

void lci_environment::scoped_try_lock::unlock()
{
if (!locked)
{
locked = false;
mtx_.unlock();
}
va_end(args);
}
}} // namespace hpx::util

Expand Down
Loading

0 comments on commit e6d0ded

Please sign in to comment.