Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add enable witness to leader to raft group #483

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
/runtime
/output
/test/output

/bld
# Ignore hidden files
.*
*.swp
Expand Down
4 changes: 4 additions & 0 deletions src/braft/fsm_caller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,10 @@ void FSMCaller::do_stop_following(const LeaderChangeContext& stop_following_cont
_fsm->on_stop_following(stop_following_context);
}

// Notifies the user state machine that a snapshot is about to be sent to
// |peer_id| by forwarding the call to _fsm->on_pre_send_snapshot().
// NOTE(review): this invokes _fsm directly on the calling thread -- confirm
// that running this user callback outside the FSM task flow is intended.
void FSMCaller::on_pre_send_snapshot(const PeerId& peer_id)
{
    _fsm->on_pre_send_snapshot(peer_id);
}

void FSMCaller::describe(std::ostream &os, bool use_html) {
const char* newline = (use_html) ? "<br>" : "\n";
TaskType cur_task = _cur_task;
Expand Down
1 change: 1 addition & 0 deletions src/braft/fsm_caller.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class BAIDU_CACHELINE_ALIGNMENT FSMCaller {
int on_leader_start(int64_t term, int64_t lease_epoch);
int on_start_following(const LeaderChangeContext& start_following_context);
int on_stop_following(const LeaderChangeContext& stop_following_context);
void on_pre_send_snapshot(const PeerId& peer_id);
BRAFT_MOCK int on_error(const Error& e);
int64_t last_applied_index() const {
return _last_applied_index.load(butil::memory_order_relaxed);
Expand Down
6 changes: 6 additions & 0 deletions src/braft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ int NodeImpl::init(const NodeOptions& options) {
rg_options.election_timeout_ms = _options.election_timeout_ms;
rg_options.log_manager = _log_manager;
rg_options.ballot_box = _ballot_box;
rg_options.send_data_to_witness = _options.send_data_to_witness;
rg_options.node = this;
rg_options.snapshot_throttle = _options.snapshot_throttle
? _options.snapshot_throttle->get()
Expand Down Expand Up @@ -1366,6 +1367,11 @@ void NodeImpl::on_error(const Error& e) {
lck.unlock();
}

// Relays the "about to send a snapshot to |peer_id|" event to the FSM caller,
// which in turn hands it to the user state machine.
void NodeImpl::pre_send_snapshot(const PeerId& peer_id)
{
    _fsm_caller->on_pre_send_snapshot(peer_id);
}


void NodeImpl::handle_vote_timeout() {
std::unique_lock<raft_mutex_t> lck(_mutex);

Expand Down
3 changes: 3 additions & 0 deletions src/braft/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ friend class VoteBallotCtx;

bool disable_cli() const { return _options.disable_cli; }
bool is_witness() const { return _options.witness; }

// Called when the leader starts to send a snapshot to a remote peer.
void pre_send_snapshot(const PeerId& peer_id);
private:
friend class butil::RefCountedThreadSafe<NodeImpl>;

Expand Down
1 change: 1 addition & 0 deletions src/braft/raft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ void StateMachine::on_configuration_committed(const Configuration& conf, int64_t

void StateMachine::on_stop_following(const LeaderChangeContext&) {}
void StateMachine::on_start_following(const LeaderChangeContext&) {}
// Default implementation: do nothing. The parameter is left unnamed, like the
// other default callbacks, so it does not trigger an unused-parameter warning.
void StateMachine::on_pre_send_snapshot(const PeerId&) {}

BootstrapOptions::BootstrapOptions()
: last_log_index(0)
Expand Down
6 changes: 6 additions & 0 deletions src/braft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ class StateMachine {
// the very leader whom the follower starts to follow.
// User can reset the node's information as it starts to follow some leader.
virtual void on_start_following(const ::braft::LeaderChangeContext& ctx);

// Invoked when the leader starts to send a snapshot to |peer_id|.
// Default: Do nothing
virtual void on_pre_send_snapshot(const PeerId& peer_id);

};

enum State {
Expand Down Expand Up @@ -604,6 +609,7 @@ struct NodeOptions {
// Default: false
bool witness = false;
// Whether log entry data is sent to witness peers. When false,
// Replicator::_prepare_entry returns before attaching the entry's data
// if its is_witness() check is true.
// Default: true
bool send_data_to_witness = true;
// Construct a default instance
NodeOptions();

int get_catchup_timeout_ms();
Expand Down
21 changes: 20 additions & 1 deletion src/braft/replicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ DEFINE_int32(raft_retry_replicate_interval_ms, 1000,
"Interval of retry to append entries or install snapshot");
BRPC_VALIDATE_GFLAG(raft_retry_replicate_interval_ms,
brpc::PositiveInteger);
DEFINE_bool(raft_use_conn_pool, false, "use conn pool for raft replicator");
BRPC_VALIDATE_GFLAG(raft_use_conn_pool, ::brpc::PassValidate);

DECLARE_bool(raft_enable_witness_to_leader);
DECLARE_int64(raft_append_entry_high_lat_us);
Expand Down Expand Up @@ -115,6 +117,9 @@ int Replicator::start(const ReplicatorOptions& options, ReplicatorId *id) {
Replicator* r = new Replicator();
brpc::ChannelOptions channel_opt;
channel_opt.connect_timeout_ms = FLAGS_raft_rpc_channel_connect_timeout_ms;
if (FLAGS_raft_use_conn_pool) {
channel_opt.connection_type = "pooled";
}
channel_opt.timeout_ms = -1; // We don't need RPC timeout
if (r->_sending_channel.Init(options.peer_id.addr, &channel_opt) != 0) {
LOG(ERROR) << "Fail to init sending channel"
Expand Down Expand Up @@ -630,6 +635,11 @@ int Replicator::_prepare_entry(int offset, EntryMeta* em, butil::IOBuf *data) {
} else {
CHECK(entry->type != ENTRY_TYPE_CONFIGURATION) << "log_index=" << log_index;
}
// Prefer the group-level configuration.
if (is_witness() && !_options.send_data_to_witness) {
entry->Release();
return 0;
}
if (!is_witness() || FLAGS_raft_enable_witness_to_leader) {
em->set_data_len(entry->data.length());
data->append(entry->data);
Expand Down Expand Up @@ -790,6 +800,8 @@ void Replicator::_install_snapshot() {
add_one_more_task(true)) {
return _block(butil::gettimeofday_us(), EBUSY);
}

node_impl->pre_send_snapshot(_options.peer_id);

// pre-set replicator state to INSTALLING_SNAPSHOT, so replicator could be
// blocked if something is wrong, such as throttled for a period of time
Expand Down Expand Up @@ -1382,6 +1394,7 @@ int ReplicatorGroup::init(const NodeId& node_id, const ReplicatorGroupOptions& o
_election_timeout_ms = options.election_timeout_ms;
_common_options.log_manager = options.log_manager;
_common_options.ballot_box = options.ballot_box;
_common_options.send_data_to_witness = options.send_data_to_witness;
_common_options.node = options.node;
_common_options.term = 0;
_common_options.group_id = node_id.group_id;
Expand Down Expand Up @@ -1549,12 +1562,18 @@ int ReplicatorGroup::find_the_next_candidate(
}
const int64_t next_index = Replicator::get_next_index(iter->id_and_status.id);
const int consecutive_error_times = Replicator::get_consecutive_error_times(iter->id_and_status.id);
if (consecutive_error_times == 0 && next_index > max_index && !iter->peer_id.is_witness()) {
if (consecutive_error_times == 0 && next_index > max_index) {
max_index = next_index;
if (peer_id) {
*peer_id = iter->peer_id;
}
}
// On a tie, prefer a non-witness peer as the leadership transfer target.
if (consecutive_error_times == 0 && next_index == max_index) {
if (peer_id && peer_id->is_witness()) {
*peer_id = iter->peer_id;
}
}
}
if (max_index == 0) {
return -1;
Expand Down
2 changes: 2 additions & 0 deletions src/braft/replicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct ReplicatorOptions {
ReplicatorOptions();
int* dynamic_heartbeat_timeout_ms;
int* election_timeout_ms;
// Whether entry data is sent to witness peers. ReplicatorGroup::init()
// copies this from ReplicatorGroupOptions; the in-class default keeps the
// field well-defined for any ReplicatorOptions built elsewhere.
// Default: true (same as NodeOptions and ReplicatorGroupOptions)
bool send_data_to_witness = true;
GroupId group_id;
PeerId server_id;
PeerId peer_id;
Expand Down Expand Up @@ -267,6 +268,7 @@ struct ReplicatorGroupOptions {
ReplicatorGroupOptions();
int heartbeat_timeout_ms;
int election_timeout_ms;
bool send_data_to_witness = true;
LogManager* log_manager;
BallotBox* ballot_box;
NodeImpl* node;
Expand Down
Loading