Skip to content

Commit

Permalink
shortcircuit events executor callback invocation in rclcpp for ipc su…
Browse files Browse the repository at this point in the history
…bscriptions
  • Loading branch information
Alberto Soragna committed Mar 30, 2021
1 parent 53aa9fd commit 431a3a9
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class EventsExecutorNotifyWaitable final : public EventWaitable
void
set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
rmw_listener_cb_t executor_callback) const override
rmw_listener_cb_t executor_callback) override
{
for (auto gc : notify_guard_conditions_) {
rcl_ret_t ret = rcl_guard_condition_set_listener_callback(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,14 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>
return size_ == capacity_;
}

void clear() {}
void clear()
{
std::lock_guard<std::mutex> lock(mutex_);
ring_buffer_ = std::vector<BufferT>(capacity_);
write_index_ = capacity_ - 1;
read_index_ = 0;
size_ = 0;
}

private:
size_t capacity_;
Expand Down
27 changes: 25 additions & 2 deletions rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,25 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
provide_intra_process_message(ConstMessageSharedPtr message)
{
buffer_->add_shared(std::move(message));
trigger_guard_condition();
std::lock_guard<std::mutex> lock(executor_callback_mutex_);
if (executor_callback_) {
executor_callback_(executor_, {this, WAITABLE_EVENT});
} else {
trigger_guard_condition();
}
}

void
provide_intra_process_message(MessageUniquePtr message)
{
buffer_->add_unique(std::move(message));
trigger_guard_condition();

std::lock_guard<std::mutex> lock(executor_callback_mutex_);
if (executor_callback_) {
executor_callback_(executor_, {this, WAITABLE_EVENT});
} else {
trigger_guard_condition();
}
}

bool
Expand All @@ -134,6 +145,18 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
return buffer_->use_take_shared_method();
}

void
set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
rmw_listener_cb_t executor_callback) override
{
std::lock_guard<std::mutex> lock(executor_callback_mutex_);
executor_ = executor;
executor_callback_ = executor_callback;
// Buffer must be cleared under the executor callback lock to make sure that other threads wait for this
buffer_->clear();
}

private:
void
trigger_guard_condition()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,16 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
void
set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
rmw_listener_cb_t executor_callback) const override;
rmw_listener_cb_t executor_callback) override;

protected:
std::recursive_mutex reentrant_mutex_;
rcl_guard_condition_t gc_;

const rclcpp::executors::EventsExecutor * executor_;
rmw_listener_cb_t executor_callback_ = nullptr;
std::mutex executor_callback_mutex_;

private:
virtual void
trigger_guard_condition() = 0;
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/qos_event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class QOSEventHandlerBase : public Waitable
void
set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
rmw_listener_cb_t executor_callback) const override;
rmw_listener_cb_t executor_callback) override;

protected:
rcl_event_t event_handle_;
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/waitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class Waitable
void
set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
rmw_listener_cb_t executor_callback) const;
rmw_listener_cb_t executor_callback);

private:
std::atomic<bool> in_use_by_wait_set_{false};
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/src/rclcpp/qos_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ QOSEventHandlerBase::is_ready(rcl_wait_set_t * wait_set)
void
QOSEventHandlerBase::set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
rmw_listener_cb_t executor_callback) const
rmw_listener_cb_t executor_callback)
{
rcl_ret_t ret = rcl_event_set_listener_callback(
&event_handle_,
Expand Down
18 changes: 5 additions & 13 deletions rclcpp/src/rclcpp/subscription_intra_process_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,12 @@ SubscriptionIntraProcessBase::get_actual_qos() const
return qos_profile_;
}


void
SubscriptionIntraProcessBase::set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
rmw_listener_cb_t executor_callback) const
rmw_listener_cb_t executor_callback)
{
rcl_ret_t ret = rcl_guard_condition_set_listener_callback(
&gc_,
executor_callback,
executor,
this,
true /*Use previous events*/);

if (RCL_RET_OK != ret) {
throw std::runtime_error("Couldn't set guard condition callback");
}
}
(void)executor;
(void)executor_callback;
assert(0);
}
2 changes: 1 addition & 1 deletion rclcpp/src/rclcpp/waitable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Waitable::exchange_in_use_by_wait_set_state(bool in_use_state)
void
Waitable::set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
rmw_listener_cb_t executor_callback) const
rmw_listener_cb_t executor_callback)
{
(void)executor;
(void)executor_callback;
Expand Down

0 comments on commit 431a3a9

Please sign in to comment.