Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
ljcui committed Jan 20, 2025
1 parent 958e5c2 commit 1534b28
Show file tree
Hide file tree
Showing 17 changed files with 123 additions and 115 deletions.
8 changes: 4 additions & 4 deletions src/BuildBoltLib.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ set(BOLT_SRC
bolt/connection.cpp
bolt/hydrator.cpp
bolt/pack.cpp
bolt_ha/logger.cpp
bolt_ha/raft_log_store.cpp
bolt_ha/raft_driver.cpp
bolt_ha/bolt_ha.pb.cc
bolt_raft/logger.cpp
bolt_raft/raft_log_store.cpp
bolt_raft/raft_driver.cpp
bolt_raft/bolt_ha.pb.cc
${LGRAPH_ROOT_DIR}/deps/etcd-raft-cpp/raftpb/raft.pb.cc
lgraph_api/lgraph_exceptions.cpp)

Expand Down
1 change: 1 addition & 0 deletions src/BuildLGraphServer.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ add_library(${TARGET_SERVER_LIB} STATIC
plugin/cpp_plugin.cpp
server/bolt_handler.cpp
server/bolt_server.cpp
server/bolt_raft_server.cpp
server/lgraph_server.cpp
server/state_machine.cpp
server/ha_state_machine.cpp
Expand Down
2 changes: 1 addition & 1 deletion src/bolt_ha/bolt_ha.proto → src/bolt_raft/bolt_ha.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/

syntax = "proto3";
package bolt_ha;
package bolt_raft;

message RaftRequest {
uint64 id = 1;
Expand Down
2 changes: 1 addition & 1 deletion src/bolt_ha/connection.h → src/bolt_raft/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include "fma-common/string_formatter.h"
#include "etcd-raft-cpp/raftpb/raft.pb.h"

namespace bolt_ha {
namespace bolt_raft {

class Connection : private boost::asio::noncopyable {
public:
Expand Down
2 changes: 1 addition & 1 deletion src/bolt_ha/io_service.h → src/bolt_raft/io_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include "fma-common/string_formatter.h"

using boost::asio::ip::tcp;
namespace bolt_ha {
namespace bolt_raft {

class IOServicePool : private boost::asio::noncopyable {
public:
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
using boost::asio::async_write;
using boost::asio::ip::tcp;

namespace bolt_ha {
namespace bolt_raft {
void NodeClient::reconnect() {
if (has_closed_) {
return;
Expand Down
2 changes: 1 addition & 1 deletion src/bolt_ha/raft_driver.h → src/bolt_raft/raft_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <boost/asio.hpp>
#include <utility>
#include "raft_log_store.h"
namespace bolt_ha {
namespace bolt_raft {
class NodeClient : public std::enable_shared_from_this<NodeClient> {
public:
NodeClient(boost::asio::io_service& io_service, const std::string &ip, int port)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include "raft_log_store.h"
#include "tools/lgraph_log.h"
#include "fma-common/string_formatter.h"
namespace bolt_ha {
namespace bolt_raft {
std::string raft_log_key(uint64_t log_id) {
std::string ret;
boost::endian::native_to_big_inplace(log_id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <rocksdb/db.h>
#include <rocksdb/convenience.h>
#include "etcd-raft-cpp/rawnode.h"
namespace bolt_ha {
namespace bolt_raft {
struct RaftLogStorage : private boost::noncopyable, eraft::Storage {
public:
RaftLogStorage(rocksdb::DB* db,
Expand Down
20 changes: 10 additions & 10 deletions src/server/bolt_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
#include "server/bolt_session.h"
#include "db/galaxy.h"

#include "bolt_ha/raft_driver.h"
#include "bolt_ha/bolt_ha.pb.h"
#include "bolt_raft/raft_driver.h"
#include "bolt_raft/bolt_ha.pb.h"

using namespace lgraph_api;
namespace bolt {
Expand Down Expand Up @@ -113,13 +113,13 @@ parser::Expression ConvertParameters(std::any data) {

using namespace std::chrono;
std::shared_mutex promise_mutex;
std::unordered_map<uint64_t, std::shared_ptr<bolt_ha::ApplyContext>> pending_promise;
std::unordered_map<uint64_t, std::shared_ptr<bolt_raft::ApplyContext>> pending_promise;

void g_apply(uint64_t index, const std::string& log) {
bolt_ha::RaftRequest request;
bolt_raft::RaftRequest request;
auto ret = request.ParseFromString(log);
assert(ret);
std::shared_ptr<bolt_ha::ApplyContext> context;
std::shared_ptr<bolt_raft::ApplyContext> context;
{
std::unique_lock lock(promise_mutex);
auto iter = pending_promise.find(request.id());
Expand Down Expand Up @@ -297,23 +297,23 @@ void BoltFSM(std::shared_ptr<BoltConnection> conn) {
}
}
session->streaming_msg.reset();
std::shared_ptr<bolt_ha::ApplyContext> apply_context;
std::shared_ptr<bolt_raft::ApplyContext> apply_context;
{
std::string plugin_name, plugin_type;
auto ret = cypher::Scheduler::DetermineReadOnly(&ctx, GraphQueryType::CYPHER, cypher, plugin_name, plugin_type);
if (!ret) {
auto uid = bolt_ha::g_id_generator->Next();
apply_context = std::make_shared<bolt_ha::ApplyContext>();
auto uid = bolt_raft::g_id_generator->Next();
apply_context = std::make_shared<bolt_raft::ApplyContext>();
auto future = apply_context->start.get_future();
{
std::unique_lock lock(promise_mutex);
pending_promise.emplace(uid, apply_context);
}
bolt_ha::RaftRequest request;
bolt_raft::RaftRequest request;
request.set_id(uid);
request.set_user(session->user);
request.set_raw_data((const char*)msg.value().raw_data.data(), msg.value().raw_data.size());
auto err = bolt_ha::g_raft_driver->Propose(request.SerializeAsString());
auto err = bolt_raft::g_raft_driver->Propose(request.SerializeAsString());
if (err != nullptr) {
LOG_ERROR() << FMA_FMT("Failed to propose, err: {}", err.String());
THROW_CODE(RaftProposeError, err.String());
Expand Down
74 changes: 74 additions & 0 deletions src/server/bolt_raft_server.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#include "bolt_raft_server.h"
#include "bolt_raft/raft_driver.h"
#include "bolt_raft/io_service.h"
#include "bolt_raft/connection.h"
#include "tools/json.hpp"
#include "tools/lgraph_log.h"

namespace bolt {
void g_apply(uint64_t index, const std::string& log);
}

namespace bolt_raft {
std::function protobuf_handler = [](raftpb::Message rpc_msg){
bolt_raft::g_raft_driver->Message(std::move(rpc_msg));
};

static boost::asio::io_service raft_listener(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE);
bool BoltRaftServer::Start(int port, uint64_t node_id, std::string init_peers) {
std::promise<bool> promise;
std::future<bool> future = promise.get_future();
threads_.emplace_back([port, node_id, init_peers, &promise](){
bool promise_done = false;
try {
std::vector<eraft::Peer> peers;
auto cluster = nlohmann::json::parse(init_peers);
for (const auto& item : cluster) {
eraft::Peer peer;
peer.id_ = item["node_id"].get<int64_t>();
peer.context_ = item.dump();
peers.emplace_back(std::move(peer));
}
bolt_raft::g_id_generator = std::make_shared<bolt_raft::Generator>(
node_id,std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count());
bolt_raft::g_raft_driver = std::make_unique<bolt_raft::RaftDriver> (
bolt::g_apply,
0,
node_id,
peers,
"raftlog");
auto err = bolt_raft::g_raft_driver->Run();
if (err != nullptr) {
LOG_ERROR() << "raft driver failed to run, error: " << err.String();
return;
}
bolt_raft::IOService<bolt_raft::ProtobufConnection, decltype(protobuf_handler)> bolt_raft_service(
raft_listener, port, 1, protobuf_handler);
boost::asio::io_service::work holder(raft_listener);
LOG_INFO() << "bolt raft server run";
promise.set_value(true);
promise_done = true;
raft_listener.run();
} catch (const std::exception& e) {
LOG_WARN() << "bolt raft server expection: " << e.what();
if (!promise_done) {
promise.set_value(false);
}
}
});
return future.get();
}

void BoltRaftServer::Stop() {
if (stopped_) {
return;
}
raft_listener.stop();
for (auto& t : threads_) {
t.join();
}
stopped_ = true;
LOG_INFO() << "bolt raft server stopped.";
}
}
21 changes: 21 additions & 0 deletions src/server/bolt_raft_server.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#include <thread>
#include "fma-common/type_traits.h"

namespace bolt_raft {
class BoltRaftServer final {
public:
static BoltRaftServer& Instance() {
static BoltRaftServer server;
return server;
}
DISABLE_COPY(BoltRaftServer);
DISABLE_MOVE(BoltRaftServer);
bool Start(int port, uint64_t node_id, std::string init_peers);
void Stop();
~BoltRaftServer() { Stop(); }
private:
BoltRaftServer() = default;
std::vector<std::thread> threads_{};
bool stopped_ = false;
};
}
71 changes: 3 additions & 68 deletions src/server/bolt_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
* written by botu.wzy
*/
#include "server/bolt_server.h"
#include "bolt_ha/connection.h"
#include "bolt_ha/io_service.h"
#include "bolt_ha/raft_driver.h"
#include "bolt_raft/connection.h"
#include "bolt_raft/io_service.h"
#include "bolt_raft/raft_driver.h"

namespace bolt {
static boost::asio::io_service listener(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE);
Expand Down Expand Up @@ -60,69 +60,4 @@ void BoltServer::Stop() {
stopped_ = true;
LOG_INFO() << "bolt server stopped.";
}

std::function protobuf_handler = [](raftpb::Message rpc_msg){
bolt_ha::g_raft_driver->Message(std::move(rpc_msg));
};

void g_apply(uint64_t index, const std::string& log);

static boost::asio::io_service raft_listener(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE);
bool BoltRaftServer::Start(lgraph::StateMachine* sm, int port, uint64_t node_id, std::string init_peers) {
std::promise<bool> promise;
std::future<bool> future = promise.get_future();
threads_.emplace_back([port, node_id, init_peers, &promise](){
bool promise_done = false;
try {
std::vector<eraft::Peer> peers;
auto cluster = nlohmann::json::parse(init_peers);
for (const auto& item : cluster) {
eraft::Peer peer;
peer.id_ = item["node_id"].get<int64_t>();
peer.context_ = item.dump();
peers.emplace_back(std::move(peer));
}
bolt_ha::g_id_generator = std::make_shared<bolt_ha::Generator>(
node_id,std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count());
bolt_ha::g_raft_driver = std::make_unique<bolt_ha::RaftDriver> (
bolt::g_apply,
0,
node_id,
peers,
"raftlog");
auto err = bolt_ha::g_raft_driver->Run();
if (err != nullptr) {
LOG_ERROR() << "raft driver failed to run, error: " << err.String();
return;
}
bolt_ha::IOService<bolt_ha::ProtobufConnection, decltype(protobuf_handler)> bolt_raft_service(
raft_listener, port, 1, protobuf_handler);
boost::asio::io_service::work holder(raft_listener);
LOG_INFO() << "bolt raft server run";
promise.set_value(true);
promise_done = true;
raft_listener.run();
} catch (const std::exception& e) {
LOG_WARN() << "bolt raft server expection: " << e.what();
if (!promise_done) {
promise.set_value(false);
}
}
});
return future.get();
}

void BoltRaftServer::Stop() {
if (stopped_) {
return;
}
raft_listener.stop();
for (auto& t : threads_) {
t.join();
}
stopped_ = true;
LOG_INFO() << "bolt raft server stopped.";
}

} // namespace bolt
22 changes: 0 additions & 22 deletions src/server/bolt_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,26 +42,4 @@ class BoltServer final {
std::vector<std::thread> threads_;
bool stopped_ = false;
};

class BoltRaftServer final {
public:
static BoltRaftServer& Instance() {
static BoltRaftServer server;
return server;
}
DISABLE_COPY(BoltRaftServer);
DISABLE_MOVE(BoltRaftServer);
bool Start(lgraph::StateMachine* sm, int port, uint64_t node_id, std::string init_peers);
void Stop();
~BoltRaftServer() {Stop();}
lgraph::StateMachine* StateMachine() {
return sm_;
}
private:
BoltRaftServer() = default;
lgraph::StateMachine* sm_ = nullptr;
std::vector<std::thread> threads_;
bool stopped_ = false;
};

} // namespace bolt
6 changes: 3 additions & 3 deletions src/server/lgraph_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
#include "server/state_machine.h"
#include "server/ha_state_machine.h"
#include "server/db_management_client.h"
#include "bolt_ha/raft_driver.h"
#include "server/bolt_server.h"
#include "server/bolt_raft_server.h"

#ifndef _WIN32
#include "brpc/server.h"
Expand Down Expand Up @@ -320,8 +321,7 @@ int LGraphServer::Start() {
return -1;
}
if (config_->bolt_raft_port > 0) {
if (!bolt::BoltRaftServer::Instance().Start(
state_machine_.get(), config_->bolt_raft_port,
if (!bolt_raft::BoltRaftServer::Instance().Start(config_->bolt_raft_port,
config_->bolt_raft_node_id, config_->bolt_raft_init_peers)) {
return -1;
}
Expand Down
1 change: 0 additions & 1 deletion src/server/lgraph_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include "server/state_machine.h"
#include "restful/server/rest_server.h"
#include "http/http_server.h"
#include "server/bolt_server.h"

#ifndef _WIN32
#include "brpc/server.h"
Expand Down

0 comments on commit 1534b28

Please sign in to comment.