diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index b4f59146cd41a5..58630de76e2ea9 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -423,6 +423,12 @@ void PrefetchBuffer::reset_offset(size_t offset) { } else { _exceed = false; } + // Lazy-allocate the backing buffer in the calling (query) thread, which has a + // MemTrackerLimiter attached. The prefetch thread pool threads are "Orphan" threads + // without a tracker, so allocation must not happen there. + if (_buf.empty()) { + _buf.resize(_size); + } _prefetch_status = ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func( [buffer_ptr = shared_from_this()]() { buffer_ptr->prefetch_buffer(); }); } @@ -449,13 +455,6 @@ void PrefetchBuffer::prefetch_buffer() { _prefetched.notify_all(); } - // Lazy-allocate the backing buffer on first actual prefetch, avoiding the cost of - // pre-allocating memory for readers that are initialized but never read (e.g. when - // many file readers are created concurrently for a TVF scan over many small S3 files). - if (!_buf) { - _buf = std::make_unique(_size); - } - int read_range_index = search_read_range(_offset); size_t buf_size; if (read_range_index == -1) { @@ -470,7 +469,7 @@ void PrefetchBuffer::prefetch_buffer() { { SCOPED_RAW_TIMER(&_statis.read_time); - s = _reader->read_at(_offset, Slice {_buf.get(), buf_size}, &_len, _io_ctx); + s = _reader->read_at(_offset, Slice {_buf.data(), buf_size}, &_len, _io_ctx); } if (UNLIKELY(s.ok() && buf_size != _len)) { // This indicates that the data size returned by S3 object storage is smaller than what we requested, @@ -609,7 +608,7 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len, size_t read_len = std::min({buf_len, _offset + _size - off, _offset + _len - off}); { SCOPED_RAW_TIMER(&_statis.copy_time); - memcpy((void*)out, _buf.get() + (off - _offset), read_len); + memcpy((void*)out, _buf.data() + (off - _offset), read_len); } *bytes_read = read_len; _statis.request_io += 1; @@ -632,6 +631,11 @@ void PrefetchBuffer::close() { } _buffer_status = BufferStatus::CLOSED; _prefetched.notify_all(); + // Explicitly release the backing buffer here, in the calling (query) thread which has a + // MemTrackerLimiter. The destructor may run in the thread pool's Orphan thread (when the + // last shared_ptr ref is released after the prefetch lambda completes), so we must not + // rely on ~PODArray() to release memory — that would trigger memory_orphan_check(). + vectorized::PODArray().swap(_buf); } void PrefetchBuffer::_collect_profile_before_close() { diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index 709c0439b24072..0ac256211beb9e 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -38,6 +38,7 @@ #include "util/runtime_profile.h" #include "util/slice.h" #include "vec/common/custom_allocator.h" +#include "vec/common/pod_array.h" #include "vec/common/typeid_cast.h" namespace doris { @@ -425,7 +426,6 @@ struct PrefetchBuffer : std::enable_shared_from_this, public Pro _whole_buffer_size(whole_buffer_size), _reader(reader), _io_ctx(io_ctx), - _buf(nullptr), _sync_profile(std::move(sync_profile)) {} PrefetchBuffer(PrefetchBuffer&& other) @@ -452,7 +452,7 @@ struct PrefetchBuffer : std::enable_shared_from_this, public Pro size_t _whole_buffer_size; io::FileReader* _reader = nullptr; const IOContext* _io_ctx = nullptr; - std::unique_ptr _buf; + vectorized::PODArray _buf; BufferStatus _buffer_status {BufferStatus::RESET}; std::mutex _lock; std::condition_variable _prefetched; diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp index 7c71603f07c58f..8b6196d392fb64 100644 --- a/be/src/io/fs/hdfs_file_system.cpp +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -41,6 +41,7 @@ #include "runtime/exec_env.h" #include "util/obj_lru_cache.h" #include "util/slice.h" +#include "vec/common/pod_array.h" namespace doris::io { @@ -296,18 +297,19 @@ Status HdfsFileSystem::download_impl(const Path& remote_file, const Path& local_ // 4. read remote and write to local LOG(INFO) << "read remote file: " << remote_file << " to local: " << local_file; constexpr size_t buf_sz = 1024 * 1024; - std::unique_ptr read_buf(new char[buf_sz]); + vectorized::PODArray read_buf; + read_buf.resize(buf_sz); size_t cur_offset = 0; while (true) { size_t read_len = 0; - Slice file_slice(read_buf.get(), buf_sz); + Slice file_slice(read_buf.data(), buf_sz); RETURN_IF_ERROR(hdfs_reader->read_at(cur_offset, file_slice, &read_len)); cur_offset += read_len; if (read_len == 0) { break; } - RETURN_IF_ERROR(local_writer->append({read_buf.get(), read_len})); + RETURN_IF_ERROR(local_writer->append({read_buf.data(), read_len})); } return local_writer->close(); } diff --git a/be/src/io/fs/http_file_reader.cpp b/be/src/io/fs/http_file_reader.cpp index fb243179baf557..761a8c825e70f1 100644 --- a/be/src/io/fs/http_file_reader.cpp +++ b/be/src/io/fs/http_file_reader.cpp @@ -83,7 +83,7 @@ HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url) } } - _read_buffer = std::make_unique(READ_BUFFER_SIZE); + _read_buffer.resize(READ_BUFFER_SIZE); } HttpFileReader::~HttpFileReader() { @@ -153,8 +153,8 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r VLOG(2) << "HttpFileReader::read_at_impl offset=" << offset << " size=" << result.size << " url=" << _url << " range_supported=" << _range_supported; - if (!_read_buffer) { - _read_buffer = std::make_unique(READ_BUFFER_SIZE); + if (_read_buffer.empty()) { + _read_buffer.resize(READ_BUFFER_SIZE); } size_t to_read = result.size; @@ -175,7 +175,7 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r << "Buffer overflow: buffer_idx=" << buffer_idx << " copy_len=" << copy_len << " READ_BUFFER_SIZE=" << READ_BUFFER_SIZE; - std::memcpy(result.data, _read_buffer.get() + buffer_idx, copy_len); + std::memcpy(result.data, _read_buffer.data() + buffer_idx, copy_len); buffer_offset += copy_len; to_read -= copy_len; offset += copy_len; @@ -287,7 +287,7 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r buffer_offset += slice_len; size_t cached = std::min(slice_len, (size_t)READ_BUFFER_SIZE); - std::memcpy(_read_buffer.get(), buf.data() + offset, cached); + std::memcpy(_read_buffer.data(), buf.data() + offset, cached); _buffer_start = offset; _buffer_end = offset + cached; @@ -303,12 +303,12 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r buffer_offset += buf.size(); } else { size_t cached = std::min(buf.size(), (size_t)READ_BUFFER_SIZE); - std::memcpy(_read_buffer.get(), buf.data(), cached); + std::memcpy(_read_buffer.data(), buf.data(), cached); _buffer_start = offset; _buffer_end = offset + cached; size_t copy_len = std::min(remaining, cached); - std::memcpy(result.data + buffer_offset, _read_buffer.get(), copy_len); + std::memcpy(result.data + buffer_offset, _read_buffer.data(), copy_len); buffer_offset += copy_len; } @@ -327,7 +327,7 @@ Status HttpFileReader::close() { } // Release buffer memory (1MB) - _read_buffer.reset(); + vectorized::PODArray().swap(_read_buffer); _buffer_start = 0; _buffer_end = 0; diff --git a/be/src/io/fs/http_file_reader.h b/be/src/io/fs/http_file_reader.h index 607eedf3d1a50b..6dad0585491008 100644 --- a/be/src/io/fs/http_file_reader.h +++ b/be/src/io/fs/http_file_reader.h @@ -29,6 +29,7 @@ #include "io/fs/file_system.h" #include "util/runtime_profile.h" #include "util/slice.h" +#include "vec/common/pod_array.h" namespace doris::io { typedef struct OpenFileInfo { @@ -60,7 +61,7 @@ class HttpFileReader final : public FileReader { // Returns OK on success with _range_supported set appropriately Status detect_range_support(); - std::unique_ptr _read_buffer; + vectorized::PODArray _read_buffer; static constexpr size_t READ_BUFFER_SIZE = 1 << 20; // 1MB // Default maximum file size for servers that don't support Range requests static constexpr size_t DEFAULT_MAX_REQUEST_SIZE = 100 << 20; // 100MB