Skip to content

Commit

Permalink
[Enhancement] Adjust base version before publish version (#41491)
Browse files Browse the repository at this point in the history
Signed-off-by: Zijie Lu <[email protected]>
  • Loading branch information
TszKitLo40 authored Feb 23, 2024
1 parent 40c9893 commit bcce25e
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 24 deletions.
74 changes: 73 additions & 1 deletion be/src/storage/lake/transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,24 @@
#include "storage/lake/vacuum.h" // delete_files_async
#include "util/lru_cache.h"

namespace {

template <class T>
using ParallelSet = phmap::parallel_flat_hash_set<T, phmap::priv::hash_default_hash<T>, phmap::priv::hash_default_eq<T>,
phmap::priv::Allocator<T>, 4, std::mutex, true>;
ParallelSet<int64_t> tablet_txns;

bool add_tablet(int64_t tablet_id) {
auto [_, ok] = tablet_txns.insert(tablet_id);
return ok;
}

void remove_tablet(int64_t tablet_id) {
tablet_txns.erase(tablet_id);
}

} // namespace

namespace starrocks::lake {

static void clear_remote_snapshot_async(TabletManager* tablet_mgr, int64_t tablet_id, int64_t txn_id,
Expand All @@ -53,9 +71,43 @@ static void clear_remote_snapshot_async(TabletManager* tablet_mgr, int64_t table
files_to_delete->emplace_back(std::move(slog_path));
}

int64_t cal_new_base_version(int64_t tablet_id, TabletManager* tablet_mgr, int64_t base_version, int64_t new_version) {
int64_t version = base_version;
auto metadata = tablet_mgr->get_latest_cached_tablet_metadata(tablet_id);
if (metadata != nullptr && metadata->version() <= new_version) {
version = std::max(version, metadata->version());
}

auto index_version = tablet_mgr->update_mgr()->get_primary_index_data_version(tablet_id);
if (index_version > new_version) {
tablet_mgr->update_mgr()->unload_and_remove_primary_index(tablet_id);
return version;
}
if (index_version > version) {
// There is a possibility that the index version is newer than the version in remote storage.
// Check whether the index version exists in remote storage. If not, clear and rebuild the index.
auto res = tablet_mgr->get_tablet_metadata(tablet_id, index_version);
if (res.ok()) {
version = index_version;
} else {
tablet_mgr->update_mgr()->unload_and_remove_primary_index(tablet_id);
}
}

return version;
}

StatusOr<TabletMetadataPtr> publish_version(TabletManager* tablet_mgr, int64_t tablet_id, int64_t base_version,
int64_t new_version, std::span<const int64_t> txn_ids,
int64_t commit_time) {
if (!add_tablet(tablet_id)) {
return Status::ResourceBusy(
fmt::format("The previous publish version task for tablet {} has not finished. You can ignore this "
"error and the task will retry later.",
tablet_id));
}
DeferOp remove_tablet_txn([&] { remove_tablet(tablet_id); });

if (txn_ids.size() > 1) {
CHECK_EQ(new_version, base_version + txn_ids.size());
}
Expand All @@ -78,6 +130,25 @@ StatusOr<TabletMetadataPtr> publish_version(TabletManager* tablet_mgr, int64_t t
return error;
};

int64_t ori_base_version = base_version;
int64_t new_base_version = cal_new_base_version(tablet_id, tablet_mgr, base_version, new_version);
if (new_base_version > base_version) {
LOG(INFO) << "Base version has been adjusted. tablet_id=" << tablet_id << " base_version=" << base_version
<< " new_base_version=" << new_base_version << " new_version=" << new_version
<< " txn_ids=" << JoinInts(txn_ids, ",");
base_version = new_base_version;
}

if (base_version > new_version) {
LOG(ERROR) << "base version should be less than or equal to new version, "
<< "base version=" << base_version << ", new version=" << new_version << ", tablet_id=" << tablet_id;
return Status::InternalError("base version is larger than new version");
}

if (base_version == new_version) {
return tablet_mgr->get_tablet_metadata(tablet_id, base_version);
}

// Read base version metadata
auto base_version_path = tablet_mgr->tablet_metadata_location(tablet_id, base_version);
auto base_metadata_or = tablet_mgr->get_tablet_metadata(base_version_path, false);
Expand Down Expand Up @@ -109,7 +180,8 @@ StatusOr<TabletMetadataPtr> publish_version(TabletManager* tablet_mgr, int64_t t
// 5. txn4 will be published in later publish task, but we can't judge what's the latest_version in BE and we can not reapply txn_log if
// txn logs have been deleted.
bool delete_txn_log = (txn_ids.size() == 1);
for (int i = 0; i < txn_ids.size(); i++) {
int txn_offset = base_version - ori_base_version;
for (size_t i = txn_offset; i < txn_ids.size(); i++) {
auto txn_id = txn_ids[i];
auto log_path = tablet_mgr->txn_log_location(tablet_id, txn_id);
auto txn_log_st = tablet_mgr->get_txn_log(log_path, false);
Expand Down
25 changes: 2 additions & 23 deletions be/src/storage/lake/txn_log_applier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@

namespace starrocks::lake {
class PrimaryKeyTxnLogApplier : public TxnLogApplier {
template <class T>
using ParallelSet =
phmap::parallel_flat_hash_set<T, phmap::priv::hash_default_hash<T>, phmap::priv::hash_default_eq<T>,
phmap::priv::Allocator<T>, 4, std::mutex, true>;

public:
PrimaryKeyTxnLogApplier(Tablet tablet, MutableTabletMetadataPtr metadata, int64_t new_version)
: _tablet(tablet),
Expand All @@ -46,22 +41,9 @@ class PrimaryKeyTxnLogApplier : public TxnLogApplier {
_metadata->set_version(_new_version);
}

~PrimaryKeyTxnLogApplier() override {
handle_failure();
if (_inited) {
_s_schema_change_set.erase(_tablet.id());
}
}
~PrimaryKeyTxnLogApplier() override { handle_failure(); }

Status init() override {
auto [iter, ok] = _s_schema_change_set.insert(_tablet.id());
if (ok) {
_inited = true;
return check_meta_version();
} else {
return Status::InternalError("primary key does not support concurrent log applying");
}
}
Status init() override { return check_meta_version(); }

Status check_meta_version() {
// check tablet meta
Expand Down Expand Up @@ -303,16 +285,13 @@ class PrimaryKeyTxnLogApplier : public TxnLogApplier {
return Status::OK();
}

static inline ParallelSet<int64_t> _s_schema_change_set;

Tablet _tablet;
MutableTabletMetadataPtr _metadata;
int64_t _base_version{0};
int64_t _new_version{0};
int64_t _max_txn_id{0}; // Used as the file name prefix of the delvec file
MetaFileBuilder _builder;
DynamicCache<uint64_t, LakePrimaryIndex>::Entry* _index_entry{nullptr};
bool _inited{false};
std::unique_ptr<std::lock_guard<std::mutex>> _guard{nullptr};
// True when finalize meta file success.
bool _has_finalized = false;
Expand Down
21 changes: 21 additions & 0 deletions be/src/storage/lake/update_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,17 @@ void UpdateManager::remove_primary_index_cache(IndexEntry* index_entry) {
}
}

void UpdateManager::unload_and_remove_primary_index(int64_t tablet_id) {
auto index_entry = _index_cache.get(tablet_id);
if (index_entry != nullptr) {
auto& index = index_entry->value();
auto guard = index.fetch_guard();
index.unload();
guard.reset(nullptr);
_index_cache.remove(index_entry);
}
}

// |metadata| contain last tablet meta info with new version
Status UpdateManager::publish_primary_key_tablet(const TxnLogPB_OpWrite& op_write, int64_t txn_id,
const TabletMetadata& metadata, Tablet* tablet,
Expand Down Expand Up @@ -688,6 +699,16 @@ void UpdateManager::update_primary_index_data_version(const Tablet& tablet, int6
}
}

int64_t UpdateManager::get_primary_index_data_version(int64_t tablet_id) {
auto index_entry = _index_cache.get(tablet_id);
if (index_entry != nullptr) {
int64_t version = index_entry->value().data_version();
_index_cache.release(index_entry);
return version;
}
return 0;
}

void UpdateManager::_print_memory_stats() {
static std::atomic<int64_t> last_print_ts;
if (time(nullptr) > last_print_ts.load() + kPrintMemoryStatsInterval && _update_mem_tracker != nullptr) {
Expand Down
4 changes: 4 additions & 0 deletions be/src/storage/lake/update_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ class UpdateManager {
// update primary index data version when meta file finalize success.
void update_primary_index_data_version(const Tablet& tablet, int64_t version);

int64_t get_primary_index_data_version(int64_t tablet_id);

void expire_cache();

void evict_cache(int64_t memory_urgent_level, int64_t memory_high_level);
Expand Down Expand Up @@ -141,6 +143,8 @@ class UpdateManager {
// remove index entry if it isn't nullptr
void remove_primary_index_cache(IndexEntry* index_entry);

void unload_and_remove_primary_index(int64_t tablet_id);

DynamicCache<uint64_t, LakePrimaryIndex>& index_cache() { return _index_cache; }

void lock_shard_pk_index_shard(int64_t tablet_id) { _get_pk_index_shard_lock(tablet_id).lock_shared(); }
Expand Down
136 changes: 136 additions & 0 deletions be/test/storage/lake/primary_key_publish_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,142 @@ TEST_P(LakePrimaryKeyPublishTest, test_batch_publish) {
EXPECT_EQ(0, read_rows(tablet_id, new_version));
}

TEST_P(LakePrimaryKeyPublishTest, test_batch_publish_1) {
auto [chunk0, indexes0] = gen_data_and_index(kChunkSize, 0, false, true);
auto [chunk1, indexes1] = gen_data_and_index(kChunkSize, 0, false, true);
auto [chunk2, indexes2] = gen_data_and_index(kChunkSize, 0, false, true);
auto base_version = 1;
auto tablet_id = _tablet_metadata->id();
std::vector<int64_t> txn_ids;
auto txn_id = next_id();
txn_ids.emplace_back(txn_id);
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
.set_tablet_manager(_tablet_mgr.get())
.set_tablet_id(tablet_id)
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_slot_descriptors(&_slot_pointers)
.build());
ASSERT_OK(delta_writer->open());
ASSERT_OK(delta_writer->write(*chunk0, indexes0.data(), indexes0.size()));
ASSERT_OK(delta_writer->finish());
delta_writer->close();

txn_id = next_id();
txn_ids.emplace_back(txn_id);
ASSIGN_OR_ABORT(delta_writer, DeltaWriterBuilder()
.set_tablet_manager(_tablet_mgr.get())
.set_tablet_id(tablet_id)
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_slot_descriptors(&_slot_pointers)
.build());
ASSERT_OK(delta_writer->open());
ASSERT_OK(delta_writer->write(*chunk1, indexes1.data(), indexes1.size()));
ASSERT_OK(delta_writer->finish());
delta_writer->close();

auto new_version = base_version + 2;
ASSERT_OK(batch_publish(tablet_id, base_version, new_version, txn_ids).status());
ASSIGN_OR_ABORT(auto new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, new_version));
EXPECT_EQ(new_tablet_metadata->rowsets_size(), 2);
EXPECT_EQ(kChunkSize, read_rows(tablet_id, new_version));
EXPECT_EQ(new_tablet_metadata->rowsets(0).num_dels(), kChunkSize);
EXPECT_EQ(new_tablet_metadata->rowsets(1).num_dels(), 0);

txn_id = next_id();
txn_ids.emplace_back(txn_id);
ASSIGN_OR_ABORT(delta_writer, DeltaWriterBuilder()
.set_tablet_manager(_tablet_mgr.get())
.set_tablet_id(tablet_id)
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_slot_descriptors(&_slot_pointers)
.build());
ASSERT_OK(delta_writer->open());
ASSERT_OK(delta_writer->write(*chunk2, indexes2.data(), indexes2.size()));
ASSERT_OK(delta_writer->finish());
delta_writer->close();

new_version = base_version + 3;
ASSERT_OK(batch_publish(tablet_id, base_version, new_version, txn_ids).status());
ASSIGN_OR_ABORT(new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, new_version));
EXPECT_EQ(new_tablet_metadata->rowsets_size(), 3);
EXPECT_EQ(kChunkSize, read_rows(tablet_id, new_version));
EXPECT_EQ(new_tablet_metadata->rowsets(0).num_dels(), kChunkSize);
EXPECT_EQ(new_tablet_metadata->rowsets(1).num_dels(), kChunkSize);
EXPECT_EQ(new_tablet_metadata->rowsets(2).num_dels(), 0);
}

TEST_P(LakePrimaryKeyPublishTest, test_transform_batch_to_single) {
auto [chunk0, indexes0] = gen_data_and_index(kChunkSize, 0, false, true);
auto [chunk1, indexes1] = gen_data_and_index(kChunkSize, 0, false, true);
auto base_version = 1;
auto tablet_id = _tablet_metadata->id();
std::vector<int64_t> txn_ids;
auto txn_id1 = next_id();
txn_ids.emplace_back(txn_id1);
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
.set_tablet_manager(_tablet_mgr.get())
.set_tablet_id(tablet_id)
.set_txn_id(txn_id1)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_slot_descriptors(&_slot_pointers)
.build());
ASSERT_OK(delta_writer->open());
ASSERT_OK(delta_writer->write(*chunk0, indexes0.data(), indexes0.size()));
ASSERT_OK(delta_writer->finish());
delta_writer->close();

auto txn_id2 = next_id();
txn_ids.emplace_back(txn_id2);
ASSIGN_OR_ABORT(delta_writer, DeltaWriterBuilder()
.set_tablet_manager(_tablet_mgr.get())
.set_tablet_id(tablet_id)
.set_txn_id(txn_id2)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_slot_descriptors(&_slot_pointers)
.build());
ASSERT_OK(delta_writer->open());
ASSERT_OK(delta_writer->write(*chunk1, indexes1.data(), indexes1.size()));
ASSERT_OK(delta_writer->finish());
delta_writer->close();

auto new_version = base_version + 2;
ASSERT_OK(batch_publish(tablet_id, base_version, new_version, txn_ids).status());
ASSIGN_OR_ABORT(auto new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, new_version));
EXPECT_EQ(new_tablet_metadata->rowsets_size(), 2);
EXPECT_EQ(kChunkSize, read_rows(tablet_id, new_version));
EXPECT_EQ(new_tablet_metadata->rowsets(0).num_dels(), kChunkSize);
EXPECT_EQ(new_tablet_metadata->rowsets(1).num_dels(), 0);

// transform to single publish
new_version = base_version + 1;
ASSERT_OK(publish_single_version(tablet_id, new_version, txn_id1).status());
ASSIGN_OR_ABORT(new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, new_version));
EXPECT_EQ(new_tablet_metadata->rowsets_size(), 1);
EXPECT_EQ(kChunkSize, read_rows(tablet_id, new_version));
EXPECT_EQ(new_tablet_metadata->rowsets(0).num_dels(), 0);

new_version = base_version + 2;
ASSERT_OK(publish_single_version(tablet_id, new_version, txn_id2).status());
ASSIGN_OR_ABORT(new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, new_version));
EXPECT_EQ(new_tablet_metadata->rowsets_size(), 2);
EXPECT_EQ(kChunkSize, read_rows(tablet_id, new_version));
EXPECT_EQ(new_tablet_metadata->rowsets(0).num_dels(), 12);
EXPECT_EQ(new_tablet_metadata->rowsets(1).num_dels(), 0);
}

TEST_P(LakePrimaryKeyPublishTest, test_mem_tracker) {
EXPECT_EQ(1024 * 1024, _mem_tracker->limit());
EXPECT_EQ(1024 * 1024 * config::lake_pk_preload_memory_limit_percent / 100,
Expand Down

0 comments on commit bcce25e

Please sign in to comment.