Skip to content

Commit

Permalink
Implement generic client (#2358)
Browse files Browse the repository at this point in the history
* Implement generic client

Signed-off-by: Barry Xu <[email protected]>

* Fix the incorrect parameter declaration

Signed-off-by: Barry Xu <[email protected]>

* Deleted copy constructor and assignment for FutureAndRequestId

Signed-off-by: Barry Xu <[email protected]>

* Update codes after rebase

Signed-off-by: Barry Xu <[email protected]>

* Address review comments

Signed-off-by: Barry Xu <[email protected]>

* Address review comments from iuhilnehc-ynos

Signed-off-by: Barry Xu <[email protected]>

* Correct an error in a description

Signed-off-by: Barry Xu <[email protected]>

* Fix window build errors

Signed-off-by: Barry Xu <[email protected]>

* Address review comments from William

Signed-off-by: Barry Xu <[email protected]>

* Add doc strings to create_generic_client

Signed-off-by: Barry Xu <[email protected]>

---------

Signed-off-by: Barry Xu <[email protected]>
  • Loading branch information
Barry-Xu-2018 authored Mar 5, 2024
1 parent b007204 commit 3df73f0
Show file tree
Hide file tree
Showing 12 changed files with 1,413 additions and 398 deletions.
2 changes: 2 additions & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/clock.cpp
src/rclcpp/context.cpp
src/rclcpp/contexts/default_context.cpp
src/rclcpp/create_generic_client.cpp
src/rclcpp/detail/add_guard_condition_to_rcl_wait_set.cpp
src/rclcpp/detail/resolve_intra_process_buffer_type.cpp
src/rclcpp/detail/resolve_parameter_overrides.cpp
Expand Down Expand Up @@ -74,6 +75,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/experimental/executors/events_executor/events_executor.cpp
src/rclcpp/experimental/timers_manager.cpp
src/rclcpp/future_return_code.cpp
src/rclcpp/generic_client.cpp
src/rclcpp/generic_publisher.cpp
src/rclcpp/generic_subscription.cpp
src/rclcpp/graph_listener.cpp
Expand Down
41 changes: 28 additions & 13 deletions rclcpp/include/rclcpp/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,29 @@ struct FutureAndRequestId
/// Destructor.
~FutureAndRequestId() = default;
};

template<typename PendingRequestsT, typename AllocatorT = std::allocator<int64_t>>
size_t
prune_requests_older_than_impl(
PendingRequestsT & pending_requests,
std::mutex & pending_requests_mutex,
std::chrono::time_point<std::chrono::system_clock> time_point,
std::vector<int64_t, AllocatorT> * pruned_requests = nullptr)
{
std::lock_guard guard(pending_requests_mutex);
auto old_size = pending_requests.size();
for (auto it = pending_requests.begin(), last = pending_requests.end(); it != last; ) {
if (it->second.first < time_point) {
if (pruned_requests) {
pruned_requests->push_back(it->first);
}
it = pending_requests.erase(it);
} else {
++it;
}
}
return old_size - pending_requests.size();
}
} // namespace detail

namespace node_interfaces
Expand Down Expand Up @@ -771,19 +794,11 @@ class Client : public ClientBase
std::chrono::time_point<std::chrono::system_clock> time_point,
std::vector<int64_t, AllocatorT> * pruned_requests = nullptr)
{
std::lock_guard guard(pending_requests_mutex_);
auto old_size = pending_requests_.size();
for (auto it = pending_requests_.begin(), last = pending_requests_.end(); it != last; ) {
if (it->second.first < time_point) {
if (pruned_requests) {
pruned_requests->push_back(it->first);
}
it = pending_requests_.erase(it);
} else {
++it;
}
}
return old_size - pending_requests_.size();
return detail::prune_requests_older_than_impl(
pending_requests_,
pending_requests_mutex_,
time_point,
pruned_requests);
}

/// Configure client introspection.
Expand Down
90 changes: 90 additions & 0 deletions rclcpp/include/rclcpp/create_generic_client.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2023 Sony Group Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__CREATE_GENERIC_CLIENT_HPP_
#define RCLCPP__CREATE_GENERIC_CLIENT_HPP_

#include <memory>
#include <string>

#include "rclcpp/generic_client.hpp"
#include "rclcpp/node_interfaces/get_node_base_interface.hpp"
#include "rclcpp/node_interfaces/get_node_graph_interface.hpp"
#include "rclcpp/node_interfaces/get_node_services_interface.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/node_interfaces/node_graph_interface.hpp"
#include "rclcpp/node_interfaces/node_services_interface.hpp"
#include "rclcpp/qos.hpp"

namespace rclcpp
{
/// Create a generic service client with a name of given type.
/**
* \param[in] node_base NodeBaseInterface implementation of the node on which
* to create the client.
* \param[in] node_graph NodeGraphInterface implementation of the node on which
* to create the client.
* \param[in] node_services NodeServicesInterface implementation of the node on
* which to create the client.
* \param[in] service_name The name on which the service is accessible.
* \param[in] service_type The name of service type, e.g. "test_msgs/srv/BasicTypes"
* \param[in] qos Quality of service profile for client.
* \param[in] group Callback group to handle the reply to service calls.
* \return Shared pointer to the created client.
*/
RCLCPP_PUBLIC
rclcpp::GenericClient::SharedPtr
create_generic_client(
std::shared_ptr<node_interfaces::NodeBaseInterface> node_base,
std::shared_ptr<node_interfaces::NodeGraphInterface> node_graph,
std::shared_ptr<node_interfaces::NodeServicesInterface> node_services,
const std::string & service_name,
const std::string & service_type,
const rclcpp::QoS & qos = rclcpp::ServicesQoS(),
rclcpp::CallbackGroup::SharedPtr group = nullptr);

/// Create a generic service client with a name of given type.
/**
* The NodeT type needs to have NodeBaseInterface implementation, NodeGraphInterface implementation
* and NodeServicesInterface implementation of the node which to create the client.
*
* \param[in] node The node on which to create the client.
* \param[in] service_name The name on which the service is accessible.
* \param[in] service_type The name of service type, e.g. "test_msgs/srv/BasicTypes"
* \param[in] qos Quality of service profile for client.
* \param[in] group Callback group to handle the reply to service calls.
* \return Shared pointer to the created client.
*/
template<typename NodeT>
rclcpp::GenericClient::SharedPtr
create_generic_client(
NodeT node,
const std::string & service_name,
const std::string & service_type,
const rclcpp::QoS & qos = rclcpp::ServicesQoS(),
rclcpp::CallbackGroup::SharedPtr group = nullptr)
{
return create_generic_client(
rclcpp::node_interfaces::get_node_base_interface(node),
rclcpp::node_interfaces::get_node_graph_interface(node),
rclcpp::node_interfaces::get_node_services_interface(node),
service_name,
service_type,
qos,
group
);
}
} // namespace rclcpp

#endif // RCLCPP__CREATE_GENERIC_CLIENT_HPP_
207 changes: 207 additions & 0 deletions rclcpp/include/rclcpp/generic_client.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
// Copyright 2023 Sony Group Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__GENERIC_CLIENT_HPP_
#define RCLCPP__GENERIC_CLIENT_HPP_

#include <map>
#include <memory>
#include <future>
#include <string>
#include <vector>
#include <utility>

#include "rcl/client.h"

#include "rclcpp/client.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rcpputils/shared_library.hpp"

#include "rosidl_typesupport_introspection_cpp/message_introspection.hpp"

namespace rclcpp
{
class GenericClient : public ClientBase
{
public:
using Request = void *; // Serialized data pointer of request message
using Response = void *; // Serialized data pointer of response message

using SharedResponse = std::shared_ptr<void>;

using Promise = std::promise<SharedResponse>;
using SharedPromise = std::shared_ptr<Promise>;

using Future = std::future<SharedResponse>;
using SharedFuture = std::shared_future<SharedResponse>;

RCLCPP_SMART_PTR_DEFINITIONS(GenericClient)

/// A convenient GenericClient::Future and request id pair.
/**
* Public members:
* - future: a std::future<void *>.
* - request_id: the request id associated with the future.
*
* All the other methods are equivalent to the ones std::future provides.
*/
struct FutureAndRequestId
: detail::FutureAndRequestId<Future>
{
using detail::FutureAndRequestId<Future>::FutureAndRequestId;

/// See std::future::share().
SharedFuture share() noexcept {return this->future.share();}

/// Move constructor.
FutureAndRequestId(FutureAndRequestId && other) noexcept = default;
/// Deleted copy constructor, each instance is a unique owner of the future.
FutureAndRequestId(const FutureAndRequestId & other) = delete;
/// Move assignment.
FutureAndRequestId & operator=(FutureAndRequestId && other) noexcept = default;
/// Deleted copy assignment, each instance is a unique owner of the future.
FutureAndRequestId & operator=(const FutureAndRequestId & other) = delete;
/// Destructor.
~FutureAndRequestId() = default;
};

GenericClient(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
const std::string & service_name,
const std::string & service_type,
rcl_client_options_t & client_options);

RCLCPP_PUBLIC
SharedResponse
create_response() override;

RCLCPP_PUBLIC
std::shared_ptr<rmw_request_id_t>
create_request_header() override;

RCLCPP_PUBLIC
void
handle_response(
std::shared_ptr<rmw_request_id_t> request_header,
std::shared_ptr<void> response) override;

/// Send a request to the service server.
/**
* This method returns a `FutureAndRequestId` instance
* that can be passed to Executor::spin_until_future_complete() to
* wait until it has been completed.
*
* If the future never completes,
* e.g. the call to Executor::spin_until_future_complete() times out,
* GenericClient::remove_pending_request() must be called to clean the client internal state.
* Not doing so will make the `Client` instance to use more memory each time a response is not
* received from the service server.
*
* ```cpp
* auto future = client->async_send_request(my_request);
* if (
* rclcpp::FutureReturnCode::TIMEOUT ==
* executor->spin_until_future_complete(future, timeout))
* {
* client->remove_pending_request(future);
* // handle timeout
* } else {
* handle_response(future.get());
* }
* ```
*
* \param[in] request request to be send.
* \return a FutureAndRequestId instance.
*/
RCLCPP_PUBLIC
FutureAndRequestId
async_send_request(const Request request);

/// Clean all pending requests older than a time_point.
/**
* \param[in] time_point Requests that were sent before this point are going to be removed.
* \param[inout] pruned_requests Removed requests id will be pushed to the vector
* if a pointer is provided.
* \return number of pending requests that were removed.
*/
template<typename AllocatorT = std::allocator<int64_t>>
size_t
prune_requests_older_than(
std::chrono::time_point<std::chrono::system_clock> time_point,
std::vector<int64_t, AllocatorT> * pruned_requests = nullptr)
{
return detail::prune_requests_older_than_impl(
pending_requests_,
pending_requests_mutex_,
time_point,
pruned_requests);
}

RCLCPP_PUBLIC
size_t
prune_pending_requests();

RCLCPP_PUBLIC
bool
remove_pending_request(
int64_t request_id);

/// Take the next response for this client.
/**
* \sa ClientBase::take_type_erased_response().
*
* \param[out] response_out The reference to a Service Response into
* which the middleware will copy the response being taken.
* \param[out] request_header_out The request header to be filled by the
* middleware when taking, and which can be used to associate the response
* to a specific request.
* \returns true if the response was taken, otherwise false.
* \throws rclcpp::exceptions::RCLError based exceptions if the underlying
* rcl function fail.
*/
RCLCPP_PUBLIC
bool
take_response(Response response_out, rmw_request_id_t & request_header_out)
{
return this->take_type_erased_response(response_out, request_header_out);
}

protected:
using CallbackInfoVariant = std::variant<
std::promise<SharedResponse>>; // Use variant for extension

int64_t
async_send_request_impl(
const Request request,
CallbackInfoVariant value);

std::optional<CallbackInfoVariant>
get_and_erase_pending_request(
int64_t request_number);

RCLCPP_DISABLE_COPY(GenericClient)

std::map<int64_t, std::pair<
std::chrono::time_point<std::chrono::system_clock>,
CallbackInfoVariant>> pending_requests_;
std::mutex pending_requests_mutex_;

private:
std::shared_ptr<rcpputils::SharedLibrary> ts_lib_;
const rosidl_typesupport_introspection_cpp::MessageMembers * response_members_;
};
} // namespace rclcpp

#endif // RCLCPP__GENERIC_CLIENT_HPP_
Loading

0 comments on commit 3df73f0

Please sign in to comment.