Skip to content

Commit

Permalink
Ensure waitables handle guard condition retriggering (#2483)
Browse files Browse the repository at this point in the history
Signed-off-by: William Woodall <[email protected]>
Signed-off-by: Michael Carroll <[email protected]>
Co-authored-by: Michael Carroll <[email protected]>
  • Loading branch information
wjwwood and mjcarroll authored Apr 8, 2024
1 parent 63c2e2e commit bfa7efa
Show file tree
Hide file tree
Showing 13 changed files with 215 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
RCLCPP_PUBLIC
ExecutorNotifyWaitable(ExecutorNotifyWaitable & other);


RCLCPP_PUBLIC
ExecutorNotifyWaitable & operator=(ExecutorNotifyWaitable & other);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,23 @@ class SubscriptionIntraProcess

virtual ~SubscriptionIntraProcess() = default;

void
add_to_wait_set(rcl_wait_set_t & wait_set) override
{
// This block is necessary when the guard condition wakes the wait set, but
// the intra process waitable was not handled before the wait set is waited
// on again.
// Basically we're keeping the guard condition triggered so long as there is
// data in the buffer.
if (this->buffer_->has_data()) {
// If there is data still to be processed, indicate to the
// executor or waitset by triggering the guard condition.
this->trigger_guard_condition();
}
// Let the parent classes handle the rest of the work:
return SubscriptionIntraProcessBufferT::add_to_wait_set(wait_set);
}

std::shared_ptr<void>
take_data() override
{
Expand Down Expand Up @@ -149,7 +166,7 @@ class SubscriptionIntraProcess
typename std::enable_if<!std::is_same<T, rcl_serialized_message_t>::value, void>::type
execute_impl(const std::shared_ptr<void> & data)
{
if (!data) {
if (nullptr == data) {
return;
}

Expand Down
23 changes: 11 additions & 12 deletions rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

#include <memory>
#include <vector>
#include <utility>

#include "rcl/allocator.h"

Expand Down Expand Up @@ -121,8 +120,8 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
}
}
for (size_t i = 0; i < waitable_handles_.size(); ++i) {
if (waitable_handles_[i]->is_ready(*wait_set)) {
waitable_triggered_handles_.emplace_back(std::move(waitable_handles_[i]));
if (!waitable_handles_[i]->is_ready(*wait_set)) {
waitable_handles_[i].reset();
}
}

Expand All @@ -146,7 +145,10 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
timer_handles_.end()
);

waitable_handles_.clear();
waitable_handles_.erase(
std::remove(waitable_handles_.begin(), waitable_handles_.end(), nullptr),
waitable_handles_.end()
);
}

bool collect_entities(const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
Expand Down Expand Up @@ -390,17 +392,16 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
rclcpp::AnyExecutable & any_exec,
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
{
auto & waitable_handles = waitable_triggered_handles_;
auto it = waitable_handles.begin();
while (it != waitable_handles.end()) {
auto it = waitable_handles_.begin();
while (it != waitable_handles_.end()) {
std::shared_ptr<Waitable> & waitable = *it;
if (waitable) {
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_waitable(waitable, weak_groups_to_nodes);
if (!group) {
// Group was not found, meaning the waitable is not valid...
// Remove it from the ready list and continue looking
it = waitable_handles.erase(it);
it = waitable_handles_.erase(it);
continue;
}
if (!group->can_be_taken_from().load()) {
Expand All @@ -413,11 +414,11 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
any_exec.waitable = waitable;
any_exec.callback_group = group;
any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
waitable_handles.erase(it);
waitable_handles_.erase(it);
return;
}
// Else, the waitable is no longer valid, remove it and continue
it = waitable_handles.erase(it);
it = waitable_handles_.erase(it);
}
}

Expand Down Expand Up @@ -498,8 +499,6 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
VectorRebind<std::shared_ptr<const rcl_timer_t>> timer_handles_;
VectorRebind<std::shared_ptr<Waitable>> waitable_handles_;

VectorRebind<std::shared_ptr<Waitable>> waitable_triggered_handles_;

std::shared_ptr<VoidAlloc> allocator_;
};

Expand Down
1 change: 1 addition & 0 deletions rclcpp/src/rclcpp/event_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <stdexcept>
#include <string>

#include "rcl/error_handling.h"
#include "rcl/event.h"

#include "rclcpp/event_handler.hpp"
Expand Down
5 changes: 5 additions & 0 deletions rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ void
ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t & wait_set)
{
std::lock_guard<std::mutex> lock(guard_condition_mutex_);

// Note: no guard conditions need to be re-triggered, since the guard
// conditions in this class are not tracking a stateful condition, but instead
// only serve to interrupt the wait set when new information is available to
// consider.
for (auto weak_guard_condition : this->notify_guard_conditions_) {
auto guard_condition = weak_guard_condition.lock();
if (!guard_condition) {continue;}
Expand Down
5 changes: 5 additions & 0 deletions rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,11 @@ if(TARGET test_thread_safe_synchronization)
target_link_libraries(test_thread_safe_synchronization ${PROJECT_NAME} ${test_msgs_TARGETS})
endif()

ament_add_gtest(test_intra_process_waitable waitables/test_intra_process_waitable.cpp)
if(TARGET test_intra_process_waitable)
target_link_libraries(test_intra_process_waitable ${PROJECT_NAME} ${test_msgs_TARGETS})
endif()

ament_add_gtest(test_rosout_qos test_rosout_qos.cpp)
if(TARGET test_rosout_qos)
target_link_libraries(test_rosout_qos ${PROJECT_NAME} rcl::rcl rmw::rmw)
Expand Down
176 changes: 8 additions & 168 deletions rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,57 +60,6 @@ class TestWaitable : public rclcpp::Waitable
void execute(const std::shared_ptr<void> &) override {}
};

static bool test_waitable_result2 = false;

class TestWaitable2 : public rclcpp::Waitable
{
public:
explicit TestWaitable2(rcl_publisher_t * pub_ptr)
: pub_ptr_(pub_ptr),
pub_event_(rcl_get_zero_initialized_event())
{
EXPECT_EQ(
rcl_publisher_event_init(&pub_event_, pub_ptr_, RCL_PUBLISHER_OFFERED_DEADLINE_MISSED),
RCL_RET_OK);
}

~TestWaitable2()
{
EXPECT_EQ(rcl_event_fini(&pub_event_), RCL_RET_OK);
}

void add_to_wait_set(rcl_wait_set_t & wait_set) override
{
EXPECT_EQ(rcl_wait_set_add_event(&wait_set, &pub_event_, &wait_set_event_index_), RCL_RET_OK);
}

bool is_ready(const rcl_wait_set_t &) override
{
return test_waitable_result2;
}

std::shared_ptr<void>
take_data() override
{
return nullptr;
}

void execute(const std::shared_ptr<void> & data) override
{
(void) data;
}

size_t get_number_of_ready_events() override
{
return 1;
}

private:
rcl_publisher_t * pub_ptr_;
rcl_event_t pub_event_;
size_t wait_set_event_index_;
};

struct RclWaitSetSizes
{
size_t size_of_subscriptions = 0;
Expand Down Expand Up @@ -705,129 +654,20 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_timer) {
}

TEST_F(TestAllocatorMemoryStrategy, get_next_waitable) {
auto node1 = std::make_shared<rclcpp::Node>("waitable_node", "ns");
auto node2 = std::make_shared<rclcpp::Node>("waitable_node2", "ns");
rclcpp::Waitable::SharedPtr waitable1 = std::make_shared<TestWaitable>();
rclcpp::Waitable::SharedPtr waitable2 = std::make_shared<TestWaitable>();
node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr);
node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr);

auto get_next_entity = [this](const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) {
rclcpp::AnyExecutable result;
allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes);
return result;
};

{
auto node1 = std::make_shared<rclcpp::Node>(
"waitable_node", "ns",
rclcpp::NodeOptions()
.start_parameter_event_publisher(false)
.start_parameter_services(false));

rclcpp::PublisherOptions pub_options;
pub_options.use_default_callbacks = false;

auto pub1 = node1->create_publisher<test_msgs::msg::Empty>(
"test_topic_1", rclcpp::QoS(10), pub_options);

auto waitable1 =
std::make_shared<TestWaitable2>(pub1->get_publisher_handle().get());
node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr);

auto basic_node = create_node_with_disabled_callback_groups("basic_node");
WeakCallbackGroupsToNodesMap weak_groups_to_nodes;
basic_node->for_each_callback_group(
[basic_node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
weak_groups_to_nodes.insert(
std::pair<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
group_ptr,
basic_node->get_node_base_interface()));
});
node1->for_each_callback_group(
[node1, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
weak_groups_to_nodes.insert(
std::pair<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
group_ptr,
node1->get_node_base_interface()));
});
allocator_memory_strategy()->collect_entities(weak_groups_to_nodes);

rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set();
ASSERT_EQ(
rcl_wait_set_init(
&wait_set,
allocator_memory_strategy()->number_of_ready_subscriptions(),
allocator_memory_strategy()->number_of_guard_conditions(),
allocator_memory_strategy()->number_of_ready_timers(),
allocator_memory_strategy()->number_of_ready_clients(),
allocator_memory_strategy()->number_of_ready_services(),
allocator_memory_strategy()->number_of_ready_events(),
rclcpp::contexts::get_global_default_context()->get_rcl_context().get(),
allocator_memory_strategy()->get_allocator()),
RCL_RET_OK);

ASSERT_TRUE(allocator_memory_strategy()->add_handles_to_wait_set(&wait_set));

ASSERT_EQ(
rcl_wait(
&wait_set,
std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::milliseconds(100))
.count()),
RCL_RET_OK);
test_waitable_result2 = true;
allocator_memory_strategy()->remove_null_handles(&wait_set);

rclcpp::AnyExecutable result = get_next_entity(weak_groups_to_nodes);
EXPECT_EQ(result.node_base, node1->get_node_base_interface());
test_waitable_result2 = false;

EXPECT_EQ(rcl_wait_set_fini(&wait_set), RCL_RET_OK);
}

{
auto node2 = std::make_shared<rclcpp::Node>(
"waitable_node2", "ns",
rclcpp::NodeOptions()
.start_parameter_services(false)
.start_parameter_event_publisher(false));

rclcpp::PublisherOptions pub_options;
pub_options.use_default_callbacks = false;

auto pub2 = node2->create_publisher<test_msgs::msg::Empty>(
"test_topic_2", rclcpp::QoS(10), pub_options);

auto waitable2 =
std::make_shared<TestWaitable2>(pub2->get_publisher_handle().get());
node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr);

auto basic_node2 = std::make_shared<rclcpp::Node>(
"basic_node2", "ns",
rclcpp::NodeOptions()
.start_parameter_services(false)
.start_parameter_event_publisher(false));
WeakCallbackGroupsToNodesMap weak_groups_to_uncollected_nodes;
basic_node2->for_each_callback_group(
[basic_node2, &weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
weak_groups_to_uncollected_nodes.insert(
std::pair<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
group_ptr,
basic_node2->get_node_base_interface()));
});
node2->for_each_callback_group(
[node2,
&weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr)
{
weak_groups_to_uncollected_nodes.insert(
std::pair<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>(
group_ptr,
node2->get_node_base_interface()));
});

rclcpp::AnyExecutable failed_result = get_next_entity(weak_groups_to_uncollected_nodes);
EXPECT_EQ(failed_result.node_base, nullptr);
}
EXPECT_TRUE(TestGetNextEntity(node1, node2, get_next_entity));
}

TEST_F(TestAllocatorMemoryStrategy, get_next_subscription_mutually_exclusive) {
Expand Down
1 change: 1 addition & 0 deletions rclcpp/test/rclcpp/test_memory_strategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class TestWaitable : public rclcpp::Waitable
public:
void add_to_wait_set(rcl_wait_set_t &) override {}
bool is_ready(const rcl_wait_set_t &) override {return true;}

std::shared_ptr<void> take_data() override {return nullptr;}
void execute(const std::shared_ptr<void> &) override {}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class TestWaitable : public rclcpp::Waitable
public:
TestWaitable()
: is_ready_(false) {}

void add_to_wait_set(rcl_wait_set_t &) override {}

bool is_ready(const rcl_wait_set_t &) override {return is_ready_;}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class TestWaitable : public rclcpp::Waitable
public:
TestWaitable()
: is_ready_(false), add_to_wait_set_(false) {}

void add_to_wait_set(rcl_wait_set_t &) override
{
if (!add_to_wait_set_) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class TestWaitable : public rclcpp::Waitable
public:
TestWaitable()
: is_ready_(false) {}

void add_to_wait_set(rcl_wait_set_t &) override {}

bool is_ready(const rcl_wait_set_t &) override {return is_ready_;}
Expand Down
Loading

0 comments on commit bfa7efa

Please sign in to comment.