Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ agent:
metrics_period: ""
namespace: ""
pprof: null
failure_threshold: null
traces: true
apm:
hosts:
Expand Down
55 changes: 54 additions & 1 deletion internal/pkg/agent/application/monitoring/v1_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"
"unicode"
Expand Down Expand Up @@ -48,6 +49,7 @@ const (
monitoringKey = "monitoring"
useOutputKey = "use_output"
monitoringMetricsPeriodKey = "metrics_period"
failureThresholdKey = "failure_threshold"
monitoringOutput = "monitoring"
defaultMonitoringNamespace = "default"
agentName = "elastic-agent"
Expand All @@ -60,6 +62,10 @@ const (
// metricset execution period used for the monitoring metrics inputs
// we set this to 60s to reduce the load/data volume on the monitoring cluster
defaultMetricsCollectionInterval = 60 * time.Second

// metricset stream failure threshold before the stream is marked as DEGRADED
// to avoid marking the agent degraded for transient errors, we set the default threshold to 2
defaultMetricsStreamFailureThreshold = uint(2)
)

var (
Expand Down Expand Up @@ -131,6 +137,7 @@ func (b *BeatsMonitor) MonitoringConfig(

monitoringOutputName := defaultOutputName
metricsCollectionIntervalString := b.config.C.MetricsPeriod
failureThreshold := b.config.C.FailureThreshold
if agentCfg, found := policy[agentKey]; found {
// The agent section is required for feature flags
cfg[agentKey] = agentCfg
Expand All @@ -151,6 +158,25 @@ func (b *BeatsMonitor) MonitoringConfig(
metricsCollectionIntervalString = metricsPeriodStr
}
}

if policyFailureThresholdRaw, found := monitoringMap[failureThresholdKey]; found {
switch policyValue := policyFailureThresholdRaw.(type) {
case uint:
failureThreshold = &policyValue
case int:
unsignedValue := uint(policyValue)
failureThreshold = &unsignedValue
case string:
parsedPolicyValue, err := strconv.Atoi(policyValue)
if err != nil {
return nil, fmt.Errorf("failed to convert policy failure threshold string to int: %w", err)
}
uintPolicyValue := uint(parsedPolicyValue)
failureThreshold = &uintPolicyValue
default:
return nil, fmt.Errorf("unsupported type for policy failure threshold: %T", policyFailureThresholdRaw)
}
}
}
}
}
Expand All @@ -173,7 +199,7 @@ func (b *BeatsMonitor) MonitoringConfig(
}

if b.config.C.MonitorMetrics {
if err := b.injectMetricsInput(cfg, componentIDToBinary, components, componentIDPidMap, metricsCollectionIntervalString); err != nil {
if err := b.injectMetricsInput(cfg, componentIDToBinary, components, componentIDPidMap, metricsCollectionIntervalString, failureThreshold); err != nil {
return nil, errors.New(err, "failed to inject monitoring output")
}
}
Expand Down Expand Up @@ -556,12 +582,20 @@ func (b *BeatsMonitor) injectMetricsInput(
componentList []component.Component,
existingStateServicePids map[string]uint64,
metricsCollectionIntervalString string,
failureThreshold *uint,
) error {
if metricsCollectionIntervalString == "" {
metricsCollectionIntervalString = defaultMetricsCollectionInterval.String()
}

if failureThreshold == nil {
defaultValue := defaultMetricsStreamFailureThreshold
failureThreshold = &defaultValue
}
monitoringNamespace := b.monitoringNamespace()
fixedAgentName := strings.ReplaceAll(agentName, "-", "_")
// beatStreams and streams MUST be []interface{} even if in reality they are []map[string]interface{}:
// if those are declared as slices of maps the message "proto: invalid type: []map[string]interface{}" will pop up
beatsStreams := make([]interface{}, 0, len(componentIDToBinary))
streams := []interface{}{
map[string]interface{}{
Expand Down Expand Up @@ -866,6 +900,25 @@ func (b *BeatsMonitor) injectMetricsInput(

}

if failureThreshold != nil {
// add failure threshold to all streams and beatStreams
for _, s := range streams {
if streamMap, ok := s.(map[string]interface{}); ok {
streamMap[failureThresholdKey] = *failureThreshold
} else {
return fmt.Errorf("unable to set %s: %d in monitoring stream %q: unexpected type %T", failureThresholdKey, *failureThreshold, s, s)
}

}
for _, s := range beatsStreams {
if streamMap, ok := s.(map[string]interface{}); ok {
streamMap[failureThresholdKey] = *failureThreshold
} else {
return fmt.Errorf("unable to set %s: %d in monitoring stream %q: unexpected type %T", failureThresholdKey, *failureThreshold, s, s)
}
}
}

inputs := []interface{}{
map[string]interface{}{
idKey: fmt.Sprintf("%s-beats", monitoringMetricsUnitID),
Expand Down
206 changes: 205 additions & 1 deletion internal/pkg/agent/application/monitoring/v1_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func TestMonitoringConfigMetricsInterval(t *testing.T) {
// check the streams created for the input, should be a list of objects
if assert.Contains(t, input, "streams", "input %q does not contain any stream", inputID) &&
assert.IsTypef(t, []any{}, input["streams"], "streams for input %q are not a list of objects", inputID) {
// loop over streams and cast to map[string]any to access keys
// loop over streams and access keys
for _, rawStream := range input["streams"].([]any) {
if assert.IsTypef(t, map[string]any{}, rawStream, "stream %v for input %q is not a map", rawStream, inputID) {
stream := rawStream.(map[string]any)
Expand All @@ -258,6 +258,210 @@ func TestMonitoringConfigMetricsInterval(t *testing.T) {
}
}

func TestMonitoringConfigMetricsFailureThreshold(t *testing.T) {

agentInfo, err := info.NewAgentInfo(context.Background(), false)
require.NoError(t, err, "Error creating agent info")

sampleFiveErrorsStreamThreshold := uint(5)
sampleTenErrorsStreamThreshold := uint(10)

tcs := []struct {
name string
monitoringCfg *monitoringConfig
policy map[string]any
expectedThreshold uint
}{
{
name: "default failure threshold",
monitoringCfg: &monitoringConfig{
C: &monitoringcfg.MonitoringConfig{
Enabled: true,
MonitorMetrics: true,
HTTP: &monitoringcfg.MonitoringHTTPConfig{
Enabled: false,
},
},
},
policy: map[string]any{
"agent": map[string]any{
"monitoring": map[string]any{
"metrics": true,
"http": map[string]any{
"enabled": false,
},
},
},
"outputs": map[string]any{
"default": map[string]any{},
},
},
expectedThreshold: defaultMetricsStreamFailureThreshold,
},
{
name: "agent config failure threshold",
monitoringCfg: &monitoringConfig{
C: &monitoringcfg.MonitoringConfig{
Enabled: true,
MonitorMetrics: true,
HTTP: &monitoringcfg.MonitoringHTTPConfig{
Enabled: false,
},
FailureThreshold: &sampleFiveErrorsStreamThreshold,
},
},
policy: map[string]any{
"agent": map[string]any{
"monitoring": map[string]any{
"metrics": true,
"http": map[string]any{
"enabled": false,
},
},
},
"outputs": map[string]any{
"default": map[string]any{},
},
},
expectedThreshold: sampleFiveErrorsStreamThreshold,
},
{
name: "policy failure threshold uint",
monitoringCfg: &monitoringConfig{
C: &monitoringcfg.MonitoringConfig{
Enabled: true,
MonitorMetrics: true,
HTTP: &monitoringcfg.MonitoringHTTPConfig{
Enabled: false,
},
FailureThreshold: &sampleFiveErrorsStreamThreshold,
},
},
policy: map[string]any{
"agent": map[string]any{
"monitoring": map[string]any{
"metrics": true,
"http": map[string]any{
"enabled": false,
},
failureThresholdKey: sampleTenErrorsStreamThreshold,
},
},
"outputs": map[string]any{
"default": map[string]any{},
},
},
expectedThreshold: sampleTenErrorsStreamThreshold,
},
{
name: "policy failure threshold int",
monitoringCfg: &monitoringConfig{
C: &monitoringcfg.MonitoringConfig{
Enabled: true,
MonitorMetrics: true,
HTTP: &monitoringcfg.MonitoringHTTPConfig{
Enabled: false,
},
FailureThreshold: &sampleFiveErrorsStreamThreshold,
},
},
policy: map[string]any{
"agent": map[string]any{
"monitoring": map[string]any{
"metrics": true,
"http": map[string]any{
"enabled": false,
},
failureThresholdKey: 10,
},
},
"outputs": map[string]any{
"default": map[string]any{},
},
},
expectedThreshold: sampleTenErrorsStreamThreshold,
},
{
name: "policy failure threshold string",
monitoringCfg: &monitoringConfig{
C: &monitoringcfg.MonitoringConfig{
Enabled: true,
MonitorMetrics: true,
HTTP: &monitoringcfg.MonitoringHTTPConfig{
Enabled: false,
},
FailureThreshold: &sampleFiveErrorsStreamThreshold,
},
},
policy: map[string]any{
"agent": map[string]any{
"monitoring": map[string]any{
"metrics": true,
"http": map[string]any{
"enabled": false,
},
failureThresholdKey: "10",
},
},
"outputs": map[string]any{
"default": map[string]any{},
},
},
expectedThreshold: sampleTenErrorsStreamThreshold,
},
}

for _, tc := range tcs {

t.Run(tc.name, func(t *testing.T) {
b := &BeatsMonitor{
enabled: true,
config: tc.monitoringCfg,
operatingSystem: runtime.GOOS,
agentInfo: agentInfo,
}
got, err := b.MonitoringConfig(tc.policy, nil, map[string]string{"foobeat": "filebeat"}, map[string]uint64{}) // put a componentID/binary mapping to have something in the beats monitoring input
assert.NoError(t, err)

rawInputs, ok := got["inputs"]
require.True(t, ok, "monitoring config contains no input")
inputs, ok := rawInputs.([]any)
require.True(t, ok, "monitoring inputs are not a list")
marshaledInputs, err := yaml.Marshal(inputs)
if assert.NoError(t, err, "error marshaling monitoring inputs") {
t.Logf("marshaled monitoring inputs:\n%s\n", marshaledInputs)
}

// loop over the created inputs
for _, i := range inputs {
input, ok := i.(map[string]any)
if assert.Truef(t, ok, "input is not represented as a map: %v", i) {
inputID := input["id"]
t.Logf("input %q", inputID)
// check the streams created for the input, should be a list of objects
if assert.Contains(t, input, "streams", "input %q does not contain any stream", inputID) &&
assert.IsTypef(t, []any{}, input["streams"], "streams for input %q are not a list of objects", inputID) {

// loop over streams and cast to map[string]any to access keys
for _, rawStream := range input["streams"].([]any) {
if assert.IsTypef(t, map[string]any{}, rawStream, "stream %v for input %q is not a map", rawStream, inputID) {
stream := rawStream.(map[string]any)
// check period and assert its value
streamID := stream["id"]
if assert.Containsf(t, stream, failureThresholdKey, "stream %q for input %q does not contain a failureThreshold", streamID, inputID) &&
assert.IsType(t, uint(0), stream[failureThresholdKey], "period for stream %q of input %q is not represented as a string", streamID, inputID) {
actualFailureThreshold := stream[failureThresholdKey].(uint)
assert.Equalf(t, actualFailureThreshold, tc.expectedThreshold, "unexpected failure threshold for stream %q of input %q", streamID, inputID)
}
}
}
}
}
}
})
}
}

func TestMonitoringConfigComponentFields(t *testing.T) {
agentInfo, err := info.NewAgentInfo(context.Background(), false)
require.NoError(t, err, "Error creating agent info")
Expand Down
23 changes: 12 additions & 11 deletions internal/pkg/core/monitoring/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@ const (

// MonitoringConfig describes a configuration of a monitoring
type MonitoringConfig struct {
Enabled bool `yaml:"enabled" config:"enabled"`
MonitorLogs bool `yaml:"logs" config:"logs"`
MonitorMetrics bool `yaml:"metrics" config:"metrics"`
MetricsPeriod string `yaml:"metrics_period" config:"metrics_period"`
LogMetrics bool `yaml:"-" config:"-"`
HTTP *MonitoringHTTPConfig `yaml:"http" config:"http"`
Namespace string `yaml:"namespace" config:"namespace"`
Pprof *PprofConfig `yaml:"pprof" config:"pprof"`
MonitorTraces bool `yaml:"traces" config:"traces"`
APM APMConfig `yaml:"apm,omitempty" config:"apm,omitempty" json:"apm,omitempty"`
Diagnostics Diagnostics `yaml:"diagnostics,omitempty" json:"diagnostics,omitempty"`
Enabled bool `yaml:"enabled" config:"enabled"`
MonitorLogs bool `yaml:"logs" config:"logs"`
MonitorMetrics bool `yaml:"metrics" config:"metrics"`
MetricsPeriod string `yaml:"metrics_period" config:"metrics_period"`
FailureThreshold *uint `yaml:"failure_threshold" config:"failure_threshold"`
Copy link
Contributor

@michalpristas michalpristas Nov 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

based on this configuration default value for FailureThreshold is nil
but it is not true as we set default value if it's not set.
shouldn't we preset it here in DefaultConfig() to make expectations clear?
i know you took metrics period as an example. not a blocker just thinking out loud

Copy link
Contributor

@michalpristas michalpristas Nov 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also why *uint seems unconventional and it does not match beat side implementation either.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason why the defaults here don't match the beats default is for backward compatibility: until now the first error on a stream in beats would trigger a status change to DEGRADED and beats maintains that behavior by default.

Here on the agent side the reason for *uint comes from:

  • zero value (0) would disable any status change for the monitoring metricbeat streams
  • the default is managed in internal/pkg/agent/application/monitoring/v1_monitor.go when we are sure there's been no set of value in config (pointer == nil)

I suppose I could set the default value in func DefaultConfig() *MonitoringConfig but if by any chance the Monitoring config is deserialized without calling DefaultConfig() we would get an erraneous value set in the failureThreshold (0).
Another case I am concerned with is unmarshaling from a fleet policy that does not specify any value for the threshold: if we set 2 or 0 by default there we could be overriding some other value...
Overall having a clear "not set" value that matches the go zero value seemed a safer approach

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you elaborate about backward compatibility?
in a matrix of config/agent
old/old and new/new we don't care
old/new - means we have old config and we should aim for safer behavior
new/old - new config with old agent we don't understand newly introduced keywords and ignore them, behavior is not there

I suppose I could set the default value in func DefaultConfig() *MonitoringConfig but if by any chance the Monitoring config is deserialized without calling DefaultConfig() we would get an erraneous value set in the failureThreshold (0).

this applies for probably any config we have

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you elaborate about backward compatibility? in a matrix of config/agent old/old and new/new we don't care old/new - means we have old config and we should aim for safer behavior new/old - new config with old agent we don't understand newly introduced keywords and ignore them, behavior is not there

When I mentioned backward compatibility I was referring to metricbeat and the reason why the defaults in agent don't match the one in beats as it was mentioned in an earlier comment (quoted below)

also why *uint seems unconventional and it does not match beat side implementation either.

What I meant with "backward compatibility" was not "backward compatibility between agent and beats version" but rather "metricbeat default value for failureThreshold takes into account the current behaviour when a metricset stream errors out during fetch"

There's no need for a config/agent old/new matrix as we want to change the default behaviour of monitoring config in order to solve #5332.
The possibility to configure the value in the monitoring config has been added as a way to change/disable the DEGRADED status if we need to mitigate an issue or change behavior testing purposes (it's an escape hatch of sorts).

I suppose I could set the default value in func DefaultConfig() *MonitoringConfig but if by any chance the Monitoring config is deserialized without calling DefaultConfig() we would get an erraneous value set in the failureThreshold (0).

this applies for probably any config we have

In this case, all values of uint are valid configuration values (there's no not set value).
The fact that we have already other configuration values that cannot represent "not explicitly set" because of default values, should not preclude to define a new config value as *uint so that a nil value to be interpreted as "value not set".
I am not sure that following what has already been done for other configurations would have definite advantages in this case.

LogMetrics bool `yaml:"-" config:"-"`
HTTP *MonitoringHTTPConfig `yaml:"http" config:"http"`
Namespace string `yaml:"namespace" config:"namespace"`
Pprof *PprofConfig `yaml:"pprof" config:"pprof"`
MonitorTraces bool `yaml:"traces" config:"traces"`
APM APMConfig `yaml:"apm,omitempty" config:"apm,omitempty" json:"apm,omitempty"`
Diagnostics Diagnostics `yaml:"diagnostics,omitempty" json:"diagnostics,omitempty"`
}

// MonitoringHTTPConfig is a config defining HTTP endpoint published by agent
Expand Down