Skip to content

Commit

Permalink
otelcol: fix retry mechanism (#5188)
Browse files Browse the repository at this point in the history
* otelcol: fix retry mechanism

The v0.80 upgrade introduced two new settings to the common retry
mechanism shared between components. One of these new settings,
`multiplier`, specifies the rate at which a backoff is applied.

We did not convert this new field after upgrading, so the multiplier
was left at Go's zero value (0). After the first retry, the backoff
period was multiplied by zero on each subsequent retry, causing
retries to happen in a hot loop.

* otelcol.exporter.loadbalancing: properly set defaults
  • Loading branch information
rfratto authored and ptodev committed Sep 21, 2023
1 parent 2bc6033 commit 05de433
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 20 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ Main (unreleased)

- Fixed a bug where `otelcol.processor.discovery` could modify the `targets` passed by an upstream component. (@ptodev)

- Fixed a bug where `otelcol` components with a retry mechanism would not wait after the first retry. (@rfratto)

- Fixed a bug where documented default settings in `otelcol.exporter.loadbalancing` were never set. (@rfratto)

v0.36.1 (2023-09-06)
--------------------

Expand Down
11 changes: 7 additions & 4 deletions component/otelcol/config_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package otelcol
import (
"time"

"github.com/cenkalti/backoff/v4"
otelexporterhelper "go.opentelemetry.io/collector/exporter/exporterhelper"
)

Expand Down Expand Up @@ -35,9 +36,11 @@ func (args *RetryArguments) Convert() *otelexporterhelper.RetrySettings {
}

return &otelexporterhelper.RetrySettings{
Enabled: args.Enabled,
InitialInterval: args.InitialInterval,
MaxInterval: args.MaxInterval,
MaxElapsedTime: args.MaxElapsedTime,
Enabled: args.Enabled,
InitialInterval: args.InitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: args.MaxInterval,
MaxElapsedTime: args.MaxElapsedTime,
}
}
24 changes: 20 additions & 4 deletions component/otelcol/exporter/loadbalancing/loadbalancing.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,22 @@ var (
_ river.Defaulter = &Arguments{}
)

// DefaultArguments holds default values for Arguments.
var DefaultArguments = Arguments{
RoutingKey: "traceID",
}
var (
// DefaultArguments holds default values for Arguments. Its OTLP protocol
// settings are taken from DefaultOTLPConfig.
DefaultArguments = Arguments{
Protocol: Protocol{
OTLP: DefaultOTLPConfig,
},
RoutingKey: "traceID",
}

// DefaultOTLPConfig holds default values for OtlpConfig. The timeout, queue,
// and retry defaults come from the otelcol package; the client defaults come
// from DefaultGRPCClientArguments.
DefaultOTLPConfig = OtlpConfig{
Timeout: otelcol.DefaultTimeout,
Queue: otelcol.DefaultQueueArguments,
Retry: otelcol.DefaultRetryArguments,
Client: DefaultGRPCClientArguments,
}
)

// SetToDefault implements river.Defaulter.
func (args *Arguments) SetToDefault() {
Expand Down Expand Up @@ -85,6 +97,10 @@ type OtlpConfig struct {
Client GRPCClientArguments `river:"client,block"`
}

// SetToDefault overwrites the receiver with DefaultOTLPConfig.
//
// The receiver is named otlpConfig (matching Convert) rather than OtlpConfig,
// so that it does not shadow the OtlpConfig type inside the method body.
func (otlpConfig *OtlpConfig) SetToDefault() {
	*otlpConfig = DefaultOTLPConfig
}

func (otlpConfig OtlpConfig) Convert() otlpexporter.Config {
return otlpexporter.Config{
TimeoutSettings: exporterhelper.TimeoutSettings{
Expand Down
43 changes: 31 additions & 12 deletions component/otelcol/exporter/loadbalancing/loadbalancing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,35 @@ import (
)

func TestConfigConversion(t *testing.T) {
defaultProtocol := loadbalancingexporter.Protocol{
OTLP: otlpexporter.Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: "",
Compression: "gzip",
WriteBufferSize: 512 * 1024,
Headers: map[string]configopaque.String{},
BalancerName: "pick_first",
var (
defaultRetrySettings = exporterhelper.NewDefaultRetrySettings()
defaultTimeoutSettings = exporterhelper.NewDefaultTimeoutSettings()

// TODO(rfratto): resync defaults with upstream.
//
// Our defaults have drifted from upstream: since we introduced them,
// upstream has decreased its default queue_size to 1000.
defaultQueueSettings = exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 10,
QueueSize: 5000,
}

defaultProtocol = loadbalancingexporter.Protocol{
OTLP: otlpexporter.Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: "",
Compression: "gzip",
WriteBufferSize: 512 * 1024,
Headers: map[string]configopaque.String{},
BalancerName: "pick_first",
},
RetrySettings: defaultRetrySettings,
TimeoutSettings: defaultTimeoutSettings,
QueueSettings: defaultQueueSettings,
},
},
}
}
)

tests := []struct {
testName string
Expand Down Expand Up @@ -96,14 +114,15 @@ func TestConfigConversion(t *testing.T) {
static {
hostnames = ["endpoint-1", "endpoint-2:55678"]
}
}
`,
}`,
expected: loadbalancingexporter.Config{
Protocol: loadbalancingexporter.Protocol{
OTLP: otlpexporter.Config{
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 1 * time.Second,
},
RetrySettings: defaultRetrySettings,
QueueSettings: defaultQueueSettings,
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: "",
Compression: "gzip",
Expand Down

0 comments on commit 05de433

Please sign in to comment.