Skip to content

Commit

Permalink
Merge pull request #298 from Edward-xk/master
Browse files Browse the repository at this point in the history
some bugfix from baidu
  • Loading branch information
PFZheng authored Jun 8, 2021
2 parents 154d805 + 2faae65 commit 2c9f611
Show file tree
Hide file tree
Showing 11 changed files with 192 additions and 69 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

---

# Overview
An industrial-grade C++ implementation of the [RAFT consensus algorithm](https://raft.github.io/) and [replicated state machine](https://en.wikipedia.org/wiki/State_machine_replication) based on [brpc](https://github.com/brpc/brpc). braft is designed and implemented for scenarios that demand high workload and low latency overhead, with an emphasis on easy-to-understand concepts so that engineers inside Baidu can build their own distributed systems independently and correctly.

It's widely used inside Baidu to build highly-available systems, such as:
Expand All @@ -14,7 +15,7 @@ It's widely used inside Baidu to build highly-available systems, such as:
* Build [brpc](https://github.com/brpc/brpc/blob/master/docs/cn/getting_started.md), which is the main dependency of braft.

* Compile braft with cmake

```shell
$ mkdir bld && cd bld && cmake .. && make
```
Expand All @@ -35,3 +36,6 @@ It's widely used inside Baidu to build highly-available systems, such as:
* [ZAB](./docs/cn/zab_protocol.md)
* [QJM](./docs/cn/qjm.md)

# Discussion

* Add the Weixin ID ***zhengpf__87*** or ***xiongk_2049*** with the verification message '**braft**', and you will be invited to the discussion group.
2 changes: 2 additions & 0 deletions src/braft/file_system_adaptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ int file_error_to_os_error(butil::File::Error e) {
switch (e) {
case butil::File::FILE_OK:
return 0;
case butil::File::FILE_ERROR_IN_USE:
return EAGAIN;
case butil::File::FILE_ERROR_ACCESS_DENIED:
return EACCES;
case butil::File::FILE_ERROR_EXISTS:
Expand Down
53 changes: 30 additions & 23 deletions src/braft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1054,8 +1054,7 @@ void NodeImpl::handle_timeout_now_request(brpc::Controller* controller,
response->set_success(false);
lck.unlock();
LOG(INFO) << "node " << _group_id << ":" << _server_id
<< " received handle_timeout_now_request "
"while _current_term="
<< " received handle_timeout_now_request while _current_term="
<< saved_current_term << " didn't match request_term="
<< request->term();
return;
Expand All @@ -1067,9 +1066,8 @@ void NodeImpl::handle_timeout_now_request(brpc::Controller* controller,
response->set_success(false);
lck.unlock();
LOG(INFO) << "node " << _group_id << ":" << _server_id
<< " received handle_timeout_now_request "
"while state is " << state2str(saved_state)
<< " at term=" << saved_term;
<< " received handle_timeout_now_request while state is "
<< state2str(saved_state) << " at term=" << saved_term;
return;
}
const butil::EndPoint remote_side = controller->remote_side();
Expand All @@ -1085,7 +1083,7 @@ void NodeImpl::handle_timeout_now_request(brpc::Controller* controller,
response->set_success(true);
// Parallelize Response and election
run_closure_in_bthread(done_guard.release());
elect_self(&lck);
elect_self(&lck, request->old_leader_stepped_down());
// Don't touch any mutable field after this point, it's likely out of the
// critical section
if (lck.owns_lock()) {
Expand Down Expand Up @@ -1409,7 +1407,7 @@ void NodeImpl::retry_vote_on_reserved_peers() {
if (peers.empty()) {
return;
}
request_peers_to_vote(peers, &_vote_ctx.disrupted_leader());
request_peers_to_vote(peers, _vote_ctx.disrupted_leader());
}

struct OnRequestVoteRPCDone : public google::protobuf::Closure {
Expand Down Expand Up @@ -1621,7 +1619,8 @@ void NodeImpl::pre_vote(std::unique_lock<raft_mutex_t>* lck, bool triggered) {
}

// in lock
void NodeImpl::elect_self(std::unique_lock<raft_mutex_t>* lck) {
void NodeImpl::elect_self(std::unique_lock<raft_mutex_t>* lck,
bool old_leader_stepped_down) {
LOG(INFO) << "node " << _group_id << ":" << _server_id
<< " term " << _current_term << " start vote and grant vote self";
if (!_conf.contains(_server_id)) {
Expand All @@ -1636,6 +1635,8 @@ void NodeImpl::elect_self(std::unique_lock<raft_mutex_t>* lck) {
_election_timer.stop();
}
// reset leader_id before vote
const PeerId old_leader = _leader_id;
const int64_t leader_term = _current_term;
PeerId empty_id;
butil::Status status;
status.set_error(ERAFTTIMEDOUT, "A follower's leader_id is reset to NULL "
Expand All @@ -1651,6 +1652,10 @@ void NodeImpl::elect_self(std::unique_lock<raft_mutex_t>* lck) {
_vote_timer.start();
_pre_vote_ctx.reset(this);
_vote_ctx.init(this, false);
if (old_leader_stepped_down) {
_vote_ctx.set_disrupted_leader(DisruptedLeader(old_leader, leader_term));
_follower_lease.expire();
}

int64_t old_term = _current_term;
// get last_log_id outof node mutex
Expand All @@ -1668,7 +1673,7 @@ void NodeImpl::elect_self(std::unique_lock<raft_mutex_t>* lck) {

std::set<PeerId> peers;
_conf.list_peers(&peers);
request_peers_to_vote(peers, NULL);
request_peers_to_vote(peers, _vote_ctx.disrupted_leader());

//TODO: outof lock
status = _meta_storage->
Expand All @@ -1686,7 +1691,7 @@ void NodeImpl::elect_self(std::unique_lock<raft_mutex_t>* lck) {
}

void NodeImpl::request_peers_to_vote(const std::set<PeerId>& peers,
const NodeImpl::DisruptedLeader* disrupted_leader) {
const DisruptedLeader& disrupted_leader) {
for (std::set<PeerId>::const_iterator
iter = peers.begin(); iter != peers.end(); ++iter) {
if (*iter == _server_id) {
Expand All @@ -1712,11 +1717,11 @@ void NodeImpl::request_peers_to_vote(const std::set<PeerId>& peers,
done->request.set_last_log_index(_vote_ctx.last_log_id().index);
done->request.set_last_log_term(_vote_ctx.last_log_id().term);

if (disrupted_leader) {
if (disrupted_leader.peer_id != ANY_PEER) {
done->request.mutable_disrupted_leader()
->set_peer_id(disrupted_leader->peer_id.to_string());
->set_peer_id(disrupted_leader.peer_id.to_string());
done->request.mutable_disrupted_leader()
->set_term(disrupted_leader->term);
->set_term(disrupted_leader.term);
}

RaftService_Stub stub(&channel);
Expand Down Expand Up @@ -2431,16 +2436,18 @@ void NodeImpl::handle_append_entries_request(brpc::Controller* cntl,
response->set_term(_current_term);
response->set_last_log_index(last_index);
lck.unlock();
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " reject term_unmatched AppendEntries from "
<< request->server_id()
<< " in term " << request->term()
<< " prev_log_index " << request->prev_log_index()
<< " prev_log_term " << request->prev_log_term()
<< " local_prev_log_term " << local_prev_log_term
<< " last_log_index " << last_index
<< " entries_size " << request->entries_size()
<< " from_append_entries_cache: " << from_append_entries_cache;
if (local_prev_log_term != 0) {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " reject term_unmatched AppendEntries from "
<< request->server_id()
<< " in term " << request->term()
<< " prev_log_index " << request->prev_log_index()
<< " prev_log_term " << request->prev_log_term()
<< " local_prev_log_term " << local_prev_log_term
<< " last_log_index " << last_index
<< " entries_size " << request->entries_size()
<< " from_append_entries_cache: " << from_append_entries_cache;
}
return;
}

Expand Down
7 changes: 5 additions & 2 deletions src/braft/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,10 @@ friend class butil::RefCountedThreadSafe<NodeImpl>;
void pre_vote(std::unique_lock<raft_mutex_t>* lck, bool triggered);

// elect self to candidate
void elect_self(std::unique_lock<raft_mutex_t>* lck);
// If old leader has already stepped down, the candidate can vote without
// taking account of leader lease
void elect_self(std::unique_lock<raft_mutex_t>* lck,
bool old_leader_stepped_down = false);

// grant self a vote
class VoteBallotCtx;
Expand Down Expand Up @@ -321,7 +324,7 @@ friend class butil::RefCountedThreadSafe<NodeImpl>;
void retry_vote_on_reserved_peers();
struct DisruptedLeader;
void request_peers_to_vote(const std::set<PeerId>& peers,
const DisruptedLeader* disrupted_leader);
const DisruptedLeader& disrupted_leader);

private:

Expand Down
1 change: 1 addition & 0 deletions src/braft/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ message TimeoutNowRequest {
required string server_id = 2;
required string peer_id = 3;
required int64 term = 4;
optional bool old_leader_stepped_down = 5;
}

message TimeoutNowResponse {
Expand Down
22 changes: 12 additions & 10 deletions src/braft/replicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@
// Wang,Yao([email protected])
// Xiong,Kai([email protected])

#include "braft/replicator.h"

#include <gflags/gflags.h> // DEFINE_int32
#include <butil/unique_ptr.h> // std::unique_ptr
#include <butil/time.h> // butil::gettimeofday_us
#include <brpc/controller.h> // brpc::Controller
#include <brpc/reloadable_flags.h> // BRPC_VALIDATE_GFLAG

#include "braft/replicator.h"
#include "braft/node.h" // NodeImpl
#include "braft/ballot_box.h" // BallotBox
#include "braft/log_entry.h" // LogEntry
Expand Down Expand Up @@ -1073,16 +1071,17 @@ void Replicator::_reset_next_index() {
}
}

void Replicator::_send_timeout_now(bool unlock_id, bool stop_after_finish,
void Replicator::_send_timeout_now(bool unlock_id, bool old_leader_stepped_down,
int timeout_ms) {
TimeoutNowRequest* request = new TimeoutNowRequest;
TimeoutNowResponse* response = new TimeoutNowResponse;
request->set_term(_options.term);
request->set_group_id(_options.group_id);
request->set_server_id(_options.server_id.to_string());
request->set_peer_id(_options.peer_id.to_string());
request->set_old_leader_stepped_down(old_leader_stepped_down);
brpc::Controller* cntl = new brpc::Controller;
if (!stop_after_finish) {
if (!old_leader_stepped_down) {
// This RPC is issued by transfer_leadership, save this call_id so that
// the RPC can be cancelled by stop.
_timeout_now_in_fly = cntl->call_id();
Expand All @@ -1094,7 +1093,7 @@ void Replicator::_send_timeout_now(bool unlock_id, bool stop_after_finish,
RaftService_Stub stub(&_sending_channel);
::google::protobuf::Closure* done = brpc::NewCallback(
_on_timeout_now_returned, _id.value, cntl, request, response,
stop_after_finish);
old_leader_stepped_down);
stub.timeout_now(cntl, request, response, done);
if (unlock_id) {
CHECK_EQ(0, bthread_id_unlock(_id));
Expand All @@ -1105,7 +1104,7 @@ void Replicator::_on_timeout_now_returned(
ReplicatorId id, brpc::Controller* cntl,
TimeoutNowRequest* request,
TimeoutNowResponse* response,
bool stop_after_finish) {
bool old_leader_stepped_down) {
std::unique_ptr<brpc::Controller> cntl_guard(cntl);
std::unique_ptr<TimeoutNowRequest> req_guard(request);
std::unique_ptr<TimeoutNowResponse> res_guard(response);
Expand All @@ -1124,7 +1123,7 @@ void Replicator::_on_timeout_now_returned(
ss << " fail : " << cntl->ErrorText();
BRAFT_VLOG << ss.str();

if (stop_after_finish) {
if (old_leader_stepped_down) {
r->_notify_on_caught_up(ESTOP, true);
r->_destroy();
} else {
Expand All @@ -1149,7 +1148,7 @@ void Replicator::_on_timeout_now_returned(
node_impl->Release();
return;
}
if (stop_after_finish) {
if (old_leader_stepped_down) {
r->_notify_on_caught_up(ESTOP, true);
r->_destroy();
} else {
Expand Down Expand Up @@ -1272,7 +1271,7 @@ void Replicator::_describe(std::ostream& os, bool use_html) {
os << "idle";
break;
case BLOCKING:
os << "blocking consecutive_error_times=" << consecutive_error_times;
os << "blocking";
break;
case APPENDING_ENTRIES:
os << "appending [" << st.first_log_index << ", " << st.last_log_index << ']';
Expand All @@ -1282,6 +1281,9 @@ void Replicator::_describe(std::ostream& os, bool use_html) {
<< ", " << st.last_term_included << '}';
break;
}
if (consecutive_error_times != 0) {
os << " consecutive_error_times=" << consecutive_error_times;
}
os << " hc=" << heartbeat_counter << " ac=" << append_entries_counter << " ic=" << install_snapshot_counter << new_line;
}

Expand Down
4 changes: 2 additions & 2 deletions src/braft/replicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator {
void _block(long start_time_us, int error_code);
void _install_snapshot();
void _start_heartbeat_timer(long start_time_us);
void _send_timeout_now(bool unlock_id, bool stop_after_finish,
void _send_timeout_now(bool unlock_id, bool old_leader_stepped_down,
int timeout_ms = -1);
int _transfer_leadership(int64_t log_index);
void _cancel_append_entries_rpcs();
Expand All @@ -186,7 +186,7 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator {
ReplicatorId id, brpc::Controller* cntl,
TimeoutNowRequest* request,
TimeoutNowResponse* response,
bool stop_after_finish);
bool old_leader_stepped_down);

static void _on_timedout(void* arg);
static void* _send_heartbeat(void* arg);
Expand Down
54 changes: 29 additions & 25 deletions src/braft/snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,42 +185,45 @@ LocalSnapshotWriter::~LocalSnapshotWriter() {
int LocalSnapshotWriter::init() {
butil::File::Error e;
if (!_fs->create_directory(_path, &e, false)) {
set_error(EIO, "CreateDirectory failed, path: %s", _path.c_str());
LOG(ERROR) << "Fail to create directory " << _path << ", " << e;
set_error(EIO, "CreateDirectory failed with path: %s", _path.c_str());
return EIO;
}
std::string meta_path = _path + "/" BRAFT_SNAPSHOT_META_FILE;
if (_fs->path_exists(meta_path) &&
_meta_table.load_from_file(_fs, meta_path) != 0) {
LOG(ERROR) << "Fail to load meta from " << meta_path;
set_error(EIO, "Fail to load metatable from %s", meta_path.c_str());
return EIO;
}

// remove file if meta_path not exist or it's not in _meta_table
// to avoid dirty data
{
std::vector<std::string> to_remove;
DirReader* dir_reader = _fs->directory_reader(_path);
if (!dir_reader->is_valid()) {
LOG(WARNING) << "directory reader failed, maybe NOEXIST or PERMISSION,"
" path: " << _path;
delete dir_reader;
return EIO;
}
while (dir_reader->next()) {
std::string filename = dir_reader->name();
if (filename != BRAFT_SNAPSHOT_META_FILE) {
if (get_file_meta(filename, NULL) != 0) {
to_remove.push_back(filename);
}
}
}
delete dir_reader;
for (size_t i = 0; i < to_remove.size(); ++i) {
std::string file_path = _path + "/" + to_remove[i];
_fs->delete_file(file_path, false);
LOG(WARNING) << "Snapshot file exist but meta not found so delete it,"
" path: " << file_path;
}
std::vector<std::string> to_remove;
DirReader* dir_reader = _fs->directory_reader(_path);
if (!dir_reader->is_valid()) {
LOG(ERROR) << "Invalid directory reader, maybe NOEXIST or PERMISSION,"
<< " path: " << _path;
set_error(EIO, "Invalid directory reader in path: %s", _path.c_str());
delete dir_reader;
return EIO;
}
while (dir_reader->next()) {
std::string filename = dir_reader->name();
if (filename != BRAFT_SNAPSHOT_META_FILE) {
if (get_file_meta(filename, NULL) != 0) {
to_remove.push_back(filename);
}
}
}
delete dir_reader;
for (size_t i = 0; i < to_remove.size(); ++i) {
std::string file_path = _path + "/" + to_remove[i];
_fs->delete_file(file_path, false);
LOG(WARNING) << "Snapshot file exist but meta not found so delete it,"
<< " path: " << file_path;
}
}

return 0;
Expand Down Expand Up @@ -559,7 +562,8 @@ SnapshotWriter* LocalSnapshotStorage::create(bool from_empty) {

writer = new LocalSnapshotWriter(snapshot_path, _fs.get());
if (writer->init() != 0) {
LOG(ERROR) << "Fail to init writer, path: " << snapshot_path;
LOG(ERROR) << "Fail to init writer in path " << snapshot_path
<< ", " << *writer;
delete writer;
writer = NULL;
break;
Expand Down
2 changes: 1 addition & 1 deletion src/braft/snapshot_throttle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ size_t ThroughputSnapshotThrottle::throttled_by_throughput(int64_t bytes) {
1 * 1000 * 1000 / _check_cycle) {
// if time interval is less than or equal to a cycle, read more data
// to make full use of the throughput of current cycle.
available_size = limit_per_cycle - _cur_throughput_bytes;
available_size = limit_per_cycle > _cur_throughput_bytes ? limit_per_cycle - _cur_throughput_bytes : 0;
_cur_throughput_bytes = limit_per_cycle;
} else {
// otherwise, read the data in the next cycle.
Expand Down
Loading

0 comments on commit 2c9f611

Please sign in to comment.