Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
ljcui committed Jan 19, 2025
1 parent 6eb8b73 commit 958e5c2
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 2 deletions.
4 changes: 2 additions & 2 deletions include/lgraph/lgraph_exceptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ X(ReachMaximumCompositeIndexField, "The size of composite index fields exceeds t
X(PluginDisabled, "Plugin disabled!") \
X(BoltDataException, "Bolt data exception") \
X(VectorIndexException, "Vector index exception") \
X(ReplicateTimeout, "Raft replication Timeout")

X(RaftProposeError, "Raft propose error") \
X(ReplicateTimeout, "Raft replication timeout")
enum class ErrorCode {
#define X(code, msg) code,
ERROR_CODES
Expand Down
140 changes: 140 additions & 0 deletions src/bolt_ha/connection.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
#pragma once
#include <deque>
#include <boost/asio.hpp>
#include <boost/endian/conversion.hpp>
#include <utility>
#include "tools/lgraph_log.h"
#include "fma-common/string_formatter.h"
#include "etcd-raft-cpp/raftpb/raft.pb.h"

namespace bolt_ha {

class Connection : private boost::asio::noncopyable {
public:
virtual ~Connection() = default;

explicit Connection(boost::asio::io_service& io_service)
: io_service_(io_service), socket_(io_service_), has_closed_(false) {
}
boost::asio::ip::tcp::socket& socket() { return socket_; };
virtual void Close() {
LOG_DEBUG() << FMA_FMT("close conn[id:{}]", conn_id_);
socket_.close();
has_closed_ = true;
};
virtual bool has_closed() {return has_closed_;};
int64_t& conn_id() { return conn_id_;};
boost::asio::io_service& io_service() {return io_service_;};
virtual void Start() = 0;
virtual void PostResponse(std::string) {};
private:
boost::asio::io_service& io_service_;
boost::asio::ip::tcp::socket socket_;
int64_t conn_id_ = 0;
std::atomic<bool> has_closed_;
};

class ProtobufConnection
: public Connection,
public std::enable_shared_from_this<ProtobufConnection> {
public:
ProtobufConnection(boost::asio::io_service& io_service,
std::function<void(raftpb::Message)> handler)
: Connection(io_service), handler_(std::move(handler)) {
}
void Start() override;
private:
void read_msg_size();
void read_msg_size_done(const boost::system::error_code &ec);
void read_magic();
void read_magic_done(const boost::system::error_code &ec);
void read_msg_body();
void read_msg_body_done(const boost::system::error_code &ec);

uint32_t msg_size_ = 0;
std::vector<char> msg_body_;
std::function<void(raftpb::Message)> handler_;
const uint8_t magic_code_[4] = {0x17, 0xB0, 0x60, 0x60};
uint8_t buffer4_[4] = {0};
};

inline void ProtobufConnection::Start() {
read_magic();
}

inline void ProtobufConnection::read_msg_size() {
async_read(socket(), boost::asio::buffer(&msg_size_, sizeof(msg_size_)),
[this, self = shared_from_this()](const boost::system::error_code& ec, size_t) {
read_msg_size_done(ec);
});
}

inline void ProtobufConnection::read_magic() {
async_read(socket(), boost::asio::buffer(buffer4_),
[this, self = shared_from_this()](const boost::system::error_code& ec, size_t) {
read_magic_done(ec);
});
}

inline void ProtobufConnection::read_magic_done(const boost::system::error_code& ec) {
if (ec) {
LOG_WARN() << FMA_FMT("read_magic_done error {}", ec.message());
Close();
return;
}
if (memcmp(buffer4_, magic_code_, sizeof(magic_code_)) != 0) {
LOG_WARN() << ("receive wrong magic code");
Close();
return;
}
read_msg_size();
}


inline void ProtobufConnection::read_msg_size_done(const boost::system::error_code &ec) {
if (ec) {
LOG_WARN() << FMA_FMT("read_msg_size_done error {}", ec.message());
Close();
return;
}

boost::endian::big_to_native_inplace(msg_size_);
if (msg_size_ > 1024 * 1024 * 1024) {
LOG_WARN() << FMA_FMT("receive message which is too big, size : {}", msg_size_);
Close();
return;
}
if (msg_size_ == 0) {
LOG_WARN() << FMA_FMT("receive message with error size, size : {}", msg_size_);
Close();
return;
}
msg_body_.resize(msg_size_);
read_msg_body();
}

inline void ProtobufConnection::read_msg_body() {
async_read(socket(), boost::asio::buffer(msg_body_),
[this, self = shared_from_this()](const boost::system::error_code &ec, size_t) {
read_msg_body_done(ec);
});
}

inline void ProtobufConnection::read_msg_body_done(const boost::system::error_code &ec) {
if (ec) {
LOG_WARN() << FMA_FMT("read_msg_body_done error {}", ec.message().c_str());
Close();
return;
}
raftpb::Message msg;
auto ret = msg.ParseFromArray(msg_body_.data(), (int)msg_body_.size());
if (!ret) {
LOG_WARN() << FMA_FMT("raft msg ParseFromArray failed, close connection");
Close();
return;
}
handler_(std::move(msg));
read_msg_size();
}

}
139 changes: 139 additions & 0 deletions src/bolt_ha/io_service.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#pragma once
#include <pthread.h>
#include <thread>
#include <iostream>
#include <unordered_map>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include "tools/lgraph_log.h"
#include "fma-common/string_formatter.h"

using boost::asio::ip::tcp;
namespace bolt_ha {

class IOServicePool : private boost::asio::noncopyable {
public:
~IOServicePool() { Stop(); }

explicit IOServicePool(std::size_t pool_size) : next_io_service_(0) {
if (pool_size == 0) throw std::runtime_error("io_service_pool size is 0");
for (std::size_t i = 0; i < pool_size; ++i) {
io_service_ptr io_service(new boost::asio::io_service(1));
work_ptr work(new boost::asio::io_service::work(*io_service));
io_services_.push_back(std::move(io_service));
works_.push_back(std::move(work));
}
}

void Run() {
for (std::size_t i = 0; i < io_services_.size(); ++i) {
boost::asio::io_service &service = *io_services_[i];
threads_.emplace_back([i, &service]() {
std::string name = "io-worker-" + std::to_string(i);
pthread_setname_np(pthread_self(), name.c_str());
service.run();
});
}
}

void Stop() {
if (stopped_) {
return;
}
for (std::size_t i = 0; i < io_services_.size(); ++i) {
io_services_[i]->stop();
}
for (auto &t: threads_) {
t.join();
}
stopped_ = true;
}

boost::asio::io_service &GetIOService() {
boost::asio::io_service &io_service = *io_services_[next_io_service_];
++next_io_service_;
if (next_io_service_ == io_services_.size()) next_io_service_ = 0;
return io_service;
}

private:
typedef std::unique_ptr<boost::asio::io_service> io_service_ptr;
typedef std::unique_ptr<boost::asio::io_service::work> work_ptr;

std::vector<io_service_ptr> io_services_;
std::vector<work_ptr> works_;
std::size_t next_io_service_;
std::vector<std::thread> threads_;
bool stopped_ = false;
};

inline void socket_set_options(tcp::socket& socket) {
socket.set_option(boost::asio::ip::tcp::no_delay(true));
socket.set_option(boost::asio::socket_base::keep_alive(true));
socket.set_option(boost::asio::ip::tcp::socket::reuse_address(true));
}

template<typename T, typename F>
class IOService : private boost::asio::noncopyable {
public:
~IOService() {
io_service_pool_.Stop();
}

IOService(boost::asio::io_service &service,
int port, int thread_num, F handler)
: handler_(handler),
acceptor_(service, tcp::endpoint(tcp::v4(), port),
/*reuse_addr*/true),
io_service_pool_(thread_num), interval_(10), timer_(service) {
io_service_pool_.Run();
invoke_async_accept();
clean_closed_conn();
}

private:
void invoke_async_accept() {
conn_.reset(new T(io_service_pool_.GetIOService(), handler_));
acceptor_.async_accept(conn_->socket(), [this](boost::system::error_code ec) {
if (ec) {
LOG_WARN() << FMA_FMT("async accept error: {}", ec.message());
} else {
LOG_DEBUG() << FMA_FMT("accept new raft connection {}",
boost::lexical_cast<std::string>(
conn_->socket().remote_endpoint()).c_str());
socket_set_options(conn_->socket());
conn_->conn_id() = next_conn_id_;
connections_.emplace(next_conn_id_, conn_);
next_conn_id_++;
conn_->Start();
}

invoke_async_accept();
});
}

void clean_closed_conn() {
for (auto it = connections_.cbegin(); it != connections_.cend();) {
if (it->second->has_closed()) {
LOG_DEBUG() << FMA_FMT("erase connection[id:{},use_count:{}] from pool",
it->second->conn_id(), it->second.use_count());
it = connections_.erase(it);
} else {
++it;
}
}
timer_.expires_from_now(interval_);
timer_.async_wait(std::bind(&IOService::clean_closed_conn, this));
}

std::shared_ptr<T> conn_;
F handler_;
std::unordered_map<int64_t, std::shared_ptr<T>> connections_;
tcp::acceptor acceptor_;
IOServicePool io_service_pool_;
int next_conn_id_ = 0;
boost::posix_time::seconds interval_;
boost::asio::deadline_timer timer_;
};

}
1 change: 1 addition & 0 deletions src/server/bolt_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ void BoltFSM(std::shared_ptr<BoltConnection> conn) {
auto err = bolt_ha::g_raft_driver->Propose(request.SerializeAsString());
if (err != nullptr) {
LOG_ERROR() << FMA_FMT("Failed to propose, err: {}", err.String());
THROW_CODE(RaftProposeError, err.String());
}
if (future.wait_for(std::chrono::milliseconds(1000)) == std::future_status::ready) {
future.get();
Expand Down

0 comments on commit 958e5c2

Please sign in to comment.