
Commit 4f9f4aa

ckit: reduce overhead of rejoining cluster (#76)
Previously, whenever a client called Start to rejoin a cluster (and fix any potential split-brain issues), a new state message was generated and broadcast to the cluster. This resulted in a significant amount of gossip traffic as the cluster size increased: a cluster of 800 nodes rejoining every 15 seconds would produce around 640,000 state change messages every 15 seconds (800 state changes, each sent to 800 nodes).

This logic only appeared necessary because of a bug in how invalid state messages about our own node were corrected: we should broadcast a new message only when we receive a message about our node that is newer than our local copy. Fixing that bug removed the need for the rejoin-time broadcast, which reduces the amount of gossip generated when rejoining.

With this change, rejoining a node requires only one push/pull request per node being joined, and results in no new broadcasts if all nodes are already up to date.

To help observe broadcast volume, a new cluster_node_gossip_broadcasts_total metric has been added.
1 parent d78cdfa commit 4f9f4aa
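
For context, a minimal sketch of the rejoin pattern the commit message describes: a client periodically calls Start to heal potential split-brain. The 15-second interval comes from the message above; the starter interface, rejoinLoop, and discoverPeers are hypothetical names used only for illustration.

package example

import (
	"context"
	"log"
	"time"
)

// starter is the small slice of ckit's *Node used here; Start(peers []string) error
// matches the signature visible in the node.go diff below.
type starter interface {
	Start(peers []string) error
}

// rejoinLoop re-runs Start every 15 seconds against freshly discovered peers.
// Before this commit, every call queued a new state broadcast; after it, a
// rejoin only performs push/pull requests unless a peer holds stale state
// about this node.
func rejoinLoop(ctx context.Context, node starter, discoverPeers func() []string) {
	ticker := time.NewTicker(15 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := node.Start(discoverPeers()); err != nil {
				// Rejoining is best-effort; log and retry on the next tick.
				log.Printf("failed to rejoin cluster: %v", err)
			}
		}
	}
}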

2 files changed (+39 additions, -40 deletions)

metrics.go

Lines changed: 16 additions & 6 deletions

@@ -25,12 +25,13 @@ const clusterNameLabel = "cluster_name"
 type metrics struct {
 	metricsutil.Container

-	gossipEventsTotal  *prometheus.CounterVec
-	nodePeers          *prometheus.GaugeVec
-	nodeUpdating       prometheus.Gauge
-	nodeUpdateDuration prometheus.Histogram
-	nodeObservers      prometheus.Gauge
-	nodeInfo           *metricsutil.InfoCollector
+	gossipEventsTotal     *prometheus.CounterVec
+	gossipBroadcastsTotal *prometheus.CounterVec
+	nodePeers             *prometheus.GaugeVec
+	nodeUpdating          prometheus.Gauge
+	nodeUpdateDuration    prometheus.Histogram
+	nodeObservers         prometheus.Gauge
+	nodeInfo              *metricsutil.InfoCollector
 }

 var _ prometheus.Collector = (*metrics)(nil)
@@ -46,6 +47,14 @@ func newMetrics(clusterName string) *metrics {
 		},
 	}, []string{"event"})

+	m.gossipBroadcastsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: "cluster_node_gossip_broadcasts_total",
+		Help: "Total number of gossip messages broadcasted by the node.",
+		ConstLabels: prometheus.Labels{
+			clusterNameLabel: clusterName,
+		},
+	}, []string{"event"})
+
 	m.nodePeers = prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Name: "cluster_node_peers",
 		Help: "Current number of healthy peers by state",
@@ -89,6 +98,7 @@ func newMetrics(clusterName string) *metrics {

 	m.Add(
 		m.gossipEventsTotal,
+		m.gossipBroadcastsTotal,
 		m.nodePeers,
 		m.nodeUpdating,
 		m.nodeUpdateDuration,
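
A minimal sketch (not part of the diff) of how the new counter is registered and incremented from inside the package; only newMetrics, gossipBroadcastsTotal, and eventStateChange come from the diffs here, and the registry setup is an assumption.

import "github.com/prometheus/client_golang/prometheus"

// registerExample wires the metrics container into a Prometheus registry and
// bumps the new counter the same way NotifyMsg does in node.go below.
func registerExample() {
	reg := prometheus.NewRegistry()
	m := newMetrics("dev-cluster")
	reg.MustRegister(m) // metrics satisfies prometheus.Collector per the assertion above

	// Each queued gossip broadcast increments the counter for its event type.
	m.gossipBroadcastsTotal.WithLabelValues(eventStateChange).Inc()
}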

node.go

Lines changed: 23 additions & 34 deletions

@@ -268,9 +268,19 @@ func (n *Node) Start(peers []string) error {
 		return fmt.Errorf("failed to join memberlist: %w", err)
 	}

-	// Broadcast our current state of the node to all peers now that our lamport
-	// clock is roughly synchronized.
-	return n.broadcastCurrentState()
+	// Originally, after calling n.ml.Join, we would broadcast a new state
+	// message (with a new lamport time) with the node's local state. The intent
+	// was that there might be stale messages about a previous instance of our
+	// node with different lamport times, and we'd want to correct them by
+	// broadcasting a message.
+	//
+	// This only appeared necessary due to a bug in how we handled stale
+	// messages about our own node, and it resulted in a lot of extra state
+	// messages being sent due to nodes calling Start to fix split-brain issues.
+	//
+	// Now, we'll correct invalid state messages about our node upon receipt; see
+	// [Node.handleStateMessage] and [nodeDelegate.MergeRemoteState].
+	return nil
 }

 func (n *Node) run(ctx context.Context) {
@@ -293,31 +303,6 @@ func (n *Node) run(ctx context.Context) {
 	}
 }

-// broadcastCurrentState queues a message to send the current state of the node
-// to the cluster. This should be done after joining a new set of nodes once
-// the lamport clock is synchronized.
-func (n *Node) broadcastCurrentState() error {
-	n.stateMut.RLock()
-	defer n.stateMut.RUnlock()
-
-	stateMsg := messages.State{
-		NodeName: n.cfg.Name,
-		NewState: n.localState,
-		Time:     n.clock.Tick(),
-	}
-
-	// Treat the stateMsg as if it was received externally to track our own state
-	// along with other nodes.
-	n.handleStateMessage(stateMsg)
-
-	bcast, err := messages.Broadcast(&stateMsg, nil)
-	if err != nil {
-		return err
-	}
-	n.broadcasts.QueueBroadcast(bcast)
-	return nil
-}
-
 // Stop stops the Node, removing it from the cluster. Callers should first
 // first transition to StateTerminating to gracefully leave the cluster.
 // Observers will no longer be notified about cluster changes after Stop
@@ -441,10 +426,13 @@ func (n *Node) changeState(to peer.State, onDone func()) error {
 }

 // handleStateMessage handles a state message from a peer. Returns true if the
-// message hasn't been seen before.
+// message hasn't been seen before. The final return parameter will be the
+// message to broadcast: if msg is a stale message from a previous instance of
+// the local node, final will be an updated message reflecting the node's local
+// state.
 //
 // handleStateMessage must be called with n.stateMut held for reading.
-func (n *Node) handleStateMessage(msg messages.State) (newMessage bool) {
+func (n *Node) handleStateMessage(msg messages.State) (final messages.State, newMessage bool) {
 	n.clock.Observe(msg.Time)

 	n.peerMut.Lock()
@@ -453,8 +441,8 @@ func (n *Node) handleStateMessage(msg messages.State) (newMessage bool) {
 	curr, exist := n.peerStates[msg.NodeName]
 	if exist && msg.Time <= curr.Time {
 		// Ignore a state message if we have the same or a newer one.
-		return false
-	} else if exist && msg.NodeName == n.cfg.Name {
+		return curr, false
+	} else if msg.NodeName == n.cfg.Name {
 		level.Debug(n.log).Log("msg", "got stale message about self", "msg", msg)

 		// A peer has a newer message about ourselves, likely from a previous
@@ -477,7 +465,7 @@
 		n.handlePeersChanged()
 	}

-	return true
+	return msg, true
 }

 // Peers returns all Peers currently known by n. The Peers list will include
@@ -605,7 +593,7 @@ func (nd *nodeDelegate) NotifyMsg(raw []byte) {
 		nd.stateMut.RLock()
 		defer nd.stateMut.RUnlock()

-		if nd.handleStateMessage(s) {
+		if s, broadcast := nd.handleStateMessage(s); broadcast {
 			// We should continue gossiping the message to other peers if we haven't
 			// seen it before.
 			//
@@ -614,6 +602,7 @@ func (nd *nodeDelegate) NotifyMsg(raw []byte) {
 			// messages would still converge eventually using push/pulls.
 			bcast, _ := messages.Broadcast(&s, nil)
 			nd.broadcasts.QueueBroadcast(bcast)
+			nd.m.gossipBroadcastsTotal.WithLabelValues(eventStateChange).Inc()
 		}

 	default:
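
To make the receipt-time correction concrete, here is an illustrative sketch (not the repository code) of what handleStateMessage now effectively does when it sees a stale message about the local node; correctedSelfState is a hypothetical helper built only from fields visible in the deleted broadcastCurrentState above (n.cfg.Name, n.localState, n.clock).

// correctedSelfState builds the "final" message returned by handleStateMessage
// when a peer gossips newer-but-stale state about this node (for example, from
// a previous instance): it reflects the node's actual local state at a fresh
// lamport time, so the corrected message supersedes the stale one when
// NotifyMsg broadcasts it and increments cluster_node_gossip_broadcasts_total.
func (n *Node) correctedSelfState() messages.State {
	return messages.State{
		NodeName: n.cfg.Name,     // always describes the local node
		NewState: n.localState,   // authoritative local view of our own state
		Time:     n.clock.Tick(), // newer lamport time wins over the stale message
	}
}

Because the corrected message carries a strictly newer lamport time, peers still holding the stale entry replace it on receipt, so no periodic rebroadcast from Start is needed.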
