From 47f86e9bb611086a8d61d2c655f5ad4af1fe5c71 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Wed, 10 Jun 2026 11:36:59 +0800 Subject: [PATCH 1/2] [test](be) Reproduce shared delta writer state lifetime bug ### What problem does this PR solve? Issue Number: None Problem Summary: Add a backend unit test that reproduces a shared DeltaWriterV2 lifetime bug. When shared delta writers are enabled, the first local sink can create the shared DeltaWriterV2 and the writer stores that sink's RuntimeState pointer. If that creator sink and its RuntimeState are destroyed while another local sink still reuses the shared writer, DeltaWriterV2::write() can access the destroyed RuntimeState in the flush-limit cancel check path. The test builds two VTabletWriterV2 instances for the same load, destroys the creator state after the shared writer is created, and then forces the second writer into the same wait path. On the unfixed code this deterministically reports an ASAN heap-use-after-free in DeltaWriterV2::write(). 
### Release note None ### Check List (For Author) - Test: Unit Test - ./run-be-ut.sh --run --filter=TestVTabletWriterV2.shared_delta_writer_should_not_access_destroyed_creator_runtime_state -j 100 (fails as expected before the fix with AddressSanitizer heap-use-after-free in DeltaWriterV2::write()) - Behavior changed: No - Does this need documentation: No --- be/src/load/delta_writer/delta_writer_v2.cpp | 1 + be/test/exec/sink/vtablet_writer_v2_test.cpp | 218 +++++++++++++++++++ 2 files changed, 219 insertions(+) diff --git a/be/src/load/delta_writer/delta_writer_v2.cpp b/be/src/load/delta_writer/delta_writer_v2.cpp index c117e2b3180799..de9945b69b094c 100644 --- a/be/src/load/delta_writer/delta_writer_v2.cpp +++ b/be/src/load/delta_writer/delta_writer_v2.cpp @@ -157,6 +157,7 @@ Status DeltaWriterV2::write(const Block* block, const DorisVector& row DBUG_EXECUTE_IF("DeltaWriterV2.write.back_pressure", { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); }); while (_memtable_writer->flush_running_count() >= memtable_flush_running_count_limit) { + DBUG_EXECUTE_IF("DeltaWriterV2.write.flush_limit_wait", DBUG_RUN_CALLBACK()); if (_state->is_cancelled()) { return _state->cancel_reason(); } diff --git a/be/test/exec/sink/vtablet_writer_v2_test.cpp b/be/test/exec/sink/vtablet_writer_v2_test.cpp index d62885d13959dd..8f09a3cb08f61b 100644 --- a/be/test/exec/sink/vtablet_writer_v2_test.cpp +++ b/be/test/exec/sink/vtablet_writer_v2_test.cpp @@ -18,9 +18,28 @@ #include "exec/sink/writer/vtablet_writer_v2.h" #include +#include +#include +#include +#include + +#include "common/config.h" +#include "core/data_type/data_type_number.h" +#include "exec/operator/operator_helper.h" +#include "exec/sink/delta_writer_v2_pool.h" #include "exec/sink/load_stream_map_pool.h" #include "exec/sink/load_stream_stub.h" +#include "exec/sink/sink_test_utils.h" +#include "io/fs/local_file_system.h" +#include "load/memtable/memtable_memory_limiter.h" +#include "runtime/exec_env.h" 
+#include "storage/storage_engine.h" +#include "storage/tablet/tablet_schema.h" +#include "testutil/column_helper.h" +#include "testutil/creators.h" +#include "util/debug_points.h" +#include "util/defer_op.h" namespace doris { @@ -63,6 +82,205 @@ static std::unique_ptr create_vtablet_writer(int num_replicas = return writer; } +static TColumn create_int_column_desc(bool is_nullable) { + auto column = testutil::create_tablet_column({"c1", TPrimitiveType::INT, true}); + column.__set_is_allow_null(is_nullable); + column.__set_col_unique_id(1); + return column; +} + +static void prepare_load_runtime_state(MockRuntimeState& state, int sender_id) { + state.set_backend_id(1); + state.set_per_fragment_instance_idx(sender_id); + state.set_num_per_fragment_instances(2); + state.set_load_stream_per_node(1); + state.set_total_load_streams(2); + state.set_num_local_sink(2); +} + +static void prepare_open_streams(std::shared_ptr load_stream_map, int64_t node_id, + int64_t index_id, const TabletSchemaSPtr& tablet_schema) { + auto streams = load_stream_map->get_or_create(node_id); + streams->mark_open(); + for (auto& stream : streams->streams()) { + stream->_is_open.store(true); + stream->_status = Status::OK(); + stream->_tablet_schema_for_index->emplace(index_id, tablet_schema); + stream->_enable_unique_mow_for_index->emplace(index_id, false); + } +} + +static TabletSchemaSPtr create_int_tablet_schema() { + TabletSchemaPB tablet_schema_pb; + tablet_schema_pb.set_keys_type(DUP_KEYS); + tablet_schema_pb.set_num_short_key_columns(1); + tablet_schema_pb.set_num_rows_per_row_block(1024); + tablet_schema_pb.set_compress_kind(COMPRESS_NONE); + tablet_schema_pb.set_next_column_unique_id(2); + testutil::add_column_pb(&tablet_schema_pb, 1, "c1", "INT", true, false); + + auto tablet_schema = std::make_shared(); + tablet_schema->init_from_pb(tablet_schema_pb); + return tablet_schema; +} + +static TDataSink create_vtablet_writer_sink(const TOlapTableSchemaParam& schema, + const 
TOlapTablePartitionParam& partition, + const TOlapTableLocationParam& location, + TTupleId tuple_id, const TUniqueId& load_id) { + TDataSink t_sink; + t_sink.__isset.olap_table_sink = true; + auto& olap_sink = t_sink.olap_table_sink; + olap_sink.__set_load_id(load_id); + olap_sink.__set_txn_id(1); + olap_sink.__set_db_id(schema.db_id); + olap_sink.__set_table_id(schema.table_id); + olap_sink.__set_tuple_id(tuple_id); + olap_sink.__set_num_replicas(1); + olap_sink.__set_need_gen_rollup(false); + olap_sink.__set_schema(schema); + olap_sink.__set_partition(partition); + olap_sink.__set_location(location); + + TNodeInfo node; + node.__set_id(1); + node.__set_option(0); + node.__set_host("127.0.0.1"); + node.__set_async_internal_port(8060); + TPaloNodesInfo nodes; + nodes.nodes.push_back(node); + olap_sink.__set_nodes_info(nodes); + return t_sink; +} + +TEST_F(TestVTabletWriterV2, shared_delta_writer_should_not_access_destroyed_creator_runtime_state) { + const bool old_share_delta_writers = config::share_delta_writers; + const int32_t old_flush_running_count_limit = config::memtable_flush_running_count_limit; + const bool old_enable_debug_points = config::enable_debug_points; + config::share_delta_writers = true; + Defer restore_configs([&] { + config::share_delta_writers = old_share_delta_writers; + config::memtable_flush_running_count_limit = old_flush_running_count_limit; + config::enable_debug_points = old_enable_debug_points; + DebugPoints::instance()->remove("DeltaWriterV2.write.flush_limit_wait"); + }); + + ExecEnv* exec_env = ExecEnv::GetInstance(); + auto old_load_stream_map_pool = std::move(exec_env->_load_stream_map_pool); + auto old_delta_writer_v2_pool = std::move(exec_env->_delta_writer_v2_pool); + auto old_memtable_memory_limiter = std::move(exec_env->_memtable_memory_limiter); + auto old_storage_engine = std::move(exec_env->_storage_engine); + const std::string old_storage_root_path = config::storage_root_path; + char cwd_buffer[1024]; + 
ASSERT_NE(nullptr, getcwd(cwd_buffer, sizeof(cwd_buffer))); + const std::string test_data_dir = + std::string(cwd_buffer) + "/vtablet_writer_v2_shared_delta_writer_test"; + Defer restore_exec_env([&]() mutable { + exec_env->_delta_writer_v2_pool.reset(); + exec_env->_load_stream_map_pool.reset(); + exec_env->_storage_engine.reset(); + exec_env->_memtable_memory_limiter.reset(); + exec_env->_storage_engine = std::move(old_storage_engine); + exec_env->_memtable_memory_limiter = std::move(old_memtable_memory_limiter); + exec_env->_delta_writer_v2_pool = std::move(old_delta_writer_v2_pool); + exec_env->_load_stream_map_pool = std::move(old_load_stream_map_pool); + config::storage_root_path = old_storage_root_path; + static_cast(io::global_local_filesystem()->delete_directory(test_data_dir)); + }); + + config::storage_root_path = test_data_dir; + ASSERT_TRUE(io::global_local_filesystem()->delete_directory(test_data_dir).ok()); + ASSERT_TRUE(io::global_local_filesystem()->create_directory(test_data_dir).ok()); + EngineOptions options; + options.store_paths.emplace_back(test_data_dir, -1); + auto engine = std::make_unique(options); + ASSERT_TRUE(engine->open().ok()); + exec_env->_storage_engine = std::move(engine); + auto memtable_memory_limiter = std::make_unique(); + ASSERT_TRUE(memtable_memory_limiter->init(1024 * 1024 * 1024).ok()); + exec_env->_memtable_memory_limiter = std::move(memtable_memory_limiter); + exec_env->_load_stream_map_pool = std::make_unique(); + exec_env->_delta_writer_v2_pool = std::make_unique(); + + auto creator_ctx = std::make_unique(); + OperatorContext current_ctx; + prepare_load_runtime_state(creator_ctx->state, 0); + prepare_load_runtime_state(current_ctx.state, 1); + + TOlapTableSchemaParam schema; + TTupleId tuple_id = 0; + int64_t index_id = 0; + sink_test_utils::build_desc_tbl_and_schema(*creator_ctx, schema, tuple_id, index_id, false); + sink_test_utils::build_desc_tbl_and_schema(current_ctx, schema, tuple_id, index_id, false); + 
schema.indexes[0].__set_columns_desc({create_int_column_desc(false)}); + + TUniqueId load_id; + load_id.hi = 380; + load_id.lo = 1; + const auto partition = sink_test_utils::build_partition_param(index_id); + const auto location = sink_test_utils::build_location_param(); + const auto t_sink = create_vtablet_writer_sink(schema, partition, location, tuple_id, load_id); + + VExprContextSPtrs output_exprs; + auto creator_writer = std::make_unique(t_sink, output_exprs, nullptr, nullptr); + auto current_writer = std::make_unique(t_sink, output_exprs, nullptr, nullptr); + ASSERT_TRUE(creator_writer->_init(&creator_ctx->state, &creator_ctx->profile).ok()); + ASSERT_TRUE(current_writer->_init(&current_ctx.state, &current_ctx.profile).ok()); + ASSERT_EQ(creator_writer->_delta_writer_for_tablet, current_writer->_delta_writer_for_tablet); + + const auto tablet_schema = create_int_tablet_schema(); + prepare_open_streams(creator_writer->_load_stream_map, 1, index_id, tablet_schema); + + auto block = std::make_shared(ColumnHelper::create_block({1})); + Rows rows; + rows.partition_id = 1; + rows.index_id = index_id; + rows.row_idxes.push_back(0); + + const auto first_write_status = creator_writer->_write_memtable(block, 100, rows); + ASSERT_TRUE(first_write_status.ok()) << first_write_status; + ASSERT_EQ(1, creator_writer->_delta_writer_for_tablet->size()); + + // The first write above creates the shared DeltaWriterV2 and stores + // creator_ctx->state in DeltaWriterV2::_state. Destroy the creator sink and + // its RuntimeState to reproduce the original lifetime boundary: another + // local sink can still reuse the shared writer after the creator state is + // gone. Do not call creator_writer->_cancel() here because it cancels the + // shared writer and would hide the dangling RuntimeState path. + creator_writer.reset(); + creator_ctx.reset(); + + // Force the current sink into DeltaWriterV2::write()'s flush-limit wait + // path, then cancel the current RuntimeState. 
Fixed code should observe the + // current sink's cancel state and exit cleanly; current broken code reads + // the destroyed creator RuntimeState from the shared DeltaWriterV2 and ASAN + // reports heap-use-after-free in the child process. + auto debug_point = std::make_shared(); + debug_point->execute_limit = 1; + debug_point->callback = std::function( + [&] { current_ctx.state.cancel(Status::Cancelled("current state cancelled")); }); + config::enable_debug_points = true; + DebugPoints::instance()->add("DeltaWriterV2.write.flush_limit_wait", debug_point); + config::memtable_flush_running_count_limit = 0; + + EXPECT_EXIT( + { + alarm(10); + auto status = current_writer->_write_memtable(block, 100, rows); + if (!status.ok() && + status.msg().find("current state cancelled") != std::string::npos) { + _exit(0); + } + _exit(1); + }, + ::testing::ExitedWithCode(0), ""); + + config::memtable_flush_running_count_limit = old_flush_running_count_limit; + DebugPoints::instance()->remove("DeltaWriterV2.write.flush_limit_wait"); + + current_writer->_cancel(Status::Cancelled("test cleanup")); +} + TEST_F(TestVTabletWriterV2, one_replica) { UniqueId load_id; std::vector tablet_commit_infos; From e70d03df42de89d0c9a8702b54233257eaf3cd55 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Wed, 10 Jun 2026 11:43:54 +0800 Subject: [PATCH 2/2] [fix](be) Fix shared delta writer state lifetime ### What problem does this PR solve? Issue Number: None Problem Summary: Shared DeltaWriterV2 instances can be reused by multiple local sinks from the same load. Before this change the shared writer stored the RuntimeState pointer from the sink that first created it. If that creator sink finished and its RuntimeState was destroyed while another local sink continued to reuse the same shared writer, DeltaWriterV2::write() could access the destroyed RuntimeState while checking cancellation in the memtable flush-limit wait path, causing a BE crash or ASAN use-after-free. 
The fix removes the stored RuntimeState pointer from DeltaWriterV2. The writer now stores only the stable WorkloadGroup shared pointer needed by MemTableWriter initialization, and VTabletWriterV2 passes a per-call cancel checker for the current sink into DeltaWriterV2::write(). This keeps cancellation tied to the active caller and avoids retaining sink-local RuntimeState inside the shared writer. ### Release note Fix a possible BE crash when shared delta writers are reused by multiple local sinks. ### Check List (For Author) - Test: Unit Test - ./run-be-ut.sh --run --filter=TestVTabletWriterV2.shared_delta_writer_should_not_access_destroyed_creator_runtime_state -j 100 - ./run-be-ut.sh --run --filter=DeltaWriterV2PoolTest.* -j 100 - Behavior changed: Yes. Shared DeltaWriterV2 cancellation now uses the current sink's RuntimeState instead of the creator sink's RuntimeState. - Does this need documentation: No --- be/src/exec/sink/writer/vtablet_writer_v2.cpp | 13 ++++++++++-- be/src/load/delta_writer/delta_writer_v2.cpp | 20 ++++++++----------- be/src/load/delta_writer/delta_writer_v2.h | 10 ++++++---- .../delta_writer_v2_pool_test.cpp | 13 ++++++------ 4 files changed, 31 insertions(+), 25 deletions(-) diff --git a/be/src/exec/sink/writer/vtablet_writer_v2.cpp b/be/src/exec/sink/writer/vtablet_writer_v2.cpp index 06e35210635208..c5f212b5ad9279 100644 --- a/be/src/exec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/exec/sink/writer/vtablet_writer_v2.cpp @@ -577,7 +577,11 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr block, int64_t ta << " not found in schema, load_id=" << print_id(_load_id); return std::unique_ptr(nullptr); } - return DeltaWriterV2::create_unique(&req, streams, _state); + std::shared_ptr workload_group = nullptr; + if (_state->get_query_ctx()) { + workload_group = _state->workload_group(); + } + return DeltaWriterV2::create_unique(&req, streams, workload_group); }); if (delta_writer == nullptr) { LOG(WARNING) << "failed to open 
DeltaWriter for tablet " << tablet_id @@ -593,7 +597,12 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr block, int64_t ta } } SCOPED_TIMER(_write_memtable_timer); - st = delta_writer->write(block.get(), rows.row_idxes); + st = delta_writer->write(block.get(), rows.row_idxes, [state = _state]() { + if (state->is_cancelled()) { + return state->cancel_reason(); + } + return Status::OK(); + }); return st; } diff --git a/be/src/load/delta_writer/delta_writer_v2.cpp b/be/src/load/delta_writer/delta_writer_v2.cpp index de9945b69b094c..2074ff63761872 100644 --- a/be/src/load/delta_writer/delta_writer_v2.cpp +++ b/be/src/load/delta_writer/delta_writer_v2.cpp @@ -66,9 +66,9 @@ using namespace ErrorCode; DeltaWriterV2::DeltaWriterV2(WriteRequest* req, const std::vector>& streams, - RuntimeState* state) - : _state(state), - _req(*req), + std::shared_ptr workload_group) + : _req(*req), + _workload_group(std::move(workload_group)), _tablet_schema(new TabletSchema), _memtable_writer(new MemTableWriter(*req)), _streams(streams) {} @@ -129,19 +129,17 @@ Status DeltaWriterV2::init() { _rowset_writer = std::make_shared(_streams); RETURN_IF_ERROR(_rowset_writer->init(context)); - std::shared_ptr wg_sptr = nullptr; - if (_state->get_query_ctx()) { - wg_sptr = _state->get_query_ctx()->workload_group(); - } RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info, - wg_sptr, _streams[0]->enable_unique_mow(_req.index_id))); + _workload_group, + _streams[0]->enable_unique_mow(_req.index_id))); ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer); _is_init = true; _streams.clear(); return Status::OK(); } -Status DeltaWriterV2::write(const Block* block, const DorisVector& row_idxs) { +Status DeltaWriterV2::write(const Block* block, const DorisVector& row_idxs, + const std::function& cancel_check) { if (UNLIKELY(row_idxs.empty())) { return Status::OK(); } @@ -158,9 +156,7 @@ Status DeltaWriterV2::write(const Block* 
block, const DorisVector& row { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); }); while (_memtable_writer->flush_running_count() >= memtable_flush_running_count_limit) { DBUG_EXECUTE_IF("DeltaWriterV2.write.flush_limit_wait", DBUG_RUN_CALLBACK()); - if (_state->is_cancelled()) { - return _state->cancel_reason(); - } + RETURN_IF_ERROR(cancel_check()); std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } diff --git a/be/src/load/delta_writer/delta_writer_v2.h b/be/src/load/delta_writer/delta_writer_v2.h index d637c6afcbc2d1..bfdf0fd3a25e6c 100644 --- a/be/src/load/delta_writer/delta_writer_v2.h +++ b/be/src/load/delta_writer/delta_writer_v2.h @@ -24,6 +24,7 @@ #include #include +#include #include #include #include @@ -52,6 +53,7 @@ class SlotDescriptor; class OlapTableSchemaParam; class BetaRowsetWriterV2; class LoadStreamStub; +class WorkloadGroup; class Block; @@ -62,13 +64,14 @@ class DeltaWriterV2 { public: DeltaWriterV2(WriteRequest* req, const std::vector>& streams, - RuntimeState* state); + std::shared_ptr workload_group); ~DeltaWriterV2(); Status init(); - Status write(const Block* block, const DorisVector& row_idxs); + Status write(const Block* block, const DorisVector& row_idxs, + const std::function& cancel_check); // flush the last memtable to flush queue, must call it before close_wait() Status close(); @@ -88,11 +91,10 @@ class DeltaWriterV2 { void _update_profile(RuntimeProfile* profile); - RuntimeState* _state = nullptr; - bool _is_init = false; bool _is_cancelled = false; WriteRequest _req; + std::shared_ptr _workload_group; std::shared_ptr _rowset_writer; TabletSchemaSPtr _tablet_schema; bool _delta_written_success = false; diff --git a/be/test/load/delta_writer/delta_writer_v2_pool_test.cpp b/be/test/load/delta_writer/delta_writer_v2_pool_test.cpp index 2b6ce8091bf6d9..159bc8b58c91d5 100644 --- a/be/test/load/delta_writer/delta_writer_v2_pool_test.cpp +++ b/be/test/load/delta_writer/delta_writer_v2_pool_test.cpp @@ 
-57,18 +57,17 @@ TEST_F(DeltaWriterV2PoolTest, test_map) { auto map = pool.get_or_create(load_id); EXPECT_EQ(1, pool.size()); WriteRequest req; - RuntimeState state; - auto writer = map->get_or_create(100, [&req, &state]() { + auto writer = map->get_or_create(100, [&req]() { return std::make_unique( - &req, std::vector> {}, &state); + &req, std::vector> {}, nullptr); }); - auto writer2 = map->get_or_create(101, [&req, &state]() { + auto writer2 = map->get_or_create(101, [&req]() { return std::make_unique( - &req, std::vector> {}, &state); + &req, std::vector> {}, nullptr); }); - auto writer3 = map->get_or_create(100, [&req, &state]() { + auto writer3 = map->get_or_create(100, [&req]() { return std::make_unique( - &req, std::vector> {}, &state); + &req, std::vector> {}, nullptr); }); EXPECT_EQ(2, map->size()); EXPECT_EQ(writer, writer3);