Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
ljcui committed Jan 21, 2025
1 parent 69ccc92 commit 48aa34f
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 171 deletions.
11 changes: 5 additions & 6 deletions src/bolt_raft/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ class Connection : private boost::asio::noncopyable {
int64_t& conn_id() { return conn_id_;};
boost::asio::io_service& io_service() {return io_service_;};
virtual void Start() = 0;
virtual void PostResponse(std::string) {};
private:
boost::asio::io_service& io_service_;
boost::asio::ip::tcp::socket socket_;
Expand Down Expand Up @@ -83,7 +82,7 @@ inline void ProtobufConnection::read_magic_done(const boost::system::error_code&
return;
}
if (memcmp(buffer4_, magic_code_, sizeof(magic_code_)) != 0) {
LOG_WARN() << ("receive wrong magic code");
LOG_WARN() << "receive wrong magic code";
Close();
return;
}
Expand All @@ -100,12 +99,12 @@ inline void ProtobufConnection::read_msg_size_done(const boost::system::error_co

boost::endian::big_to_native_inplace(msg_size_);
if (msg_size_ > 1024 * 1024 * 1024) {
LOG_WARN() << FMA_FMT("receive message which is too big, size : {}", msg_size_);
LOG_WARN() << FMA_FMT("receive raft message which is too big, size : {}", msg_size_);
Close();
return;
}
if (msg_size_ == 0) {
LOG_WARN() << FMA_FMT("receive message with error size, size : {}", msg_size_);
LOG_WARN() << FMA_FMT("receive raft message with error size, size : {}", msg_size_);
Close();
return;
}
Expand All @@ -122,14 +121,14 @@ inline void ProtobufConnection::read_msg_body() {

inline void ProtobufConnection::read_msg_body_done(const boost::system::error_code &ec) {
if (ec) {
LOG_WARN() << FMA_FMT("read_msg_body_done error {}", ec.message().c_str());
LOG_WARN() << FMA_FMT("read_msg_body_done error {}", ec.message());
Close();
return;
}
raftpb::Message msg;
auto ret = msg.ParseFromArray(msg_body_.data(), (int)msg_body_.size());
if (!ret) {
LOG_WARN() << FMA_FMT("raft msg ParseFromArray failed, close connection");
LOG_WARN() << FMA_FMT("failed to parse raft msg, close connection");
Close();
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/bolt_raft/io_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class IOService : private boost::asio::noncopyable {
void clean_closed_conn() {
for (auto it = connections_.cbegin(); it != connections_.cend();) {
if (it->second->has_closed()) {
LOG_DEBUG() << FMA_FMT("erase connection[id:{},use_count:{}] from pool",
LOG_DEBUG() << FMA_FMT("erase raft connection[id:{},use_count:{}] from pool",
it->second->conn_id(), it->second.use_count());
it = connections_.erase(it);
} else {
Expand Down
87 changes: 65 additions & 22 deletions src/bolt_raft/raft_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "tools/json.hpp"
#include "tools/lgraph_log.h"
#include "fma-common/string_formatter.h"
#include "raft_request.pb.h"

using boost::asio::async_write;
using boost::asio::ip::tcp;
Expand Down Expand Up @@ -165,7 +166,7 @@ std::string MessageToNetString(const google::protobuf::Message& msg) {
return str;
}

RaftDriver::RaftDriver(std::function<void (uint64_t index, const std::string&)> apply,
RaftDriver::RaftDriver(std::function<void (uint64_t index, const RaftRequest&)> apply,
uint64_t apply_id,
int64_t node_id,
std::vector<eraft::Peer> init_peers,
Expand Down Expand Up @@ -250,7 +251,7 @@ eraft::Error RaftDriver::Run() {
LOG_WARN() << FMA_FMT("tick_timer async_wait error: {}", ec.message().c_str());
return;
}
tick();
Tick();
});
return {};
}
Expand All @@ -271,48 +272,73 @@ void RaftDriver::Message(raftpb::Message msg) {
if (err != nullptr) {
LOG_WARN() << FMA_FMT("Step return err: {}",err.String().c_str());
}
check_ready();
CheckReady();
});
}

eraft::Error RaftDriver::PostMessage(raftpb::Message msg) {
std::promise<eraft::Error> promise;
std::future<eraft::Error> future = promise.get_future();
raft_service_.post([this, msg = std::move(msg), &promise]() mutable {
std::shared_ptr<ApplyContext> RaftDriver::PostMessage(uint64_t uuid, raftpb::Message msg) {
auto context = std::make_shared<ApplyContext>();
raft_service_.post([this, uuid, context, msg = std::move(msg)]() mutable {
if (rn_->raft_->id_ != rn_->raft_->lead_) {
promise.set_value(eraft::Error("not leader"));
context->propose.set_value(eraft::Error("not leader"));
return;
}
msg.set_from(rn_->raft_->id_);
auto err = rn_->Step(std::move(msg));
if (err != nullptr) {
LOG_WARN() << FMA_FMT("Proposal return err: {}",err.String().c_str());
}
promise.set_value(std::move(err));
check_ready();
context->propose.set_value(std::move(err));
{
std::lock_guard<std::mutex> guard(promise_mutex_);
pending_promise_.emplace(uuid, std::move(context));
}
CheckReady();
});
return context;
}

void RaftDriver::Advance() {
raft_service_.post([this]() mutable {
rn_->Advance({});
advance_ = false;
});
}

void RaftDriver::Tick() {
raft_service_.post([this]() mutable {
rn_->Tick();
CheckReady();
});
tick_timer_.expires_at(tick_timer_.expires_at() + tick_interval_);
tick_timer_.async_wait([this](const boost::system::error_code& ec){
if (ec) {
LOG_WARN() << "tick_timer async_wait error: " << ec.message();
return;
}
Tick();
});
return future.get();
}

eraft::Error RaftDriver::ProposeConfChange(const raftpb::ConfChange& cc) {
std::shared_ptr<ApplyContext> RaftDriver::ProposeConfChange(const raftpb::ConfChange& cc) {
raftpb::Message msg;
auto entry = msg.add_entries();
entry->set_type(raftpb::EntryType::EntryConfChange);
entry->set_data(cc.SerializeAsString());
msg.set_type(raftpb::MessageType::MsgProp);
return PostMessage(std::move(msg));
return PostMessage(id_generator_->Next(), std::move(msg));
}

eraft::Error RaftDriver::Propose(std::string data) {
std::shared_ptr<ApplyContext> RaftDriver::Propose(std::string data) {
raftpb::Message msg;
auto entry = msg.add_entries();
entry->set_type(raftpb::EntryType::EntryNormal);
entry->set_data(std::move(data));
msg.set_type(raftpb::MessageType::MsgProp);
return PostMessage(std::move(msg));
return PostMessage(id_generator_->Next(), std::move(msg));
}

void RaftDriver::check_ready() {
void RaftDriver::CheckReady() {
if (advance_) {
return;
}
Expand All @@ -321,8 +347,8 @@ void RaftDriver::check_ready() {
}
auto ready = rn_->GetReady();
ready_service_.post([this, ready = std::move(ready)]() mutable {
on_ready(std::move(ready));
advance();
OnReady(std::move(ready));
Advance();
});
advance_ = true;
}
Expand All @@ -340,7 +366,7 @@ std::string RaftDriver::nodes_info() {
}


void RaftDriver::on_ready(eraft::Ready ready) {
void RaftDriver::OnReady(eraft::Ready ready) {
if (ready.softState_) {
LOG_INFO() << FMA_FMT("soft state change, state:{}, lead:{}",
eraft::ToString(ready.softState_->raftState_).c_str(), ready.softState_->lead_);
Expand Down Expand Up @@ -373,7 +399,26 @@ void RaftDriver::on_ready(eraft::Ready ready) {
if (entry.data().empty()) {
continue;
}
apply_(entry.index(), entry.data());

RaftRequest request;
auto ret = request.ParseFromString(entry.data());
assert(ret);
std::shared_ptr<bolt_raft::ApplyContext> context;
{
std::lock_guard<std::mutex> guard(promise_mutex_);
auto iter = pending_promise_.find(request.id());
if (iter != pending_promise_.end()) {
context = iter->second;
pending_promise_.erase(iter);
}
}
if (context) {
context->index = entry.index();
context->start.set_value();
context->end.get_future().get();
} else {
apply_(entry.index(), request);
}
break;
}
case raftpb::EntryConfChange: {
Expand Down Expand Up @@ -440,6 +485,4 @@ void RaftDriver::on_ready(eraft::Ready ready) {
}
}
}
std::shared_ptr<RaftDriver> g_raft_driver;
std::shared_ptr<Generator> g_id_generator;
}
99 changes: 45 additions & 54 deletions src/bolt_raft/raft_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
#include <boost/asio.hpp>
#include <utility>
#include "raft_log_store.h"
#include "bolt_raft/raft_request.pb.h"
namespace bolt_raft {
using namespace boost::asio::ip;
class NodeClient : public std::enable_shared_from_this<NodeClient> {
public:
NodeClient(boost::asio::io_service& io_service, const std::string &ip, int port)
: io_service_(io_service),
socket_(io_service_),
endpoint_(boost::asio::ip::address::from_string(ip), port),
endpoint_(address::from_string(ip), port),
interval_(1000),
timer_(io_service_) {
}
Expand All @@ -25,8 +27,8 @@ class NodeClient : public std::enable_shared_from_this<NodeClient> {
void send_magic_code();

boost::asio::io_service& io_service_;
boost::asio::ip::tcp::socket socket_;
boost::asio::ip::tcp::endpoint endpoint_;
tcp::socket socket_;
tcp::endpoint endpoint_;
std::deque<std::string> msg_queue_;
std::vector<boost::asio::const_buffer> send_buffers_;
boost::posix_time::millisec interval_;
Expand All @@ -37,44 +39,60 @@ class NodeClient : public std::enable_shared_from_this<NodeClient> {
uint8_t buffer4_[4] = {0};
};

struct Generator {
static const int tsLen = 5 * 8;
static const int cntLen = 8;
static const int suffixLen = tsLen + cntLen;

Generator(uint64_t id, uint64_t time) {
prefix = id << suffixLen;
suffix = lowbit(time, tsLen) << cntLen;
}
uint64_t prefix = 0;
std::atomic<uint64_t> suffix{0};
static uint64_t lowbit(uint64_t x, int n) {
return x & (std::numeric_limits<uint64_t>::max() >> (64 - n));
}
uint64_t Next() {
auto suf = suffix.fetch_add(1) + 1;
auto id = prefix | lowbit(suf, suffixLen);
return id;
}
};

struct ApplyContext {
uint64_t index = 0;
std::promise<eraft::Error> propose;
std::promise<void> start;
std::promise<void> end;
};

class RaftDriver {
public:
RaftDriver(std::function<void (uint64_t index, const std::string&)> apply,
RaftDriver(std::function<void (uint64_t index, const RaftRequest&)> apply,
uint64_t apply_id,
int64_t node_id,
std::vector<eraft::Peer> init_peers,
std::string log_path);
eraft::Error Run();
void Message(raftpb::Message msg);
eraft::Error Propose(std::string data);
eraft::Error ProposeConfChange(const raftpb::ConfChange& cc);
std::shared_ptr<ApplyContext> Propose(std::string data);
std::shared_ptr<ApplyContext> ProposeConfChange(const raftpb::ConfChange& cc);

private:
eraft::Error PostMessage(raftpb::Message msg);
void tick() {
raft_service_.post([this]() mutable {
rn_->Tick();
check_ready();
});
tick_timer_.expires_at(tick_timer_.expires_at() + tick_interval_);
tick_timer_.async_wait(std::bind(&RaftDriver::tick, this));
}
void advance() {
raft_service_.post([this]() mutable {
rn_->Advance({});
advance_ = false;
});
}
void check_ready();
void on_ready(eraft::Ready ready);
std::shared_ptr<ApplyContext> PostMessage(uint64_t uuid, raftpb::Message msg);
void Tick();
void Advance();
void CheckReady();
void OnReady(eraft::Ready ready);
std::string nodes_info();

std::vector<std::thread> threads_;
boost::asio::io_service raft_service_;
boost::asio::io_service tick_service_;
boost::asio::io_service ready_service_;
boost::asio::io_service client_service_;
std::function<void (uint64_t, const std::string&)> apply_;
std::function<void (uint64_t, const RaftRequest&)> apply_;
uint64_t apply_id_;
uint64_t node_id_;
std::vector<eraft::Peer> init_peers_;
Expand All @@ -86,35 +104,8 @@ class RaftDriver {
uint64_t lead_ = eraft::None;
bool advance_ = false;
std::unordered_map<uint64_t, std::shared_ptr<NodeClient>> node_clients_;
std::shared_ptr<Generator> id_generator_ = nullptr;
std::mutex promise_mutex_;
std::unordered_map<uint64_t, std::shared_ptr<ApplyContext>> pending_promise_;
};
extern std::shared_ptr<RaftDriver> g_raft_driver;

struct Generator {
static const int tsLen = 5 * 8;
static const int cntLen = 8;
static const int suffixLen = tsLen + cntLen;

Generator(uint64_t id, uint64_t time) {
prefix = id << suffixLen;
suffix = lowbit(time, tsLen) << cntLen;
}
uint64_t prefix = 0;
std::atomic<uint64_t> suffix{0};
uint64_t lowbit(uint64_t x, int n) {
return x & (std::numeric_limits<uint64_t>::max() >> (64 - n));
}
uint64_t Next() {
auto suf = suffix.fetch_add(1) + 1;
auto id = prefix | lowbit(suf, suffixLen);
return id;
}
};
extern std::shared_ptr<Generator> g_id_generator;

struct ApplyContext {
uint64_t index = 0;
std::promise<void> start;
std::promise<void> end;
};

}
Loading

0 comments on commit 48aa34f

Please sign in to comment.