Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

shortcircuit events executor callback invocation #60

Open
wants to merge 1 commit into
base: foxy-events-executor
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class EventsExecutorNotifyWaitable final : public EventWaitable
void
set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
rmw_listener_cb_t executor_callback) const override
rmw_listener_cb_t executor_callback) override
{
for (auto gc : notify_guard_conditions_) {
rcl_ret_t ret = rcl_guard_condition_set_listener_callback(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,14 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>
return size_ == capacity_;
}

void clear() {}
void clear()
{
std::lock_guard<std::mutex> lock(mutex_);
ring_buffer_ = std::vector<BufferT>(capacity_);
write_index_ = capacity_ - 1;
read_index_ = 0;
size_ = 0;
}

private:
size_t capacity_;
Expand Down
27 changes: 25 additions & 2 deletions rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,25 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
provide_intra_process_message(ConstMessageSharedPtr message)
{
buffer_->add_shared(std::move(message));
trigger_guard_condition();
std::lock_guard<std::mutex> lock(executor_callback_mutex_);
if (executor_callback_) {
executor_callback_(executor_, {this, WAITABLE_EVENT});
} else {
trigger_guard_condition();
}
}

void
provide_intra_process_message(MessageUniquePtr message)
{
buffer_->add_unique(std::move(message));
trigger_guard_condition();

std::lock_guard<std::mutex> lock(executor_callback_mutex_);
if (executor_callback_) {
executor_callback_(executor_, {this, WAITABLE_EVENT});
} else {
trigger_guard_condition();
}
}

bool
Expand All @@ -134,6 +145,18 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
return buffer_->use_take_shared_method();
}

void
set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
rmw_listener_cb_t executor_callback) override
{
std::lock_guard<std::mutex> lock(executor_callback_mutex_);
executor_ = executor;
executor_callback_ = executor_callback;
// Buffer must be cleared under the executor callback lock to make sure that other threads wait for this
buffer_->clear();
}

private:
void
trigger_guard_condition()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,16 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
void
set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
rmw_listener_cb_t executor_callback) const override;
rmw_listener_cb_t executor_callback) override;

protected:
std::recursive_mutex reentrant_mutex_;
rcl_guard_condition_t gc_;

const rclcpp::executors::EventsExecutor * executor_;
rmw_listener_cb_t executor_callback_ = nullptr;
std::mutex executor_callback_mutex_;

private:
virtual void
trigger_guard_condition() = 0;
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/qos_event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class QOSEventHandlerBase : public Waitable
void
set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
rmw_listener_cb_t executor_callback) const override;
rmw_listener_cb_t executor_callback) override;

protected:
rcl_event_t event_handle_;
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/waitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class Waitable
void
set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
rmw_listener_cb_t executor_callback) const;
rmw_listener_cb_t executor_callback);

private:
std::atomic<bool> in_use_by_wait_set_{false};
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/src/rclcpp/qos_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ QOSEventHandlerBase::is_ready(rcl_wait_set_t * wait_set)
void
QOSEventHandlerBase::set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
rmw_listener_cb_t executor_callback) const
rmw_listener_cb_t executor_callback)
{
rcl_ret_t ret = rcl_event_set_listener_callback(
&event_handle_,
Expand Down
18 changes: 5 additions & 13 deletions rclcpp/src/rclcpp/subscription_intra_process_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,12 @@ SubscriptionIntraProcessBase::get_actual_qos() const
return qos_profile_;
}


void
SubscriptionIntraProcessBase::set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
rmw_listener_cb_t executor_callback) const
rmw_listener_cb_t executor_callback)
{
rcl_ret_t ret = rcl_guard_condition_set_listener_callback(
&gc_,
executor_callback,
executor,
this,
true /*Use previous events*/);

if (RCL_RET_OK != ret) {
throw std::runtime_error("Couldn't set guard condition callback");
}
}
(void)executor;
(void)executor_callback;
assert(0);
}
2 changes: 1 addition & 1 deletion rclcpp/src/rclcpp/waitable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Waitable::exchange_in_use_by_wait_set_state(bool in_use_state)
void
Waitable::set_events_executor_callback(
const rclcpp::executors::EventsExecutor * executor,
rmw_listener_cb_t executor_callback) const
rmw_listener_cb_t executor_callback)
{
(void)executor;
(void)executor_callback;
Expand Down