diff --git a/rclcpp/include/rclcpp/wait_set_policies/dynamic_storage.hpp b/rclcpp/include/rclcpp/wait_set_policies/dynamic_storage.hpp index 8f97596218..3b823144ec 100644 --- a/rclcpp/include/rclcpp/wait_set_policies/dynamic_storage.hpp +++ b/rclcpp/include/rclcpp/wait_set_policies/dynamic_storage.hpp @@ -216,6 +216,11 @@ class DynamicStorage : public rclcpp::wait_set_policies::detail::StoragePolicyCo shared_waitables_ ); + if (this->needs_pruning_) { + this->storage_prune_deleted_entities(); + this->needs_pruning_ = false; + } + this->storage_release_ownerships(); } diff --git a/rclcpp/include/rclcpp/wait_set_policies/static_storage.hpp b/rclcpp/include/rclcpp/wait_set_policies/static_storage.hpp index 7f5cad74ad..d07aa3fb57 100644 --- a/rclcpp/include/rclcpp/wait_set_policies/static_storage.hpp +++ b/rclcpp/include/rclcpp/wait_set_policies/static_storage.hpp @@ -160,6 +160,15 @@ class StaticStorage : public rclcpp::wait_set_policies::detail::StoragePolicyCom services_, waitables_ ); + + if(this->needs_pruning_) { + // we need to throw here, as the indexing of the rcl_waitset is broken, + // in case of invalid entries + + throw std::runtime_error( + "StaticStorage::storage_rebuild_rcl_wait_set(): entity weak_ptr " + "unexpectedly expired in static entity storage"); + } } // storage_add_subscription() explicitly not declared here diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 011e776aa5..14c7627631 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -889,3 +889,213 @@ TYPED_TEST(TestExecutors, testRaceDropCallbackGroupFromSecondThread) executor.cancel(); executor_thread.join(); } + +TYPED_TEST(TestExecutors, dropSomeTimer) +{ + using ExecutorType = TypeParam; + ExecutorType executor; + + auto node = std::make_shared("test_node"); + + bool timer1_works = false; + bool timer2_works = false; + + auto timer1 = node->create_timer(std::chrono::milliseconds(10), [&timer1_works]() { + timer1_works = true; + }); + auto timer2 = node->create_timer(std::chrono::milliseconds(10), [&timer2_works]() { + timer2_works = true; + }); + + executor.add_node(node); + + // first let's make sure that both timers work + auto max_end_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(100); + while(!timer1_works || !timer2_works) { + // let the executor pick up the node and the timers + executor.spin_all(std::chrono::milliseconds(10)); + + const auto cur_time = std::chrono::steady_clock::now(); + ASSERT_LT(cur_time, max_end_time); + } + + // delete timer 2. Note, the executor uses an unordered map internally, to order + // the entities added to the rcl waitset therefore the order is kind of undefined, + // and this test may be flaky. In case it triggers, something is most likely + // really broken. + timer2.reset(); + + timer1_works = false; + timer2_works = false; + max_end_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(100); + while(!timer1_works && !timer2_works) { + // let the executor pick up the node and the timers + executor.spin_all(std::chrono::milliseconds(10)); + + const auto cur_time = std::chrono::steady_clock::now(); + ASSERT_LT(cur_time, max_end_time); + } + + ASSERT_TRUE(timer1_works || timer2_works); +} + +TYPED_TEST(TestExecutors, dropSomeNodeWithTimer) +{ + using ExecutorType = TypeParam; + ExecutorType executor; + + auto node1 = std::make_shared("test_node_1"); + auto node2 = std::make_shared("test_node_2"); + + bool timer1_works = false; + bool timer2_works = false; + + auto timer1 = node1->create_timer(std::chrono::milliseconds(10), [&timer1_works]() { + timer1_works = true; + }); + auto timer2 = node2->create_timer(std::chrono::milliseconds(10), [&timer2_works]() { + timer2_works = true; + }); + + executor.add_node(node1); + executor.add_node(node2); + + // first let's make sure that both timers work + auto max_end_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(100); + while(!timer1_works || !timer2_works) { + // let the executor pick up the node and the timers + executor.spin_all(std::chrono::milliseconds(10)); + + const auto cur_time = std::chrono::steady_clock::now(); + ASSERT_LT(cur_time, max_end_time); + } + + // delete node 1. + node1 = nullptr; + + timer2_works = false; + max_end_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(100); + while(!timer2_works) { + // let the executor pick up the node and the timer + executor.spin_all(std::chrono::milliseconds(10)); + + const auto cur_time = std::chrono::steady_clock::now(); + ASSERT_LT(cur_time, max_end_time); + } + + ASSERT_TRUE(timer2_works); +} + +TYPED_TEST(TestExecutors, dropSomeSubscription) +{ + using ExecutorType = TypeParam; + ExecutorType executor; + + auto node = std::make_shared("test_node"); + + bool sub1_works = false; + bool sub2_works = false; + + auto sub1 = node->create_subscription("/test_drop", 10, + [&sub1_works](const test_msgs::msg::Empty &) { + sub1_works = true; + }); + auto sub2 = node->create_subscription("/test_drop", 10, + [&sub2_works](const test_msgs::msg::Empty &) { + sub2_works = true; + }); + + auto pub = node->create_publisher("/test_drop", 10); + + executor.add_node(node); + + // first let's make sure that both timers work + auto max_end_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(100); + while(!sub1_works || !sub2_works) { + pub->publish(test_msgs::msg::Empty()); + + // let the executor pick up the node and the timers + executor.spin_all(std::chrono::milliseconds(10)); + + const auto cur_time = std::chrono::steady_clock::now(); + ASSERT_LT(cur_time, max_end_time); + } + + // delete subscription 2. Note, the executor uses an unordered map internally, to order + // the entities added to the rcl waitset therefore the order is kind of undefined, + // and this test may be flaky. In case it triggers, something is most likely + // really broken. + sub2.reset(); + + sub1_works = false; + sub2_works = false; + max_end_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(100); + while(!sub1_works && !sub2_works) { + pub->publish(test_msgs::msg::Empty()); + + // let the executor pick up the node and the timers + executor.spin_all(std::chrono::milliseconds(10)); + + const auto cur_time = std::chrono::steady_clock::now(); + ASSERT_LT(cur_time, max_end_time); + } + + ASSERT_TRUE(sub1_works || sub2_works); +} + +TYPED_TEST(TestExecutors, dropSomeNodesWithSubscription) +{ + using ExecutorType = TypeParam; + ExecutorType executor; + + auto node = std::make_shared("test_node"); + auto node1 = std::make_shared("test_node_1"); + auto node2 = std::make_shared("test_node_2"); + + bool sub1_works = false; + bool sub2_works = false; + + auto sub1 = node1->create_subscription("/test_drop", 10, + [&sub1_works](const test_msgs::msg::Empty &) { + sub1_works = true; + }); + auto sub2 = node2->create_subscription("/test_drop", 10, + [&sub2_works](const test_msgs::msg::Empty &) { + sub2_works = true; + }); + + auto pub = node->create_publisher("/test_drop", 10); + + executor.add_node(node); + executor.add_node(node1); + executor.add_node(node2); + + // first let's make sure that both subscribers work + auto max_end_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(100); + while(!sub1_works || !sub2_works) { + pub->publish(test_msgs::msg::Empty()); + + // let the executor pick up the node and the timers + executor.spin_all(std::chrono::milliseconds(10)); + + const auto cur_time = std::chrono::steady_clock::now(); + ASSERT_LT(cur_time, max_end_time); + } + + // delete node 2. + node2 = nullptr; + + sub1_works = false; + max_end_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(100); + while(!sub1_works) { + pub->publish(test_msgs::msg::Empty()); + + // let the executor pick up the node and the timers + executor.spin_all(std::chrono::milliseconds(10)); + + const auto cur_time = std::chrono::steady_clock::now(); + ASSERT_LT(cur_time, max_end_time); + } + + ASSERT_TRUE(sub1_works); +}