Cherry picks for v0.36.2 (#5269)
* otelcol.processor.discovery should not modify its targets attribute (#5170)

Fixed a bug where `otelcol.processor.discovery` could modify the `targets` passed by an upstream component.
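
The class of bug described above can be illustrated with a minimal sketch. It assumes targets are plain Go maps and that internal labels are those starting with `__`; the `Target` type and both helper names are hypothetical and are not the component's actual code. Because Go maps are reference types, removing labels in place also changes the map held by the upstream component, whereas building a filtered copy leaves it intact.

```go
package main

import (
	"fmt"
	"strings"
)

// Target is a hypothetical stand-in for a discovery target: a set of labels.
type Target map[string]string

// cleanupInPlace deletes internal labels (prefix "__") from the map it is
// given. The caller's map is modified, because maps are reference types.
func cleanupInPlace(t Target) {
	for k := range t {
		if strings.HasPrefix(k, "__") {
			delete(t, k)
		}
	}
}

// nonInternalCopy returns a new map holding only the non-internal labels
// and leaves the input untouched.
func nonInternalCopy(t Target) Target {
	out := make(Target, len(t))
	for k, v := range t {
		if !strings.HasPrefix(k, "__") {
			out[k] = v
		}
	}
	return out
}

func main() {
	upstream := Target{"__address__": "1.2.2.2", "env": "prod"}

	filtered := nonInternalCopy(upstream)
	fmt.Println(len(upstream), len(filtered)) // 2 1: upstream is unchanged

	cleanupInPlace(upstream)
	fmt.Println(len(upstream)) // 1: the upstream map lost "__address__"
}
```

Any other component that still reads `upstream` after the in-place variant runs would no longer see `__address__`, which is the kind of side effect the fix avoids.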

---------

Co-authored-by: Paschalis Tsilias <[email protected]>

* otelcol: fix retry mechanism (#5188)

* otelcol: fix retry mechanism

The v0.80 upgrade introduced two new settings to the common retry
mechanism shared between components. One of these new settings,
`multiplier`, specifies the rate at which a backoff is applied.

We did not set this new field when converting our settings after the
upgrade, so the multiplier was left at Go's zero value (0). After the
first retry, the backoff period was multiplied by zero on each subsequent
retry, causing retries to run in a hot loop.

* otelcol.exporter.loadbalancing: properly set defaults

* Rename component stability from "alpha" to "experimental". (#5200)

* Rename stability from "alpha" to "experimental".
This complies with Agent stability naming conventions.

* Add an experimental stability block
for "otelcol.processor.span" and "otelcol.connector.spanmetrics".

* Sync loki.source.file with promtail (#5245)

* Sync loki.source.file with promtail

* changelog

* Fix scraped targets metric (#5263)

* Remove main branch changes from the Changelog

* Add v0.36.2 to the changelog

* Change date of v0.36.2 from 2023-09-21 to 2023-09-22

---------

Co-authored-by: Paschalis Tsilias <[email protected]>
Co-authored-by: Robert Fratto <[email protected]>
Co-authored-by: Piotr <[email protected]>
4 people authored Sep 22, 2023
1 parent 3635844 commit 35d1103
Showing 15 changed files with 126 additions and 67 deletions.
16 changes: 16 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,22 @@ This document contains a historical list of changes between releases. Only
changes that impact end-user behavior are listed; changes to documentation or
internal API changes are not present.

v0.36.2 (2023-09-22)
--------------------

### Bugfixes

- Fixed a bug where `otelcol.processor.discovery` could modify the `targets` passed by an upstream component. (@ptodev)

- Fixed a bug where `otelcol` components with a retry mechanism would not wait after the first retry. (@rfratto)

- Fixed a bug where documented default settings in `otelcol.exporter.loadbalancing` were never set. (@rfratto)

- Fixed a race condition in `loki.source.file` when cleaning up metrics after it stops tailing files. (@thampiotr)

- Fixed `agent_prometheus_scrape_targets_gauge` incorrectly reporting all discovered targets
instead of only the targets that belong to the current instance when clustering is enabled. (@thampiotr)

v0.36.1 (2023-09-06)
--------------------

6 changes: 3 additions & 3 deletions component/loki/source/file/decompresser.go
@@ -1,8 +1,8 @@
package file

// This code is copied from Promtail. decompressor implements the reader
// interface and is used to read compressed log files. It uses the Go stdlib's
// compress/* packages for decoding.
// This code is copied from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5.
// Decompressor implements the reader interface and is used to read compressed log files.
// It uses the Go stdlib's compress/* packages for decoding.

import (
"bufio"
6 changes: 3 additions & 3 deletions component/loki/source/file/metrics.go
@@ -1,8 +1,8 @@
package file

// This code is copied from Promtail. The metrics struct provides a common set
// of metrics that are reused between all implementations of the reader
// interface.
// This code is copied from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5.
// The metrics struct provides a common set of metrics that are reused between all
// implementations of the reader interface.

import "github.com/prometheus/client_golang/prometheus"

4 changes: 2 additions & 2 deletions component/loki/source/file/reader.go
@@ -1,7 +1,7 @@
package file

// This code is copied from Promtail to accommodate the tailer and decompressor
// implementations as readers.
// This code is copied from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5.
// This code accommodates the tailer and decompressor implementations as readers.

// reader contains the set of methods the loki.source.file component uses.
type reader interface {
19 changes: 11 additions & 8 deletions component/loki/source/file/tailer.go
@@ -1,7 +1,7 @@
package file

// This code is copied from Promtail. tailer implements the reader interface by
// using the github.com/grafana/tail package to tail files.
// This code is copied from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5.
// tailer implements the reader interface by using the github.com/grafana/tail package to tail files.

import (
"fmt"
@@ -117,6 +117,8 @@ func (t *tailer) updatePosition() {
defer func() {
positionWait.Stop()
level.Info(t.logger).Log("msg", "position timer: exited", "path", t.path)
// NOTE: metrics must be cleaned up after the position timer exits, as MarkPositionAndSize() updates metrics.
t.cleanupMetrics()
close(t.posdone)
}()

@@ -152,10 +154,11 @@ func (t *tailer) readLines() {
// This function runs in a goroutine, if it exits this tailer will never do any more tailing.
// Clean everything up.
defer func() {
t.cleanupMetrics()
t.running.Store(false)
level.Info(t.logger).Log("msg", "tail routine: exited", "path", t.path)
close(t.done)
// Shut down the position marker thread
close(t.posquit)
}()
entries := t.handler.Chan()
for {
@@ -209,12 +212,14 @@ func (t *tailer) MarkPositionAndSize() error {
}
return err
}
t.metrics.totalBytes.WithLabelValues(t.path).Set(float64(size))

pos, err := t.tail.Tell()
if err != nil {
return err
}

// Update metrics and positions file all together to avoid race conditions when `t.tail` is stopped.
t.metrics.totalBytes.WithLabelValues(t.path).Set(float64(size))
t.metrics.readBytes.WithLabelValues(t.path).Set(float64(pos))
t.positions.Put(t.path, t.labels, pos)

@@ -225,10 +230,6 @@ func (t *tailer) Stop() {
// stop can be called by two separate threads in filetarget, to avoid a panic closing channels more than once
// we wrap the stop in a sync.Once.
t.stopOnce.Do(func() {
// Shut down the position marker thread
close(t.posquit)
<-t.posdone

// Save the current position before shutting down tailer
err := t.MarkPositionAndSize()
if err != nil {
@@ -242,6 +243,8 @@ func (t *tailer) Stop() {
}
// Wait for readLines() to consume all the remaining messages and exit when the channel is closed
<-t.done
// Wait for the position marker thread to exit
<-t.posdone
level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path)
t.handler.Stop()
})
11 changes: 7 additions & 4 deletions component/otelcol/config_retry.go
@@ -3,6 +3,7 @@ package otelcol
import (
"time"

"github.com/cenkalti/backoff/v4"
otelexporterhelper "go.opentelemetry.io/collector/exporter/exporterhelper"
)

@@ -35,9 +36,11 @@ func (args *RetryArguments) Convert() *otelexporterhelper.RetrySettings {
}

return &otelexporterhelper.RetrySettings{
Enabled: args.Enabled,
InitialInterval: args.InitialInterval,
MaxInterval: args.MaxInterval,
MaxElapsedTime: args.MaxElapsedTime,
Enabled: args.Enabled,
InitialInterval: args.InitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: args.MaxInterval,
MaxElapsedTime: args.MaxElapsedTime,
}
}
24 changes: 20 additions & 4 deletions component/otelcol/exporter/loadbalancing/loadbalancing.go
@@ -45,10 +45,22 @@ var (
_ river.Defaulter = &Arguments{}
)

// DefaultArguments holds default values for Arguments.
var DefaultArguments = Arguments{
RoutingKey: "traceID",
}
var (
// DefaultArguments holds default values for Arguments.
DefaultArguments = Arguments{
Protocol: Protocol{
OTLP: DefaultOTLPConfig,
},
RoutingKey: "traceID",
}

DefaultOTLPConfig = OtlpConfig{
Timeout: otelcol.DefaultTimeout,
Queue: otelcol.DefaultQueueArguments,
Retry: otelcol.DefaultRetryArguments,
Client: DefaultGRPCClientArguments,
}
)

// SetToDefault implements river.Defaulter.
func (args *Arguments) SetToDefault() {
@@ -85,6 +97,10 @@ type OtlpConfig struct {
Client GRPCClientArguments `river:"client,block"`
}

// SetToDefault implements river.Defaulter.
func (otlpConfig *OtlpConfig) SetToDefault() {
*otlpConfig = DefaultOTLPConfig
}

func (otlpConfig OtlpConfig) Convert() otlpexporter.Config {
return otlpexporter.Config{
TimeoutSettings: exporterhelper.TimeoutSettings{
43 changes: 31 additions & 12 deletions component/otelcol/exporter/loadbalancing/loadbalancing_test.go
@@ -15,17 +15,35 @@ import (
)

func TestConfigConversion(t *testing.T) {
defaultProtocol := loadbalancingexporter.Protocol{
OTLP: otlpexporter.Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: "",
Compression: "gzip",
WriteBufferSize: 512 * 1024,
Headers: map[string]configopaque.String{},
BalancerName: "pick_first",
var (
defaultRetrySettings = exporterhelper.NewDefaultRetrySettings()
defaultTimeoutSettings = exporterhelper.NewDefaultTimeoutSettings()

// TODO(rfratto): resync defaults with upstream.
//
// We have drifted from the upstream defaults, which have decreased the
// default queue_size to 1000 since we introduced the defaults.
defaultQueueSettings = exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 10,
QueueSize: 5000,
}

defaultProtocol = loadbalancingexporter.Protocol{
OTLP: otlpexporter.Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: "",
Compression: "gzip",
WriteBufferSize: 512 * 1024,
Headers: map[string]configopaque.String{},
BalancerName: "pick_first",
},
RetrySettings: defaultRetrySettings,
TimeoutSettings: defaultTimeoutSettings,
QueueSettings: defaultQueueSettings,
},
},
}
}
)

tests := []struct {
testName string
@@ -96,14 +114,15 @@ func TestConfigConversion(t *testing.T) {
static {
hostnames = ["endpoint-1", "endpoint-2:55678"]
}
}
`,
}`,
expected: loadbalancingexporter.Config{
Protocol: loadbalancingexporter.Protocol{
OTLP: otlpexporter.Config{
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 1 * time.Second,
},
RetrySettings: defaultRetrySettings,
QueueSettings: defaultQueueSettings,
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: "",
Compression: "gzip",
8 changes: 3 additions & 5 deletions component/otelcol/processor/discovery/discovery.go
@@ -77,7 +77,6 @@ func (args *Arguments) Validate() error {

// Component is the otelcol.processor.discovery component.
type Component struct {
cfg Arguments
consumer *promsdconsumer.Consumer
logger log.Logger
}
@@ -134,18 +133,17 @@ func (c *Component) Run(ctx context.Context) error {
// Update implements Component.
func (c *Component) Update(newConfig component.Arguments) error {
cfg := newConfig.(Arguments)
c.cfg = cfg

hostLabels := make(map[string]discovery.Target)

for _, labels := range c.cfg.Targets {
for _, labels := range cfg.Targets {
host, err := promsdconsumer.GetHostFromLabels(labels)
if err != nil {
level.Warn(c.logger).Log("msg", "ignoring target, unable to find address", "err", err)
continue
}
promsdconsumer.CleanupLabels(labels)
hostLabels[host] = labels

hostLabels[host] = promsdconsumer.NewTargetsWithNonInternalLabels(labels)
}

err := c.consumer.UpdateOptions(promsdconsumer.Options{
2 changes: 1 addition & 1 deletion component/prometheus/scrape/scrape.go
@@ -213,6 +213,7 @@ func (c *Component) Run(ctx context.Context) error {
// 'clustered' targets implementation every time.
ct := discovery.NewDistributedTargets(cl, c.cluster, tgs)
promTargets := c.componentTargetsToProm(jobName, ct.Get())
c.targetsGauge.Set(float64(len(promTargets)))

select {
case targetSetsChan <- promTargets:
@@ -247,7 +248,6 @@ func (c *Component) Update(args component.Arguments) error {
default:
}

c.targetsGauge.Set(float64(len(c.args.Targets)))
return nil
}
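
Why the gauge has to be set from the distributed targets rather than from the component's arguments can be shown with a rough sketch. The hash-modulo assignment in `ownedTargets` is an assumption made purely for illustration and is not how the Agent's clustering actually distributes targets; the addresses are made-up example data.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ownedTargets returns the targets assigned to the instance at index self
// out of peerCount instances. Hypothetical: ownership is decided by a simple
// hash modulo, which only stands in for a real distribution scheme.
func ownedTargets(targets []string, self, peerCount int) []string {
	var owned []string
	for _, t := range targets {
		h := fnv.New32a()
		h.Write([]byte(t))
		if int(h.Sum32()%uint32(peerCount)) == self {
			owned = append(owned, t)
		}
	}
	return owned
}

func main() {
	discovered := []string{"10.0.0.1:9100", "10.0.0.2:9100", "10.0.0.3:9100", "10.0.0.4:9100"}
	const peers = 2

	total := 0
	for self := 0; self < peers; self++ {
		owned := ownedTargets(discovered, self, peers)
		total += len(owned)
		// Reporting len(discovered) on every instance would count each
		// target once per instance; len(owned) counts it exactly once.
		fmt.Printf("instance %d: discovered=%d owned=%d\n", self, len(discovered), len(owned))
	}
	fmt.Println("sum of owned across instances:", total)
}
```

Summing the per-instance owned counts gives the number of discovered targets, whereas summing the discovered counts would multiply it by the number of instances.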

@@ -5,13 +5,15 @@ aliases:
- /docs/grafana-cloud/monitor-infrastructure/integrations/agent/flow/reference/components/otelcol.connector.spanmetrics/
canonical: https://grafana.com/docs/agent/latest/flow/reference/components/otelcol.connector.spanmetrics/
labels:
stage: alpha
stage: experimental
title: otelcol.connector.spanmetrics
---

# otelcol.connector.spanmetrics

`otelcol.connector.spanmetrics` accepts span data from other `otelcol` components and
{{< docs/shared lookup="flow/stability/experimental.md" source="agent" version="<AGENT VERSION>" >}}

`otelcol.connector.spanmetrics` accepts span data from other `otelcol` components and
aggregates Request, Error and Duration (R.E.D) OpenTelemetry metrics from the spans:

* **Request** counts are computed as the number of spans seen per unique set of dimensions,
@@ -104,7 +104,7 @@ information.
## Examples

### Basic usage
```
```river
discovery.http "dynamic_targets" {
url = "https://example.com/scrape_targets"
refresh_interval = "15s"
@@ -123,7 +123,7 @@ otelcol.processor.discovery "default" {

Outputs from more than one discovery process can be combined via the `concat` function.

```
```river
discovery.http "dynamic_targets" {
url = "https://example.com/scrape_targets"
refresh_interval = "15s"
@@ -149,7 +149,7 @@ It is not necessary to use a discovery component. In the example below, a `test_
attribute will be added to a span if its IP address is "1.2.2.2". The `__internal_label__` will
not be added to the span, because it begins with a double underscore (`__`).

```
```river
otelcol.processor.discovery "default" {
targets = [{
"__address__" = "1.2.2.2",