diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index cc5a983b1cd236..7ac35734a45b42 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -423,6 +423,12 @@ void PrefetchBuffer::reset_offset(size_t offset) { } else { _exceed = false; } + // Lazy-allocate the backing buffer in the calling (query) thread, which has a + // MemTrackerLimiter attached. The prefetch thread pool threads are "Orphan" threads + // without a tracker, so allocation must not happen there. + if (_buf.empty()) { + _buf.resize(_size); + } _prefetch_status = ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func( [buffer_ptr = shared_from_this()]() { buffer_ptr->prefetch_buffer(); }); } @@ -449,13 +455,6 @@ void PrefetchBuffer::prefetch_buffer() { _prefetched.notify_all(); } - // Lazy-allocate the backing buffer on first actual prefetch, avoiding the cost of - // pre-allocating memory for readers that are initialized but never read (e.g. when - // many file readers are created concurrently for a TVF scan over many small S3 files). 
- if (!_buf) { - _buf = std::make_unique<char[]>(_size); - } - int read_range_index = search_read_range(_offset); size_t buf_size; if (read_range_index == -1) { @@ -470,7 +469,7 @@ { SCOPED_RAW_TIMER(&_statis.read_time); - s = _reader->read_at(_offset, Slice {_buf.get(), buf_size}, &_len, _io_ctx); + s = _reader->read_at(_offset, Slice {_buf.data(), buf_size}, &_len, _io_ctx); } if (UNLIKELY(s.ok() && buf_size != _len)) { // This indicates that the data size returned by S3 object storage is smaller than what we requested, @@ -600,7 +599,7 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len, size_t read_len = std::min({buf_len, _offset + _size - off, _offset + _len - off}); { SCOPED_RAW_TIMER(&_statis.copy_time); - memcpy((void*)out, _buf.get() + (off - _offset), read_len); + memcpy((void*)out, _buf.data() + (off - _offset), read_len); } *bytes_read = read_len; _statis.request_io += 1; @@ -623,6 +622,11 @@ void PrefetchBuffer::close() { } _buffer_status = BufferStatus::CLOSED; _prefetched.notify_all(); + // Explicitly release the backing buffer here, in the calling (query) thread which has a + // MemTrackerLimiter. The destructor may run in the thread pool's Orphan thread (when the + // last shared_ptr ref is released after the prefetch lambda completes), so we must not + // rely on ~PODArray() to release memory — that would trigger memory_orphan_check(). 
+ PODArray<char>().swap(_buf); } void PrefetchBuffer::_collect_profile_before_close() { diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index 34b59d2e19a4d2..e6151d3ce39b64 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -29,6 +29,7 @@ #include "common/status.h" #include "core/custom_allocator.h" +#include "core/pod_array.h" #include "core/typeid_cast.h" #include "io/cache/cached_remote_file_reader.h" #include "io/file_factory.h" @@ -430,7 +431,6 @@ struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer>, public Pro _whole_buffer_size(whole_buffer_size), _reader(reader), _io_ctx(io_ctx), - _buf(nullptr), _sync_profile(std::move(sync_profile)) {} PrefetchBuffer(PrefetchBuffer&& other) @@ -457,7 +457,7 @@ size_t _whole_buffer_size; io::FileReader* _reader = nullptr; const IOContext* _io_ctx = nullptr; - std::unique_ptr<char[]> _buf; + PODArray<char> _buf; BufferStatus _buffer_status {BufferStatus::RESET}; std::mutex _lock; std::condition_variable _prefetched; diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp index 7c71603f07c58f..a137a2934a05c0 100644 --- a/be/src/io/fs/hdfs_file_system.cpp +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -31,6 +31,7 @@ #include "common/config.h" #include "common/status.h" +#include "core/pod_array.h" #include "io/fs/err_utils.h" #include "io/fs/hdfs/hdfs_mgr.h" #include "io/fs/hdfs_file_reader.h" @@ -296,18 +297,19 @@ Status HdfsFileSystem::download_impl(const Path& remote_file, const Path& local_ // 4. 
read remote and write to local LOG(INFO) << "read remote file: " << remote_file << " to local: " << local_file; constexpr size_t buf_sz = 1024 * 1024; - std::unique_ptr<char[]> read_buf(new char[buf_sz]); + PODArray<char> read_buf; + read_buf.resize(buf_sz); size_t cur_offset = 0; while (true) { size_t read_len = 0; - Slice file_slice(read_buf.get(), buf_sz); + Slice file_slice(read_buf.data(), buf_sz); RETURN_IF_ERROR(hdfs_reader->read_at(cur_offset, file_slice, &read_len)); cur_offset += read_len; if (read_len == 0) { break; } - RETURN_IF_ERROR(local_writer->append({read_buf.get(), read_len})); + RETURN_IF_ERROR(local_writer->append({read_buf.data(), read_len})); } return local_writer->close(); } diff --git a/be/src/io/fs/http_file_reader.cpp b/be/src/io/fs/http_file_reader.cpp index a2085c6fe6b0fc..dc676e4a9f413e 100644 --- a/be/src/io/fs/http_file_reader.cpp +++ b/be/src/io/fs/http_file_reader.cpp @@ -99,7 +99,7 @@ HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url, in } } - _read_buffer = std::make_unique<char[]>(READ_BUFFER_SIZE); + _read_buffer.resize(READ_BUFFER_SIZE); } HttpFileReader::~HttpFileReader() { @@ -214,8 +214,8 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r VLOG(2) << "HttpFileReader::read_at_impl offset=" << offset << " size=" << result.size << " url=" << _url << " range_supported=" << _range_supported; - if (!_read_buffer) { - _read_buffer = std::make_unique<char[]>(READ_BUFFER_SIZE); + if (_read_buffer.empty()) { + _read_buffer.resize(READ_BUFFER_SIZE); } size_t to_read = result.size; @@ -252,7 +252,7 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r << "Buffer overflow: buffer_idx=" << buffer_idx << " copy_len=" << copy_len << " READ_BUFFER_SIZE=" << READ_BUFFER_SIZE; - std::memcpy(result.data, _read_buffer.get() + buffer_idx, copy_len); + std::memcpy(result.data, _read_buffer.data() + buffer_idx, copy_len); buffer_offset += copy_len; to_read -= copy_len; offset += copy_len; 
@@ -419,12 +419,12 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r buffer_offset += buf.size(); } else { size_t cached = std::min(buf.size(), (size_t)READ_BUFFER_SIZE); - std::memcpy(_read_buffer.get(), buf.data(), cached); + std::memcpy(_read_buffer.data(), buf.data(), cached); _buffer_start = offset; _buffer_end = offset + cached; size_t copy_len = std::min(remaining, cached); - std::memcpy(result.data + buffer_offset, _read_buffer.get(), copy_len); + std::memcpy(result.data + buffer_offset, _read_buffer.data(), copy_len); buffer_offset += copy_len; } @@ -443,7 +443,7 @@ Status HttpFileReader::close() { } // Release buffer memory (1MB) - _read_buffer.reset(); + PODArray<char>().swap(_read_buffer); _buffer_start = 0; _buffer_end = 0; diff --git a/be/src/io/fs/http_file_reader.h b/be/src/io/fs/http_file_reader.h index 91a360a000dfbf..467abf251b650e 100644 --- a/be/src/io/fs/http_file_reader.h +++ b/be/src/io/fs/http_file_reader.h @@ -23,6 +23,7 @@ #include <memory> #include "common/status.h" +#include "core/pod_array.h" #include "io/fs/file_handle_cache.h" #include "io/fs/file_reader.h" #include "io/fs/file_system.h" @@ -66,7 +67,7 @@ class HttpFileReader final : public FileReader { // Called at the start of open() when enable_cdc_client=true. Status setup_cdc_client(); - std::unique_ptr<char[]> _read_buffer; + PODArray<char> _read_buffer; static constexpr size_t READ_BUFFER_SIZE = 1 << 20; // 1MB // Default maximum file size for servers that don't support Range requests static constexpr size_t DEFAULT_MAX_REQUEST_SIZE = 100 << 20; // 100MB