
[Rhythm] Block-builder consumption loop #4480

Open · wants to merge 17 commits into main-rhythm
170 changes: 167 additions & 3 deletions modules/blockbuilder/blockbuilder.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"time"

@@ -27,14 +28,15 @@ import (
const (
blockBuilderServiceName = "block-builder"
ConsumerGroup = "block-builder"
pollTimeout = 2 * time.Second
)

var (
metricPartitionLag = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "tempo",
Subsystem: "block_builder",
Name: "partition_lag",
Help: "Lag of a partition.",
Name: "partition_lag_s",
Help: "Lag of a partition in seconds.",
}, []string{"partition"})
metricConsumeCycleDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "tempo",
@@ -67,6 +69,7 @@ type BlockBuilder struct {
fallbackOffsetMillis int64

kafkaClient *kgo.Client
kadm *kadm.Client
decoder *ingest.Decoder
partitionRing ring.PartitionRingReader

@@ -142,10 +145,12 @@ func (b *BlockBuilder) starting(ctx context.Context) (err error) {
return fmt.Errorf("failed to ping kafka: %w", err)
}

b.kadm = kadm.NewClient(b.kafkaClient)

return nil
}

func (b *BlockBuilder) running(ctx context.Context) error {
func (b *BlockBuilder) runningOld(ctx context.Context) error {
// Initial polling and delay
cycleEndTime := cycleEndAtStartup(time.Now(), b.cfg.ConsumeCycleDuration)
waitTime := 2 * time.Second
@@ -167,6 +172,165 @@ func (b *BlockBuilder) running(ctx context.Context) error {
}
}

func (b *BlockBuilder) running(ctx context.Context) error {
// Initial delay
waitTime := 0 * time.Second
for {
select {
case <-time.After(waitTime):
err := b.consume(ctx)
if err != nil {
b.logger.Log("msg", "consumeCycle failed", "err", err)
}

// Real delay on subsequent iterations
waitTime = b.cfg.ConsumeCycleDuration
case <-ctx.Done():
return nil
}
}
}

func (b *BlockBuilder) consume(ctx context.Context) error {
var (
end = time.Now()
partitions = b.getAssignedActivePartitions()
)

level.Info(b.logger).Log("msg", "starting consume cycle", "cycle_end", end, "active_partitions", partitions)
defer func(t time.Time) { metricConsumeCycleDuration.Observe(time.Since(t).Seconds()) }(time.Now())

for _, partition := range partitions {
// Consume partition while data remains.
// TODO - round-robin one consumption per partition instead to equalize catch-up time.
for {
more, err := b.consumePartition2(ctx, partition, end)
if err != nil {
return err
}

if !more {
break
}
}
}

return nil
}

func (b *BlockBuilder) consumePartition2(ctx context.Context, partition int32, end time.Time) (more bool, err error) {
defer func(t time.Time) {
metricProcessPartitionSectionDuration.WithLabelValues(strconv.Itoa(int(partition))).Observe(time.Since(t).Seconds())
}(time.Now())

var (
dur = b.cfg.ConsumeCycleDuration
topic = b.cfg.IngestStorageConfig.Kafka.Topic
group = b.cfg.IngestStorageConfig.Kafka.ConsumerGroup
startOffset kgo.Offset
writer *writer
lastRec *kgo.Record
begin time.Time
)

commits, err := b.kadm.FetchOffsetsForTopics(ctx, group, topic)
if err != nil {
return false, err
}

lastCommit, ok := commits.Lookup(topic, partition)
Contributor comment with a suggested change:
lastCommit, ok := commits.Lookup(topic, partition)
lastCommit, exists := commits.Lookup(topic, partition)
if exists && lastCommit.At >= 0 {
startOffset = startOffset.At(lastCommit.At)
} else {
startOffset = kgo.NewOffset().AtStart()
}

https://pkg.go.dev/github.com/twmb/franz-go/pkg/kadm#OffsetResponses.Lookup

Author reply:

I think ok is more idiomatic, and the lib is reading from a map internally anyway.

if ok && lastCommit.At >= 0 {
startOffset = startOffset.At(lastCommit.At)
} else {
startOffset = kgo.NewOffset().AtStart()
}

// We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment).
// This is so the cycle starts exactly at the commit offset, and not at what was (potentially over-) consumed previously.
// In the end, we remove the partition from the client (refer to the defer below) to guarantee the client always consumes
// from one partition at a time. I.e. when this partition is consumed, we start consuming the next one.
b.kafkaClient.AddConsumePartitions(map[string]map[int32]kgo.Offset{
topic: {
partition: startOffset,
},
})
defer b.kafkaClient.RemoveConsumePartitions(map[string][]int32{topic: {partition}})

outer:
for {
fetches := func() kgo.Fetches {
ctx2, cancel := context.WithTimeout(ctx, pollTimeout)
defer cancel()
return b.kafkaClient.PollFetches(ctx2)
}()
err = fetches.Err()
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
// No more data
more = false
break
}
metricFetchErrors.WithLabelValues(strconv.Itoa(int(partition))).Inc()
return false, err
}

if fetches.Empty() {
more = false
break
}

for iter := fetches.RecordIter(); !iter.Done(); {
rec := iter.Next()

// Initialize if needed
if writer == nil {

// Determine begin and end time range, which is -/+ cycle duration.
// But don't exceed the given overall end time.
begin = rec.Timestamp.Add(-dur)
if rec.Timestamp.Add(dur).Before(end) {
end = rec.Timestamp.Add(dur)
}

metricPartitionLag.WithLabelValues(strconv.Itoa(int(partition))).Set(time.Since(rec.Timestamp).Seconds())

writer = newPartitionSectionWriter(b.logger, int64(partition), rec.Offset, b.cfg.BlockConfig, b.overrides, b.wal, b.enc)
}

if rec.Timestamp.Before(begin) || rec.Timestamp.After(end) {
break outer
}

err := b.pushTraces(rec.Key, rec.Value, writer)
if err != nil {
Contributor comment:
What will happen if something is wrong with the WAL? I guess it will enter a loop.
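
One possible mitigation, sketched here as a hypothetical follow-up (not part of this PR): bound retries of the WAL push with exponential backoff, so a persistently failing WAL surfaces an error instead of hot-looping. The helper pushWithRetry, its attempt cap, and the backoff values are invented for illustration; it assumes the file's existing context, fmt, and time imports.

// Hypothetical sketch, not in this PR: retry a WAL push with capped
// exponential backoff instead of failing the cycle on the first error.
func pushWithRetry(ctx context.Context, push func() error) error {
	backoff := 100 * time.Millisecond
	var err error
	for attempt := 0; attempt < 5; attempt++ {
		if err = push(); err == nil {
			return nil
		}
		select {
		case <-time.After(backoff):
			backoff *= 2 // 100ms, 200ms, 400ms, ...
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("wal push failed after retries: %w", err)
}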

return false, err
}

lastRec = rec
}
}

if lastRec == nil {
// Received no data
return false, nil
}

err = writer.flush(ctx, b.writer)
if err != nil {
return false, err
}

resp, err := b.kadm.CommitOffsets(ctx, group, kadm.OffsetsFromRecords(*lastRec))
if err != nil {
return false, err
}
if err := resp.Error(); err != nil {
return false, err
}

return more, nil
}
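
As an aside, the core polling pattern above can be distilled into a standalone sketch. It assumes a kgo.Client that has already been assigned the partition (for example via AddConsumePartitions); the helper name pollUntilCaughtUp and the process callback are invented for illustration. It relies on the same franz-go behavior the loop above uses: when the poll context expires, the context error surfaces through fetches.Err(), so DeadlineExceeded reads as "caught up" rather than as a failure.

// Hypothetical sketch: poll with a short timeout and treat
// context.DeadlineExceeded as "caught up" rather than as an error.
func pollUntilCaughtUp(ctx context.Context, client *kgo.Client, process func(*kgo.Record) error) error {
	for {
		pollCtx, cancel := context.WithTimeout(ctx, pollTimeout)
		fetches := client.PollFetches(pollCtx)
		cancel()

		if err := fetches.Err(); err != nil {
			if errors.Is(err, context.DeadlineExceeded) {
				return nil // nothing new arrived within the poll window
			}
			return err
		}
		if fetches.Empty() {
			return nil
		}
		for iter := fetches.RecordIter(); !iter.Done(); {
			if err := process(iter.Next()); err != nil {
				return err
			}
		}
	}
}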

func (b *BlockBuilder) stopping(err error) error {
if b.kafkaClient != nil {
b.kafkaClient.Close()
15 changes: 3 additions & 12 deletions modules/blockbuilder/blockbuilder_test.go
@@ -38,14 +38,10 @@ func TestBlockbuilder_lookbackOnNoCommit(t *testing.T) {
ctx, cancel := context.WithCancelCause(context.Background())
t.Cleanup(func() { cancel(errors.New("test done")) })

k, address := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "test-topic")
k, _ := testkafka.CreateCluster(t, 1, "test-topic")
t.Cleanup(k.Close)

kafkaCommits := atomic.NewInt32(0)
k.ControlKey(kmsg.OffsetCommit.Int16(), func(kmsg.Request) (kmsg.Response, error, bool) {
kafkaCommits.Add(1)
return nil, nil, false
})
address := k.ListenAddrs()[0]

store := newStore(ctx, t)
cfg := blockbuilderConfig(t, address)
@@ -59,14 +55,9 @@
client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka)
sendReq(t, ctx, client)

// Wait for record to be consumed and committed.
require.Eventually(t, func() bool {
return kafkaCommits.Load() > 0
}, time.Minute, time.Second)

// Wait for the block to be flushed.
require.Eventually(t, func() bool {
return len(store.BlockMetas(util.FakeTenantID)) == 1
return len(store.BlockMetas(util.FakeTenantID)) == 1 && store.BlockMetas(util.FakeTenantID)[0].TotalObjects == 1
}, time.Minute, time.Second)
}

2 changes: 1 addition & 1 deletion pkg/ingest/testkafka/cluster.go
@@ -109,7 +109,7 @@ func addSupportForConsumerGroups(t testing.TB, cluster *kfake.Cluster, topicName
// This mimics the real Kafka behaviour.
var partitionsResp []kmsg.OffsetFetchResponseGroupTopicPartition
if partitionID == allPartitions {
for i := int32(1); i < numPartitions+1; i++ {
for i := int32(0); i < numPartitions; i++ {
if committedOffsets[consumerGroup][i] >= 0 {
partitionsResp = append(partitionsResp, kmsg.OffsetFetchResponseGroupTopicPartition{
Partition: i,