Skip to content

Commit a567a90

Browse files
committed
Add failureThreshold to elastic-agent self-monitoring config (#5999)
* Add failureThreshold to elastic-agent self-monitoring config (cherry picked from commit 2a46509)
1 parent e11d23d commit a567a90

File tree

4 files changed

+272
-13
lines changed

4 files changed

+272
-13
lines changed

internal/pkg/agent/application/coordinator/diagnostics_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ agent:
100100
metrics_period: ""
101101
namespace: ""
102102
pprof: null
103+
failure_threshold: null
103104
traces: true
104105
apm:
105106
hosts:

internal/pkg/agent/application/monitoring/v1_monitor.go

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"os"
1212
"path/filepath"
1313
"runtime"
14+
"strconv"
1415
"strings"
1516
"time"
1617
"unicode"
@@ -48,6 +49,7 @@ const (
4849
monitoringKey = "monitoring"
4950
useOutputKey = "use_output"
5051
monitoringMetricsPeriodKey = "metrics_period"
52+
failureThresholdKey = "failure_threshold"
5153
monitoringOutput = "monitoring"
5254
defaultMonitoringNamespace = "default"
5355
agentName = "elastic-agent"
@@ -60,6 +62,10 @@ const (
6062
// metricset execution period used for the monitoring metrics inputs
6163
// we set this to 60s to reduce the load/data volume on the monitoring cluster
6264
defaultMetricsCollectionInterval = 60 * time.Second
65+
66+
// metricset stream failure threshold before the stream is marked as DEGRADED
67+
// to avoid marking the agent degraded for transient errors, we set the default threshold to 2
68+
defaultMetricsStreamFailureThreshold = uint(2)
6369
)
6470

6571
var (
@@ -131,6 +137,7 @@ func (b *BeatsMonitor) MonitoringConfig(
131137

132138
monitoringOutputName := defaultOutputName
133139
metricsCollectionIntervalString := b.config.C.MetricsPeriod
140+
failureThreshold := b.config.C.FailureThreshold
134141
if agentCfg, found := policy[agentKey]; found {
135142
// The agent section is required for feature flags
136143
cfg[agentKey] = agentCfg
@@ -151,6 +158,25 @@ func (b *BeatsMonitor) MonitoringConfig(
151158
metricsCollectionIntervalString = metricsPeriodStr
152159
}
153160
}
161+
162+
if policyFailureThresholdRaw, found := monitoringMap[failureThresholdKey]; found {
163+
switch policyValue := policyFailureThresholdRaw.(type) {
164+
case uint:
165+
failureThreshold = &policyValue
166+
case int:
167+
unsignedValue := uint(policyValue)
168+
failureThreshold = &unsignedValue
169+
case string:
170+
parsedPolicyValue, err := strconv.Atoi(policyValue)
171+
if err != nil {
172+
return nil, fmt.Errorf("failed to convert policy failure threshold string to int: %w", err)
173+
}
174+
uintPolicyValue := uint(parsedPolicyValue)
175+
failureThreshold = &uintPolicyValue
176+
default:
177+
return nil, fmt.Errorf("unsupported type for policy failure threshold: %T", policyFailureThresholdRaw)
178+
}
179+
}
154180
}
155181
}
156182
}
@@ -173,7 +199,7 @@ func (b *BeatsMonitor) MonitoringConfig(
173199
}
174200

175201
if b.config.C.MonitorMetrics {
176-
if err := b.injectMetricsInput(cfg, componentIDToBinary, components, componentIDPidMap, metricsCollectionIntervalString); err != nil {
202+
if err := b.injectMetricsInput(cfg, componentIDToBinary, components, componentIDPidMap, metricsCollectionIntervalString, failureThreshold); err != nil {
177203
return nil, errors.New(err, "failed to inject monitoring output")
178204
}
179205
}
@@ -556,12 +582,20 @@ func (b *BeatsMonitor) injectMetricsInput(
556582
componentList []component.Component,
557583
existingStateServicePids map[string]uint64,
558584
metricsCollectionIntervalString string,
585+
failureThreshold *uint,
559586
) error {
560587
if metricsCollectionIntervalString == "" {
561588
metricsCollectionIntervalString = defaultMetricsCollectionInterval.String()
562589
}
590+
591+
if failureThreshold == nil {
592+
defaultValue := defaultMetricsStreamFailureThreshold
593+
failureThreshold = &defaultValue
594+
}
563595
monitoringNamespace := b.monitoringNamespace()
564596
fixedAgentName := strings.ReplaceAll(agentName, "-", "_")
597+
// beatStreams and streams MUST be []interface{} even if in reality they are []map[string]interface{}:
598+
// if those are declared as slices of maps the message "proto: invalid type: []map[string]interface{}" will pop up
565599
beatsStreams := make([]interface{}, 0, len(componentIDToBinary))
566600
streams := []interface{}{
567601
map[string]interface{}{
@@ -866,6 +900,25 @@ func (b *BeatsMonitor) injectMetricsInput(
866900

867901
}
868902

903+
if failureThreshold != nil {
904+
// add failure threshold to all streams and beatStreams
905+
for _, s := range streams {
906+
if streamMap, ok := s.(map[string]interface{}); ok {
907+
streamMap[failureThresholdKey] = *failureThreshold
908+
} else {
909+
return fmt.Errorf("unable to set %s: %d in monitoring stream %q: unexpected type %T", failureThresholdKey, *failureThreshold, s, s)
910+
}
911+
912+
}
913+
for _, s := range beatsStreams {
914+
if streamMap, ok := s.(map[string]interface{}); ok {
915+
streamMap[failureThresholdKey] = *failureThreshold
916+
} else {
917+
return fmt.Errorf("unable to set %s: %d in monitoring stream %q: unexpected type %T", failureThresholdKey, *failureThreshold, s, s)
918+
}
919+
}
920+
}
921+
869922
inputs := []interface{}{
870923
map[string]interface{}{
871924
idKey: fmt.Sprintf("%s-beats", monitoringMetricsUnitID),

internal/pkg/agent/application/monitoring/v1_monitor_test.go

Lines changed: 205 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ func TestMonitoringConfigMetricsInterval(t *testing.T) {
235235
// check the streams created for the input, should be a list of objects
236236
if assert.Contains(t, input, "streams", "input %q does not contain any stream", inputID) &&
237237
assert.IsTypef(t, []any{}, input["streams"], "streams for input %q are not a list of objects", inputID) {
238-
// loop over streams and cast to map[string]any to access keys
238+
// loop over streams and access keys
239239
for _, rawStream := range input["streams"].([]any) {
240240
if assert.IsTypef(t, map[string]any{}, rawStream, "stream %v for input %q is not a map", rawStream, inputID) {
241241
stream := rawStream.(map[string]any)
@@ -258,6 +258,210 @@ func TestMonitoringConfigMetricsInterval(t *testing.T) {
258258
}
259259
}
260260

261+
func TestMonitoringConfigMetricsFailureThreshold(t *testing.T) {
262+
263+
agentInfo, err := info.NewAgentInfo(context.Background(), false)
264+
require.NoError(t, err, "Error creating agent info")
265+
266+
sampleFiveErrorsStreamThreshold := uint(5)
267+
sampleTenErrorsStreamThreshold := uint(10)
268+
269+
tcs := []struct {
270+
name string
271+
monitoringCfg *monitoringConfig
272+
policy map[string]any
273+
expectedThreshold uint
274+
}{
275+
{
276+
name: "default failure threshold",
277+
monitoringCfg: &monitoringConfig{
278+
C: &monitoringcfg.MonitoringConfig{
279+
Enabled: true,
280+
MonitorMetrics: true,
281+
HTTP: &monitoringcfg.MonitoringHTTPConfig{
282+
Enabled: false,
283+
},
284+
},
285+
},
286+
policy: map[string]any{
287+
"agent": map[string]any{
288+
"monitoring": map[string]any{
289+
"metrics": true,
290+
"http": map[string]any{
291+
"enabled": false,
292+
},
293+
},
294+
},
295+
"outputs": map[string]any{
296+
"default": map[string]any{},
297+
},
298+
},
299+
expectedThreshold: defaultMetricsStreamFailureThreshold,
300+
},
301+
{
302+
name: "agent config failure threshold",
303+
monitoringCfg: &monitoringConfig{
304+
C: &monitoringcfg.MonitoringConfig{
305+
Enabled: true,
306+
MonitorMetrics: true,
307+
HTTP: &monitoringcfg.MonitoringHTTPConfig{
308+
Enabled: false,
309+
},
310+
FailureThreshold: &sampleFiveErrorsStreamThreshold,
311+
},
312+
},
313+
policy: map[string]any{
314+
"agent": map[string]any{
315+
"monitoring": map[string]any{
316+
"metrics": true,
317+
"http": map[string]any{
318+
"enabled": false,
319+
},
320+
},
321+
},
322+
"outputs": map[string]any{
323+
"default": map[string]any{},
324+
},
325+
},
326+
expectedThreshold: sampleFiveErrorsStreamThreshold,
327+
},
328+
{
329+
name: "policy failure threshold uint",
330+
monitoringCfg: &monitoringConfig{
331+
C: &monitoringcfg.MonitoringConfig{
332+
Enabled: true,
333+
MonitorMetrics: true,
334+
HTTP: &monitoringcfg.MonitoringHTTPConfig{
335+
Enabled: false,
336+
},
337+
FailureThreshold: &sampleFiveErrorsStreamThreshold,
338+
},
339+
},
340+
policy: map[string]any{
341+
"agent": map[string]any{
342+
"monitoring": map[string]any{
343+
"metrics": true,
344+
"http": map[string]any{
345+
"enabled": false,
346+
},
347+
failureThresholdKey: sampleTenErrorsStreamThreshold,
348+
},
349+
},
350+
"outputs": map[string]any{
351+
"default": map[string]any{},
352+
},
353+
},
354+
expectedThreshold: sampleTenErrorsStreamThreshold,
355+
},
356+
{
357+
name: "policy failure threshold int",
358+
monitoringCfg: &monitoringConfig{
359+
C: &monitoringcfg.MonitoringConfig{
360+
Enabled: true,
361+
MonitorMetrics: true,
362+
HTTP: &monitoringcfg.MonitoringHTTPConfig{
363+
Enabled: false,
364+
},
365+
FailureThreshold: &sampleFiveErrorsStreamThreshold,
366+
},
367+
},
368+
policy: map[string]any{
369+
"agent": map[string]any{
370+
"monitoring": map[string]any{
371+
"metrics": true,
372+
"http": map[string]any{
373+
"enabled": false,
374+
},
375+
failureThresholdKey: 10,
376+
},
377+
},
378+
"outputs": map[string]any{
379+
"default": map[string]any{},
380+
},
381+
},
382+
expectedThreshold: sampleTenErrorsStreamThreshold,
383+
},
384+
{
385+
name: "policy failure threshold string",
386+
monitoringCfg: &monitoringConfig{
387+
C: &monitoringcfg.MonitoringConfig{
388+
Enabled: true,
389+
MonitorMetrics: true,
390+
HTTP: &monitoringcfg.MonitoringHTTPConfig{
391+
Enabled: false,
392+
},
393+
FailureThreshold: &sampleFiveErrorsStreamThreshold,
394+
},
395+
},
396+
policy: map[string]any{
397+
"agent": map[string]any{
398+
"monitoring": map[string]any{
399+
"metrics": true,
400+
"http": map[string]any{
401+
"enabled": false,
402+
},
403+
failureThresholdKey: "10",
404+
},
405+
},
406+
"outputs": map[string]any{
407+
"default": map[string]any{},
408+
},
409+
},
410+
expectedThreshold: sampleTenErrorsStreamThreshold,
411+
},
412+
}
413+
414+
for _, tc := range tcs {
415+
416+
t.Run(tc.name, func(t *testing.T) {
417+
b := &BeatsMonitor{
418+
enabled: true,
419+
config: tc.monitoringCfg,
420+
operatingSystem: runtime.GOOS,
421+
agentInfo: agentInfo,
422+
}
423+
got, err := b.MonitoringConfig(tc.policy, nil, map[string]string{"foobeat": "filebeat"}, map[string]uint64{}) // put a componentID/binary mapping to have something in the beats monitoring input
424+
assert.NoError(t, err)
425+
426+
rawInputs, ok := got["inputs"]
427+
require.True(t, ok, "monitoring config contains no input")
428+
inputs, ok := rawInputs.([]any)
429+
require.True(t, ok, "monitoring inputs are not a list")
430+
marshaledInputs, err := yaml.Marshal(inputs)
431+
if assert.NoError(t, err, "error marshaling monitoring inputs") {
432+
t.Logf("marshaled monitoring inputs:\n%s\n", marshaledInputs)
433+
}
434+
435+
// loop over the created inputs
436+
for _, i := range inputs {
437+
input, ok := i.(map[string]any)
438+
if assert.Truef(t, ok, "input is not represented as a map: %v", i) {
439+
inputID := input["id"]
440+
t.Logf("input %q", inputID)
441+
// check the streams created for the input, should be a list of objects
442+
if assert.Contains(t, input, "streams", "input %q does not contain any stream", inputID) &&
443+
assert.IsTypef(t, []any{}, input["streams"], "streams for input %q are not a list of objects", inputID) {
444+
445+
// loop over streams and cast to map[string]any to access keys
446+
for _, rawStream := range input["streams"].([]any) {
447+
if assert.IsTypef(t, map[string]any{}, rawStream, "stream %v for input %q is not a map", rawStream, inputID) {
448+
stream := rawStream.(map[string]any)
449+
// check period and assert its value
450+
streamID := stream["id"]
451+
if assert.Containsf(t, stream, failureThresholdKey, "stream %q for input %q does not contain a failureThreshold", streamID, inputID) &&
452+
assert.IsType(t, uint(0), stream[failureThresholdKey], "period for stream %q of input %q is not represented as a string", streamID, inputID) {
453+
actualFailureThreshold := stream[failureThresholdKey].(uint)
454+
assert.Equalf(t, actualFailureThreshold, tc.expectedThreshold, "unexpected failure threshold for stream %q of input %q", streamID, inputID)
455+
}
456+
}
457+
}
458+
}
459+
}
460+
}
461+
})
462+
}
463+
}
464+
261465
func TestMonitoringConfigComponentFields(t *testing.T) {
262466
agentInfo, err := info.NewAgentInfo(context.Background(), false)
263467
require.NoError(t, err, "Error creating agent info")

internal/pkg/core/monitoring/config/config.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,18 @@ const (
2121

2222
// MonitoringConfig describes a configuration of a monitoring
2323
type MonitoringConfig struct {
24-
Enabled bool `yaml:"enabled" config:"enabled"`
25-
MonitorLogs bool `yaml:"logs" config:"logs"`
26-
MonitorMetrics bool `yaml:"metrics" config:"metrics"`
27-
MetricsPeriod string `yaml:"metrics_period" config:"metrics_period"`
28-
LogMetrics bool `yaml:"-" config:"-"`
29-
HTTP *MonitoringHTTPConfig `yaml:"http" config:"http"`
30-
Namespace string `yaml:"namespace" config:"namespace"`
31-
Pprof *PprofConfig `yaml:"pprof" config:"pprof"`
32-
MonitorTraces bool `yaml:"traces" config:"traces"`
33-
APM APMConfig `yaml:"apm,omitempty" config:"apm,omitempty" json:"apm,omitempty"`
34-
Diagnostics Diagnostics `yaml:"diagnostics,omitempty" json:"diagnostics,omitempty"`
24+
Enabled bool `yaml:"enabled" config:"enabled"`
25+
MonitorLogs bool `yaml:"logs" config:"logs"`
26+
MonitorMetrics bool `yaml:"metrics" config:"metrics"`
27+
MetricsPeriod string `yaml:"metrics_period" config:"metrics_period"`
28+
FailureThreshold *uint `yaml:"failure_threshold" config:"failure_threshold"`
29+
LogMetrics bool `yaml:"-" config:"-"`
30+
HTTP *MonitoringHTTPConfig `yaml:"http" config:"http"`
31+
Namespace string `yaml:"namespace" config:"namespace"`
32+
Pprof *PprofConfig `yaml:"pprof" config:"pprof"`
33+
MonitorTraces bool `yaml:"traces" config:"traces"`
34+
APM APMConfig `yaml:"apm,omitempty" config:"apm,omitempty" json:"apm,omitempty"`
35+
Diagnostics Diagnostics `yaml:"diagnostics,omitempty" json:"diagnostics,omitempty"`
3536
}
3637

3738
// MonitoringHTTPConfig is a config defining HTTP endpoint published by agent

0 commit comments

Comments
 (0)