diff --git a/libs/core/lci_base/include/hpx/lci_base/lci_environment.hpp b/libs/core/lci_base/include/hpx/lci_base/lci_environment.hpp index 9064e0129d41..7997f78885ee 100644 --- a/libs/core/lci_base/include/hpx/lci_base/lci_environment.hpp +++ b/libs/core/lci_base/include/hpx/lci_base/lci_environment.hpp @@ -30,75 +30,33 @@ namespace hpx { namespace util { static void init_config(runtime_configuration& cfg); static void init(int* argc, char*** argv, runtime_configuration& cfg); - static void setup(runtime_configuration& cfg); static void finalize(); - static void join_prg_thread_if_running(); - static void progress_fn(LCI_device_t device); static bool do_progress(LCI_device_t device); + static bool do_progress(); static bool enabled(); static int rank(); static int size(); - static LCI_device_t& get_device_eager(); - static LCI_device_t& get_device_iovec(); - - static LCI_endpoint_t& get_endpoint_eager(); - static LCI_endpoint_t& get_endpoint_iovec(); - - static LCI_comp_t& get_scq(); - - static LCI_comp_t& get_rcq(); - static std::string get_processor_name(); - struct HPX_EXPORT scoped_lock + // Configurations: + // Log level + enum class log_level_t { - scoped_lock(); - scoped_lock(scoped_lock const&) = delete; - scoped_lock& operator=(scoped_lock const&) = delete; - ~scoped_lock(); - void unlock(); + none, + profile, + debug }; - - struct HPX_EXPORT scoped_try_lock - { - scoped_try_lock(); - scoped_try_lock(scoped_try_lock const&) = delete; - scoped_try_lock& operator=(scoped_try_lock const&) = delete; - ~scoped_try_lock(); - void unlock(); - bool locked; - }; - - typedef hpx::spinlock mutex_type; + static log_level_t log_level; + // Output filename of log + static FILE* log_outfile; + static void log(log_level_t level, const char* format, ...); private: - static mutex_type mtx_; static bool enabled_; - static bool setuped_; - static LCI_device_t device_eager; - static LCI_device_t device_iovec; - static LCI_endpoint_t ep_eager; - static LCI_endpoint_t ep_iovec; - static LCI_comp_t scq; - static LCI_comp_t rcq; - static std::unique_ptr prg_thread_eager_p; - static std::unique_ptr prg_thread_iovec_p; - static std::atomic prg_thread_flag; - - public: - // configurations: - // whether to use separate devices/progress threads for eager and iovec messages. - static bool use_two_device; - // whether to bypass the parcel queue and connection cache. - static bool enable_send_immediate; - // whether to use HPX resource partitioner to run the LCI progress function. - static bool enable_lci_progress_pool; - // whether to enable the backlog queue and eager message aggregation - static bool enable_lci_backlog_queue; }; }} // namespace hpx::util diff --git a/libs/core/lci_base/src/lci_environment.cpp b/libs/core/lci_base/src/lci_environment.cpp index cebbcfed7c79..1ed9e55ba1c8 100644 --- a/libs/core/lci_base/src/lci_environment.cpp +++ b/libs/core/lci_base/src/lci_environment.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include /////////////////////////////////////////////////////////////////////////////// @@ -91,70 +92,56 @@ namespace hpx { namespace util { namespace hpx { namespace util { - lci_environment::mutex_type lci_environment::mtx_; bool lci_environment::enabled_ = false; - bool lci_environment::setuped_ = false; - LCI_device_t lci_environment::device_eager; - LCI_device_t lci_environment::device_iovec; - LCI_endpoint_t lci_environment::ep_eager; - LCI_endpoint_t lci_environment::ep_iovec; - LCI_comp_t lci_environment::scq; - LCI_comp_t lci_environment::rcq; - // We need this progress thread to send early parcels - std::unique_ptr lci_environment::prg_thread_eager_p = nullptr; - std::unique_ptr lci_environment::prg_thread_iovec_p = nullptr; - std::atomic lci_environment::prg_thread_flag = false; - - bool lci_environment::use_two_device = false; - bool lci_environment::enable_send_immediate = false; - bool lci_environment::enable_lci_progress_pool = false; - bool lci_environment::enable_lci_backlog_queue = false; + lci_environment::log_level_t lci_environment::log_level = log_level_t::none; + FILE* lci_environment::log_outfile = nullptr; /////////////////////////////////////////////////////////////////////////// void lci_environment::init_config(util::runtime_configuration& rtcfg) { - enable_lci_progress_pool = hpx::util::get_entry_as( - rtcfg, "hpx.parcel.lci.rp_prg_pool", false /* Does not matter*/); // The default value here does not matter here - // the key "hpx.parcel.lci.sendimm" is guaranteed to exist - enable_send_immediate = hpx::util::get_entry_as( - rtcfg, "hpx.parcel.lci.sendimm", false /* Does not matter*/); - enable_lci_backlog_queue = hpx::util::get_entry_as( - rtcfg, "hpx.parcel.lci.backlog_queue", false /* Does not matter*/); - use_two_device = - get_entry_as(rtcfg, "hpx.parcel.lci.use_two_device", false); - - if (!enable_send_immediate && enable_lci_backlog_queue) - { - throw std::runtime_error( - "Backlog queue must be used with send_immediate enabled"); - } - std::size_t num_threads = - hpx::util::get_entry_as(rtcfg, "hpx.os_threads", 1); - if (enable_lci_progress_pool && num_threads <= 1) - { - enable_lci_progress_pool = false; - fprintf( - stderr, "WARNING: set enable_lci_progress_pool to false!\n"); - } - if (use_two_device && enable_lci_progress_pool && num_threads <= 2) - { - use_two_device = false; - fprintf(stderr, "WARNING: set use_two_device to false!\n"); - } - } - - void set_affinity(pthread_t pthread_handler, size_t target) - { - cpu_set_t cpuset; - CPU_ZERO(&cpuset); - CPU_SET(target, &cpuset); - int rv = - pthread_setaffinity_np(pthread_handler, sizeof(cpuset), &cpuset); - if (rv != 0) + std::string log_level_str = get_entry_as( + rtcfg, "hpx.parcel.lci.log_level", "" /* Does not matter*/); + if (log_level_str == "none") + log_level = log_level_t::none; + else if (log_level_str == "profile") + log_level = log_level_t::profile; + else if (log_level_str == "debug") + log_level = log_level_t::debug; + else + throw std::runtime_error("Unknown log level " + log_level_str); + std::string log_filename = get_entry_as( + rtcfg, "hpx.parcel.lci.log_outfile", "" /* Does not matter*/); + if (log_filename == "stderr") + log_outfile = stderr; + else if (log_filename == "stdout") + log_outfile = stdout; + else { - fprintf(stderr, "ERROR %d thread affinity didn't work.\n", rv); - exit(1); + const int filename_max = 256; + char filename[filename_max]; + char* p0_old = log_filename.data(); + char* p0_new = strchr(log_filename.data(), '%'); + char* p1 = filename; + while (p0_new) + { + long nbytes = p0_new - p0_old; + HPX_ASSERT(p1 + nbytes < filename + filename_max); + memcpy(p1, p0_old, nbytes); + p1 += nbytes; + nbytes = + snprintf(p1, filename + filename_max - p1, "%d", LCI_RANK); + p1 += nbytes; + p0_old = p0_new + 1; + p0_new = strchr(p0_old, '%'); + } + strncat(p1, p0_old, filename + filename_max - p1 - 1); + log_outfile = fopen(filename, "w+"); + if (log_outfile == nullptr) + { + throw std::runtime_error( + "Cannot open the logfile " + std::string(filename)); + } } } @@ -199,52 +186,8 @@ namespace hpx { namespace util { rtcfg.add_entry("hpx.parcel.bootstrap", "lci"); rtcfg.add_entry("hpx.parcel.lci.rank", std::to_string(this_rank)); - enabled_ = true; - } - - void lci_environment::setup(util::runtime_configuration& rtcfg) - { - if (!enabled_) - return; - if (setuped_) - return; // don't call twice - init_config(rtcfg); - // create ep, scq, rcq - device_eager = LCI_UR_DEVICE; - if (use_two_device) - LCI_device_init(&device_iovec); - else - device_iovec = LCI_UR_DEVICE; - LCI_queue_create(LCI_UR_DEVICE /* no use */, &scq); - LCI_queue_create(LCI_UR_DEVICE /* no use */, &rcq); - LCI_plist_t plist_; - LCI_plist_create(&plist_); - LCI_plist_set_default_comp(plist_, rcq); - LCI_plist_set_comp_type(plist_, LCI_PORT_COMMAND, LCI_COMPLETION_QUEUE); - LCI_plist_set_comp_type(plist_, LCI_PORT_MESSAGE, LCI_COMPLETION_QUEUE); - LCI_endpoint_init(&ep_eager, device_eager, plist_); - LCI_endpoint_init(&ep_iovec, device_iovec, plist_); - LCI_plist_free(&plist_); - - // create progress thread - HPX_ASSERT(prg_thread_flag == false); - HPX_ASSERT(prg_thread_eager_p == nullptr); - HPX_ASSERT(prg_thread_iovec_p == nullptr); - prg_thread_flag = true; - prg_thread_eager_p = - std::make_unique(progress_fn, get_device_eager()); - if (use_two_device) - prg_thread_iovec_p = - std::make_unique(progress_fn, get_device_iovec()); - int target = get_entry_as(rtcfg, "hpx.parcel.lci.prg_thread_core", -1); - if (target >= 0) - { - set_affinity(prg_thread_eager_p->native_handle(), target); - if (use_two_device) - set_affinity(prg_thread_iovec_p->native_handle(), target + 1); - } - setuped_ = true; + enabled_ = true; } std::string lci_environment::get_processor_name() @@ -256,55 +199,21 @@ namespace hpx { namespace util { { if (enabled()) { + enabled_ = false; // for some reasons, this code block can be entered twice when HPX exits int lci_init = 0; LCI_initialized(&lci_init); if (lci_init) { - if (setuped_) - { - join_prg_thread_if_running(); - // create ep, scq, rcq - LCI_endpoint_free(&ep_iovec); - LCI_endpoint_free(&ep_eager); - LCI_queue_free(&scq); - LCI_queue_free(&rcq); - if (use_two_device) - LCI_device_free(&device_iovec); - } LCI_finalize(); } } } - void lci_environment::join_prg_thread_if_running() - { - if (prg_thread_eager_p || prg_thread_iovec_p) - { - prg_thread_flag = false; - if (prg_thread_eager_p) - { - prg_thread_eager_p->join(); - prg_thread_eager_p.reset(); - } - if (prg_thread_iovec_p) - { - prg_thread_iovec_p->join(); - prg_thread_iovec_p.reset(); - } - } - } - - void lci_environment::progress_fn(LCI_device_t device) - { - while (prg_thread_flag) - { - LCI_progress(device); - } - } - bool lci_environment::do_progress(LCI_device_t device) { + if (!device) + return false; LCI_error_t ret = LCI_progress(device); HPX_ASSERT(ret == LCI_OK || ret == LCI_ERR_RETRY); return ret == LCI_OK; @@ -331,70 +240,16 @@ namespace hpx { namespace util { return res; } - LCI_device_t& lci_environment::get_device_eager() - { - return device_eager; - } - - LCI_device_t& lci_environment::get_device_iovec() - { - return device_iovec; - } - - LCI_endpoint_t& lci_environment::get_endpoint_eager() - { - return ep_eager; - } - - LCI_endpoint_t& lci_environment::get_endpoint_iovec() + void lci_environment::log( + lci_environment::log_level_t level, const char* format, ...) { - return ep_iovec; - } + va_list args; + va_start(args, format); - LCI_comp_t& lci_environment::get_scq() - { - return scq; - } + if (level <= log_level) + vfprintf(log_outfile, format, args); - LCI_comp_t& lci_environment::get_rcq() - { - return rcq; - } - - lci_environment::scoped_lock::scoped_lock() - { - mtx_.lock(); - } - - lci_environment::scoped_lock::~scoped_lock() - { - mtx_.unlock(); - } - - void lci_environment::scoped_lock::unlock() - { - mtx_.unlock(); - } - - lci_environment::scoped_try_lock::scoped_try_lock() - : locked(true) - { - locked = mtx_.try_lock(); - } - - lci_environment::scoped_try_lock::~scoped_try_lock() - { - if (locked) - mtx_.unlock(); - } - - void lci_environment::scoped_try_lock::unlock() - { - if (!locked) - { - locked = false; - mtx_.unlock(); - } + va_end(args); } }} // namespace hpx::util diff --git a/libs/full/parcelport_lci/CMakeLists.txt b/libs/full/parcelport_lci/CMakeLists.txt index b261e809f28f..021e338a58c3 100644 --- a/libs/full/parcelport_lci/CMakeLists.txt +++ b/libs/full/parcelport_lci/CMakeLists.txt @@ -11,21 +11,43 @@ endif() list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake") set(parcelport_lci_headers + hpx/parcelport_lci/config.hpp hpx/parcelport_lci/header.hpp hpx/parcelport_lci/locality.hpp - hpx/parcelport_lci/receiver.hpp - hpx/parcelport_lci/sender.hpp - hpx/parcelport_lci/sender_connection.hpp + hpx/parcelport_lci/receiver_base.hpp + hpx/parcelport_lci/sender_base.hpp + hpx/parcelport_lci/sender_connection_base.hpp hpx/parcelport_lci/backlog_queue.hpp hpx/parcelport_lci/parcelport_lci.hpp + hpx/parcelport_lci/putva/sender_putva.hpp + hpx/parcelport_lci/putva/sender_connection_putva.hpp + hpx/parcelport_lci/putva/receiver_putva.hpp + hpx/parcelport_lci/sendrecv/sender_sendrecv.hpp + hpx/parcelport_lci/sendrecv/sender_connection_sendrecv.hpp + hpx/parcelport_lci/sendrecv/receiver_sendrecv.hpp + hpx/parcelport_lci/sendrecv/receiver_connection_sendrecv.hpp + hpx/parcelport_lci/completion_manager_base.hpp + hpx/parcelport_lci/completion_manager/completion_manager_queue.hpp + hpx/parcelport_lci/completion_manager/completion_manager_sync.hpp ) # cmake-format: off set(parcelport_lci_compat_headers) # cmake-format: on -set(parcelport_lci_sources locality.cpp parcelport_lci.cpp sender.cpp - sender_connection.cpp backlog_queue.cpp +set(parcelport_lci_sources + config.cpp + locality.cpp + parcelport_lci.cpp + backlog_queue.cpp + sender_connection_base.cpp + sender_base.cpp + putva/sender_putva.cpp + putva/sender_connection_putva.cpp + sendrecv/sender_sendrecv.cpp + sendrecv/sender_connection_sendrecv.cpp + sendrecv/receiver_sendrecv.cpp + sendrecv/receiver_connection_sendrecv.cpp ) include(HPX_SetupLCI) diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/backlog_queue.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/backlog_queue.hpp index 930536565186..1051b511de49 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/backlog_queue.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/backlog_queue.hpp @@ -15,9 +15,10 @@ #include namespace hpx::parcelset::policies::lci { - struct sender_connection; + struct completion_manager_base; + struct sender_connection_base; namespace backlog_queue { - using message_type = sender_connection; + using message_type = sender_connection_base; using message_ptr = std::shared_ptr; struct backlog_queue_t { @@ -27,7 +28,8 @@ namespace hpx::parcelset::policies::lci { void push(message_ptr message); bool empty(int dst_rank); - bool background_work(size_t num_thread) noexcept; + bool background_work(completion_manager_base* completion_manager, + size_t num_thread) noexcept; void free(); } // namespace backlog_queue } // namespace hpx::parcelset::policies::lci diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_queue.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_queue.hpp new file mode 100644 index 000000000000..d013779110cf --- /dev/null +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_queue.hpp @@ -0,0 +1,56 @@ +// Copyright (c) 2014-2023 Thomas Heller +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include + +#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) + +#include + +namespace hpx::parcelset::policies::lci { + struct completion_manager_queue : public completion_manager_base + { + completion_manager_queue() + { + LCI_queue_create(LCI_UR_DEVICE, &queue); + } + + ~completion_manager_queue() + { + LCI_queue_free(&queue); + } + + LCI_comp_t alloc_completion() + { + return queue; + } + + void enqueue_completion(LCI_comp_t comp) + { + HPX_UNUSED(comp); + } + + LCI_request_t poll() + { + LCI_request_t request; + request.flag = LCI_ERR_RETRY; + LCI_queue_pop(queue, &request); + return request; + } + + LCI_comp_t get_completion_object() + { + return queue; + } + + private: + LCI_comp_t queue; + }; +} // namespace hpx::parcelset::policies::lci + +#endif diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync.hpp new file mode 100644 index 000000000000..33ab477cbdef --- /dev/null +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync.hpp @@ -0,0 +1,72 @@ +// Copyright (c) 2014-2023 Thomas Heller +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include + +#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) + +#include + +#include + +namespace hpx::parcelset::policies::lci { + struct completion_manager_sync : public completion_manager_base + { + completion_manager_sync() {} + + ~completion_manager_sync() {} + + LCI_comp_t alloc_completion() + { + LCI_comp_t sync; + LCI_sync_create(LCI_UR_DEVICE, 1, &sync); + return sync; + } + + void enqueue_completion(LCI_comp_t comp) + { + std::unique_lock l(lock); + sync_list.push_back(comp); + } + + LCI_request_t poll() + { + LCI_request_t request; + request.flag = LCI_ERR_RETRY; + if (sync_list.empty()) + { + return request; + } + { + std::unique_lock l(lock, std::try_to_lock); + if (l.owns_lock() && !sync_list.empty()) + { + LCI_comp_t sync = sync_list.front(); + sync_list.pop_front(); + LCI_error_t ret = LCI_sync_test(sync, &request); + if (ret == LCI_OK) + { + HPX_ASSERT(request.flag == LCI_OK); + LCI_sync_free(&sync); + } + else + { + sync_list.push_back(sync); + } + } + } + return request; + } + + private: + hpx::spinlock lock; + std::deque sync_list; + }; +} // namespace hpx::parcelset::policies::lci + +#endif diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager_base.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager_base.hpp new file mode 100644 index 000000000000..914e954d18bf --- /dev/null +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager_base.hpp @@ -0,0 +1,29 @@ +// Copyright (c) 2014-2023 Thomas Heller +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include + +#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) + +#include + +namespace hpx::parcelset::policies::lci { + struct completion_manager_base + { + virtual ~completion_manager_base() {} + virtual LCI_comp_t alloc_completion() = 0; + virtual void enqueue_completion(LCI_comp_t comp) = 0; + virtual LCI_request_t poll() = 0; + virtual LCI_comp_t get_completion_object() + { + return nullptr; + } + }; +} // namespace hpx::parcelset::policies::lci + +#endif diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/config.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/config.hpp new file mode 100644 index 000000000000..5d224f7b52c8 --- /dev/null +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/config.hpp @@ -0,0 +1,54 @@ +// Copyright (c) 2023 Jiakun Yan +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include + +#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) + +#include + +namespace hpx::parcelset::policies::lci { + struct config_t + { + // whether init_config has been called + static bool is_initialized; + // whether to use separate devices/progress threads for eager and iovec messages. + static bool use_two_device; + // whether to bypass the parcel queue and connection cache. + static bool enable_send_immediate; + // whether to enable the backlog queue and eager message aggregation + static bool enable_lci_backlog_queue; + // which protocol to use + enum class protocol_t + { + putva, + sendrecv, + putsendrecv, + }; + static protocol_t protocol; + // which completion mechanism to use + static LCI_comp_type_t completion_type; + // how to run LCI_progress + enum class progress_type_t + { + rp, // HPX resource partitioner + pthread, // Normal pthread + worker, // HPX worker thread + }; + static progress_type_t progress_type; + // Which core to pin the progress threads + static int progress_thread_core; + // How many pre-posted receives for new messages + // (can only be applied to `sendrecv` protocol). + static int prepost_recv_num; + + static void init_config(util::runtime_configuration const& rtcfg); + }; +} // namespace hpx::parcelset::policies::lci + +#endif diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp index c5fcb1de1a79..4401e0751b2c 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp @@ -28,21 +28,23 @@ namespace hpx::parcelset::policies::lci { { // siguature for assert_valid pos_signature = 0, + // tag + pos_tag = 1 * sizeof(value_type), // non-zero-copy chunk size - pos_numbytes_nonzero_copy = 1 * sizeof(value_type), + pos_numbytes_nonzero_copy = 2 * sizeof(value_type), // transmission chunk size - pos_numbytes_tchunk = 2 * sizeof(value_type), + pos_numbytes_tchunk = 3 * sizeof(value_type), // how many bytes in total (including zero-copy and non-zero-copy chunks) - pos_numbytes = 3 * sizeof(value_type), + pos_numbytes = 4 * sizeof(value_type), // zero-copy chunk number - pos_numchunks_zero_copy = 4 * sizeof(value_type), + pos_numchunks_zero_copy = 5 * sizeof(value_type), // non-zero-copy chunk number - pos_numchunks_nonzero_copy = 5 * sizeof(value_type), + pos_numchunks_nonzero_copy = 6 * sizeof(value_type), // whether piggyback data - pos_piggy_back_flag_data = 6 * sizeof(value_type), + pos_piggy_back_flag_data = 7 * sizeof(value_type), // whether piggyback transmission chunk - pos_piggy_back_flag_tchunk = 6 * sizeof(value_type) + 1, - pos_piggy_back_address = 6 * sizeof(value_type) + 2 + pos_piggy_back_flag_tchunk = 7 * sizeof(value_type) + 1, + pos_piggy_back_address = 7 * sizeof(value_type) + 2 }; template @@ -51,6 +53,7 @@ namespace hpx::parcelset::policies::lci { { HPX_ASSERT(max_header_size >= pos_piggy_back_address); data_ = header_buffer; + memset(data_, 0, pos_piggy_back_address); std::int64_t size = static_cast(buffer.data_.size()); std::int64_t numbytes = static_cast(buffer.data_size_); @@ -138,6 +141,16 @@ namespace hpx::parcelset::policies::lci { return get(); } + void set_tag(LCI_tag_t tag) noexcept + { + set(static_cast(tag)); + } + + value_type get_tag() const noexcept + { + return get(); + } + value_type numbytes_nonzero_copy() const noexcept { return get(); diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/parcelport_lci.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/parcelport_lci.hpp index d3067173bdce..2852f69ebb98 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/parcelport_lci.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/parcelport_lci.hpp @@ -12,13 +12,14 @@ #include +#include #include #include +#include #include #include -#include -#include -#include +#include +#include #include #include @@ -34,7 +35,7 @@ namespace hpx::parcelset { template <> struct connection_handler_traits { - using connection_type = policies::lci::sender_connection; + using connection_type = policies::lci::sender_connection_base; using send_early_parcel = std::true_type; using do_background_work = std::true_type; using send_immediate_parcels = std::true_type; @@ -65,7 +66,7 @@ namespace hpx::parcelset { util::runtime_configuration const& ini); public: - using sender_type = sender; + using sender_type = sender_base; parcelport(util::runtime_configuration const& ini, threads::policies::callback_notifier const& notifier); @@ -81,7 +82,7 @@ namespace hpx::parcelset { /// Return the name of this locality std::string get_locality_name() const override; - std::shared_ptr create_connection( + std::shared_ptr create_connection( parcelset::locality const& l, error_code&); parcelset::locality agas_locality( @@ -100,18 +101,53 @@ namespace hpx::parcelset { bool can_send_immediate(); - // whether the parcelport is sending early parcels - static bool is_sending_early_parcel; + bool send_immediate(parcelset::parcelport* pp, + parcelset::locality const& dest, + sender_base::parcel_buffer_type buffer, + sender_base::callback_fn_type&& callbackFn); private: using mutex_type = hpx::spinlock; - std::atomic stopped_; - - sender sender_; - receiver receiver_; + std::shared_ptr sender_p; + std::shared_ptr receiver_p; void io_service_work(); + + public: + // States + // whether the parcelport has been initialized + // (starting to execute the background works) + std::atomic is_initialized = false; + // whether the parcelport is sending early parcels + std::atomic is_sending_early_parcel = false; + + // LCI objects + LCI_device_t device_eager; + LCI_device_t device_iovec; + LCI_endpoint_t endpoint_new_eager; + LCI_endpoint_t endpoint_followup; + LCI_endpoint_t endpoint_new_iovec; + + // Parcelport objects + static std::atomic prg_thread_flag; + std::unique_ptr prg_thread_eager_p; + std::unique_ptr prg_thread_iovec_p; + std::shared_ptr send_completion_manager; + std::shared_ptr + recv_new_completion_manager; + std::shared_ptr + recv_followup_completion_manager; + + bool do_progress(); + + private: + static void progress_thread_fn(LCI_device_t device); + + void setup(util::runtime_configuration const& rtcfg); + void cleanup(); + + void join_prg_thread_if_running(); }; } // namespace policies::lci } // namespace hpx::parcelset @@ -138,7 +174,7 @@ namespace hpx::traits { { if (util::lci_environment::enabled()) { - util::lci_environment::setup(cfg.rtcfg_); + parcelset::policies::lci::config_t::init_config(cfg.rtcfg_); cfg.num_localities_ = static_cast(util::lci_environment::size()); cfg.node_ = @@ -149,29 +185,34 @@ namespace hpx::traits { static void init(hpx::resource::partitioner& rp) noexcept { if (util::lci_environment::enabled() && - util::lci_environment::enable_lci_progress_pool) + parcelset::policies::lci::config_t::progress_type == + parcelset::policies::lci::config_t::progress_type_t::rp) { + if (!parcelset::policies::lci::config_t::is_initialized) + { + fprintf(stderr, + "init_config hasn't been called! Something is " + "wrong!\n"); + exit(1); + } rp.create_thread_pool("lci-progress-pool-eager", hpx::resource::scheduling_policy::static_, hpx::threads::policies::scheduler_mode:: do_background_work_only); - if (util::lci_environment::use_two_device) + if (parcelset::policies::lci::config_t::use_two_device) rp.create_thread_pool("lci-progress-pool-iovec", hpx::resource::scheduling_policy::static_, hpx::threads::policies::scheduler_mode:: do_background_work_only); rp.add_resource(rp.numa_domains()[0].cores()[0].pus()[0], "lci-progress-pool-eager"); - if (util::lci_environment::use_two_device) + if (parcelset::policies::lci::config_t::use_two_device) rp.add_resource(rp.numa_domains()[0].cores()[1].pus()[0], "lci-progress-pool-iovec"); } } - static void destroy() - { - util::lci_environment::finalize(); - } + static void destroy() {} static constexpr char const* call() noexcept { @@ -188,10 +229,15 @@ namespace hpx::traits { #endif "max_connections = " "${HPX_HAVE_PARCELPORT_LCI_MAX_CONNECTIONS:8192}\n" - "rp_prg_pool = 1\n" + "log_level = none\n" + "log_outfile = stderr\n" "backlog_queue = 0\n" "use_two_device = 0\n" - "prg_thread_core = -1\n"; + "prg_thread_core = -1\n" + "protocol = putva\n" + "comp_type = queue\n" + "progress_type = rp\n" + "prepost_recv_num = 1\n"; } }; } // namespace hpx::traits diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/receiver.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/receiver_putva.hpp similarity index 69% rename from libs/full/parcelport_lci/include/hpx/parcelport_lci/receiver.hpp rename to libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/receiver_putva.hpp index 75299bf912f7..845eb301f0b3 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/receiver.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/receiver_putva.hpp @@ -12,7 +12,7 @@ #if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) #include -#include +#include #include #include @@ -28,105 +28,29 @@ #include namespace hpx::parcelset::policies::lci { - struct buffer_wrapper + struct receiver_putva : public receiver_base { - struct fake_allocator + explicit receiver_putva(parcelport* pp) noexcept + : receiver_base(pp) { - }; - using allocator_type = fake_allocator; - void* ptr; - size_t length; - buffer_wrapper() = default; - buffer_wrapper(const buffer_wrapper& wrapper) = default; - buffer_wrapper& operator=(const buffer_wrapper& wrapper) = default; - buffer_wrapper(const allocator_type& alloc) - { - HPX_UNUSED(alloc); - ptr = nullptr; - length = 0; - } - buffer_wrapper( - const buffer_wrapper& wrapper, const allocator_type& alloc) - { - HPX_UNUSED(alloc); - ptr = wrapper.ptr; - length = wrapper.length; - } - char& operator[](size_t i) const - { - HPX_ASSERT(i < length); - char* p = (char*) ptr; - return p[i]; } - void* data() const - { - return ptr; - } - size_t size() const - { - return length; - } - }; - - struct request_wrapper_t - { - LCI_request_t request; - request_wrapper_t() - { - request.flag = LCI_ERR_RETRY; - } - ~request_wrapper_t() - { - if (request.flag == LCI_OK) - { - if (request.type == LCI_IOVEC) - { - for (int j = 0; j < request.data.iovec.count; ++j) - { - LCI_lbuffer_free(request.data.iovec.lbuffers[j]); - } - free(request.data.iovec.lbuffers); - free(request.data.iovec.piggy_back.address); - } - else - { - HPX_ASSERT(request.type = LCI_MEDIUM); - LCI_mbuffer_free(request.data.mbuffer); - } - } - else - { - HPX_ASSERT(request.flag == LCI_ERR_RETRY); - } - } - }; - - template - struct receiver - { - using buffer_type = parcel_buffer; - - explicit receiver(Parcelport& pp) noexcept - : pp_(pp) - { - } - - void run() noexcept {} bool background_work() noexcept { // We first try to accept a new connection request_wrapper_t request; - LCI_queue_pop(util::lci_environment::get_rcq(), &request.request); + request.request = pp_->recv_new_completion_manager->poll(); if (request.request.flag == LCI_OK) { + HPX_ASSERT(request.request.flag == LCI_OK); process_request(request.request); return true; } return false; } + private: void process_request(LCI_request_t request) { if (request.type == LCI_MEDIUM) @@ -139,7 +63,7 @@ namespace hpx::parcelset::policies::lci { (char*) request.data.mbuffer.address + consumed, buffer); handle_received_parcels( - decode_parcels(pp_, HPX_MOVE(buffer))); + decode_parcels(*pp_, HPX_MOVE(buffer))); } HPX_ASSERT(consumed == request.data.mbuffer.length); } @@ -149,7 +73,7 @@ namespace hpx::parcelset::policies::lci { HPX_ASSERT(request.type == LCI_IOVEC); buffer_type buffer; decode_iovec(request.data.iovec, buffer); - handle_received_parcels(decode_parcels(pp_, HPX_MOVE(buffer))); + handle_received_parcels(decode_parcels(*pp_, HPX_MOVE(buffer))); } } @@ -158,7 +82,6 @@ namespace hpx::parcelset::policies::lci { #if defined(HPX_HAVE_PARCELPORT_COUNTERS) hpx::chrono::high_resolution_timer timer_; parcelset::data_point& data = buffer.data_point_; - data.time_ = timer_.elapsed_nanoseconds(); #endif // decode header header header_ = header((char*) address); @@ -240,20 +163,17 @@ namespace hpx::parcelset::policies::lci { std::size_t chunk_size = buffer.transmission_chunks_[j].second; HPX_ASSERT(iovec.lbuffers[i].length == chunk_size); - buffer_wrapper& c = buffer.chunks_[j]; - c.length = chunk_size; - c.ptr = iovec.lbuffers[i].address; + buffer.chunks_[j] = serialization::create_pointer_chunk( + iovec.lbuffers[i].address, chunk_size); ++i; } } HPX_ASSERT(i == iovec.count); #if defined(HPX_HAVE_PARCELPORT_COUNTERS) data.bytes_ = static_cast(header_.numbytes()); - data.time_ = timer_.elapsed_nanoseconds() - data.time_; + data.time_ = timer_.elapsed_nanoseconds(); #endif } - - Parcelport& pp_; }; } // namespace hpx::parcelset::policies::lci diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/sender_connection_putva.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/sender_connection_putva.hpp new file mode 100644 index 000000000000..12af797f2c20 --- /dev/null +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/sender_connection_putva.hpp @@ -0,0 +1,60 @@ +// Copyright (c) 2014-2015 Thomas Heller +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include + +#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) + +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace hpx::parcelset::policies::lci { + struct sender_connection_putva : public sender_connection_base + { + public: + sender_connection_putva(int dst, parcelset::parcelport* pp) + : sender_connection_base(dst, pp) + { + } + ~sender_connection_putva() {} + void load(handler_type&& handler, + postprocess_handler_type&& parcel_postprocess); + return_t send_nb(); + void done(); + bool tryMerge( + const std::shared_ptr& other_base); + + private: + enum class connection_state + { + initialized, + sent, + locked, + }; + bool can_be_eager_message(size_t max_header_size); + bool isEager(); + void cleanup(); + return_t send_msg(); + + std::atomic state; + bool is_eager; + LCI_mbuffer_t mbuffer; + LCI_iovec_t iovec; + std::shared_ptr* + sharedPtr_p; // for LCI_putva + }; +} // namespace hpx::parcelset::policies::lci + +#endif diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/sender_putva.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/sender_putva.hpp new file mode 100644 index 000000000000..27e15b02c624 --- /dev/null +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/sender_putva.hpp @@ -0,0 +1,67 @@ +// Copyright (c) 2007-2013 Hartmut Kaiser +// Copyright (c) 2014-2015 Thomas Heller +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include + +#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace hpx::parcelset::policies::lci { + struct sender_connection_putva; + struct sender_putva : public sender_base + { + explicit sender_putva(parcelport* pp) noexcept + : sender_base(pp) + { + } + + connection_ptr create_connection(int dest, parcelset::parcelport* pp); + }; + +} // namespace hpx::parcelset::policies::lci + +#endif diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/receiver_base.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/receiver_base.hpp new file mode 100644 index 000000000000..c458bd2a7e0b --- /dev/null +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/receiver_base.hpp @@ -0,0 +1,139 @@ + +// Copyright (c) 2007-2013 Hartmut Kaiser +// Copyright (c) 2014-2015 Thomas Heller +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include + +#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace hpx::parcelset::policies::lci { + class HPX_EXPORT parcelport; + struct buffer_wrapper + { + struct fake_allocator + { + }; + using allocator_type = fake_allocator; + void* ptr; + size_t length; + buffer_wrapper() = default; + buffer_wrapper(const buffer_wrapper& wrapper) = default; + buffer_wrapper& operator=(const buffer_wrapper& wrapper) = default; + explicit buffer_wrapper(const allocator_type& alloc) + { + HPX_UNUSED(alloc); + ptr = nullptr; + length = 0; + } + buffer_wrapper( + const buffer_wrapper& wrapper, const allocator_type& alloc) + { + HPX_UNUSED(alloc); + ptr = wrapper.ptr; + length = wrapper.length; + } + ~buffer_wrapper() {} + char& operator[](size_t i) const + { + HPX_ASSERT(i < length); + char* p = (char*) ptr; + return p[i]; + } + void* data() const + { + return ptr; + } + size_t size() const + { + return length; + } + void allocate(size_t size) + { + ptr = new char[size]; + length = size; + } + void free() + { + HPX_ASSERT(ptr != nullptr); + delete[](char*) ptr; + } + }; + + struct request_wrapper_t + { + LCI_request_t request; + request_wrapper_t() + { + request.flag = LCI_ERR_RETRY; + } + ~request_wrapper_t() + { + if (request.flag == LCI_OK) + { + if (request.type == LCI_IOVEC) + { + for (int j = 0; j < request.data.iovec.count; ++j) + { + LCI_lbuffer_free(request.data.iovec.lbuffers[j]); + } + free(request.data.iovec.lbuffers); + free(request.data.iovec.piggy_back.address); + } + else + { + HPX_ASSERT(request.type = LCI_MEDIUM); + LCI_mbuffer_free(request.data.mbuffer); + } + } + else + { + HPX_ASSERT(request.flag == LCI_ERR_RETRY); + } + } + }; + + struct receiver_base + { + using buffer_type = + parcel_buffer; + + explicit receiver_base(parcelport* pp) noexcept + : pp_(pp) + { + } + + virtual ~receiver_base() {} + + void run() noexcept {} + + virtual bool background_work() noexcept = 0; + + protected: + parcelport* pp_; + }; + +} // namespace hpx::parcelset::policies::lci + +#endif diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sender.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sender_base.hpp similarity index 82% rename from libs/full/parcelport_lci/include/hpx/parcelport_lci/sender.hpp rename to libs/full/parcelport_lci/include/hpx/parcelport_lci/sender_base.hpp index cfb0cc9fe79d..eba5b6b32bd4 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sender.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sender_base.hpp @@ -36,10 +36,8 @@ #include #include -#include #include #include -#include #include #include @@ -50,17 +48,24 @@ #include namespace hpx::parcelset::policies::lci { - struct sender_connection; - struct sender + class HPX_EXPORT parcelport; + struct sender_connection_base; + struct sender_base { - using connection_type = sender_connection; + using connection_type = sender_connection_base; using connection_ptr = std::shared_ptr; - sender() noexcept {} + sender_base() = delete; + sender_base(parcelport* pp) noexcept + : pp_(pp) + { + } + virtual ~sender_base() {} void run() noexcept {} - connection_ptr create_connection(int dest, parcelset::parcelport* pp); + virtual connection_ptr create_connection( + int dest, parcelset::parcelport* pp) = 0; bool background_work(size_t num_thread) noexcept; @@ -71,9 +76,11 @@ namespace hpx::parcelset::policies::lci { using callback_fn_type = hpx::move_only_function; - static bool send(parcelset::parcelport* pp, + bool send_immediate(parcelset::parcelport* pp, parcelset::locality const& dest, parcel_buffer_type buffer, callback_fn_type&& callbackFn); + + parcelport* pp_; }; } // namespace hpx::parcelset::policies::lci diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sender_connection.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sender_connection_base.hpp similarity index 54% rename from libs/full/parcelport_lci/include/hpx/parcelport_lci/sender_connection.hpp rename to libs/full/parcelport_lci/include/hpx/parcelport_lci/sender_connection_base.hpp index b348cbe79f2d..34dbb44f968f 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sender_connection.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sender_connection_base.hpp @@ -10,7 +10,7 @@ #if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) -#include +#include #include #include @@ -20,31 +20,42 @@ #include namespace hpx::parcelset::policies::lci { - struct sender; - struct sender_connection - : parcelset::parcelport_connection> + struct sender_connection_base + : public parcelset::parcelport_connection> { - private: - using sender_type = sender; + protected: using write_handler_type = hpx::function; using data_type = std::vector; using base_type = - parcelset::parcelport_connection; + parcelset::parcelport_connection; using handler_type = hpx::move_only_function; using postprocess_handler_type = hpx::move_only_function)>; + std::shared_ptr)>; public: - sender_connection(int dst, parcelset::parcelport* pp) + enum class return_status_t + { + done, // This connection is done. No need to call send anymore. + retry, // Need to call send on this connection to make progress. + wait // Wait for the completion object and then call send to + // make progress. + }; + struct return_t + { + return_status_t status; + LCI_comp_t completion; + }; + sender_connection_base(int dst, parcelset::parcelport* pp) : dst_rank(dst) - , pp_(pp) + , pp_((lci::parcelport*) pp) , there_(parcelset::locality(locality(dst_rank))) { } - ~sender_connection() {} + virtual ~sender_connection_base() {} parcelset::locality const& destination() const noexcept { @@ -58,27 +69,20 @@ namespace hpx::parcelset::policies::lci { void async_write(handler_type&& handler, postprocess_handler_type&& parcel_postprocess); - - bool can_be_eager_message(size_t max_header_size); - - void load(handler_type&& handler, - postprocess_handler_type&& parcel_postprocess); - - bool isEager(); - bool send(); - - void done(); - - bool tryMerge(const std::shared_ptr& other); + virtual void load(handler_type&& handler, + postprocess_handler_type&& parcel_postprocess) = 0; + return_t send(); + virtual return_t send_nb() = 0; + virtual void done() = 0; + virtual bool tryMerge( + const std::shared_ptr& other_base) = 0; + void profile_start_hook(const header& header_); + void profile_end_hook(); int dst_rank; handler_type handler_; postprocess_handler_type postprocess_handler_; - bool is_eager; - LCI_mbuffer_t mbuffer; - LCI_iovec_t iovec; - std::shared_ptr* sharedPtr_p; // for LCI_putva - parcelset::parcelport* pp_; + parcelport* pp_; parcelset::locality there_; #if defined(HPX_HAVE_PARCELPORT_COUNTERS) parcelset::data_point data_point_; diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_connection_sendrecv.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_connection_sendrecv.hpp new file mode 100644 index 000000000000..f4daedd2f785 --- /dev/null +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_connection_sendrecv.hpp @@ -0,0 +1,76 @@ +// Copyright (c) 2014-2015 Thomas Heller +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include + +#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace hpx::parcelset::policies::lci { + struct receiver_connection_sendrecv + : public std::enable_shared_from_this + { + struct return_t + { + bool isDone; + LCI_comp_t completion; + }; + + public: + receiver_connection_sendrecv(int dst, parcelset::parcelport* pp); + ~receiver_connection_sendrecv() {} + void load(char* header_buffer); + return_t receive(); + void done(); + + private: + enum class connection_state + { + initialized, + rcvd_transmission_chunks, + rcvd_data, + rcvd_chunks, + locked + }; + LCI_comp_t unified_recv(void* address, int length); + return_t receive_transmission_chunks(); + return_t receive_data(); + return_t receive_chunks(); + void receive_chunks_zc_preprocess(); + return_t receive_chunks_zc(); + return_t receive_chunks_nzc(); + // the state of this connection + std::atomic state; + size_t recv_chunks_idx; + size_t recv_zero_copy_chunks_idx; + // related information about this connection + hpx::chrono::high_resolution_timer timer_; + int dst_rank; + bool need_recv_data; + bool need_recv_tchunks; + LCI_tag_t tag; + LCI_tag_t original_tag; + receiver_base::buffer_type buffer; + std::vector parcels_; + std::vector> chunk_buffers_; + parcelport* pp_; + std::shared_ptr* sharedPtr_p; + }; +} // namespace hpx::parcelset::policies::lci + +#endif diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_sendrecv.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_sendrecv.hpp new file mode 100644 index 000000000000..8b2e084c3ae6 --- /dev/null +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_sendrecv.hpp @@ -0,0 +1,70 @@ + +// Copyright (c) 2007-2013 Hartmut Kaiser +// Copyright (c) 2014-2015 Thomas Heller +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include + +#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace hpx::parcelset::policies::lci { + struct receiver_connection_sendrecv; + struct receiver_sendrecv : public receiver_base + { + using connection_type = receiver_connection_sendrecv; + using connection_ptr = std::shared_ptr; + + explicit receiver_sendrecv(parcelport* pp) noexcept + : receiver_base(pp) + { + if (config_t::protocol == config_t::protocol_t::sendrecv) + { + for (int i = 0; i < config_t::prepost_recv_num; ++i) + { + LCI_comp_t completion = + pp_->recv_new_completion_manager->alloc_completion(); + LCI_recvmn(pp_->endpoint_new_eager, LCI_RANK_ANY, 0, + completion, nullptr); + pp_->recv_new_completion_manager->enqueue_completion( + completion); + } + } + } + + ~receiver_sendrecv() {} + + connection_ptr create_connection(int dest, parcelset::parcelport* pp); + + bool background_work() noexcept; + + private: + bool accept_new() noexcept; + bool followup() noexcept; + }; + +} // namespace hpx::parcelset::policies::lci + +#endif diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_connection_sendrecv.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_connection_sendrecv.hpp new file mode 100644 index 000000000000..866f64995301 --- /dev/null +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_connection_sendrecv.hpp @@ -0,0 +1,71 @@ +// Copyright (c) 2014-2015 Thomas Heller +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include + +#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) + +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace hpx::parcelset::policies::lci { + struct sender_connection_sendrecv : public sender_connection_base + { + public: + sender_connection_sendrecv(int dst, parcelset::parcelport* pp) + : sender_connection_base(dst, pp) + { + } + ~sender_connection_sendrecv() {} + void load(handler_type&& handler, + postprocess_handler_type&& parcel_postprocess); + return_t send_nb(); + void done(); + bool tryMerge( + const std::shared_ptr& other_base); + + private: + enum class connection_state + { + initialized, + sent_header, + sent_transmission_chunks, + sent_data, + sent_chunks, + locked, + }; + return_t send_header(); + return_t unified_followup_send(void* address, int length); + return_t send_transmission_chunks(); + return_t send_data(); + return_t send_chunks(); + // the state of this connection + std::atomic state; + size_t send_chunks_idx; + // related information about this connection + hpx::chrono::high_resolution_timer timer_; + header header_; + LCI_mbuffer_t header_buffer; + bool need_send_data; + bool need_send_tchunks; + LCI_tag_t tag; + LCI_tag_t original_tag; + std::shared_ptr* sharedPtr_p; + + static std::atomic next_tag; + }; +} // namespace hpx::parcelset::policies::lci + +#endif diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_sendrecv.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_sendrecv.hpp new file mode 100644 index 000000000000..a742de1437a7 --- /dev/null +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_sendrecv.hpp @@ -0,0 +1,66 @@ +// Copyright (c) 2007-2013 Hartmut Kaiser +// Copyright (c) 2014-2015 Thomas Heller +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include + +#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace hpx::parcelset::policies::lci { + struct sender_connection_sendrecv; + struct sender_sendrecv : public sender_base + { + explicit sender_sendrecv(parcelport* pp) noexcept + : sender_base(pp) + { + } + + connection_ptr create_connection(int dest, parcelset::parcelport* pp); + }; + +} // namespace hpx::parcelset::policies::lci + +#endif diff --git a/libs/full/parcelport_lci/src/backlog_queue.cpp b/libs/full/parcelport_lci/src/backlog_queue.cpp index 0d8f54034441..ea0dbf2169f9 100644 --- a/libs/full/parcelport_lci/src/backlog_queue.cpp +++ b/libs/full/parcelport_lci/src/backlog_queue.cpp @@ -10,13 +10,14 @@ #if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) +#include #include #include +#include #include #include -#include -#include -#include +#include +#include namespace hpx::parcelset::policies::lci::backlog_queue { thread_local backlog_queue_t tls_backlog_queue; @@ -32,7 +33,10 @@ namespace hpx::parcelset::policies::lci::backlog_queue { { bool succeed = message_queue.back()->tryMerge(message); if (succeed) + { + message->done(); return; + } } tls_backlog_queue.messages[message->dst_rank].push_back( HPX_MOVE(message)); @@ -48,7 +52,8 @@ namespace hpx::parcelset::policies::lci::backlog_queue { return ret; } - bool background_work(size_t num_thread) noexcept + bool background_work( + completion_manager_base* completion_manager, size_t num_thread) noexcept { bool did_some_work = false; for (size_t i = 0; i < tls_backlog_queue.messages.size(); ++i) @@ -58,19 +63,24 @@ namespace hpx::parcelset::policies::lci::backlog_queue { !tls_backlog_queue.messages[idx].empty()) { message_ptr message = tls_backlog_queue.messages[idx].front(); - // bool needCallDone = message->isEager(); - bool isSent = message->send(); - if (isSent) + auto ret = message->send_nb(); + if (ret.status == sender_connection_base::return_status_t::done) + { + tls_backlog_queue.messages[idx].pop_front(); + message->done(); + did_some_work = true; + } + else if (ret.status == + sender_connection_base::return_status_t::wait) { tls_backlog_queue.messages[idx].pop_front(); did_some_work = true; - // if (needCallDone) { - // message->done(); - // it can context switch to another thread at this point - // } + completion_manager->enqueue_completion(ret.completion); } else { + HPX_ASSERT(ret.status == + sender_connection_base::return_status_t::retry); break; } } diff --git a/libs/full/parcelport_lci/src/config.cpp b/libs/full/parcelport_lci/src/config.cpp new file mode 100644 index 000000000000..7859e4980448 --- /dev/null +++ b/libs/full/parcelport_lci/src/config.cpp @@ -0,0 +1,131 @@ +// Copyright (c) 2022 Jiakun Yan +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include + +#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) + +#include +#include +#include +#include + +#include +#include + +namespace hpx::parcelset::policies::lci { + bool config_t::is_initialized = false; + bool config_t::use_two_device; + bool config_t::enable_send_immediate; + bool config_t::enable_lci_backlog_queue; + config_t::protocol_t config_t::protocol; + LCI_comp_type_t config_t::completion_type; + config_t::progress_type_t config_t::progress_type; + int config_t::progress_thread_core; + int config_t::prepost_recv_num; + + void config_t::init_config(util::runtime_configuration const& rtcfg) + { + if (is_initialized) + return; + is_initialized = true; + // The default value here does not matter here + enable_send_immediate = util::get_entry_as( + rtcfg, "hpx.parcel.lci.sendimm", false /* Does not matter*/); + enable_lci_backlog_queue = util::get_entry_as( + rtcfg, "hpx.parcel.lci.backlog_queue", false /* Does not matter*/); + use_two_device = util::get_entry_as( + rtcfg, "hpx.parcel.lci.use_two_device", false); + // set protocol to use + std::string protocol_str = util::get_entry_as( + rtcfg, "hpx.parcel.lci.protocol", ""); + if (protocol_str == "putva") + { + protocol = protocol_t::putva; + } + else if (protocol_str == "putsendrecv") + { + protocol = protocol_t::putsendrecv; + } + else if (protocol_str == "sendrecv") + { + protocol = protocol_t::sendrecv; + } + else + { + throw std::runtime_error("Unknown protocol " + protocol_str); + } + // set completion mechanism to use + std::string completion_str = util::get_entry_as( + rtcfg, "hpx.parcel.lci.comp_type", ""); + if (completion_str == "queue") + { + completion_type = LCI_COMPLETION_QUEUE; + } + else if (completion_str == "sync") + { + completion_type = LCI_COMPLETION_SYNC; + } + else + { + throw std::runtime_error( + "Unknown completion type " + completion_str); + } + // set the way to run LCI_progress + std::string progress_type_str = util::get_entry_as( + rtcfg, "hpx.parcel.lci.progress_type", ""); + if (progress_type_str == "rp") + { + progress_type = progress_type_t::rp; + } + else if (progress_type_str == "pthread") + { + progress_type = progress_type_t::pthread; + } + else if (progress_type_str == "worker") + { + progress_type = progress_type_t::worker; + } + else + { + throw std::runtime_error( + "Unknown progress type " + progress_type_str); + } + progress_thread_core = + util::get_entry_as(rtcfg, "hpx.parcel.lci.prg_thread_core", -1); + prepost_recv_num = + util::get_entry_as(rtcfg, "hpx.parcel.lci.prepost_recv_num", 1); + + if (protocol == protocol_t::sendrecv) + { + if (use_two_device) + { + use_two_device = false; + fprintf(stderr, "WARNING: set use_two_device to false!\n"); + } + } + if (!enable_send_immediate && enable_lci_backlog_queue) + { + enable_lci_backlog_queue = false; + fprintf( + stderr, "WARNING: set enable_lci_backlog_queue to false!\n"); + } + std::size_t num_threads = + util::get_entry_as(rtcfg, "hpx.os_threads", 1); + if (progress_type == progress_type_t::rp && num_threads <= 1) + { + progress_type = progress_type_t::pthread; + fprintf(stderr, "WARNING: set progress_type to pthread!\n"); + } + if (use_two_device && progress_type == progress_type_t::rp && + num_threads <= 2) + { + use_two_device = false; + fprintf(stderr, "WARNING: set use_two_device to false!\n"); + } + } +} // namespace hpx::parcelset::policies::lci +#endif diff --git a/libs/full/parcelport_lci/src/parcelport_lci.cpp b/libs/full/parcelport_lci/src/parcelport_lci.cpp index e4577f0b8bce..8addd5611b0f 100644 --- a/libs/full/parcelport_lci/src/parcelport_lci.cpp +++ b/libs/full/parcelport_lci/src/parcelport_lci.cpp @@ -12,14 +12,24 @@ #include +#include #include #include +#include +#include #include #include -#include -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include #include #include #include @@ -44,29 +54,41 @@ namespace hpx::parcelset::policies::lci { threads::policies::callback_notifier const& notifier) : parcelport::base_type(ini, here(), notifier) , stopped_(false) - , receiver_(*this) { + if (!util::lci_environment::enabled()) + return; + if (!parcelset::policies::lci::config_t::is_initialized) + { + fprintf(stderr, + "init_config hasn't been called! Something is wrong!\n"); + exit(1); + } + setup(ini); } parcelport::~parcelport() { + if (!util::lci_environment::enabled()) + return; + cleanup(); util::lci_environment::finalize(); } void parcelport::initialized() { if (util::lci_environment::enabled() && - util::lci_environment::enable_lci_progress_pool) + config_t::progress_type != config_t::progress_type_t::pthread) { - util::lci_environment::join_prg_thread_if_running(); + join_prg_thread_if_running(); } + is_initialized = true; } // Start the handling of connections. bool parcelport::do_run() { - receiver_.run(); - sender_.run(); + receiver_p->run(); + sender_p->run(); for (std::size_t i = 0; i != io_service_pool_.size(); ++i) { io_service_pool_.get_io_service(int(i)).post( @@ -86,7 +108,6 @@ namespace hpx::parcelset::policies::lci { "lci::parcelport::do_stop"); } stopped_ = true; - LCI_barrier(); } /// Return the name of this locality @@ -97,11 +118,11 @@ namespace hpx::parcelset::policies::lci { std::to_string(util::lci_environment::rank()); } - std::shared_ptr parcelport::create_connection( + std::shared_ptr parcelport::create_connection( parcelset::locality const& l, error_code&) { int dest_rank = l.get().rank(); - return sender_.create_connection(dest_rank, this); + return sender_p->create_connection(dest_rank, this); } parcelset::locality parcelport::agas_locality( @@ -131,13 +152,13 @@ namespace hpx::parcelset::policies::lci { if (do_lci_progress == -1) { do_lci_progress = 0; - if (util::lci_environment::enable_lci_progress_pool && + if (config_t::progress_type == config_t::progress_type_t::rp && hpx::threads::get_self_id() != hpx::threads::invalid_thread_id) { if (hpx::this_thread::get_pool() == &hpx::resource::get_thread_pool("lci-progress-pool-eager")) do_lci_progress = 1; - else if (util::lci_environment::use_two_device && + else if (config_t::use_two_device && hpx::this_thread::get_pool() == &hpx::resource::get_thread_pool( "lci-progress-pool-iovec")) @@ -153,8 +174,7 @@ namespace hpx::parcelset::policies::lci { int idle_loop_count = 0; while (idle_loop_count < max_idle_loop_count) { - while (util::lci_environment::do_progress( - util::lci_environment::get_device_eager())) + while (util::lci_environment::do_progress(device_eager)) { has_work = true; idle_loop_count = 0; @@ -164,12 +184,10 @@ namespace hpx::parcelset::policies::lci { } else if (do_lci_progress == 2) { - // magic number int idle_loop_count = 0; while (idle_loop_count < max_idle_loop_count) { - while (util::lci_environment::do_progress( - util::lci_environment::get_device_iovec())) + while (util::lci_environment::do_progress(device_iovec)) { has_work = true; idle_loop_count = 0; @@ -190,35 +208,54 @@ namespace hpx::parcelset::policies::lci { if (stopped_) return false; - bool has_work; + bool has_work = false; if (mode & parcelport_background_mode_send) { - has_work = sender_.background_work(num_thread); - // try to send pending messages - has_work = backlog_queue::background_work(num_thread) || has_work; + has_work = sender_p->background_work(num_thread); + if (config_t::progress_type == config_t::progress_type_t::worker) + do_progress(); + if (config_t::enable_lci_backlog_queue) + // try to send pending messages + has_work = backlog_queue::background_work( + send_completion_manager.get(), num_thread) || + has_work; } if (mode & parcelport_background_mode_receive) { - has_work = receiver_.background_work() || has_work; + has_work = receiver_p->background_work() || has_work; + if (config_t::progress_type == config_t::progress_type_t::worker) + do_progress(); } return has_work; } bool parcelport::can_send_immediate() { - return util::lci_environment::enable_send_immediate; + return config_t::enable_send_immediate; + } + + bool parcelport::send_immediate(parcelset::parcelport* pp, + parcelset::locality const& dest, sender_base::parcel_buffer_type buffer, + sender_base::callback_fn_type&& callbackFn) + { + return sender_p->send_immediate(pp, dest, + HPX_FORWARD(sender_base::parcel_buffer_type, buffer), + HPX_FORWARD(sender_base::callback_fn_type, callbackFn)); } void parcelport::io_service_work() { std::size_t k = 0; // We only execute work on the IO service while HPX is starting - // TODO: stop io service work when worker threads start to execute the - // background function - while (hpx::is_starting()) + // and stop io service work when worker threads start to execute the + // background function + while (hpx::is_starting() && !is_initialized) { - bool has_work = sender_.background_work(0); - has_work = receiver_.background_work() || has_work; + bool has_work = sender_p->background_work(0); + has_work = receiver_p->background_work() || has_work; + if (config_t::progress_type == config_t::progress_type_t::worker) + while (do_progress()) + continue; if (has_work) { k = 0; @@ -232,7 +269,168 @@ namespace hpx::parcelset::policies::lci { } } } - bool parcelport::is_sending_early_parcel = false; + + std::atomic parcelport::prg_thread_flag = false; + void parcelport::progress_thread_fn(LCI_device_t device) + { + while (prg_thread_flag) + { + util::lci_environment::do_progress(device); + } + } + + void set_affinity(pthread_t pthread_handler, size_t target) + { + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(target, &cpuset); + int rv = + pthread_setaffinity_np(pthread_handler, sizeof(cpuset), &cpuset); + if (rv != 0) + { + fprintf(stderr, "ERROR %d thread affinity didn't work.\n", rv); + exit(1); + } + } + + void parcelport::setup(util::runtime_configuration const& rtcfg) + { + HPX_UNUSED(rtcfg); + // Create device + device_eager = LCI_UR_DEVICE; + if (config_t::use_two_device) + LCI_device_init(&device_iovec); + else + device_iovec = LCI_UR_DEVICE; + + // Create completion objects + if (config_t::protocol == config_t::protocol_t::sendrecv && + config_t::completion_type == LCI_COMPLETION_SYNC) + { + recv_new_completion_manager = + std::make_shared(); + } + else + { + recv_new_completion_manager = + std::make_shared(); + } + switch (config_t::completion_type) + { + case LCI_COMPLETION_QUEUE: + send_completion_manager = + std::make_shared(); + recv_followup_completion_manager = + std::make_shared(); + break; + case LCI_COMPLETION_SYNC: + send_completion_manager = + std::make_shared(); + recv_followup_completion_manager = + std::make_shared(); + break; + default: + throw std::runtime_error("Unknown completion type!"); + } + + // Create endpoints + LCI_plist_t plist_; + LCI_plist_create(&plist_); + LCI_plist_set_comp_type( + plist_, LCI_PORT_COMMAND, config_t::completion_type); + LCI_plist_set_comp_type( + plist_, LCI_PORT_MESSAGE, config_t::completion_type); + LCI_endpoint_init(&endpoint_followup, device_eager, plist_); + LCI_plist_set_default_comp( + plist_, recv_new_completion_manager->get_completion_object()); + if (config_t::protocol == config_t::protocol_t::sendrecv && + config_t::completion_type == LCI_COMPLETION_SYNC) + LCI_plist_set_comp_type( + plist_, LCI_PORT_MESSAGE, LCI_COMPLETION_SYNC); + else + { + LCI_plist_set_comp_type( + plist_, LCI_PORT_MESSAGE, LCI_COMPLETION_QUEUE); + } + if (config_t::protocol == config_t::protocol_t::sendrecv) + LCI_plist_set_match_type(plist_, LCI_MATCH_TAG); + LCI_endpoint_init(&endpoint_new_eager, device_eager, plist_); + LCI_endpoint_init(&endpoint_new_iovec, device_iovec, plist_); + LCI_plist_free(&plist_); + + // Create progress threads + HPX_ASSERT(prg_thread_flag == false); + HPX_ASSERT(prg_thread_eager_p == nullptr); + HPX_ASSERT(prg_thread_iovec_p == nullptr); + prg_thread_flag = true; + prg_thread_eager_p = + std::make_unique(progress_thread_fn, device_eager); + if (config_t::use_two_device) + prg_thread_iovec_p = + std::make_unique(progress_thread_fn, device_iovec); + if (config_t::progress_thread_core >= 0) + { + set_affinity(prg_thread_eager_p->native_handle(), + config_t::progress_thread_core); + if (config_t::use_two_device) + set_affinity(prg_thread_iovec_p->native_handle(), + config_t::progress_thread_core + 1); + } + + // Create the sender and receiver + switch (config_t::protocol) + { + case config_t::protocol_t::putva: + sender_p = std::make_shared(this); + receiver_p = std::make_shared(this); + break; + case config_t::protocol_t::sendrecv: + case config_t::protocol_t::putsendrecv: + sender_p = std::make_shared(this); + receiver_p = std::make_shared(this); + break; + default: + throw std::runtime_error("Unknown Protocol!"); + } + } + + void parcelport::cleanup() + { + join_prg_thread_if_running(); + // free ep, rcq + LCI_endpoint_free(&endpoint_new_iovec); + LCI_endpoint_free(&endpoint_followup); + LCI_endpoint_free(&endpoint_new_eager); + if (config_t::use_two_device) + LCI_device_free(&device_iovec); + } + + void parcelport::join_prg_thread_if_running() + { + if (prg_thread_eager_p || prg_thread_iovec_p) + { + prg_thread_flag = false; + if (prg_thread_eager_p) + { + prg_thread_eager_p->join(); + prg_thread_eager_p.reset(); + } + if (prg_thread_iovec_p) + { + prg_thread_iovec_p->join(); + prg_thread_iovec_p.reset(); + } + } + } + + bool parcelport::do_progress() + { + bool ret = false; + ret = util::lci_environment::do_progress(device_eager) || ret; + if (config_t::use_two_device) + ret = util::lci_environment::do_progress(device_iovec) || ret; + return ret; + } } // namespace hpx::parcelset::policies::lci HPX_REGISTER_PARCELPORT(hpx::parcelset::policies::lci::parcelport, lci) diff --git a/libs/full/parcelport_lci/src/sender_connection.cpp b/libs/full/parcelport_lci/src/putva/sender_connection_putva.cpp similarity index 68% rename from libs/full/parcelport_lci/src/sender_connection.cpp rename to libs/full/parcelport_lci/src/putva/sender_connection_putva.cpp index 037608ca24b1..d1ad8d60ebcb 100644 --- a/libs/full/parcelport_lci/src/sender_connection.cpp +++ b/libs/full/parcelport_lci/src/putva/sender_connection_putva.cpp @@ -13,49 +13,19 @@ #include #include -#include #include #include #include -#include -#include -#include +#include +#include "hpx/parcelport_lci/putva/sender_connection_putva.hpp" #include #include #include namespace hpx::parcelset::policies::lci { - void sender_connection::async_write( - sender_connection::handler_type&& handler, - sender_connection::postprocess_handler_type&& parcel_postprocess) - { - load(HPX_FORWARD(handler_type, handler), - HPX_FORWARD(postprocess_handler_type, parcel_postprocess)); - if (!util::lci_environment::enable_lci_backlog_queue || - HPX_UNLIKELY(parcelport::is_sending_early_parcel)) - { - while (!send()) - continue; - return; - } - else - { - if (!backlog_queue::empty(dst_rank)) - { - backlog_queue::push(shared_from_this()); - return; - } - bool isSent = send(); - if (!isSent) - { - backlog_queue::push(shared_from_this()); - return; - } - } - } - bool sender_connection::can_be_eager_message(size_t eager_threshold) + bool sender_connection_putva::can_be_eager_message(size_t eager_threshold) { int num_zero_copy_chunks = static_cast(buffer_.num_chunks_.first); if (num_zero_copy_chunks > 0) @@ -71,8 +41,9 @@ namespace hpx::parcelset::policies::lci { return false; } - void sender_connection::load(sender_connection::handler_type&& handler, - sender_connection::postprocess_handler_type&& parcel_postprocess) + void sender_connection_putva::load( + sender_connection_putva::handler_type&& handler, + sender_connection_putva::postprocess_handler_type&& parcel_postprocess) { #if defined(HPX_HAVE_PARCELPORT_COUNTERS) data_point_ = buffer_.data_point_; @@ -92,14 +63,12 @@ namespace hpx::parcelset::policies::lci { int num_zero_copy_chunks = static_cast(buffer_.num_chunks_.first); if (is_eager) { - while (LCI_mbuffer_alloc(util::lci_environment::get_device_eager(), - &mbuffer) != LCI_OK) + while (LCI_mbuffer_alloc(pp_->device_eager, &mbuffer) != LCI_OK) continue; HPX_ASSERT(mbuffer.length == (size_t) LCI_MEDIUM_SIZE); header_ = header(buffer_, (char*) mbuffer.address, mbuffer.length); mbuffer.length = header_.size(); - if (util::lci_environment::enable_send_immediate) - done(); + cleanup(); } else { @@ -164,77 +133,118 @@ namespace hpx::parcelset::policies::lci { } } HPX_ASSERT(long_msg_num == i); - sharedPtr_p = - new std::shared_ptr(shared_from_this()); + sharedPtr_p = new std::shared_ptr( + std::dynamic_pointer_cast( + shared_from_this())); } + profile_start_hook(header_); + state.store(connection_state::initialized, std::memory_order_release); } - bool sender_connection::isEager() + bool sender_connection_putva::isEager() { return is_eager; } - bool sender_connection::send() + sender_connection_putva::return_t sender_connection_putva::send_nb() + { + switch (state.load(std::memory_order_acquire)) + { + case connection_state::initialized: + return send_msg(); + + case connection_state::sent: + return {return_status_t::done, nullptr}; + + case connection_state::locked: + return {return_status_t::retry, nullptr}; + + default: + throw std::runtime_error("Unexpected send state!"); + } + } + + sender_connection_putva::return_t sender_connection_putva::send_msg() { + const auto current_state = connection_state::initialized; + const auto next_state = connection_state::sent; + HPX_ASSERT(state.load(std::memory_order_acquire) == current_state); + int ret; if (is_eager) { - ret = LCI_putmna(util::lci_environment::get_endpoint_eager(), - mbuffer, dst_rank, 0, LCI_DEFAULT_COMP_REMOTE); + ret = LCI_putmna(pp_->endpoint_new_eager, mbuffer, dst_rank, 0, + LCI_DEFAULT_COMP_REMOTE); if (ret == LCI_OK) { -#if defined(HPX_HAVE_PARCELPORT_COUNTERS) - data_point_.time_ = hpx::chrono::high_resolution_clock::now() - - data_point_.time_; - pp_->add_sent_data(data_point_); -#endif - if (!util::lci_environment::enable_send_immediate) - done(); + state.store(next_state, std::memory_order_release); + return {return_status_t::done, nullptr}; } } else { void* buffer_to_free = iovec.piggy_back.address; + LCI_comp_t completion = + pp_->send_completion_manager->alloc_completion(); // In order to keep the send_connection object from being // deallocated. We have to allocate a shared_ptr in the heap // and pass a pointer to shared_ptr to LCI. // We will get this pointer back via the send completion queue // after this send completes. - ret = LCI_putva(util::lci_environment::get_endpoint_iovec(), iovec, - util::lci_environment::get_scq(), dst_rank, 0, - LCI_DEFAULT_COMP_REMOTE, sharedPtr_p); + state.store(connection_state::locked, std::memory_order_relaxed); + ret = LCI_putva(pp_->endpoint_new_iovec, iovec, completion, + dst_rank, 0, LCI_DEFAULT_COMP_REMOTE, sharedPtr_p); // After this point, if ret == OK, this object can be shared by // two threads (the sending thread and the thread polling the // completion queue). Care must be taken to avoid data race. if (ret == LCI_OK) { free(buffer_to_free); + state.store(next_state, std::memory_order_release); + return {return_status_t::wait, completion}; + } + else + { + state.store(current_state, std::memory_order_release); } } - return ret == LCI_OK; + return {return_status_t::retry, nullptr}; } - void sender_connection::done() + void sender_connection_putva::cleanup() { if (!is_eager) { HPX_ASSERT(iovec.count > 0); free(iovec.lbuffers); -#if defined(HPX_HAVE_PARCELPORT_COUNTERS) - data_point_.time_ = - hpx::chrono::high_resolution_clock::now() - data_point_.time_; - pp_->add_sent_data(data_point_); -#endif } error_code ec; handler_(ec); handler_.reset(); buffer_.clear(); + } + void sender_connection_putva::done() + { + profile_end_hook(); + if (!is_eager) + { + cleanup(); + } +#if defined(HPX_HAVE_PARCELPORT_COUNTERS) + data_point_.time_ = + hpx::chrono::high_resolution_clock::now() - data_point_.time_; + pp_->add_sent_data(data_point_); +#endif if (postprocess_handler_) { + // Return this connection to the connection cache. + // After postprocess_handler is invoked, this connection can be + // obtained by another thread. + // so make sure to call this at the very end. hpx::move_only_function)> + parcelset::locality const&, + std::shared_ptr)> postprocess_handler; std::swap(postprocess_handler, postprocess_handler_); error_code ec2; @@ -242,9 +252,12 @@ namespace hpx::parcelset::policies::lci { } } - bool sender_connection::tryMerge( - const std::shared_ptr& other) + bool sender_connection_putva::tryMerge( + const std::shared_ptr& other_base) { + std::shared_ptr other = + std::dynamic_pointer_cast(other_base); + HPX_ASSERT(other); if (!isEager() || !other->isEager()) { // we can only merge eager messages diff --git a/libs/full/parcelport_lci/src/putva/sender_putva.cpp b/libs/full/parcelport_lci/src/putva/sender_putva.cpp new file mode 100644 index 000000000000..59718d480f3a --- /dev/null +++ b/libs/full/parcelport_lci/src/putva/sender_putva.cpp @@ -0,0 +1,30 @@ +// Copyright (c) 2007-2013 Hartmut Kaiser +// Copyright (c) 2014-2015 Thomas Heller +// Copyright (c) 2020 Google +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include + +#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) + +#include +#include +#include +#include +#include "hpx/parcelport_lci/putva/sender_connection_putva.hpp" + +#include +#include + +namespace hpx::parcelset::policies::lci { + sender_putva::connection_ptr sender_putva::create_connection( + int dest, parcelset::parcelport* pp) + { + return std::make_shared(dest, pp); + } +} // namespace hpx::parcelset::policies::lci + +#endif diff --git a/libs/full/parcelport_lci/src/sender.cpp b/libs/full/parcelport_lci/src/sender_base.cpp similarity index 51% rename from libs/full/parcelport_lci/src/sender.cpp rename to libs/full/parcelport_lci/src/sender_base.cpp index 1fea5dba7a17..eb490eb3bd4f 100644 --- a/libs/full/parcelport_lci/src/sender.cpp +++ b/libs/full/parcelport_lci/src/sender_base.cpp @@ -10,52 +10,61 @@ #if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) +#include + #include #include +#include +#include #include -#include -#include -#include +#include +#include +#include +#include #include +#include +#include namespace hpx::parcelset::policies::lci { - sender::connection_ptr sender::create_connection( - int dest, parcelset::parcelport* pp) - { - return std::make_shared(dest, pp); - } - - bool sender::background_work(size_t /* num_thread */) noexcept + bool sender_base::background_work(size_t /* num_thread */) noexcept { bool did_some_work = false; // try to accept a new connection - LCI_request_t request; - request.flag = LCI_ERR_RETRY; - LCI_queue_pop(util::lci_environment::get_scq(), &request); + LCI_request_t request = pp_->send_completion_manager->poll(); + // LCI_queue_pop(util::lci_environment::get_scq(), &request); if (request.flag == LCI_OK) { - auto* sharedPtr_p = (connection_ptr*) request.user_context; - (*sharedPtr_p)->done(); - delete sharedPtr_p; did_some_work = true; + auto* sharedPtr_p = (connection_ptr*) request.user_context; + sender_connection_base::return_t ret = (*sharedPtr_p)->send(); + if (ret.status == sender_connection_base::return_status_t::done) + { + (*sharedPtr_p)->done(); + delete sharedPtr_p; + } + else if (ret.status == + sender_connection_base::return_status_t::wait) + { + pp_->send_completion_manager->enqueue_completion( + ret.completion); + } } return did_some_work; } - bool sender::send(parcelset::parcelport* pp, + bool sender_base::send_immediate(parcelset::parcelport* pp, parcelset::locality const& dest, parcel_buffer_type buffer, callback_fn_type&& callbackFn) { int dest_rank = dest.get().rank(); - auto connection = std::make_shared(dest_rank, pp); + auto connection = create_connection(dest_rank, pp); connection->buffer_ = HPX_MOVE(buffer); connection->async_write(HPX_MOVE(callbackFn), nullptr); return true; } - } // namespace hpx::parcelset::policies::lci #endif diff --git a/libs/full/parcelport_lci/src/sender_connection_base.cpp b/libs/full/parcelport_lci/src/sender_connection_base.cpp new file mode 100644 index 000000000000..3f6cdd837cd3 --- /dev/null +++ b/libs/full/parcelport_lci/src/sender_connection_base.cpp @@ -0,0 +1,121 @@ +// Copyright (c) 2007-2013 Hartmut Kaiser +// Copyright (c) 2014-2015 Thomas Heller +// Copyright (c) 2020 Google +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include + +#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace hpx::parcelset::policies::lci { + void sender_connection_base::async_write( + sender_connection_base::handler_type&& handler, + sender_connection_base::postprocess_handler_type&& parcel_postprocess) + { + load(HPX_FORWARD(handler_type, handler), + HPX_FORWARD(postprocess_handler_type, parcel_postprocess)); + return_t ret = send(); + if (ret.status == return_status_t::done) + { + done(); + } + else if (ret.status == return_status_t::wait) + { + pp_->send_completion_manager->enqueue_completion(ret.completion); + } + } + + sender_connection_base::return_t sender_connection_base::send() + { + if (!config_t::enable_lci_backlog_queue || + HPX_UNLIKELY(pp_->is_sending_early_parcel)) + { + // If we are sending early parcels, we should not expect the + // thread make progress on the backlog queue + return_t ret; + do + { + ret = send_nb(); + if (ret.status == return_status_t::retry && + config_t::progress_type == + config_t::progress_type_t::worker) + while (pp_->do_progress()) + continue; + } while (ret.status == return_status_t::retry); + return ret; + } + else + { + if (!backlog_queue::empty(dst_rank)) + { + backlog_queue::push(shared_from_this()); + return {return_status_t::retry, nullptr}; + } + return_t ret = send_nb(); + if (ret.status == return_status_t::retry) + { + backlog_queue::push(shared_from_this()); + return ret; + } + else + { + return ret; + } + } + } + + void sender_connection_base::profile_start_hook(const header& header_) + { + if (util::lci_environment::log_level < + util::lci_environment::log_level_t::profile) + return; + char buf[1024]; + size_t consumed = 0; + consumed += snprintf(buf + consumed, sizeof(buf) - consumed, + "%d:%lf:send_connection(%p) start:%d:%d:%d:[", LCI_RANK, + hpx::chrono::high_resolution_clock::now() / 1e9, (void*) this, + header_.numbytes_nonzero_copy(), header_.numbytes_tchunk(), + header_.num_zero_copy_chunks()); + HPX_ASSERT(sizeof(buf) > consumed); + for (int i = 0; i < header_.num_zero_copy_chunks(); ++i) + { + std::string format = "%lu,"; + if (i == header_.num_zero_copy_chunks() - 1) + format = "%lu"; + consumed += snprintf(buf + consumed, sizeof(buf) - consumed, + format.c_str(), buffer_.transmission_chunks_[i].second); + HPX_ASSERT(sizeof(buf) > consumed); + } + consumed += snprintf(buf + consumed, sizeof(buf) - consumed, "]\n"); + HPX_ASSERT(sizeof(buf) > consumed); + util::lci_environment::log( + util::lci_environment::log_level_t::profile, "%s", buf); + } + + void sender_connection_base::profile_end_hook() + { + util::lci_environment::log(util::lci_environment::log_level_t::profile, + "%d:%lf:send_connection(%p) end\n", LCI_RANK, + hpx::chrono::high_resolution_clock::now() / 1e9, (void*) this); + } +} // namespace hpx::parcelset::policies::lci + +#endif diff --git a/libs/full/parcelport_lci/src/sendrecv/receiver_connection_sendrecv.cpp b/libs/full/parcelport_lci/src/sendrecv/receiver_connection_sendrecv.cpp new file mode 100644 index 000000000000..72fd3b5f2345 --- /dev/null +++ b/libs/full/parcelport_lci/src/sendrecv/receiver_connection_sendrecv.cpp @@ -0,0 +1,364 @@ +// Copyright (c) 2007-2013 Hartmut Kaiser +// Copyright (c) 2014-2015 Thomas Heller +// Copyright (c) 2020 Google +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include + +#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) + +#include + +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace hpx::parcelset::policies::lci { + receiver_connection_sendrecv::receiver_connection_sendrecv( + int dst, parcelset::parcelport* pp) + : dst_rank(dst) + , pp_((lci::parcelport*) pp) + { + } + + void receiver_connection_sendrecv::load(char* header_buffer) + { + header header_ = header(header_buffer); + header_.assert_valid(); +#if defined(HPX_HAVE_PARCELPORT_COUNTERS) + buffer.data_point_.bytes_ = + static_cast(header_.numbytes()); + timer_.restart(); +#endif + tag = header_.get_tag(); + // decode data + buffer.data_.allocate(header_.numbytes_nonzero_copy()); + char* piggy_back_data = header_.piggy_back_data(); + if (piggy_back_data) + { + need_recv_data = false; + memcpy(buffer.data_.ptr, piggy_back_data, buffer.data_.length); + } + else + { + need_recv_data = true; + } + need_recv_tchunks = false; + if (header_.num_zero_copy_chunks() != 0) + { + // decode transmission chunk + int num_zero_copy_chunks = header_.num_zero_copy_chunks(); + int num_non_zero_copy_chunks = header_.num_non_zero_copy_chunks(); + buffer.num_chunks_.first = num_zero_copy_chunks; + buffer.num_chunks_.second = num_non_zero_copy_chunks; + auto& tchunks = buffer.transmission_chunks_; + tchunks.resize(num_zero_copy_chunks + num_non_zero_copy_chunks); + int tchunks_length = static_cast(tchunks.size() * + sizeof(receiver_base::buffer_type::transmission_chunk_type)); + char* piggy_back_tchunk = header_.piggy_back_tchunk(); + if (piggy_back_tchunk) + { + memcpy( + (void*) tchunks.data(), piggy_back_tchunk, tchunks_length); + } + else + { + need_recv_tchunks = true; + } + // zero-copy chunks + buffer.chunks_.resize(num_zero_copy_chunks); + if (!pp_->allow_zero_copy_receive_optimizations()) + { + chunk_buffers_.resize(num_zero_copy_chunks); + } + } + // Calculate how many recvs we need to make + int num_recv = + need_recv_data + need_recv_tchunks + header_.num_zero_copy_chunks(); + sharedPtr_p = nullptr; + if (num_recv > 0) + { + sharedPtr_p = new std::shared_ptr( + shared_from_this()); + } + // set state + state.store(connection_state::initialized, std::memory_order_release); + recv_chunks_idx = 0; + recv_zero_copy_chunks_idx = 0; + original_tag = tag; + util::lci_environment::log(util::lci_environment::log_level_t::debug, + "recv connection (%d, %d, %d) start!\n", dst_rank, LCI_RANK, tag); + } + + receiver_connection_sendrecv::return_t + receiver_connection_sendrecv::receive() + { + while ( + state.load(std::memory_order_acquire) == connection_state::locked) + { + continue; + } + + switch (state.load(std::memory_order_acquire)) + { + case connection_state::initialized: + return receive_transmission_chunks(); + + case connection_state::rcvd_transmission_chunks: + return receive_data(); + + case connection_state::rcvd_data: + return receive_chunks(); + + case connection_state::rcvd_chunks: + return {true, nullptr}; + + default: + throw std::runtime_error("Unexpected recv state!"); + } + } + + LCI_comp_t receiver_connection_sendrecv::unified_recv( + void* address, int length) + { + LCI_comp_t completion = + pp_->recv_followup_completion_manager->alloc_completion(); + if (length <= LCI_MEDIUM_SIZE) + { + LCI_mbuffer_t mbuffer; + mbuffer.address = address; + mbuffer.length = length; + LCI_error_t ret = LCI_recvm(pp_->endpoint_followup, mbuffer, + dst_rank, tag, completion, sharedPtr_p); + HPX_ASSERT(ret == LCI_OK); + HPX_UNUSED(ret); + util::lci_environment::log( + util::lci_environment::log_level_t::debug, + "recvm (%d, %d, %d) tag %d size %d\n", dst_rank, LCI_RANK, + original_tag, tag, length); + tag = (tag + 1) % LCI_MAX_TAG; + } + else + { + LCI_lbuffer_t lbuffer; + lbuffer.segment = LCI_SEGMENT_ALL; + lbuffer.address = address; + lbuffer.length = length; + LCI_error_t ret = LCI_recvl(pp_->endpoint_followup, lbuffer, + dst_rank, tag, completion, sharedPtr_p); + HPX_ASSERT(ret == LCI_OK); + HPX_UNUSED(ret); + util::lci_environment::log( + util::lci_environment::log_level_t::debug, + "recvl (%d, %d, %d) tag %d size %d\n", dst_rank, LCI_RANK, + original_tag, tag, length); + tag = (tag + 1) % LCI_MAX_TAG; + } + return completion; + } + + receiver_connection_sendrecv::return_t + receiver_connection_sendrecv::receive_transmission_chunks() + { + const auto current_state = connection_state::initialized; + const auto next_state = connection_state::rcvd_transmission_chunks; + HPX_ASSERT(state.load(std::memory_order_acquire) == current_state); + HPX_UNUSED(current_state); + if (need_recv_tchunks) + { + auto& tchunks = buffer.transmission_chunks_; + int tchunk_length = static_cast(tchunks.size() * + sizeof(receiver_base::buffer_type::transmission_chunk_type)); + state.store(connection_state::locked, std::memory_order_relaxed); + LCI_comp_t completion = unified_recv(tchunks.data(), tchunk_length); + state.store(next_state, std::memory_order_release); + return {false, completion}; + } + else + { + state.store(next_state, std::memory_order_release); + return receive_data(); + } + } + + receiver_connection_sendrecv::return_t + receiver_connection_sendrecv::receive_data() + { + const auto current_state = connection_state::rcvd_transmission_chunks; + const auto next_state = connection_state::rcvd_data; + HPX_ASSERT(state.load(std::memory_order_acquire) == current_state); + HPX_UNUSED(current_state); + if (need_recv_data) + { + state.store(connection_state::locked, std::memory_order_relaxed); + LCI_comp_t completion = unified_recv( + buffer.data_.data(), static_cast(buffer.data_.size())); + state.store(next_state, std::memory_order_release); + return {false, completion}; + } + else + { + state.store(next_state, std::memory_order_release); + return receive_chunks(); + } + } + + receiver_connection_sendrecv::return_t + receiver_connection_sendrecv::receive_chunks() + { + if (pp_->allow_zero_copy_receive_optimizations()) + { + return receive_chunks_zc(); + } + else + { + return receive_chunks_nzc(); + } + } + + void receiver_connection_sendrecv::receive_chunks_zc_preprocess() + { + HPX_ASSERT(recv_chunks_idx == 0); + + auto const num_zero_copy_chunks = static_cast( + static_cast(buffer.num_chunks_.first)); + if (num_zero_copy_chunks != 0) + { + HPX_ASSERT(buffer.chunks_.size() == num_zero_copy_chunks); + + // De-serialize the parcels such that all data but the + // zero-copy chunks are in place. This de-serialization also + // allocates all zero-chunk buffers and stores those in the + // chunks array for the subsequent networking to place the + // received data directly. + for (std::size_t i = 0; i != num_zero_copy_chunks; ++i) + { + auto const chunk_size = static_cast( + buffer.transmission_chunks_[i].second); + buffer.chunks_[i] = + serialization::create_pointer_chunk(nullptr, chunk_size); + } + + parcels_ = decode_parcels_zero_copy(*pp_, buffer); + + // note that at this point, buffer_.chunks_ will have + // entries for all chunks, including the non-zero-copy ones + } + + // we should have received at least one parcel if there are + // zero-copy chunks to be received + HPX_ASSERT(parcels_.empty() || !buffer.chunks_.empty()); + } + + receiver_connection_sendrecv::return_t + receiver_connection_sendrecv::receive_chunks_zc() + { + const auto current_state = connection_state::rcvd_data; + const auto next_state = connection_state::rcvd_chunks; + HPX_ASSERT(state.load(std::memory_order_acquire) == current_state); + HPX_UNUSED(current_state); + // handle zero-copy receive, this should be done on the first entry + // to receive_chunks only + if (parcels_.empty()) + { + receive_chunks_zc_preprocess(); + } + + while (recv_chunks_idx != buffer.chunks_.size()) + { + auto& chunk = buffer.chunks_[recv_chunks_idx++]; + if (chunk.type_ == serialization::chunk_type::chunk_type_index) + { + continue; // skip non-zero-copy chunks + } + + // the zero-copy chunks come first in the transmission_chunks_ + // array + std::size_t chunk_size = + buffer.transmission_chunks_[recv_zero_copy_chunks_idx++].second; + + // the parcel de-serialization above should have allocated the + // correct amount of memory + HPX_ASSERT_MSG( + chunk.data() != nullptr && chunk.size() == chunk_size, + "zero-copy chunk buffers should have been initialized " + "during de-serialization"); + HPX_UNUSED(chunk_size); + + state.store(connection_state::locked, std::memory_order_relaxed); + LCI_comp_t completion = + unified_recv(chunk.data(), static_cast(chunk.size())); + state.store(current_state, std::memory_order_release); + return {false, completion}; + } + HPX_ASSERT_MSG(recv_zero_copy_chunks_idx == buffer.num_chunks_.first, + "observed: {}, expected {}", recv_zero_copy_chunks_idx, + buffer.num_chunks_.first); + state.store(next_state, std::memory_order_release); + return {true, nullptr}; + } + + receiver_connection_sendrecv::return_t + receiver_connection_sendrecv::receive_chunks_nzc() + { + const auto current_state = connection_state::rcvd_data; + const auto next_state = connection_state::rcvd_chunks; + HPX_ASSERT(state.load(std::memory_order_acquire) == current_state); + HPX_UNUSED(current_state); + while (recv_chunks_idx < buffer.chunks_.size()) + { + std::size_t idx = recv_chunks_idx++; + std::size_t chunk_size = buffer.transmission_chunks_[idx].second; + auto& chunk = chunk_buffers_[idx]; + chunk.resize(chunk_size); + buffer.chunks_[idx] = + serialization::create_pointer_chunk(chunk.data(), chunk.size()); + state.store(connection_state::locked, std::memory_order_relaxed); + LCI_comp_t completion = + unified_recv(chunk.data(), static_cast(chunk.size())); + state.store(current_state, std::memory_order_release); + return {false, completion}; + } + state.store(next_state, std::memory_order_release); + return {true, nullptr}; + } + + void receiver_connection_sendrecv::done() + { + util::lci_environment::log(util::lci_environment::log_level_t::debug, + "recv connection (%d, %d, %d, %d) done!\n", dst_rank, LCI_RANK, + original_tag, tag - original_tag + 1); +#if defined(HPX_HAVE_PARCELPORT_COUNTERS) + buffer.data_point_.time_ = timer_.elapsed_nanoseconds(); +#endif + + if (parcels_.empty()) + { + // decode and handle received data + HPX_ASSERT(buffer.num_chunks_.first == 0 || + !pp_->allow_zero_copy_receive_optimizations()); + handle_received_parcels(decode_parcels(*pp_, HPX_MOVE(buffer))); + chunk_buffers_.clear(); + } + else + { + // handle the received zero-copy parcels. + HPX_ASSERT(buffer.num_chunks_.first != 0 && + pp_->allow_zero_copy_receive_optimizations()); + handle_received_parcels(HPX_MOVE(parcels_)); + } + buffer.data_.free(); + parcels_.clear(); + } +} // namespace hpx::parcelset::policies::lci + +#endif diff --git a/libs/full/parcelport_lci/src/sendrecv/receiver_sendrecv.cpp b/libs/full/parcelport_lci/src/sendrecv/receiver_sendrecv.cpp new file mode 100644 index 000000000000..0e002ee7c888 --- /dev/null +++ b/libs/full/parcelport_lci/src/sendrecv/receiver_sendrecv.cpp @@ -0,0 +1,116 @@ +// Copyright (c) 2007-2013 Hartmut Kaiser +// Copyright (c) 2014-2015 Thomas Heller +// Copyright (c) 2020 Google +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include + +#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) + +#include +#include +#include +#include +#include + +#include +#include + +namespace hpx::parcelset::policies::lci { + receiver_sendrecv::connection_ptr receiver_sendrecv::create_connection( + int dest, parcelset::parcelport* pp) + { + return std::make_shared(dest, pp); + } + + bool receiver_sendrecv::background_work() noexcept + { + bool did_something = false; + // We first try to accept a new connection + did_something = accept_new() || did_something; + // We will then try to make progress on existing connections + did_something = followup() || did_something; + return did_something; + } + + bool receiver_sendrecv::accept_new() noexcept + { + request_wrapper_t request; + request.request = pp_->recv_new_completion_manager->poll(); + + if (request.request.flag == LCI_OK) + { + if (config_t::protocol == config_t::protocol_t::sendrecv) + { + LCI_comp_t completion = + pp_->recv_new_completion_manager->alloc_completion(); + LCI_recvmn(pp_->endpoint_new_eager, LCI_RANK_ANY, 0, completion, + nullptr); + pp_->recv_new_completion_manager->enqueue_completion( + completion); + } + HPX_ASSERT(request.request.flag == LCI_OK); + util::lci_environment::log( + util::lci_environment::log_level_t::debug, + "accept_new (%d, %d, %d) length %lu\n", request.request.rank, + LCI_RANK, request.request.tag, + request.request.data.mbuffer.length); + connection_ptr connection = + create_connection(request.request.rank, pp_); + connection->load((char*) request.request.data.mbuffer.address); + receiver_connection_sendrecv::return_t ret = connection->receive(); + if (ret.isDone) + { + connection->done(); + } + else + { + pp_->recv_followup_completion_manager->enqueue_completion( + ret.completion); + } + return true; + } + return false; + } + + bool receiver_sendrecv::followup() noexcept + { + // We don't use a request_wrapper here because all the receive buffers + // should be managed by the connections + LCI_request_t request = pp_->recv_followup_completion_manager->poll(); + + if (request.flag == LCI_OK) + { + HPX_ASSERT(request.user_context); + auto* sharedPtr_p = (connection_ptr*) request.user_context; + size_t length; + if (request.type == LCI_MEDIUM) + length = request.data.mbuffer.length; + else + length = request.data.lbuffer.length; + util::lci_environment::log( + util::lci_environment::log_level_t::debug, + "followup (%d, %d, %d) length %lu\n", request.rank, LCI_RANK, + request.tag, length); + receiver_connection_sendrecv::return_t ret = + (*sharedPtr_p)->receive(); + if (ret.isDone) + { + (*sharedPtr_p)->done(); + delete sharedPtr_p; + } + else + { + pp_->recv_followup_completion_manager->enqueue_completion( + ret.completion); + } + return true; + } + return false; + } +} // namespace hpx::parcelset::policies::lci + +#endif diff --git a/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp b/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp new file mode 100644 index 000000000000..5793bc6f7788 --- /dev/null +++ b/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp @@ -0,0 +1,373 @@ +// Copyright (c) 2007-2013 Hartmut Kaiser +// Copyright (c) 2014-2015 Thomas Heller +// Copyright (c) 2020 Google +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include + +#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) + +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include + +namespace hpx::parcelset::policies::lci { + + std::atomic sender_connection_sendrecv::next_tag = 0; + + void sender_connection_sendrecv::load( + sender_connection_sendrecv::handler_type&& handler, + sender_connection_sendrecv::postprocess_handler_type&& + parcel_postprocess) + { +#if defined(HPX_HAVE_PARCELPORT_COUNTERS) + timer_.restart(); +#endif + + HPX_ASSERT(!handler_); + HPX_ASSERT(!postprocess_handler_); + HPX_ASSERT(!buffer_.data_.empty()); + handler_ = HPX_FORWARD(Handler, handler); + postprocess_handler_ = + HPX_FORWARD(ParcelPostprocess, parcel_postprocess); + + // build header + while (LCI_mbuffer_alloc(pp_->device_eager, &header_buffer) != LCI_OK) + continue; + HPX_ASSERT(header_buffer.length == (size_t) LCI_MEDIUM_SIZE); + header_ = header( + buffer_, (char*) header_buffer.address, header_buffer.length); + header_buffer.length = header_.size(); + HPX_ASSERT((header_.num_zero_copy_chunks() == 0) == + buffer_.transmission_chunks_.empty()); + need_send_data = false; + need_send_tchunks = false; + // Calculate how many sends we need to make + int num_send = 0; // header doesn't need to do tag matching + if (header_.piggy_back_data() == nullptr) + { + need_send_data = true; + ++num_send; + } + if (header_.num_zero_copy_chunks() != 0) + { + if (header_.piggy_back_tchunk() == nullptr) + { + need_send_tchunks = true; + ++num_send; + } + num_send += header_.num_zero_copy_chunks(); + } + tag = 0; // If no need to post send, then tag can be ignored. + sharedPtr_p = nullptr; + if (num_send > 0) + { + tag = next_tag.fetch_add(num_send) % LCI_MAX_TAG; + sharedPtr_p = new std::shared_ptr( + std::dynamic_pointer_cast( + shared_from_this())); + } + if ((int) tag <= LCI_MAX_TAG && (int) tag + num_send > LCI_MAX_TAG) + util::lci_environment::log( + util::lci_environment::log_level_t::debug, + "Rank %d Wrap around!\n", LCI_RANK); + header_.set_tag(tag); + send_chunks_idx = 0; + // set state + profile_start_hook(header_); + state.store(connection_state::initialized, std::memory_order_release); + original_tag = tag; + util::lci_environment::log(util::lci_environment::log_level_t::debug, + "send connection (%d, %d, %d, %d) tchunks " + "%d data %d chunks %d start!\n", + LCI_RANK, dst_rank, original_tag, num_send, + header_.piggy_back_tchunk() != nullptr, + header_.piggy_back_data() != nullptr, + header_.num_zero_copy_chunks()); + } + + sender_connection_sendrecv::return_t sender_connection_sendrecv::send_nb() + { + while ( + state.load(std::memory_order_acquire) == connection_state::locked) + { + continue; + } + + switch (state.load(std::memory_order_acquire)) + { + case connection_state::initialized: + return send_header(); + + case connection_state::sent_header: + return send_transmission_chunks(); + + case connection_state::sent_transmission_chunks: + return send_data(); + + case connection_state::sent_data: + return send_chunks(); + + case connection_state::sent_chunks: + return {return_status_t::done, nullptr}; + + default: + throw std::runtime_error("Unexpected send state!"); + } + } + + sender_connection_sendrecv::return_t + sender_connection_sendrecv::send_header() + { + const auto current_state = connection_state::initialized; + const auto next_state = connection_state::sent_header; + HPX_ASSERT(state.load(std::memory_order_acquire) == current_state); + HPX_UNUSED(current_state); + LCI_error_t ret; + if (config_t::protocol == config_t::protocol_t::putsendrecv) + { + ret = LCI_putmna(pp_->endpoint_new_eager, header_buffer, dst_rank, + 0, LCI_DEFAULT_COMP_REMOTE); + } + else + { + HPX_ASSERT(config_t::protocol == config_t::protocol_t::sendrecv); + ret = + LCI_sendmn(pp_->endpoint_new_eager, header_buffer, dst_rank, 0); + } + if (ret == LCI_OK) + { + util::lci_environment::log( + util::lci_environment::log_level_t::debug, + "%s (%d, %d, %d) length %lu\n", + config_t::protocol == config_t::protocol_t::putsendrecv ? + "LCI_putmna" : + "LCI_sendmn", + LCI_RANK, dst_rank, tag, header_buffer.length); + state.store(next_state, std::memory_order_release); + return send_transmission_chunks(); + } + else + { + HPX_ASSERT(ret == LCI_ERR_RETRY); + return {return_status_t::retry, nullptr}; + } + } + + sender_connection_sendrecv::return_t + sender_connection_sendrecv::unified_followup_send(void* address, int length) + { + if (length <= LCI_MEDIUM_SIZE) + { + LCI_mbuffer_t buffer; + buffer.address = address; + buffer.length = length; + LCI_error_t ret = + LCI_sendm(pp_->endpoint_followup, buffer, dst_rank, tag); + if (ret == LCI_OK) + { + util::lci_environment::log( + util::lci_environment::log_level_t::debug, + "sendm (%d, %d, %d) tag %d size %d\n", LCI_RANK, dst_rank, + original_tag, tag, length); + tag = (tag + 1) % LCI_MAX_TAG; + return {return_status_t::done, nullptr}; + } + else + { + HPX_ASSERT(ret == LCI_ERR_RETRY); + return {return_status_t::retry, nullptr}; + } + } + else + { + LCI_lbuffer_t buffer; + buffer.segment = LCI_SEGMENT_ALL; + buffer.address = address; + buffer.length = length; + LCI_comp_t completion = + pp_->send_completion_manager->alloc_completion(); + LCI_error_t ret = LCI_sendl(pp_->endpoint_followup, buffer, + dst_rank, tag, completion, sharedPtr_p); + if (ret == LCI_OK) + { + util::lci_environment::log( + util::lci_environment::log_level_t::debug, + "sendl (%d, %d, %d) tag %d size %d\n", LCI_RANK, dst_rank, + original_tag, tag, length); + tag = (tag + 1) % LCI_MAX_TAG; + return {return_status_t::wait, completion}; + } + else + { + HPX_ASSERT(ret == LCI_ERR_RETRY); + return {return_status_t::retry, nullptr}; + } + } + } + + sender_connection_sendrecv::return_t + sender_connection_sendrecv::send_transmission_chunks() + { + const auto current_state = connection_state::sent_header; + const auto next_state = connection_state::sent_transmission_chunks; + HPX_ASSERT(state.load(std::memory_order_acquire) == current_state); + if (!need_send_tchunks) + { + state.store(next_state, std::memory_order_release); + return send_data(); + } + + std::vector& + tchunks = buffer_.transmission_chunks_; + int tchunks_size = (int) tchunks.size() * + sizeof(parcel_buffer_type::transmission_chunk_type); + state.store(connection_state::locked, std::memory_order_relaxed); + auto ret = unified_followup_send(tchunks.data(), tchunks_size); + if (ret.status == return_status_t::done) + { + state.store(next_state, std::memory_order_release); + return send_data(); + } + else if (ret.status == return_status_t::retry) + { + state.store(current_state, std::memory_order_release); + return ret; + } + else + { + state.store(next_state, std::memory_order_release); + return ret; + } + } + + sender_connection_sendrecv::return_t sender_connection_sendrecv::send_data() + { + const auto current_state = connection_state::sent_transmission_chunks; + const auto next_state = connection_state::sent_data; + HPX_ASSERT(state.load(std::memory_order_acquire) == current_state); + if (!need_send_data) + { + state.store(next_state, std::memory_order_release); + return send_chunks(); + } + state.store(connection_state::locked, std::memory_order_relaxed); + auto ret = + unified_followup_send(buffer_.data_.data(), buffer_.data_.size()); + if (ret.status == return_status_t::done) + { + state.store(next_state, std::memory_order_release); + return send_chunks(); + } + else if (ret.status == return_status_t::retry) + { + state.store(current_state, std::memory_order_release); + return ret; + } + else + { + state.store(next_state, std::memory_order_release); + return ret; + } + } + + sender_connection_sendrecv::return_t + sender_connection_sendrecv::send_chunks() + { + const auto current_state = connection_state::sent_data; + const auto next_state = connection_state::sent_chunks; + HPX_ASSERT(state.load(std::memory_order_acquire) == current_state); + + while (send_chunks_idx < buffer_.chunks_.size()) + { + serialization::serialization_chunk& chunk = + buffer_.chunks_[send_chunks_idx]; + if (chunk.type_ == serialization::chunk_type::chunk_type_pointer) + { + state.store( + connection_state::locked, std::memory_order_relaxed); + auto ret = + unified_followup_send(const_cast(chunk.data_.cpos_), + static_cast(chunk.size_)); + if (ret.status == return_status_t::done) + { + ++send_chunks_idx; + state.store(current_state, std::memory_order_release); + continue; + } + else if (ret.status == return_status_t::retry) + { + state.store(current_state, std::memory_order_release); + return ret; + } + else + { + ++send_chunks_idx; + state.store(current_state, std::memory_order_release); + return ret; + } + } + else + { + ++send_chunks_idx; + } + } + + state.store(next_state, std::memory_order_release); + return {return_status_t::done, nullptr}; + } + + void sender_connection_sendrecv::done() + { + util::lci_environment::log(util::lci_environment::log_level_t::debug, + "send connection (%d, %d, %d, %d) done!\n", LCI_RANK, dst_rank, + original_tag, tag - original_tag + 1); + profile_end_hook(); + error_code ec; + handler_(ec); + handler_.reset(); +#if defined(HPX_HAVE_PARCELPORT_COUNTERS) + data_point_.time_ = timer_.elapsed_nanoseconds(); + pp_->add_sent_data(data_point_); +#endif + buffer_.clear(); + + if (postprocess_handler_) + { + // Return this connection to the connection cache. + // After postprocess_handler is invoked, this connection can be + // obtained by another thread. + // so make sure to call this at the very end. + hpx::move_only_function)> + postprocess_handler; + std::swap(postprocess_handler, postprocess_handler_); + error_code ec2; + postprocess_handler(ec2, there_, shared_from_this()); + } + } + + bool sender_connection_sendrecv::tryMerge( + const std::shared_ptr& other_base) + { + // We cannot merge any message here. + HPX_UNUSED(other_base); + return false; + } +} // namespace hpx::parcelset::policies::lci + +#endif diff --git a/libs/full/parcelport_lci/src/sendrecv/sender_sendrecv.cpp b/libs/full/parcelport_lci/src/sendrecv/sender_sendrecv.cpp new file mode 100644 index 000000000000..1ee5ee25a5a6 --- /dev/null +++ b/libs/full/parcelport_lci/src/sendrecv/sender_sendrecv.cpp @@ -0,0 +1,29 @@ +// Copyright (c) 2007-2013 Hartmut Kaiser +// Copyright (c) 2014-2015 Thomas Heller +// Copyright (c) 2020 Google +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include + +#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) + +#include +#include +#include +#include +#include + +#include + +namespace hpx::parcelset::policies::lci { + sender_sendrecv::connection_ptr sender_sendrecv::create_connection( + int dest, parcelset::parcelport* pp) + { + return std::make_shared(dest, pp); + } +} // namespace hpx::parcelset::policies::lci + +#endif diff --git a/libs/full/parcelset/include/hpx/parcelset/parcelport_impl.hpp b/libs/full/parcelset/include/hpx/parcelset/parcelport_impl.hpp index 85e29ae738ba..4600276d6e7b 100644 --- a/libs/full/parcelset/include/hpx/parcelset/parcelport_impl.hpp +++ b/libs/full/parcelset/include/hpx/parcelset/parcelport_impl.hpp @@ -546,7 +546,7 @@ namespace hpx::parcelset { std::make_move_iterator(ps), std::make_move_iterator(ps + encoded_parcels))); - if (ConnectionHandler::sender_type::send( + if (connection_handler().send_immediate( this, dest_, HPX_MOVE(buffer), HPX_MOVE(callback_fn))) { // we don't propagate errors for now