
Commit 22a40c3

[clusteragent/clusterchecks/dispatcher] Pick random node for new checks when advanced dispatching is enabled (DataDog#18914)
1 parent 84d6964 commit 22a40c3

File tree

4 files changed: +64, -31 lines

pkg/clusteragent/clusterchecks/dispatcher_main.go

Lines changed: 1 addition & 1 deletion
```diff
@@ -147,7 +147,7 @@ func (d *dispatcher) reschedule(configs []integration.Config) {
 
 // add stores and delegates a given configuration
 func (d *dispatcher) add(config integration.Config) {
-	target := d.getLeastBusyNode()
+	target := d.getNodeToScheduleCheck()
 	if target == "" {
 		// If no node is found, store it in the danglingConfigs map for retrying later.
 		log.Warnf("No available node to dispatch %s:%s on, will retry later", config.Name, config.Digest())
```

pkg/clusteragent/clusterchecks/dispatcher_nodes.go

Lines changed: 45 additions & 25 deletions
```diff
@@ -9,6 +9,7 @@ package clusterchecks
 
 import (
 	"fmt"
+	"math/rand"
 	"time"
 
 	"github.com/DataDog/datadog-agent/pkg/autodiscovery/integration"
@@ -75,38 +76,57 @@ func (d *dispatcher) processNodeStatus(nodeName, clientIP string, status types.N
 	return false
 }
 
-// getLeastBusyNode returns the name of the node that is assigned
-// the lowest number of checks. In case of equality, one is chosen
-// randomly, based on map iterations being randomized.
-func (d *dispatcher) getLeastBusyNode() string {
-	var leastBusyNode string
-	minCheckCount := int(-1)
-	minBusyness := int(-1)
+// getNodeToScheduleCheck returns the node where a new check should be scheduled.
+//
+// Advanced dispatching relies on the check stats fetched from the cluster check
+// runners API to distribute the checks. The stats are only updated when the
+// checks are rebalanced; they are not updated every time a check is scheduled.
+// That's why it's not a good idea to pick the least busy node. Rebalancing
+// happens every few minutes, so all the checks added during that window would
+// get scheduled on the same node. It's a better solution to pick a random node
+// and rely on rebalancing to distribute the load when needed.
+//
+// On the other hand, when advanced dispatching is not used, we can pick the
+// node with the fewest checks, because the check count is kept up to date.
+func (d *dispatcher) getNodeToScheduleCheck() string {
+	if d.advancedDispatching {
+		return d.getRandomNode()
+	}
+
+	return d.getNodeWithLessChecks()
+}
+
+func (d *dispatcher) getRandomNode() string {
+	d.store.RLock()
+	defer d.store.RUnlock()
+
+	var nodes []string
+	for name := range d.store.nodes {
+		nodes = append(nodes, name)
+	}
+
+	if len(nodes) == 0 {
+		return ""
+	}
+
+	return nodes[rand.Intn(len(nodes))]
+}
 
+func (d *dispatcher) getNodeWithLessChecks() string {
 	d.store.RLock()
 	defer d.store.RUnlock()
 
+	var selectedNode string
+	minNumChecks := 0
+
 	for name, store := range d.store.nodes {
-		if name == "" {
-			continue
-		}
-		if d.advancedDispatching && store.busyness > defaultBusynessValue {
-			// dispatching based on clc runners stats
-			// only when advancedDispatching is true and
-			// started collecting busyness values
-			if minBusyness == -1 || store.busyness < minBusyness {
-				leastBusyNode = name
-				minBusyness = store.busyness
-			}
-		} else {
-			// count-based round robin dispatching
-			if minCheckCount == -1 || len(store.digestToConfig) < minCheckCount {
-				leastBusyNode = name
-				minCheckCount = len(store.digestToConfig)
-			}
+		if selectedNode == "" || len(store.digestToConfig) < minNumChecks {
+			selectedNode = name
+			minNumChecks = len(store.digestToConfig)
 		}
 	}
-	return leastBusyNode
+
+	return selectedNode
 }
 
 // expireNodes iterates over nodes and removes the ones that have not
```
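The reasoning in the new doc comment is easy to demonstrate with a self-contained simulation. The sketch below is illustrative only and is not part of the commit: the node names, the frozen `stale` counts, and both pick functions are hypothetical stand-ins for the dispatcher's node store and the runner stats, which are only refreshed at rebalance time.

```go
package main

import (
	"fmt"
	"math/rand"
)

// pickLeastBusy returns the node with the smallest count. Between rebalances
// the counts are stale, so it keeps returning the same node.
func pickLeastBusy(staleCounts map[string]int) string {
	selected, minCount := "", -1
	for name, count := range staleCounts {
		if minCount == -1 || count < minCount {
			selected, minCount = name, count
		}
	}
	return selected
}

// pickRandom returns a uniformly random node, as the new dispatcher does.
func pickRandom(nodes []string) string {
	return nodes[rand.Intn(len(nodes))]
}

func main() {
	nodes := []string{"node1", "node2", "node3"}
	// Stats frozen since the last rebalance: node1 looks least busy.
	stale := map[string]int{"node1": 3, "node2": 5, "node3": 4}

	leastBusy := map[string]int{}
	random := map[string]int{}
	for i := 0; i < 90; i++ {
		leastBusy[pickLeastBusy(stale)]++ // always node1
		random[pickRandom(nodes)]++       // roughly even split
	}
	fmt.Println("least-busy on stale stats:", leastBusy) // map[node1:90]
	fmt.Println("random:", random)                       // e.g. map[node1:32 node2:27 node3:31]
}
```

Every least-busy pick lands on node1 until the stats refresh, which is exactly the pile-up this commit fixes; random picks spread new checks immediately, and the periodic rebalance corrects any residual skew.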

pkg/clusteragent/clusterchecks/dispatcher_test.go

Lines changed: 5 additions & 5 deletions
```diff
@@ -231,26 +231,26 @@ func TestProcessNodeStatus(t *testing.T) {
 	requireNotLocked(t, dispatcher.store)
 }
 
-func TestGetLeastBusyNode(t *testing.T) {
+func TestGetNodeWithLessChecks(t *testing.T) {
 	dispatcher := newDispatcher()
 
 	// No node registered -> empty string
-	assert.Equal(t, "", dispatcher.getLeastBusyNode())
+	assert.Equal(t, "", dispatcher.getNodeWithLessChecks())
 
 	// 1 config on node1, 2 on node2
 	dispatcher.addConfig(generateIntegration("A"), "node1")
 	dispatcher.addConfig(generateIntegration("B"), "node2")
 	dispatcher.addConfig(generateIntegration("C"), "node2")
-	assert.Equal(t, "node1", dispatcher.getLeastBusyNode())
+	assert.Equal(t, "node1", dispatcher.getNodeWithLessChecks())
 
 	// 3 configs on node1, 2 on node2
 	dispatcher.addConfig(generateIntegration("D"), "node1")
 	dispatcher.addConfig(generateIntegration("E"), "node1")
-	assert.Equal(t, "node2", dispatcher.getLeastBusyNode())
+	assert.Equal(t, "node2", dispatcher.getNodeWithLessChecks())
 
 	// Add an empty node3
 	dispatcher.processNodeStatus("node3", "10.0.0.3", types.NodeStatus{})
-	assert.Equal(t, "node3", dispatcher.getLeastBusyNode())
+	assert.Equal(t, "node3", dispatcher.getNodeWithLessChecks())
 
 	requireNotLocked(t, dispatcher.store)
 }
```
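The commit renames the existing test to cover getNodeWithLessChecks but adds no test for the new random path. A minimal companion test could look like the sketch below, reusing the helpers already visible in this file (newDispatcher, processNodeStatus, requireNotLocked); the test name and node values are hypothetical, not part of the commit.

```go
func TestGetRandomNode(t *testing.T) {
	dispatcher := newDispatcher()

	// No node registered -> empty string
	assert.Equal(t, "", dispatcher.getRandomNode())

	// With registered nodes, the result is always one of them.
	dispatcher.processNodeStatus("node1", "10.0.0.1", types.NodeStatus{})
	dispatcher.processNodeStatus("node2", "10.0.0.2", types.NodeStatus{})
	for i := 0; i < 10; i++ {
		assert.Contains(t, []string{"node1", "node2"}, dispatcher.getRandomNode())
	}

	requireNotLocked(t, dispatcher.store)
}
```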
Lines changed: 13 additions & 0 deletions
```diff
@@ -0,0 +1,13 @@
+# Each section from every release note is combined when the
+# CHANGELOG.rst is rendered. So the text needs to be worded so that
+# it does not depend on any information only available in another
+# section. This may mean repeating some details, but each section
+# must be readable independently of the other.
+#
+# Each section note must be formatted as reStructuredText.
+---
+fixes:
+- |
+  Fixed a bug in the advanced dispatching of cluster checks. All the checks
+  scheduled since the last rebalance were being scheduled on the same node.
+  Now they should be distributed among the available nodes.
```
