From 99265893279627587ffb7702168dbe3fb0f577b3 Mon Sep 17 00:00:00 2001
From: Jung-Sang Ahn
Date: Fri, 14 Feb 2025 23:33:17 -0800
Subject: [PATCH] Fix bugs in streaming mode

* If connection is suddenly closed before handling the response,
  queue may become empty when a callback function is invoked.
  It should be handled safely.

* Added more logs regarding streaming mode.
---
 include/libnuraft/peer.hxx    | 10 ++++++
 src/asio_service.cxx          | 20 +++++++++---
 src/handle_append_entries.cxx |  5 +++
 src/peer.cxx                  | 61 +++++++++++++++++++++++++----------
 src/raft_server.cxx           |  7 ++--
 5 files changed, 79 insertions(+), 24 deletions(-)

diff --git a/include/libnuraft/peer.hxx b/include/libnuraft/peer.hxx
index de57f756..ddae49d4 100644
--- a/include/libnuraft/peer.hxx
+++ b/include/libnuraft/peer.hxx
@@ -66,6 +66,7 @@ public:
         , network_recoveries_(0)
         , manual_free_(false)
         , rpc_errs_(0)
+        , stale_rpc_responses_(0)
         , last_sent_idx_(0)
         , cnt_not_applied_(0)
         , leave_requested_(false)
@@ -260,6 +261,10 @@ public:
     void inc_rpc_errs() { rpc_errs_.fetch_add(1); }
     int32 get_rpc_errs() { return rpc_errs_; }
 
+    void reset_stale_rpc_responses() { stale_rpc_responses_ = 0; }
+    int32_t inc_stale_rpc_responses() { return stale_rpc_responses_.fetch_add(1); }
+    int32_t get_stale_rpc_responses() { return stale_rpc_responses_; }
+
     void set_last_sent_idx(ulong to) { last_sent_idx_ = to; }
     ulong get_last_sent_idx() const { return last_sent_idx_.load(); }
 
@@ -490,6 +495,11 @@ private:
      */
     std::atomic<int32> rpc_errs_;
 
+    /**
+     * For tracking stale RPC responses from the old client.
+     */
+    std::atomic<int32_t> stale_rpc_responses_;
+
     /**
      * Start log index of the last sent append entries request.
      */
diff --git a/src/asio_service.cxx b/src/asio_service.cxx
index 461f786f..386f4bf6 100644
--- a/src/asio_service.cxx
+++ b/src/asio_service.cxx
@@ -1174,7 +1174,7 @@ class asio_rpc_client
                 cs_new(req, when_done, send_timeout_ms));
             immediate_action_needed = (pending_write_reqs_.size() == 1);
             p_db("start to send msg to peer %d, start_log_idx: %" PRIu64 ", "
-                 "size: %" PRIu64 ", pending write reqs: %" PRIu64 "",
+                 "size: %" PRIu64 ", pending write reqs: %" PRIu64 "",
                  req->get_dst(), req->get_last_log_idx(),
                  req->log_entries().size(), pending_write_reqs_.size());
         }
@@ -1896,12 +1896,17 @@ class asio_rpc_client
         ptr next_req_pkg{nullptr};
         {
             auto_lock(pending_write_reqs_lock_);
-            pending_write_reqs_.pop_front();
+            // NOTE:
+            // The queue can be empty even though there was no `pop_front`,
+            // due to `close_socket()` when connection is suddenly closed.
+            if (pending_write_reqs_.size()) {
+                pending_write_reqs_.pop_front();
+            }
             if (pending_write_reqs_.size() > 0) {
                 next_req_pkg = *pending_write_reqs_.begin();
                 p_db("trigger next write, start_log_idx: %" PRIu64 ", "
                      "pending write reqs: %" PRIu64 "",
-                     next_req_pkg->req_->get_last_log_idx(),
+                     next_req_pkg->req_->get_last_log_idx(),
                      pending_write_reqs_.size());
             }
         }
@@ -1922,12 +1927,17 @@ class asio_rpc_client
         ptr next_req_pkg{nullptr};
         {
             auto_lock(pending_read_reqs_lock_);
-            pending_read_reqs_.pop_front();
+            // NOTE:
+            // The queue can be empty even though there was no `pop_front`,
+            // due to `close_socket()` when connection is suddenly closed.
+            if (pending_read_reqs_.size()) {
+                pending_read_reqs_.pop_front();
+            }
             if (pending_read_reqs_.size() > 0) {
                 next_req_pkg = *pending_read_reqs_.begin();
                 p_db("trigger next read, start_log_idx: %" PRIu64 ", "
                      "pending read reqs: %" PRIu64 "",
-                     next_req_pkg->req_->get_last_log_idx(),
+                     next_req_pkg->req_->get_last_log_idx(),
                      pending_read_reqs_.size());
             }
         }
diff --git a/src/handle_append_entries.cxx b/src/handle_append_entries.cxx
index 59de3a7f..b1442c4f 100644
--- a/src/handle_append_entries.cxx
+++ b/src/handle_append_entries.cxx
@@ -1166,7 +1166,12 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
                  resp.get_next_idx(),
                  p->get_next_log_idx() );
             // disable stream
+            uint64_t last_streamed_log_idx = p->get_last_streamed_log_idx();
             p->reset_stream();
+            if (last_streamed_log_idx) {
+                p_in("stop stream mode for peer %d at idx: %" PRIu64 "",
+                     p->get_id(), last_streamed_log_idx);
+            }
         }
 
         if (!config_changing_ && p->get_config().is_new_joiner()) {
diff --git a/src/peer.cxx b/src/peer.cxx
index ec91104b..cf4f96b0 100644
--- a/src/peer.cxx
+++ b/src/peer.cxx
@@ -21,6 +21,7 @@ limitations under the License.
 #include "peer.hxx"
 
 #include "debugging_options.hxx"
+#include "raft_server.hxx"
 #include "tracer.hxx"
 
 #include 
@@ -64,7 +65,7 @@ void peer::send_req( ptr<peer> myself,
             req_size_bytes += entry->get_buf_ptr()->size();
         }
     }
-    
+
     rpc_handler h = (rpc_handler)std::bind
                     ( &peer::handle_rpc_result,
                       this,
@@ -118,18 +119,27 @@ void peer::handle_rpc_result( ptr<peer> myself,
         uint64_t cur_rpc_id = rpc_ ? rpc_->get_id() : 0;
         uint64_t given_rpc_id = my_rpc_client ? my_rpc_client->get_id() : 0;
         if (cur_rpc_id != given_rpc_id) {
-            p_wn( "[EDGE CASE] got stale RPC response from %d: "
-                  "current %p (%" PRIu64 "), from parameter %p (%" PRIu64 "). "
-                  "will ignore this response",
-                  config_->get_id(),
-                  rpc_.get(),
-                  cur_rpc_id,
-                  my_rpc_client.get(),
-                  given_rpc_id );
+            int32_t stale_resps = inc_stale_rpc_responses();
+            int32_t limit = raft_server::get_raft_limits().response_limit_;
+            if (stale_resps < limit) {
+                p_wn( "[EDGE CASE] got stale RPC response from %d: "
+                      "current %p (%" PRIu64 "), from parameter %p (%" PRIu64 "). "
+                      "will ignore this response",
+                      config_->get_id(),
+                      rpc_.get(),
+                      cur_rpc_id,
+                      my_rpc_client.get(),
+                      given_rpc_id );
+            } else if (stale_resps == limit) {
+                p_wn( "[EDGE CASE] too verbose stale RPC response from peer %d, "
+                      "will suppress it from now", config_->get_id() );
+            }
+
         } else {
             // WARNING:
             // `set_free()` should be protected by `rpc_protector_`, otherwise
             // it may free the peer even though new RPC client is already created.
+            reset_stale_rpc_responses();
             bytes_in_flight_sub(req_size_bytes);
             try_set_free(req->get_type(), streaming);
         }
@@ -167,7 +177,13 @@ void peer::handle_rpc_result( ptr<peer> myself,
         uint64_t given_rpc_id = my_rpc_client ? my_rpc_client->get_id() : 0;
         if (cur_rpc_id == given_rpc_id) {
             rpc_.reset();
+            uint64_t last_streamed_log_idx = get_last_streamed_log_idx();
             reset_stream();
+            if (last_streamed_log_idx) {
+                p_in("stop stream mode for peer %d at idx: %" PRIu64 "",
+                     config_->get_id(), last_streamed_log_idx);
+            }
+            reset_stale_rpc_responses();
             reset_bytes_in_flight();
             try_set_free(req->get_type(), streaming);
         } else {
@@ -175,14 +191,25 @@ void peer::handle_rpc_result( ptr<peer> myself,
             // RPC client has been reset before this request returns
             // error. Those two are different instances and we
             // SHOULD NOT reset the new one.
- p_wn( "[EDGE CASE] RPC for %d has been reset before " - "returning error: current %p (%" PRIu64 - "), from parameter %p (%" PRIu64 ")", - config_->get_id(), - rpc_.get(), - cur_rpc_id, - my_rpc_client.get(), - given_rpc_id ); + + // NOTE: In streaming mode, there can be lots of below errors + // at the same time. We should avoid verbose logs. + + int32_t stale_resps = inc_stale_rpc_responses(); + int32_t limit = raft_server::get_raft_limits().response_limit_; + if (stale_resps < limit) { + p_wn( "[EDGE CASE] RPC for %d has been reset before " + "returning error: current %p (%" PRIu64 + "), from parameter %p (%" PRIu64 ")", + config_->get_id(), + rpc_.get(), + cur_rpc_id, + my_rpc_client.get(), + given_rpc_id ); + } else if (stale_resps == limit) { + p_wn( "[EDGE CASE] too verbose stale RPC response from peer %d, " + "will suppress it from now", config_->get_id() ); + } } } } diff --git a/src/raft_server.cxx b/src/raft_server.cxx index 38a947bf..bcb4573e 100644 --- a/src/raft_server.cxx +++ b/src/raft_server.cxx @@ -403,7 +403,8 @@ void raft_server::apply_and_log_current_params() { "leadership transfer wait time %d, " "grace period of lagging state machine %d, " "snapshot IO: %s, " - "parallel log appending: %s", + "parallel log appending: %s, " + "streaming mode max log gap %d, max bytes %" PRIu64, params->election_timeout_lower_bound_, params->election_timeout_upper_bound_, params->heart_beat_interval_, @@ -425,7 +426,9 @@ void raft_server::apply_and_log_current_params() { params->leadership_transfer_min_wait_time_, params->grace_period_of_lagging_state_machine_, params->use_bg_thread_for_snapshot_io_ ? "ASYNC" : "BLOCKING", - params->parallel_log_appending_ ? "ON" : "OFF" ); + params->parallel_log_appending_ ? "ON" : "OFF", + params->max_log_gap_in_stream_, + params->max_bytes_in_flight_in_stream_ ); status_check_timer_.set_duration_ms(params->heart_beat_interval_); status_check_timer_.reset();