Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/exec/scan/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ Status OlapScanner::prepare() {
.query_id = &_state->query_id(),
.file_cache_stats = &_tablet_reader->mutable_stats()->file_cache_stats,
.is_inverted_index = true,
.runtime_state = _state,
};

RETURN_IF_ERROR(_tablet_reader_params.collection_statistics->collect(
Expand Down
11 changes: 10 additions & 1 deletion be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include "io/io_common.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_profile.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "runtime/workload_management/io_throttle.h"
#include "service/backend_options.h"
Expand Down Expand Up @@ -504,6 +505,11 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
SCOPED_CONCURRENCY_COUNT(
ConcurrencyStatsManager::instance().cached_remote_reader_blocking);
do {
// Bail out early if the query has been cancelled, so that cancelled
// scanners do not keep waiting for cache block download.
if (io_ctx && io_ctx->runtime_state && io_ctx->runtime_state->is_cancelled()) {
return Status::Cancelled("cache wait cancelled due to query cancellation");
}
SCOPED_RAW_TIMER(&stats.remote_wait_timer);
SCOPED_RAW_TIMER(&stats.remote_read_timer);
TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::DOWNLOADING");
Expand Down Expand Up @@ -561,7 +567,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
RETURN_IF_ERROR(_remote_file_reader->read_at(
current_offset,
Slice(result.data + (current_offset - offset), read_size),
&nest_bytes_read));
&nest_bytes_read, io_ctx));
indirect_read_bytes += read_size;
DCHECK(nest_bytes_read == read_size);
}
Expand Down Expand Up @@ -650,9 +656,12 @@ void CachedRemoteFileReader::prefetch_range(size_t offset, size_t size, const IO
dryrun_ctx = *io_ctx;
}
dryrun_ctx.is_dryrun = true;
// Clear per-query pointers: prefetch tasks may outlive the query lifecycle,
// so we must not hold dangling references to query-scoped objects.
dryrun_ctx.query_id = nullptr;
dryrun_ctx.file_cache_stats = nullptr;
dryrun_ctx.file_reader_stats = nullptr;
dryrun_ctx.runtime_state = nullptr;

LOG_IF(INFO, config::enable_segment_prefetch_verbose_log)
<< fmt::format("[verbose] Submitting prefetch task for offset={} size={}, file={}",
Expand Down
8 changes: 7 additions & 1 deletion be/src/io/fs/s3_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "io/fs/obj_storage_client.h"
#include "io/fs/s3_common.h"
#include "runtime/runtime_profile.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "runtime/workload_management/io_throttle.h"
#include "util/bvar_helper.h"
Expand Down Expand Up @@ -107,7 +108,7 @@ Status S3FileReader::close() {
}

Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* /*io_ctx*/) {
const IOContext* io_ctx) {
DCHECK(!closed());
if (offset > _file_size) {
return Status::InternalError(
Expand Down Expand Up @@ -160,6 +161,11 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea

int total_sleep_time = 0;
while (retry_count <= max_retries) {
// Bail out early if the query has been cancelled, so that cancelled
// scanners do not keep retrying against S3 with exponential backoff.
if (io_ctx && io_ctx->runtime_state && io_ctx->runtime_state->is_cancelled()) {
return Status::Cancelled("S3 read cancelled due to query cancellation");
}
*bytes_read = 0;
s3_file_reader_read_counter << 1;
// clang-format off
Expand Down
6 changes: 6 additions & 0 deletions be/src/io/io_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

namespace doris {

class RuntimeState;

enum class ReaderType : uint8_t {
READER_QUERY = 0,
READER_ALTER_TABLE = 1,
Expand Down Expand Up @@ -95,6 +97,10 @@ struct IOContext {
// if `is_warmup` == true, this I/O request is from a warm up task
bool is_warmup {false};
int64_t condition_cache_filtered_rows = 0;
// Non-owning pointer to the query's RuntimeState for cancellation checks at
// I/O blocking boundaries (S3 retry loop, cache wait loop). Must be nullptr
// for contexts that may outlive the query (e.g. async prefetch tasks).
RuntimeState* runtime_state = nullptr;
};

} // namespace io
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,12 @@ void DorisFSDirectory::FSIndexInput::setIoContext(const void* io_ctx) {
_io_ctx.reader_type = ctx->reader_type;
_io_ctx.query_id = ctx->query_id;
_io_ctx.file_cache_stats = ctx->file_cache_stats;
_io_ctx.runtime_state = ctx->runtime_state;
} else {
_io_ctx.reader_type = ReaderType::UNKNOWN;
_io_ctx.query_id = nullptr;
_io_ctx.file_cache_stats = nullptr;
_io_ctx.runtime_state = nullptr;
}
}

Expand Down
1 change: 1 addition & 0 deletions be/src/storage/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
_read_context->runtime_state->query_options().enable_file_cache;
_read_options.io_ctx.is_disposable =
_read_context->runtime_state->query_options().disable_file_cache;
_read_options.io_ctx.runtime_state = _read_context->runtime_state;
}

if (_read_context->condition_cache_digest) {
Expand Down
Loading