Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First step towards modernizing the rt publisher (backport #210) #233

Open
wants to merge 2 commits into
base: humble
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 46 additions & 17 deletions include/realtime_tools/realtime_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,27 +49,39 @@

namespace realtime_tools
{
template <class Msg>
template <class MessageT>
class RealtimePublisher
{
private:
using PublisherSharedPtr = typename rclcpp::Publisher<Msg>::SharedPtr;

public:
/// Provide various typedefs to resemble the rclcpp::Publisher type
using PublisherType = rclcpp::Publisher<MessageT>;
using PublisherSharedPtr = typename rclcpp::Publisher<MessageT>::SharedPtr;

using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
using ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type;

RCLCPP_SMART_PTR_DEFINITIONS(RealtimePublisher<MessageT>)

/// The msg_ variable contains the data that will get published on the ROS topic.
Msg msg_;
MessageT msg_;

/** \brief Constructor for the realtime publisher
*
* \param publisher the publisher to wrap
*/
explicit RealtimePublisher(PublisherSharedPtr publisher)
: publisher_(publisher), is_running_(false), keep_running_(true), turn_(LOOP_NOT_STARTED)
: publisher_(publisher), is_running_(false), keep_running_(true), turn_(State::LOOP_NOT_STARTED)
{
thread_ = std::thread(&RealtimePublisher::publishingLoop, this);
}

RealtimePublisher() : is_running_(false), keep_running_(false), turn_(LOOP_NOT_STARTED) {}
[[deprecated(
"Use constructor with rclcpp::Publisher<T>::SharedPtr instead - this class does not make sense "
"without a real publisher")]]
RealtimePublisher()
: is_running_(false), keep_running_(false), turn_(State::LOOP_NOT_STARTED)
{
}

/// Destructor
~RealtimePublisher()
Expand Down Expand Up @@ -101,7 +113,7 @@ class RealtimePublisher
bool trylock()
{
if (msg_mutex_.try_lock()) {
if (turn_ == REALTIME) {
if (turn_ == State::REALTIME) {
return true;
} else {
msg_mutex_.unlock();
Expand All @@ -112,6 +124,23 @@ class RealtimePublisher
}
}

/** \brief Try to get the data lock from realtime and publish the given message
*
* Tries to gain unique access to msg_ variable. If this succeeds
* update the msg_ variable and call unlockAndPublish
* @return false in case no lock for the realtime variable could be acquired
*/
bool tryPublish(const MessageT & msg)
{
if (!trylock()) {
return false;
}

msg_ = msg;
unlockAndPublish();
return true;
}

/** \brief Unlock the msg_ variable
*
* After a successful trylock and after the data is written to the mgs_
Expand All @@ -120,7 +149,7 @@ class RealtimePublisher
*/
void unlockAndPublish()
{
turn_ = NON_REALTIME;
turn_ = State::NON_REALTIME;
unlock();
}

Expand Down Expand Up @@ -163,10 +192,10 @@ class RealtimePublisher
void publishingLoop()
{
is_running_ = true;
turn_ = REALTIME;
turn_ = State::REALTIME;

while (keep_running_) {
Msg outgoing;
MessageT outgoing;

// Locks msg_ and copies it

Expand All @@ -176,7 +205,7 @@ class RealtimePublisher
lock();
#endif

while (turn_ != NON_REALTIME && keep_running_) {
while (turn_ != State::NON_REALTIME && keep_running_) {
#ifdef NON_POLLING
updated_cond_.wait(lock_);
#else
Expand All @@ -186,7 +215,7 @@ class RealtimePublisher
#endif
}
outgoing = msg_;
turn_ = REALTIME;
turn_ = State::REALTIME;

unlock();

Expand All @@ -210,12 +239,12 @@ class RealtimePublisher
std::condition_variable updated_cond_;
#endif

enum { REALTIME, NON_REALTIME, LOOP_NOT_STARTED };
std::atomic<int> turn_; // Who's turn is it to use msg_?
enum class State : int { REALTIME, NON_REALTIME, LOOP_NOT_STARTED };
std::atomic<State> turn_; // Who's turn is it to use msg_?
};

template <class Msg>
using RealtimePublisherSharedPtr = std::shared_ptr<RealtimePublisher<Msg>>;
template <class MessageT>
using RealtimePublisherSharedPtr = std::shared_ptr<RealtimePublisher<MessageT>>;

} // namespace realtime_tools
#endif // REALTIME_TOOLS__REALTIME_PUBLISHER_HPP_
41 changes: 41 additions & 0 deletions test/realtime_publisher_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,44 @@ TEST(RealtimePublisher, rt_publish)
EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str());
rclcpp::shutdown();
}

TEST(RealtimePublisher, rt_try_publish)
{
rclcpp::init(0, nullptr);
const size_t ATTEMPTS = 10;
const std::chrono::milliseconds DELAY(250);

const char * expected_msg = "Hello World";
auto node = std::make_shared<rclcpp::Node>("construct_move_destruct");
rclcpp::QoS qos(10);
qos.reliable().transient_local();
auto pub = node->create_publisher<StringMsg>("~/rt_publish", qos);
RealtimePublisher<StringMsg> rt_pub(pub);

// try publish a latched message
bool publish_success = false;
for (std::size_t i = 0; i < ATTEMPTS; ++i) {
StringMsg msg;
msg.string_value = expected_msg;

if (rt_pub.tryPublish(msg)) {
publish_success = true;
break;
}
std::this_thread::sleep_for(DELAY);
}
ASSERT_TRUE(publish_success);

// make sure subscriber gets it
StringCallback str_callback;

auto sub = node->create_subscription<StringMsg>(
"~/rt_publish", qos,
std::bind(&StringCallback::callback, &str_callback, std::placeholders::_1));
for (size_t i = 0; i < ATTEMPTS && str_callback.msg_.string_value.empty(); ++i) {
rclcpp::spin_some(node);
std::this_thread::sleep_for(DELAY);
}
EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str());
rclcpp::shutdown();
}
41 changes: 41 additions & 0 deletions test/realtime_publisher_tests_non_polling.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,44 @@ TEST(RealtimePublisherNonPolling, rt_publish)
EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str());
rclcpp::shutdown();
}

TEST(RealtimePublisher, rt_try_publish)
{
rclcpp::init(0, nullptr);
const size_t ATTEMPTS = 10;
const std::chrono::milliseconds DELAY(250);

const char * expected_msg = "Hello World";
auto node = std::make_shared<rclcpp::Node>("construct_move_destruct");
rclcpp::QoS qos(10);
qos.reliable().transient_local();
auto pub = node->create_publisher<StringMsg>("~/rt_publish", qos);
RealtimePublisher<StringMsg> rt_pub(pub);

// try publish a latched message
bool publish_success = false;
for (std::size_t i = 0; i < ATTEMPTS; ++i) {
StringMsg msg;
msg.string_value = expected_msg;

if (rt_pub.tryPublish(msg)) {
publish_success = true;
break;
}
std::this_thread::sleep_for(DELAY);
}
ASSERT_TRUE(publish_success);

// make sure subscriber gets it
StringCallback str_callback;

auto sub = node->create_subscription<StringMsg>(
"~/rt_publish", qos,
std::bind(&StringCallback::callback, &str_callback, std::placeholders::_1));
for (size_t i = 0; i < ATTEMPTS && str_callback.msg_.string_value.empty(); ++i) {
rclcpp::spin_some(node);
std::this_thread::sleep_for(DELAY);
}
EXPECT_STREQ(expected_msg, str_callback.msg_.string_value.c_str());
rclcpp::shutdown();
}
Loading