Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
ljcui committed Jan 17, 2025
1 parent fd8cd20 commit 686ff99
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 97 deletions.
176 changes: 106 additions & 70 deletions src/bolt_ha/raft_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ using boost::asio::ip::tcp;

extern std::shared_mutex promise_mutex;
extern std::unordered_map<uint64_t, std::promise<std::string>> pending_promise;

namespace bolt_ha {
void NodeClient::reconnect() {
if (has_closed_) {
return;
Expand Down Expand Up @@ -167,6 +167,40 @@ std::string MessageToNetString(const google::protobuf::Message& msg) {
return str;
}

RaftDriver::RaftDriver(std::function<void (uint64_t index, const std::string&)> apply,
uint64_t apply_id,
int64_t node_id,
std::vector<eraft::Peer> init_peers,
std::string log_path)
: apply_(std::move(apply)),
apply_id_(apply_id),
node_id_(node_id),
init_peers_(std::move(init_peers)),
log_path_(std::move(log_path)),
tick_interval_(100),
tick_timer_(tick_service_, tick_interval_) {
threads_.emplace_back([this]() {
pthread_setname_np(pthread_self(), "raft_service");
boost::asio::io_service::work holder(raft_service_);
raft_service_.run();
});
threads_.emplace_back([this]() {
pthread_setname_np(pthread_self(), "tick_service");
boost::asio::io_service::work holder(tick_service_);
tick_service_.run();
});
threads_.emplace_back([this]() {
pthread_setname_np(pthread_self(), "ready_service");
boost::asio::io_service::work holder(ready_service_);
ready_service_.run();
});
threads_.emplace_back([this]() {
pthread_setname_np(pthread_self(), "client_service");
boost::asio::io_service::work holder(client_service_);
client_service_.run();
});
}

eraft::Error RaftDriver::Run() {
rocksdb::Options options;
options.create_if_missing = true;
Expand Down Expand Up @@ -337,84 +371,86 @@ void RaftDriver::on_ready(eraft::Ready ready) {
}
for (const auto& entry : ready.committedEntries_) {
switch (entry.type()) {
case raftpb::EntryNormal: {
if (entry.data().empty()) {
continue;
}
auto propose = nlohmann::json::parse(entry.data());
auto uid = propose["uid"].get<uint64_t>();
auto data = propose["data"].get<std::string>();
apply_(entry.index(), data);
{
std::shared_lock lock(promise_mutex);
auto iter = pending_promise.find(uid);
if (iter != pending_promise.end()) {
iter->second.set_value(data);
}
case raftpb::EntryNormal: {
if (entry.data().empty()) {
continue;
}
auto propose = nlohmann::json::parse(entry.data());
auto uid = propose["uid"].get<uint64_t>();
auto data = propose["data"].get<std::string>();
apply_(entry.index(), data);
{
std::shared_lock lock(promise_mutex);
auto iter = pending_promise.find(uid);
if (iter != pending_promise.end()) {
iter->second.set_value(data);
}
break;
}
case raftpb::EntryConfChange: {
raftpb::ConfChange cc;
if (!cc.ParseFromString(entry.data())) {
LOG_FATAL() << "failed to parse ConfChange";
break;
}
case raftpb::EntryConfChange: {
raftpb::ConfChange cc;
if (!cc.ParseFromString(entry.data())) {
LOG_FATAL() << "failed to parse ConfChange";
}
auto confstate = rn_->ApplyConfChange(raftpb::ConfChangeWrap(cc));
auto info = nlohmann::json::parse(cc.context());
switch (cc.type()) {
case raftpb::ConfChangeType::ConfChangeAddLearnerNode:
case raftpb::ConfChangeType::ConfChangeAddNode: {
if (cc.type() == raftpb::ConfChangeType::ConfChangeAddNode) {
LOG_INFO() << FMA_FMT("Add node: {}", cc.ShortDebugString());
} else {
LOG_INFO() << FMA_FMT("Add learner: {}", cc.ShortDebugString());
}
auto confstate = rn_->ApplyConfChange(raftpb::ConfChangeWrap(cc));
auto info = nlohmann::json::parse(cc.context());
switch (cc.type()) {
case raftpb::ConfChangeType::ConfChangeAddLearnerNode:
case raftpb::ConfChangeType::ConfChangeAddNode: {
if (cc.type() == raftpb::ConfChangeType::ConfChangeAddNode) {
LOG_INFO() << FMA_FMT("Add node: {}", cc.ShortDebugString());
} else {
LOG_INFO() << FMA_FMT("Add learner: {}", cc.ShortDebugString());
}
if (!node_clients_.count(cc.node_id())) {
auto ip = info["ip"].get<std::string>();
auto port = info["port"].get<int>();
auto client = std::make_shared<NodeClient>(client_service_, ip, port);
client->Connect();
node_clients_.emplace(cc.node_id(), std::move(client));
} else {
LOG_ERROR() << FMA_FMT("peer client id %d has already existed", cc.node_id());
}
break;
}
case raftpb::ConfChangeType::ConfChangeRemoveNode: {
LOG_INFO() << FMA_FMT("Remove node: {}", cc.ShortDebugString());
if (node_clients_.count(cc.node_id())) {
node_clients_.at(cc.node_id())->Close();
node_clients_.erase(cc.node_id());
LOG_INFO() << FMA_FMT("erase id {} from peer client pool", cc.node_id());
}
break;
}
case raftpb::ConfChangeType::ConfChangeUpdateNode: {
LOG_INFO() << FMA_FMT("Update node: %s", cc.ShortDebugString());
break;
}
default: {
break;
}
if (!node_clients_.count(cc.node_id())) {
auto ip = info["ip"].get<std::string>();
auto port = info["port"].get<int>();
auto client = std::make_shared<NodeClient>(client_service_, ip, port);
client->Connect();
node_clients_.emplace(cc.node_id(), std::move(client));
} else {
LOG_ERROR() << FMA_FMT("peer client id %d has already existed", cc.node_id());
}
LOG_INFO() << FMA_FMT("New ConfState: {}", confstate->ShortDebugString());
rocksdb::WriteBatch wb;
storage_->SetConfState(*confstate, wb);
storage_->SetNodesInfo(nodes_info(), wb);
storage_->SetApplyIndex(entry.index(), wb);
storage_->WriteBatch(wb);
if (cc.id() > 0) {
std::shared_lock lock(promise_mutex);
auto iter = pending_promise.find(cc.id());
if (iter != pending_promise.end()) {
iter->second.set_value(cc.context());
}
break;
}
case raftpb::ConfChangeType::ConfChangeRemoveNode: {
LOG_INFO() << FMA_FMT("Remove node: {}", cc.ShortDebugString());
if (node_clients_.count(cc.node_id())) {
node_clients_.at(cc.node_id())->Close();
node_clients_.erase(cc.node_id());
LOG_INFO() << FMA_FMT("erase id {} from peer client pool", cc.node_id());
}
break;
}
case raftpb::ConfChangeType::ConfChangeUpdateNode: {
LOG_INFO() << FMA_FMT("Update node: %s", cc.ShortDebugString());
break;
}
default: {
LOG_ERROR() << FMA_FMT("Unhandled entry : {}", entry.ShortDebugString());
break;
}
}
LOG_INFO() << FMA_FMT("New ConfState: {}", confstate->ShortDebugString());
rocksdb::WriteBatch wb;
storage_->SetConfState(*confstate, wb);
storage_->SetNodesInfo(nodes_info(), wb);
storage_->SetApplyIndex(entry.index(), wb);
storage_->WriteBatch(wb);
if (cc.id() > 0) {
std::shared_lock lock(promise_mutex);
auto iter = pending_promise.find(cc.id());
if (iter != pending_promise.end()) {
iter->second.set_value(cc.context());
}
}
break;
}
default: {
LOG_ERROR() << FMA_FMT("Unhandled entry : {}", entry.ShortDebugString());
}
}
}
}
std::shared_ptr<RaftDriver> raft_driver;
}
35 changes: 11 additions & 24 deletions src/bolt_ha/raft_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <boost/asio.hpp>
#include <utility>
#include "raft_log_store.h"

namespace bolt_ha {
class NodeClient : public std::enable_shared_from_this<NodeClient> {
public:
NodeClient(boost::asio::io_service& io_service, const std::string &ip, int port)
Expand Down Expand Up @@ -39,27 +39,11 @@ class NodeClient : public std::enable_shared_from_this<NodeClient> {

class RaftDriver {
public:
RaftDriver(boost::asio::io_service &raft_service,
boost::asio::io_service &tick_service,
boost::asio::io_service &ready_service,
boost::asio::io_service &client_service,
std::function<void (uint64_t index, const std::string&)> apply,
RaftDriver(std::function<void (uint64_t index, const std::string&)> apply,
uint64_t apply_id,
int64_t node_id,
std::vector<eraft::Peer> init_peers,
std::string log_path)
:raft_service_(raft_service),
tick_service_(tick_service),
ready_service_(ready_service),
client_service_(client_service),
apply_(std::move(apply)),
apply_id_(apply_id),
node_id_(node_id),
init_peers_(std::move(init_peers)),
log_path_(std::move(log_path)),
tick_interval_(100),
tick_timer_(tick_service_, tick_interval_) {
}
std::string log_path);
eraft::Error Run();
void Message(raftpb::Message msg);
eraft::Error Proposal(std::string data);
Expand All @@ -85,10 +69,11 @@ class RaftDriver {
void on_ready(eraft::Ready ready);
std::string nodes_info();

boost::asio::io_service& raft_service_;
boost::asio::io_service& tick_service_;
boost::asio::io_service& ready_service_;
boost::asio::io_service& client_service_;
std::vector<std::thread> threads_;
boost::asio::io_service raft_service_;
boost::asio::io_service tick_service_;
boost::asio::io_service ready_service_;
boost::asio::io_service client_service_;
std::function<void (uint64_t, const std::string&)> apply_;
uint64_t apply_id_;
uint64_t node_id_;
Expand All @@ -101,4 +86,6 @@ class RaftDriver {
uint64_t lead_ = eraft::None;
bool advance_ = false;
std::unordered_map<uint64_t, std::shared_ptr<NodeClient>> node_clients_;
};
};
extern std::shared_ptr<RaftDriver> raft_driver;
}
3 changes: 2 additions & 1 deletion src/bolt_ha/raft_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include "raft_log_store.h"
#include "tools/lgraph_log.h"
#include "fma-common/string_formatter.h"

namespace bolt_ha {
std::string raft_log_key(uint64_t log_id) {
std::string ret;
boost::endian::native_to_big_inplace(log_id);
Expand Down Expand Up @@ -273,4 +273,5 @@ std::pair<uint64_t, eraft::Error> RaftLogStorage::FirstIndex() {
std::pair<raftpb::Snapshot, eraft::Error> RaftLogStorage::Snapshot() {
// disable snapshot
return {raftpb::Snapshot{}, eraft::ErrSnapshotTemporarilyUnavailable};
}
}
5 changes: 3 additions & 2 deletions src/bolt_ha/raft_log_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <rocksdb/db.h>
#include <rocksdb/convenience.h>
#include "etcd-raft-cpp/rawnode.h"

namespace bolt_ha {
struct RaftLogStorage : private boost::noncopyable, eraft::Storage {
public:
RaftLogStorage(rocksdb::DB* db,
Expand Down Expand Up @@ -46,4 +46,5 @@ struct RaftLogStorage : private boost::noncopyable, eraft::Storage {
uint64_t last_entry_index_ = 0;
raftpb::HardState hard_state_;
raftpb::ConfState conf_state_;
};
};
}
10 changes: 10 additions & 0 deletions src/core/global_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ std::map<std::string, std::string> lgraph::GlobalConfig::FormatAsOptions() const
}
AddOption(options, "bolt port", bolt_port);
AddOption(options, "number of bolt io threads", bolt_io_thread_num);
AddOption(options, "bolt raft port", bolt_raft_port);
AddOption(options, "bolt raft node id", bolt_raft_node_id);
return options;
}

Expand Down Expand Up @@ -218,6 +220,8 @@ fma_common::Configuration lgraph::GlobalConfig::InitConfig
bolt_io_thread_num = 1;
// default disable plugin load/delete
enable_plugin = false;
bolt_raft_port = 0;
bolt_raft_node_id = 0;

// parse options
fma_common::Configuration argparser;
Expand Down Expand Up @@ -340,5 +344,11 @@ fma_common::Configuration lgraph::GlobalConfig::InitConfig
argparser.Add(is_cypher_v2,
"is_cypher_v2", true)
.Comment("Config browser whether to store user credentials in local storage.");
argparser.Add(bolt_raft_port, "bolt_raft_port", true)
.Comment("Bolt raft port.");
argparser.Add(bolt_raft_node_id, "bolt_raft_node_id", true)
.Comment("Bolt raft node id.");
argparser.Add(bolt_raft_init_peers, "bolt_raft_init_peers", true)
.Comment("Bolt raft initial member information.");
return argparser;
}
3 changes: 3 additions & 0 deletions src/core/global_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ struct BasicConfigs {
// bolt
int bolt_port = 0;
int bolt_io_thread_num = 1;
int bolt_raft_port = 0;
uint64_t bolt_raft_node_id = 0;
std::string bolt_raft_init_peers;
// default disable plugin load/delete
bool enable_plugin = false;
BrowserOptions browser_options;
Expand Down
23 changes: 23 additions & 0 deletions src/server/lgraph_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "server/state_machine.h"
#include "server/ha_state_machine.h"
#include "server/db_management_client.h"
#include "bolt_ha/raft_driver.h"

#ifndef _WIN32
#include "brpc/server.h"
Expand Down Expand Up @@ -318,6 +319,28 @@ int LGraphServer::Start() {
config_->bolt_io_thread_num)) {
return -1;
}
if (config_->bolt_raft_port > 0) {
std::vector<eraft::Peer> init_peers;
auto cluster = nlohmann::json::parse(config_->bolt_raft_init_peers);
for (const auto& item : cluster) {
eraft::Peer peer;
peer.id_ = item["node_id"].get<int64_t>();
peer.context_ = item.dump();
init_peers.emplace_back(std::move(peer));
}
/*
bolt_ha::raft_driver = std::make_unique<bolt_ha::RaftDriver> (
[](uint64_t index, const std::string& log){apply(data_db, index, log);},
0,
config_->bolt_raft_node_id,
init_peers,
"raftlog");
auto err = bolt_ha::raft_driver->Run();
if (err != nullptr) {
LOG_ERROR() << "raft driver failed to run, error: " << err.String();
return -1;
}*/
}
}

if (config_->enable_rpc) {
Expand Down

0 comments on commit 686ff99

Please sign in to comment.