diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index c2b94ffb..5f03f5da 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -383,6 +383,36 @@ std::unique_ptr rmw_client_data_t::pop_next_reply() return latest_reply; } +//============================================================================== +// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class +// for the use of this method. +void rmw_client_data_t::increment_in_flight_callbacks() +{ + std::lock_guard lock(in_flight_mutex_); + num_in_flight_++; +} + +//============================================================================== +// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class +// for the use of this method. +bool rmw_client_data_t::shutdown_and_query_in_flight() +{ + std::lock_guard lock(in_flight_mutex_); + is_shutdown_ = true; + + return num_in_flight_ > 0; +} + +//============================================================================== +// See the comment about the "num_in_flight" class variable in the rmw_client_data_t structure +// for the use of this method. +bool rmw_client_data_t::decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight) +{ + std::lock_guard lock(in_flight_mutex_); + queries_in_flight = --num_in_flight_ > 0; + return is_shutdown_; +} + //============================================================================== void sub_data_handler( const z_sample_t * sample, @@ -519,6 +549,20 @@ void client_data_handler(z_owned_reply_t * reply, void * data) ); return; } + + // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for + // why we need to do this. + bool queries_in_flight = false; + bool is_shutdown = client_data->decrement_queries_in_flight_and_is_shutdown(queries_in_flight); + + if (is_shutdown) { + if (!queries_in_flight) { + client_data->context->options.allocator.deallocate( + client_data, client_data->context->options.allocator.state); + } + return; + } + if (!z_reply_check(reply)) { RCUTILS_LOG_ERROR_NAMED( "rmw_zenoh_cpp", diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index fc719953..378ce0b7 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -328,6 +328,15 @@ class rmw_client_data_t final DataCallbackManager data_callback_mgr; + // See the comment for "num_in_flight" below on the use of this method. + void increment_in_flight_callbacks(); + + // See the comment for "num_in_flight" below on the use of this method. + bool shutdown_and_query_in_flight(); + + // See the comment for "num_in_flight" below on the use of this method. + bool decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight); + private: void notify(); @@ -339,6 +348,28 @@ class rmw_client_data_t final std::deque> reply_queue_; mutable std::mutex reply_queue_mutex_; + + // rmw_zenoh uses Zenoh queries to implement clients. It turns out that in Zenoh, there is no + // way to cancel a query once it is in-flight via the z_get() zenoh-c API. Thus, if an + // rmw_zenoh_cpp user does rmw_create_client(), rmw_send_request(), rmw_destroy_client(), but the + // query comes in after the rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory. + // + // The next 3 variables are used to avoid that situation. Any time a query is initiated via + // rmw_send_request(), num_in_flight_ is incremented. When the Zenoh calls the callback for the + // query reply, num_in_flight_ is decremented. When rmw_destroy_client() is called, is_shutdown_ + // is set to true. If num_in_flight_ is 0, the data associated with this structure is freed. + // If num_in_flight_ is *not* 0, then the data associated with this structure is maintained. + // In the situation where is_shutdown_ is true, and num_in_flight_ drops to 0 in the query + // callback, the query callback will free up the structure. + // + // There is one case which is not handled by this, which has to do with timeouts. The query + // timeout is currently set to essentially infinite. Thus, if a query is in-flight but never + // returns, the memory in this structure will never be freed. There isn't much we can do about + // that at this time, but we may want to consider changing the timeout so that the memory can + // eventually be freed up. + std::mutex in_flight_mutex_; + bool is_shutdown_{false}; + size_t num_in_flight_{0}; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 6ee087d2..d800afab 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -2363,7 +2363,11 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) allocator->deallocate(client_data->response_type_support, allocator->state); RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); - allocator->deallocate(client->data, allocator->state); + // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for + // why we need to do this. + if (!client_data->shutdown_and_query_in_flight()) { + allocator->deallocate(client->data, allocator->state); + } allocator->deallocate(const_cast(client->service_name), allocator->state); allocator->deallocate(client, allocator->state); @@ -2471,6 +2475,10 @@ rmw_send_request( z_move(zn_closure_reply), &opts); + // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for + // why we need to do this. + client_data->increment_in_flight_callbacks(); + return RMW_RET_OK; }