Skip to content

Commit

Permalink
Merge pull request #20838 from mmaslankaprv/hb-manager-async
Browse files Browse the repository at this point in the history
 Use `async_for_each` when collecting heartbeat requests
  • Loading branch information
mmaslankaprv committed Jul 5, 2024
2 parents f334a22 + 86aebfe commit 7c9bbf8
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 79 deletions.
1 change: 1 addition & 0 deletions src/v/raft/follower_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class follower_stats {
using container_t = absl::node_hash_map<vnode, follower_index_metadata>;
using iterator = container_t::iterator;
using const_iterator = container_t::const_iterator;
using value_type = container_t::value_type;

explicit follower_stats(vnode self, uint32_t max_concurrent_append_entries)
: _self(self)
Expand Down
172 changes: 94 additions & 78 deletions src/v/raft/heartbeat_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "base/vlog.h"
#include "config/configuration.h"
#include "features/feature_table.h"
#include "follower_stats.h"
#include "model/metadata.h"
#include "model/timeout_clock.h"
#include "outcome_future_utils.h"
Expand All @@ -24,6 +25,7 @@
#include "rpc/errc.h"
#include "rpc/reconnect_transport.h"
#include "rpc/types.h"
#include "ssx/async_algorithm.h"

#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/coroutine.hh>
Expand Down Expand Up @@ -52,16 +54,17 @@ heartbeat_manager::follower_request_meta::follower_request_meta(
, follower_vnode(target)
, append_guard(c->track_append_inflight(follower_vnode)) {}

heartbeat_manager::heartbeat_requests heartbeat_manager::requests_for_range() {
absl::node_hash_map<
model::node_id,
ss::chunked_fifo<
std::pair<group_heartbeat, heartbeat_manager::follower_request_meta>>>
ss::future<heartbeat_manager::heartbeat_requests>
heartbeat_manager::requests_for_range() {
using hb_pair
= std::pair<group_heartbeat, heartbeat_manager::follower_request_meta>;
absl::node_hash_map<model::node_id, ss::chunked_fifo<hb_pair>>
pending_beats;

if (_consensus_groups.empty()) {
return {};
co_return heartbeat_requests{};
}
ssx::async_counter counter;

// Set of follower nodes whose heartbeat_failed status indicates
// that we should tear down their TCP connection before next heartbeat
Expand All @@ -73,72 +76,78 @@ heartbeat_manager::heartbeat_requests heartbeat_manager::requests_for_range() {
continue;
}

for (auto& [id, follower_metadata] : r->_fstats) {
if (
follower_metadata.last_received_reply_timestamp
> last_heartbeat) {
vlog(r->_ctxlog.trace, "[{}] heartbeat skipped", id);
continue;
}

if (unlikely(
!_enable_lw_heartbeat()
&& follower_metadata.has_inflight_appends())) {
// Revert back to old behavior of heartbeat suppression during
// inflight appends as we cannot make use of lw heartbeats
// optimization. This is unlikely in practice because lw
// heartbeats are enabled by default in the binary.
vlog(
r->_ctxlog.trace,
"[{}] heartbeat suppressed, lw hearbeats are disabled",
id);
continue;
}

auto [it, _] = pending_beats.try_emplace(id.id());
group_heartbeat group_beat{
.group = r->group(),
};
const auto raft_metadata = r->meta();
if (
_enable_lw_heartbeat()
&& !needs_full_heartbeat(
follower_metadata, raft_metadata, r->flushed_offset())) {
r->_probe->lw_heartbeat();
// we do not fill the dirty offset and follower request sequence
// here as those fields are not used to process lightweight
// heartbeats
it->second.emplace_back(
group_beat,
heartbeat_manager::follower_request_meta(
r, raft::follower_req_seq{}, model::offset{}, id));
continue;
}
vlog(r->_ctxlog.trace, "[{}] full heartbeat", id);
r->_probe->full_heartbeat();
auto const seq_id = follower_metadata.next_follower_sequence();

follower_metadata.last_sent_protocol_meta = raft_metadata;
group_beat.data = heartbeat_request_data{
.source_revision = r->_self.revision(),
.target_revision = id.revision(),
.commit_index = raft_metadata.commit_index,
.term = raft_metadata.term,
.prev_log_index = raft_metadata.prev_log_index,
.prev_log_term = raft_metadata.prev_log_term,
.last_visible_index = raft_metadata.last_visible_index,
};
it->second.emplace_back(
group_beat,
heartbeat_manager::follower_request_meta(
r, seq_id, raft_metadata.prev_log_index, id));

if (r->should_reconnect_follower(follower_metadata)) {
reconnect_nodes.insert(id.id());
}
}
co_await ssx::async_for_each_counter(
counter,
r->_fstats.begin(),
r->_fstats.end(),
[this, r, last_heartbeat, &pending_beats, &reconnect_nodes](
follower_stats::value_type& p) {
auto& [id, follower_metadata] = p;
if (
follower_metadata.last_received_reply_timestamp
> last_heartbeat) {
vlog(r->_ctxlog.trace, "[{}] heartbeat skipped", id);
return;
}

if (unlikely(
!_enable_lw_heartbeat()
&& follower_metadata.has_inflight_appends())) {
// Revert back to old behavior of heartbeat suppression during
// inflight appends as we cannot make use of lw heartbeats
// optimization. This is unlikely in practice because lw
// heartbeats are enabled by default in the binary.
vlog(
r->_ctxlog.trace,
"[{}] heartbeat suppressed, lw hearbeats are disabled",
id);
return;
}

auto [it, _] = pending_beats.try_emplace(id.id());
group_heartbeat group_beat{
.group = r->group(),
};
const auto raft_metadata = r->meta();
if (
_enable_lw_heartbeat()
&& !needs_full_heartbeat(
follower_metadata, raft_metadata, r->flushed_offset())) {
r->_probe->lw_heartbeat();
// we do not fill the dirty offset and follower request
// sequence here as those fields are not used to process
// lightweight heartbeats
it->second.emplace_back(
group_beat,
heartbeat_manager::follower_request_meta(
r, raft::follower_req_seq{}, model::offset{}, id));
return;
}
vlog(r->_ctxlog.trace, "[{}] full heartbeat", id);
r->_probe->full_heartbeat();
auto const seq_id = follower_metadata.next_follower_sequence();

follower_metadata.last_sent_protocol_meta = raft_metadata;
group_beat.data = heartbeat_request_data{
.source_revision = r->_self.revision(),
.target_revision = id.revision(),
.commit_index = raft_metadata.commit_index,
.term = raft_metadata.term,
.prev_log_index = raft_metadata.prev_log_index,
.prev_log_term = raft_metadata.prev_log_term,
.last_visible_index = raft_metadata.last_visible_index,
};
it->second.emplace_back(
group_beat,
heartbeat_manager::follower_request_meta(
r, seq_id, raft_metadata.prev_log_index, id));

if (r->should_reconnect_follower(follower_metadata)) {
reconnect_nodes.insert(id.id());
}
});
}

ssx::async_counter counter_collect;
std::vector<heartbeat_manager::node_heartbeat> reqs;
reqs.reserve(pending_beats.size());
for (auto& p : pending_beats) {
Expand All @@ -148,16 +157,23 @@ heartbeat_manager::heartbeat_requests heartbeat_manager::requests_for_range() {
heartbeat_manager::follower_request_meta>
meta_map;
requests.reserve(p.second.size());
meta_map.reserve(p.second.size());
heartbeat_request_v2 req(_self, p.first);
for (auto& [hb, follower_meta] : p.second) {
meta_map.emplace(hb.group, std::move(follower_meta));
req.add(hb);
}
co_await ssx::async_for_each_counter(
counter_collect,
p.second.begin(),
p.second.end(),
[&meta_map, &req](hb_pair& inner) {
auto& [hb, follower_meta] = inner;
meta_map.emplace(hb.group, std::move(follower_meta));
req.add(hb);
});

reqs.emplace_back(p.first, std::move(req), std::move(meta_map));
}

return heartbeat_requests{
.requests{std::move(reqs)}, .reconnect_nodes{reconnect_nodes}};
co_return heartbeat_requests{
.requests{std::move(reqs)}, .reconnect_nodes{std::move(reconnect_nodes)}};
}

bool heartbeat_manager::needs_full_heartbeat(
Expand Down Expand Up @@ -220,7 +236,7 @@ heartbeat_manager::send_heartbeats(std::vector<node_heartbeat> reqs) {
}

ss::future<> heartbeat_manager::do_dispatch_heartbeats() {
auto reqs = requests_for_range();
auto reqs = co_await requests_for_range();

for (const auto& node_id : reqs.reconnect_nodes) {
if (co_await _client_protocol.ensure_disconnect(node_id)) {
Expand Down
2 changes: 1 addition & 1 deletion src/v/raft/heartbeat_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class heartbeat_manager {
group_id group,
reply_result status);

heartbeat_requests requests_for_range();
ss::future<heartbeat_requests> requests_for_range();
// private members

mutex _lock{"heartbeat_manager"};
Expand Down

0 comments on commit 7c9bbf8

Please sign in to comment.