Fix bugs in streaming mode #568

Merged 1 commit on Feb 15, 2025
include/libnuraft/peer.hxx (10 additions, 0 deletions)
@@ -66,6 +66,7 @@ public:
     , network_recoveries_(0)
     , manual_free_(false)
     , rpc_errs_(0)
+    , stale_rpc_responses_(0)
     , last_sent_idx_(0)
     , cnt_not_applied_(0)
     , leave_requested_(false)
@@ -260,6 +261,10 @@ public:
     void inc_rpc_errs() { rpc_errs_.fetch_add(1); }
     int32 get_rpc_errs() { return rpc_errs_; }

+    void reset_stale_rpc_responses() { stale_rpc_responses_ = 0; }
+    int32_t inc_stale_rpc_responses() { return stale_rpc_responses_.fetch_add(1); }
+    int32_t get_stale_rpc_responses() { return stale_rpc_responses_; }
+
     void set_last_sent_idx(ulong to) { last_sent_idx_ = to; }
     ulong get_last_sent_idx() const { return last_sent_idx_.load(); }
@@ -490,6 +495,11 @@ private:
      */
    std::atomic<int32> rpc_errs_;

+   /**
+    * For tracking stale RPC responses from the old client.
+    */
+   std::atomic<int32> stale_rpc_responses_;
+
    /**
     * Start log index of the last sent append entries request.
     */
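The new counter follows the same shape as the existing rpc_errs_ member: a lock-free atomic that several completion handlers may bump concurrently. Below is a minimal, self-contained sketch of that pattern; the class and member names are illustrative, not the library's.

#include <atomic>
#include <cstdint>

class stale_response_counter {
public:
    void reset() { count_.store(0); }

    // fetch_add(1) returns the value *before* the increment, so the
    // first stale response observed yields 0. The suppression logic in
    // peer.cxx below relies on this: pre-increment values under the
    // limit are logged, the value equal to the limit logs a one-time
    // "will suppress" notice, and anything larger stays silent.
    int32_t increment() { return count_.fetch_add(1); }

    int32_t get() const { return count_.load(); }

private:
    std::atomic<int32_t> count_{0};
};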
src/asio_service.cxx (15 additions, 5 deletions)
@@ -1174,7 +1174,7 @@ class asio_rpc_client
             cs_new<pending_req_pkg>(req, when_done, send_timeout_ms));
         immediate_action_needed = (pending_write_reqs_.size() == 1);
         p_db("start to send msg to peer %d, start_log_idx: %" PRIu64 ", "
-             "size: %" PRIu64 ", pending write reqs: %" PRIu64 "",
+             "size: %" PRIu64 ", pending write reqs: %" PRIu64 "",
             req->get_dst(), req->get_last_log_idx(),
             req->log_entries().size(), pending_write_reqs_.size());
    }
@@ -1896,12 +1896,17 @@ class asio_rpc_client
    ptr<pending_req_pkg> next_req_pkg{nullptr};
    {
        auto_lock(pending_write_reqs_lock_);
-       pending_write_reqs_.pop_front();
+       // NOTE:
+       //   The queue can be empty even though there was no `pop_front`,
+       //   due to `close_socket()` when connection is suddenly closed.
+       if (pending_write_reqs_.size()) {
+           pending_write_reqs_.pop_front();
+       }
        if (pending_write_reqs_.size() > 0) {
            next_req_pkg = *pending_write_reqs_.begin();
            p_db("trigger next write, start_log_idx: %" PRIu64 ", "
                 "pending write reqs: %" PRIu64 "",
-                next_req_pkg->req_->get_last_log_idx(),
+                next_req_pkg->req_->get_last_log_idx(),
                 pending_write_reqs_.size());
        }
    }
@@ -1922,12 +1927,17 @@ class asio_rpc_client
    ptr<pending_req_pkg> next_req_pkg{nullptr};
    {
        auto_lock(pending_read_reqs_lock_);
-       pending_read_reqs_.pop_front();
+       // NOTE:
+       //   The queue can be empty even though there was no `pop_front`,
+       //   due to `close_socket()` when connection is suddenly closed.
+       if (pending_read_reqs_.size()) {
+           pending_read_reqs_.pop_front();
+       }
        if (pending_read_reqs_.size() > 0) {
            next_req_pkg = *pending_read_reqs_.begin();
            p_db("trigger next read, start_log_idx: %" PRIu64 ", "
                 "pending read reqs: %" PRIu64 "",
-                next_req_pkg->req_->get_last_log_idx(),
+                next_req_pkg->req_->get_last_log_idx(),
                 pending_read_reqs_.size());
        }
    }
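Both hunks apply the same fix: the completion handler can no longer assume its request is still at the head of the queue, because close_socket() may have drained it concurrently, and pop_front() on an empty std::deque is undefined behavior. A simplified sketch of the guarded pop-and-peek pattern, using illustrative request/request_queue types rather than the library's own:

#include <deque>
#include <memory>
#include <mutex>

struct request { /* payload elided */ };

class request_queue {
public:
    // Pop the completed request (if any) and return the next one to
    // trigger, all under a single lock acquisition. Re-checking
    // emptiness under the lock is the essential part of the fix.
    std::shared_ptr<request> pop_and_peek_next() {
        std::lock_guard<std::mutex> guard(lock_);
        if (!reqs_.empty()) {
            reqs_.pop_front();
        }
        return reqs_.empty() ? nullptr : reqs_.front();
    }

private:
    std::mutex lock_;
    std::deque<std::shared_ptr<request>> reqs_;
};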
src/handle_append_entries.cxx (5 additions, 0 deletions)
@@ -1166,7 +1166,12 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
             resp.get_next_idx(), p->get_next_log_idx() );

        // disable stream
+       uint64_t last_streamed_log_idx = p->get_last_streamed_log_idx();
        p->reset_stream();
+       if (last_streamed_log_idx) {
+           p_in("stop stream mode for peer %d at idx: %" PRIu64 "",
+                p->get_id(), last_streamed_log_idx);
+       }
    }

    if (!config_changing_ && p->get_config().is_new_joiner()) {
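The ordering here matters: reset_stream() clears the peer's streaming state, so the last streamed index must be captured before the reset if the log line is to report where streaming stopped, and an index of zero doubles as "streaming was never active". A standalone sketch of this capture-before-reset idiom, with an illustrative peer_state type standing in for the library's peer class:

#include <cinttypes>
#include <cstdint>
#include <cstdio>

struct peer_state {
    uint64_t last_streamed_log_idx{0};
    void reset_stream() { last_streamed_log_idx = 0; }
};

void disable_stream(peer_state& p, int peer_id) {
    // Read before reset: reset_stream() zeroes the index.
    uint64_t last_idx = p.last_streamed_log_idx;
    p.reset_stream();
    if (last_idx) {
        // Zero means streaming was never active, so nothing to report.
        std::printf("stop stream mode for peer %d at idx: %" PRIu64 "\n",
                    peer_id, last_idx);
    }
}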
src/peer.cxx (44 additions, 17 deletions)
@@ -21,6 +21,7 @@ limitations under the License.
 #include "peer.hxx"

 #include "debugging_options.hxx"
+#include "raft_server.hxx"
 #include "tracer.hxx"

 #include <unordered_set>
@@ -64,7 +65,7 @@ void peer::send_req( ptr<peer> myself,
            req_size_bytes += entry->get_buf_ptr()->size();
        }
    }
-
+
    rpc_handler h = (rpc_handler)std::bind
        ( &peer::handle_rpc_result,
          this,
@@ -118,18 +119,27 @@ void peer::handle_rpc_result( ptr<peer> myself,
    uint64_t cur_rpc_id = rpc_ ? rpc_->get_id() : 0;
    uint64_t given_rpc_id = my_rpc_client ? my_rpc_client->get_id() : 0;
    if (cur_rpc_id != given_rpc_id) {
-       p_wn( "[EDGE CASE] got stale RPC response from %d: "
-             "current %p (%" PRIu64 "), from parameter %p (%" PRIu64 "). "
-             "will ignore this response",
-             config_->get_id(),
-             rpc_.get(),
-             cur_rpc_id,
-             my_rpc_client.get(),
-             given_rpc_id );
+       int32_t stale_resps = inc_stale_rpc_responses();
+       int32_t limit = raft_server::get_raft_limits().response_limit_;
+       if (stale_resps < limit) {
+           p_wn( "[EDGE CASE] got stale RPC response from %d: "
+                 "current %p (%" PRIu64 "), from parameter %p (%" PRIu64 "). "
+                 "will ignore this response",
+                 config_->get_id(),
+                 rpc_.get(),
+                 cur_rpc_id,
+                 my_rpc_client.get(),
+                 given_rpc_id );
+       } else if (stale_resps == limit) {
+           p_wn( "[EDGE CASE] too verbose stale RPC response from peer %d, "
+                 "will suppress it from now", config_->get_id() );
+       }

    } else {
        // WARNING:
        //   `set_free()` should be protected by `rpc_protector_`, otherwise
        //   it may free the peer even though new RPC client is already created.
+       reset_stale_rpc_responses();
        bytes_in_flight_sub(req_size_bytes);
        try_set_free(req->get_type(), streaming);
    }
@@ -167,22 +177,39 @@ void peer::handle_rpc_result( ptr<peer> myself,
        uint64_t given_rpc_id = my_rpc_client ? my_rpc_client->get_id() : 0;
        if (cur_rpc_id == given_rpc_id) {
            rpc_.reset();
+           uint64_t last_streamed_log_idx = get_last_streamed_log_idx();
            reset_stream();
+           if (last_streamed_log_idx) {
+               p_in("stop stream mode for peer %d at idx: %" PRIu64 "",
+                    config_->get_id(), last_streamed_log_idx);
+           }
+           reset_stale_rpc_responses();
            reset_bytes_in_flight();
            try_set_free(req->get_type(), streaming);
        } else {
            // WARNING (MONSTOR-9378):
            //   RPC client has been reset before this request returns
            //   error. Those two are different instances and we
            //   SHOULD NOT reset the new one.
-           p_wn( "[EDGE CASE] RPC for %d has been reset before "
-                 "returning error: current %p (%" PRIu64
-                 "), from parameter %p (%" PRIu64 ")",
-                 config_->get_id(),
-                 rpc_.get(),
-                 cur_rpc_id,
-                 my_rpc_client.get(),
-                 given_rpc_id );

+           // NOTE: In streaming mode, there can be lots of below errors
+           //       at the same time. We should avoid verbose logs.
+
+           int32_t stale_resps = inc_stale_rpc_responses();
+           int32_t limit = raft_server::get_raft_limits().response_limit_;
+           if (stale_resps < limit) {
+               p_wn( "[EDGE CASE] RPC for %d has been reset before "
+                     "returning error: current %p (%" PRIu64
+                     "), from parameter %p (%" PRIu64 ")",
+                     config_->get_id(),
+                     rpc_.get(),
+                     cur_rpc_id,
+                     my_rpc_client.get(),
+                     given_rpc_id );
+           } else if (stale_resps == limit) {
+               p_wn( "[EDGE CASE] too verbose stale RPC response from peer %d, "
+                     "will suppress it from now", config_->get_id() );
+           }
        }
    }
 }
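Both branches of handle_rpc_result now share the same rate-limiting shape: log the first response_limit_ occurrences, emit one "will suppress" notice, then stay silent until a valid response resets the counter. A condensed sketch of that control flow, with warn() standing in for the library's p_wn macro:

#include <cstdint>
#include <cstdio>

static void warn(const char* msg) { std::fprintf(stderr, "%s\n", msg); }

// pre_inc_count is the value returned by inc_stale_rpc_responses(),
// i.e. the counter value *before* this occurrence was added.
void log_stale_response(int32_t pre_inc_count, int32_t limit) {
    if (pre_inc_count < limit) {
        // Occurrences 1..limit: log normally.
        warn("[EDGE CASE] got stale RPC response, will ignore it");
    } else if (pre_inc_count == limit) {
        // Occurrence limit+1: say once that we are going quiet.
        warn("[EDGE CASE] too verbose stale RPC response, "
             "will suppress it from now");
    }
    // Beyond that: suppressed until reset_stale_rpc_responses()
    // is called on a valid (non-stale) response.
}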
src/raft_server.cxx (5 additions, 2 deletions)
@@ -403,7 +403,8 @@ void raft_server::apply_and_log_current_params() {
         "leadership transfer wait time %d, "
         "grace period of lagging state machine %d, "
         "snapshot IO: %s, "
-        "parallel log appending: %s",
+        "parallel log appending: %s, "
+        "streaming mode max log gap %d, max bytes %" PRIu64,
         params->election_timeout_lower_bound_,
         params->election_timeout_upper_bound_,
         params->heart_beat_interval_,
@@ -425,7 +426,9 @@ void raft_server::apply_and_log_current_params() {
         params->leadership_transfer_min_wait_time_,
         params->grace_period_of_lagging_state_machine_,
         params->use_bg_thread_for_snapshot_io_ ? "ASYNC" : "BLOCKING",
-        params->parallel_log_appending_ ? "ON" : "OFF" );
+        params->parallel_log_appending_ ? "ON" : "OFF",
+        params->max_log_gap_in_stream_,
+        params->max_bytes_in_flight_in_stream_ );

    status_check_timer_.set_duration_ms(params->heart_beat_interval_);
    status_check_timer_.reset();
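One detail worth noting in the extended format string: max_bytes_in_flight_in_stream_ is a 64-bit value, so it is printed with PRIu64 from <cinttypes>, which expands to the correct printf conversion for uint64_t on each platform instead of a hard-coded %lu or %llu. A tiny standalone illustration; the values here are made up:

#include <cinttypes>
#include <cstdint>
#include <cstdio>

int main() {
    int max_log_gap = 500;                     // made-up value
    uint64_t max_bytes = 64ULL * 1024 * 1024;  // made-up value
    // PRIu64 concatenates into the format string at compile time.
    std::printf("streaming mode max log gap %d, max bytes %" PRIu64 "\n",
                max_log_gap, max_bytes);
    return 0;
}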