diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 2166c72d..4908d667 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -28,6 +28,7 @@ #include "rcpputils/scope_exit.hpp" #include "rmw/error_handling.h" +#include "rmw/impl/cpp/macros.hpp" #include "attachment_helpers.hpp" #include "rmw_data_types.hpp" @@ -387,6 +388,42 @@ std::unique_ptr rmw_client_data_t::pop_next_reply() return latest_reply; } +//============================================================================== +// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class +// for the use of this method. +void rmw_client_data_t::increment_in_flight_callbacks() +{ + std::lock_guard lock(in_flight_mutex_); + num_in_flight_++; +} + +//============================================================================== +// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class +// for the use of this method. +bool rmw_client_data_t::shutdown_and_query_in_flight() +{ + std::lock_guard lock(in_flight_mutex_); + is_shutdown_ = true; + + return num_in_flight_ > 0; +} + +//============================================================================== +// See the comment about the "num_in_flight" class variable in the rmw_client_data_t structure +// for the use of this method. +bool rmw_client_data_t::decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight) +{ + std::lock_guard lock(in_flight_mutex_); + queries_in_flight = --num_in_flight_ > 0; + return is_shutdown_; +} + +bool rmw_client_data_t::is_shutdown() const +{ + std::lock_guard lock(in_flight_mutex_); + return is_shutdown_; +} + //============================================================================== void sub_data_handler( const z_sample_t * sample, @@ -525,6 +562,13 @@ void client_data_handler(z_owned_reply_t * reply, void * data) ); return; } + + // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for + // why we need to do this. + if (client_data->is_shutdown()) { + return; + } + if (!z_reply_check(reply)) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -550,4 +594,30 @@ void client_data_handler(z_owned_reply_t * reply, void * data) // Since we took ownership of the reply, null it out here *reply = z_reply_null(); } + +void client_data_drop(void * data) +{ + auto client_data = static_cast(data); + if (client_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain client_data_t " + ); + return; + } + + // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for + // why we need to do this. + bool queries_in_flight = false; + bool is_shutdown = client_data->decrement_queries_in_flight_and_is_shutdown(queries_in_flight); + + if (is_shutdown) { + if (!queries_in_flight) { + RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); + client_data->context->options.allocator.deallocate( + client_data, client_data->context->options.allocator.state); + } + } +} + } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index f10e84f3..60e4bd01 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -204,6 +204,7 @@ void service_data_handler(const z_query_t * query, void * service_data); ///============================================================================= void client_data_handler(z_owned_reply_t * reply, void * client_data); +void client_data_drop(void * data); ///============================================================================= class ZenohQuery final @@ -297,7 +298,6 @@ class rmw_client_data_t final std::shared_ptr entity; z_owned_keyexpr_t keyexpr; - z_owned_closure_reply_t zn_closure_reply; // Store the actual QoS profile used to configure this client. // The QoS is reused for sending requests and getting responses. @@ -329,6 +329,17 @@ class rmw_client_data_t final DataCallbackManager data_callback_mgr; + // See the comment for "num_in_flight" below on the use of this method. + void increment_in_flight_callbacks(); + + // See the comment for "num_in_flight" below on the use of this method. + bool shutdown_and_query_in_flight(); + + // See the comment for "num_in_flight" below on the use of this method. + bool decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight); + + bool is_shutdown() const; + private: void notify(); @@ -340,6 +351,28 @@ class rmw_client_data_t final std::deque> reply_queue_; mutable std::mutex reply_queue_mutex_; + + // rmw_zenoh uses Zenoh queries to implement clients. It turns out that in Zenoh, there is no + // way to cancel a query once it is in-flight via the z_get() zenoh-c API. Thus, if an + // rmw_zenoh_cpp user does rmw_create_client(), rmw_send_request(), rmw_destroy_client(), but the + // query comes in after the rmw_destroy_client(), rmw_zenoh_cpp could access already-freed memory. + // + // The next 3 variables are used to avoid that situation. Any time a query is initiated via + // rmw_send_request(), num_in_flight_ is incremented. When the Zenoh calls the callback for the + // query reply, num_in_flight_ is decremented. When rmw_destroy_client() is called, is_shutdown_ + // is set to true. If num_in_flight_ is 0, the data associated with this structure is freed. + // If num_in_flight_ is *not* 0, then the data associated with this structure is maintained. + // In the situation where is_shutdown_ is true, and num_in_flight_ drops to 0 in the query + // callback, the query callback will free up the structure. + // + // There is one case which is not handled by this, which has to do with timeouts. The query + // timeout is currently set to essentially infinite. Thus, if a query is in-flight but never + // returns, the memory in this structure will never be freed. There isn't much we can do about + // that at this time, but we may want to consider changing the timeout so that the memory can + // eventually be freed up. + mutable std::mutex in_flight_mutex_; + bool is_shutdown_{false}; + size_t num_in_flight_{0}; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 3e4660f1..3dec87f0 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -2378,7 +2378,6 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) return RMW_RET_INVALID_ARGUMENT); // CLEANUP =================================================================== - z_drop(z_move(client_data->zn_closure_reply)); z_drop(z_move(client_data->keyexpr)); zc_liveliness_undeclare_token(z_move(client_data->token)); @@ -2390,9 +2389,13 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) client_data->response_type_support->~ResponseTypeSupport(), rmw_zenoh_cpp::ResponseTypeSupport, ); allocator->deallocate(client_data->response_type_support, allocator->state); - RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); - allocator->deallocate(client->data, allocator->state); + // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for + // why we need to do this. + if (!client_data->shutdown_and_query_in_flight()) { + RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); + allocator->deallocate(client->data, allocator->state); + } allocator->deallocate(const_cast(client->service_name), allocator->state); allocator->deallocate(client, allocator->state); @@ -2425,6 +2428,10 @@ rmw_send_request( "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); + if (client_data->is_shutdown()) { + return RMW_RET_ERROR; + } + rmw_context_impl_s * context_impl = static_cast( client_data->context->impl); @@ -2479,6 +2486,10 @@ rmw_send_request( z_bytes_map_drop(z_move(map)); }); + // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for + // why we need to do this. + client_data->increment_in_flight_callbacks(); + opts.attachment = z_bytes_map_as_attachment(&map); opts.target = Z_QUERY_TARGET_ALL_COMPLETE; @@ -2492,11 +2503,13 @@ rmw_send_request( // and any number. opts.consolidation = z_query_consolidation_latest(); opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; - client_data->zn_closure_reply = - z_closure(rmw_zenoh_cpp::client_data_handler, nullptr, client_data); + z_owned_closure_reply_t zn_closure_reply = + z_closure(rmw_zenoh_cpp::client_data_handler, rmw_zenoh_cpp::client_data_drop, client_data); z_get( - z_loan(context_impl->session), z_loan( - client_data->keyexpr), "", &client_data->zn_closure_reply, &opts); + z_loan(context_impl->session), + z_loan(client_data->keyexpr), "", + z_move(zn_closure_reply), + &opts); return RMW_RET_OK; }