From cd57ea7af92c0adaf9492589868f74913a3c61df Mon Sep 17 00:00:00 2001 From: Lennart Nachtigall Date: Fri, 13 Dec 2024 12:21:24 +0100 Subject: [PATCH] First step towards modernizing the rt publisher (#210) (cherry picked from commit e58c7ad290f64e172bfd16136cc146ce5759680d) --- include/realtime_tools/realtime_publisher.hpp | 63 ++++++++++++++----- test/realtime_publisher_tests.cpp | 41 ++++++++++++ test/realtime_publisher_tests_non_polling.cpp | 41 ++++++++++++ 3 files changed, 128 insertions(+), 17 deletions(-) diff --git a/include/realtime_tools/realtime_publisher.hpp b/include/realtime_tools/realtime_publisher.hpp index ca004342..3ac1eb06 100644 --- a/include/realtime_tools/realtime_publisher.hpp +++ b/include/realtime_tools/realtime_publisher.hpp @@ -49,27 +49,39 @@ namespace realtime_tools { -template +template class RealtimePublisher { -private: - using PublisherSharedPtr = typename rclcpp::Publisher::SharedPtr; - public: + /// Provide various typedefs to resemble the rclcpp::Publisher type + using PublisherType = rclcpp::Publisher; + using PublisherSharedPtr = typename rclcpp::Publisher::SharedPtr; + + using PublishedType = typename rclcpp::TypeAdapter::custom_type; + using ROSMessageType = typename rclcpp::TypeAdapter::ros_message_type; + + RCLCPP_SMART_PTR_DEFINITIONS(RealtimePublisher) + /// The msg_ variable contains the data that will get published on the ROS topic. - Msg msg_; + MessageT msg_; /** \brief Constructor for the realtime publisher * * \param publisher the publisher to wrap */ explicit RealtimePublisher(PublisherSharedPtr publisher) - : publisher_(publisher), is_running_(false), keep_running_(true), turn_(LOOP_NOT_STARTED) + : publisher_(publisher), is_running_(false), keep_running_(true), turn_(State::LOOP_NOT_STARTED) { thread_ = std::thread(&RealtimePublisher::publishingLoop, this); } - RealtimePublisher() : is_running_(false), keep_running_(false), turn_(LOOP_NOT_STARTED) {} + [[deprecated( + "Use constructor with rclcpp::Publisher::SharedPtr instead - this class does not make sense " + "without a real publisher")]] + RealtimePublisher() + : is_running_(false), keep_running_(false), turn_(State::LOOP_NOT_STARTED) + { + } /// Destructor ~RealtimePublisher() @@ -101,7 +113,7 @@ class RealtimePublisher bool trylock() { if (msg_mutex_.try_lock()) { - if (turn_ == REALTIME) { + if (turn_ == State::REALTIME) { return true; } else { msg_mutex_.unlock(); @@ -112,6 +124,23 @@ class RealtimePublisher } } + /** \brief Try to get the data lock from realtime and publish the given message + * + * Tries to gain unique access to msg_ variable. If this succeeds + * update the msg_ variable and call unlockAndPublish + * @return false in case no lock for the realtime variable could be acquired + */ + bool tryPublish(const MessageT & msg) + { + if (!trylock()) { + return false; + } + + msg_ = msg; + unlockAndPublish(); + return true; + } + /** \brief Unlock the msg_ variable * * After a successful trylock and after the data is written to the mgs_ @@ -120,7 +149,7 @@ class RealtimePublisher */ void unlockAndPublish() { - turn_ = NON_REALTIME; + turn_ = State::NON_REALTIME; unlock(); } @@ -163,10 +192,10 @@ class RealtimePublisher void publishingLoop() { is_running_ = true; - turn_ = REALTIME; + turn_ = State::REALTIME; while (keep_running_) { - Msg outgoing; + MessageT outgoing; // Locks msg_ and copies it @@ -176,7 +205,7 @@ class RealtimePublisher lock(); #endif - while (turn_ != NON_REALTIME && keep_running_) { + while (turn_ != State::NON_REALTIME && keep_running_) { #ifdef NON_POLLING updated_cond_.wait(lock_); #else @@ -186,7 +215,7 @@ class RealtimePublisher #endif } outgoing = msg_; - turn_ = REALTIME; + turn_ = State::REALTIME; unlock(); @@ -210,12 +239,12 @@ class RealtimePublisher std::condition_variable updated_cond_; #endif - enum { REALTIME, NON_REALTIME, LOOP_NOT_STARTED }; - std::atomic turn_; // Who's turn is it to use msg_? + enum class State : int { REALTIME, NON_REALTIME, LOOP_NOT_STARTED }; + std::atomic turn_; // Who's turn is it to use msg_? }; -template -using RealtimePublisherSharedPtr = std::shared_ptr>; +template +using RealtimePublisherSharedPtr = std::shared_ptr>; } // namespace realtime_tools #endif // REALTIME_TOOLS__REALTIME_PUBLISHER_HPP_ diff --git a/test/realtime_publisher_tests.cpp b/test/realtime_publisher_tests.cpp index d560b93c..eebfff18 100644 --- a/test/realtime_publisher_tests.cpp +++ b/test/realtime_publisher_tests.cpp @@ -91,3 +91,44 @@ TEST(RealtimePublisher, rt_publish) EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str()); rclcpp::shutdown(); } + +TEST(RealtimePublisher, rt_try_publish) +{ + rclcpp::init(0, nullptr); + const size_t ATTEMPTS = 10; + const std::chrono::milliseconds DELAY(250); + + const char * expected_msg = "Hello World"; + auto node = std::make_shared("construct_move_destruct"); + rclcpp::QoS qos(10); + qos.reliable().transient_local(); + auto pub = node->create_publisher("~/rt_publish", qos); + RealtimePublisher rt_pub(pub); + + // try publish a latched message + bool publish_success = false; + for (std::size_t i = 0; i < ATTEMPTS; ++i) { + StringMsg msg; + msg.string_value = expected_msg; + + if (rt_pub.tryPublish(msg)) { + publish_success = true; + break; + } + std::this_thread::sleep_for(DELAY); + } + ASSERT_TRUE(publish_success); + + // make sure subscriber gets it + StringCallback str_callback; + + auto sub = node->create_subscription( + "~/rt_publish", qos, + std::bind(&StringCallback::callback, &str_callback, std::placeholders::_1)); + for (size_t i = 0; i < ATTEMPTS && str_callback.msg_.string_value.empty(); ++i) { + rclcpp::spin_some(node); + std::this_thread::sleep_for(DELAY); + } + EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str()); + rclcpp::shutdown(); +} diff --git a/test/realtime_publisher_tests_non_polling.cpp b/test/realtime_publisher_tests_non_polling.cpp index 001e796b..f39d1064 100644 --- a/test/realtime_publisher_tests_non_polling.cpp +++ b/test/realtime_publisher_tests_non_polling.cpp @@ -93,3 +93,44 @@ TEST(RealtimePublisherNonPolling, rt_publish) EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str()); rclcpp::shutdown(); } + +TEST(RealtimePublisher, rt_try_publish) +{ + rclcpp::init(0, nullptr); + const size_t ATTEMPTS = 10; + const std::chrono::milliseconds DELAY(250); + + const char * expected_msg = "Hello World"; + auto node = std::make_shared("construct_move_destruct"); + rclcpp::QoS qos(10); + qos.reliable().transient_local(); + auto pub = node->create_publisher("~/rt_publish", qos); + RealtimePublisher rt_pub(pub); + + // try publish a latched message + bool publish_success = false; + for (std::size_t i = 0; i < ATTEMPTS; ++i) { + StringMsg msg; + msg.string_value = expected_msg; + + if (rt_pub.tryPublish(msg)) { + publish_success = true; + break; + } + std::this_thread::sleep_for(DELAY); + } + ASSERT_TRUE(publish_success); + + // make sure subscriber gets it + StringCallback str_callback; + + auto sub = node->create_subscription( + "~/rt_publish", qos, + std::bind(&StringCallback::callback, &str_callback, std::placeholders::_1)); + for (size_t i = 0; i < ATTEMPTS && str_callback.msg_.string_value.empty(); ++i) { + rclcpp::spin_some(node); + std::this_thread::sleep_for(DELAY); + } + EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str()); + rclcpp::shutdown(); +}