Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support selective QoS based events #118

Merged
merged 12 commits into from
Feb 26, 2024
24 changes: 12 additions & 12 deletions rmw_zenoh_cpp/src/detail/event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@


///=============================================================================
void EventsBase::set_user_callback(
void DataCallbackManager::set_callback(
const void * user_data, rmw_event_callback_t callback)
{
std::lock_guard<std::recursive_mutex> lock_mutex(event_mutex_);
std::lock_guard<std::mutex> lock_mutex(event_mutex_);

if (callback) {
// Push events arrived before setting the the executor callback.
Expand All @@ -42,10 +42,10 @@ void EventsBase::set_user_callback(
}

///=============================================================================
void EventsBase::trigger_user_callback()
void DataCallbackManager::trigger_callback()
{
// Trigger the user provided event callback if available.
std::lock_guard<std::recursive_mutex> lock_mutex(event_mutex_);
std::lock_guard<std::mutex> lock_mutex(event_mutex_);
if (callback_ != nullptr) {
callback_(user_data_, 1);
} else {
Expand All @@ -54,7 +54,7 @@ void EventsBase::trigger_user_callback()
}

///=============================================================================
void EventsBase::event_set_callback(
void EventsManager::event_set_callback(
rmw_zenoh_event_type_t event_id,
rmw_event_callback_t callback,
const void * user_data)
Expand Down Expand Up @@ -82,7 +82,7 @@ void EventsBase::event_set_callback(
}

///=============================================================================
void EventsBase::trigger_event_callback(rmw_zenoh_event_type_t event_id)
void EventsManager::trigger_event_callback(rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
Expand All @@ -103,7 +103,7 @@ void EventsBase::trigger_event_callback(rmw_zenoh_event_type_t event_id)
}

///=============================================================================
bool EventsBase::event_queue_is_empty(rmw_zenoh_event_type_t event_id) const
bool EventsManager::event_queue_is_empty(rmw_zenoh_event_type_t event_id) const
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
Expand All @@ -119,7 +119,7 @@ bool EventsBase::event_queue_is_empty(rmw_zenoh_event_type_t event_id) const
}

///=============================================================================
std::unique_ptr<rmw_zenoh_event_status_t> EventsBase::pop_next_event(
std::unique_ptr<rmw_zenoh_event_status_t> EventsManager::pop_next_event(
rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
Expand All @@ -145,7 +145,7 @@ std::unique_ptr<rmw_zenoh_event_status_t> EventsBase::pop_next_event(
}

///=============================================================================
void EventsBase::add_new_event(
void EventsManager::add_new_event(
rmw_zenoh_event_type_t event_id,
std::unique_ptr<rmw_zenoh_event_status_t> event)
{
Expand Down Expand Up @@ -180,7 +180,7 @@ void EventsBase::add_new_event(
}

///=============================================================================
void EventsBase::attach_event_condition(
void EventsManager::attach_event_condition(
rmw_zenoh_event_type_t event_id,
std::condition_variable * condition_variable)
{
Expand All @@ -197,7 +197,7 @@ void EventsBase::attach_event_condition(
}

///=============================================================================
void EventsBase::detach_event_condition(rmw_zenoh_event_type_t event_id)
void EventsManager::detach_event_condition(rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
Expand All @@ -212,7 +212,7 @@ void EventsBase::detach_event_condition(rmw_zenoh_event_type_t event_id)
}

///=============================================================================
void EventsBase::notify_event(rmw_zenoh_event_type_t event_id)
void EventsManager::notify_event(rmw_zenoh_event_type_t event_id)
{
if (event_id > ZENOH_EVENT_ID_MAX) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
Expand Down
43 changes: 34 additions & 9 deletions rmw_zenoh_cpp/src/detail/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ enum rmw_zenoh_event_type_t
// subscription events
ZENOH_EVENT_REQUESTED_QOS_INCOMPATIBLE,
ZENOH_EVENT_MESSAGE_LOST,
// RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE,
ZENOH_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE,
ZENOH_EVENT_SUBSCRIPTION_MATCHED,

// publisher events
// RMW_EVENT_LIVELINESS_LOST,
// RMW_EVENT_OFFERED_DEADLINE_MISSED,
ZENOH_EVENT_OFFERED_QOS_INCOMPATIBLE,
// RMW_EVENT_PUBLISHER_INCOMPATIBLE_TYPE,
ZENOH_EVENT_PUBLISHER_INCOMPATIBLE_TYPE,
ZENOH_EVENT_PUBLICATION_MATCHED,
};

Expand All @@ -56,35 +56,60 @@ static const std::unordered_map<rmw_event_type_t, rmw_zenoh_event_type_t> event_
{RMW_EVENT_OFFERED_QOS_INCOMPATIBLE, ZENOH_EVENT_OFFERED_QOS_INCOMPATIBLE},
{RMW_EVENT_MESSAGE_LOST, ZENOH_EVENT_MESSAGE_LOST},
{RMW_EVENT_SUBSCRIPTION_MATCHED, ZENOH_EVENT_SUBSCRIPTION_MATCHED},
{RMW_EVENT_PUBLICATION_MATCHED, ZENOH_EVENT_PUBLICATION_MATCHED}
{RMW_EVENT_PUBLICATION_MATCHED, ZENOH_EVENT_PUBLICATION_MATCHED},
{RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE, ZENOH_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE},
{RMW_EVENT_PUBLISHER_INCOMPATIBLE_TYPE, ZENOH_EVENT_PUBLISHER_INCOMPATIBLE_TYPE}
// TODO(clalancette): Implement remaining events
};

///=============================================================================
/// A struct to store status changes which can be mapped to rmw event statuses.
/// The data field can be used to store serialized information for more complex statuses.
struct rmw_zenoh_event_status_t
{
size_t total_count;
size_t total_count_change;
size_t current_count;
size_t current_count_change;
// The data field can be used to store serialized information for more complex statuses.
std::string data;

rmw_zenoh_event_status_t()
: total_count(0),
total_count_change(0),
current_count(0),
current_count_change(0)
{}
};

///=============================================================================
/// Base class to be inherited by entities that support events.
class EventsBase
/// A class that manages callbacks that should be triggered when a new
/// message/request/response is received by an entity.
class DataCallbackManager
{
public:
/// @brief Set the user defined callback that should be called when
/// a new message/response/request is received.
/// @param user_data the data that should be passed to the callback.
/// @param callback the callback to be set.
void set_user_callback(const void * user_data, rmw_event_callback_t callback);
void set_callback(const void * user_data, rmw_event_callback_t callback);

/// Trigger the user callback.
void trigger_user_callback();
void trigger_callback();

private:
std::mutex event_mutex_;
/// User callback that can be set via set_callback().
rmw_event_callback_t callback_ {nullptr};
/// User data that should be passed to the user callback.
const void * user_data_ {nullptr};
/// number of trigger requests made before the callback was set.
size_t unread_count_ {0};
};

/// Base class to be inherited by entities that support events.
class EventsManager
{
public:
/// @brief Set the callback to be triggered when the relevant event is triggered.
/// @param event_id the id of the event
/// @param callback the callback to trigger for this event.
Expand Down Expand Up @@ -133,7 +158,7 @@ class EventsBase
mutable std::mutex event_condition_mutex_;
/// Condition variable to attach for event notifications.
std::condition_variable * event_conditions_[ZENOH_EVENT_ID_MAX + 1]{nullptr};
/// User callback that can be set via set_user_callback().
/// User callback that can be set via data_callback_mgr.set_callback().
rmw_event_callback_t callback_ {nullptr};
/// User data that should be passed to the user callback.
const void * user_data_ {nullptr};
Expand Down
Loading
Loading