From 0d385e562f7d1a479ad35acf6981e670ebece3b5 Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Tue, 16 Jul 2024 11:01:24 +0100 Subject: [PATCH] Fix bug with loki.process metrics stage during config reload (#1292) * Fix bug with metrics stage config reload --- CHANGELOG.md | 4 + .../reference/components/loki/loki.process.md | 4 + internal/component/loki/process/process.go | 2 +- .../component/loki/process/process_test.go | 193 +++++++++++++++++- internal/util/unregisterer.go | 41 ++++ internal/util/unregisterer_test.go | 35 ++++ 6 files changed, 276 insertions(+), 3 deletions(-) create mode 100644 internal/util/unregisterer_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 99edfe611b..b66eabb03f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,10 @@ Main (unreleased) - Fixed a clustering mode issue where a fatal startup failure of the clustering service would exit the service silently, without also exiting the Alloy process. (@thampiotr) +- Fix a bug which prevented config reloads to work if a Loki `metrics` stage is in the pipeline. + Previously, the reload would fail for `loki.process` without an error in the logs and the metrics + from the `metrics` stage would get stuck at the same values. (@ptodev) + v1.2.1 ----------------- diff --git a/docs/sources/reference/components/loki/loki.process.md b/docs/sources/reference/components/loki/loki.process.md index cb8808f565..36d1a771ce 100644 --- a/docs/sources/reference/components/loki/loki.process.md +++ b/docs/sources/reference/components/loki/loki.process.md @@ -641,6 +641,10 @@ The following blocks are supported inside the definition of `stage.metrics`: | metric.gauge | [metric.gauge][] | Defines a `gauge` metric. | no | | metric.histogram | [metric.histogram][] | Defines a `histogram` metric. | no | +{{< admonition type="note" >}} +The metrics will be reset if you reload the {{< param "PRODUCT_NAME" >}} configuration file. +{{< /admonition >}} + [metric.counter]: #metriccounter-block [metric.gauge]: #metricgauge-block [metric.histogram]: #metrichistogram-block diff --git a/internal/component/loki/process/process.go b/internal/component/loki/process/process.go index 2754feeb7c..35785fe020 100644 --- a/internal/component/loki/process/process.go +++ b/internal/component/loki/process/process.go @@ -138,7 +138,7 @@ func (c *Component) Update(args component.Arguments) error { if err != nil { return err } - c.entryHandler = loki.NewEntryHandler(c.processOut, func() {}) + c.entryHandler = loki.NewEntryHandler(c.processOut, func() { pipeline.Cleanup() }) c.processIn = pipeline.Wrap(c.entryHandler).Chan() c.stages = newArgs.Stages } diff --git a/internal/component/loki/process/process_test.go b/internal/component/loki/process/process_test.go index 4add0069b6..92825ec2f8 100644 --- a/internal/component/loki/process/process_test.go +++ b/internal/component/loki/process/process_test.go @@ -6,11 +6,13 @@ import ( "context" "fmt" "os" + "strings" "testing" "time" "github.com/grafana/loki/v3/pkg/logproto" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "go.uber.org/atomic" @@ -27,6 +29,8 @@ import ( "github.com/grafana/alloy/syntax" ) +const logline = `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}` + func TestJSONLabelsStage(t *testing.T) { defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) @@ -95,7 +99,6 @@ func TestJSONLabelsStage(t *testing.T) { // Send a log entry to the component's receiver. ts := time.Now() - logline := `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}` logEntry := loki.Entry{ Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"}, Entry: logproto.Entry{ @@ -461,7 +464,6 @@ func TestDeadlockWithFrequentUpdates(t *testing.T) { go func() { for { ts := time.Now() - logline := `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}` logEntry := loki.Entry{ Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"}, Entry: logproto.Entry{ @@ -502,3 +504,190 @@ func getServiceData(name string) (interface{}, error) { return nil, fmt.Errorf("service not found %s", name) } } + +func TestMetricsStageRefresh(t *testing.T) { + tester := newTester(t) + defer tester.stop() + + forwardArgs := ` + // This will be filled later + forward_to = []` + + numLogsToSend := 3 + + cfgWithMetric := ` + stage.metrics { + metric.counter { + name = "paulin_test" + action = "inc" + match_all = true + } + }` + forwardArgs + + cfgWithMetric_Metrics := ` + # HELP loki_process_custom_paulin_test + # TYPE loki_process_custom_paulin_test counter + loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d + ` + + // The component will be reconfigured so that it has a metric. + t.Run("config with a metric", func(t *testing.T) { + tester.updateAndTest(numLogsToSend, cfgWithMetric, + "", + fmt.Sprintf(cfgWithMetric_Metrics, numLogsToSend)) + }) + + // The component will be "updated" with the same config. + // We expect the metric to stay the same before logs are sent - the component should be smart enough to + // know that the new config is the same as the old one and it should just keep running as it is. + // If it resets the metric, this could cause issues with some users who have a sidecar "autoreloader" + // which reloads the collector config every X seconds. + // Those users wouldn't expect their metrics to be reset every time the config is reloaded. + t.Run("config with the same metric", func(t *testing.T) { + tester.updateAndTest(numLogsToSend, cfgWithMetric, + fmt.Sprintf(cfgWithMetric_Metrics, numLogsToSend), + fmt.Sprintf(cfgWithMetric_Metrics, 2*numLogsToSend)) + }) + + // Use a config which has no metrics stage. + // This should cause the metric to disappear. + cfgWithNoStages := forwardArgs + t.Run("config with no metrics stage", func(t *testing.T) { + tester.updateAndTest(numLogsToSend, cfgWithNoStages, "", "") + }) + + // Use a config which has a metric with a different name, + // as well as a metric with the same name as the one in the previous config. + // We try having a metric with the same name as before so that we can see if there + // is some sort of double registration error for that metric. + cfgWithTwoMetrics := ` + stage.metrics { + metric.counter { + name = "paulin_test_3" + action = "inc" + match_all = true + } + metric.counter { + name = "paulin_test" + action = "inc" + match_all = true + } + }` + forwardArgs + + expectedMetrics3 := ` + # HELP loki_process_custom_paulin_test_3 + # TYPE loki_process_custom_paulin_test_3 counter + loki_process_custom_paulin_test_3{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d + # HELP loki_process_custom_paulin_test + # TYPE loki_process_custom_paulin_test counter + loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d + ` + + t.Run("config with a new and old metric", func(t *testing.T) { + tester.updateAndTest(numLogsToSend, cfgWithTwoMetrics, + "", + fmt.Sprintf(expectedMetrics3, numLogsToSend, numLogsToSend)) + }) +} + +type tester struct { + t *testing.T + component *Component + registry *prometheus.Registry + cancelFunc context.CancelFunc + logReceiver loki.LogsReceiver + logTimestamp time.Time + logEntry loki.Entry + wantLabelSet model.LabelSet +} + +// Create the component, so that it can process and forward logs. +func newTester(t *testing.T) *tester { + reg := prometheus.NewRegistry() + + opts := component.Options{ + Logger: util.TestAlloyLogger(t), + Registerer: reg, + OnStateChange: func(e component.Exports) {}, + GetServiceData: getServiceData, + } + + initialCfg := `forward_to = []` + var args Arguments + err := syntax.Unmarshal([]byte(initialCfg), &args) + require.NoError(t, err) + + logReceiver := loki.NewLogsReceiver() + args.ForwardTo = []loki.LogsReceiver{logReceiver} + + c, err := New(opts, args) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + go c.Run(ctx) + + logTimestamp := time.Now() + + return &tester{ + t: t, + component: c, + registry: reg, + cancelFunc: cancel, + logReceiver: logReceiver, + logTimestamp: logTimestamp, + logEntry: loki.Entry{ + Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"}, + Entry: logproto.Entry{ + Timestamp: logTimestamp, + Line: logline, + }, + }, + wantLabelSet: model.LabelSet{ + "filename": "/var/log/pods/agent/agent/1.log", + "foo": "bar", + }, + } +} + +func (t *tester) stop() { + t.cancelFunc() +} + +func (t *tester) updateAndTest(numLogsToSend int, cfg, expectedMetricsBeforeSendingLogs, expectedMetricsAfterSendingLogs string) { + var args Arguments + err := syntax.Unmarshal([]byte(cfg), &args) + require.NoError(t.t, err) + + args.ForwardTo = []loki.LogsReceiver{t.logReceiver} + + t.component.Update(args) + + // Check the component metrics. + if err := testutil.GatherAndCompare(t.registry, + strings.NewReader(expectedMetricsBeforeSendingLogs)); err != nil { + require.NoError(t.t, err) + } + + // Send logs. + for i := 0; i < numLogsToSend; i++ { + t.component.receiver.Chan() <- t.logEntry + } + + // Receive logs. + for i := 0; i < numLogsToSend; i++ { + select { + case logEntry := <-t.logReceiver.Chan(): + require.True(t.t, t.logTimestamp.Equal(logEntry.Timestamp)) + require.Equal(t.t, logline, logEntry.Line) + require.Equal(t.t, t.wantLabelSet, logEntry.Labels) + case <-time.After(5 * time.Second): + require.FailNow(t.t, "failed waiting for log line") + } + } + + // Check the component metrics. + if err := testutil.GatherAndCompare(t.registry, + strings.NewReader(expectedMetricsAfterSendingLogs)); err != nil { + require.NoError(t.t, err) + } +} diff --git a/internal/util/unregisterer.go b/internal/util/unregisterer.go index 5b03ef4ff9..669dfadfd8 100644 --- a/internal/util/unregisterer.go +++ b/internal/util/unregisterer.go @@ -23,6 +23,38 @@ type unregisterer struct { cs map[prometheus.Collector]struct{} } +// An "unchecked collector" is a collector which returns an empty description. +// It is described in the Prometheus documentation, here: +// https://pkg.go.dev/github.com/prometheus/client_golang/prometheus#hdr-Custom_Collectors_and_constant_Metrics +// +// > Alternatively, you could return no Desc at all, which will mark the Collector “unchecked”. +// > No checks are performed at registration time, but metric consistency will still be ensured at scrape time, +// > i.e. any inconsistencies will lead to scrape errors. Thus, with unchecked Collectors, +// > the responsibility to not collect metrics that lead to inconsistencies in the total scrape result +// > lies with the implementer of the Collector. While this is not a desirable state, it is sometimes necessary. +// > The typical use case is a situation where the exact metrics to be returned by a Collector cannot be predicted +// > at registration time, but the implementer has sufficient knowledge of the whole system to guarantee metric consistency. +// +// Unchecked collectors are used in the Loki "metrics" stage of the Loki "process" component. +// +// The isUncheckedCollector function is similar to how Prometheus' Go client extracts the metric description: +// https://github.com/prometheus/client_golang/blob/45f1e72421d9d11af6be784ad60b7389f7543e70/prometheus/registry.go#L372-L381 +func isUncheckedCollector(c prometheus.Collector) bool { + descChan := make(chan *prometheus.Desc, 10) + + go func() { + c.Describe(descChan) + close(descChan) + }() + + i := 0 + for range descChan { + i += 1 + } + + return i == 0 +} + // Register implements prometheus.Registerer. func (u *unregisterer) Register(c prometheus.Collector) error { if u.wrap == nil { @@ -33,6 +65,11 @@ func (u *unregisterer) Register(c prometheus.Collector) error { if err != nil { return err } + + if isUncheckedCollector(c) { + return nil + } + u.cs[c] = struct{}{} return nil } @@ -48,6 +85,10 @@ func (u *unregisterer) MustRegister(cs ...prometheus.Collector) { // Unregister implements prometheus.Registerer. func (u *unregisterer) Unregister(c prometheus.Collector) bool { + if isUncheckedCollector(c) { + return true + } + if u.wrap != nil && u.wrap.Unregister(c) { delete(u.cs, c) return true diff --git a/internal/util/unregisterer_test.go b/internal/util/unregisterer_test.go new file mode 100644 index 0000000000..e35cfbe6cb --- /dev/null +++ b/internal/util/unregisterer_test.go @@ -0,0 +1,35 @@ +package util + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" +) + +func Test_UnregisterTwice_NormalCollector(t *testing.T) { + u := WrapWithUnregisterer(prometheus.NewRegistry()) + c := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "test_metric", + Help: "Test metric.", + }) + u.Register(c) + require.True(t, u.Unregister(c)) + require.False(t, u.Unregister(c)) +} + +type uncheckedCollector struct{} + +func (uncheckedCollector) Describe(chan<- *prometheus.Desc) {} + +func (uncheckedCollector) Collect(chan<- prometheus.Metric) {} + +var _ prometheus.Collector = uncheckedCollector{} + +func Test_UnregisterTwice_UncheckedCollector(t *testing.T) { + u := WrapWithUnregisterer(prometheus.NewRegistry()) + c := uncheckedCollector{} + u.Register(c) + require.True(t, u.Unregister(c)) + require.True(t, u.Unregister(c)) +}