Skip to content

Commit

Permalink
[Rhythm] Move group partition lag metric to ingest package, export fr…
Browse files Browse the repository at this point in the history
…om generators too (#4571)

* Move group partition lag metric to ingest package, export from generators too

* Cleanup

* changelog

* Remove unnecessary go

Co-authored-by: Mario <[email protected]>

---------

Co-authored-by: Mario <[email protected]>
  • Loading branch information
mdisibio and mapno authored Jan 22, 2025
1 parent 033c536 commit e680d6e
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 79 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* [ENHANCEMENT] Prevent queries in the ingester from blocking flushing traces to disk and memory spikes. [#4483](https://github.com/grafana/tempo/pull/4483) (@joe-elliott)
* [ENHANCEMENT] Update tempo operational dashboard for new block-builder and v2 traces api [#4559](https://github.com/grafana/tempo/pull/4559) (@mdisibio)
* [ENHANCEMENT] Improve block-builder performance by flushing blocks concurrently [#4565](https://github.com/grafana/tempo/pull/4565) (@mdisibio)
* [ENHANCEMENT] Export new `tempo_ingest_group_partition_lag` metric from block-builders and metrics-generators [#4571](https://github.com/grafana/tempo/pull/4571) (@mdisibio)
* [ENHANCEMENT] Use distroless base container images for improved security [#4556](https://github.com/grafana/tempo/pull/4556) (@carles-grafana)
* [BUGFIX] Choose a default step for a gRPC streaming query range request if none is provided. [#4546](https://github.com/grafana/tempo/pull/4576) (@joe-elliott)
Correctly copy exemplars for metrics like `| rate()` when gRPC streaming.
Expand Down
77 changes: 6 additions & 71 deletions modules/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
)

Expand Down Expand Up @@ -51,12 +50,6 @@ var (
Name: "fetch_records_total",
Help: "Total number of records fetched from Kafka",
}, []string{"partition"})
metricPartitionLag = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "tempo",
Subsystem: "block_builder",
Name: "partition_lag",
Help: "Lag of a partition.",
}, []string{"partition"})
metricPartitionLagSeconds = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "tempo",
Subsystem: "block_builder",
Expand Down Expand Up @@ -167,7 +160,12 @@ func (b *BlockBuilder) starting(ctx context.Context) (err error) {

b.kadm = kadm.NewClient(b.kafkaClient)

go b.metricLag(ctx)
ingest.ExportPartitionLagMetrics(
ctx,
b.kadm,
b.logger,
b.cfg.IngestStorageConfig,
b.getAssignedActivePartitions)

return nil
}
Expand Down Expand Up @@ -387,33 +385,6 @@ outer:
return more, nil
}

// metricLag periodically publishes the consumer group lag of this block
// builder's partitions on the metricPartitionLag gauge, labelled by partition.
//
// Every 15 seconds it fetches the group lag for the configured topic and
// consumer group, then updates the gauge for each partition returned by
// getAssignedActivePartitions. Partitions without a lag entry are left
// untouched. Failures are logged and retried on the next tick. The call
// blocks until ctx is cancelled, so it is meant to run in its own goroutine.
func (b *BlockBuilder) metricLag(ctx context.Context) {
	const waitTime = 15 * time.Second

	var (
		topic = b.cfg.IngestStorageConfig.Kafka.Topic
		group = b.cfg.IngestStorageConfig.Kafka.ConsumerGroup
	)

	// One ticker for the lifetime of the loop instead of a fresh time.After
	// timer on every iteration.
	ticker := time.NewTicker(waitTime)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			lag, err := getGroupLag(ctx, b.kadm, topic, group)
			if err != nil {
				// A failure caused by cancellation is part of a normal
				// shutdown; exit without logging it as an error.
				if ctx.Err() != nil {
					return
				}
				level.Error(b.logger).Log("msg", "metric lag failed:", "err", err)
				continue
			}
			for _, p := range b.getAssignedActivePartitions() {
				if l, ok := lag.Lookup(topic, p); ok {
					metricPartitionLag.WithLabelValues(strconv.Itoa(int(p))).Set(float64(l.Lag))
				}
			}
		case <-ctx.Done():
			return
		}
	}
}

func (b *BlockBuilder) stopping(err error) error {
if b.kafkaClient != nil {
b.kafkaClient.Close()
Expand Down Expand Up @@ -442,39 +413,3 @@ func (b *BlockBuilder) getAssignedActivePartitions() []int32 {
}
return assignedActivePartitions
}

// getGroupLag is similar to `kadm.Client.Lag` but works when the group doesn't have live participants.
// Similar to `kadm.CalculateGroupLagWithStartOffsets`, it takes into account that the group may not have any commits.
//
// The lag is the difference between the last produced offset (high watermark) and an offset in the "past".
// If the block builder committed an offset for a given partition to the consumer group at least once, then
// the lag is the difference between the last produced offset and the offset committed in the consumer group.
// Otherwise (e.g. block builder is running for the first time) there is no committed offset to compare
// against; this function takes no fallback offset, so the result for such partitions is whatever
// `kadm.CalculateGroupLagWithStartOffsets` derives from the listed start and end offsets.
//
// A `kerr.GroupIDNotFound` error from fetching the committed offsets is tolerated. Any other failure is
// returned wrapped with the name of the step that failed.
func getGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string) (kadm.GroupLag, error) {
	offsets, err := admClient.FetchOffsets(ctx, group)
	// A group that does not exist yet is not an error here: it simply has no commits.
	if err != nil && !errors.Is(err, kerr.GroupIDNotFound) {
		return nil, fmt.Errorf("fetch offsets: %w", err)
	}
	if err := offsets.Error(); err != nil {
		return nil, fmt.Errorf("fetch offsets got error in response: %w", err)
	}

	startOffsets, err := admClient.ListStartOffsets(ctx, topic)
	if err != nil {
		return nil, fmt.Errorf("list start offsets: %w", err)
	}
	endOffsets, err := admClient.ListEndOffsets(ctx, topic)
	if err != nil {
		return nil, fmt.Errorf("list end offsets: %w", err)
	}

	descrGroup := kadm.DescribedGroup{
		// "Empty" is the state that indicates that the group doesn't have active consumer members; this is always the case for block-builder,
		// because we don't use group consumption.
		State: "Empty",
	}
	return kadm.CalculateGroupLagWithStartOffsets(descrGroup, offsets, startOffsets, endOffsets), nil
}
2 changes: 1 addition & 1 deletion modules/generator/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type Generator struct {
logger log.Logger

kafkaWG sync.WaitGroup
kafkaStop chan struct{}
kafkaStop func()
kafkaClient *kgo.Client
kafkaAdm *kadm.Client
partitionRing ring.PartitionRingReader
Expand Down
15 changes: 8 additions & 7 deletions modules/generator/generator_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,24 @@ import (
)

func (g *Generator) startKafka() {
g.kafkaStop = make(chan struct{})
// Create context that will be used to stop the goroutines.
var ctx context.Context
ctx, g.kafkaStop = context.WithCancel(context.Background())

g.kafkaWG.Add(1)
go g.listenKafka()
go g.listenKafka(ctx)
ingest.ExportPartitionLagMetrics(ctx, g.kafkaAdm, g.logger, g.cfg.Ingest, g.getAssignedActivePartitions)
}

func (g *Generator) stopKafka() {
close(g.kafkaStop)
g.kafkaStop()
g.kafkaWG.Wait()
}

func (g *Generator) listenKafka() {
func (g *Generator) listenKafka(ctx context.Context) {
defer g.kafkaWG.Done()

level.Info(g.logger).Log("msg", "generator now listening to kafka")
ctx := context.Background()
for {
select {
case <-time.After(2 * time.Second):
Expand All @@ -44,8 +47,6 @@ func (g *Generator) listenKafka() {
level.Error(g.logger).Log("msg", "readKafka failed", "err", err)
continue
}
case <-g.kafkaStop:
return
case <-ctx.Done():
return
}
Expand Down
89 changes: 89 additions & 0 deletions pkg/ingest/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package ingest

import (
"context"
"errors"
"fmt"
"strconv"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
)

// metricPartitionLag is the tempo_ingest_group_partition_lag gauge. It carries
// one series per "group" and "partition" label pair and is written by
// ExportPartitionLagMetrics with the lag looked up for that partition.
var metricPartitionLag = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "tempo",
Subsystem: "ingest",
Name: "group_partition_lag",
Help: "Lag of a partition.",
}, []string{"group", "partition"})

// ExportPartitionLagMetrics starts a background goroutine that periodically
// publishes consumer group lag on the metricPartitionLag gauge, labelled by
// group and partition. The function itself returns immediately.
//
// Every 15 seconds the goroutine fetches the group lag for cfg.Kafka.Topic and
// cfg.Kafka.ConsumerGroup through admClient, then updates the gauge for each
// partition returned by getAssignedActivePartitions. Partitions without a lag
// entry are left untouched. Failures are logged to log and retried on the next
// tick. The goroutine exits once ctx is cancelled.
//
// TODO - Simplify signature to create client instead?
func ExportPartitionLagMetrics(ctx context.Context, admClient *kadm.Client, log log.Logger, cfg Config, getAssignedActivePartitions func() []int32) {
	go func() {
		const waitTime = 15 * time.Second

		var (
			topic = cfg.Kafka.Topic
			group = cfg.Kafka.ConsumerGroup
		)

		// One ticker for the lifetime of the loop instead of a fresh
		// time.After timer on every iteration.
		ticker := time.NewTicker(waitTime)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				lag, err := getGroupLag(ctx, admClient, topic, group)
				if err != nil {
					// A failure caused by cancellation is part of a normal
					// shutdown; exit without logging it as an error.
					if ctx.Err() != nil {
						return
					}
					level.Error(log).Log("msg", "metric lag failed:", "err", err)
					continue
				}
				for _, p := range getAssignedActivePartitions() {
					if l, ok := lag.Lookup(topic, p); ok {
						metricPartitionLag.WithLabelValues(group, strconv.Itoa(int(p))).Set(float64(l.Lag))
					}
				}
			case <-ctx.Done():
				return
			}
		}
	}()
}

// getGroupLag is similar to `kadm.Client.Lag` but works when the group doesn't have live participants.
// Similar to `kadm.CalculateGroupLagWithStartOffsets`, it takes into account that the group may not have any commits.
//
// The lag is the difference between the last produced offset (high watermark) and an offset in the "past".
// If a consumer committed an offset for a given partition to the consumer group at least once, then
// the lag is the difference between the last produced offset and the offset committed in the consumer group.
// Otherwise (e.g. the consumer is running for the first time) there is no committed offset to compare
// against; this function takes no fallback offset, so the result for such partitions is whatever
// `kadm.CalculateGroupLagWithStartOffsets` derives from the listed start and end offsets.
//
// A `kerr.GroupIDNotFound` error from fetching the committed offsets is tolerated. Any other failure is
// returned wrapped with the name of the step that failed.
func getGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string) (kadm.GroupLag, error) {
	offsets, err := admClient.FetchOffsets(ctx, group)
	// A group that does not exist yet is not an error here: it simply has no commits.
	if err != nil && !errors.Is(err, kerr.GroupIDNotFound) {
		return nil, fmt.Errorf("fetch offsets: %w", err)
	}
	if err := offsets.Error(); err != nil {
		return nil, fmt.Errorf("fetch offsets got error in response: %w", err)
	}

	startOffsets, err := admClient.ListStartOffsets(ctx, topic)
	if err != nil {
		return nil, fmt.Errorf("list start offsets: %w", err)
	}
	endOffsets, err := admClient.ListEndOffsets(ctx, topic)
	if err != nil {
		return nil, fmt.Errorf("list end offsets: %w", err)
	}

	descrGroup := kadm.DescribedGroup{
		// "Empty" is the state that indicates that the group doesn't have active consumer members; this is always the case for block-builder,
		// because we don't use group consumption.
		State: "Empty",
	}
	return kadm.CalculateGroupLagWithStartOffsets(descrGroup, offsets, startOffsets, endOffsets), nil
}

0 comments on commit e680d6e

Please sign in to comment.