diff --git a/rmw_zenoh_cpp/src/detail/event.cpp b/rmw_zenoh_cpp/src/detail/event.cpp index 4b99e419..2b204c91 100644 --- a/rmw_zenoh_cpp/src/detail/event.cpp +++ b/rmw_zenoh_cpp/src/detail/event.cpp @@ -22,10 +22,10 @@ ///============================================================================= -void EventsBase::set_user_callback( +void DataCallbackManager::set_callback( const void * user_data, rmw_event_callback_t callback) { - std::lock_guard lock_mutex(event_mutex_); + std::lock_guard lock_mutex(event_mutex_); if (callback) { // Push events arrived before setting the the executor callback. @@ -42,10 +42,10 @@ void EventsBase::set_user_callback( } ///============================================================================= -void EventsBase::trigger_user_callback() +void DataCallbackManager::trigger_callback() { // Trigger the user provided event callback if available. - std::lock_guard lock_mutex(event_mutex_); + std::lock_guard lock_mutex(event_mutex_); if (callback_ != nullptr) { callback_(user_data_, 1); } else { @@ -54,7 +54,7 @@ void EventsBase::trigger_user_callback() } ///============================================================================= -void EventsBase::event_set_callback( +void EventsManager::event_set_callback( rmw_zenoh_event_type_t event_id, rmw_event_callback_t callback, const void * user_data) @@ -82,7 +82,7 @@ void EventsBase::event_set_callback( } ///============================================================================= -void EventsBase::trigger_event_callback(rmw_zenoh_event_type_t event_id) +void EventsManager::trigger_event_callback(rmw_zenoh_event_type_t event_id) { if (event_id > ZENOH_EVENT_ID_MAX) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( @@ -103,7 +103,7 @@ void EventsBase::trigger_event_callback(rmw_zenoh_event_type_t event_id) } ///============================================================================= -bool EventsBase::event_queue_is_empty(rmw_zenoh_event_type_t event_id) const +bool EventsManager::event_queue_is_empty(rmw_zenoh_event_type_t event_id) const { if (event_id > ZENOH_EVENT_ID_MAX) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( @@ -119,7 +119,7 @@ bool EventsBase::event_queue_is_empty(rmw_zenoh_event_type_t event_id) const } ///============================================================================= -std::unique_ptr EventsBase::pop_next_event( +std::unique_ptr EventsManager::pop_next_event( rmw_zenoh_event_type_t event_id) { if (event_id > ZENOH_EVENT_ID_MAX) { @@ -145,7 +145,7 @@ std::unique_ptr EventsBase::pop_next_event( } ///============================================================================= -void EventsBase::add_new_event( +void EventsManager::add_new_event( rmw_zenoh_event_type_t event_id, std::unique_ptr event) { @@ -180,7 +180,7 @@ void EventsBase::add_new_event( } ///============================================================================= -void EventsBase::attach_event_condition( +void EventsManager::attach_event_condition( rmw_zenoh_event_type_t event_id, std::condition_variable * condition_variable) { @@ -197,7 +197,7 @@ void EventsBase::attach_event_condition( } ///============================================================================= -void EventsBase::detach_event_condition(rmw_zenoh_event_type_t event_id) +void EventsManager::detach_event_condition(rmw_zenoh_event_type_t event_id) { if (event_id > ZENOH_EVENT_ID_MAX) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( @@ -212,7 +212,7 @@ void EventsBase::detach_event_condition(rmw_zenoh_event_type_t event_id) } ///============================================================================= -void EventsBase::notify_event(rmw_zenoh_event_type_t event_id) +void EventsManager::notify_event(rmw_zenoh_event_type_t event_id) { if (event_id > ZENOH_EVENT_ID_MAX) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( diff --git a/rmw_zenoh_cpp/src/detail/event.hpp b/rmw_zenoh_cpp/src/detail/event.hpp index 727ff1fc..01dbce4c 100644 --- a/rmw_zenoh_cpp/src/detail/event.hpp +++ b/rmw_zenoh_cpp/src/detail/event.hpp @@ -36,14 +36,14 @@ enum rmw_zenoh_event_type_t // subscription events ZENOH_EVENT_REQUESTED_QOS_INCOMPATIBLE, ZENOH_EVENT_MESSAGE_LOST, - // RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE, + ZENOH_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE, ZENOH_EVENT_SUBSCRIPTION_MATCHED, // publisher events // RMW_EVENT_LIVELINESS_LOST, // RMW_EVENT_OFFERED_DEADLINE_MISSED, ZENOH_EVENT_OFFERED_QOS_INCOMPATIBLE, - // RMW_EVENT_PUBLISHER_INCOMPATIBLE_TYPE, + ZENOH_EVENT_PUBLISHER_INCOMPATIBLE_TYPE, ZENOH_EVENT_PUBLICATION_MATCHED, }; @@ -56,35 +56,60 @@ static const std::unordered_map event_ {RMW_EVENT_OFFERED_QOS_INCOMPATIBLE, ZENOH_EVENT_OFFERED_QOS_INCOMPATIBLE}, {RMW_EVENT_MESSAGE_LOST, ZENOH_EVENT_MESSAGE_LOST}, {RMW_EVENT_SUBSCRIPTION_MATCHED, ZENOH_EVENT_SUBSCRIPTION_MATCHED}, - {RMW_EVENT_PUBLICATION_MATCHED, ZENOH_EVENT_PUBLICATION_MATCHED} + {RMW_EVENT_PUBLICATION_MATCHED, ZENOH_EVENT_PUBLICATION_MATCHED}, + {RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE, ZENOH_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE}, + {RMW_EVENT_PUBLISHER_INCOMPATIBLE_TYPE, ZENOH_EVENT_PUBLISHER_INCOMPATIBLE_TYPE} // TODO(clalancette): Implement remaining events }; ///============================================================================= /// A struct to store status changes which can be mapped to rmw event statuses. -/// The data field can be used to store serialized information for more complex statuses. struct rmw_zenoh_event_status_t { size_t total_count; size_t total_count_change; size_t current_count; + size_t current_count_change; + // The data field can be used to store serialized information for more complex statuses. std::string data; + + rmw_zenoh_event_status_t() + : total_count(0), + total_count_change(0), + current_count(0), + current_count_change(0) + {} }; ///============================================================================= -/// Base class to be inherited by entities that support events. -class EventsBase +/// A class that manages callbacks that should be triggered when a new +/// message/request/response is received by an entity. +class DataCallbackManager { public: /// @brief Set the user defined callback that should be called when /// a new message/response/request is received. /// @param user_data the data that should be passed to the callback. /// @param callback the callback to be set. - void set_user_callback(const void * user_data, rmw_event_callback_t callback); + void set_callback(const void * user_data, rmw_event_callback_t callback); /// Trigger the user callback. - void trigger_user_callback(); + void trigger_callback(); + +private: + std::mutex event_mutex_; + /// User callback that can be set via set_callback(). + rmw_event_callback_t callback_ {nullptr}; + /// User data that should be passed to the user callback. + const void * user_data_ {nullptr}; + /// number of trigger requests made before the callback was set. + size_t unread_count_ {0}; +}; +/// Base class to be inherited by entities that support events. +class EventsManager +{ +public: /// @brief Set the callback to be triggered when the relevant event is triggered. /// @param event_id the id of the event /// @param callback the callback to trigger for this event. @@ -133,7 +158,7 @@ class EventsBase mutable std::mutex event_condition_mutex_; /// Condition variable to attach for event notifications. std::condition_variable * event_conditions_[ZENOH_EVENT_ID_MAX + 1]{nullptr}; - /// User callback that can be set via set_user_callback(). + /// User callback that can be set via data_callback_mgr.set_callback(). rmw_event_callback_t callback_ {nullptr}; /// User data that should be passed to the user callback. const void * user_data_ {nullptr}; diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index b35c1309..1a820dbb 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -41,152 +41,268 @@ using Entity = liveliness::Entity; using EntityType = liveliness::EntityType; ///============================================================================= -TopicStats::TopicStats(std::size_t pub_count, std::size_t sub_count) -: pub_count_(pub_count), - sub_count_(sub_count) +TopicData::TopicData( + liveliness::TopicInfo info, + std::unordered_set pubs, + std::unordered_set subs) +: info_(std::move(info)), + pubs_(std::move(pubs)), + subs_(std::move(subs)) +{} + +///============================================================================= +GraphCache::GraphCache(const z_id_t & zid) +: zid_str_(liveliness::zid_to_str(zid)) { // Do nothing. } ///============================================================================= -TopicData::TopicData( - liveliness::TopicInfo info, - TopicStats stats) -: info_(std::move(info)), - stats_(std::move(stats)) -{} +std::shared_ptr GraphCache::make_graph_node(const Entity & entity) const +{ + auto graph_node = std::make_shared(); + graph_node->zid_ = entity.zid(); + graph_node->nid_ = entity.nid(); + graph_node->ns_ = entity.node_namespace(); + graph_node->name_ = entity.node_name(); + graph_node->enclave_ = entity.node_enclave(); + + return graph_node; +} ///============================================================================= -void GraphCache::parse_put(const std::string & keyexpr) +void GraphCache::update_topic_maps_for_put( + GraphNodePtr graph_node, + const liveliness::Entity & entity) { - std::optional valid_entity = liveliness::Entity::make(keyexpr); - if (!valid_entity.has_value()) { - // Error message has already been logged. + if (entity.type() == EntityType::Node) { + // Nothing to update for a node entity. return; } - const liveliness::Entity & entity = *valid_entity; - // Helper lambda to append pub/subs to the GraphNode. - // We capture by reference to update graph_topics_ - auto add_topic_data = - [](const Entity & entity, GraphNode & graph_node, GraphCache & graph_cache) -> void - { - if (entity.type() == EntityType::Invalid || - entity.type() == EntityType::Node) - { - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "add_topic_data() for invalid EntityType. Report this."); - return; - } + // First update the topic map within the node. + if (entity.type() == EntityType::Publisher) { + update_topic_map_for_put(graph_node->pubs_, entity); + } else if (entity.type() == EntityType::Subscription) { + update_topic_map_for_put(graph_node->subs_, entity); + } else if (entity.type() == EntityType::Service) { + update_topic_map_for_put(graph_node->services_, entity); + } else { + update_topic_map_for_put(graph_node->clients_, entity); + } - if (!entity.topic_info().has_value()) { - // This should not happen as add_topic_data() is called after validating the existence - // of topic_info. - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "add_topic_data() called without valid TopicInfo. Report this."); - return; - } + // Then update the variables tracking topics across the graph. + // We invoke update_topic_map_for_put() with report_events set to true for + // pub/sub. + if (entity.type() == EntityType::Publisher || + entity.type() == EntityType::Subscription) + { + update_topic_map_for_put(this->graph_topics_, entity, true); + } else { + update_topic_map_for_put(this->graph_services_, entity); + } +} - const liveliness::TopicInfo topic_info = entity.topic_info().value(); - // For the sake of reusing data structures and lookup functions, we treat publishers and - // clients are equivalent. Similarly, subscriptions and services are equivalent. - const std::size_t pub_count = entity.type() == EntityType::Publisher || - entity.type() == EntityType::Client ? 1 : 0; - const std::size_t sub_count = !pub_count; - - // Helper lambda to update TopicMap within the node the one for the entire graph. - auto update_topic_map = - [](GraphNode::TopicMap & topic_map, - const liveliness::TopicInfo topic_info, - const std::size_t pub_count, - const std::size_t sub_count) -> void - { - TopicDataPtr graph_topic_data = std::make_shared( - topic_info, - TopicStats{pub_count, sub_count}); - std::string qos_str = liveliness::qos_to_keyexpr(topic_info.qos_); - GraphNode::TopicQoSMap topic_qos_map = { - {qos_str, graph_topic_data}}; - - GraphNode::TopicMap::iterator topic_map_it = topic_map.find(topic_info.name_); - if (topic_map_it == topic_map.end()) { - // First time this topic name is discovered for the node so we insert a TopicTypeMap. - GraphNode::TopicTypeMap topic_data_map = { - {topic_info.type_, std::move(topic_qos_map)} - }; - topic_map.insert(std::make_pair(topic_info.name_, std::move(topic_data_map))); +///============================================================================= +void GraphCache::update_topic_map_for_put( + GraphNode::TopicMap & topic_map, + const liveliness::Entity & entity, + bool report_events) +{ + if (!entity.topic_info().has_value()) { + // This should not happen as topic_info should be populated for all non-node entites. + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "update_topic_map_for_put() called for non-node entity without valid TopicInfo. " + "Report this."); + return; + } + const liveliness::TopicInfo topic_info = entity.topic_info().value(); + + // For the sake of reusing data structures and lookup functions, we treat publishers and + // clients as equivalent. Similarly, subscriptions and services are equivalent. + const bool is_pub = is_entity_pub(entity); + // Initialize a map that will be populated with any QoS events that may be detected. + std::unordered_map> + local_entities_with_events = {}; + std::unordered_set pubs = {}; + std::unordered_set subs = {}; + if (is_pub) { + pubs.insert(entity); + } else { + subs.insert(entity); + } + TopicDataPtr graph_topic_data = std::make_shared( + topic_info, + std::move(pubs), + std::move(subs)); + + std::string qos_str = liveliness::qos_to_keyexpr(topic_info.qos_); + GraphNode::TopicQoSMap topic_qos_map = { + {qos_str, graph_topic_data}}; + + GraphNode::TopicMap::iterator topic_map_it = topic_map.find(topic_info.name_); + if (topic_map_it == topic_map.end()) { + // First time this topic name is discovered for the node so we insert a TopicTypeMap. + GraphNode::TopicTypeMap topic_data_map = { + {topic_info.type_, std::move(topic_qos_map)} + }; + topic_map.insert(std::make_pair(topic_info.name_, std::move(topic_data_map))); + } else { + // The topic exists for the node so we check if the type also exists. + GraphNode::TopicTypeMap::iterator topic_type_map_it = topic_map_it->second.find( + topic_info.type_); + if (topic_type_map_it == topic_map_it->second.end()) { + // First time this topic type is added. + // TODO(Yadunund) Check for and report an *_INCOMPATIBLE_TYPE events. + + topic_map_it->second.insert( + std::make_pair( + topic_info.type_, + std::move(topic_qos_map))); + } else { + // The topic type already exists. + // With Zenoh, as long as topic name and type match, transport will ensure + // payloads are received by subs. Hence, we can check for matched events + // without having to check for any qos compatibilities. + if (report_events) { + // The entity added may be local with callbacks registered but there + // may be other local entities in the graph that are matched. + std::size_t match_count_for_entity = 0; + for (const auto & [_, topic_data_ptr] : topic_type_map_it->second) { + if (is_pub) { + // Count the number of matching subs for each set of qos settings. + if (!topic_data_ptr->subs_.empty()) { + match_count_for_entity += topic_data_ptr->subs_.size(); + } + // Also iterate through the subs to check if any are local and if update event counters. + for (const liveliness::Entity & sub_entity : topic_data_ptr->subs_) { + update_event_counters( + topic_info.name_, + ZENOH_EVENT_SUBSCRIPTION_MATCHED, + static_cast(1)); + if (is_entity_local(sub_entity)) { + local_entities_with_events[sub_entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED); + printf( + "Updating matched count by 1 for local sub: %s\n", + sub_entity.keyexpr().c_str()); + } + } + // Update event counters for the new entity. + update_event_counters( + topic_info.name_, + ZENOH_EVENT_PUBLICATION_MATCHED, + match_count_for_entity); + if (is_entity_local(entity) && match_count_for_entity > 0) { + local_entities_with_events[entity].insert(ZENOH_EVENT_PUBLICATION_MATCHED); + printf( + "Updating matched count by %ld for local pub: %s\n", + match_count_for_entity, entity.keyexpr().c_str()); + } } else { - // The topic exists for the node so we check if the type also exists. - GraphNode::TopicTypeMap::iterator topic_data_map_it = topic_map_it->second.find( - topic_info.type_); - if (topic_data_map_it == topic_map_it->second.end()) { - // First time this topic type is added. - topic_map_it->second.insert( - std::make_pair( - topic_info.type_, - std::move(topic_qos_map))); - } else { - // The topic type already exists so we check if qos also exists. - GraphNode::TopicQoSMap::iterator topic_qos_map_it = topic_data_map_it->second.find( - qos_str); - if (topic_qos_map_it == topic_data_map_it->second.end()) { - // First time this qos is added. - topic_data_map_it->second.insert(std::make_pair(qos_str, graph_topic_data)); - } else { - // We have another instance of a pub/sub over the same topic, - // type and qos so we increment the counters. - TopicDataPtr & existing_graph_topic = topic_qos_map_it->second; - existing_graph_topic->stats_.pub_count_ += pub_count; - existing_graph_topic->stats_.sub_count_ += sub_count; + // Entity is a sub. + // Count the number of matching pubs for each set of qos settings. + if (!topic_data_ptr->pubs_.empty()) { + match_count_for_entity += topic_data_ptr->pubs_.size(); + } + // Also iterate through the pubs to check if any are local and if update event counters. + for (const liveliness::Entity & pub_entity : topic_data_ptr->pubs_) { + update_event_counters( + topic_info.name_, + ZENOH_EVENT_PUBLICATION_MATCHED, + static_cast(1)); + if (is_entity_local(pub_entity)) { + local_entities_with_events[pub_entity].insert(ZENOH_EVENT_PUBLICATION_MATCHED); + printf( + "Updating matched count by 1 for local pub: %s\n", + pub_entity.keyexpr().c_str()); } } + // Update event counters for the new entity. + update_event_counters( + topic_info.name_, + ZENOH_EVENT_SUBSCRIPTION_MATCHED, + match_count_for_entity); + if (is_entity_local(entity) && match_count_for_entity > 0) { + local_entities_with_events[entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED); + printf( + "Updating matched count by %ld for local sub: %s\n", + match_count_for_entity, entity.keyexpr().c_str()); + } } - }; - - // First update the topic map within the node. - if (entity.type() == EntityType::Publisher) { - update_topic_map(graph_node.pubs_, topic_info, pub_count, sub_count); - } else if (entity.type() == EntityType::Subscription) { - update_topic_map(graph_node.subs_, topic_info, pub_count, sub_count); - } else if (entity.type() == EntityType::Service) { - update_topic_map(graph_node.services_, topic_info, pub_count, sub_count); - } else { - update_topic_map(graph_node.clients_, topic_info, pub_count, sub_count); + } } - - // Then update the variables tracking topics across the graph. - // TODO(Yadunund): Check for QoS events. - if (entity.type() == EntityType::Publisher || - entity.type() == EntityType::Subscription) - { - update_topic_map(graph_cache.graph_topics_, topic_info, pub_count, sub_count); + // We check if an entity with the exact same qos also exists. + GraphNode::TopicQoSMap::iterator topic_qos_map_it = topic_type_map_it->second.find( + qos_str); + if (topic_qos_map_it == topic_type_map_it->second.end()) { + // First time this qos is added. + // Update cache. + topic_type_map_it->second.insert(std::make_pair(qos_str, graph_topic_data)); } else { - update_topic_map(graph_cache.graph_services_, topic_info, pub_count, sub_count); + // We have another instance of a pub/sub over the same topic, + // type and qos so we increment the counters. + TopicDataPtr & existing_graph_topic = topic_qos_map_it->second; + if (is_pub) { + existing_graph_topic->pubs_.insert(entity); + } else { + existing_graph_topic->subs_.insert(entity); + } } - }; + } + } + // Take events if any. + if (report_events) { + take_local_entities_with_events(local_entities_with_events); + } +} - // Helper lambda to convert an Entity into a GraphNode. - // Note: this will update bookkeeping variables in GraphCache. - auto make_graph_node = - [&add_topic_data](const Entity & entity, GraphCache & graph_cache) -> std::shared_ptr - { - auto graph_node = std::make_shared(); - graph_node->id_ = entity.id(); - graph_node->ns_ = entity.node_namespace(); - graph_node->name_ = entity.node_name(); - graph_node->enclave_ = entity.node_enclave(); - - if (!entity.topic_info().has_value()) { - // Token was for a node. - return graph_node; +///============================================================================= +void GraphCache::take_local_entities_with_events( + std::unordered_map> & + local_entities_with_events) +{ + if (local_entities_with_events.empty()) { + return; + } + + for (const auto & [local_entity, event_set] : local_entities_with_events) { + // Trigger callback set for this entity for the event type. + GraphEventCallbackMap::const_iterator event_callbacks_it = + event_callbacks_.find(local_entity); + if (event_callbacks_it != event_callbacks_.end()) { + for (const rmw_zenoh_event_type_t & event_type : event_set) { + GraphEventCallbacks::const_iterator callback_it = + event_callbacks_it->second.find(event_type); + if (callback_it != event_callbacks_it->second.end()) { + std::unique_ptr taken_event = + take_event_status(local_entity.topic_info()->name_, event_type); + callback_it->second(std::move(taken_event)); + } } - // Add endpoint entries. - add_topic_data(entity, *graph_node, graph_cache); + } + } +} - return graph_node; - }; +///============================================================================= +void GraphCache::parse_put( + const std::string & keyexpr, + bool ignore_from_current_session) +{ + printf("[parse_put %s] %s\n", zid_str_.c_str(), keyexpr.c_str()); + std::optional maybe_entity = liveliness::Entity::make(keyexpr); + if (!maybe_entity.has_value()) { + // Error message has already been logged. + return; + } + + const liveliness::Entity & entity = *maybe_entity; + if (ignore_from_current_session && is_entity_local(entity)) { + RCUTILS_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "Ignoring parse_put for %s from the same session.\n", entity.keyexpr().c_str()); + return; + } // Lock the graph mutex before accessing the graph. std::lock_guard lock(graph_mutex_); @@ -194,9 +310,15 @@ void GraphCache::parse_put(const std::string & keyexpr) // If the namespace did not exist, create it and add the node to the graph and return. NamespaceMap::iterator ns_it = graph_.find(entity.node_namespace()); if (ns_it == graph_.end()) { + GraphNodePtr node = make_graph_node(entity); + if (node == nullptr) { + // Error handled. + return; + } NodeMap node_map = { - {entity.node_name(), make_graph_node(entity, *this)}}; + {entity.node_name(), node}}; graph_.emplace(std::make_pair(entity.node_namespace(), std::move(node_map))); + update_topic_maps_for_put(node, entity); total_nodes_in_graph_ += 1; return; } @@ -212,151 +334,235 @@ void GraphCache::parse_put(const std::string & keyexpr) range.first, range.second, [&entity](const std::pair & node_it) { - return entity.id() == node_it.second->id_; + // Match nodes if their zenoh sesion and node ids match. + return entity.zid() == node_it.second->zid_ && entity.nid() == node_it.second->nid_; }); if (node_it == range.second) { // Either the first time a node with this name is added or with an existing // name but unique id. + GraphNodePtr node = make_graph_node(entity); + if (node == nullptr) { + // Error handled. + return; + } NodeMap::iterator insertion_it = - ns_it->second.insert(std::make_pair(entity.node_name(), make_graph_node(entity, *this))); + ns_it->second.insert(std::make_pair(entity.node_name(), node)); + update_topic_maps_for_put(node, entity); total_nodes_in_graph_ += 1; if (insertion_it == ns_it->second.end()) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", - "Unable to add a new node /%s with id %s an " + "Unable to add a new node /%s to an " "existing namespace %s in the graph. Report this bug.", entity.node_name().c_str(), - entity.id().c_str(), entity.node_namespace().c_str()); } return; } // Otherwise, the entity represents a node that already exists in the graph. // Update topic info if required below. + update_topic_maps_for_put(node_it->second, entity); +} - // Handles additions to an existing node in the graph. +///============================================================================= +void GraphCache::update_topic_maps_for_del( + GraphNodePtr graph_node, + const liveliness::Entity & entity) +{ if (entity.type() == EntityType::Node) { - // Creating a new node above would have also updated the graph with any topic info. + // Nothing to update for a node entity. return; } - - if (!entity.topic_info().has_value()) { - // Likely an error with parsing the token. - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "Put token %s parsed without extracting topic_info. Report this bug.", - keyexpr.c_str()); - return; + // First update the topic map within the node. + if (entity.type() == EntityType::Publisher) { + update_topic_map_for_del(graph_node->pubs_, entity); + } else if (entity.type() == EntityType::Subscription) { + update_topic_map_for_del(graph_node->subs_, entity); + } else if (entity.type() == EntityType::Service) { + update_topic_map_for_del(graph_node->services_, entity); + } else { + update_topic_map_for_del(graph_node->clients_, entity); } - // Update the graph based on the entity. - add_topic_data(entity, *(node_it->second), *this); + // Then update the variables tracking topics across the graph. + if (entity.type() == EntityType::Publisher || + entity.type() == EntityType::Subscription) + { + update_topic_map_for_del(this->graph_topics_, entity, true); + } else { + update_topic_map_for_del(this->graph_services_, entity); + } } ///============================================================================= -void GraphCache::parse_del(const std::string & keyexpr) +void GraphCache::update_topic_map_for_del( + GraphNode::TopicMap & topic_map, + const liveliness::Entity & entity, + bool report_events) { - std::optional valid_entity = liveliness::Entity::make(keyexpr); - if (!valid_entity.has_value()) { - // Error message has already been logged. + if (!entity.topic_info().has_value()) { + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "update_topic_maps_for_del() called for non-node entity without valid TopicInfo. " + "Report this."); return; } - const liveliness::Entity & entity = *valid_entity; - - // Helper lambda to update graph_topics_. - auto update_topic_map = - [](GraphNode::TopicMap & graph_endpoints, - const liveliness::TopicInfo & topic_info, - std::size_t pub_count, - std::size_t sub_count) -> void - { - GraphNode::TopicMap::iterator cache_topic_it = - graph_endpoints.find(topic_info.name_); - if (cache_topic_it == graph_endpoints.end()) { + const liveliness::TopicInfo topic_info = entity.topic_info().value(); + const bool is_pub = is_entity_pub(entity); + // Initialize a map that will be populated with any QoS events that may be detected. + std::unordered_map> + local_entities_with_events = {}; + + GraphNode::TopicMap::iterator cache_topic_it = + topic_map.find(topic_info.name_); + if (cache_topic_it == topic_map.end()) { + // This should not happen. + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "topic name %s not found in topic_map. Report this.", + topic_info.name_.c_str()); + return; + } else { + GraphNode::TopicTypeMap::iterator cache_topic_type_it = + cache_topic_it->second.find(topic_info.type_); + if (cache_topic_type_it == cache_topic_it->second.end()) { + // This should not happen. + RCUTILS_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "topic type %s not found in for topic %s. Report this.", + topic_info.type_.c_str(), topic_info.name_.c_str()); + return; + } else { + const std::string qos_str = liveliness::qos_to_keyexpr(topic_info.qos_); + GraphNode::TopicQoSMap::iterator cache_topic_qos_it = cache_topic_type_it->second.find( + qos_str); + if (cache_topic_qos_it == cache_topic_type_it->second.end()) { // This should not happen. RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "topic_key %s not found in graph_endpoints. Report this.", - topic_info.name_.c_str()); + "rmw_zenoh_cpp", "qos %s not found in for topic type %s. Report this.", + qos_str.c_str(), topic_info.type_.c_str()); + return; + } + // Decrement the relevant counters. If both counters are 0 remove from cache. + if (is_pub) { + cache_topic_qos_it->second->pubs_.erase(entity); } else { - GraphNode::TopicTypeMap::iterator cache_topic_data_it = - cache_topic_it->second.find(topic_info.type_); - if (cache_topic_data_it != cache_topic_it->second.end()) { - const std::string qos_str = liveliness::qos_to_keyexpr(topic_info.qos_); - GraphNode::TopicQoSMap::iterator cache_topic_qos_it = cache_topic_data_it->second.find( - qos_str); - if (cache_topic_qos_it != cache_topic_data_it->second.end()) { - // Decrement the relevant counters. If both counters are 0 remove from cache. - cache_topic_qos_it->second->stats_.pub_count_ -= pub_count; - cache_topic_qos_it->second->stats_.sub_count_ -= sub_count; - if (cache_topic_qos_it->second->stats_.pub_count_ == 0 && - cache_topic_qos_it->second->stats_.sub_count_ == 0) - { - cache_topic_data_it->second.erase(qos_str); - } - // If the qos map is empty, erase it from the topic_data_map. - if (cache_topic_data_it->second.empty()) { - cache_topic_it->second.erase(cache_topic_data_it); + cache_topic_qos_it->second->subs_.erase(entity); + } + // If after removing the entity, the parent map is empty, then remove parent + // map. + if (cache_topic_qos_it->second->pubs_.empty() && + cache_topic_qos_it->second->subs_.empty()) + { + cache_topic_type_it->second.erase(qos_str); + } + // Check for matched events + if (report_events) { + // TODO(Yadunund): Refactor into lambdas to reduce code duplication. + // We do not have to report any events for the entity removed event + // as it is already destructed. So we only check for matched entities + // in the graph that may be local. + if (is_pub) { + // Notify any local subs of a matched event with change -1. + for (const auto & [_, topic_data_ptr] : cache_topic_type_it->second) { + for (const liveliness::Entity & sub_entity : topic_data_ptr->subs_) { + update_event_counters( + topic_info.name_, + ZENOH_EVENT_SUBSCRIPTION_MATCHED, + static_cast(-1)); + if (is_entity_local(sub_entity)) { + local_entities_with_events[sub_entity].insert(ZENOH_EVENT_SUBSCRIPTION_MATCHED); + printf( + "Updating matched count by -1 for local sub: %s\n", + sub_entity.keyexpr().c_str()); + } } } - // If the topic does not have any TopicData entries, erase the topic from the map. - if (cache_topic_it->second.empty()) { - graph_endpoints.erase(cache_topic_it); + } else { + // Notify any local pubs of a matched event with change -1. + for (const auto & [_, topic_data_ptr] : cache_topic_type_it->second) { + for (const liveliness::Entity & pub_entity : topic_data_ptr->pubs_) { + update_event_counters( + topic_info.name_, + ZENOH_EVENT_PUBLICATION_MATCHED, + static_cast(-1)); + if (is_entity_local(pub_entity)) { + local_entities_with_events[pub_entity].insert(ZENOH_EVENT_PUBLICATION_MATCHED); + printf( + "Updating matched count by -1 for pub sub: %s\n", + pub_entity.keyexpr().c_str()); + } + } } } } - }; - - // Helper lambda to remove pub/subs to the GraphNode. - // We capture by reference to update caches like graph_topics_ if update_cache is true. - auto remove_topic_data = - [&update_topic_map](const Entity & entity, GraphNode & graph_node, - GraphCache & graph_cache) -> void - { - if (entity.type() == EntityType::Invalid || - entity.type() == EntityType::Node) - { - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "remove_topic_data() for invalid EntityType. Report this."); - return; - } - - if (!entity.topic_info().has_value()) { - // This should not happen as add_topic_data() is called after validating the existence - // of topic_info. - RCUTILS_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "remove_topic_data() called without valid TopicInfo. Report this."); - return; + // If the type does not have any qos entries, erase it from the type map. + if (cache_topic_type_it->second.empty()) { + cache_topic_it->second.erase(cache_topic_type_it); } - const liveliness::TopicInfo topic_info = entity.topic_info().value(); - // For the sake of reusing data structures and lookup functions, we treat publishers and - // clients are equivalent. Similarly, subscriptions and services are equivalent. - const std::size_t pub_count = entity.type() == EntityType::Publisher || - entity.type() == EntityType::Client ? 1 : 0; - const std::size_t sub_count = !pub_count; - - // First update the topic map within the node. - if (entity.type() == EntityType::Publisher) { - update_topic_map(graph_node.pubs_, topic_info, pub_count, sub_count); - } else if (entity.type() == EntityType::Subscription) { - update_topic_map(graph_node.subs_, topic_info, pub_count, sub_count); - } else if (entity.type() == EntityType::Service) { - update_topic_map(graph_node.services_, topic_info, pub_count, sub_count); - } else { - update_topic_map(graph_node.clients_, topic_info, pub_count, sub_count); + // If the topic does not have any TopicData entries, erase the topic from the map. + if (cache_topic_it->second.empty()) { + topic_map.erase(cache_topic_it); } + } + } + // Take events if any. + if (report_events) { + take_local_entities_with_events(local_entities_with_events); + } +} - // Then update the variables tracking topics across the graph. - // TODO(Yadunund): Check for QoS events. - if (entity.type() == EntityType::Publisher || - entity.type() == EntityType::Subscription) +///============================================================================= +void GraphCache::remove_topic_map_from_cache( + const GraphNode::TopicMap & to_remove, + GraphNode::TopicMap & from_cache) +{ + for (GraphNode::TopicMap::const_iterator topic_it = to_remove.begin(); + topic_it != to_remove.end(); ++topic_it) + { + for (GraphNode::TopicTypeMap::const_iterator topic_type_it = topic_it->second.begin(); + topic_type_it != topic_it->second.end(); ++topic_type_it) + { + for (GraphNode::TopicQoSMap::const_iterator topic_qos_it = + topic_type_it->second.begin(); + topic_qos_it != topic_type_it->second.end(); ++topic_qos_it) { - update_topic_map(graph_cache.graph_topics_, topic_info, pub_count, sub_count); - } else { - update_topic_map(graph_cache.graph_services_, topic_info, pub_count, sub_count); + // Technically only one of pubs_ or sub_ will be populated and with one + // element at most since to_remove comes from a node. For completeness, + // we iterate though both and call update_topic_map_for_del(). + for (const liveliness::Entity & entity : topic_qos_it->second->pubs_) { + update_topic_map_for_del( + from_cache, + entity, + true); + } + for (const liveliness::Entity & entity : topic_qos_it->second->subs_) { + update_topic_map_for_del( + from_cache, + entity, + true); + } } - }; + } + } +} +///============================================================================= +void GraphCache::parse_del( + const std::string & keyexpr, + bool ignore_from_current_session) +{ + printf("[parse_del %s] %s\n", zid_str_.c_str(), keyexpr.c_str()); + std::optional maybe_entity = liveliness::Entity::make(keyexpr); + if (!maybe_entity.has_value()) { + // Error message has already been logged. + return; + } + const liveliness::Entity entity = *maybe_entity; + if (ignore_from_current_session && is_entity_local(entity)) { + RCUTILS_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "Ignoring parse_del for %s from the same session.\n", entity.keyexpr().c_str()); + return; + } // Lock the graph mutex before accessing the graph. std::lock_guard lock(graph_mutex_); @@ -373,8 +579,8 @@ void GraphCache::parse_del(const std::string & keyexpr) range.first, range.second, [&entity](const std::pair & node_it) { - // An operator== overload is defined above. - return entity.id() == node_it.second->id_; + // Match nodes if their zenoh sesion and node ids match. + return entity.zid() == node_it.second->zid_ && entity.nid() == node_it.second->nid_; }); if (node_it == range.second) { // Node does not exist. @@ -392,37 +598,18 @@ void GraphCache::parse_del(const std::string & keyexpr) // given the reliability QoS for liveliness subs. However, if we find any pubs/subs present in // the node below, we should update the count in graph_topics_. const GraphNodePtr graph_node = node_it->second; - if (!graph_node->pubs_.empty() || !graph_node->subs_.empty()) { + if (!graph_node->pubs_.empty() || + !graph_node->subs_.empty() || + !graph_node->clients_.empty() || + !graph_node->services_.empty()) + { RCUTILS_LOG_WARN_NAMED( "rmw_zenoh_cpp", - "Received liveliness token to remove node /%s from the graph before all pub/subs for this " - "node have been removed. Removing all pub/subs first...", + "Received liveliness token to remove node /%s from the graph before all pub/subs/" + "clients/services for this node have been removed. Removing all entities first...", entity.node_name().c_str() ); // We update the tracking variables to reduce the count of entities present in this node. - auto remove_topic_map_from_cache = - [&update_topic_map](const GraphNode::TopicMap & to_remove, - GraphNode::TopicMap & from_cache) -> void - { - for (GraphNode::TopicMap::const_iterator topic_it = to_remove.begin(); - topic_it != to_remove.end(); ++topic_it) - { - for (GraphNode::TopicTypeMap::const_iterator topic_type_it = topic_it->second.begin(); - topic_type_it != topic_it->second.end(); ++topic_type_it) - { - for (GraphNode::TopicQoSMap::const_iterator topic_qos_it = - topic_type_it->second.begin(); - topic_qos_it != topic_type_it->second.end(); ++topic_qos_it) - { - update_topic_map( - from_cache, - topic_qos_it->second->info_, - topic_qos_it->second->stats_.pub_count_, - topic_qos_it->second->stats_.sub_count_); - } - } - } - }; remove_topic_map_from_cache(graph_node->pubs_, graph_topics_); remove_topic_map_from_cache(graph_node->subs_, graph_topics_); remove_topic_map_from_cache(graph_node->services_, graph_services_); @@ -436,16 +623,8 @@ void GraphCache::parse_del(const std::string & keyexpr) return; } - if (!entity.topic_info().has_value()) { - // Likely an error with parsing the token. - RCUTILS_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "Del token %s parsed without extracting TopicData. Report this bug.", - keyexpr.c_str()); - return; - } - // Update the graph based on the entity. - remove_topic_data(entity, *(node_it->second), *this); + update_topic_maps_for_del(node_it->second, entity); } ///============================================================================= @@ -668,7 +847,7 @@ rmw_ret_t GraphCache::publisher_count_matched_subscriptions( if (topic_data_it != topic_it->second.end()) { for (const auto & [_, topic_data] : topic_data_it->second) { // If a subscription exists with compatible QoS, update the subscription count. - if (topic_data->stats_.sub_count_ > 0) { + if (!topic_data->subs_.empty()) { rmw_qos_compatibility_type_t is_compatible; rmw_ret_t ret = rmw_qos_profile_check_compatible( pub_data->adapted_qos_profile, @@ -676,8 +855,8 @@ rmw_ret_t GraphCache::publisher_count_matched_subscriptions( &is_compatible, nullptr, 0); - if (ret == RMW_RET_OK && is_compatible == RMW_QOS_COMPATIBILITY_OK) { - *subscription_count = *subscription_count + topic_data->stats_.sub_count_; + if (ret == RMW_RET_OK && is_compatible != RMW_QOS_COMPATIBILITY_ERROR) { + *subscription_count = *subscription_count + topic_data->subs_.size(); } } } @@ -703,7 +882,7 @@ rmw_ret_t GraphCache::subscription_count_matched_publishers( if (topic_data_it != topic_it->second.end()) { for (const auto & [_, topic_data] : topic_data_it->second) { // If a subscription exists with compatible QoS, update the subscription count. - if (topic_data->stats_.pub_count_ > 0) { + if (!topic_data->pubs_.empty()) { rmw_qos_compatibility_type_t is_compatible; rmw_ret_t ret = rmw_qos_profile_check_compatible( sub_data->adapted_qos_profile, @@ -711,8 +890,8 @@ rmw_ret_t GraphCache::subscription_count_matched_publishers( &is_compatible, nullptr, 0); - if (ret == RMW_RET_OK && is_compatible == RMW_QOS_COMPATIBILITY_OK) { - *publisher_count = *publisher_count + topic_data->stats_.pub_count_; + if (ret == RMW_RET_OK && is_compatible != RMW_QOS_COMPATIBILITY_ERROR) { + *publisher_count = *publisher_count + topic_data->pubs_.size(); } } } @@ -747,7 +926,7 @@ rmw_ret_t GraphCache::count_publishers( GraphNode::TopicQoSMap> & topic_data : graph_topics_.at(topic_name)) { for (auto it = topic_data.second.begin(); it != topic_data.second.end(); ++it) { - *count += it->second->stats_.pub_count_; + *count += it->second->pubs_.size(); } } } @@ -768,7 +947,7 @@ rmw_ret_t GraphCache::count_subscriptions( GraphNode::TopicQoSMap> & topic_data : graph_topics_.at(topic_name)) { for (auto it = topic_data.second.begin(); it != topic_data.second.end(); ++it) { - *count += it->second->stats_.sub_count_; + *count += it->second->subs_.size(); } } } @@ -789,7 +968,7 @@ rmw_ret_t GraphCache::count_services( GraphNode::TopicQoSMap> & topic_data : graph_services_.at(service_name)) { for (auto it = topic_data.second.begin(); it != topic_data.second.end(); ++it) { - *count += it->second->stats_.sub_count_; + *count += it->second->subs_.size(); } } } @@ -810,7 +989,7 @@ rmw_ret_t GraphCache::count_clients( GraphNode::TopicQoSMap> & topic_data : graph_services_.at(service_name)) { for (auto it = topic_data.second.begin(); it != topic_data.second.end(); ++it) { - *count += it->second->stats_.pub_count_; + *count += it->second->pubs_.size(); } } } @@ -1019,7 +1198,7 @@ rmw_ret_t GraphCache::service_server_is_available( GraphNode::TopicTypeMap::iterator type_it = service_it->second.find(service_type); if (type_it != service_it->second.end()) { for (const auto & [_, topic_data] : type_it->second) { - if (topic_data->stats_.sub_count_ > 0) { + if (topic_data->subs_.size() > 0) { *is_available = true; return RMW_RET_OK; } @@ -1029,3 +1208,101 @@ rmw_ret_t GraphCache::service_server_is_available( return RMW_RET_OK; } + +///============================================================================= +void GraphCache::set_qos_event_callback( + const liveliness::Entity & entity, + const rmw_zenoh_event_type_t & event_type, + GraphCacheEventCallback callback) +{ + std::lock_guard lock(graph_mutex_); + + if (event_type > ZENOH_EVENT_ID_MAX) { + RCUTILS_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "set_qos_event_callback() called for unsupported event. Report this."); + return; + } + + const GraphEventCallbackMap::iterator event_cb_it = event_callbacks_.find(entity); + if (event_cb_it == event_callbacks_.end()) { + // First time a callback is being set for this entity. + event_callbacks_[entity] = {}; + event_callbacks_[entity].insert(std::make_pair(event_type, std::move(callback))); + return; + } + event_cb_it->second[event_type] = std::move(callback); + // printf( + // "[graph_cache] Set callback for rmw_zenoh_event_type_t %s for entity %s\n", + // std::to_string(event_type).c_str(), entity.keyexpr().c_str()); +} + +///============================================================================= +bool GraphCache::is_entity_local(const liveliness::Entity & entity) const +{ + // For now zenoh does not expose unique IDs for its entities and hence the id + // assigned to an entity is always the zenoh session id. When we update liveliness + // tokens to contain globally unique ids for entities, we should also update the logic here. + return entity.zid() == zid_str_; +} + +///============================================================================= +bool GraphCache::is_entity_pub(const liveliness::Entity & entity) const +{ + if (entity.type() == EntityType::Publisher || + entity.type() == EntityType::Client) + { + return true; + } + return false; +} + +///============================================================================= +void GraphCache::update_event_counters( + const std::string & topic_name, + const rmw_zenoh_event_type_t event_id, + int32_t change) +{ + if (event_id > ZENOH_EVENT_ID_MAX) { + return; + } + + std::lock_guard lock(events_mutex_); + + auto event_statuses_it = event_statuses_.find(topic_name); + if (event_statuses_it == event_statuses_.end()) { + // Initialize statuses. + std::array status_array {}; + event_statuses_[topic_name] = std::move(status_array); + } + + rmw_zenoh_event_status_t & status_to_update = event_statuses_[topic_name][event_id]; + status_to_update.total_count += std::abs(change); + status_to_update.total_count_change += std::abs(change); + status_to_update.current_count += change; + status_to_update.current_count_change = change; +} + +///============================================================================= +std::unique_ptr GraphCache::take_event_status( + const std::string & topic_name, + const rmw_zenoh_event_type_t event_id) +{ + if (event_id > ZENOH_EVENT_ID_MAX) { + return nullptr; + } + + std::lock_guard lock(events_mutex_); + + auto event_statuses_it = event_statuses_.find(topic_name); + if (event_statuses_it == event_statuses_.end()) { + return nullptr; + } + + rmw_zenoh_event_status_t & status_to_take = event_statuses_[topic_name][event_id]; + auto result = std::make_unique(status_to_take); + // Reset changes. + status_to_take.total_count_change = 0; + status_to_take.current_count_change = 0; + return result; +} diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 603422ac..d00be06f 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -15,6 +15,7 @@ #ifndef DETAIL__GRAPH_CACHE_HPP_ #define DETAIL__GRAPH_CACHE_HPP_ +#include #include #include #include @@ -37,34 +38,31 @@ ///============================================================================= // TODO(Yadunund): Since we reuse pub_count_ and sub_count_ for pub/sub and // service/client consider more general names for these fields. -struct TopicStats -{ - // The count of publishers or clients. - std::size_t pub_count_; - - // The count of subscriptions or services. - std::size_t sub_count_; - - // Constructor which initializes counters to 0. - TopicStats(std::size_t pub_count, std::size_t sub_count); -}; - -///============================================================================= +// Consider changing this to an array of unordered_set where the index of the +// array corresponds to the EntityType enum. This way we don't need to mix +// pub/sub with client/service. struct TopicData { liveliness::TopicInfo info_; - TopicStats stats_; + + // The publishers or clients entities. + std::unordered_set pubs_; + + // The subscriptions or services entities + std::unordered_set subs_; TopicData( liveliness::TopicInfo info, - TopicStats stats); + std::unordered_set pubs, + std::unordered_set subs); }; using TopicDataPtr = std::shared_ptr; ///============================================================================= struct GraphNode { - std::string id_; + std::string zid_; + std::string nid_; std::string ns_; std::string name_; // TODO(Yadunund): Should enclave be the parent to the namespace key and not within a Node? @@ -91,10 +89,16 @@ using GraphNodePtr = std::shared_ptr; class GraphCache final { public: + /// @brief Constructor + /// @param id The id of the zenoh session that is building the graph cache. + /// This is used to infer which entities originated from the current session + /// so that appropriate event callbacks may be triggered. + explicit GraphCache(const z_id_t & zid); + // Parse a PUT message over a token's key-expression and update the graph. - void parse_put(const std::string & keyexpr); + void parse_put(const std::string & keyexpr, bool ignore_from_current_session = false); // Parse a DELETE message over a token's key-expression and update the graph. - void parse_del(const std::string & keyexpr); + void parse_del(const std::string & keyexpr, bool ignore_from_current_session = false); rmw_ret_t get_node_names( rcutils_string_array_t * node_names, @@ -155,7 +159,67 @@ class GraphCache final const char * service_type, bool * is_available); + /// @brief Signature for a function that will be invoked by the GraphCache when a QoS + /// event is detected. + using GraphCacheEventCallback = std::function)>; + + /// Set a qos event callback for an entity from the current session. + /// @note The callback will be removed when the entity is removed from the graph. + void set_qos_event_callback( + const liveliness::Entity & entity, + const rmw_zenoh_event_type_t & event_type, + GraphCacheEventCallback callback); + private: + // Helper function to convert an Entity into a GraphNode. + // Note: this will update bookkeeping variables in GraphCache. + std::shared_ptr make_graph_node(const liveliness::Entity & entity) const; + + // Helper function to update TopicMap within the node the cache for the entire graph. + void update_topic_maps_for_put( + GraphNodePtr graph_node, + const liveliness::Entity & entity); + + void update_topic_map_for_put( + GraphNode::TopicMap & topic_map, + const liveliness::Entity & entity, + bool report_events = false); + + void update_topic_maps_for_del( + GraphNodePtr graph_node, + const liveliness::Entity & entity); + + void update_topic_map_for_del( + GraphNode::TopicMap & topic_map, + const liveliness::Entity & entity, + bool report_events = false); + + void remove_topic_map_from_cache( + const GraphNode::TopicMap & to_remove, + GraphNode::TopicMap & from_cache); + + /// Returns true if the entity was created within the same context / zenoh session. + bool is_entity_local(const liveliness::Entity & entity) const; + + /// Returns true if the entity is a publisher or client. False otherwise. + bool is_entity_pub(const liveliness::Entity & entity) const; + + void update_event_counters( + const std::string & topic_name, + const rmw_zenoh_event_type_t event_id, + int32_t change); + + // Take status and reset change counters. + std::unique_ptr take_event_status( + const std::string & topic_name, + const rmw_zenoh_event_type_t event_id); + + void take_local_entities_with_events( + std::unordered_map> & + local_entities_with_events); + + + std::string zid_str_; /* namespace_1: node_1: @@ -190,6 +254,22 @@ class GraphCache final // Optimize service/client lookups across the graph. GraphNode::TopicMap graph_services_ = {}; + using GraphEventCallbacks = std::unordered_map; + // Map entity (based on uuid) to a map of event callbacks. + // Note: Since we use unordered_map, we will only store a single callback for an + // entity string. So we do not support the case where a node create a duplicate + // pub/sub with the exact same topic, type & QoS but registers a different callback + // for the same event type. We could switch to a multimap here but removing the callback + // will be impossible right now since entities do not have unique IDs. + using GraphEventCallbackMap = std::unordered_map; + // EventCallbackMap for each type of event we support in rmw_zenoh_cpp. + GraphEventCallbackMap event_callbacks_; + // Counters to track changes to event statues for each topic. + std::unordered_map> event_statuses_; + std::mutex events_mutex_; + + // Mutex to lock before modifying the members above. mutable std::mutex graph_mutex_; }; diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp index 03ffccfe..d7455130 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.cpp @@ -14,6 +14,7 @@ #include "liveliness_utils.hpp" +#include #include #include #include @@ -60,6 +61,25 @@ TopicInfo::TopicInfo( ///============================================================================= namespace { +/// Enum of liveliness key-expression components. +enum KeyexprIndex +{ + AdminSpace, + DomainId, + Zid, + Nid, + Id, + EntityStr, + Namespace, + NodeName, + TopicName, + TopicType, + TopicQoS +}; + +// Every keyexpression will have components upto node name. +#define KEYEXPR_INDEX_MIN KeyexprIndex::NodeName +#define KEYEXPR_INDEX_MAX KeyexprIndex::TopicQoS /// The admin space used to prefix the liveliness tokens. static const char ADMIN_SPACE[] = "@ros2_lv"; @@ -68,6 +88,8 @@ static const char PUB_STR[] = "MP"; static const char SUB_STR[] = "MS"; static const char SRV_STR[] = "SS"; static const char CLI_STR[] = "SC"; +static const char EMPTY_NAMESPACE_REPLACEMENT = '_'; +static const char KEYEXPR_DELIMITER = '/'; static const char SLASH_REPLACEMENT = '%'; static const char QOS_DELIMITER = ':'; static const char QOS_HISTORY_DELIMITER = ','; @@ -113,20 +135,9 @@ static const std::unordered_map str_to {std::to_string(RMW_QOS_POLICY_DURABILITY_UNKNOWN), RMW_QOS_POLICY_DURABILITY_UNKNOWN} }; -std::string zid_to_str(z_id_t id) -{ - std::stringstream ss; - ss << std::hex; - size_t i = 0; - for (; i < (sizeof(id.id)); i++) { - ss << static_cast(id.id[i]); - } - return ss.str(); -} - std::vector split_keyexpr( const std::string & keyexpr, - const char delim = '/') + const char delim = KEYEXPR_DELIMITER) { std::vector result = {}; size_t start = 0; @@ -208,6 +219,18 @@ std::optional keyexpr_to_qos(const std::string & keyexpr) return qos; } +///============================================================================= +std::string zid_to_str(const z_id_t & id) +{ + std::stringstream ss; + ss << std::hex; + size_t i = 0; + for (; i < (sizeof(id.id)); i++) { + ss << static_cast(id.id[i]); + } + return ss.str(); +} + ///============================================================================= std::string subscription_token(size_t domain_id) { @@ -217,77 +240,69 @@ std::string subscription_token(size_t domain_id) ///============================================================================= Entity::Entity( + std::string zid, + std::string nid, std::string id, EntityType type, NodeInfo node_info, std::optional topic_info) -: id_(std::move(id)), +: zid_(std::move(zid)), + nid_(std::move(nid)), + id_(std::move(id)), type_(std::move(type)), node_info_(std::move(node_info)), topic_info_(std::move(topic_info)) { - /** - * Set the liveliness token for the particular entity. - * - * The liveliness token keyexprs are in the form: - * - * ///// - * - * Where: - * - A number set by the user to "partition" graphs. Roughly equivalent to the domain ID in DDS. - * - A unique ID to identify this entity. Currently the id is the zenoh session's id with elements concatenated into a string using '.' as separator. - * - The type of entity. This can be one of "NN" for a node, "MP" for a publisher, "MS" for a subscription, "SS" for a service server, or "SC" for a service client. - * - The ROS namespace for this entity. If the namespace is absolute, this function will add in an _ for later parsing reasons. - * - The ROS node name for this entity. - * - * For entities with topic infomation, the liveliness token keyexpr have additional fields: - * - * //////// - * - The ROS topic name for this entity. - * - The type for the topic. - * - The qos for the topic (see qos_to_keyexpr() docstring for more information). - * - * For example, the liveliness expression for a publisher within a /talker node that publishes - * an std_msgs/msg/String over topic /chatter and with QoS settings of Reliability: best_effort, - * Durability: transient_local, History: keep_all, and depth: 10, would be - * "@ros2_lv/0/q1w2e3r4t5y6/MP/_/talker/dds_::std_msgs::msg::String/2:1:2,10". - * Note: The domain_id is assumed to be 0 and a random id is used in the example. Also the - * _dds:: prefix in the topic_type is an artifact of the type support implementation and is - * removed when reporting the topic_type in graph_cache.cpp (see _demangle_if_ros_type()). - */ - std::stringstream token_ss; - const std::string & ns = node_info_.ns_; - token_ss << ADMIN_SPACE << "/" << node_info_.domain_id_ << "/" << id_ << "/" << entity_to_str.at( - type_) << ns; + std::string keyexpr_parts[KEYEXPR_INDEX_MAX + 1] {}; + keyexpr_parts[KeyexprIndex::AdminSpace] = ADMIN_SPACE; + keyexpr_parts[KeyexprIndex::DomainId] = std::to_string(node_info_.domain_id_); + keyexpr_parts[KeyexprIndex::Zid] = zid_; + keyexpr_parts[KeyexprIndex::Nid] = nid_; + keyexpr_parts[KeyexprIndex::Id] = id_; + keyexpr_parts[KeyexprIndex::EntityStr] = entity_to_str.at(type_); // An empty namespace from rcl will contain "/" but zenoh does not allow keys with "//". // Hence we add an "_" to denote an empty namespace such that splitting the key // will always result in 5 parts. - if (ns == "/") { - token_ss << "_/"; - } else { - token_ss << "/"; - } - // Finally append node name. - token_ss << mangle_name(node_info_.name_); + keyexpr_parts[KeyexprIndex::Namespace] = mangle_name(node_info_.ns_); + keyexpr_parts[KeyexprIndex::NodeName] = mangle_name(node_info_.name_); // If this entity has a topic info, append it to the token. if (topic_info_.has_value()) { const auto & topic_info = this->topic_info_.value(); - // Note: We don't append a leading "/" as we expect the ROS topic name to start with a "/". - token_ss << - "/" + mangle_name(topic_info.name_) + "/" + topic_info.type_ + "/" + qos_to_keyexpr( - topic_info.qos_); + keyexpr_parts[KeyexprIndex::TopicName] = mangle_name(topic_info.name_); + keyexpr_parts[KeyexprIndex::TopicType] = mangle_name(topic_info.type_); + keyexpr_parts[KeyexprIndex::TopicQoS] = qos_to_keyexpr(topic_info.qos_); } - this->keyexpr_ = token_ss.str(); + for (std::size_t i = 0; i < KEYEXPR_INDEX_MAX + 1; ++i) { + bool last = false; + if (!keyexpr_parts[i].empty()) { + this->keyexpr_ += std::move(keyexpr_parts[i]); + } + if (i == KEYEXPR_INDEX_MAX || keyexpr_parts[i + 1].empty()) { + last = true; + } + if (last) { + break; + } + // Append the delimiter unless it is the last component. + this->keyexpr_ += KEYEXPR_DELIMITER; + } + this->guid_ = std::hash{}(this->keyexpr_); } ///============================================================================= std::optional Entity::make( - z_id_t id, + z_id_t zid, + const std::string & nid, + const std::string & id, EntityType type, NodeInfo node_info, std::optional topic_info) { + if (id.empty()) { + RCUTILS_SET_ERROR_MSG("Invalid id."); + return std::nullopt; + } if (entity_to_str.find(type) == entity_to_str.end()) { RCUTILS_SET_ERROR_MSG("Invalid entity type."); return std::nullopt; @@ -301,7 +316,13 @@ std::optional Entity::make( return std::nullopt; } - Entity entity{zid_to_str(id), std::move(type), std::move(node_info), std::move(topic_info)}; + Entity entity{ + zid_to_str(zid), + std::move(nid), + std::move(id), + std::move(type), + std::move(node_info), + std::move(topic_info)}; return entity; } @@ -309,10 +330,10 @@ std::optional Entity::make( std::optional Entity::make(const std::string & keyexpr) { std::vector parts = split_keyexpr(keyexpr); - // A token will contain at least 5 parts: - // (ADMIN_SPACE, domain_id, entity_str, namespace, node_name). + // Every token will contain at least 7 parts: + // (ADMIN_SPACE, domain_id, zid, id, entity_type, namespace, node_name). // Basic validation. - if (parts.size() < 6) { + if (parts.size() < KEYEXPR_INDEX_MIN + 1) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Received invalid liveliness token"); @@ -327,7 +348,7 @@ std::optional Entity::make(const std::string & keyexpr) } } - if (parts[0] != ADMIN_SPACE) { + if (parts[KeyexprIndex::AdminSpace] != ADMIN_SPACE) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Received liveliness token with invalid admin space."); @@ -335,7 +356,7 @@ std::optional Entity::make(const std::string & keyexpr) } // Get the entity, ie NN, MP, MS, SS, SC. - std::string & entity_str = parts[3]; + std::string & entity_str = parts[KeyexprIndex::EntityStr]; std::unordered_map::const_iterator entity_it = str_to_entity.find(entity_str); if (entity_it == str_to_entity.end()) { @@ -346,21 +367,23 @@ std::optional Entity::make(const std::string & keyexpr) } EntityType entity_type = entity_it->second; - std::size_t domain_id = std::stoul(parts[1]); - std::string & id = parts[2]; - std::string ns = parts[4] == "_" ? "/" : "/" + std::move(parts[4]); - std::string node_name = demangle_name(std::move(parts[5])); + std::size_t domain_id = std::stoul(parts[KeyexprIndex::DomainId]); + std::string & zid = parts[KeyexprIndex::Zid]; + std::string & nid = parts[KeyexprIndex::Nid]; + std::string & id = parts[KeyexprIndex::Id]; + std::string ns = demangle_name(std::move(parts[KeyexprIndex::Namespace])); + std::string node_name = demangle_name(std::move(parts[KeyexprIndex::NodeName])); std::optional topic_info = std::nullopt; // Populate topic_info if we have a token for an entity other than a node. if (entity_type != EntityType::Node) { - if (parts.size() < 9) { + if (parts.size() < KEYEXPR_INDEX_MAX + 1) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Received liveliness token for non-node entity without required parameters."); return std::nullopt; } - std::optional qos = keyexpr_to_qos(parts[8]); + std::optional qos = keyexpr_to_qos(parts[KeyexprIndex::TopicQoS]); if (!qos.has_value()) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -368,25 +391,45 @@ std::optional Entity::make(const std::string & keyexpr) return std::nullopt; } topic_info = TopicInfo{ - demangle_name(std::move(parts[6])), - std::move(parts[7]), + demangle_name(std::move(parts[KeyexprIndex::TopicName])), + demangle_name(std::move(parts[KeyexprIndex::TopicType])), std::move(qos.value()) }; } return Entity{ + std::move(zid), + std::move(nid), std::move(id), std::move(entity_type), NodeInfo{std::move(domain_id), std::move(ns), std::move(node_name), ""}, std::move(topic_info)}; } +///============================================================================= +std::string Entity::zid() const +{ + return this->zid_; +} + +///============================================================================= +std::string Entity::nid() const +{ + return this->nid_; +} + ///============================================================================= std::string Entity::id() const { return this->id_; } +///============================================================================= +std::size_t Entity::guid() const +{ + return this->guid_; +} + ///============================================================================= EntityType Entity::type() const { @@ -420,6 +463,14 @@ std::string Entity::keyexpr() const return this->keyexpr_; } +///============================================================================= +bool Entity::operator==(const Entity & other) const +{ + // TODO(Yadunund): If we decide to directly store the guid as a + // rmw_gid_t type, we should rely on rmw_compare_gids_equal() instead. + return other.guid() == guid_; +} + ///============================================================================= std::string mangle_name(const std::string & input) { diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index 89b606dc..db498073 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -17,6 +17,7 @@ #include +#include #include #include #include @@ -61,7 +62,6 @@ std::string subscription_token(size_t domain_id); ///============================================================================= enum class EntityType : uint8_t { - Invalid = 0, Node, Publisher, Subscription, @@ -71,16 +71,53 @@ enum class EntityType : uint8_t ///============================================================================= // An struct to bundle results of parsing a token. -// TODO(Yadunund): Consider using variadic templates to pass args instead of -// relying on optional fields. +/** + * Every entity will generate a unique key-expression for setting up a liveliness token. + * + * The minimal key-expression is of the form: + * + * /////// + * + * Where: + * - A number set by the user to "partition" graphs. Roughly equivalent to the domain ID in DDS. + * - The zenoh session's id with elements concatenated into a string using '.' as separator. + * - A unique ID within the zenoh session of the node which created this entity. + * - A unique ID within the zenoh session to identify this entity. When entity is a node, the id and nid are equal. + * - The type of entity. This can be one of "NN" for a Network Node, "MP" for a Message Publisher, "MS" for a Message Subscription, "SS" for a Service Server, or "SC" for a Service Client. + * - The ROS namespace for this entity. If the namespace is absolute, this function will add in an _ for later parsing reasons. + * - The ROS node name for this entity. + * + * For entities with topic infomation, the liveliness token keyexpr have additional fields: + * + * ///////// + * - The ROS topic name for this entity. + * - The type for the topic. + * - The qos for the topic (see qos_to_keyexpr() docstring for more information). + * + * For example, the liveliness expression for a publisher within a /talker node that publishes + * an std_msgs/msg/String over topic /chatter and with QoS settings of Reliability: best_effort, + * Durability: transient_local, History: keep_all, and depth: 10, would be + * "@ros2_lv/0/q1w2e3r4t5y6/1/32/MP/_/talker/dds_::std_msgs::msg::String/2:1:2,10". + * Note: The domain_id is assumed to be 0 and a random id is used in the example. Also the + * _dds:: prefix in the topic_type is an artifact of the type support implementation and is + * removed when reporting the topic_type in graph_cache.cpp (see _demangle_if_ros_type()). + */ class Entity { public: - /// Make an Entity from datatypes. This will return nullopt if the required - /// fields are not present for the EntityType. // TODO(Yadunund): Find a way to better bundle the type and the associated data. + /// @brief Make an Entity from datatypes. This will return nullopt if the required + /// fields are not present for the EntityType. + /// @param zid The zenoh session id within which this entity was created. + /// @param id A unique id for this entity within the zenoh session. + /// @param type The type of the entity. + /// @param node_info The node information that is required for all entities. + /// @param topic_info An optional topic information for relevant entities. + /// @return An entity if all inputs are valid. This way no invalid entities can be created. static std::optional make( - z_id_t id, + z_id_t zid, + const std::string & nid, + const std::string & id, EntityType type, NodeInfo node_info, std::optional topic_info = std::nullopt); @@ -88,8 +125,22 @@ class Entity /// Make an Entity from a liveliness keyexpr. static std::optional make(const std::string & keyexpr); + // Get the zenoh session id as a string. This is not unique as entities + // created within the same session, will have the same ids. + std::string zid() const; + + // Get the id of the node of this entity. + std::string nid() const; + + // Get the id of the entity local to a zenoh session. + // Use guid() to retrieve a globally unique id. std::string id() const; + // Interim method to get a globally unique id for this entity which is the hash of the keyexpr. + // TODO(Yadunund): Should this return a rmw_gid_t? + // This is named guid and not gid to remain distinct as it is not of type rmw_gid_t. + std::size_t guid() const; + /// Get the entity type. EntityType type() const; @@ -105,26 +156,37 @@ class Entity /// Get the liveliness keyexpr for this entity. std::string keyexpr() const; + // Two entities are equal if their guids are equal. + bool operator==(const Entity & other) const; + private: Entity( + std::string zid, + std::string nid, std::string id, EntityType type, NodeInfo node_info, std::optional topic_info); + std::string zid_; + std::string nid_; std::string id_; + std::size_t guid_; EntityType type_; NodeInfo node_info_; std::optional topic_info_; std::string keyexpr_; }; +///============================================================================= /// Replace "/" instances with "%". std::string mangle_name(const std::string & input); +///============================================================================= /// Replace "%" instances with "/". std::string demangle_name(const std::string & input); +///============================================================================= /** * Convert a rmw_qos_profile_t to a string with format: * @@ -140,10 +202,28 @@ std::string demangle_name(const std::string & input); */ std::string qos_to_keyexpr(rmw_qos_profile_t qos); +///============================================================================= /// Convert a rmw_qos_profile_t from a keyexpr. Return std::nullopt if invalid. std::optional keyexpr_to_qos(const std::string & keyexpr); +///============================================================================= +/// Convert a Zenoh id to a string. +std::string zid_to_str(const z_id_t & id); } // namespace liveliness +///============================================================================= +// Allow Entity to be hashed and used as a key in unordered_maps/sets +namespace std +{ +template<> +struct hash +{ + auto operator()(const liveliness::Entity & entity) const -> size_t + { + return entity.guid(); + } +}; +} // namespace std + #endif // DETAIL__LIVELINESS_UTILS_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index cebb27bb..5a374aaa 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -26,6 +26,11 @@ #include "rmw_data_types.hpp" +///============================================================================= +size_t rmw_context_impl_s::get_next_entity_id() +{ + return next_entity_id_++; +} ///============================================================================= saved_msg_data::saved_msg_data(zc_owned_payload_t p, uint64_t recv_ts, const uint8_t pub_gid[16]) @@ -111,10 +116,12 @@ void rmw_subscription_data_t::add_new_message( } } + // TODO(Yadunund): Check for ZENOH_EVENT_MESSAGE_LOST. + message_queue_.emplace_back(std::move(msg)); // Since we added new data, trigger user callback and guard condition if they are available - trigger_user_callback(); + data_callback_mgr.trigger_callback(); notify(); } @@ -169,7 +176,7 @@ void rmw_service_data_t::add_new_query(std::unique_ptr query) query_queue_.emplace_back(std::move(query)); // Since we added new data, trigger user callback and guard condition if they are available - trigger_user_callback(); + data_callback_mgr.trigger_callback(); notify(); } @@ -218,7 +225,7 @@ void rmw_client_data_t::add_new_reply(std::unique_ptr reply) reply_queue_.emplace_back(std::move(reply)); // Since we added new data, trigger user callback and guard condition if they are available - trigger_user_callback(); + data_callback_mgr.trigger_callback(); notify(); } diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index d81902b9..6f7490aa 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -41,8 +41,9 @@ /// Structs for various type erased data fields. ///============================================================================= -struct rmw_context_impl_s +class rmw_context_impl_s { +public: // An owned session. z_owned_session_t session; @@ -59,7 +60,13 @@ struct rmw_context_impl_s /// Guard condition that should be triggered when the graph changes. rmw_guard_condition_t * graph_guard_condition; - GraphCache graph_cache; + std::unique_ptr graph_cache; + + size_t get_next_entity_id(); + +private: + // A counter to assign a local id for every entity created in this session. + size_t next_entity_id_{0}; }; ///============================================================================= @@ -69,12 +76,19 @@ struct rmw_node_data_t // for cases where a node may spin up but does not have any publishers or subscriptions. // Liveliness token for the node. zc_owned_liveliness_token_t token; + + // The entity id of this node as generated by get_next_entity_id(). + // Every interface created by this node will include this id in its liveliness token. + size_t id; }; ///============================================================================= -class rmw_publisher_data_t : public EventsBase +class rmw_publisher_data_t { public: + // The Entity generated for the publisher. + std::optional entity; + // An owned publisher. z_owned_publisher_t pub; @@ -94,6 +108,8 @@ class rmw_publisher_data_t : public EventsBase // Context for memory allocation for messages. rmw_context_t * context; + + EventsManager events_mgr; }; ///============================================================================= @@ -121,9 +137,12 @@ struct saved_msg_data }; ///============================================================================= -class rmw_subscription_data_t : public EventsBase +class rmw_subscription_data_t { public: + // The Entity generated for the subscription. + std::optional entity; + // An owned subscriber or querying_subscriber depending on the QoS settings. std::variant sub; @@ -148,6 +167,9 @@ class rmw_subscription_data_t : public EventsBase void add_new_message(std::unique_ptr msg, const std::string & topic_name); + DataCallbackManager data_callback_mgr; + EventsManager events_mgr; + private: std::deque> message_queue_; mutable std::mutex message_queue_mutex_; @@ -180,9 +202,12 @@ class ZenohQuery final }; ///============================================================================= -class rmw_service_data_t : public EventsBase +class rmw_service_data_t { public: + // The Entity generated for the service. + std::optional entity; + z_owned_keyexpr_t keyexpr; z_owned_queryable_t qable; @@ -215,6 +240,8 @@ class rmw_service_data_t : public EventsBase std::unique_ptr take_from_query_map(int64_t sequence_number); + DataCallbackManager data_callback_mgr; + private: void notify(); @@ -245,9 +272,12 @@ class ZenohReply final }; ///============================================================================= -class rmw_client_data_t : public EventsBase +class rmw_client_data_t { public: + // The Entity generated for the client. + std::optional entity; + z_owned_keyexpr_t keyexpr; z_owned_closure_reply_t zn_closure_reply; @@ -280,6 +310,8 @@ class rmw_client_data_t : public EventsBase std::unique_ptr pop_next_reply(); + DataCallbackManager data_callback_mgr; + private: void notify(); diff --git a/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp b/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp index 353de39a..5e674089 100644 --- a/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp +++ b/rmw_zenoh_cpp/src/detail/zenoh_router_check.cpp @@ -21,32 +21,6 @@ #include #include -namespace -{ - -// Convert a Zenoh Id to a string -// Zenoh IDs are LSB-first 128bit unsigned and non-zero integers in hexadecimal lowercase. -// @param pid Zenoh Id to convert -std::string zid_to_str(const z_id_t & pid) -{ - std::stringstream ss; - int len = 0; - for (size_t i = 0; i < sizeof(pid.id); ++i) { - if (pid.id[i]) { - len = static_cast(i) + 1; - } - } - if (!len) { - ss << ""; - } else { - for (int i = len - 1; i >= 0; --i) { - ss << std::hex << std::setfill('0') << std::setw(2) << static_cast(pid.id[i]); - } - } - return ss.str(); -} - -} // namespace rmw_ret_t zenoh_router_check(z_session_t session) { @@ -55,9 +29,9 @@ rmw_ret_t zenoh_router_check(z_session_t session) // Define callback auto callback = [](const struct z_id_t * id, void * ctx) { - const std::string id_str = zid_to_str(*id); // Note: Callback is guaranteed to never be called // concurrently according to z_info_routers_zid docstring + static_cast(id); (*(static_cast(ctx)))++; }; diff --git a/rmw_zenoh_cpp/src/rmw_event.cpp b/rmw_zenoh_cpp/src/rmw_event.cpp index 0fde0913..a22cdcf8 100644 --- a/rmw_zenoh_cpp/src/rmw_event.cpp +++ b/rmw_zenoh_cpp/src/rmw_event.cpp @@ -14,9 +14,11 @@ #include "rmw/error_handling.h" #include "rmw/event.h" +#include "rmw/events_statuses/events_statuses.h" #include "rmw/types.h" #include "detail/event.hpp" +#include "detail/graph_cache.hpp" #include "detail/identifier.hpp" #include "detail/rmw_data_types.hpp" @@ -37,22 +39,40 @@ rmw_publisher_event_init( RMW_CHECK_ARGUMENT_FOR_NULL(publisher->data, RMW_RET_INVALID_ARGUMENT); rmw_publisher_data_t * pub_data = static_cast(publisher->data); RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(pub_data->context, RMW_RET_INVALID_ARGUMENT); if (publisher->implementation_identifier != rmw_zenoh_identifier) { RMW_SET_ERROR_MSG("Publisher implementation identifier not from this implementation"); return RMW_RET_INCORRECT_RMW_IMPLEMENTATION; } - if (event_map.count(event_type) != 1) { + auto rmw_event_it = event_map.find(event_type); + if (rmw_event_it == event_map.end()) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( "provided event_type %d is not supported by rmw_zenoh_cpp", event_type); return RMW_RET_UNSUPPORTED; } rmw_event->implementation_identifier = publisher->implementation_identifier; - rmw_event->data = pub_data; + rmw_event->data = &pub_data->events_mgr; rmw_event->event_type = event_type; + // Register the event with graph cache. + pub_data->context->impl->graph_cache->set_qos_event_callback( + pub_data->entity.value(), + rmw_event_it->second, + [pub_data, + event_id = rmw_event_it->second](std::unique_ptr zenoh_event) + { + if (pub_data == nullptr) { + return; + } + pub_data->events_mgr.add_new_event( + event_id, + std::move(zenoh_event)); + } + ); + return RMW_RET_OK; } @@ -70,6 +90,7 @@ rmw_subscription_event_init( RMW_CHECK_ARGUMENT_FOR_NULL(subscription->data, RMW_RET_INVALID_ARGUMENT); rmw_subscription_data_t * sub_data = static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(sub_data->context, RMW_RET_INVALID_ARGUMENT); if (subscription->implementation_identifier != rmw_zenoh_identifier) { RMW_SET_ERROR_MSG( @@ -77,16 +98,38 @@ rmw_subscription_event_init( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION; } - if (event_map.count(event_type) != 1) { + auto rmw_event_it = event_map.find(event_type); + if (rmw_event_it == event_map.end()) { RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( "provided event_type %d is not supported by rmw_zenoh_cpp", event_type); return RMW_RET_UNSUPPORTED; } rmw_event->implementation_identifier = subscription->implementation_identifier; - rmw_event->data = sub_data; + rmw_event->data = &sub_data->events_mgr; rmw_event->event_type = event_type; + // Register the event with graph cache if the event is not ZENOH_EVENT_MESSAGE_LOST + // since this is checked for in the subscription callback. + if (rmw_event_it->second == ZENOH_EVENT_MESSAGE_LOST) { + return RMW_RET_OK; + } + + sub_data->context->impl->graph_cache->set_qos_event_callback( + sub_data->entity.value(), + rmw_event_it->second, + [sub_data, + event_id = rmw_event_it->second](std::unique_ptr zenoh_event) + { + if (sub_data == nullptr) { + return; + } + sub_data->events_mgr.add_new_event( + event_id, + std::move(zenoh_event)); + } + ); + return RMW_RET_OK; } @@ -110,7 +153,7 @@ rmw_event_set_callback( } // Both rmw_subscription_data_t and rmw_publisher_data_t inherit EventsBase. - EventsBase * event_data = static_cast(rmw_event->data); + EventsManager * event_data = static_cast(rmw_event->data); RMW_CHECK_ARGUMENT_FOR_NULL(event_data, RMW_RET_INVALID_ARGUMENT); event_data->event_set_callback( zenoh_event_it->second, @@ -147,8 +190,7 @@ rmw_take_event( return RMW_RET_ERROR; } - // Both rmw_subscription_data_t and rmw_publisher_data_t inherit EventsBase. - EventsBase * event_data = static_cast(event_handle->data); + EventsManager * event_data = static_cast(event_handle->data); RMW_CHECK_ARGUMENT_FOR_NULL(event_data, RMW_RET_INVALID_ARGUMENT); std::unique_ptr st = event_data->pop_next_event( zenoh_event_it->second); @@ -169,6 +211,25 @@ rmw_take_event( *taken = true; return RMW_RET_OK; } + case ZENOH_EVENT_MESSAGE_LOST: { + auto ei = static_cast(event_info); + RMW_CHECK_ARGUMENT_FOR_NULL(ei, RMW_RET_INVALID_ARGUMENT); + ei->total_count = st->total_count; + ei->total_count_change = st->total_count_change; + *taken = true; + return RMW_RET_OK; + } + case ZENOH_EVENT_PUBLICATION_MATCHED: + case ZENOH_EVENT_SUBSCRIPTION_MATCHED: { + auto ei = static_cast(event_info); + RMW_CHECK_ARGUMENT_FOR_NULL(ei, RMW_RET_INVALID_ARGUMENT); + ei->total_count = st->total_count; + ei->total_count_change = st->total_count_change; + ei->current_count = st->current_count; + ei->current_count_change = st->current_count_change; + *taken = true; + return RMW_RET_OK; + } case ZENOH_EVENT_OFFERED_QOS_INCOMPATIBLE: { auto ei = static_cast(event_info); RMW_CHECK_ARGUMENT_FOR_NULL(ei, RMW_RET_INVALID_ARGUMENT); diff --git a/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp index abcb6058..bac35e45 100644 --- a/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp @@ -43,7 +43,7 @@ rmw_get_subscriber_names_and_types_by_node( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache.get_entity_names_and_types_by_node( + return node->context->impl->graph_cache->get_entity_names_and_types_by_node( liveliness::EntityType::Subscription, allocator, node_name, @@ -71,7 +71,7 @@ rmw_get_publisher_names_and_types_by_node( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache.get_entity_names_and_types_by_node( + return node->context->impl->graph_cache->get_entity_names_and_types_by_node( liveliness::EntityType::Publisher, allocator, node_name, @@ -98,7 +98,7 @@ rmw_get_service_names_and_types_by_node( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache.get_entity_names_and_types_by_node( + return node->context->impl->graph_cache->get_entity_names_and_types_by_node( liveliness::EntityType::Service, allocator, node_name, @@ -125,7 +125,7 @@ rmw_get_client_names_and_types_by_node( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache.get_entity_names_and_types_by_node( + return node->context->impl->graph_cache->get_entity_names_and_types_by_node( liveliness::EntityType::Client, allocator, node_name, diff --git a/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp index 34f747f5..acda0b19 100644 --- a/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp @@ -36,7 +36,7 @@ rmw_get_service_names_and_types( RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(service_names_and_types, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache.get_service_names_and_types( + return node->context->impl->graph_cache->get_service_names_and_types( allocator, service_names_and_types); } } // extern "C" diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp index d6f3b690..9dcc0d65 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp @@ -43,7 +43,7 @@ rmw_get_publishers_info_by_topic( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache.get_entities_info_by_topic( + return node->context->impl->graph_cache->get_entities_info_by_topic( liveliness::EntityType::Publisher, allocator, topic_name, @@ -69,7 +69,7 @@ rmw_get_subscriptions_info_by_topic( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache.get_entities_info_by_topic( + return node->context->impl->graph_cache->get_entities_info_by_topic( liveliness::EntityType::Subscription, allocator, topic_name, diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp index 7e2f3d24..673f56d5 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp @@ -37,7 +37,7 @@ rmw_get_topic_names_and_types( RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(topic_names_and_types, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache.get_topic_names_and_types( + return node->context->impl->graph_cache->get_topic_names_and_types( allocator, no_demangle, topic_names_and_types); } } // extern "C" diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 5e0310a4..90e0bf1c 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -66,10 +66,10 @@ static void graph_sub_data_handler( switch (sample->kind) { case z_sample_kind_t::Z_SAMPLE_KIND_PUT: - context_impl->graph_cache.parse_put(keystr._cstr); + context_impl->graph_cache->parse_put(keystr._cstr); break; case z_sample_kind_t::Z_SAMPLE_KIND_DELETE: - context_impl->graph_cache.parse_del(keystr._cstr); + context_impl->graph_cache->parse_del(keystr._cstr); break; default: break; @@ -171,6 +171,10 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) z_close(z_move(context->impl->session)); }); + /// Initialize the graph cache. + z_id_t zid = z_info_zid(z_loan(context->impl->session)); + context->impl->graph_cache = std::make_unique(zid); + // Verify if the zenoh router is running. if ((ret = zenoh_router_check(z_loan(context->impl->session))) != RMW_RET_OK) { RMW_SET_ERROR_MSG("Error while checking for Zenoh router"); @@ -181,13 +185,12 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) if (shm_enabled._cstr != nullptr && strcmp(shm_enabled._cstr, "true") == 0) { - z_id_t id = z_info_zid(z_loan(context->impl->session)); - char idstr[sizeof(id.id) * 2 + 1]; // 2 bytes for each byte of the id, plus the trailing \0 + char idstr[sizeof(zid.id) * 2 + 1]; // 2 bytes for each byte of the id, plus the trailing \0 static constexpr size_t max_size_of_each = 3; // 2 for each byte, plus the trailing \0 - for (size_t i = 0; i < sizeof(id.id); ++i) { - snprintf(idstr + 2 * i, max_size_of_each, "%02x", id.id[i]); + for (size_t i = 0; i < sizeof(zid.id); ++i) { + snprintf(idstr + 2 * i, max_size_of_each, "%02x", zid.id[i]); } - idstr[sizeof(id.id) * 2] = '\0'; + idstr[sizeof(zid.id) * 2] = '\0'; // TODO(yadunund): Can we get the size of the shm from the config even though it's not // a standard parameter? context->impl->shm_manager = @@ -279,7 +282,9 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) if (z_reply_is_ok(&reply)) { z_sample_t sample = z_reply_ok(&reply); z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); - context->impl->graph_cache.parse_put(z_loan(keystr)); + // Ignore tokens from the same session to avoid race conditions from this + // query and the liveliness subscription. + context->impl->graph_cache->parse_put(z_loan(keystr), true); z_drop(z_move(keystr)); } else { printf("[discovery] Received an error\n"); diff --git a/rmw_zenoh_cpp/src/rmw_qos.cpp b/rmw_zenoh_cpp/src/rmw_qos.cpp index 0dcf6a4d..dd566727 100644 --- a/rmw_zenoh_cpp/src/rmw_qos.cpp +++ b/rmw_zenoh_cpp/src/rmw_qos.cpp @@ -12,6 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + +#include "rcutils/snprintf.h" + #include "rmw/error_handling.h" #include "rmw/types.h" #include "rmw/qos_profiles.h" @@ -20,6 +24,30 @@ extern "C" { +// Copied from rmw_dds_common::qos.cpp. +// Returns RMW_RET_OK if successful or no buffer was provided +// Returns RMW_RET_ERROR if there as an error copying the message to the buffer +static rmw_ret_t +_append_to_buffer(char * buffer, size_t buffer_size, const char * format, ...) +{ + // Only write if a buffer is provided + if (!buffer || buffer_size == 0u) { + return RMW_RET_OK; + } + // Determine available space left in buffer + size_t offset = strnlen(buffer, buffer_size); + size_t write_size = buffer_size - offset; + std::va_list args; + va_start(args, format); + int snprintf_ret = rcutils_vsnprintf(buffer + offset, write_size, format, args); + va_end(args); + if (snprintf_ret < 0) { + RMW_SET_ERROR_MSG("failed to append to character buffer"); + return RMW_RET_ERROR; + } + return RMW_RET_OK; +} + rmw_ret_t rmw_qos_profile_check_compatible( const rmw_qos_profile_t publisher_profile, @@ -28,13 +56,35 @@ rmw_qos_profile_check_compatible( char * reason, size_t reason_size) { - // In Zenoh, publishers do not have any reliability settings. - // A publisher and subscription are only incompatible if the durability of the publisher is - // TRANSIENT_LOCAL but that of the subscription is not. In such a scenario, a late-joining - // subscription can fail to receive messages so we flag it accordingly. - // However, we can reuse the qos_profile_check_compatible() method from rmw_dds_common - // since it largely applies in rmw_zenoh. - return rmw_dds_common::qos_profile_check_compatible( - publisher_profile, subscription_profile, compatibility, reason, reason_size); + if (!compatibility) { + RMW_SET_ERROR_MSG("compatibility parameter is null"); + return RMW_RET_INVALID_ARGUMENT; + } + if (!reason && reason_size != 0u) { + RMW_SET_ERROR_MSG("reason parameter is null, but reason_size parameter is not zero"); + return RMW_RET_INVALID_ARGUMENT; + } + // Initialize reason buffer + if (reason && reason_size != 0u) { + reason[0] = '\0'; + } + // In Zenoh, there are not qos incompatibilities. + // Further publishers do not have any reliability settings. + // The once scenario where transport may not occur is where a publisher with + // TRANSIENT_LOCAL durability publishes a message before a subscription with + // VOLATILE durability spins up. However, any subsequent messages published + // will be received by the subscription. + if (publisher_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL && + subscription_profile.durability == RMW_QOS_POLICY_DURABILITY_VOLATILE) + { + *compatibility = RMW_QOS_COMPATIBILITY_WARNING; + return _append_to_buffer( + reason, + reason_size, + "WARNING: Publisher's durability is TRANSIENT_LOCAL, but subscription's is VOLATILE;"); + } else { + *compatibility = RMW_QOS_COMPATIBILITY_OK; + } + return RMW_RET_OK; } } // extern "C" diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 68b5ba37..f67df6d5 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -286,8 +286,11 @@ rmw_create_node( // Initialize liveliness token for the node to advertise that a new node is in town. rmw_node_data_t * node_data = static_cast(node->data); + node_data->id = context->impl->get_next_entity_id(); const auto liveliness_entity = liveliness::Entity::make( z_info_zid(z_loan(context->impl->session)), + std::to_string(node_data->id), + std::to_string(node_data->id), liveliness::EntityType::Node, liveliness::NodeInfo{context->actual_domain_id, namespace_, name, ""}); if (!liveliness_entity.has_value()) { @@ -338,7 +341,7 @@ rmw_destroy_node(rmw_node_t * node) // Undeclare liveliness token for the node to advertise that the node has ridden // off into the sunset. rmw_node_data_t * node_data = static_cast(node->data); - z_drop(z_move(node_data->token)); + zc_liveliness_undeclare_token(z_move(node_data->token)); rcutils_allocator_t * allocator = &node->context->options.allocator; @@ -437,6 +440,13 @@ rmw_create_publisher( "Strict requirement on unique network flow endpoints for publishers not supported"); return nullptr; } + RMW_CHECK_ARGUMENT_FOR_NULL(node->data, nullptr); + const rmw_node_data_t * node_data = static_cast(node->data); + if (node_data == nullptr) { + RMW_SET_ERROR_MSG( + "Unable to create publisher as node_data is invalid."); + return nullptr; + } // Get the RMW type support. const rosidl_message_type_support_t * type_support = find_message_type_support(type_supports); @@ -601,14 +611,16 @@ rmw_create_publisher( z_undeclare_publisher(z_move(publisher_data->pub)); }); - const auto liveliness_entity = liveliness::Entity::make( + publisher_data->entity = liveliness::Entity::make( z_info_zid(z_loan(node->context->impl->session)), + std::to_string(node_data->id), + std::to_string(context_impl->get_next_entity_id()), liveliness::EntityType::Publisher, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_publisher->topic_name, publisher_data->type_support->get_name(), publisher_data->adapted_qos_profile} ); - if (!liveliness_entity.has_value()) { + if (!publisher_data->entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to generate keyexpr for liveliness token for the publisher."); @@ -616,7 +628,7 @@ rmw_create_publisher( } publisher_data->token = zc_liveliness_declare_token( z_loan(node->context->impl->session), - z_keyexpr(liveliness_entity->keyexpr().c_str()), + z_keyexpr(publisher_data->entity->keyexpr().c_str()), NULL ); auto free_token = rcpputils::make_scope_exit( @@ -631,6 +643,7 @@ rmw_create_publisher( "Unable to create liveliness token for the publisher."); return nullptr; } + printf("[rmw_create_publisher] Created pub %s\n", publisher_data->entity->keyexpr().c_str()); free_token.cancel(); undeclare_z_publisher_cache.cancel(); @@ -670,7 +683,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) auto publisher_data = static_cast(publisher->data); if (publisher_data != nullptr) { - z_drop(z_move(publisher_data->token)); + zc_liveliness_undeclare_token(z_move(publisher_data->token)); if (publisher_data->pub_cache.has_value()) { z_drop(z_move(publisher_data->pub_cache.value())); } @@ -913,7 +926,7 @@ rmw_publisher_count_matched_subscriptions( rmw_context_impl_t * context_impl = static_cast(pub_data->context->impl); RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return context_impl->graph_cache.publisher_count_matched_subscriptions( + return context_impl->graph_cache->publisher_count_matched_subscriptions( publisher, subscription_count); } @@ -1145,6 +1158,7 @@ rmw_create_subscription( return nullptr; } + RMW_CHECK_ARGUMENT_FOR_NULL(node->data, nullptr); auto node_data = static_cast(node->data); RMW_CHECK_FOR_NULL_WITH_MSG( node_data, "unable to create subscription as node_data is invalid.", @@ -1247,7 +1261,6 @@ rmw_create_subscription( sub_data->context = node->context; rmw_subscription->implementation_identifier = rmw_zenoh_identifier; - rmw_subscription->data = sub_data; rmw_subscription->topic_name = rcutils_strdup(topic_name, *allocator); RMW_CHECK_FOR_NULL_WITH_MSG( @@ -1344,14 +1357,16 @@ rmw_create_subscription( }); // Publish to the graph that a new subscription is in town - const auto liveliness_entity = liveliness::Entity::make( + sub_data->entity = liveliness::Entity::make( z_info_zid(z_loan(node->context->impl->session)), + std::to_string(node_data->id), + std::to_string(context_impl->get_next_entity_id()), liveliness::EntityType::Subscription, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_subscription->topic_name, sub_data->type_support->get_name(), sub_data->adapted_qos_profile} ); - if (!liveliness_entity.has_value()) { + if (!sub_data->entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to generate keyexpr for liveliness token for the subscription."); @@ -1359,7 +1374,7 @@ rmw_create_subscription( } sub_data->token = zc_liveliness_declare_token( z_loan(context_impl->session), - z_keyexpr(liveliness_entity->keyexpr().c_str()), + z_keyexpr(sub_data->entity->keyexpr().c_str()), NULL ); auto free_token = rcpputils::make_scope_exit( @@ -1375,6 +1390,8 @@ rmw_create_subscription( return nullptr; } + rmw_subscription->data = sub_data; + free_token.cancel(); undeclare_z_sub.cancel(); free_topic_name.cancel(); @@ -1412,7 +1429,7 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) auto sub_data = static_cast(subscription->data); if (sub_data != nullptr) { // Publish to the graph that a subscription has ridden off into the sunset - z_drop(z_move(sub_data->token)); + zc_liveliness_undeclare_token(z_move(sub_data->token)); RMW_TRY_DESTRUCTOR(sub_data->type_support->~MessageTypeSupport(), MessageTypeSupport, ); allocator->deallocate(sub_data->type_support, allocator->state); @@ -1463,7 +1480,7 @@ rmw_subscription_count_matched_publishers( rmw_context_impl_t * context_impl = static_cast(sub_data->context->impl); RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return context_impl->graph_cache.subscription_count_matched_publishers( + return context_impl->graph_cache->subscription_count_matched_publishers( subscription, publisher_count); } @@ -1829,6 +1846,13 @@ rmw_create_client( RMW_SET_ERROR_MSG("zenoh session is invalid"); return nullptr; } + RMW_CHECK_ARGUMENT_FOR_NULL(node->data, nullptr); + const rmw_node_data_t * node_data = static_cast(node->data); + if (node_data == nullptr) { + RMW_SET_ERROR_MSG( + "Unable to create client as node data is invalid."); + return nullptr; + } rcutils_allocator_t * allocator = &node->context->options.allocator; @@ -1986,14 +2010,16 @@ rmw_create_client( service_type.c_str(), rmw_client->service_name); return nullptr; } - const auto liveliness_entity = liveliness::Entity::make( + client_data->entity = liveliness::Entity::make( z_info_zid(z_loan(node->context->impl->session)), + std::to_string(node_data->id), + std::to_string(context_impl->get_next_entity_id()), liveliness::EntityType::Client, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_client->service_name, std::move(service_type), client_data->adapted_qos_profile} ); - if (!liveliness_entity.has_value()) { + if (!client_data->entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to generate keyexpr for liveliness token for the client."); @@ -2001,7 +2027,7 @@ rmw_create_client( } client_data->token = zc_liveliness_declare_token( z_loan(node->context->impl->session), - z_keyexpr(liveliness_entity->keyexpr().c_str()), + z_keyexpr(client_data->entity->keyexpr().c_str()), NULL ); auto free_token = rcpputils::make_scope_exit( @@ -2063,7 +2089,7 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) // CLEANUP =================================================================== z_drop(z_move(client_data->zn_closure_reply)); z_drop(z_move(client_data->keyexpr)); - z_drop(z_move(client_data->token)); + zc_liveliness_undeclare_token(z_move(client_data->token)); RMW_TRY_DESTRUCTOR( client_data->request_type_support->~RequestTypeSupport(), RequestTypeSupport, ); @@ -2463,7 +2489,13 @@ rmw_create_service( return nullptr; } } - + RMW_CHECK_ARGUMENT_FOR_NULL(node->data, nullptr); + const rmw_node_data_t * node_data = static_cast(node->data); + if (node_data == nullptr) { + RMW_SET_ERROR_MSG( + "Unable to create service as node data is invalid."); + return nullptr; + } RMW_CHECK_FOR_NULL_WITH_MSG( node->context, "expected initialized context", @@ -2643,14 +2675,16 @@ rmw_create_service( service_type.c_str(), rmw_service->service_name); return nullptr; } - const auto liveliness_entity = liveliness::Entity::make( + service_data->entity = liveliness::Entity::make( z_info_zid(z_loan(node->context->impl->session)), + std::to_string(node_data->id), + std::to_string(context_impl->get_next_entity_id()), liveliness::EntityType::Service, liveliness::NodeInfo{node->context->actual_domain_id, node->namespace_, node->name, ""}, liveliness::TopicInfo{rmw_service->service_name, std::move(service_type), service_data->adapted_qos_profile} ); - if (!liveliness_entity.has_value()) { + if (!service_data->entity.has_value()) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to generate keyexpr for liveliness token for the service."); @@ -2658,7 +2692,7 @@ rmw_create_service( } service_data->token = zc_liveliness_declare_token( z_loan(node->context->impl->session), - z_keyexpr(liveliness_entity->keyexpr().c_str()), + z_keyexpr(service_data->entity->keyexpr().c_str()), NULL ); auto free_token = rcpputils::make_scope_exit( @@ -2722,7 +2756,7 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) // CLEANUP ================================================================ z_drop(z_move(service_data->keyexpr)); z_undeclare_queryable(z_move(service_data->qable)); - z_drop(z_move(service_data->token)); + zc_liveliness_undeclare_token(z_move(service_data->token)); RMW_TRY_DESTRUCTOR( service_data->request_type_support->~RequestTypeSupport(), RequestTypeSupport, ); @@ -3157,15 +3191,15 @@ static bool has_triggered_condition( // Check if the event queue for this event type is empty. auto zenoh_event_it = event_map.find(event_type); if (zenoh_event_it != event_map.end()) { - auto event_data = static_cast(event->data); + auto event_data = static_cast(event->data); if (event_data != nullptr) { if (!event_data->event_queue_is_empty(zenoh_event_it->second)) { - printf("EVENTS QUEUE IS NOT EMPTY!!\n"); return true; } } } else { - printf("ERROR!!!!!!!!!!!!!!\n"); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "has_triggered_condition() called with unknown event %u. Report this bug.", event_type); } } } @@ -3301,7 +3335,7 @@ rmw_wait( if (events) { for (size_t i = 0; i < events->event_count; ++i) { auto event = static_cast(events->events[i]); - auto event_data = static_cast(event->data); + auto event_data = static_cast(event->data); if (event_data != nullptr) { auto zenoh_event_it = event_map.find(event->event_type); if (zenoh_event_it != event_map.end()) { @@ -3348,7 +3382,7 @@ rmw_wait( // Now detach the condition variable and mutex from each of the subscriptions for (size_t i = 0; i < events->event_count; ++i) { auto event = static_cast(events->events[i]); - auto event_data = static_cast(event->data); + auto event_data = static_cast(event->data); if (event_data != nullptr) { auto zenoh_event_it = event_map.find(event->event_type); if (zenoh_event_it != event_map.end()) { @@ -3432,7 +3466,7 @@ rmw_get_node_names( rcutils_allocator_t * allocator = &node->context->options.allocator; RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache.get_node_names( + return node->context->impl->graph_cache->get_node_names( node_names, node_namespaces, nullptr, allocator); } @@ -3455,7 +3489,7 @@ rmw_get_node_names_with_enclaves( rcutils_allocator_t * allocator = &node->context->options.allocator; RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache.get_node_names( + return node->context->impl->graph_cache->get_node_names( node_names, node_namespaces, enclaves, allocator); } @@ -3486,7 +3520,7 @@ rmw_count_publishers( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache.count_publishers(topic_name, count); + return node->context->impl->graph_cache->count_publishers(topic_name, count); } //============================================================================== @@ -3516,7 +3550,7 @@ rmw_count_subscribers( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache.count_subscriptions(topic_name, count); + return node->context->impl->graph_cache->count_subscriptions(topic_name, count); } //============================================================================== @@ -3546,7 +3580,7 @@ rmw_count_clients( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache.count_clients(service_name, count); + return node->context->impl->graph_cache->count_clients(service_name, count); } //============================================================================== @@ -3576,7 +3610,7 @@ rmw_count_services( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache.count_services(service_name, count); + return node->context->impl->graph_cache->count_services(service_name, count); } //============================================================================== @@ -3661,7 +3695,7 @@ rmw_service_server_is_available( return RMW_RET_INVALID_ARGUMENT; } - return node->context->impl->graph_cache.service_server_is_available( + return node->context->impl->graph_cache->service_server_is_available( client->service_name, service_type.c_str(), is_available); } @@ -3686,7 +3720,7 @@ rmw_subscription_set_on_new_message_callback( rmw_subscription_data_t * sub_data = static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - sub_data->set_user_callback( + sub_data->data_callback_mgr.set_callback( user_data, callback); return RMW_RET_OK; } @@ -3703,7 +3737,7 @@ rmw_service_set_on_new_request_callback( rmw_service_data_t * service_data = static_cast(service->data); RMW_CHECK_ARGUMENT_FOR_NULL(service_data, RMW_RET_INVALID_ARGUMENT); - service_data->set_user_callback( + service_data->data_callback_mgr.set_callback( user_data, callback); return RMW_RET_OK; } @@ -3720,7 +3754,7 @@ rmw_client_set_on_new_response_callback( rmw_client_data_t * client_data = static_cast(client->data); RMW_CHECK_ARGUMENT_FOR_NULL(client_data, RMW_RET_INVALID_ARGUMENT); - client_data->set_user_callback( + client_data->data_callback_mgr.set_callback( user_data, callback); return RMW_RET_OK; }