Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions be/src/exec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,11 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<Block> block, int64_t ta
<< " not found in schema, load_id=" << print_id(_load_id);
return std::unique_ptr<DeltaWriterV2>(nullptr);
}
return DeltaWriterV2::create_unique(&req, streams, _state);
std::shared_ptr<WorkloadGroup> workload_group = nullptr;
if (_state->get_query_ctx()) {
workload_group = _state->workload_group();
}
return DeltaWriterV2::create_unique(&req, streams, workload_group);
});
if (delta_writer == nullptr) {
LOG(WARNING) << "failed to open DeltaWriter for tablet " << tablet_id
Expand All @@ -593,7 +597,12 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<Block> block, int64_t ta
}
}
SCOPED_TIMER(_write_memtable_timer);
st = delta_writer->write(block.get(), rows.row_idxes);
st = delta_writer->write(block.get(), rows.row_idxes, [state = _state]() {
if (state->is_cancelled()) {
return state->cancel_reason();
}
return Status::OK();
});
return st;
}

Expand Down
21 changes: 9 additions & 12 deletions be/src/load/delta_writer/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ using namespace ErrorCode;

DeltaWriterV2::DeltaWriterV2(WriteRequest* req,
const std::vector<std::shared_ptr<LoadStreamStub>>& streams,
RuntimeState* state)
: _state(state),
_req(*req),
std::shared_ptr<WorkloadGroup> workload_group)
: _req(*req),
_workload_group(std::move(workload_group)),
_tablet_schema(new TabletSchema),
_memtable_writer(new MemTableWriter(*req)),
_streams(streams) {}
Expand Down Expand Up @@ -129,19 +129,17 @@ Status DeltaWriterV2::init() {

_rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
RETURN_IF_ERROR(_rowset_writer->init(context));
std::shared_ptr<WorkloadGroup> wg_sptr = nullptr;
if (_state->get_query_ctx()) {
wg_sptr = _state->get_query_ctx()->workload_group();
}
RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info,
wg_sptr, _streams[0]->enable_unique_mow(_req.index_id)));
_workload_group,
_streams[0]->enable_unique_mow(_req.index_id)));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
_streams.clear();
return Status::OK();
}

Status DeltaWriterV2::write(const Block* block, const DorisVector<uint32_t>& row_idxs) {
Status DeltaWriterV2::write(const Block* block, const DorisVector<uint32_t>& row_idxs,
const std::function<Status()>& cancel_check) {
if (UNLIKELY(row_idxs.empty())) {
return Status::OK();
}
Expand All @@ -157,9 +155,8 @@ Status DeltaWriterV2::write(const Block* block, const DorisVector<uint32_t>& row
DBUG_EXECUTE_IF("DeltaWriterV2.write.back_pressure",
{ std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
while (_memtable_writer->flush_running_count() >= memtable_flush_running_count_limit) {
if (_state->is_cancelled()) {
return _state->cancel_reason();
}
DBUG_EXECUTE_IF("DeltaWriterV2.write.flush_limit_wait", DBUG_RUN_CALLBACK());
RETURN_IF_ERROR(cancel_check());
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
Expand Down
10 changes: 6 additions & 4 deletions be/src/load/delta_writer/delta_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <stdint.h>

#include <atomic>
#include <functional>
#include <memory>
#include <mutex>
#include <shared_mutex>
Expand Down Expand Up @@ -52,6 +53,7 @@ class SlotDescriptor;
class OlapTableSchemaParam;
class BetaRowsetWriterV2;
class LoadStreamStub;
class WorkloadGroup;

class Block;

Expand All @@ -62,13 +64,14 @@ class DeltaWriterV2 {

public:
DeltaWriterV2(WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams,
RuntimeState* state);
std::shared_ptr<WorkloadGroup> workload_group);

~DeltaWriterV2();

Status init();

Status write(const Block* block, const DorisVector<uint32_t>& row_idxs);
Status write(const Block* block, const DorisVector<uint32_t>& row_idxs,
const std::function<Status()>& cancel_check);

// flush the last memtable to flush queue, must call it before close_wait()
Status close();
Expand All @@ -88,11 +91,10 @@ class DeltaWriterV2 {

void _update_profile(RuntimeProfile* profile);

RuntimeState* _state = nullptr;

bool _is_init = false;
bool _is_cancelled = false;
WriteRequest _req;
std::shared_ptr<WorkloadGroup> _workload_group;
std::shared_ptr<BetaRowsetWriterV2> _rowset_writer;
TabletSchemaSPtr _tablet_schema;
bool _delta_written_success = false;
Expand Down
218 changes: 218 additions & 0 deletions be/test/exec/sink/vtablet_writer_v2_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,28 @@
#include "exec/sink/writer/vtablet_writer_v2.h"

#include <gtest/gtest.h>
#include <unistd.h>

#include <memory>
#include <string>
#include <vector>

#include "common/config.h"
#include "core/data_type/data_type_number.h"
#include "exec/operator/operator_helper.h"
#include "exec/sink/delta_writer_v2_pool.h"
#include "exec/sink/load_stream_map_pool.h"
#include "exec/sink/load_stream_stub.h"
#include "exec/sink/sink_test_utils.h"
#include "io/fs/local_file_system.h"
#include "load/memtable/memtable_memory_limiter.h"
#include "runtime/exec_env.h"
#include "storage/storage_engine.h"
#include "storage/tablet/tablet_schema.h"
#include "testutil/column_helper.h"
#include "testutil/creators.h"
#include "util/debug_points.h"
#include "util/defer_op.h"

namespace doris {

Expand Down Expand Up @@ -63,6 +82,205 @@ static std::unique_ptr<VTabletWriterV2> create_vtablet_writer(int num_replicas =
return writer;
}

// Builds the thrift column descriptor for the INT column "c1" used by the
// shared-delta-writer test. The column gets unique id 1 and the requested
// nullability.
static TColumn create_int_column_desc(bool is_nullable) {
    constexpr int kColumnUniqueId = 1;

    auto c1_desc = testutil::create_tablet_column({"c1", TPrimitiveType::INT, true});
    c1_desc.__set_col_unique_id(kColumnUniqueId);
    c1_desc.__set_is_allow_null(is_nullable);
    return c1_desc;
}

// Fills in the load-related fields of a mock runtime state so that it acts as
// fragment instance `sender_id` on backend 1, with two fragment instances,
// two local sinks, one load stream per node and two load streams in total.
static void prepare_load_runtime_state(MockRuntimeState& state, int sender_id) {
    constexpr int kBackendId = 1;
    constexpr int kFragmentInstanceCount = 2;
    constexpr int kLoadStreamsPerNode = 1;
    constexpr int kTotalLoadStreams = 2;
    constexpr int kLocalSinkCount = 2;

    state.set_backend_id(kBackendId);
    state.set_per_fragment_instance_idx(sender_id);
    state.set_num_per_fragment_instances(kFragmentInstanceCount);
    state.set_load_stream_per_node(kLoadStreamsPerNode);
    state.set_total_load_streams(kTotalLoadStreams);
    state.set_num_local_sink(kLocalSinkCount);
}

// Marks the load streams of `node_id` as already open without doing any real
// open call: every stub is flagged open with an OK status, and is given the
// tablet schema plus a disabled unique-MOW flag for `index_id`.
static void prepare_open_streams(std::shared_ptr<LoadStreamMap> load_stream_map, int64_t node_id,
                                 int64_t index_id, const TabletSchemaSPtr& tablet_schema) {
    auto node_streams = load_stream_map->get_or_create(node_id);
    node_streams->mark_open();

    for (auto& stub : node_streams->streams()) {
        // Pretend the stream was opened successfully.
        stub->_is_open.store(true);
        stub->_status = Status::OK();

        // Register the per-index metadata on the stub.
        stub->_tablet_schema_for_index->emplace(index_id, tablet_schema);
        stub->_enable_unique_mow_for_index->emplace(index_id, false);
    }
}

// Creates a DUP_KEYS tablet schema with the single column "c1" (INT, unique
// id 1), no compression and 1024 rows per row block.
static TabletSchemaSPtr create_int_tablet_schema() {
    TabletSchemaPB schema_pb;
    schema_pb.set_keys_type(DUP_KEYS);
    schema_pb.set_num_short_key_columns(1);
    schema_pb.set_num_rows_per_row_block(1024);
    schema_pb.set_compress_kind(COMPRESS_NONE);
    // Unique id 1 is taken by "c1" below, so the next free id is 2.
    schema_pb.set_next_column_unique_id(2);
    testutil::add_column_pb(&schema_pb, 1, "c1", "INT", true, false);

    auto schema = std::make_shared<TabletSchema>();
    schema->init_from_pb(schema_pb);
    return schema;
}

// Assembles the thrift TDataSink for an OLAP table sink from the given schema,
// partition and location parameters. The sink uses txn id 1, one replica, no
// rollup generation, and one node (id 1, 127.0.0.1:8060).
static TDataSink create_vtablet_writer_sink(const TOlapTableSchemaParam& schema,
                                            const TOlapTablePartitionParam& partition,
                                            const TOlapTableLocationParam& location,
                                            TTupleId tuple_id, const TUniqueId& load_id) {
    // Describe the only node the sink knows about.
    TNodeInfo backend;
    backend.__set_id(1);
    backend.__set_option(0);
    backend.__set_host("127.0.0.1");
    backend.__set_async_internal_port(8060);
    TPaloNodesInfo backends;
    backends.nodes.push_back(backend);

    TDataSink sink;
    sink.__isset.olap_table_sink = true;
    auto& table_sink = sink.olap_table_sink;

    // Identity of the load and of the target table.
    table_sink.__set_load_id(load_id);
    table_sink.__set_txn_id(1);
    table_sink.__set_db_id(schema.db_id);
    table_sink.__set_table_id(schema.table_id);
    table_sink.__set_tuple_id(tuple_id);

    // Write options.
    table_sink.__set_num_replicas(1);
    table_sink.__set_need_gen_rollup(false);

    // Table layout and the node list.
    table_sink.__set_schema(schema);
    table_sink.__set_partition(partition);
    table_sink.__set_location(location);
    table_sink.__set_nodes_info(backends);
    return sink;
}

// Regression test for a use-after-free in shared DeltaWriterV2 instances.
//
// Two local sinks (a "creator" and a "current" one) share one DeltaWriterV2
// map. The creator performs the first write, which creates the shared writer,
// and is then destroyed together with its RuntimeState. The current sink then
// writes through the same shared writer while being forced into the
// flush-limit wait loop, and its own RuntimeState is cancelled from a debug
// point. The write must fail with the current sink's cancel reason.
TEST_F(TestVTabletWriterV2, shared_delta_writer_should_not_access_destroyed_creator_runtime_state) {
    const bool old_share_delta_writers = config::share_delta_writers;
    const int32_t old_flush_running_count_limit = config::memtable_flush_running_count_limit;
    const bool old_enable_debug_points = config::enable_debug_points;
    config::share_delta_writers = true;
    Defer restore_configs([&] {
        config::share_delta_writers = old_share_delta_writers;
        config::memtable_flush_running_count_limit = old_flush_running_count_limit;
        config::enable_debug_points = old_enable_debug_points;
        DebugPoints::instance()->remove("DeltaWriterV2.write.flush_limit_wait");
    });

    // Resolve the scratch directory BEFORE touching ExecEnv. ASSERT_* returns
    // from the test body on failure, so no fallible step may sit between taking
    // the ExecEnv members below and installing the Defer that puts them back.
    const std::string old_storage_root_path = config::storage_root_path;
    char cwd_buffer[1024];
    ASSERT_NE(nullptr, getcwd(cwd_buffer, sizeof(cwd_buffer)));
    const std::string test_data_dir =
            std::string(cwd_buffer) + "/vtablet_writer_v2_shared_delta_writer_test";

    // Take the global ExecEnv members and immediately arrange their restore.
    ExecEnv* exec_env = ExecEnv::GetInstance();
    auto old_load_stream_map_pool = std::move(exec_env->_load_stream_map_pool);
    auto old_delta_writer_v2_pool = std::move(exec_env->_delta_writer_v2_pool);
    auto old_memtable_memory_limiter = std::move(exec_env->_memtable_memory_limiter);
    auto old_storage_engine = std::move(exec_env->_storage_engine);
    Defer restore_exec_env([&]() mutable {
        // Drop the test-owned instances first, then reinstate the originals.
        exec_env->_delta_writer_v2_pool.reset();
        exec_env->_load_stream_map_pool.reset();
        exec_env->_storage_engine.reset();
        exec_env->_memtable_memory_limiter.reset();
        exec_env->_storage_engine = std::move(old_storage_engine);
        exec_env->_memtable_memory_limiter = std::move(old_memtable_memory_limiter);
        exec_env->_delta_writer_v2_pool = std::move(old_delta_writer_v2_pool);
        exec_env->_load_stream_map_pool = std::move(old_load_stream_map_pool);
        config::storage_root_path = old_storage_root_path;
        static_cast<void>(io::global_local_filesystem()->delete_directory(test_data_dir));
    });

    // Install a storage engine, memtable limiter and fresh pools for this test.
    config::storage_root_path = test_data_dir;
    ASSERT_TRUE(io::global_local_filesystem()->delete_directory(test_data_dir).ok());
    ASSERT_TRUE(io::global_local_filesystem()->create_directory(test_data_dir).ok());
    EngineOptions options;
    options.store_paths.emplace_back(test_data_dir, -1);
    auto engine = std::make_unique<StorageEngine>(options);
    ASSERT_TRUE(engine->open().ok());
    exec_env->_storage_engine = std::move(engine);
    auto memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>();
    ASSERT_TRUE(memtable_memory_limiter->init(1024 * 1024 * 1024).ok());
    exec_env->_memtable_memory_limiter = std::move(memtable_memory_limiter);
    exec_env->_load_stream_map_pool = std::make_unique<LoadStreamMapPool>();
    exec_env->_delta_writer_v2_pool = std::make_unique<DeltaWriterV2Pool>();

    // The creator context lives on the heap so it can be destroyed mid-test.
    auto creator_ctx = std::make_unique<OperatorContext>();
    OperatorContext current_ctx;
    prepare_load_runtime_state(creator_ctx->state, 0);
    prepare_load_runtime_state(current_ctx.state, 1);

    TOlapTableSchemaParam schema;
    TTupleId tuple_id = 0;
    int64_t index_id = 0;
    sink_test_utils::build_desc_tbl_and_schema(*creator_ctx, schema, tuple_id, index_id, false);
    sink_test_utils::build_desc_tbl_and_schema(current_ctx, schema, tuple_id, index_id, false);
    schema.indexes[0].__set_columns_desc({create_int_column_desc(false)});

    TUniqueId load_id;
    load_id.hi = 380;
    load_id.lo = 1;
    const auto partition = sink_test_utils::build_partition_param(index_id);
    const auto location = sink_test_utils::build_location_param();
    const auto t_sink = create_vtablet_writer_sink(schema, partition, location, tuple_id, load_id);

    VExprContextSPtrs output_exprs;
    auto creator_writer = std::make_unique<VTabletWriterV2>(t_sink, output_exprs, nullptr, nullptr);
    auto current_writer = std::make_unique<VTabletWriterV2>(t_sink, output_exprs, nullptr, nullptr);
    ASSERT_TRUE(creator_writer->_init(&creator_ctx->state, &creator_ctx->profile).ok());
    ASSERT_TRUE(current_writer->_init(&current_ctx.state, &current_ctx.profile).ok());
    // Both sinks use the same load id, so they must share one delta writer map.
    ASSERT_EQ(creator_writer->_delta_writer_for_tablet, current_writer->_delta_writer_for_tablet);

    const auto tablet_schema = create_int_tablet_schema();
    prepare_open_streams(creator_writer->_load_stream_map, 1, index_id, tablet_schema);

    auto block = std::make_shared<Block>(ColumnHelper::create_block<DataTypeInt32>({1}));
    Rows rows;
    rows.partition_id = 1;
    rows.index_id = index_id;
    rows.row_idxes.push_back(0);

    const auto first_write_status = creator_writer->_write_memtable(block, 100, rows);
    ASSERT_TRUE(first_write_status.ok()) << first_write_status;
    ASSERT_EQ(1, creator_writer->_delta_writer_for_tablet->size());

    // The first write above creates the shared DeltaWriterV2. Before the fix,
    // that writer stored creator_ctx->state in DeltaWriterV2::_state. Destroy
    // the creator sink and its RuntimeState to reproduce the original lifetime
    // boundary: another local sink can still reuse the shared writer after the
    // creator state is gone. Do not call creator_writer->_cancel() here because
    // it cancels the shared writer and would hide the dangling RuntimeState
    // path.
    creator_writer.reset();
    creator_ctx.reset();

    // Force the current sink into DeltaWriterV2::write()'s flush-limit wait
    // path, then cancel the current RuntimeState. Fixed code observes the
    // current sink's cancel state and exits cleanly. The pre-fix code read the
    // destroyed creator RuntimeState from the shared DeltaWriterV2, which ASAN
    // reports as heap-use-after-free in the child process.
    auto debug_point = std::make_shared<DebugPoint>();
    debug_point->execute_limit = 1;
    debug_point->callback = std::function<void()>(
            [&] { current_ctx.state.cancel(Status::Cancelled("current state cancelled")); });
    config::enable_debug_points = true;
    DebugPoints::instance()->add("DeltaWriterV2.write.flush_limit_wait", debug_point);
    // A limit of 0 makes the flush-running-count check always take the wait loop.
    config::memtable_flush_running_count_limit = 0;

    EXPECT_EXIT(
            {
                // Guard against the write hanging forever in the wait loop.
                alarm(10);
                auto status = current_writer->_write_memtable(block, 100, rows);
                if (!status.ok() &&
                    status.msg().find("current state cancelled") != std::string::npos) {
                    _exit(0);
                }
                _exit(1);
            },
            ::testing::ExitedWithCode(0), "");

    // Leave the wait path disabled before cleaning up in the parent process.
    config::memtable_flush_running_count_limit = old_flush_running_count_limit;
    DebugPoints::instance()->remove("DeltaWriterV2.write.flush_limit_wait");

    current_writer->_cancel(Status::Cancelled("test cleanup"));
}

TEST_F(TestVTabletWriterV2, one_replica) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
Expand Down
13 changes: 6 additions & 7 deletions be/test/load/delta_writer/delta_writer_v2_pool_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,17 @@ TEST_F(DeltaWriterV2PoolTest, test_map) {
auto map = pool.get_or_create(load_id);
EXPECT_EQ(1, pool.size());
WriteRequest req;
RuntimeState state;
auto writer = map->get_or_create(100, [&req, &state]() {
auto writer = map->get_or_create(100, [&req]() {
return std::make_unique<DeltaWriterV2>(
&req, std::vector<std::shared_ptr<LoadStreamStub>> {}, &state);
&req, std::vector<std::shared_ptr<LoadStreamStub>> {}, nullptr);
});
auto writer2 = map->get_or_create(101, [&req, &state]() {
auto writer2 = map->get_or_create(101, [&req]() {
return std::make_unique<DeltaWriterV2>(
&req, std::vector<std::shared_ptr<LoadStreamStub>> {}, &state);
&req, std::vector<std::shared_ptr<LoadStreamStub>> {}, nullptr);
});
auto writer3 = map->get_or_create(100, [&req, &state]() {
auto writer3 = map->get_or_create(100, [&req]() {
return std::make_unique<DeltaWriterV2>(
&req, std::vector<std::shared_ptr<LoadStreamStub>> {}, &state);
&req, std::vector<std::shared_ptr<LoadStreamStub>> {}, nullptr);
});
EXPECT_EQ(2, map->size());
EXPECT_EQ(writer, writer3);
Expand Down
Loading