Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
ljcui committed Jan 22, 2025
1 parent ee0f98e commit fad8e7c
Show file tree
Hide file tree
Showing 10 changed files with 1,417 additions and 74 deletions.
97 changes: 55 additions & 42 deletions src/bolt_raft/raft_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ RaftDriver::RaftDriver(std::function<void (uint64_t index, const RaftRequest&)>
log_path_(std::move(log_path)),
tick_interval_(100),
tick_timer_(tick_service_, tick_interval_) {
id_generator_ = std::make_shared<Generator>(
node_id,std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count());
threads_.emplace_back([this]() {
pthread_setname_np(pthread_self(), "raft_service");
boost::asio::io_service::work holder(raft_service_);
Expand Down Expand Up @@ -216,14 +219,17 @@ eraft::Error RaftDriver::Run() {
}
storage_ = std::make_shared<RaftLogStorage>(db, cf_handles[0], cf_handles[1]);
auto applied = std::max(apply_id_, storage_->GetApplyIndex());
auto nodes_info = storage_->GetNodesInfo();
auto json_obj = nlohmann::json::parse(nodes_info);
for (auto& item : json_obj) {
auto client = std::make_shared<NodeClient>(client_service_, item["ip"].get<std::string>(), item["port"].get<int>());
auto nodes = storage_->GetNodeInfos();
if (nodes.has_value()) {
node_infos_.ParseFromString(nodes.value());
}
for (auto& [id, node] : node_infos_.nodes()) {
auto client = std::make_shared<NodeClient>(
client_service_, node.ip(), node.bolt_raft_port());
client->Connect();
node_clients_.emplace(item["node_id"].get<uint64_t>(), std::move(client));
node_clients_.emplace(node.node_id(), std::move(client));
}
LOG_INFO() << FMA_FMT("Raft nodes info: {}", nodes_info);
LOG_INFO() << FMA_FMT("Raft nodes info: {}", node_infos_.ShortDebugString());
bool exist = storage_->Init();
eraft::Config config;
config.id_ = node_id_;
Expand Down Expand Up @@ -294,11 +300,11 @@ std::shared_ptr<PromiseContext> RaftDriver::PostMessage(uint64_t uuid, raftpb::M
LOG_WARN() << FMA_FMT("failed to step raft message, err: {}",err.String().c_str());
return;
}
context->proposed.set_value(std::move(err));
{
std::lock_guard<std::mutex> guard(promise_mutex_);
pending_promise_.emplace(uuid, std::move(context));
}
context->proposed.set_value(std::move(err));
CheckReady();
});
return context;
Expand Down Expand Up @@ -335,13 +341,30 @@ std::shared_ptr<PromiseContext> RaftDriver::ProposeConfChange(const raftpb::Conf
return PostMessage(id_generator_->Next(), std::move(msg));
}

std::shared_ptr<PromiseContext> RaftDriver::Propose(std::string data) {
std::shared_ptr<PromiseContext> RaftDriver::Propose(bolt_raft::RaftRequest request) {
request.set_id(id_generator_->Next());
raftpb::Message msg;
auto entry = msg.add_entries();
entry->set_type(raftpb::EntryType::EntryNormal);
entry->set_data(std::move(data));
entry->set_data(request.SerializeAsString());
msg.set_type(raftpb::MessageType::MsgProp);
return PostMessage(id_generator_->Next(), std::move(msg));
return PostMessage(request.id(), std::move(msg));
}

NodeInfos RaftDriver::GetNodeInfosWithLeader() {
std::promise<uint64_t> promise;
auto future = promise.get_future();
raft_service_.post([this, &promise]() {
promise.set_value(rn_->raft_->lead_);
});
auto leader = future.get();

std::shared_lock<std::shared_mutex> lock(node_infos_mutex_);
NodeInfos ret = node_infos_;
if (ret.nodes().count(leader)) {
ret.mutable_nodes()->at(leader).set_is_leader(true);
}
return ret;
}

void RaftDriver::CheckReady() {
Expand All @@ -359,23 +382,10 @@ void RaftDriver::CheckReady() {
advance_ = true;
}

std::string RaftDriver::nodes_info() {
auto array = nlohmann::json::array();
for (auto& [id, node] : node_clients_) {
auto obj = nlohmann::json::object();
obj["node_id"] = id;
obj["ip"] = node->ip();
obj["port"] = node->port();
array.emplace_back(std::move(obj));
}
return array.dump();
}


void RaftDriver::OnReady(eraft::Ready ready) {
if (ready.softState_) {
LOG_INFO() << FMA_FMT("soft state change, state:{}, lead:{}",
eraft::ToString(ready.softState_->raftState_).c_str(), ready.softState_->lead_);
eraft::ToString(ready.softState_->raftState_), ready.softState_->lead_);
}
rocksdb::WriteBatch batch;
if (!ready.entries_.empty()) {
Expand Down Expand Up @@ -405,7 +415,6 @@ void RaftDriver::OnReady(eraft::Ready ready) {
if (entry.data().empty()) {
continue;
}

RaftRequest request;
auto ret = request.ParseFromString(entry.data());
assert(ret);
Expand Down Expand Up @@ -433,32 +442,36 @@ void RaftDriver::OnReady(eraft::Ready ready) {
LOG_FATAL() << "failed to parse ConfChange";
}
auto confstate = rn_->ApplyConfChange(raftpb::ConfChangeWrap(cc));
auto info = nlohmann::json::parse(cc.context());
NodeInfo node_info;
node_info.ParseFromString(cc.context());
switch (cc.type()) {
case raftpb::ConfChangeType::ConfChangeAddLearnerNode:
case raftpb::ConfChangeType::ConfChangeAddNode: {
if (cc.type() == raftpb::ConfChangeType::ConfChangeAddNode) {
LOG_INFO() << FMA_FMT("Add node: {}", cc.ShortDebugString());
LOG_INFO() << FMA_FMT("Add node: {}", node_info.ShortDebugString());
} else {
LOG_INFO() << FMA_FMT("Add learner: {}", cc.ShortDebugString());
LOG_INFO() << FMA_FMT("Add learner: {}", node_info.ShortDebugString());
}
if (!node_clients_.count(cc.node_id())) {
auto ip = info["ip"].get<std::string>();
auto port = info["port"].get<int>();
auto client = std::make_shared<NodeClient>(client_service_, ip, port);
std::unique_lock<std::shared_mutex> lock(node_infos_mutex_);
if (!node_infos_.nodes().count(node_info.node_id())) {
node_infos_.mutable_nodes()->insert({node_info.node_id(), node_info});
auto client = std::make_shared<NodeClient>(client_service_, node_info.ip(), node_info.bolt_raft_port());
client->Connect();
node_clients_.emplace(cc.node_id(), std::move(client));
node_clients_.emplace(node_info.node_id(), std::move(client));
} else {
LOG_ERROR() << FMA_FMT("peer client id %d has already existed", cc.node_id());
LOG_ERROR() << FMA_FMT("node id %d has already existed", node_info.node_id());
}
break;
}
case raftpb::ConfChangeType::ConfChangeRemoveNode: {
LOG_INFO() << FMA_FMT("Remove node: {}", cc.ShortDebugString());
if (node_clients_.count(cc.node_id())) {
node_clients_.at(cc.node_id())->Close();
node_clients_.erase(cc.node_id());
LOG_INFO() << FMA_FMT("erase id {} from peer client pool", cc.node_id());
LOG_INFO() << FMA_FMT("Remove node: {}", node_info.node_id());
std::unique_lock<std::shared_mutex> lock(node_infos_mutex_);
if (node_infos_.nodes().count(node_info.node_id())) {
node_clients_.at(node_info.node_id())->Close();
node_clients_.erase(node_info.node_id());
node_infos_.mutable_nodes()->erase(node_info.node_id());
} else {
LOG_ERROR() << FMA_FMT("No such node id {}", node_info.node_id());
}
break;
}
Expand All @@ -468,12 +481,12 @@ void RaftDriver::OnReady(eraft::Ready ready) {
}
default: {
break;
}
}
}}

LOG_INFO() << FMA_FMT("New ConfState: {}", confstate->ShortDebugString());
rocksdb::WriteBatch wb;
storage_->SetConfState(*confstate, wb);
storage_->SetNodesInfo(nodes_info(), wb);
storage_->SetNodeInfos(node_infos_.SerializeAsString(), wb);
storage_->SetApplyIndex(entry.index(), wb);
storage_->WriteBatch(wb);
/*if (cc.id() > 0) {
Expand Down
6 changes: 4 additions & 2 deletions src/bolt_raft/raft_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,16 @@ class RaftDriver {
std::string log_path);
eraft::Error Run();
void Message(raftpb::Message msg);
std::shared_ptr<PromiseContext> Propose(std::string data);
std::shared_ptr<PromiseContext> Propose(bolt_raft::RaftRequest request);
std::shared_ptr<PromiseContext> ProposeConfChange(const raftpb::ConfChange& cc);
NodeInfos GetNodeInfosWithLeader();

private:
std::shared_ptr<PromiseContext> PostMessage(uint64_t uuid, raftpb::Message msg);
void Tick();
void Advance();
void CheckReady();
void OnReady(eraft::Ready ready);
std::string nodes_info();

std::vector<std::thread> threads_;
boost::asio::io_service raft_service_;
Expand All @@ -103,6 +103,8 @@ class RaftDriver {
std::shared_ptr<RaftLogStorage> storage_;
uint64_t lead_ = eraft::None;
bool advance_ = false;
std::shared_mutex node_infos_mutex_;
NodeInfos node_infos_;
std::unordered_map<uint64_t, std::shared_ptr<NodeClient>> node_clients_;
std::shared_ptr<Generator> id_generator_ = nullptr;
std::mutex promise_mutex_;
Expand Down
18 changes: 8 additions & 10 deletions src/bolt_raft/raft_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ std::string raft_confstate_key() {
return "confState";
}

std::string raft_nodesinfo_key() {
return "nodesInfo";
std::string raft_nodeinfos_key() {
return "nodeInfos";
}

bool RaftLogStorage::Init() {
Expand Down Expand Up @@ -126,23 +126,21 @@ eraft::Error RaftLogStorage::SetConfState(const raftpb::ConfState& hs, rocksdb::
return nullptr;
}

eraft::Error RaftLogStorage::SetNodesInfo(const std::string& info, rocksdb::WriteBatch &batch) {
eraft::Error RaftLogStorage::SetNodeInfos(const std::string& info, rocksdb::WriteBatch &batch) {
std::string val;
std::string key = raft_nodesinfo_key();
std::string key = raft_nodeinfos_key();
batch.Put(meta_cf_, key, info);
return nullptr;
}

std::string RaftLogStorage::GetNodesInfo() {
std::string ret;
std::string key = raft_nodesinfo_key();
std::optional<std::string> RaftLogStorage::GetNodeInfos() {
std::optional<std::string> ret;
std::string val;
std::string key = raft_nodeinfos_key();
auto s = db_->Get(rocksdb::ReadOptions(), meta_cf_, key, &val);
if (s.ok()) {
ret = val;
} else if (s.IsNotFound()) {
ret = "[]";
} else {
} else if (!s.IsNotFound()) {
LOG_FATAL() << FMA_FMT("Failed to get nodes info: {}", s.ToString());
}
return ret;
Expand Down
4 changes: 2 additions & 2 deletions src/bolt_raft/raft_log_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ struct RaftLogStorage : private boost::noncopyable, eraft::Storage {
bool Init();
eraft::Error SetHardState(const raftpb::HardState& hs, rocksdb::WriteBatch &batch);
eraft::Error SetConfState(const raftpb::ConfState& hs, rocksdb::WriteBatch &batch);
eraft::Error SetNodesInfo(const std::string& info, rocksdb::WriteBatch &batch);
std::string GetNodesInfo();
eraft::Error SetNodeInfos(const std::string& info, rocksdb::WriteBatch &batch);
std::optional<std::string> GetNodeInfos();
eraft::Error SetApplyIndex(uint64_t apply_index, rocksdb::WriteBatch &batch);
uint64_t GetApplyIndex();
eraft::Error Append(std::vector<raftpb::Entry> entries, rocksdb::WriteBatch &batch);
Expand Down
Loading

0 comments on commit fad8e7c

Please sign in to comment.