Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Resolve the inconsistency between member configuration and log index in snapshot meta #8

Open
wants to merge 10 commits into
base: unstable
Choose a base branch
from
2 changes: 1 addition & 1 deletion cmake/braft.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ ExternalProject_Add(
DEPENDS brpc
# The pr on braft is not merged, so I am using my own warehouse to run the test for the time being
GIT_REPOSITORY "https://github.com/pikiwidb/braft.git"
GIT_TAG v1.1.2-alpha2
GIT_TAG v1.1.2.1
GIT_SHALLOW true
CMAKE_ARGS
-DCMAKE_BUILD_TYPE=${LIB_BUILD_TYPE}
Expand Down
18 changes: 0 additions & 18 deletions save_load.sh

This file was deleted.

4 changes: 2 additions & 2 deletions src/pikiwidb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ static int InitLimit() {
limit.rlim_cur = maxfiles;
limit.rlim_max = maxfiles;
if (setrlimit(RLIMIT_NOFILE, &limit) != -1) {
WARN("your 'limit -n ' of {} is not enough for PikiwiDB to start. PikiwiDB have successfully reconfig it to ",
WARN("your 'limit -n ' of {} is not enough for PikiwiDB to start. PikiwiDB have successfully reconfig it to {}",
old_limit, limit.rlim_cur);
} else {
ERROR(
Expand Down Expand Up @@ -282,7 +282,7 @@ int main(int ac, char* av[]) {
daemonize();
}

InitLimit();
// InitLimit(); // Code problem, program startup direct segment error
pstd::InitRandom();
SignalSetup();
InitLogs();
Expand Down
13 changes: 12 additions & 1 deletion src/praft/praft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,16 @@ storage::LogIndex PRaft::GetLastLogIndex(bool is_flush) {
return node_->get_last_log_index(is_flush);
}

void PRaft::GetConfigurationByIndex(const int64_t index, braft::ConfigurationEntry* conf,
braft::ConfigurationEntry* learner_conf) {
if (!node_) {
ERROR("Node is not initialized");
return;
}

node_->get_configuration(index, conf, learner_conf);
}

void PRaft::SendNodeRequest(PClient* client) {
assert(client);

Expand Down Expand Up @@ -663,8 +673,9 @@ int PRaft::on_snapshot_load(braft::SnapshotReader* reader) {
2. When a node is improperly shut down and restarted, the minimum flush-index should
be obtained as the starting point for fault recovery.
*/
// replay from <replay_point + 1>
uint64_t replay_point = PSTORE.GetBackend(db_id_)->GetStorage()->GetSmallestFlushedLogIndex();
node_->set_self_playback_point(replay_point);
node_->set_last_applied_index_and_term(replay_point);
is_node_first_start_up_ = false;
INFO("set replay_point: {}", replay_point);

Expand Down
3 changes: 3 additions & 0 deletions src/praft/praft.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <tuple>
#include <vector>

#include "braft/configuration_manager.h"
#include "braft/file_system_adaptor.h"
#include "braft/raft.h"
#include "brpc/server.h"
Expand Down Expand Up @@ -140,6 +141,8 @@ class PRaft : public braft::StateMachine {
butil::Status GetListPeers(std::vector<braft::PeerId>* peers);
storage::LogIndex GetTerm(uint64_t log_index);
storage::LogIndex GetLastLogIndex(bool is_flush = false);
void GetConfigurationByIndex(const int64_t index, braft::ConfigurationEntry* conf,
braft::ConfigurationEntry* learner_conf);

bool IsInitialized() const { return node_ != nullptr && server_ != nullptr; }

Expand Down
19 changes: 18 additions & 1 deletion src/praft/psnapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

#include "psnapshot.h"

#include "braft/configuration.h"
#include "braft/configuration_manager.h"
#include "braft/local_file_meta.pb.h"
#include "braft/snapshot.h"
#include "butil/files/file_path.h"
Expand Down Expand Up @@ -79,12 +81,27 @@ braft::FileAdaptor* PPosixFileSystemAdaptor::open(const std::string& path, int o
PSTORE.HandleTaskSpecificDB(tasks);
AddAllFiles(snapshot_path, &snapshot_meta_memtable, snapshot_path);

// update snapshot last log index and last_log_term
// update snapshot last log index, last_log_term, conf of last log index and learners
auto& new_meta = const_cast<braft::SnapshotMeta&>(snapshot_meta_memtable.meta());
auto last_log_index = PSTORE.GetBackend(db_id)->GetStorage()->GetSmallestFlushedLogIndex();
new_meta.set_last_included_index(last_log_index);
auto last_log_term = PRAFT.GetTerm(last_log_index);
new_meta.set_last_included_term(last_log_term);
braft::ConfigurationEntry conf_entry;
braft::ConfigurationEntry learner_conf_entry;
PRAFT.GetConfigurationByIndex(last_log_index, &conf_entry, &learner_conf_entry);
new_meta.clear_peers();
for (auto iter = conf_entry.conf.begin(); iter != conf_entry.conf.end(); ++iter) {
*new_meta.add_peers() = iter->to_string();
}
new_meta.clear_old_peers();
for (auto iter = conf_entry.old_conf.begin(); iter != conf_entry.old_conf.end(); ++iter) {
*new_meta.add_old_peers() = iter->to_string();
}
new_meta.clear_learners();
for (auto iter = learner_conf_entry.conf.begin(); iter != learner_conf_entry.conf.end(); ++iter) {
*new_meta.add_learners() = iter->to_string();
}
INFO("Succeed to fix db_{} snapshot meta: {}, {}", db_id, last_log_index, last_log_term);

auto rc = snapshot_meta_memtable.save_to_file(fs, meta_path);
Expand Down
Loading