Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions be/src/io/fs/buffered_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,12 @@ void PrefetchBuffer::reset_offset(size_t offset) {
} else {
_exceed = false;
}
// Lazy-allocate the backing buffer in the calling (query) thread, which has a
// MemTrackerLimiter attached. The prefetch thread pool threads are "Orphan" threads
// without a tracker, so allocation must not happen there.
if (_buf.empty()) {
_buf.resize(_size);
}
_prefetch_status = ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func(
[buffer_ptr = shared_from_this()]() { buffer_ptr->prefetch_buffer(); });
}
Expand All @@ -449,13 +455,6 @@ void PrefetchBuffer::prefetch_buffer() {
_prefetched.notify_all();
}

// Lazy-allocate the backing buffer on first actual prefetch, avoiding the cost of
// pre-allocating memory for readers that are initialized but never read (e.g. when
// many file readers are created concurrently for a TVF scan over many small S3 files).
if (!_buf) {
_buf = std::make_unique<char[]>(_size);
}

int read_range_index = search_read_range(_offset);
size_t buf_size;
if (read_range_index == -1) {
Expand All @@ -470,7 +469,7 @@ void PrefetchBuffer::prefetch_buffer() {

{
SCOPED_RAW_TIMER(&_statis.read_time);
s = _reader->read_at(_offset, Slice {_buf.get(), buf_size}, &_len, _io_ctx);
s = _reader->read_at(_offset, Slice {_buf.data(), buf_size}, &_len, _io_ctx);
}
if (UNLIKELY(s.ok() && buf_size != _len)) {
// This indicates that the data size returned by S3 object storage is smaller than what we requested,
Expand Down Expand Up @@ -600,7 +599,7 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
size_t read_len = std::min({buf_len, _offset + _size - off, _offset + _len - off});
{
SCOPED_RAW_TIMER(&_statis.copy_time);
memcpy((void*)out, _buf.get() + (off - _offset), read_len);
memcpy((void*)out, _buf.data() + (off - _offset), read_len);
}
*bytes_read = read_len;
_statis.request_io += 1;
Expand All @@ -623,6 +622,11 @@ void PrefetchBuffer::close() {
}
_buffer_status = BufferStatus::CLOSED;
_prefetched.notify_all();
// Explicitly release the backing buffer here, in the calling (query) thread which has a
// MemTrackerLimiter. The destructor may run in the thread pool's Orphan thread (when the
// last shared_ptr ref is released after the prefetch lambda completes), so we must not
// rely on ~PODArray() to release memory — that would trigger memory_orphan_check().
PODArray<char>().swap(_buf);
}

void PrefetchBuffer::_collect_profile_before_close() {
Expand Down
4 changes: 2 additions & 2 deletions be/src/io/fs/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include "common/status.h"
#include "core/custom_allocator.h"
#include "core/pod_array.h"
#include "core/typeid_cast.h"
#include "io/cache/cached_remote_file_reader.h"
#include "io/file_factory.h"
Expand Down Expand Up @@ -430,7 +431,6 @@ struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer>, public Pro
_whole_buffer_size(whole_buffer_size),
_reader(reader),
_io_ctx(io_ctx),
_buf(nullptr),
_sync_profile(std::move(sync_profile)) {}

PrefetchBuffer(PrefetchBuffer&& other)
Expand All @@ -457,7 +457,7 @@ struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer>, public Pro
size_t _whole_buffer_size;
io::FileReader* _reader = nullptr;
const IOContext* _io_ctx = nullptr;
std::unique_ptr<char[]> _buf;
PODArray<char> _buf;
BufferStatus _buffer_status {BufferStatus::RESET};
std::mutex _lock;
std::condition_variable _prefetched;
Expand Down
8 changes: 5 additions & 3 deletions be/src/io/fs/hdfs_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

#include "common/config.h"
#include "common/status.h"
#include "core/pod_array.h"
#include "io/fs/err_utils.h"
#include "io/fs/hdfs/hdfs_mgr.h"
#include "io/fs/hdfs_file_reader.h"
Expand Down Expand Up @@ -296,18 +297,19 @@ Status HdfsFileSystem::download_impl(const Path& remote_file, const Path& local_
// 4. read remote and write to local
LOG(INFO) << "read remote file: " << remote_file << " to local: " << local_file;
constexpr size_t buf_sz = 1024 * 1024;
std::unique_ptr<char[]> read_buf(new char[buf_sz]);
PODArray<char> read_buf;
read_buf.resize(buf_sz);
size_t cur_offset = 0;
while (true) {
size_t read_len = 0;
Slice file_slice(read_buf.get(), buf_sz);
Slice file_slice(read_buf.data(), buf_sz);
RETURN_IF_ERROR(hdfs_reader->read_at(cur_offset, file_slice, &read_len));
cur_offset += read_len;
if (read_len == 0) {
break;
}

RETURN_IF_ERROR(local_writer->append({read_buf.get(), read_len}));
RETURN_IF_ERROR(local_writer->append({read_buf.data(), read_len}));
}
return local_writer->close();
}
Expand Down
14 changes: 7 additions & 7 deletions be/src/io/fs/http_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url, in
}
}

_read_buffer = std::make_unique<char[]>(READ_BUFFER_SIZE);
_read_buffer.resize(READ_BUFFER_SIZE);
}

HttpFileReader::~HttpFileReader() {
Expand Down Expand Up @@ -214,8 +214,8 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r
VLOG(2) << "HttpFileReader::read_at_impl offset=" << offset << " size=" << result.size
<< " url=" << _url << " range_supported=" << _range_supported;

if (!_read_buffer) {
_read_buffer = std::make_unique<char[]>(READ_BUFFER_SIZE);
if (_read_buffer.empty()) {
_read_buffer.resize(READ_BUFFER_SIZE);
}

size_t to_read = result.size;
Expand Down Expand Up @@ -252,7 +252,7 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r
<< "Buffer overflow: buffer_idx=" << buffer_idx << " copy_len=" << copy_len
<< " READ_BUFFER_SIZE=" << READ_BUFFER_SIZE;

std::memcpy(result.data, _read_buffer.get() + buffer_idx, copy_len);
std::memcpy(result.data, _read_buffer.data() + buffer_idx, copy_len);
buffer_offset += copy_len;
to_read -= copy_len;
offset += copy_len;
Expand Down Expand Up @@ -419,12 +419,12 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r
buffer_offset += buf.size();
} else {
size_t cached = std::min(buf.size(), (size_t)READ_BUFFER_SIZE);
std::memcpy(_read_buffer.get(), buf.data(), cached);
std::memcpy(_read_buffer.data(), buf.data(), cached);
_buffer_start = offset;
_buffer_end = offset + cached;

size_t copy_len = std::min(remaining, cached);
std::memcpy(result.data + buffer_offset, _read_buffer.get(), copy_len);
std::memcpy(result.data + buffer_offset, _read_buffer.data(), copy_len);
buffer_offset += copy_len;
}

Expand All @@ -443,7 +443,7 @@ Status HttpFileReader::close() {
}

// Release buffer memory (1MB)
_read_buffer.reset();
PODArray<char>().swap(_read_buffer);
_buffer_start = 0;
_buffer_end = 0;

Expand Down
3 changes: 2 additions & 1 deletion be/src/io/fs/http_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <string>

#include "common/status.h"
#include "core/pod_array.h"
#include "io/fs/file_handle_cache.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_system.h"
Expand Down Expand Up @@ -66,7 +67,7 @@ class HttpFileReader final : public FileReader {
// Called at the start of open() when enable_cdc_client=true.
Status setup_cdc_client();

std::unique_ptr<char[]> _read_buffer;
PODArray<char> _read_buffer;
static constexpr size_t READ_BUFFER_SIZE = 1 << 20; // 1MB
// Default maximum file size for servers that don't support Range requests
static constexpr size_t DEFAULT_MAX_REQUEST_SIZE = 100 << 20; // 100MB
Expand Down
Loading