diff --git a/docs/design.md b/docs/design.md index 43ecdc7a..2c163a2d 100644 --- a/docs/design.md +++ b/docs/design.md @@ -434,3 +434,20 @@ Thus, there is no direct implementation of actions in `rmw_zenoh_cpp`. ## Security TBD + +## Environment variables + +### `RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES` + +The RMW recycles serialization buffers on transmission using a buffer pool with bounded memory +usage. +These buffers are returned to the pool - without being deallocated - once they cross the +network boundary in host-to-host communication, or after transmission in inter-process +communication, or upon being consumed by subscriptions in intra-process communication, etc. + +When the total size of the allocated buffers within the pool exceeds +`RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES`, serialization buffers are allocated using the system +allocator and moved to Zenoh; no recycling is performed in this case to prevent the buffer pool from +growing uncontrollably. + +The default value of `RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES` is 16MB; this value was chosen since it is roughly the size of the cache in a modern CPU. diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index 732cd470..691ebc87 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -47,6 +47,7 @@ add_library(rmw_zenoh_cpp SHARED src/detail/type_support_common.cpp src/detail/zenoh_config.cpp src/detail/zenoh_utils.cpp + src/detail/buffer_pool.cpp src/rmw_event.cpp src/rmw_get_network_flow_endpoints.cpp src/rmw_get_node_info_and_types.cpp diff --git a/rmw_zenoh_cpp/src/detail/buffer_pool.cpp b/rmw_zenoh_cpp/src/detail/buffer_pool.cpp new file mode 100644 index 00000000..5d9a2d99 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/buffer_pool.cpp @@ -0,0 +1,104 @@ +// Copyright 2025 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "buffer_pool.hpp" + +#include +#include +#include +#include + +#include "rcutils/allocator.h" +#include "rcutils/env.h" +#include "logging_macros.hpp" + +namespace rmw_zenoh_cpp +{ +///============================================================================= +BufferPool::BufferPool() +: buffers_(), mutex_() +{ + const char * env_value; + const char * error_str = rcutils_get_env("RMW_ZENOH_BUFFER_POOL_MAX_SIZE_BYTES", &env_value); + if (error_str != nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to read maximum buffer pool size, falling back to default."); + max_size_ = DEFAULT_MAX_SIZE; + } else if (strcmp(env_value, "") == 0) { + max_size_ = DEFAULT_MAX_SIZE; + } else { + max_size_ = std::atoll(env_value); + } + size_ = 0; +} + +///============================================================================= +BufferPool::~BufferPool() +{ + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + + for (Buffer & buffer : buffers_) { + allocator.deallocate(buffer.data, allocator.state); + } +} + +///============================================================================= +BufferPool::Buffer BufferPool::allocate(size_t size) +{ + std::lock_guard guard(mutex_); + + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + + if (buffers_.empty()) { + if (size_ + size > max_size_) { + return {}; + } else { + size_ += size; + } + uint8_t * data = static_cast(allocator.allocate(size, allocator.state)); + if (data == nullptr) { + return {}; + } else { + return Buffer {data, size}; + } + } else { + Buffer buffer = buffers_.back(); + buffers_.pop_back(); + if (buffer.size < size) { + size_t size_diff = size - buffer.size; + if (size_ + size_diff > max_size_) { + return {}; + } + uint8_t * data = static_cast(allocator.reallocate( + buffer.data, size, allocator.state)); + if (data == nullptr) { + return {}; + } + size_ += size_diff; + buffer.data = data; + buffer.size = size; + } + return buffer; + } +} + +///============================================================================= +void BufferPool::deallocate(BufferPool::Buffer buffer) +{ + std::lock_guard guard(mutex_); + buffers_.push_back(buffer); +} + +} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/buffer_pool.hpp b/rmw_zenoh_cpp/src/detail/buffer_pool.hpp new file mode 100644 index 00000000..dcbd2588 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/buffer_pool.hpp @@ -0,0 +1,60 @@ +// Copyright 2025 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__BUFFER_POOL_HPP_ +#define DETAIL__BUFFER_POOL_HPP_ + +#include +#include +#include +#include + +#include "rcutils/allocator.h" +#include "rcutils/env.h" +#include "logging_macros.hpp" + +namespace rmw_zenoh_cpp +{ +///============================================================================= +class BufferPool +{ +public: + struct Buffer + { + uint8_t * data; + size_t size; + }; + + BufferPool(); + + ~BufferPool(); + + Buffer allocate(size_t size); + + void deallocate(Buffer buffer); + +private: + std::vector buffers_; + std::mutex mutex_; + size_t max_size_; + size_t size_; + // NOTE(fuzzypixelz): Pooled buffers are recycled with the expectation that they would reside in + // cache, thus this this value should be comparable to the size of a modern CPU cache. The default + // value (16 MiB) is relatively conservative as CPU cache sizes range from a few MiB to a few + // hundred MiB. + const size_t DEFAULT_MAX_SIZE = 16 * 1024 * 1024; +}; +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__BUFFER_POOL_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index a2fdaf5e..919f2760 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -24,6 +24,7 @@ #include "graph_cache.hpp" #include "rmw_node_data.hpp" +#include "buffer_pool.hpp" #include "rmw/ret_types.h" #include "rmw/types.h" @@ -92,6 +93,9 @@ struct rmw_context_impl_s final // Forward declaration class Data; + // Pool of serialization buffers. + rmw_zenoh_cpp::BufferPool serialization_buffer_pool; + private: std::shared_ptr data_{nullptr}; }; diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index f960b879..93596a49 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -216,24 +217,33 @@ rmw_ret_t PublisherData::publish( type_support_impl_); // To store serialized message byte array. - char * msg_bytes = nullptr; - - rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator; + uint8_t * msg_bytes = nullptr; + + rmw_context_impl_s *context_impl = static_cast(rmw_node_->data); + + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + + // Try to get memory from the serialization buffer pool. + BufferPool::Buffer serialization_buffer = + context_impl->serialization_buffer_pool.allocate(max_data_length); + if (serialization_buffer.data == nullptr) { + void * data = allocator.allocate(max_data_length, allocator.state); + RMW_CHECK_FOR_NULL_WITH_MSG( + data, "failed to allocate serialization buffer", return RMW_RET_BAD_ALLOC); + msg_bytes = static_cast(data); + } else { + msg_bytes = serialization_buffer.data; + } auto always_free_msg_bytes = rcpputils::make_scope_exit( - [&msg_bytes, allocator]() { - if (msg_bytes) { - allocator->deallocate(msg_bytes, allocator->state); + [&msg_bytes, &allocator, &serialization_buffer]() { + if (serialization_buffer.data == nullptr) { + allocator.deallocate(msg_bytes, allocator.state); } }); - // Get memory from the allocator. - msg_bytes = static_cast(allocator->allocate(max_data_length, allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG( - msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC); - // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length); + eprosima::fastcdr::FastBuffer fastbuffer(reinterpret_cast(msg_bytes), max_data_length); // Object that serializes the data rmw_zenoh_cpp::Cdr ser(fastbuffer); @@ -258,10 +268,19 @@ rmw_ret_t PublisherData::publish( sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes(); // TODO(ahcorde): shmbuf - std::vector raw_data( - reinterpret_cast(msg_bytes), - reinterpret_cast(msg_bytes) + data_length); - zenoh::Bytes payload(std::move(raw_data)); + zenoh::Bytes payload; + if (serialization_buffer.data == nullptr) { + std::vector raw_data( + reinterpret_cast(msg_bytes), + reinterpret_cast(msg_bytes) + data_length); + payload = zenoh::Bytes(std::move(raw_data)); + } else { + auto deleter = [buffer_pool = &context_impl->serialization_buffer_pool, + buffer = serialization_buffer](uint8_t *){ + buffer_pool->deallocate(buffer); + }; + payload = zenoh::Bytes(msg_bytes, data_length, deleter); + } TRACETOOLS_TRACEPOINT( rmw_publish, static_cast(rmw_publisher_), ros_message, source_timestamp);