diff --git a/rclcpp/include/rclcpp/any_subscription_callback.hpp b/rclcpp/include/rclcpp/any_subscription_callback.hpp index 4fd912fd10..9c5e7928e0 100644 --- a/rclcpp/include/rclcpp/any_subscription_callback.hpp +++ b/rclcpp/include/rclcpp/any_subscription_callback.hpp @@ -30,6 +30,7 @@ #include "rclcpp/detail/subscription_callback_type_helper.hpp" #include "rclcpp/function_traits.hpp" #include "rclcpp/message_info.hpp" +#include "rclcpp/serialization.hpp" #include "rclcpp/serialized_message.hpp" #include "rclcpp/type_adapter.hpp" @@ -158,13 +159,14 @@ struct AnySubscriptionCallbackPossibleTypes template< typename MessageT, typename AllocatorT, - bool is_adapted_type = rclcpp::TypeAdapter::is_specialized::value + bool is_adapted_type = rclcpp::TypeAdapter::is_specialized::value, + bool is_serialized_type = serialization_traits::is_serialized_message_class::value > struct AnySubscriptionCallbackHelper; /// Specialization for when MessageT is not a TypeAdapter. template -struct AnySubscriptionCallbackHelper +struct AnySubscriptionCallbackHelper { using CallbackTypes = AnySubscriptionCallbackPossibleTypes; @@ -194,7 +196,7 @@ struct AnySubscriptionCallbackHelper /// Specialization for when MessageT is a TypeAdapter. template -struct AnySubscriptionCallbackHelper +struct AnySubscriptionCallbackHelper { using CallbackTypes = AnySubscriptionCallbackPossibleTypes; @@ -232,6 +234,26 @@ struct AnySubscriptionCallbackHelper >; }; +/// Specialization for when MessageT is a SerializedMessage to exclude duplicated declarations. +template +struct AnySubscriptionCallbackHelper +{ + using CallbackTypes = AnySubscriptionCallbackPossibleTypes; + + using variant_type = std::variant< + typename CallbackTypes::ConstRefSerializedMessageCallback, + typename CallbackTypes::ConstRefSerializedMessageWithInfoCallback, + typename CallbackTypes::UniquePtrSerializedMessageCallback, + typename CallbackTypes::UniquePtrSerializedMessageWithInfoCallback, + typename CallbackTypes::SharedConstPtrSerializedMessageCallback, + typename CallbackTypes::SharedConstPtrSerializedMessageWithInfoCallback, + typename CallbackTypes::ConstRefSharedConstPtrSerializedMessageCallback, + typename CallbackTypes::ConstRefSharedConstPtrSerializedMessageWithInfoCallback, + typename CallbackTypes::SharedPtrSerializedMessageCallback, + typename CallbackTypes::SharedPtrSerializedMessageWithInfoCallback + >; +}; + } // namespace detail template< @@ -487,7 +509,9 @@ class AnySubscriptionCallback } // Dispatch when input is a ros message and the output could be anything. - void + template + typename std::enable_if::value, + void>::type dispatch( std::shared_ptr message, const rclcpp::MessageInfo & message_info) @@ -589,7 +613,7 @@ class AnySubscriptionCallback // Dispatch when input is a serialized message and the output could be anything. void dispatch( - std::shared_ptr serialized_message, + std::shared_ptr serialized_message, const rclcpp::MessageInfo & message_info) { TRACETOOLS_TRACEPOINT(callback_start, static_cast(this), false); diff --git a/rclcpp/include/rclcpp/create_generic_subscription.hpp b/rclcpp/include/rclcpp/create_generic_subscription.hpp index f5281cc673..c2549721b5 100644 --- a/rclcpp/include/rclcpp/create_generic_subscription.hpp +++ b/rclcpp/include/rclcpp/create_generic_subscription.hpp @@ -45,13 +45,15 @@ namespace rclcpp * Not all publisher options are currently respected, the only relevant options for this * publisher are `event_callbacks`, `use_default_callbacks`, and `%callback_group`. */ -template> +template< + typename CallbackT, + typename AllocatorT = std::allocator> std::shared_ptr create_generic_subscription( rclcpp::node_interfaces::NodeTopicsInterface::SharedPtr topics_interface, const std::string & topic_name, const std::string & topic_type, const rclcpp::QoS & qos, - std::function)> callback, + CallbackT && callback, const rclcpp::SubscriptionOptionsWithAllocator & options = ( rclcpp::SubscriptionOptionsWithAllocator() ) @@ -60,13 +62,20 @@ std::shared_ptr create_generic_subscription( auto ts_lib = rclcpp::get_typesupport_library( topic_type, "rosidl_typesupport_cpp"); + auto allocator = options.get_allocator(); + + using rclcpp::AnySubscriptionCallback; + AnySubscriptionCallback + any_subscription_callback(*allocator); + any_subscription_callback.set(std::forward(callback)); + auto subscription = std::make_shared( topics_interface->get_node_base_interface(), std::move(ts_lib), topic_name, topic_type, qos, - callback, + any_subscription_callback, options); topics_interface->add_subscription(subscription, options.callback_group); diff --git a/rclcpp/include/rclcpp/generic_subscription.hpp b/rclcpp/include/rclcpp/generic_subscription.hpp index dfbae0467b..e9bf79deea 100644 --- a/rclcpp/include/rclcpp/generic_subscription.hpp +++ b/rclcpp/include/rclcpp/generic_subscription.hpp @@ -74,8 +74,7 @@ class GenericSubscription : public rclcpp::SubscriptionBase const std::string & topic_name, const std::string & topic_type, const rclcpp::QoS & qos, - // TODO(nnmm): Add variant for callback with message info. See issue #1604. - std::function)> callback, + AnySubscriptionCallback callback, const rclcpp::SubscriptionOptionsWithAllocator & options) : SubscriptionBase( node_base, @@ -85,7 +84,11 @@ class GenericSubscription : public rclcpp::SubscriptionBase options.event_callbacks, options.use_default_callbacks, DeliveredMessageKind::SERIALIZED_MESSAGE), - callback_(callback), + callback_([callback]( + std::shared_ptr serialized_message, + const rclcpp::MessageInfo & message_info) mutable { + callback.dispatch(serialized_message, message_info); + }), ts_lib_(ts_lib) {} @@ -151,7 +154,9 @@ class GenericSubscription : public rclcpp::SubscriptionBase private: RCLCPP_DISABLE_COPY(GenericSubscription) - std::function)> callback_; + std::function, + const rclcpp::MessageInfo)> callback_; // The type support library should stay loaded, so it is stored in the GenericSubscription std::shared_ptr ts_lib_; }; diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 6521264b86..50b96bbeaf 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -358,12 +358,14 @@ class Node : public std::enable_shared_from_this * `%callback_group`. * \return Shared pointer to the created generic subscription. */ - template> + template< + typename CallbackT, + typename AllocatorT = std::allocator> std::shared_ptr create_generic_subscription( const std::string & topic_name, const std::string & topic_type, const rclcpp::QoS & qos, - std::function)> callback, + CallbackT && callback, const rclcpp::SubscriptionOptionsWithAllocator & options = ( rclcpp::SubscriptionOptionsWithAllocator() ) diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index 5f1fb7b3df..d55a23f9c1 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -221,13 +221,13 @@ Node::create_generic_publisher( ); } -template +template std::shared_ptr Node::create_generic_subscription( const std::string & topic_name, const std::string & topic_type, const rclcpp::QoS & qos, - std::function)> callback, + CallbackT && callback, const rclcpp::SubscriptionOptionsWithAllocator & options) { return rclcpp::create_generic_subscription( @@ -235,7 +235,7 @@ Node::create_generic_subscription( extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()), topic_type, qos, - std::move(callback), + std::forward(callback), options ); } diff --git a/rclcpp/src/rclcpp/generic_subscription.cpp b/rclcpp/src/rclcpp/generic_subscription.cpp index e6e61add24..2d2be61277 100644 --- a/rclcpp/src/rclcpp/generic_subscription.cpp +++ b/rclcpp/src/rclcpp/generic_subscription.cpp @@ -49,9 +49,9 @@ GenericSubscription::handle_message( void GenericSubscription::handle_serialized_message( const std::shared_ptr & message, - const rclcpp::MessageInfo &) + const rclcpp::MessageInfo & message_info) { - callback_(message); + callback_(message, message_info); } void diff --git a/rclcpp/test/rclcpp/test_generic_pubsub.cpp b/rclcpp/test/rclcpp/test_generic_pubsub.cpp index f4cef0b757..79fbfcc33a 100644 --- a/rclcpp/test/rclcpp/test_generic_pubsub.cpp +++ b/rclcpp/test/rclcpp/test_generic_pubsub.cpp @@ -69,7 +69,7 @@ class RclcppGenericNodeFixture : public Test size_t counter = 0; auto subscription = node_->create_generic_subscription( topic_name, type, rclcpp::QoS(1), - [&counter, &messages, this](std::shared_ptr message) { + [&counter, &messages, this](const std::shared_ptr message) { T2 deserialized_message; rclcpp::Serialization serializer; serializer.deserialize_message(message.get(), &deserialized_message); @@ -236,7 +236,7 @@ TEST_F(RclcppGenericNodeFixture, generic_subscription_uses_qos) auto publisher = node_->create_publisher(topic_name, qos); auto subscription = node_->create_generic_subscription( topic_name, topic_type, qos, - [](std::shared_ptr/* message */) {}); + [](std::shared_ptr/* message */) {}); auto connected = [publisher, subscription]() -> bool { return publisher->get_subscription_count() && subscription->get_publisher_count(); }; @@ -263,3 +263,49 @@ TEST_F(RclcppGenericNodeFixture, generic_publisher_uses_qos) // It normally takes < 20ms, 5s chosen as "a very long time" ASSERT_TRUE(wait_for(connected, 5s)); } + +TEST_F(RclcppGenericNodeFixture, generic_subscription_different_callbacks) +{ + using namespace std::chrono_literals; + std::string topic_name = "string_topic"; + std::string topic_type = "test_msgs/msg/Strings"; + rclcpp::QoS qos = rclcpp::QoS(1); + + auto publisher = node_->create_publisher(topic_name, qos); + + // Test shared_ptr for const messages + { + auto subscription = node_->create_generic_subscription( + topic_name, topic_type, qos, + [](const std::shared_ptr/* message */) {}); + auto connected = [publisher, subscription]() -> bool { + return publisher->get_subscription_count() && subscription->get_publisher_count(); + }; + // It normally takes < 20ms, 5s chosen as "a very long time" + ASSERT_TRUE(wait_for(connected, 5s)); + } + + // Test unique_ptr + { + auto subscription = node_->create_generic_subscription( + topic_name, topic_type, qos, + [](std::unique_ptr/* message */) {}); + auto connected = [publisher, subscription]() -> bool { + return publisher->get_subscription_count() && subscription->get_publisher_count(); + }; + // It normally takes < 20ms, 5s chosen as "a very long time" + ASSERT_TRUE(wait_for(connected, 5s)); + } + + // Test message callback + { + auto subscription = node_->create_generic_subscription( + topic_name, topic_type, qos, + [](rclcpp::SerializedMessage /* message */) {}); + auto connected = [publisher, subscription]() -> bool { + return publisher->get_subscription_count() && subscription->get_publisher_count(); + }; + // It normally takes < 20ms, 5s chosen as "a very long time" + ASSERT_TRUE(wait_for(connected, 5s)); + } +}