Skip to content

Commit

Permalink
fix(ExecutorNotifyWaitable): Keep strong references to used guard con…
Browse files Browse the repository at this point in the history
…ditions

A waitable should make sure, that all entities it adds to the waitset
are alive during the wait. The ExecutorNotifyWaitable did not do this,
leading to crashes, if a callback group was dropped while waiting.
This commit changes this behavior, by holding shared pointers to the
used guard condition.
Also while at it, fixed a possible race, were a trigger could get lost.
Optimized the is_ready call by using the indices returned by rcl.

Signed-off-by: Janosch Machowinski <[email protected]>
  • Loading branch information
Janosch Machowinski authored and Janosch Machowinski committed Jan 9, 2025
1 parent 05284e6 commit ff56953
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 45 deletions.
31 changes: 28 additions & 3 deletions rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <memory>
#include <mutex>
#include <set>
#include <vector>

#include "rclcpp/guard_condition.hpp"
#include "rclcpp/waitable.hpp"
Expand All @@ -41,7 +42,9 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
* of this waitable has signaled the wait_set.
*/
RCLCPP_PUBLIC
explicit ExecutorNotifyWaitable(std::function<void(void)> on_execute_callback = {});
explicit ExecutorNotifyWaitable(
std::function<void(void)> on_execute_callback = {}, const rclcpp::Context::SharedPtr & context =
rclcpp::contexts::get_global_default_context());

// Destructor
RCLCPP_PUBLIC
Expand Down Expand Up @@ -157,9 +160,31 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable

std::function<void(size_t)> on_ready_callback_;

/// Helper class to hold a shard ptr of a guard condition,
/// while still be able to be comparable to a weak pointer of it
class GuardHolder
{
public:
explicit GuardHolder(rclcpp::GuardCondition::WeakPtr & weak_ptr);

bool operator<(const GuardHolder & other) const noexcept;

rclcpp::GuardCondition::WeakPtr weak_reference;
rclcpp::GuardCondition::SharedPtr strong_reference;
};

/// The collection of guard conditions to be waited on.
std::set<rclcpp::GuardCondition::WeakPtr,
std::owner_less<rclcpp::GuardCondition::WeakPtr>> notify_guard_conditions_;
std::set<GuardHolder> notify_guard_conditions_;

/// The indixes were our guard conditions were stored in the
/// rcl waitset
std::vector<size_t> idxs_of_added_guard_condition_;

/// set to true, if we got a pending trigger
bool needs_processing = false;

/// A guard condition needed to generate wakeups
rclcpp::GuardCondition::SharedPtr guard_condition_;
};

} // namespace executors
Expand Down
19 changes: 1 addition & 18 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Executor::Executor(const rclcpp::ExecutorOptions & options)
notify_waitable_(std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
[this]() {
this->entities_need_rebuild_.store(true);
})),
}, options.context)),
entities_need_rebuild_(true),
collector_(notify_waitable_),
wait_set_({}, {}, {}, {}, {}, {}, options.context),
Expand Down Expand Up @@ -732,33 +732,16 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
// Clear any previous wait result
this->wait_result_.reset();

// we need to make sure that callback groups don't get out of scope
// during the wait. As in jazzy, they are not covered by the DynamicStorage,
// we explicitly hold them here as a bugfix
std::vector<rclcpp::CallbackGroup::SharedPtr> cbgs;

{
std::lock_guard<std::mutex> guard(mutex_);

if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {
this->collect_entities();
}

auto callback_groups = this->collector_.get_all_callback_groups();
cbgs.resize(callback_groups.size());
for(const auto & w_ptr : callback_groups) {
auto shr_ptr = w_ptr.lock();
if(shr_ptr) {
cbgs.push_back(std::move(shr_ptr));
}
}
}

this->wait_result_.emplace(wait_set_.wait(timeout));

// drop references to the callback groups, before trying to execute anything
cbgs.clear();

if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) {
RCUTILS_LOG_WARN_NAMED(
"rclcpp",
Expand Down
101 changes: 77 additions & 24 deletions rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,23 @@ namespace rclcpp
namespace executors
{

ExecutorNotifyWaitable::ExecutorNotifyWaitable(std::function<void(void)> on_execute_callback)
: execute_callback_(on_execute_callback)
ExecutorNotifyWaitable::GuardHolder::GuardHolder(rclcpp::GuardCondition::WeakPtr & weak_ptr)
: weak_reference(weak_ptr),
strong_reference(weak_ptr.lock())
{
}

bool ExecutorNotifyWaitable::GuardHolder::operator<(const GuardHolder & other) const noexcept
{
return std::owner_less<rclcpp::GuardCondition::WeakPtr>()(this->weak_reference,
other.weak_reference);
}

ExecutorNotifyWaitable::ExecutorNotifyWaitable(
std::function<void(void)> on_execute_callback,
const rclcpp::Context::SharedPtr & context)
: execute_callback_(on_execute_callback),
guard_condition_(std::make_shared<rclcpp::GuardCondition>(context))
{
}

Expand All @@ -30,6 +45,9 @@ ExecutorNotifyWaitable::ExecutorNotifyWaitable(ExecutorNotifyWaitable & other)
std::lock_guard<std::mutex> lock(other.guard_condition_mutex_);
this->execute_callback_ = other.execute_callback_;
this->notify_guard_conditions_ = other.notify_guard_conditions_;
this->guard_condition_ = other.guard_condition_;
this->idxs_of_added_guard_condition_ = other.idxs_of_added_guard_condition_;
this->needs_processing = other.needs_processing;
}

ExecutorNotifyWaitable & ExecutorNotifyWaitable::operator=(ExecutorNotifyWaitable & other)
Expand All @@ -38,6 +56,9 @@ ExecutorNotifyWaitable & ExecutorNotifyWaitable::operator=(ExecutorNotifyWaitabl
std::lock_guard<std::mutex> lock(other.guard_condition_mutex_);
this->execute_callback_ = other.execute_callback_;
this->notify_guard_conditions_ = other.notify_guard_conditions_;
this->guard_condition_ = other.guard_condition_;
this->idxs_of_added_guard_condition_ = other.idxs_of_added_guard_condition_;
this->needs_processing = other.needs_processing;
}
return *this;
}
Expand All @@ -47,21 +68,45 @@ ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t & wait_set)
{
std::lock_guard<std::mutex> lock(guard_condition_mutex_);

idxs_of_added_guard_condition_.clear();
idxs_of_added_guard_condition_.reserve(notify_guard_conditions_.size());

if(needs_processing) {
rcl_guard_condition_t * cond = &guard_condition_->get_rcl_guard_condition();
size_t rcl_index;
rcl_ret_t ret = rcl_wait_set_add_guard_condition(&wait_set, cond, &rcl_index);

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(
ret, "failed to add guard condition to wait set");
}

idxs_of_added_guard_condition_.push_back(rcl_index);

// we want to directly wake up any way, not need to add the other guard conditions
guard_condition_->trigger();

return;
}

// Note: no guard conditions need to be re-triggered, since the guard
// conditions in this class are not tracking a stateful condition, but instead
// only serve to interrupt the wait set when new information is available to
// consider.
for (auto weak_guard_condition : this->notify_guard_conditions_) {
auto guard_condition = weak_guard_condition.lock();
for (const auto & guard_holder : this->notify_guard_conditions_) {
const auto & guard_condition = guard_holder.strong_reference;
if (!guard_condition) {continue;}

rcl_guard_condition_t * cond = &guard_condition->get_rcl_guard_condition();
rcl_ret_t ret = rcl_wait_set_add_guard_condition(&wait_set, cond, NULL);
size_t rcl_index;
rcl_ret_t ret = rcl_wait_set_add_guard_condition(&wait_set, cond, &rcl_index);

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(
ret, "failed to add guard condition to wait set");
}

idxs_of_added_guard_condition_.push_back(rcl_index);
}
}

Expand All @@ -71,27 +116,33 @@ ExecutorNotifyWaitable::is_ready(const rcl_wait_set_t & wait_set)
std::lock_guard<std::mutex> lock(guard_condition_mutex_);

bool any_ready = false;
for (size_t ii = 0; ii < wait_set.size_of_guard_conditions; ++ii) {
const auto * rcl_guard_condition = wait_set.guard_conditions[ii];
for (size_t rcl_index : idxs_of_added_guard_condition_) {
if(rcl_index >= wait_set.size_of_guard_conditions) {
throw std::runtime_error(
"ExecutorNotifyWaitable::is_ready: Internal error, got index out of range");
}

const auto * rcl_guard_condition = wait_set.guard_conditions[rcl_index];

if (nullptr == rcl_guard_condition) {
continue;
}
for (const auto & weak_guard_condition : this->notify_guard_conditions_) {
auto guard_condition = weak_guard_condition.lock();
if (guard_condition && &guard_condition->get_rcl_guard_condition() == rcl_guard_condition) {
any_ready = true;
break;
}
}

any_ready = true;
needs_processing = true;
break;
}

return any_ready;
}

void
ExecutorNotifyWaitable::execute(const std::shared_ptr<void> & /*data*/)
{
std::lock_guard<std::mutex> lock(execute_mutex_);

needs_processing = false;

this->execute_callback_();
}

Expand Down Expand Up @@ -122,8 +173,8 @@ ExecutorNotifyWaitable::set_on_ready_callback(std::function<void(size_t, int)> c
std::lock_guard<std::mutex> lock(guard_condition_mutex_);

on_ready_callback_ = gc_callback;
for (auto weak_gc : notify_guard_conditions_) {
auto gc = weak_gc.lock();
for (const auto & guard_holder : notify_guard_conditions_) {
const auto & gc = guard_holder.strong_reference;
if (!gc) {
continue;
}
Expand All @@ -138,8 +189,8 @@ ExecutorNotifyWaitable::clear_on_ready_callback()
std::lock_guard<std::mutex> lock(guard_condition_mutex_);

on_ready_callback_ = nullptr;
for (auto weak_gc : notify_guard_conditions_) {
auto gc = weak_gc.lock();
for (const auto & guard_holder : notify_guard_conditions_) {
const auto & gc = guard_holder.strong_reference;
if (!gc) {
continue;
}
Expand All @@ -159,9 +210,10 @@ void
ExecutorNotifyWaitable::add_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition)
{
std::lock_guard<std::mutex> lock(guard_condition_mutex_);
auto guard_condition = weak_guard_condition.lock();
if (guard_condition && notify_guard_conditions_.count(weak_guard_condition) == 0) {
notify_guard_conditions_.insert(weak_guard_condition);
auto holder = GuardHolder(weak_guard_condition);
const auto & guard_condition = holder.strong_reference;
if (guard_condition && notify_guard_conditions_.count(holder) == 0) {
notify_guard_conditions_.insert(holder);
if (on_ready_callback_) {
guard_condition->set_on_trigger_callback(on_ready_callback_);
}
Expand All @@ -172,9 +224,10 @@ void
ExecutorNotifyWaitable::remove_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition)
{
std::lock_guard<std::mutex> lock(guard_condition_mutex_);
if (notify_guard_conditions_.count(weak_guard_condition) != 0) {
notify_guard_conditions_.erase(weak_guard_condition);
auto guard_condition = weak_guard_condition.lock();
auto holder = GuardHolder(weak_guard_condition);
if (notify_guard_conditions_.count(holder) != 0) {
notify_guard_conditions_.erase(holder);
const auto & guard_condition = holder.strong_reference;
// If this notify waitable doesn't have an on_ready_callback, then there's nothing to unset
if (guard_condition && on_ready_callback_) {
guard_condition->set_on_trigger_callback(nullptr);
Expand Down

0 comments on commit ff56953

Please sign in to comment.