Skip to content

Commit

Permalink
Fix issue with config reload when using a log pipeline with a metric …
Browse files Browse the repository at this point in the history
…stage (#6971)

* Fix bug with unnecessary config reload

* Apply suggestions from code review

Co-authored-by: Clayton Cornell <[email protected]>
Co-authored-by: Piotr <[email protected]>

* Refactor Flow unit test to make it more readable.

* Add comments, prettify tests.

---------

Co-authored-by: Clayton Cornell <[email protected]>
Co-authored-by: Piotr <[email protected]>
  • Loading branch information
3 people authored Jul 15, 2024
1 parent 8dbd241 commit 0936175
Show file tree
Hide file tree
Showing 8 changed files with 331 additions and 14 deletions.
13 changes: 8 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ Main (unreleased)
- A new `otelcol.exporter.debug` component for printing OTel telemetry from
other `otelcol` components to the console. (@BarunKGP)

### Bugfixes

- Fix an issue which caused the config to be reloaded if a config reload was triggered but the config hasn't changed.
The bug only affected the "metrics" and "logs" subsystems in Static mode. (@ptodev)

- Fix a bug in Static mode and Flow which prevented config reloads to work if a Loki `metrics` stage is in the pipeline.
This resulted in a "failed to unregister all metrics from previous promtail" message. (@ptodev)

v0.41.1 (2024-06-07)
--------------------

Expand All @@ -28,11 +36,6 @@ v0.41.1 (2024-06-07)

- Updated pyroscope to v0.4.6 introducing `symbols_map_size` and `pid_map_size` configuration. (@simonswine)

### Bugfixes

- Fix an issue which caused the config to be reloaded if a config reload was triggered but the config hasn't changed.
The bug only affected the "metrics" and "logs" subsystems in Static mode.

v0.41.0 (2024-05-31)
--------------------

Expand Down
4 changes: 4 additions & 0 deletions docs/sources/flow/reference/components/loki.process.md
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,10 @@ The following blocks are supported inside the definition of `stage.metrics`:
| metric.gauge | [metric.gauge][] | Defines a `gauge` metric. | no |
| metric.histogram | [metric.histogram][] | Defines a `histogram` metric. | no |

{{< admonition type="note" >}}
The metrics will be reset if you reload the {{< param "PRODUCT_ROOT_NAME" >}} configuration file.
{{< /admonition >}}

[metric.counter]: #metriccounter-block
[metric.gauge]: #metricgauge-block
[metric.histogram]: #metrichistogram-block
Expand Down
2 changes: 1 addition & 1 deletion internal/component/loki/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (c *Component) Update(args component.Arguments) error {
if err != nil {
return err
}
c.entryHandler = loki.NewEntryHandler(c.processOut, func() {})
c.entryHandler = loki.NewEntryHandler(c.processOut, func() { pipeline.Cleanup() })
c.processIn = pipeline.Wrap(c.entryHandler).Chan()
c.stages = newArgs.Stages
}
Expand Down
193 changes: 191 additions & 2 deletions internal/component/loki/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ package process

import (
"context"
"fmt"
"os"
"strings"
"testing"
"time"

Expand All @@ -18,12 +20,15 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/river"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/goleak"
)

const logline = `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}`

func TestJSONLabelsStage(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))

Expand Down Expand Up @@ -91,7 +96,6 @@ func TestJSONLabelsStage(t *testing.T) {

// Send a log entry to the component's receiver.
ts := time.Now()
logline := `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}`
logEntry := loki.Entry{
Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"},
Entry: logproto.Entry{
Expand Down Expand Up @@ -454,7 +458,6 @@ func TestDeadlockWithFrequentUpdates(t *testing.T) {
go func() {
for {
ts := time.Now()
logline := `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}`
logEntry := loki.Entry{
Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"},
Entry: logproto.Entry{
Expand Down Expand Up @@ -486,3 +489,189 @@ func TestDeadlockWithFrequentUpdates(t *testing.T) {
time.Sleep(1 * time.Second)
require.WithinDuration(t, time.Now(), lastSend.Load().(time.Time), 300*time.Millisecond)
}

func TestMetricsStageRefresh(t *testing.T) {
tester := newTester(t)
defer tester.stop()

forwardArgs := `
// This will be filled later
forward_to = []`

numLogsToSend := 3

cfgWithMetric := `
stage.metrics {
metric.counter {
name = "paulin_test"
action = "inc"
match_all = true
}
}` + forwardArgs

cfgWithMetric_Metrics := `
# HELP loki_process_custom_paulin_test
# TYPE loki_process_custom_paulin_test counter
loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d
`

// The component will be reconfigured so that it has a metric.
t.Run("config with a metric", func(t *testing.T) {
tester.updateAndTest(numLogsToSend, cfgWithMetric,
"",
fmt.Sprintf(cfgWithMetric_Metrics, numLogsToSend))
})

// The component will be "updated" with the same config.
// We expect the metric to stay the same before logs are sent - the component should be smart enough to
// know that the new config is the same as the old one and it should just keep running as it is.
// If it resets the metric, this could cause issues with some users who have a sidecar "autoreloader"
// which reloads the collector config every X seconds.
// Those users wouldn't expect their metrics to be reset every time the config is reloaded.
t.Run("config with the same metric", func(t *testing.T) {
tester.updateAndTest(numLogsToSend, cfgWithMetric,
fmt.Sprintf(cfgWithMetric_Metrics, numLogsToSend),
fmt.Sprintf(cfgWithMetric_Metrics, 2*numLogsToSend))
})

// Use a config which has no metrics stage.
// This should cause the metric to disappear.
cfgWithNoStages := forwardArgs
t.Run("config with no metrics stage", func(t *testing.T) {
tester.updateAndTest(numLogsToSend, cfgWithNoStages, "", "")
})

// Use a config which has a metric with a different name,
// as well as a metric with the same name as the one in the previous config.
// We try having a metric with the same name as before so that we can see if there
// is some sort of double registration error for that metric.
cfgWithTwoMetrics := `
stage.metrics {
metric.counter {
name = "paulin_test_3"
action = "inc"
match_all = true
}
metric.counter {
name = "paulin_test"
action = "inc"
match_all = true
}
}` + forwardArgs

expectedMetrics3 := `
# HELP loki_process_custom_paulin_test_3
# TYPE loki_process_custom_paulin_test_3 counter
loki_process_custom_paulin_test_3{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d
# HELP loki_process_custom_paulin_test
# TYPE loki_process_custom_paulin_test counter
loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d
`

t.Run("config with a new and old metric", func(t *testing.T) {
tester.updateAndTest(numLogsToSend, cfgWithTwoMetrics,
"",
fmt.Sprintf(expectedMetrics3, numLogsToSend, numLogsToSend))
})
}

type tester struct {
t *testing.T
component *Component
registry *prometheus.Registry
cancelFunc context.CancelFunc
logReceiver loki.LogsReceiver
logTimestamp time.Time
logEntry loki.Entry
wantLabelSet model.LabelSet
}

// Create the component, so that it can process and forward logs.
func newTester(t *testing.T) *tester {
reg := prometheus.NewRegistry()

opts := component.Options{
Logger: util.TestFlowLogger(t),
Registerer: reg,
OnStateChange: func(e component.Exports) {},
}

initialCfg := `forward_to = []`
var args Arguments
err := river.Unmarshal([]byte(initialCfg), &args)
require.NoError(t, err)

logReceiver := loki.NewLogsReceiver()
args.ForwardTo = []loki.LogsReceiver{logReceiver}

c, err := New(opts, args)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
go c.Run(ctx)

logTimestamp := time.Now()

return &tester{
t: t,
component: c,
registry: reg,
cancelFunc: cancel,
logReceiver: logReceiver,
logTimestamp: logTimestamp,
logEntry: loki.Entry{
Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"},
Entry: logproto.Entry{
Timestamp: logTimestamp,
Line: logline,
},
},
wantLabelSet: model.LabelSet{
"filename": "/var/log/pods/agent/agent/1.log",
"foo": "bar",
},
}
}

func (t *tester) stop() {
t.cancelFunc()
}

func (t *tester) updateAndTest(numLogsToSend int, cfg, expectedMetricsBeforeSendingLogs, expectedMetricsAfterSendingLogs string) {
var args Arguments
err := river.Unmarshal([]byte(cfg), &args)
require.NoError(t.t, err)

args.ForwardTo = []loki.LogsReceiver{t.logReceiver}

t.component.Update(args)

// Check the component metrics.
if err := testutil.GatherAndCompare(t.registry,
strings.NewReader(expectedMetricsBeforeSendingLogs)); err != nil {
require.NoError(t.t, err)
}

// Send logs.
for i := 0; i < numLogsToSend; i++ {
t.component.receiver.Chan() <- t.logEntry
}

// Receive logs.
for i := 0; i < numLogsToSend; i++ {
select {
case logEntry := <-t.logReceiver.Chan():
require.True(t.t, t.logTimestamp.Equal(logEntry.Timestamp))
require.Equal(t.t, logline, logEntry.Line)
require.Equal(t.t, t.wantLabelSet, logEntry.Labels)
case <-time.After(5 * time.Second):
require.FailNow(t.t, "failed waiting for log line")
}
}

// Check the component metrics.
if err := testutil.GatherAndCompare(t.registry,
strings.NewReader(expectedMetricsAfterSendingLogs)); err != nil {
require.NoError(t.t, err)
}
}
41 changes: 41 additions & 0 deletions internal/util/unregisterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,38 @@ func WrapWithUnregisterer(reg prometheus.Registerer) *Unregisterer {
}
}

// An "unchecked collector" is a collector which returns an empty description.
// It is described in the Prometheus documentation, here:
// https://pkg.go.dev/github.com/prometheus/client_golang/prometheus#hdr-Custom_Collectors_and_constant_Metrics
//
// > Alternatively, you could return no Desc at all, which will mark the Collector “unchecked”.
// > No checks are performed at registration time, but metric consistency will still be ensured at scrape time,
// > i.e. any inconsistencies will lead to scrape errors. Thus, with unchecked Collectors,
// > the responsibility to not collect metrics that lead to inconsistencies in the total scrape result
// > lies with the implementer of the Collector. While this is not a desirable state, it is sometimes necessary.
// > The typical use case is a situation where the exact metrics to be returned by a Collector cannot be predicted
// > at registration time, but the implementer has sufficient knowledge of the whole system to guarantee metric consistency.
//
// Unchecked collectors are used in the Loki "metrics" stage of the Loki "process" component.
//
// The isUncheckedCollector function is similar to how Prometheus' Go client extracts the metric description:
// https://github.com/prometheus/client_golang/blob/45f1e72421d9d11af6be784ad60b7389f7543e70/prometheus/registry.go#L372-L381
func isUncheckedCollector(c prometheus.Collector) bool {
descChan := make(chan *prometheus.Desc, 10)

go func() {
c.Describe(descChan)
close(descChan)
}()

i := 0
for range descChan {
i += 1
}

return i == 0
}

// Register implements prometheus.Registerer.
func (u *Unregisterer) Register(c prometheus.Collector) error {
if u.wrap == nil {
Expand All @@ -28,6 +60,11 @@ func (u *Unregisterer) Register(c prometheus.Collector) error {
if err != nil {
return err
}

if isUncheckedCollector(c) {
return nil
}

u.cs[c] = struct{}{}
return nil
}
Expand All @@ -43,6 +80,10 @@ func (u *Unregisterer) MustRegister(cs ...prometheus.Collector) {

// Unregister implements prometheus.Registerer.
func (u *Unregisterer) Unregister(c prometheus.Collector) bool {
if isUncheckedCollector(c) {
return true
}

if u.wrap != nil && u.wrap.Unregister(c) {
delete(u.cs, c)
return true
Expand Down
35 changes: 35 additions & 0 deletions internal/util/unregisterer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package util

import (
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

func Test_UnregisterTwice_NormalCollector(t *testing.T) {
u := WrapWithUnregisterer(prometheus.NewRegistry())
c := prometheus.NewCounter(prometheus.CounterOpts{
Name: "test_metric",
Help: "Test metric.",
})
u.Register(c)
require.True(t, u.Unregister(c))
require.False(t, u.Unregister(c))
}

type uncheckedCollector struct{}

func (uncheckedCollector) Describe(chan<- *prometheus.Desc) {}

func (uncheckedCollector) Collect(chan<- prometheus.Metric) {}

var _ prometheus.Collector = uncheckedCollector{}

func Test_UnregisterTwice_UncheckedCollector(t *testing.T) {
u := WrapWithUnregisterer(prometheus.NewRegistry())
c := uncheckedCollector{}
u.Register(c)
require.True(t, u.Unregister(c))
require.True(t, u.Unregister(c))
}
Loading

0 comments on commit 0936175

Please sign in to comment.