diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp index e453bc7ee443a2..48e8d6624de77f 100644 --- a/be/src/cloud/cloud_internal_service.cpp +++ b/be/src/cloud/cloud_internal_service.cpp @@ -30,6 +30,7 @@ #include "io/cache/block_file_cache.h" #include "io/cache/block_file_cache_downloader.h" #include "io/cache/block_file_cache_factory.h" +#include "io/cache/file_cache_expiration.h" #include "runtime/thread_context.h" #include "runtime/workload_management/io_throttle.h" #include "util/async_io.h" @@ -446,13 +447,8 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c << " us, tablet_id: " << rs_meta.tablet_id() << ", rowset_id: " << rowset_id.to_string(); } - int64_t expiration_time = - tablet_meta->ttl_seconds() == 0 || rs_meta.newest_write_timestamp() <= 0 - ? 0 - : rs_meta.newest_write_timestamp() + tablet_meta->ttl_seconds(); - if (expiration_time <= UnixSeconds()) { - expiration_time = 0; - } + int64_t expiration_time = io::calc_file_cache_expiration_time( + rs_meta.newest_write_timestamp(), tablet_meta->ttl_seconds()); if (!tablet->add_rowset_warmup_state(rs_meta, WarmUpTriggerSource::EVENT_DRIVEN)) { LOG(INFO) << "found duplicate warmup task for rowset " << rowset_id.to_string() diff --git a/be/src/cloud/cloud_rowset_writer.cpp b/be/src/cloud/cloud_rowset_writer.cpp index c5b58049ae42b4..de152ae8341391 100644 --- a/be/src/cloud/cloud_rowset_writer.cpp +++ b/be/src/cloud/cloud_rowset_writer.cpp @@ -65,6 +65,9 @@ Status CloudRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) if (_context.rowset_state == PREPARED || _context.rowset_state == COMMITTED) { _is_pending = true; _rowset_meta->set_load_id(_context.load_id); + if (_context.newest_write_timestamp > 0) { + _rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp); + } } else { // Rowset generated by compaction or schema change _rowset_meta->set_version(_context.version); diff --git 
a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 75f3218c5f3cda..99ecc4d8908830 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -47,6 +47,7 @@ #include "cpp/sync_point.h" #include "io/cache/block_file_cache_downloader.h" #include "io/cache/block_file_cache_factory.h" +#include "io/cache/file_cache_expiration.h" #include "olap/base_tablet.h" #include "olap/compaction.h" #include "olap/cumulative_compaction_time_series_policy.h" @@ -445,12 +446,8 @@ void CloudTablet::add_rowsets(std::vector to_add, bool version_ continue; } - int64_t expiration_time = - _tablet_meta->ttl_seconds() == 0 || - rowset_meta->newest_write_timestamp() <= 0 - ? 0 - : rowset_meta->newest_write_timestamp() + - _tablet_meta->ttl_seconds(); + int64_t expiration_time = io::calc_file_cache_expiration_time( + rowset_meta->newest_write_timestamp(), _tablet_meta->ttl_seconds()); g_file_cache_cloud_tablet_submitted_segment_num << 1; if (rs->rowset_meta()->segment_file_size(seg_id) > 0) { g_file_cache_cloud_tablet_submitted_segment_size diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp index 85f350ef0b0cd6..dd2b678447c549 100644 --- a/be/src/cloud/cloud_warm_up_manager.cpp +++ b/be/src/cloud/cloud_warm_up_manager.cpp @@ -33,6 +33,7 @@ #include "common/cast_set.h" #include "common/logging.h" #include "io/cache/block_file_cache_downloader.h" +#include "io/cache/file_cache_expiration.h" #include "olap/rowset/beta_rowset.h" #include "olap/rowset/segment_v2/inverted_index_desc.h" #include "olap/tablet.h" @@ -237,13 +238,8 @@ void CloudWarmUpManager::handle_jobs() { continue; } - int64_t expiration_time = - tablet_meta->ttl_seconds() == 0 || rs->newest_write_timestamp() <= 0 - ? 
0 - : rs->newest_write_timestamp() + tablet_meta->ttl_seconds(); - if (expiration_time <= UnixSeconds()) { - expiration_time = 0; - } + int64_t expiration_time = io::calc_file_cache_expiration_time( + rs->newest_write_timestamp(), tablet_meta->ttl_seconds()); if (!tablet->add_rowset_warmup_state(*rs, WarmUpTriggerSource::JOB)) { LOG(INFO) << "found duplicate warmup task for rowset " << rs->rowset_id() << ", skip it"; diff --git a/be/src/io/cache/file_cache_expiration.h b/be/src/io/cache/file_cache_expiration.h new file mode 100644 index 00000000000000..050658ecf77df7 --- /dev/null +++ b/be/src/io/cache/file_cache_expiration.h @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstdint> +#include <limits> + +#include "util/time.h" + +namespace doris::io { + +// Calculates the absolute expiration timestamp for file cache TTL mode. +// +// A return value of 0 means the data is treated as non-TTL cache (NORMAL/INDEX/DISPOSABLE) and +// is not put into TTL queues / TTL-path directories. +inline int64_t calc_file_cache_expiration_time(int64_t newest_write_timestamp, + int64_t ttl_seconds) { + if (ttl_seconds <= 0 || newest_write_timestamp <= 0) { + return 0; + } + + // Overflow protection. 
+ if (newest_write_timestamp > std::numeric_limits<int64_t>::max() - ttl_seconds) { + return 0; + } + + const int64_t expiration_time = newest_write_timestamp + ttl_seconds; + // Clamp expired TTL to 0 to keep behavior consistent across read/write/warmup. + return expiration_time > UnixSeconds() ? expiration_time : 0; +} + +} // namespace doris::io diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index 0b44597ed95671..6a0bb53a48fcc9 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -30,6 +30,7 @@ #include "common/logging.h" #include "common/status.h" +#include "io/cache/file_cache_expiration.h" #include "io/io_common.h" #include "olap/block_column_predicate.h" #include "olap/column_predicate.h" @@ -224,13 +225,8 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context _read_context->runtime_state->query_options().disable_file_cache; } - _read_options.io_ctx.expiration_time = - read_context->ttl_seconds > 0 && _rowset->rowset_meta()->newest_write_timestamp() > 0 - ? 
_rowset->rowset_meta()->newest_write_timestamp() + read_context->ttl_seconds - : 0; - if (_read_options.io_ctx.expiration_time <= UnixSeconds()) { - _read_options.io_ctx.expiration_time = 0; - } + _read_options.io_ctx.expiration_time = io::calc_file_cache_expiration_time( + _rowset->rowset_meta()->newest_write_timestamp(), read_context->ttl_seconds); bool enable_segment_cache = true; auto* state = read_context->runtime_state; diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 8e0794ce5d3018..d3039182428947 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -315,6 +315,9 @@ Status BaseBetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_conte _is_pending = true; _rowset_meta->set_txn_id(_context.txn_id); _rowset_meta->set_load_id(_context.load_id); + if (_context.newest_write_timestamp > 0) { + _rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp); + } } else { _rowset_meta->set_version(_context.version); _rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp); diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h index 727c445a61edb4..c57346bca163f2 100644 --- a/be/src/olap/rowset/rowset_writer_context.h +++ b/be/src/olap/rowset/rowset_writer_context.h @@ -27,6 +27,7 @@ #include "cloud/config.h" #include "common/status.h" +#include "io/cache/file_cache_expiration.h" #include "io/fs/encrypted_fs_factory.h" #include "io/fs/file_system.h" #include "io/fs/file_writer.h" @@ -249,9 +250,8 @@ struct RowsetWriterContext { return io::FileWriterOptions { .write_file_cache = should_write_cache, .is_cold_data = is_hot_data, - .file_cache_expiration = file_cache_ttl_sec > 0 && newest_write_timestamp > 0 - ? 
newest_write_timestamp + file_cache_ttl_sec - : 0, + .file_cache_expiration = io::calc_file_cache_expiration_time( + newest_write_timestamp, static_cast<int64_t>(file_cache_ttl_sec)), .approximate_bytes_to_write = approximate_bytes_to_write}; } };