From e680d6ee8d88e2fdb3e49b647eb9759dbb5b1963 Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Wed, 22 Jan 2025 03:13:49 -0500 Subject: [PATCH] [Rhythm] Move group partition lag metric to ingest package, export from generators too (#4571) * Move group partition lag metric to ingest package, export from generators too * Cleanup * changelog * Remove unnecessary go Co-authored-by: Mario --------- Co-authored-by: Mario --- CHANGELOG.md | 1 + modules/blockbuilder/blockbuilder.go | 77 ++---------------------- modules/generator/generator.go | 2 +- modules/generator/generator_kafka.go | 15 ++--- pkg/ingest/metrics.go | 89 ++++++++++++++++++++++++++++ 5 files changed, 105 insertions(+), 79 deletions(-) create mode 100644 pkg/ingest/metrics.go diff --git a/CHANGELOG.md b/CHANGELOG.md index cf781641350..cf95cf38396 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * [ENHANCEMENT] Prevent queries in the ingester from blocking flushing traces to disk and memory spikes. [#4483](https://github.com/grafana/tempo/pull/4483) (@joe-elliott) * [ENHANCEMENT] Update tempo operational dashboard for new block-builder and v2 traces api [#4559](https://github.com/grafana/tempo/pull/4559) (@mdisibio) * [ENHANCEMENT] Improve block-builder performance by flushing blocks concurrently [#4565](https://github.com/grafana/tempo/pull/4565) (@mdisibio) +* [ENHANCEMENT] Export new `tempo_ingest_group_partition_lag` metric from block-builders and metrics-generators [#4571](https://github.com/grafana/tempo/pull/4571) (@mdisibio) * [ENHANCEMENT] Use distroless base container images for improved security [#4556](https://github.com/grafana/tempo/pull/4556) (@carles-grafana) * [BUGFIX] Choose a default step for a gRPC streaming query range request if none is provided. [#4546](https://github.com/grafana/tempo/pull/4576) (@joe-elliott) Correctly copy exemplars for metrics like `| rate()` when gRPC streaming. diff --git a/modules/blockbuilder/blockbuilder.go b/modules/blockbuilder/blockbuilder.go index b5669bdc95d..1e2f9019bf7 100644 --- a/modules/blockbuilder/blockbuilder.go +++ b/modules/blockbuilder/blockbuilder.go @@ -20,7 +20,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/twmb/franz-go/pkg/kadm" - "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" ) @@ -51,12 +50,6 @@ var ( Name: "fetch_records_total", Help: "Total number of records fetched from Kafka", }, []string{"partition"}) - metricPartitionLag = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "tempo", - Subsystem: "block_builder", - Name: "partition_lag", - Help: "Lag of a partition.", - }, []string{"partition"}) metricPartitionLagSeconds = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "tempo", Subsystem: "block_builder", @@ -167,7 +160,12 @@ func (b *BlockBuilder) starting(ctx context.Context) (err error) { b.kadm = kadm.NewClient(b.kafkaClient) - go b.metricLag(ctx) + ingest.ExportPartitionLagMetrics( + ctx, + b.kadm, + b.logger, + b.cfg.IngestStorageConfig, + b.getAssignedActivePartitions) return nil } @@ -387,33 +385,6 @@ outer: return more, nil } -func (b *BlockBuilder) metricLag(ctx context.Context) { - var ( - waitTime = time.Second * 15 - topic = b.cfg.IngestStorageConfig.Kafka.Topic - group = b.cfg.IngestStorageConfig.Kafka.ConsumerGroup - ) - - for { - select { - case <-time.After(waitTime): - lag, err := getGroupLag(ctx, b.kadm, topic, group) - if err != nil { - level.Error(b.logger).Log("msg", "metric lag failed:", "err", err) - continue - } - for _, p := range b.getAssignedActivePartitions() { - l, ok := lag.Lookup(topic, p) - if ok { - metricPartitionLag.WithLabelValues(strconv.Itoa(int(p))).Set(float64(l.Lag)) - } - } - case <-ctx.Done(): - return - } - } -} - func (b *BlockBuilder) stopping(err error) error { if b.kafkaClient != nil { b.kafkaClient.Close() @@ -442,39 +413,3 @@ func (b *BlockBuilder) getAssignedActivePartitions() []int32 { } return assignedActivePartitions } - -// getGroupLag is similar to `kadm.Client.Lag` but works when the group doesn't have live participants. -// Similar to `kadm.CalculateGroupLagWithStartOffsets`, it takes into account that the group may not have any commits. -// -// The lag is the difference between the last produced offset (high watermark) and an offset in the "past". -// If the block builder committed an offset for a given partition to the consumer group at least once, then -// the lag is the difference between the last produced offset and the offset committed in the consumer group. -// Otherwise, if the block builder didn't commit an offset for a given partition yet (e.g. block builder is -// running for the first time), then the lag is the difference between the last produced offset and fallbackOffsetMillis. -func getGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string) (kadm.GroupLag, error) { - offsets, err := admClient.FetchOffsets(ctx, group) - if err != nil { - if !errors.Is(err, kerr.GroupIDNotFound) { - return nil, fmt.Errorf("fetch offsets: %w", err) - } - } - if err := offsets.Error(); err != nil { - return nil, fmt.Errorf("fetch offsets got error in response: %w", err) - } - - startOffsets, err := admClient.ListStartOffsets(ctx, topic) - if err != nil { - return nil, err - } - endOffsets, err := admClient.ListEndOffsets(ctx, topic) - if err != nil { - return nil, err - } - - descrGroup := kadm.DescribedGroup{ - // "Empty" is the state that indicates that the group doesn't have active consumer members; this is always the case for block-builder, - // because we don't use group consumption. - State: "Empty", - } - return kadm.CalculateGroupLagWithStartOffsets(descrGroup, offsets, startOffsets, endOffsets), nil -} diff --git a/modules/generator/generator.go b/modules/generator/generator.go index afb859c428e..1a3216ea6db 100644 --- a/modules/generator/generator.go +++ b/modules/generator/generator.go @@ -71,7 +71,7 @@ type Generator struct { logger log.Logger kafkaWG sync.WaitGroup - kafkaStop chan struct{} + kafkaStop func() kafkaClient *kgo.Client kafkaAdm *kadm.Client partitionRing ring.PartitionRingReader diff --git a/modules/generator/generator_kafka.go b/modules/generator/generator_kafka.go index 6f80196e977..57d3f36f194 100644 --- a/modules/generator/generator_kafka.go +++ b/modules/generator/generator_kafka.go @@ -17,21 +17,24 @@ import ( ) func (g *Generator) startKafka() { - g.kafkaStop = make(chan struct{}) + // Create context that will be used to stop the goroutines. + var ctx context.Context + ctx, g.kafkaStop = context.WithCancel(context.Background()) + g.kafkaWG.Add(1) - go g.listenKafka() + go g.listenKafka(ctx) + ingest.ExportPartitionLagMetrics(ctx, g.kafkaAdm, g.logger, g.cfg.Ingest, g.getAssignedActivePartitions) } func (g *Generator) stopKafka() { - close(g.kafkaStop) + g.kafkaStop() g.kafkaWG.Wait() } -func (g *Generator) listenKafka() { +func (g *Generator) listenKafka(ctx context.Context) { defer g.kafkaWG.Done() level.Info(g.logger).Log("msg", "generator now listening to kafka") - ctx := context.Background() for { select { case <-time.After(2 * time.Second): @@ -44,8 +47,6 @@ func (g *Generator) listenKafka() { level.Error(g.logger).Log("msg", "readKafka failed", "err", err) continue } - case <-g.kafkaStop: - return case <-ctx.Done(): return } diff --git a/pkg/ingest/metrics.go b/pkg/ingest/metrics.go new file mode 100644 index 00000000000..72ba578cd1e --- /dev/null +++ b/pkg/ingest/metrics.go @@ -0,0 +1,89 @@ +package ingest + +import ( + "context" + "errors" + "fmt" + "strconv" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/twmb/franz-go/pkg/kadm" + "github.com/twmb/franz-go/pkg/kerr" +) + +var metricPartitionLag = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "tempo", + Subsystem: "ingest", + Name: "group_partition_lag", + Help: "Lag of a partition.", +}, []string{"group", "partition"}) + +// TODO - Simplify signature to create client instead? +func ExportPartitionLagMetrics(ctx context.Context, admClient *kadm.Client, log log.Logger, cfg Config, getAssignedActivePartitions func() []int32) { + go func() { + var ( + waitTime = time.Second * 15 + topic = cfg.Kafka.Topic + group = cfg.Kafka.ConsumerGroup + ) + + for { + select { + case <-time.After(waitTime): + lag, err := getGroupLag(ctx, admClient, topic, group) + if err != nil { + level.Error(log).Log("msg", "metric lag failed:", "err", err) + continue + } + for _, p := range getAssignedActivePartitions() { + l, ok := lag.Lookup(topic, p) + if ok { + metricPartitionLag.WithLabelValues(group, strconv.Itoa(int(p))).Set(float64(l.Lag)) + } + } + case <-ctx.Done(): + return + } + } + }() +} + +// getGroupLag is similar to `kadm.Client.Lag` but works when the group doesn't have live participants. +// Similar to `kadm.CalculateGroupLagWithStartOffsets`, it takes into account that the group may not have any commits. +// +// The lag is the difference between the last produced offset (high watermark) and an offset in the "past". +// If the block builder committed an offset for a given partition to the consumer group at least once, then +// the lag is the difference between the last produced offset and the offset committed in the consumer group. +// Otherwise, if the block builder didn't commit an offset for a given partition yet (e.g. block builder is +// running for the first time), then the lag is the difference between the last produced offset and fallbackOffsetMillis. +func getGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string) (kadm.GroupLag, error) { + offsets, err := admClient.FetchOffsets(ctx, group) + if err != nil { + if !errors.Is(err, kerr.GroupIDNotFound) { + return nil, fmt.Errorf("fetch offsets: %w", err) + } + } + if err := offsets.Error(); err != nil { + return nil, fmt.Errorf("fetch offsets got error in response: %w", err) + } + + startOffsets, err := admClient.ListStartOffsets(ctx, topic) + if err != nil { + return nil, err + } + endOffsets, err := admClient.ListEndOffsets(ctx, topic) + if err != nil { + return nil, err + } + + descrGroup := kadm.DescribedGroup{ + // "Empty" is the state that indicates that the group doesn't have active consumer members; this is always the case for block-builder, + // because we don't use group consumption. + State: "Empty", + } + return kadm.CalculateGroupLagWithStartOffsets(descrGroup, offsets, startOffsets, endOffsets), nil +}