diff --git a/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp b/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp index 7ae1c3b875..ff4abb08f2 100644 --- a/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp +++ b/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include "rclcpp/guard_condition.hpp" #include "rclcpp/waitable.hpp" @@ -41,7 +42,9 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable * of this waitable has signaled the wait_set. */ RCLCPP_PUBLIC - explicit ExecutorNotifyWaitable(std::function on_execute_callback = {}); + explicit ExecutorNotifyWaitable( + std::function on_execute_callback = {}, const rclcpp::Context::SharedPtr & context = + rclcpp::contexts::get_global_default_context()); // Destructor RCLCPP_PUBLIC @@ -157,9 +160,31 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable std::function on_ready_callback_; + /// Helper class to hold a shard ptr of a guard condition, + /// while still be able to be comparable to a weak pointer of it + class GuardHolder + { +public: + explicit GuardHolder(rclcpp::GuardCondition::WeakPtr & weak_ptr); + + bool operator<(const GuardHolder & other) const noexcept; + + rclcpp::GuardCondition::WeakPtr weak_reference; + rclcpp::GuardCondition::SharedPtr strong_reference; + }; + /// The collection of guard conditions to be waited on. - std::set> notify_guard_conditions_; + std::set notify_guard_conditions_; + + /// The indixes were our guard conditions were stored in the + /// rcl waitset + std::vector idxs_of_added_guard_condition_; + + /// set to true, if we got a pending trigger + bool needs_processing = false; + + /// A guard condition needed to generate wakeups + rclcpp::GuardCondition::SharedPtr guard_condition_; }; } // namespace executors diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 160709fb98..260b921044 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -66,7 +66,7 @@ Executor::Executor(const rclcpp::ExecutorOptions & options) notify_waitable_(std::make_shared( [this]() { this->entities_need_rebuild_.store(true); - })), + }, options.context)), entities_need_rebuild_(true), collector_(notify_waitable_), wait_set_({}, {}, {}, {}, {}, {}, options.context), @@ -732,33 +732,16 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) // Clear any previous wait result this->wait_result_.reset(); - // we need to make sure that callback groups don't get out of scope - // during the wait. As in jazzy, they are not covered by the DynamicStorage, - // we explicitly hold them here as a bugfix - std::vector cbgs; - { std::lock_guard guard(mutex_); if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) { this->collect_entities(); } - - auto callback_groups = this->collector_.get_all_callback_groups(); - cbgs.resize(callback_groups.size()); - for(const auto & w_ptr : callback_groups) { - auto shr_ptr = w_ptr.lock(); - if(shr_ptr) { - cbgs.push_back(std::move(shr_ptr)); - } - } } this->wait_result_.emplace(wait_set_.wait(timeout)); - // drop references to the callback groups, before trying to execute anything - cbgs.clear(); - if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) { RCUTILS_LOG_WARN_NAMED( "rclcpp", diff --git a/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp b/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp index 2e62f9dd1a..8397b0c868 100644 --- a/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp +++ b/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp @@ -20,8 +20,23 @@ namespace rclcpp namespace executors { -ExecutorNotifyWaitable::ExecutorNotifyWaitable(std::function on_execute_callback) -: execute_callback_(on_execute_callback) +ExecutorNotifyWaitable::GuardHolder::GuardHolder(rclcpp::GuardCondition::WeakPtr & weak_ptr) +: weak_reference(weak_ptr), + strong_reference(weak_ptr.lock()) +{ +} + +bool ExecutorNotifyWaitable::GuardHolder::operator<(const GuardHolder & other) const noexcept +{ + return std::owner_less()(this->weak_reference, + other.weak_reference); +} + +ExecutorNotifyWaitable::ExecutorNotifyWaitable( + std::function on_execute_callback, + const rclcpp::Context::SharedPtr & context) +: execute_callback_(on_execute_callback), + guard_condition_(std::make_shared(context)) { } @@ -30,6 +45,9 @@ ExecutorNotifyWaitable::ExecutorNotifyWaitable(ExecutorNotifyWaitable & other) std::lock_guard lock(other.guard_condition_mutex_); this->execute_callback_ = other.execute_callback_; this->notify_guard_conditions_ = other.notify_guard_conditions_; + this->guard_condition_ = other.guard_condition_; + this->idxs_of_added_guard_condition_ = other.idxs_of_added_guard_condition_; + this->needs_processing = other.needs_processing; } ExecutorNotifyWaitable & ExecutorNotifyWaitable::operator=(ExecutorNotifyWaitable & other) @@ -38,6 +56,9 @@ ExecutorNotifyWaitable & ExecutorNotifyWaitable::operator=(ExecutorNotifyWaitabl std::lock_guard lock(other.guard_condition_mutex_); this->execute_callback_ = other.execute_callback_; this->notify_guard_conditions_ = other.notify_guard_conditions_; + this->guard_condition_ = other.guard_condition_; + this->idxs_of_added_guard_condition_ = other.idxs_of_added_guard_condition_; + this->needs_processing = other.needs_processing; } return *this; } @@ -47,21 +68,45 @@ ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t & wait_set) { std::lock_guard lock(guard_condition_mutex_); + idxs_of_added_guard_condition_.clear(); + idxs_of_added_guard_condition_.reserve(notify_guard_conditions_.size()); + + if(needs_processing) { + rcl_guard_condition_t * cond = &guard_condition_->get_rcl_guard_condition(); + size_t rcl_index; + rcl_ret_t ret = rcl_wait_set_add_guard_condition(&wait_set, cond, &rcl_index); + + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error( + ret, "failed to add guard condition to wait set"); + } + + idxs_of_added_guard_condition_.push_back(rcl_index); + + // we want to directly wake up any way, not need to add the other guard conditions + guard_condition_->trigger(); + + return; + } + // Note: no guard conditions need to be re-triggered, since the guard // conditions in this class are not tracking a stateful condition, but instead // only serve to interrupt the wait set when new information is available to // consider. - for (auto weak_guard_condition : this->notify_guard_conditions_) { - auto guard_condition = weak_guard_condition.lock(); + for (const auto & guard_holder : this->notify_guard_conditions_) { + const auto & guard_condition = guard_holder.strong_reference; if (!guard_condition) {continue;} rcl_guard_condition_t * cond = &guard_condition->get_rcl_guard_condition(); - rcl_ret_t ret = rcl_wait_set_add_guard_condition(&wait_set, cond, NULL); + size_t rcl_index; + rcl_ret_t ret = rcl_wait_set_add_guard_condition(&wait_set, cond, &rcl_index); if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error( ret, "failed to add guard condition to wait set"); } + + idxs_of_added_guard_condition_.push_back(rcl_index); } } @@ -71,20 +116,23 @@ ExecutorNotifyWaitable::is_ready(const rcl_wait_set_t & wait_set) std::lock_guard lock(guard_condition_mutex_); bool any_ready = false; - for (size_t ii = 0; ii < wait_set.size_of_guard_conditions; ++ii) { - const auto * rcl_guard_condition = wait_set.guard_conditions[ii]; + for (size_t rcl_index : idxs_of_added_guard_condition_) { + if(rcl_index >= wait_set.size_of_guard_conditions) { + throw std::runtime_error( + "ExecutorNotifyWaitable::is_ready: Internal error, got index out of range"); + } + + const auto * rcl_guard_condition = wait_set.guard_conditions[rcl_index]; if (nullptr == rcl_guard_condition) { continue; } - for (const auto & weak_guard_condition : this->notify_guard_conditions_) { - auto guard_condition = weak_guard_condition.lock(); - if (guard_condition && &guard_condition->get_rcl_guard_condition() == rcl_guard_condition) { - any_ready = true; - break; - } - } + + any_ready = true; + needs_processing = true; + break; } + return any_ready; } @@ -92,6 +140,9 @@ void ExecutorNotifyWaitable::execute(const std::shared_ptr & /*data*/) { std::lock_guard lock(execute_mutex_); + + needs_processing = false; + this->execute_callback_(); } @@ -122,8 +173,8 @@ ExecutorNotifyWaitable::set_on_ready_callback(std::function c std::lock_guard lock(guard_condition_mutex_); on_ready_callback_ = gc_callback; - for (auto weak_gc : notify_guard_conditions_) { - auto gc = weak_gc.lock(); + for (const auto & guard_holder : notify_guard_conditions_) { + const auto & gc = guard_holder.strong_reference; if (!gc) { continue; } @@ -138,8 +189,8 @@ ExecutorNotifyWaitable::clear_on_ready_callback() std::lock_guard lock(guard_condition_mutex_); on_ready_callback_ = nullptr; - for (auto weak_gc : notify_guard_conditions_) { - auto gc = weak_gc.lock(); + for (const auto & guard_holder : notify_guard_conditions_) { + const auto & gc = guard_holder.strong_reference; if (!gc) { continue; } @@ -159,9 +210,10 @@ void ExecutorNotifyWaitable::add_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition) { std::lock_guard lock(guard_condition_mutex_); - auto guard_condition = weak_guard_condition.lock(); - if (guard_condition && notify_guard_conditions_.count(weak_guard_condition) == 0) { - notify_guard_conditions_.insert(weak_guard_condition); + auto holder = GuardHolder(weak_guard_condition); + const auto & guard_condition = holder.strong_reference; + if (guard_condition && notify_guard_conditions_.count(holder) == 0) { + notify_guard_conditions_.insert(holder); if (on_ready_callback_) { guard_condition->set_on_trigger_callback(on_ready_callback_); } @@ -172,9 +224,10 @@ void ExecutorNotifyWaitable::remove_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition) { std::lock_guard lock(guard_condition_mutex_); - if (notify_guard_conditions_.count(weak_guard_condition) != 0) { - notify_guard_conditions_.erase(weak_guard_condition); - auto guard_condition = weak_guard_condition.lock(); + auto holder = GuardHolder(weak_guard_condition); + if (notify_guard_conditions_.count(holder) != 0) { + notify_guard_conditions_.erase(holder); + const auto & guard_condition = holder.strong_reference; // If this notify waitable doesn't have an on_ready_callback, then there's nothing to unset if (guard_condition && on_ready_callback_) { guard_condition->set_on_trigger_callback(nullptr);