From 7c58844b72068f321b789ff8d602b92a676d24a4 Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Tue, 12 Nov 2024 14:36:50 +0100 Subject: [PATCH 1/4] Add failureThreshold to elastic-agent self-monitoring config --- .../application/monitoring/v1_monitor.go | 51 +++- .../application/monitoring/v1_monitor_test.go | 231 ++++++++++++++++-- internal/pkg/core/monitoring/config/config.go | 23 +- 3 files changed, 273 insertions(+), 32 deletions(-) diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index 7512b989a98..94aea85d102 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -11,6 +11,7 @@ import ( "os" "path/filepath" "runtime" + "strconv" "strings" "time" "unicode" @@ -48,6 +49,7 @@ const ( monitoringKey = "monitoring" useOutputKey = "use_output" monitoringMetricsPeriodKey = "metrics_period" + failureThresholdKey = "failure_threshold" monitoringOutput = "monitoring" defaultMonitoringNamespace = "default" agentName = "elastic-agent" @@ -60,6 +62,10 @@ const ( // metricset execution period used for the monitoring metrics inputs // we set this to 60s to reduce the load/data volume on the monitoring cluster defaultMetricsCollectionInterval = 60 * time.Second + + // metricset stream failure threshold before the stream is marked as DEGRADED + // to avoid marking the agent degraded for transient errors, we set the default threshold to 2 + defaultMetricsStreamFailureThreshold = uint(2) ) var ( @@ -131,6 +137,7 @@ func (b *BeatsMonitor) MonitoringConfig( monitoringOutputName := defaultOutputName metricsCollectionIntervalString := b.config.C.MetricsPeriod + failureThreshold := b.config.C.FailureThreshold if agentCfg, found := policy[agentKey]; found { // The agent section is required for feature flags cfg[agentKey] = agentCfg @@ -151,6 +158,26 @@ func (b *BeatsMonitor) MonitoringConfig( metricsCollectionIntervalString = metricsPeriodStr } } + + if policyFailureThresholdRaw, found := monitoringMap[failureThresholdKey]; found { + switch policyFailureThresholdRaw.(type) { + case uint: + policyValue := policyFailureThresholdRaw.(uint) + failureThreshold = &policyValue + case int: + policyValue := uint(policyFailureThresholdRaw.(int)) + failureThreshold = &policyValue + case string: + policyValue, err := strconv.Atoi(policyFailureThresholdRaw.(string)) + if err != nil { + return nil, fmt.Errorf("failed to convert policy failure threshold string to int: %w", err) + } + uintPolicyValue := uint(policyValue) + failureThreshold = &uintPolicyValue + default: + return nil, fmt.Errorf("unsupported type for policy failure threshold: %T", policyFailureThresholdRaw) + } + } } } } @@ -173,7 +200,7 @@ func (b *BeatsMonitor) MonitoringConfig( } if b.config.C.MonitorMetrics { - if err := b.injectMetricsInput(cfg, componentIDToBinary, components, componentIDPidMap, metricsCollectionIntervalString); err != nil { + if err := b.injectMetricsInput(cfg, componentIDToBinary, components, componentIDPidMap, metricsCollectionIntervalString, failureThreshold); err != nil { return nil, errors.New(err, "failed to inject monitoring output") } } @@ -556,15 +583,21 @@ func (b *BeatsMonitor) injectMetricsInput( componentList []component.Component, existingStateServicePids map[string]uint64, metricsCollectionIntervalString string, + failureThreshold *uint, ) error { if metricsCollectionIntervalString == "" { metricsCollectionIntervalString = defaultMetricsCollectionInterval.String() } + + if failureThreshold == nil { + defaultValue := defaultMetricsStreamFailureThreshold + failureThreshold = &defaultValue + } monitoringNamespace := b.monitoringNamespace() fixedAgentName := strings.ReplaceAll(agentName, "-", "_") - beatsStreams := make([]interface{}, 0, len(componentIDToBinary)) - streams := []interface{}{ - map[string]interface{}{ + beatsStreams := make([]map[string]interface{}, 0, len(componentIDToBinary)) + streams := []map[string]interface{}{ + { idKey: fmt.Sprintf("%s-agent", monitoringMetricsUnitID), "data_stream": map[string]interface{}{ "type": "metrics", @@ -866,6 +899,16 @@ func (b *BeatsMonitor) injectMetricsInput( } + if failureThreshold != nil { + // add failure threshold to all streams and beatStreams + for _, s := range streams { + s[failureThresholdKey] = *failureThreshold + } + for _, s := range beatsStreams { + s[failureThresholdKey] = *failureThreshold + } + } + inputs := []interface{}{ map[string]interface{}{ idKey: fmt.Sprintf("%s-beats", monitoringMetricsUnitID), diff --git a/internal/pkg/agent/application/monitoring/v1_monitor_test.go b/internal/pkg/agent/application/monitoring/v1_monitor_test.go index 813d74bbd92..fffc76e951e 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor_test.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor_test.go @@ -234,20 +234,17 @@ func TestMonitoringConfigMetricsInterval(t *testing.T) { t.Logf("input %q", inputID) // check the streams created for the input, should be a list of objects if assert.Contains(t, input, "streams", "input %q does not contain any stream", inputID) && - assert.IsTypef(t, []any{}, input["streams"], "streams for input %q are not a list of objects", inputID) { - // loop over streams and cast to map[string]any to access keys - for _, rawStream := range input["streams"].([]any) { - if assert.IsTypef(t, map[string]any{}, rawStream, "stream %v for input %q is not a map", rawStream, inputID) { - stream := rawStream.(map[string]any) - // check period and assert its value - streamID := stream["id"] - if assert.Containsf(t, stream, "period", "stream %q for input %q does not contain a period", streamID, inputID) && - assert.IsType(t, "", stream["period"], "period for stream %q of input %q is not represented as a string", streamID, inputID) { - periodString := stream["period"].(string) - duration, err := time.ParseDuration(periodString) - if assert.NoErrorf(t, err, "Unparseable period duration %s for stream %q of input %q", periodString, streamID, inputID) { - assert.Equalf(t, duration, tc.expectedInterval, "unexpected duration for stream %q of input %q", streamID, inputID) - } + assert.IsTypef(t, []map[string]any{}, input["streams"], "streams for input %q are not a list of maps", inputID) { + // loop over streams and access keys + for _, stream := range input["streams"].([]map[string]any) { + // check period and assert its value + streamID := stream["id"] + if assert.Containsf(t, stream, "period", "stream %q for input %q does not contain a period", streamID, inputID) && + assert.IsType(t, "", stream["period"], "period for stream %q of input %q is not represented as a string", streamID, inputID) { + periodString := stream["period"].(string) + duration, err := time.ParseDuration(periodString) + if assert.NoErrorf(t, err, "Unparseable period duration %s for stream %q of input %q", periodString, streamID, inputID) { + assert.Equalf(t, duration, tc.expectedInterval, "unexpected duration for stream %q of input %q", streamID, inputID) } } } @@ -258,6 +255,206 @@ func TestMonitoringConfigMetricsInterval(t *testing.T) { } } +func TestMonitoringConfigMetricsFailureThreshold(t *testing.T) { + + agentInfo, err := info.NewAgentInfo(context.Background(), false) + require.NoError(t, err, "Error creating agent info") + + sampleFiveErrorsStreamThreshold := uint(5) + sampleTenErrorsStreamThreshold := uint(10) + + tcs := []struct { + name string + monitoringCfg *monitoringConfig + policy map[string]any + expectedThreshold uint + }{ + { + name: "default failure threshold", + monitoringCfg: &monitoringConfig{ + C: &monitoringcfg.MonitoringConfig{ + Enabled: true, + MonitorMetrics: true, + HTTP: &monitoringcfg.MonitoringHTTPConfig{ + Enabled: false, + }, + }, + }, + policy: map[string]any{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "metrics": true, + "http": map[string]any{ + "enabled": false, + }, + }, + }, + "outputs": map[string]any{ + "default": map[string]any{}, + }, + }, + expectedThreshold: defaultMetricsStreamFailureThreshold, + }, + { + name: "agent config failure threshold", + monitoringCfg: &monitoringConfig{ + C: &monitoringcfg.MonitoringConfig{ + Enabled: true, + MonitorMetrics: true, + HTTP: &monitoringcfg.MonitoringHTTPConfig{ + Enabled: false, + }, + FailureThreshold: &sampleFiveErrorsStreamThreshold, + }, + }, + policy: map[string]any{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "metrics": true, + "http": map[string]any{ + "enabled": false, + }, + }, + }, + "outputs": map[string]any{ + "default": map[string]any{}, + }, + }, + expectedThreshold: sampleFiveErrorsStreamThreshold, + }, + { + name: "policy failure threshold uint", + monitoringCfg: &monitoringConfig{ + C: &monitoringcfg.MonitoringConfig{ + Enabled: true, + MonitorMetrics: true, + HTTP: &monitoringcfg.MonitoringHTTPConfig{ + Enabled: false, + }, + FailureThreshold: &sampleFiveErrorsStreamThreshold, + }, + }, + policy: map[string]any{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "metrics": true, + "http": map[string]any{ + "enabled": false, + }, + failureThresholdKey: sampleTenErrorsStreamThreshold, + }, + }, + "outputs": map[string]any{ + "default": map[string]any{}, + }, + }, + expectedThreshold: sampleTenErrorsStreamThreshold, + }, + { + name: "policy failure threshold int", + monitoringCfg: &monitoringConfig{ + C: &monitoringcfg.MonitoringConfig{ + Enabled: true, + MonitorMetrics: true, + HTTP: &monitoringcfg.MonitoringHTTPConfig{ + Enabled: false, + }, + FailureThreshold: &sampleFiveErrorsStreamThreshold, + }, + }, + policy: map[string]any{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "metrics": true, + "http": map[string]any{ + "enabled": false, + }, + failureThresholdKey: 10, + }, + }, + "outputs": map[string]any{ + "default": map[string]any{}, + }, + }, + expectedThreshold: sampleTenErrorsStreamThreshold, + }, + { + name: "policy failure threshold string", + monitoringCfg: &monitoringConfig{ + C: &monitoringcfg.MonitoringConfig{ + Enabled: true, + MonitorMetrics: true, + HTTP: &monitoringcfg.MonitoringHTTPConfig{ + Enabled: false, + }, + FailureThreshold: &sampleFiveErrorsStreamThreshold, + }, + }, + policy: map[string]any{ + "agent": map[string]any{ + "monitoring": map[string]any{ + "metrics": true, + "http": map[string]any{ + "enabled": false, + }, + failureThresholdKey: "10", + }, + }, + "outputs": map[string]any{ + "default": map[string]any{}, + }, + }, + expectedThreshold: sampleTenErrorsStreamThreshold, + }, + } + + for _, tc := range tcs { + + t.Run(tc.name, func(t *testing.T) { + b := &BeatsMonitor{ + enabled: true, + config: tc.monitoringCfg, + operatingSystem: runtime.GOOS, + agentInfo: agentInfo, + } + got, err := b.MonitoringConfig(tc.policy, nil, map[string]string{"foobeat": "filebeat"}, map[string]uint64{}) // put a componentID/binary mapping to have something in the beats monitoring input + assert.NoError(t, err) + + rawInputs, ok := got["inputs"] + require.True(t, ok, "monitoring config contains no input") + inputs, ok := rawInputs.([]any) + require.True(t, ok, "monitoring inputs are not a list") + marshaledInputs, err := yaml.Marshal(inputs) + if assert.NoError(t, err, "error marshaling monitoring inputs") { + t.Logf("marshaled monitoring inputs:\n%s\n", marshaledInputs) + } + + // loop over the created inputs + for _, i := range inputs { + input, ok := i.(map[string]any) + if assert.Truef(t, ok, "input is not represented as a map: %v", i) { + inputID := input["id"] + t.Logf("input %q", inputID) + // check the streams created for the input, should be a list of objects + if assert.Contains(t, input, "streams", "input %q does not contain any stream", inputID) && + assert.IsTypef(t, []map[string]any{}, input["streams"], "streams for input %q are not a list of objects", inputID) { + // loop over streams and cast to map[string]any to access keys + for _, stream := range input["streams"].([]map[string]any) { + // check period and assert its value + streamID := stream["id"] + if assert.Containsf(t, stream, failureThresholdKey, "stream %q for input %q does not contain a failureThreshold", streamID, inputID) && + assert.IsType(t, uint(0), stream[failureThresholdKey], "period for stream %q of input %q is not represented as a string", streamID, inputID) { + actualFailureThreshold := stream[failureThresholdKey].(uint) + assert.Equalf(t, actualFailureThreshold, tc.expectedThreshold, "unexpected failure threshold for stream %q of input %q", streamID, inputID) + } + } + } + } + } + }) + } +} + func TestMonitoringConfigComponentFields(t *testing.T) { agentInfo, err := info.NewAgentInfo(context.Background(), false) require.NoError(t, err, "Error creating agent info") @@ -316,9 +513,9 @@ func TestMonitoringConfigComponentFields(t *testing.T) { inputsSlice := monitoringConfig["inputs"].([]any) for _, input := range inputsSlice { inpMap := input.(map[string]any) - for _, rawStream := range inpMap["streams"].([]any) { - streamID := rawStream.(map[string]any)["id"].(string) - processors := rawStream.(map[string]any)["processors"].([]any) + for _, stream := range inpMap["streams"].([]map[string]any) { + streamID := stream["id"].(string) + processors := stream["processors"].([]any) for _, rawProcessor := range processors { processor := rawProcessor.(map[string]any) if _, exists := processor["add_fields"]; !exists { diff --git a/internal/pkg/core/monitoring/config/config.go b/internal/pkg/core/monitoring/config/config.go index 6688301fb1b..263caddc08f 100644 --- a/internal/pkg/core/monitoring/config/config.go +++ b/internal/pkg/core/monitoring/config/config.go @@ -21,17 +21,18 @@ const ( // MonitoringConfig describes a configuration of a monitoring type MonitoringConfig struct { - Enabled bool `yaml:"enabled" config:"enabled"` - MonitorLogs bool `yaml:"logs" config:"logs"` - MonitorMetrics bool `yaml:"metrics" config:"metrics"` - MetricsPeriod string `yaml:"metrics_period" config:"metrics_period"` - LogMetrics bool `yaml:"-" config:"-"` - HTTP *MonitoringHTTPConfig `yaml:"http" config:"http"` - Namespace string `yaml:"namespace" config:"namespace"` - Pprof *PprofConfig `yaml:"pprof" config:"pprof"` - MonitorTraces bool `yaml:"traces" config:"traces"` - APM APMConfig `yaml:"apm,omitempty" config:"apm,omitempty" json:"apm,omitempty"` - Diagnostics Diagnostics `yaml:"diagnostics,omitempty" json:"diagnostics,omitempty"` + Enabled bool `yaml:"enabled" config:"enabled"` + MonitorLogs bool `yaml:"logs" config:"logs"` + MonitorMetrics bool `yaml:"metrics" config:"metrics"` + MetricsPeriod string `yaml:"metrics_period" config:"metrics_period"` + FailureThreshold *uint `yaml:"failure_threshold" config:"failure_threshold"` + LogMetrics bool `yaml:"-" config:"-"` + HTTP *MonitoringHTTPConfig `yaml:"http" config:"http"` + Namespace string `yaml:"namespace" config:"namespace"` + Pprof *PprofConfig `yaml:"pprof" config:"pprof"` + MonitorTraces bool `yaml:"traces" config:"traces"` + APM APMConfig `yaml:"apm,omitempty" config:"apm,omitempty" json:"apm,omitempty"` + Diagnostics Diagnostics `yaml:"diagnostics,omitempty" json:"diagnostics,omitempty"` } // MonitoringHTTPConfig is a config defining HTTP endpoint published by agent From 988d47b188ceb561f6bdbed9939a99d2da879343 Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Tue, 12 Nov 2024 16:22:29 +0100 Subject: [PATCH 2/4] lint --- .../pkg/agent/application/monitoring/v1_monitor.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index 94aea85d102..deb4893c0e3 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -160,19 +160,18 @@ func (b *BeatsMonitor) MonitoringConfig( } if policyFailureThresholdRaw, found := monitoringMap[failureThresholdKey]; found { - switch policyFailureThresholdRaw.(type) { + switch policyValue := policyFailureThresholdRaw.(type) { case uint: - policyValue := policyFailureThresholdRaw.(uint) failureThreshold = &policyValue case int: - policyValue := uint(policyFailureThresholdRaw.(int)) - failureThreshold = &policyValue + unsignedValue := uint(policyValue) + failureThreshold = &unsignedValue case string: - policyValue, err := strconv.Atoi(policyFailureThresholdRaw.(string)) + parsedPolicyValue, err := strconv.Atoi(policyValue) if err != nil { return nil, fmt.Errorf("failed to convert policy failure threshold string to int: %w", err) } - uintPolicyValue := uint(policyValue) + uintPolicyValue := uint(parsedPolicyValue) failureThreshold = &uintPolicyValue default: return nil, fmt.Errorf("unsupported type for policy failure threshold: %T", policyFailureThresholdRaw) From de9470e6827d470e1c1642a2550efc8a008f2d9e Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Tue, 12 Nov 2024 16:50:02 +0100 Subject: [PATCH 3/4] fix unit tests --- internal/pkg/agent/application/coordinator/diagnostics_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/pkg/agent/application/coordinator/diagnostics_test.go b/internal/pkg/agent/application/coordinator/diagnostics_test.go index 9c72400e8e8..27149f790b9 100644 --- a/internal/pkg/agent/application/coordinator/diagnostics_test.go +++ b/internal/pkg/agent/application/coordinator/diagnostics_test.go @@ -100,6 +100,7 @@ agent: metrics_period: "" namespace: "" pprof: null + failure_threshold: null traces: true apm: hosts: From 41471b363c4abe6a83268da6e6687358d2f32b3f Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Wed, 13 Nov 2024 10:52:51 +0100 Subject: [PATCH 4/4] revert streams and beatStreams to []any in injectMetricsInput() --- .../application/monitoring/v1_monitor.go | 21 ++++++-- .../application/monitoring/v1_monitor_test.go | 49 +++++++++++-------- 2 files changed, 44 insertions(+), 26 deletions(-) diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index deb4893c0e3..7efcd155e15 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -594,9 +594,11 @@ func (b *BeatsMonitor) injectMetricsInput( } monitoringNamespace := b.monitoringNamespace() fixedAgentName := strings.ReplaceAll(agentName, "-", "_") - beatsStreams := make([]map[string]interface{}, 0, len(componentIDToBinary)) - streams := []map[string]interface{}{ - { + // beatStreams and streams MUST be []interface{} even if in reality they are []map[string]interface{}: + // if those are declared as slices of maps the message "proto: invalid type: []map[string]interface{}" will pop up + beatsStreams := make([]interface{}, 0, len(componentIDToBinary)) + streams := []interface{}{ + map[string]interface{}{ idKey: fmt.Sprintf("%s-agent", monitoringMetricsUnitID), "data_stream": map[string]interface{}{ "type": "metrics", @@ -901,10 +903,19 @@ func (b *BeatsMonitor) injectMetricsInput( if failureThreshold != nil { // add failure threshold to all streams and beatStreams for _, s := range streams { - s[failureThresholdKey] = *failureThreshold + if streamMap, ok := s.(map[string]interface{}); ok { + streamMap[failureThresholdKey] = *failureThreshold + } else { + return fmt.Errorf("unable to set %s: %d in monitoring stream %q: unexpected type %T", failureThresholdKey, *failureThreshold, s, s) + } + } for _, s := range beatsStreams { - s[failureThresholdKey] = *failureThreshold + if streamMap, ok := s.(map[string]interface{}); ok { + streamMap[failureThresholdKey] = *failureThreshold + } else { + return fmt.Errorf("unable to set %s: %d in monitoring stream %q: unexpected type %T", failureThresholdKey, *failureThreshold, s, s) + } } } diff --git a/internal/pkg/agent/application/monitoring/v1_monitor_test.go b/internal/pkg/agent/application/monitoring/v1_monitor_test.go index fffc76e951e..4d3449d244c 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor_test.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor_test.go @@ -234,17 +234,20 @@ func TestMonitoringConfigMetricsInterval(t *testing.T) { t.Logf("input %q", inputID) // check the streams created for the input, should be a list of objects if assert.Contains(t, input, "streams", "input %q does not contain any stream", inputID) && - assert.IsTypef(t, []map[string]any{}, input["streams"], "streams for input %q are not a list of maps", inputID) { + assert.IsTypef(t, []any{}, input["streams"], "streams for input %q are not a list of objects", inputID) { // loop over streams and access keys - for _, stream := range input["streams"].([]map[string]any) { - // check period and assert its value - streamID := stream["id"] - if assert.Containsf(t, stream, "period", "stream %q for input %q does not contain a period", streamID, inputID) && - assert.IsType(t, "", stream["period"], "period for stream %q of input %q is not represented as a string", streamID, inputID) { - periodString := stream["period"].(string) - duration, err := time.ParseDuration(periodString) - if assert.NoErrorf(t, err, "Unparseable period duration %s for stream %q of input %q", periodString, streamID, inputID) { - assert.Equalf(t, duration, tc.expectedInterval, "unexpected duration for stream %q of input %q", streamID, inputID) + for _, rawStream := range input["streams"].([]any) { + if assert.IsTypef(t, map[string]any{}, rawStream, "stream %v for input %q is not a map", rawStream, inputID) { + stream := rawStream.(map[string]any) + // check period and assert its value + streamID := stream["id"] + if assert.Containsf(t, stream, "period", "stream %q for input %q does not contain a period", streamID, inputID) && + assert.IsType(t, "", stream["period"], "period for stream %q of input %q is not represented as a string", streamID, inputID) { + periodString := stream["period"].(string) + duration, err := time.ParseDuration(periodString) + if assert.NoErrorf(t, err, "Unparseable period duration %s for stream %q of input %q", periodString, streamID, inputID) { + assert.Equalf(t, duration, tc.expectedInterval, "unexpected duration for stream %q of input %q", streamID, inputID) + } } } } @@ -437,15 +440,19 @@ func TestMonitoringConfigMetricsFailureThreshold(t *testing.T) { t.Logf("input %q", inputID) // check the streams created for the input, should be a list of objects if assert.Contains(t, input, "streams", "input %q does not contain any stream", inputID) && - assert.IsTypef(t, []map[string]any{}, input["streams"], "streams for input %q are not a list of objects", inputID) { + assert.IsTypef(t, []any{}, input["streams"], "streams for input %q are not a list of objects", inputID) { + // loop over streams and cast to map[string]any to access keys - for _, stream := range input["streams"].([]map[string]any) { - // check period and assert its value - streamID := stream["id"] - if assert.Containsf(t, stream, failureThresholdKey, "stream %q for input %q does not contain a failureThreshold", streamID, inputID) && - assert.IsType(t, uint(0), stream[failureThresholdKey], "period for stream %q of input %q is not represented as a string", streamID, inputID) { - actualFailureThreshold := stream[failureThresholdKey].(uint) - assert.Equalf(t, actualFailureThreshold, tc.expectedThreshold, "unexpected failure threshold for stream %q of input %q", streamID, inputID) + for _, rawStream := range input["streams"].([]any) { + if assert.IsTypef(t, map[string]any{}, rawStream, "stream %v for input %q is not a map", rawStream, inputID) { + stream := rawStream.(map[string]any) + // check period and assert its value + streamID := stream["id"] + if assert.Containsf(t, stream, failureThresholdKey, "stream %q for input %q does not contain a failureThreshold", streamID, inputID) && + assert.IsType(t, uint(0), stream[failureThresholdKey], "period for stream %q of input %q is not represented as a string", streamID, inputID) { + actualFailureThreshold := stream[failureThresholdKey].(uint) + assert.Equalf(t, actualFailureThreshold, tc.expectedThreshold, "unexpected failure threshold for stream %q of input %q", streamID, inputID) + } } } } @@ -513,9 +520,9 @@ func TestMonitoringConfigComponentFields(t *testing.T) { inputsSlice := monitoringConfig["inputs"].([]any) for _, input := range inputsSlice { inpMap := input.(map[string]any) - for _, stream := range inpMap["streams"].([]map[string]any) { - streamID := stream["id"].(string) - processors := stream["processors"].([]any) + for _, rawStream := range inpMap["streams"].([]any) { + streamID := rawStream.(map[string]any)["id"].(string) + processors := rawStream.(map[string]any)["processors"].([]any) for _, rawProcessor := range processors { processor := rawProcessor.(map[string]any) if _, exists := processor["add_fields"]; !exists {