diff --git a/be/src/exec/sink/writer/vtablet_writer_v2.cpp b/be/src/exec/sink/writer/vtablet_writer_v2.cpp index 06e35210635208..c5f212b5ad9279 100644 --- a/be/src/exec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/exec/sink/writer/vtablet_writer_v2.cpp @@ -577,7 +577,11 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr block, int64_t ta << " not found in schema, load_id=" << print_id(_load_id); return std::unique_ptr(nullptr); } - return DeltaWriterV2::create_unique(&req, streams, _state); + std::shared_ptr workload_group = nullptr; + if (_state->get_query_ctx()) { + workload_group = _state->workload_group(); + } + return DeltaWriterV2::create_unique(&req, streams, workload_group); }); if (delta_writer == nullptr) { LOG(WARNING) << "failed to open DeltaWriter for tablet " << tablet_id @@ -593,7 +597,12 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr block, int64_t ta } } SCOPED_TIMER(_write_memtable_timer); - st = delta_writer->write(block.get(), rows.row_idxes); + st = delta_writer->write(block.get(), rows.row_idxes, [state = _state]() { + if (state->is_cancelled()) { + return state->cancel_reason(); + } + return Status::OK(); + }); return st; } diff --git a/be/src/load/delta_writer/delta_writer_v2.cpp b/be/src/load/delta_writer/delta_writer_v2.cpp index c117e2b3180799..2074ff63761872 100644 --- a/be/src/load/delta_writer/delta_writer_v2.cpp +++ b/be/src/load/delta_writer/delta_writer_v2.cpp @@ -66,9 +66,9 @@ using namespace ErrorCode; DeltaWriterV2::DeltaWriterV2(WriteRequest* req, const std::vector>& streams, - RuntimeState* state) - : _state(state), - _req(*req), + std::shared_ptr workload_group) + : _req(*req), + _workload_group(std::move(workload_group)), _tablet_schema(new TabletSchema), _memtable_writer(new MemTableWriter(*req)), _streams(streams) {} @@ -129,19 +129,17 @@ Status DeltaWriterV2::init() { _rowset_writer = std::make_shared(_streams); RETURN_IF_ERROR(_rowset_writer->init(context)); - std::shared_ptr wg_sptr = nullptr; - if 
(_state->get_query_ctx()) { - wg_sptr = _state->get_query_ctx()->workload_group(); - } RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info, - wg_sptr, _streams[0]->enable_unique_mow(_req.index_id))); + _workload_group, + _streams[0]->enable_unique_mow(_req.index_id))); ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer); _is_init = true; _streams.clear(); return Status::OK(); } -Status DeltaWriterV2::write(const Block* block, const DorisVector& row_idxs) { +Status DeltaWriterV2::write(const Block* block, const DorisVector& row_idxs, + const std::function& cancel_check) { if (UNLIKELY(row_idxs.empty())) { return Status::OK(); } @@ -157,9 +155,8 @@ Status DeltaWriterV2::write(const Block* block, const DorisVector& row DBUG_EXECUTE_IF("DeltaWriterV2.write.back_pressure", { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); }); while (_memtable_writer->flush_running_count() >= memtable_flush_running_count_limit) { - if (_state->is_cancelled()) { - return _state->cancel_reason(); - } + DBUG_EXECUTE_IF("DeltaWriterV2.write.flush_limit_wait", DBUG_RUN_CALLBACK()); + RETURN_IF_ERROR(cancel_check()); std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } diff --git a/be/src/load/delta_writer/delta_writer_v2.h b/be/src/load/delta_writer/delta_writer_v2.h index d637c6afcbc2d1..bfdf0fd3a25e6c 100644 --- a/be/src/load/delta_writer/delta_writer_v2.h +++ b/be/src/load/delta_writer/delta_writer_v2.h @@ -24,6 +24,7 @@ #include #include +#include #include #include #include @@ -52,6 +53,7 @@ class SlotDescriptor; class OlapTableSchemaParam; class BetaRowsetWriterV2; class LoadStreamStub; +class WorkloadGroup; class Block; @@ -62,13 +64,14 @@ class DeltaWriterV2 { public: DeltaWriterV2(WriteRequest* req, const std::vector>& streams, - RuntimeState* state); + std::shared_ptr workload_group); ~DeltaWriterV2(); Status init(); - Status write(const Block* block, const DorisVector& row_idxs); + 
Status write(const Block* block, const DorisVector& row_idxs, + const std::function& cancel_check); // flush the last memtable to flush queue, must call it before close_wait() Status close(); @@ -88,11 +91,10 @@ class DeltaWriterV2 { void _update_profile(RuntimeProfile* profile); - RuntimeState* _state = nullptr; - bool _is_init = false; bool _is_cancelled = false; WriteRequest _req; + std::shared_ptr _workload_group; std::shared_ptr _rowset_writer; TabletSchemaSPtr _tablet_schema; bool _delta_written_success = false; diff --git a/be/test/exec/sink/vtablet_writer_v2_test.cpp b/be/test/exec/sink/vtablet_writer_v2_test.cpp index d62885d13959dd..8f09a3cb08f61b 100644 --- a/be/test/exec/sink/vtablet_writer_v2_test.cpp +++ b/be/test/exec/sink/vtablet_writer_v2_test.cpp @@ -18,9 +18,28 @@ #include "exec/sink/writer/vtablet_writer_v2.h" #include +#include +#include +#include +#include + +#include "common/config.h" +#include "core/data_type/data_type_number.h" +#include "exec/operator/operator_helper.h" +#include "exec/sink/delta_writer_v2_pool.h" #include "exec/sink/load_stream_map_pool.h" #include "exec/sink/load_stream_stub.h" +#include "exec/sink/sink_test_utils.h" +#include "io/fs/local_file_system.h" +#include "load/memtable/memtable_memory_limiter.h" +#include "runtime/exec_env.h" +#include "storage/storage_engine.h" +#include "storage/tablet/tablet_schema.h" +#include "testutil/column_helper.h" +#include "testutil/creators.h" +#include "util/debug_points.h" +#include "util/defer_op.h" namespace doris { @@ -63,6 +82,205 @@ static std::unique_ptr create_vtablet_writer(int num_replicas = return writer; } +static TColumn create_int_column_desc(bool is_nullable) { + auto column = testutil::create_tablet_column({"c1", TPrimitiveType::INT, true}); + column.__set_is_allow_null(is_nullable); + column.__set_col_unique_id(1); + return column; +} + +static void prepare_load_runtime_state(MockRuntimeState& state, int sender_id) { + state.set_backend_id(1); + 
state.set_per_fragment_instance_idx(sender_id); + state.set_num_per_fragment_instances(2); + state.set_load_stream_per_node(1); + state.set_total_load_streams(2); + state.set_num_local_sink(2); +} + +static void prepare_open_streams(std::shared_ptr load_stream_map, int64_t node_id, + int64_t index_id, const TabletSchemaSPtr& tablet_schema) { + auto streams = load_stream_map->get_or_create(node_id); + streams->mark_open(); + for (auto& stream : streams->streams()) { + stream->_is_open.store(true); + stream->_status = Status::OK(); + stream->_tablet_schema_for_index->emplace(index_id, tablet_schema); + stream->_enable_unique_mow_for_index->emplace(index_id, false); + } +} + +static TabletSchemaSPtr create_int_tablet_schema() { + TabletSchemaPB tablet_schema_pb; + tablet_schema_pb.set_keys_type(DUP_KEYS); + tablet_schema_pb.set_num_short_key_columns(1); + tablet_schema_pb.set_num_rows_per_row_block(1024); + tablet_schema_pb.set_compress_kind(COMPRESS_NONE); + tablet_schema_pb.set_next_column_unique_id(2); + testutil::add_column_pb(&tablet_schema_pb, 1, "c1", "INT", true, false); + + auto tablet_schema = std::make_shared(); + tablet_schema->init_from_pb(tablet_schema_pb); + return tablet_schema; +} + +static TDataSink create_vtablet_writer_sink(const TOlapTableSchemaParam& schema, + const TOlapTablePartitionParam& partition, + const TOlapTableLocationParam& location, + TTupleId tuple_id, const TUniqueId& load_id) { + TDataSink t_sink; + t_sink.__isset.olap_table_sink = true; + auto& olap_sink = t_sink.olap_table_sink; + olap_sink.__set_load_id(load_id); + olap_sink.__set_txn_id(1); + olap_sink.__set_db_id(schema.db_id); + olap_sink.__set_table_id(schema.table_id); + olap_sink.__set_tuple_id(tuple_id); + olap_sink.__set_num_replicas(1); + olap_sink.__set_need_gen_rollup(false); + olap_sink.__set_schema(schema); + olap_sink.__set_partition(partition); + olap_sink.__set_location(location); + + TNodeInfo node; + node.__set_id(1); + node.__set_option(0); + 
node.__set_host("127.0.0.1"); + node.__set_async_internal_port(8060); + TPaloNodesInfo nodes; + nodes.nodes.push_back(node); + olap_sink.__set_nodes_info(nodes); + return t_sink; +} + +TEST_F(TestVTabletWriterV2, shared_delta_writer_should_not_access_destroyed_creator_runtime_state) { + const bool old_share_delta_writers = config::share_delta_writers; + const int32_t old_flush_running_count_limit = config::memtable_flush_running_count_limit; + const bool old_enable_debug_points = config::enable_debug_points; + config::share_delta_writers = true; + Defer restore_configs([&] { + config::share_delta_writers = old_share_delta_writers; + config::memtable_flush_running_count_limit = old_flush_running_count_limit; + config::enable_debug_points = old_enable_debug_points; + DebugPoints::instance()->remove("DeltaWriterV2.write.flush_limit_wait"); + }); + + ExecEnv* exec_env = ExecEnv::GetInstance(); + auto old_load_stream_map_pool = std::move(exec_env->_load_stream_map_pool); + auto old_delta_writer_v2_pool = std::move(exec_env->_delta_writer_v2_pool); + auto old_memtable_memory_limiter = std::move(exec_env->_memtable_memory_limiter); + auto old_storage_engine = std::move(exec_env->_storage_engine); + const std::string old_storage_root_path = config::storage_root_path; + char cwd_buffer[1024]; + ASSERT_NE(nullptr, getcwd(cwd_buffer, sizeof(cwd_buffer))); + const std::string test_data_dir = + std::string(cwd_buffer) + "/vtablet_writer_v2_shared_delta_writer_test"; + Defer restore_exec_env([&]() mutable { + exec_env->_delta_writer_v2_pool.reset(); + exec_env->_load_stream_map_pool.reset(); + exec_env->_storage_engine.reset(); + exec_env->_memtable_memory_limiter.reset(); + exec_env->_storage_engine = std::move(old_storage_engine); + exec_env->_memtable_memory_limiter = std::move(old_memtable_memory_limiter); + exec_env->_delta_writer_v2_pool = std::move(old_delta_writer_v2_pool); + exec_env->_load_stream_map_pool = std::move(old_load_stream_map_pool); + 
config::storage_root_path = old_storage_root_path; + static_cast(io::global_local_filesystem()->delete_directory(test_data_dir)); + }); + + config::storage_root_path = test_data_dir; + ASSERT_TRUE(io::global_local_filesystem()->delete_directory(test_data_dir).ok()); + ASSERT_TRUE(io::global_local_filesystem()->create_directory(test_data_dir).ok()); + EngineOptions options; + options.store_paths.emplace_back(test_data_dir, -1); + auto engine = std::make_unique(options); + ASSERT_TRUE(engine->open().ok()); + exec_env->_storage_engine = std::move(engine); + auto memtable_memory_limiter = std::make_unique(); + ASSERT_TRUE(memtable_memory_limiter->init(1024 * 1024 * 1024).ok()); + exec_env->_memtable_memory_limiter = std::move(memtable_memory_limiter); + exec_env->_load_stream_map_pool = std::make_unique(); + exec_env->_delta_writer_v2_pool = std::make_unique(); + + auto creator_ctx = std::make_unique(); + OperatorContext current_ctx; + prepare_load_runtime_state(creator_ctx->state, 0); + prepare_load_runtime_state(current_ctx.state, 1); + + TOlapTableSchemaParam schema; + TTupleId tuple_id = 0; + int64_t index_id = 0; + sink_test_utils::build_desc_tbl_and_schema(*creator_ctx, schema, tuple_id, index_id, false); + sink_test_utils::build_desc_tbl_and_schema(current_ctx, schema, tuple_id, index_id, false); + schema.indexes[0].__set_columns_desc({create_int_column_desc(false)}); + + TUniqueId load_id; + load_id.hi = 380; + load_id.lo = 1; + const auto partition = sink_test_utils::build_partition_param(index_id); + const auto location = sink_test_utils::build_location_param(); + const auto t_sink = create_vtablet_writer_sink(schema, partition, location, tuple_id, load_id); + + VExprContextSPtrs output_exprs; + auto creator_writer = std::make_unique(t_sink, output_exprs, nullptr, nullptr); + auto current_writer = std::make_unique(t_sink, output_exprs, nullptr, nullptr); + ASSERT_TRUE(creator_writer->_init(&creator_ctx->state, &creator_ctx->profile).ok()); + 
ASSERT_TRUE(current_writer->_init(&current_ctx.state, &current_ctx.profile).ok()); + ASSERT_EQ(creator_writer->_delta_writer_for_tablet, current_writer->_delta_writer_for_tablet); + + const auto tablet_schema = create_int_tablet_schema(); + prepare_open_streams(creator_writer->_load_stream_map, 1, index_id, tablet_schema); + + auto block = std::make_shared(ColumnHelper::create_block({1})); + Rows rows; + rows.partition_id = 1; + rows.index_id = index_id; + rows.row_idxes.push_back(0); + + const auto first_write_status = creator_writer->_write_memtable(block, 100, rows); + ASSERT_TRUE(first_write_status.ok()) << first_write_status; + ASSERT_EQ(1, creator_writer->_delta_writer_for_tablet->size()); + + // The first write above creates the shared DeltaWriterV2. Before the fix, + // the writer also kept creator_ctx->state in DeltaWriterV2::_state. Destroy + // the creator sink and its RuntimeState to reproduce that lifetime boundary: + // another local sink can still reuse the shared writer after the creator + // state is gone. Do not call creator_writer->_cancel() here because it + // cancels the shared writer and would hide the dangling RuntimeState path. + creator_writer.reset(); + creator_ctx.reset(); + + // Force the current sink into DeltaWriterV2::write()'s flush-limit wait + // path, then cancel the current RuntimeState. The fixed code observes the + // current sink's cancel state and exits cleanly; the pre-fix code read the + // destroyed creator RuntimeState from the shared DeltaWriterV2, which ASAN + // reported as heap-use-after-free in the child process. 
+ auto debug_point = std::make_shared(); + debug_point->execute_limit = 1; + debug_point->callback = std::function( + [&] { current_ctx.state.cancel(Status::Cancelled("current state cancelled")); }); + config::enable_debug_points = true; + DebugPoints::instance()->add("DeltaWriterV2.write.flush_limit_wait", debug_point); + config::memtable_flush_running_count_limit = 0; + + EXPECT_EXIT( + { + alarm(10); + auto status = current_writer->_write_memtable(block, 100, rows); + if (!status.ok() && + status.msg().find("current state cancelled") != std::string::npos) { + _exit(0); + } + _exit(1); + }, + ::testing::ExitedWithCode(0), ""); + + config::memtable_flush_running_count_limit = old_flush_running_count_limit; + DebugPoints::instance()->remove("DeltaWriterV2.write.flush_limit_wait"); + + current_writer->_cancel(Status::Cancelled("test cleanup")); +} + TEST_F(TestVTabletWriterV2, one_replica) { UniqueId load_id; std::vector tablet_commit_infos; diff --git a/be/test/load/delta_writer/delta_writer_v2_pool_test.cpp b/be/test/load/delta_writer/delta_writer_v2_pool_test.cpp index 2b6ce8091bf6d9..159bc8b58c91d5 100644 --- a/be/test/load/delta_writer/delta_writer_v2_pool_test.cpp +++ b/be/test/load/delta_writer/delta_writer_v2_pool_test.cpp @@ -57,18 +57,17 @@ TEST_F(DeltaWriterV2PoolTest, test_map) { auto map = pool.get_or_create(load_id); EXPECT_EQ(1, pool.size()); WriteRequest req; - RuntimeState state; - auto writer = map->get_or_create(100, [&req, &state]() { + auto writer = map->get_or_create(100, [&req]() { return std::make_unique( - &req, std::vector> {}, &state); + &req, std::vector> {}, nullptr); }); - auto writer2 = map->get_or_create(101, [&req, &state]() { + auto writer2 = map->get_or_create(101, [&req]() { return std::make_unique( - &req, std::vector> {}, &state); + &req, std::vector> {}, nullptr); }); - auto writer3 = map->get_or_create(100, [&req, &state]() { + auto writer3 = map->get_or_create(100, [&req]() { return std::make_unique( - &req, std::vector> {}, 
&state); + &req, std::vector> {}, nullptr); }); EXPECT_EQ(2, map->size()); EXPECT_EQ(writer, writer3);