Fix bugs in streaming mode (#568)
* If the connection is suddenly closed before the response is handled,
  the queue may already be empty when a callback function is invoked.
  This case should be handled safely.

* Added more logs regarding streaming mode.
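
For illustration, a minimal, self-contained sketch of the race described in the first bullet: a teardown path clears the pending queue while a completion callback is still in flight. The `pending_queue` type and its members are hypothetical stand-ins, not NuRaft's actual classes.

    #include <deque>
    #include <mutex>

    // Hypothetical stand-in for the client's pending-request queue.
    struct pending_queue {
        std::mutex lock;
        std::deque<int> reqs;

        // Teardown path (cf. close_socket()): drains everything at once.
        void close() {
            std::lock_guard<std::mutex> guard(lock);
            reqs.clear();
        }

        // Completion callback: an unconditional reqs.pop_front() here is
        // undefined behavior if close() already emptied the queue, so the
        // pop must be guarded, which is the pattern this commit adopts.
        void on_complete() {
            std::lock_guard<std::mutex> guard(lock);
            if (!reqs.empty()) {
                reqs.pop_front();
            }
        }
    };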
greensky00 authored Feb 15, 2025
1 parent ad1f36f commit 2724344
Showing 5 changed files with 79 additions and 24 deletions.
10 changes: 10 additions & 0 deletions include/libnuraft/peer.hxx
@@ -66,6 +66,7 @@ public:
         , network_recoveries_(0)
         , manual_free_(false)
         , rpc_errs_(0)
+        , stale_rpc_responses_(0)
         , last_sent_idx_(0)
         , cnt_not_applied_(0)
         , leave_requested_(false)
@@ -260,6 +261,10 @@ public:
     void inc_rpc_errs() { rpc_errs_.fetch_add(1); }
     int32 get_rpc_errs() { return rpc_errs_; }

+    void reset_stale_rpc_responses() { stale_rpc_responses_ = 0; }
+    int32_t inc_stale_rpc_responses() { return stale_rpc_responses_.fetch_add(1); }
+    int32_t get_stale_rpc_responses() { return stale_rpc_responses_; }
+
     void set_last_sent_idx(ulong to) { last_sent_idx_ = to; }
     ulong get_last_sent_idx() const { return last_sent_idx_.load(); }
@@ -490,6 +495,11 @@ private:
      */
     std::atomic<int32> rpc_errs_;

+    /**
+     * For tracking stale RPC responses from the old client.
+     */
+    std::atomic<int32> stale_rpc_responses_;
+
     /**
      * Start log index of the last sent append entries request.
      */
20 changes: 15 additions & 5 deletions src/asio_service.cxx
@@ -1174,7 +1174,7 @@ class asio_rpc_client
             cs_new<pending_req_pkg>(req, when_done, send_timeout_ms));
         immediate_action_needed = (pending_write_reqs_.size() == 1);
         p_db("start to send msg to peer %d, start_log_idx: %" PRIu64 ", "
-             "size: %" PRIu64 ", pending write reqs: %" PRIu64 "",
+             "size: %" PRIu64 ", pending write reqs: %" PRIu64 "",
             req->get_dst(), req->get_last_log_idx(),
             req->log_entries().size(), pending_write_reqs_.size());
     }
@@ -1896,12 +1896,17 @@ class asio_rpc_client
         ptr<pending_req_pkg> next_req_pkg{nullptr};
         {
             auto_lock(pending_write_reqs_lock_);
-            pending_write_reqs_.pop_front();
+            // NOTE:
+            //   The queue can be empty even though there was no `pop_front`,
+            //   due to `close_socket()` when connection is suddenly closed.
+            if (pending_write_reqs_.size()) {
+                pending_write_reqs_.pop_front();
+            }
             if (pending_write_reqs_.size() > 0) {
                 next_req_pkg = *pending_write_reqs_.begin();
                 p_db("trigger next write, start_log_idx: %" PRIu64 ", "
                      "pending write reqs: %" PRIu64 "",
-                     next_req_pkg->req_->get_last_log_idx(),
+                     next_req_pkg->req_->get_last_log_idx(),
                      pending_write_reqs_.size());
             }
         }
@@ -1922,12 +1927,17 @@
         ptr<pending_req_pkg> next_req_pkg{nullptr};
         {
             auto_lock(pending_read_reqs_lock_);
-            pending_read_reqs_.pop_front();
+            // NOTE:
+            //   The queue can be empty even though there was no `pop_front`,
+            //   due to `close_socket()` when connection is suddenly closed.
+            if (pending_read_reqs_.size()) {
+                pending_read_reqs_.pop_front();
+            }
             if (pending_read_reqs_.size() > 0) {
                 next_req_pkg = *pending_read_reqs_.begin();
                 p_db("trigger next read, start_log_idx: %" PRIu64 ", "
                      "pending read reqs: %" PRIu64 "",
-                     next_req_pkg->req_->get_last_log_idx(),
+                     next_req_pkg->req_->get_last_log_idx(),
                      pending_read_reqs_.size());
             }
         }
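
Both hunks above follow the same pipelining convention: the send path kicks the I/O chain only when its request is the sole queued entry, and each completion handler pops its own entry under the lock and arms the next one. A rough standalone sketch of that flow, with illustrative names (`pipeline`, `submit`, `on_complete`) rather than the real `asio_rpc_client` API, and a synchronous stand-in for the async operation:

    #include <deque>
    #include <functional>
    #include <mutex>

    struct pipeline {
        std::mutex lock;
        std::deque<std::function<void()>> pending;

        void submit(std::function<void()> req) {
            bool kick = false;
            {
                std::lock_guard<std::mutex> g(lock);
                pending.push_back(std::move(req));
                // Only the request that finds the queue empty starts the
                // chain; later requests ride on the completion handler.
                kick = (pending.size() == 1);
            }
            if (kick) start_front();
        }

        void on_complete() {
            std::function<void()> next;
            {
                std::lock_guard<std::mutex> g(lock);
                // Guarded pop: the queue may have been drained by a close.
                if (!pending.empty()) pending.pop_front();
                if (!pending.empty()) next = pending.front();
            }
            if (next) next();  // the real code re-arms the async operation
        }

        void start_front() { /* issue the async operation for the front */ }
    };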
5 changes: 5 additions & 0 deletions src/handle_append_entries.cxx
@@ -1166,7 +1166,12 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
                  resp.get_next_idx(), p->get_next_log_idx() );

             // disable stream
+            uint64_t last_streamed_log_idx = p->get_last_streamed_log_idx();
             p->reset_stream();
+            if (last_streamed_log_idx) {
+                p_in("stop stream mode for peer %d at idx: %" PRIu64 "",
+                     p->get_id(), last_streamed_log_idx);
+            }
         }

         if (!config_changing_ && p->get_config().is_new_joiner()) {
61 changes: 44 additions & 17 deletions src/peer.cxx
@@ -21,6 +21,7 @@ limitations under the License.
 #include "peer.hxx"

 #include "debugging_options.hxx"
+#include "raft_server.hxx"
 #include "tracer.hxx"

 #include <unordered_set>
@@ -64,7 +65,7 @@ void peer::send_req( ptr<peer> myself,
             req_size_bytes += entry->get_buf_ptr()->size();
         }
     }
-
+
     rpc_handler h = (rpc_handler)std::bind
                     ( &peer::handle_rpc_result,
                       this,
@@ -118,18 +119,27 @@ void peer::handle_rpc_result( ptr<peer> myself,
         uint64_t cur_rpc_id = rpc_ ? rpc_->get_id() : 0;
         uint64_t given_rpc_id = my_rpc_client ? my_rpc_client->get_id() : 0;
         if (cur_rpc_id != given_rpc_id) {
-            p_wn( "[EDGE CASE] got stale RPC response from %d: "
-                  "current %p (%" PRIu64 "), from parameter %p (%" PRIu64 "). "
-                  "will ignore this response",
-                  config_->get_id(),
-                  rpc_.get(),
-                  cur_rpc_id,
-                  my_rpc_client.get(),
-                  given_rpc_id );
+            int32_t stale_resps = inc_stale_rpc_responses();
+            int32_t limit = raft_server::get_raft_limits().response_limit_;
+            if (stale_resps < limit) {
+                p_wn( "[EDGE CASE] got stale RPC response from %d: "
+                      "current %p (%" PRIu64 "), from parameter %p (%" PRIu64 "). "
+                      "will ignore this response",
+                      config_->get_id(),
+                      rpc_.get(),
+                      cur_rpc_id,
+                      my_rpc_client.get(),
+                      given_rpc_id );
+            } else if (stale_resps == limit) {
+                p_wn( "[EDGE CASE] too verbose stale RPC response from peer %d, "
+                      "will suppress it from now", config_->get_id() );
+            }
+
         } else {
             // WARNING:
             //   `set_free()` should be protected by `rpc_protector_`, otherwise
             //   it may free the peer even though new RPC client is already created.
+            reset_stale_rpc_responses();
             bytes_in_flight_sub(req_size_bytes);
             try_set_free(req->get_type(), streaming);
         }
@@ -167,22 +177,39 @@
         uint64_t given_rpc_id = my_rpc_client ? my_rpc_client->get_id() : 0;
         if (cur_rpc_id == given_rpc_id) {
             rpc_.reset();
+            uint64_t last_streamed_log_idx = get_last_streamed_log_idx();
             reset_stream();
+            if (last_streamed_log_idx) {
+                p_in("stop stream mode for peer %d at idx: %" PRIu64 "",
+                     config_->get_id(), last_streamed_log_idx);
+            }
+            reset_stale_rpc_responses();
             reset_bytes_in_flight();
             try_set_free(req->get_type(), streaming);
         } else {
             // WARNING (MONSTOR-9378):
             //   RPC client has been reset before this request returns
             //   error. Those two are different instances and we
             //   SHOULD NOT reset the new one.
-            p_wn( "[EDGE CASE] RPC for %d has been reset before "
-                  "returning error: current %p (%" PRIu64
-                  "), from parameter %p (%" PRIu64 ")",
-                  config_->get_id(),
-                  rpc_.get(),
-                  cur_rpc_id,
-                  my_rpc_client.get(),
-                  given_rpc_id );
+
+            // NOTE: In streaming mode, there can be lots of below errors
+            //       at the same time. We should avoid verbose logs.
+
+            int32_t stale_resps = inc_stale_rpc_responses();
+            int32_t limit = raft_server::get_raft_limits().response_limit_;
+            if (stale_resps < limit) {
+                p_wn( "[EDGE CASE] RPC for %d has been reset before "
+                      "returning error: current %p (%" PRIu64
+                      "), from parameter %p (%" PRIu64 ")",
+                      config_->get_id(),
+                      rpc_.get(),
+                      cur_rpc_id,
+                      my_rpc_client.get(),
+                      given_rpc_id );
+            } else if (stale_resps == limit) {
+                p_wn( "[EDGE CASE] too verbose stale RPC response from peer %d, "
+                      "will suppress it from now", config_->get_id() );
+            }
         }
     }
 }
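
The counter added above caps each burst of stale-response warnings: messages are emitted while the pre-increment count is below the limit, exactly one "will suppress" marker is logged at the limit, and a valid response re-arms the counter. The same pattern in isolation, with an illustrative `suppressed_warn` type (in NuRaft the limit comes from `raft_server::get_raft_limits().response_limit_`):

    #include <atomic>
    #include <cstdint>
    #include <cstdio>

    struct suppressed_warn {
        std::atomic<int32_t> count{0};
        const int32_t limit;

        explicit suppressed_warn(int32_t lim) : limit(lim) {}

        void warn(const char* msg) {
            int32_t n = count.fetch_add(1);  // returns the pre-increment value
            if (n < limit) {
                std::fprintf(stderr, "warning: %s\n", msg);
            } else if (n == limit) {
                // One marker exactly, then silence until reset().
                std::fprintf(stderr, "warning: too verbose, suppressing from now\n");
            }
        }

        // Called on a healthy response to re-arm the warnings.
        void reset() { count = 0; }
    };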
7 changes: 5 additions & 2 deletions src/raft_server.cxx
@@ -403,7 +403,8 @@ void raft_server::apply_and_log_current_params() {
          "leadership transfer wait time %d, "
          "grace period of lagging state machine %d, "
          "snapshot IO: %s, "
-         "parallel log appending: %s",
+         "parallel log appending: %s, "
+         "streaming mode max log gap %d, max bytes %" PRIu64,
          params->election_timeout_lower_bound_,
          params->election_timeout_upper_bound_,
          params->heart_beat_interval_,
@@ -425,7 +426,9 @@
          params->leadership_transfer_min_wait_time_,
          params->grace_period_of_lagging_state_machine_,
          params->use_bg_thread_for_snapshot_io_ ? "ASYNC" : "BLOCKING",
-         params->parallel_log_appending_ ? "ON" : "OFF" );
+         params->parallel_log_appending_ ? "ON" : "OFF",
+         params->max_log_gap_in_stream_,
+         params->max_bytes_in_flight_in_stream_ );

     status_check_timer_.set_duration_ms(params->heart_beat_interval_);
     status_check_timer_.reset();
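
The two new fields in this log line print the streaming-mode knobs of `raft_params`. A configuration sketch using the member names the log statement references; the values are illustrative only, and the comment about zero disabling streaming is an assumption about the parameter's semantics:

    #include <libnuraft/nuraft.hxx>

    nuraft::raft_params make_params() {
        nuraft::raft_params params;
        // Assumption: a non-zero gap enables streaming mode, 0 keeps it off.
        params.max_log_gap_in_stream_ = 500;
        // Upper bound on un-acknowledged bytes in flight while streaming.
        params.max_bytes_in_flight_in_stream_ = 1024 * 1024;
        return params;
    }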
