diff --git a/docs/sources/tempo/configuration/manifest.md b/docs/sources/tempo/configuration/manifest.md index d76b6fe83d0..d2e9c7819f3 100644 --- a/docs/sources/tempo/configuration/manifest.md +++ b/docs/sources/tempo/configuration/manifest.md @@ -682,7 +682,6 @@ block_builder: instance_id: hostname assigned_partitions: {} consume_cycle_duration: 5m0s - lookback_on_no_commit: 12h0m0s block: max_block_bytes: 20971520 bloom_filter_false_positive: 0.01 diff --git a/integration/e2e/ingest/config-kafka.yaml b/integration/e2e/ingest/config-kafka.yaml index ec99ae09978..1d9fe35b170 100644 --- a/integration/e2e/ingest/config-kafka.yaml +++ b/integration/e2e/ingest/config-kafka.yaml @@ -64,7 +64,6 @@ overrides: path: /var/tempo/overrides block_builder: - lookback_on_no_commit: 15m consume_cycle_duration: 1m ingest: diff --git a/modules/blockbuilder/blockbuilder.go b/modules/blockbuilder/blockbuilder.go index 34708518165..629f42de763 100644 --- a/modules/blockbuilder/blockbuilder.go +++ b/modules/blockbuilder/blockbuilder.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - "sync" + "strconv" "time" "github.com/go-kit/log" @@ -27,6 +27,7 @@ import ( const ( blockBuilderServiceName = "block-builder" ConsumerGroup = "block-builder" + pollTimeout = 2 * time.Second ) var ( @@ -36,6 +37,12 @@ var ( Name: "partition_lag", Help: "Lag of a partition.", }, []string{"partition"}) + metricPartitionLagSeconds = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "tempo", + Subsystem: "block_builder", + Name: "partition_lag_seconds", + Help: "Lag of a partition in seconds.", + }, []string{"partition"}) metricConsumeCycleDuration = promauto.NewHistogram(prometheus.HistogramOpts{ Namespace: "tempo", Subsystem: "block_builder", @@ -61,12 +68,11 @@ var ( type BlockBuilder struct { services.Service - logger log.Logger - cfg Config - assignedPartitions []int32 // TODO - Necessary? - fallbackOffsetMillis int64 + logger log.Logger + cfg Config kafkaClient *kgo.Client + kadm *kadm.Client decoder *ingest.Decoder partitionRing ring.PartitionRingReader @@ -112,9 +118,6 @@ func (b *BlockBuilder) starting(ctx context.Context) (err error) { return fmt.Errorf("failed to create WAL: %w", err) } - // Fallback offset is a millisecond timestamp used to look up a real offset if partition doesn't have a commit. 
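The fallback offset removed below previously seeded consumption from "now minus lookback_on_no_commit" when a partition had no committed offset; after this change the builder resumes at the committed offset, or at the partition's earliest offset when no commit exists. A minimal sketch of that resolution, mirroring the logic added to consumePartition later in this diff (the standalone helper and its name are illustrative, not part of the change):

package blockbuilder

import (
	"context"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

// resolveStartOffset is a sketch only: it shows how the start offset is
// picked without the time-based fallback, using the same kadm client the
// block builder already holds.
func resolveStartOffset(ctx context.Context, adm *kadm.Client, group, topic string, partition int32) (kgo.Offset, error) {
	commits, err := adm.FetchOffsetsForTopics(ctx, group, topic)
	if err != nil {
		return kgo.Offset{}, err
	}
	if c, ok := commits.Lookup(topic, partition); ok && c.At >= 0 {
		// A commit exists: resume exactly at the committed offset.
		return kgo.NewOffset().At(c.At), nil
	}
	// No commit yet: start from the earliest retained record instead of a
	// wall-clock lookback.
	return kgo.NewOffset().AtStart(), nil
}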
- b.fallbackOffsetMillis = time.Now().Add(-b.cfg.LookbackOnNoCommit).UnixMilli() - b.kafkaClient, err = ingest.NewReaderClient( b.cfg.IngestStorageConfig.Kafka, ingest.NewReaderClientMetrics(blockBuilderServiceName, prometheus.DefaultRegisterer), @@ -142,200 +145,129 @@ func (b *BlockBuilder) starting(ctx context.Context) (err error) { return fmt.Errorf("failed to ping kafka: %w", err) } + b.kadm = kadm.NewClient(b.kafkaClient) + + go b.metricLag(ctx) + return nil } func (b *BlockBuilder) running(ctx context.Context) error { - // Initial polling and delay - cycleEndTime := cycleEndAtStartup(time.Now(), b.cfg.ConsumeCycleDuration) - waitTime := 2 * time.Second + // Initial delay + waitTime := 0 * time.Second for { select { case <-time.After(waitTime): - err := b.consumeCycle(ctx, cycleEndTime) + err := b.consume(ctx) if err != nil { - b.logger.Log("msg", "consumeCycle failed", "err", err) - - // Don't progress cycle forward, keep trying at this timestamp - continue + level.Error(b.logger).Log("msg", "consumeCycle failed", "err", err) } - cycleEndTime, waitTime = nextCycleEnd(cycleEndTime, b.cfg.ConsumeCycleDuration) + // Real delay on subsequent + waitTime = b.cfg.ConsumeCycleDuration case <-ctx.Done(): return nil } } } -func (b *BlockBuilder) stopping(err error) error { - if b.kafkaClient != nil { - b.kafkaClient.Close() - } - return err -} - -func (b *BlockBuilder) consumeCycle(ctx context.Context, cycleEndTime time.Time) error { - level.Info(b.logger).Log("msg", "starting consume cycle", "cycle_end", cycleEndTime) - defer func(t time.Time) { metricConsumeCycleDuration.Observe(time.Since(t).Seconds()) }(time.Now()) - - groupLag, err := getGroupLag( - ctx, - kadm.NewClient(b.kafkaClient), - b.cfg.IngestStorageConfig.Kafka.Topic, - b.cfg.IngestStorageConfig.Kafka.ConsumerGroup, - b.fallbackOffsetMillis, +func (b *BlockBuilder) consume(ctx context.Context) error { + var ( + end = time.Now() + partitions = b.getAssignedActivePartitions() ) - if err != nil { - return fmt.Errorf("failed to get group lag: %w", err) - } - - assignedPartitions := b.getAssignedActivePartitions() - - for _, partition := range assignedPartitions { - if ctx.Err() != nil { - return ctx.Err() - } - - partitionLag, ok := groupLag.Lookup(b.cfg.IngestStorageConfig.Kafka.Topic, partition) - if !ok { - return fmt.Errorf("lag for partition %d not found", partition) - } - - level.Debug(b.logger).Log( - "msg", "partition lag", - "partition", partition, - "lag", fmt.Sprintf("%+v", partitionLag), - ) - metricPartitionLag.WithLabelValues(fmt.Sprintf("%d", partition)).Set(float64(partitionLag.Lag)) + level.Info(b.logger).Log("msg", "starting consume cycle", "cycle_end", end, "active_partitions", partitions) + defer func(t time.Time) { metricConsumeCycleDuration.Observe(time.Since(t).Seconds()) }(time.Now()) - if partitionLag.Lag <= 0 { - level.Info(b.logger).Log( - "msg", "nothing to consume in partition", - "partition", partition, - "commit_offset", partitionLag.Commit.At, - "start_offset", partitionLag.Start.Offset, - "end_offset", partitionLag.End.Offset, - "lag", partitionLag.Lag, - ) - continue - } + for _, partition := range partitions { + // Consume partition while data remains. + // TODO - round-robin one consumption per partition instead to equalize catch-up time. 
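The TODO above suggests a fairer schedule than fully draining one partition before touching the next. A sketch of that round-robin variant, assuming the consumePartition(ctx, partition, end) (more, err) shape introduced in this change; the method name is illustrative:

// consumeRoundRobin consumes at most one section per partition per pass,
// so a single lagging partition cannot monopolize a catch-up cycle.
// Sketch only; relies on getAssignedActivePartitions and consumePartition
// from this file.
func (b *BlockBuilder) consumeRoundRobin(ctx context.Context, end time.Time) error {
	pending := b.getAssignedActivePartitions()
	for len(pending) > 0 {
		stillBehind := pending[:0]
		for _, p := range pending {
			more, err := b.consumePartition(ctx, p, end)
			if err != nil {
				return err
			}
			if more {
				// Still data older than end; revisit on the next pass.
				stillBehind = append(stillBehind, p)
			}
		}
		pending = stillBehind
	}
	return nil
}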
+ for { + more, err := b.consumePartition(ctx, partition, end) + if err != nil { + return err + } - if err = b.consumePartition(ctx, partition, partitionLag, cycleEndTime); err != nil { - _ = level.Error(b.logger).Log("msg", "failed to consume partition", "partition", partition, "err", err) + if !more { + break + } } } + return nil } -func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, partitionLag kadm.GroupMemberLag, cycleEndTime time.Time) error { - level.Info(b.logger).Log( - "msg", "consuming partition", - "partition", partition, - "cycle_end", cycleEndTime, - ) +func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, overallEnd time.Time) (more bool, err error) { + defer func(t time.Time) { + metricProcessPartitionSectionDuration.WithLabelValues(strconv.Itoa(int(partition))).Observe(time.Since(t).Seconds()) + }(time.Now()) - sectionEndTime := cycleEndTime + var ( + dur = b.cfg.ConsumeCycleDuration + topic = b.cfg.IngestStorageConfig.Kafka.Topic + group = b.cfg.IngestStorageConfig.Kafka.ConsumerGroup + startOffset kgo.Offset + init bool + writer *writer + lastRec *kgo.Record + end time.Time + ) - lastCommitTs, err := unmarshallCommitMeta(partitionLag.Commit.Metadata) + commits, err := b.kadm.FetchOffsetsForTopics(ctx, group, topic) if err != nil { - return fmt.Errorf("failed to unmarshal commit metadata: %w", err) - } - if lastCommitTs == 0 { - lastCommitTs = b.fallbackOffsetMillis // No commit yet, use fallback offset. - } - commitRecTs := time.UnixMilli(lastCommitTs) - - // We need to align the commit record timestamp to the section end time so we don't consume the same section again. - commitSectionEndTime := alignToSectionEndTime(commitRecTs, b.cfg.ConsumeCycleDuration) - if sectionEndTime.Sub(commitSectionEndTime) > time.Duration(1.5*float64(b.cfg.ConsumeCycleDuration)) { - // We're lagging behind or there is no commit, we need to consume in smaller sections. - // We iterate through all the ConsumeInterval intervals, starting from the first one after the last commit until the cycleEndTime, - // i.e. [T, T+interval), [T+interval, T+2*interval), ... [T+S*interval, cycleEndTime) - // where T is the CommitRecordTimestamp, the timestamp of the record, whose offset we committed previously. - sectionEndTime, _ = nextCycleEnd(commitSectionEndTime, b.cfg.ConsumeCycleDuration) - - level.Debug(b.logger).Log( - "msg", "lagging behind, consuming in sections", - "partition", partition, - "section_end", sectionEndTime, - "commit_rec_ts", commitRecTs, - "commit_section_end", commitSectionEndTime, - "cycle_end", cycleEndTime, - ) + return false, err } - // Continue consuming in sections until we're caught up. - for !sectionEndTime.After(cycleEndTime) { - newCommitAt, err := b.consumePartitionSection(ctx, partition, sectionEndTime, partitionLag) - if err != nil { - return fmt.Errorf("failed to consume partition section: %w", err) - } - sectionEndTime = sectionEndTime.Add(b.cfg.ConsumeCycleDuration) - if newCommitAt > partitionLag.Commit.At { - // We've committed a new offset, so we need to update the lag. 
- partitionLag.Commit.At = newCommitAt - partitionLag.Lag = partitionLag.End.Offset - newCommitAt - } + lastCommit, ok := commits.Lookup(topic, partition) + if ok && lastCommit.At >= 0 { + startOffset = startOffset.At(lastCommit.At) + } else { + startOffset = kgo.NewOffset().AtStart() } - return nil -} -func (b *BlockBuilder) consumePartitionSection(ctx context.Context, partition int32, sectionEndTime time.Time, lag kadm.GroupMemberLag) (int64, error) { level.Info(b.logger).Log( - "msg", "consuming partition section", + "msg", "consuming partition", "partition", partition, - "section_end", sectionEndTime, - "commit_offset", lag.Commit.At, - "start_offset", lag.Start.Offset, - "end_offset", lag.End.Offset, - "lag", lag.Lag, + "commit_offset", lastCommit.At, + "start_offset", startOffset, ) - defer func(t time.Time) { - metricProcessPartitionSectionDuration.WithLabelValues(fmt.Sprintf("%d", partition)).Observe(time.Since(t).Seconds()) - }(time.Now()) - - // TODO - Review what endTimestamp is used here - writer := newPartitionSectionWriter(b.logger, uint64(partition), uint64(sectionEndTime.UnixMilli()), b.cfg.BlockConfig, b.overrides, b.wal, b.enc) - // We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment). // This is so the cycle started exactly at the commit offset, and not at what was (potentially over-) consumed previously. // In the end, we remove the partition from the client (refer to the defer below) to guarantee the client always consumes // from one partition at a time. I.e. when this partition is consumed, we start consuming the next one. b.kafkaClient.AddConsumePartitions(map[string]map[int32]kgo.Offset{ - b.cfg.IngestStorageConfig.Kafka.Topic: { - partition: kgo.NewOffset().At(lag.Commit.At), + topic: { + partition: startOffset, }, }) - defer b.kafkaClient.RemoveConsumePartitions(map[string][]int32{b.cfg.IngestStorageConfig.Kafka.Topic: {partition}}) + defer b.kafkaClient.RemoveConsumePartitions(map[string][]int32{topic: {partition}}) - var ( - firstRec *kgo.Record - lastRec *kgo.Record - ) +outer: + for { + fetches := func() kgo.Fetches { + ctx2, cancel := context.WithTimeout(ctx, pollTimeout) + defer cancel() + return b.kafkaClient.PollFetches(ctx2) + }() + err = fetches.Err() + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + // No more data + break + } + metricFetchErrors.WithLabelValues(strconv.Itoa(int(partition))).Inc() + return false, err + } -consumerLoop: - for recOffset := int64(-1); recOffset < lag.End.Offset-1; { - if err := context.Cause(ctx); err != nil { - return lag.Commit.At, err + if fetches.Empty() { + break } - // PollFetches can return a non-failed fetch with zero records. In such a case, with only the fetches at hands, - // we cannot tell if the consumer has already reached the latest end of the partition, i.e. no more records to consume, - // or there is more data in the backlog, and we must retry the poll. That's why the consumer loop above has to guard - // the iterations against the cycleEndOffset, so it retried the polling up until the expected end of the partition is reached. 
- fetches := b.kafkaClient.PollFetches(ctx) - fetches.EachError(func(_ string, _ int32, err error) { - if !errors.Is(err, context.Canceled) { - level.Error(b.logger).Log("msg", "failed to fetch records", "err", err) - metricFetchErrors.WithLabelValues(fmt.Sprintf("%d", partition)).Inc() - } - }) + for iter := fetches.RecordIter(); !iter.Done(); { + rec := iter.Next() - for recIter := fetches.RecordIter(); !recIter.Done(); { - rec := recIter.Next() - recOffset = rec.Offset level.Debug(b.logger).Log( "msg", "processing record", "partition", rec.Partition, @@ -343,64 +275,99 @@ consumerLoop: "timestamp", rec.Timestamp, ) - if firstRec == nil { - firstRec = rec + // Initialize on first record + if !init { + end = rec.Timestamp.Add(dur) // When block will be cut + metricPartitionLagSeconds.WithLabelValues(strconv.Itoa(int(partition))).Set(time.Since(rec.Timestamp).Seconds()) + writer = newPartitionSectionWriter(b.logger, uint64(partition), uint64(rec.Offset), b.cfg.BlockConfig, b.overrides, b.wal, b.enc) + init = true + } + + if rec.Timestamp.After(end) { + // Cut this block but continue only if we have at least another full cycle + if overallEnd.Sub(rec.Timestamp) >= dur { + more = true + } + break outer } - // Stop consuming after we reached the sectionEndTime marker. - // NOTE: the timestamp of the record is when the record was produced relative to distributor's time. - if rec.Timestamp.After(sectionEndTime) { - break consumerLoop + if rec.Timestamp.After(overallEnd) { + break outer } - err := b.pushTraces(rec.Key, rec.Value, writer) // TODO - Batch pushes by tenant + err := b.pushTraces(rec.Key, rec.Value, writer) if err != nil { - // All "non-terminal" errors are handled by the TSDBBuilder. - return lag.Commit.At, fmt.Errorf("process record in partition %d at offset %d: %w", rec.Partition, rec.Offset, err) + return false, err } + lastRec = rec } } - // Nothing was consumed from Kafka at all. - if firstRec == nil { - level.Info(b.logger).Log("msg", "no records were consumed") - return lag.Commit.At, nil - } - - // No records were processed for this cycle. 
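The cut logic above derives the block window from the first record's timestamp rather than from wall-clock section boundaries. Isolating the decision makes the arithmetic easier to follow (helper name is illustrative; blockEnd is the first record's timestamp plus dur, overallEnd is the cycle end captured in consume):

// cutDecision reports whether the current block should be cut at this
// record and whether another consumption pass is worthwhile. Sketch only.
func cutDecision(recTS, blockEnd, overallEnd time.Time, dur time.Duration) (cut, more bool) {
	if recTS.After(blockEnd) {
		// Cut the block; continue only if at least one more full cycle of
		// data fits before the overall cycle end.
		return true, overallEnd.Sub(recTS) >= dur
	}
	if recTS.After(overallEnd) {
		// Past the cycle end entirely: cut and stop.
		return true, false
	}
	return false, false
}

For example, with dur of 5m and a first record stamped 12:00:00, blockEnd is 12:05:00; a record stamped 12:05:30 cuts the block, and consumption continues only if overallEnd is 12:10:30 or later.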
if lastRec == nil { - level.Info(b.logger).Log("msg", "nothing to commit due to first record has a timestamp greater than this section end", "first_rec_offset", firstRec.Offset, "first_rec_ts", firstRec.Timestamp) - return lag.Commit.At, nil + // Received no data + level.Info(b.logger).Log( + "msg", "no data", + "partition", partition, + ) + return false, nil } - if err := writer.flush(ctx, b.writer); err != nil { - return lag.Commit.At, fmt.Errorf("failed to flush partition to object storage: %w", err) + err = writer.flush(ctx, b.writer) + if err != nil { + return false, err } - commit := kadm.Offset{ - Topic: lastRec.Topic, - Partition: lastRec.Partition, - At: lastRec.Offset + 1, // offset+1 means everything up to (including) the offset was processed - LeaderEpoch: lastRec.LeaderEpoch, - Metadata: marshallCommitMeta(lastRec.Timestamp.UnixMilli()), + // TODO - Retry commit + resp, err := b.kadm.CommitOffsets(ctx, group, kadm.OffsetsFromRecords(*lastRec)) + if err != nil { + return false, err + } + if err := resp.Error(); err != nil { + return false, err } - return commit.At, b.commitState(ctx, commit) + + level.Info(b.logger).Log( + "msg", "successfully committed offset to kafka", + "partition", partition, + "last_record", lastRec.Offset, + ) + + return more, nil } -func (b *BlockBuilder) commitState(ctx context.Context, commit kadm.Offset) error { - offsets := make(kadm.Offsets) - offsets.Add(commit) +func (b *BlockBuilder) metricLag(ctx context.Context) { + var ( + waitTime = time.Second * 15 + topic = b.cfg.IngestStorageConfig.Kafka.Topic + group = b.cfg.IngestStorageConfig.Kafka.ConsumerGroup + ) - // TODO - Commit with backoff - adm := kadm.NewClient(b.kafkaClient) - err := adm.CommitAllOffsets(ctx, b.cfg.IngestStorageConfig.Kafka.ConsumerGroup, offsets) - if err != nil { - return fmt.Errorf("failed to commit offsets: %w", err) + for { + select { + case <-time.After(waitTime): + lag, err := getGroupLag(ctx, b.kadm, topic, group) + if err != nil { + level.Error(b.logger).Log("msg", "metric lag failed:", "err", err) + continue + } + for _, p := range b.getAssignedActivePartitions() { + l, ok := lag.Lookup(topic, p) + if ok { + metricPartitionLag.WithLabelValues(strconv.Itoa(int(p))).Set(float64(l.Lag)) + } + } + case <-ctx.Done(): + return + } } - level.Info(b.logger).Log("msg", "successfully committed offset to kafka", "offset", commit.At) +} - return nil +func (b *BlockBuilder) stopping(err error) error { + if b.kafkaClient != nil { + b.kafkaClient.Close() + } + return err } func (b *BlockBuilder) pushTraces(tenantBytes, reqBytes []byte, p partitionSectionWriter) error { @@ -433,7 +400,7 @@ func (b *BlockBuilder) getAssignedActivePartitions() []int32 { // the lag is the difference between the last produced offset and the offset committed in the consumer group. // Otherwise, if the block builder didn't commit an offset for a given partition yet (e.g. block builder is // running for the first time), then the lag is the difference between the last produced offset and fallbackOffsetMillis. 
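The "TODO - Retry commit" earlier in this hunk is left open by this change. A sketch of a simple bounded retry around the kadm commit, assuming the same group and lastRec values; the helper name and retry policy are illustrative:

// commitWithRetry re-attempts the offset commit a few times with a fixed
// pause before giving up. Sketch only.
func commitWithRetry(ctx context.Context, adm *kadm.Client, group string, lastRec *kgo.Record) error {
	var lastErr error
	for attempt := 0; attempt < 3; attempt++ {
		resp, err := adm.CommitOffsets(ctx, group, kadm.OffsetsFromRecords(*lastRec))
		if err == nil {
			err = resp.Error()
		}
		if err == nil {
			return nil
		}
		lastErr = err
		select {
		case <-time.After(time.Second):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return lastErr
}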
-func getGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string, fallbackOffsetMillis int64) (kadm.GroupLag, error) { +func getGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string) (kadm.GroupLag, error) { offsets, err := admClient.FetchOffsets(ctx, group) if err != nil { if !errors.Is(err, kerr.GroupIDNotFound) { @@ -453,40 +420,6 @@ func getGroupLag(ctx context.Context, admClient *kadm.Client, topic, group strin return nil, err } - resolveFallbackOffsets := sync.OnceValues(func() (kadm.ListedOffsets, error) { - if fallbackOffsetMillis < 0 { - return nil, fmt.Errorf("cannot resolve fallback offset for value %v", fallbackOffsetMillis) - } - return admClient.ListOffsetsAfterMilli(ctx, fallbackOffsetMillis, topic) - }) - // If the group-partition in offsets doesn't have a commit, fall back depending on where fallbackOffsetMillis points at. - for topic, pt := range startOffsets.Offsets() { - for partition, startOffset := range pt { - if _, ok := offsets.Lookup(topic, partition); ok { - continue - } - fallbackOffsets, err := resolveFallbackOffsets() - if err != nil { - return nil, fmt.Errorf("resolve fallback offsets: %w", err) - } - o, ok := fallbackOffsets.Lookup(topic, partition) - if !ok { - return nil, fmt.Errorf("partition %d not found in fallback offsets for topic %s", partition, topic) - } - if o.Offset < startOffset.At { - // Skip the resolved fallback offset if it's before the partition's start offset (i.e. before the earliest offset of the partition). - // This should not happen in Kafka, but can happen in Kafka-compatible systems, e.g. Warpstream. - continue - } - offsets.Add(kadm.OffsetResponse{Offset: kadm.Offset{ - Topic: o.Topic, - Partition: o.Partition, - At: o.Offset, - LeaderEpoch: o.LeaderEpoch, - }}) - } - } - descrGroup := kadm.DescribedGroup{ // "Empty" is the state that indicates that the group doesn't have active consumer members; this is always the case for block-builder, // because we don't use group consumption. @@ -494,52 +427,3 @@ func getGroupLag(ctx context.Context, admClient *kadm.Client, topic, group strin } return kadm.CalculateGroupLagWithStartOffsets(descrGroup, offsets, startOffsets, endOffsets), nil } - -func (b *BlockBuilder) onRevoked(_ context.Context, _ *kgo.Client, revoked map[string][]int32) { - for topic, partitions := range revoked { - partitionsStr := fmt.Sprintf("%v", partitions) - level.Info(b.logger).Log("msg", "partitions revoked", "topic", topic, "partitions", partitionsStr) - } - b.assignedPartitions = revoked[b.cfg.IngestStorageConfig.Kafka.Topic] -} - -func (b *BlockBuilder) onAssigned(_ context.Context, _ *kgo.Client, assigned map[string][]int32) { - // TODO - All partitions are assigned, not just the ones in use by the partition ring (ingesters). - for topic, partitions := range assigned { - var partitionsStr string - for _, partition := range partitions { - partitionsStr += fmt.Sprintf("%d, ", partition) - } - level.Info(b.logger).Log("msg", "partitions assigned", "topic", topic, "partitions", partitionsStr) - } - b.assignedPartitions = assigned[b.cfg.IngestStorageConfig.Kafka.Topic] -} - -// cycleEndAtStartup is the timestamp of the cycle end at startup. -// It's the nearest interval boundary in the past. -func cycleEndAtStartup(t time.Time, interval time.Duration) time.Time { - cycleEnd := t.Truncate(interval) - if cycleEnd.After(t) { - cycleEnd = cycleEnd.Add(-interval) - } - return cycleEnd -} - -// nextCycleEnd returns the timestamp of the next cycleEnd relative to the time t. 
-// One cycle is a duration of one interval. -func nextCycleEnd(t time.Time, interval time.Duration) (time.Time, time.Duration) { - cycleEnd := t.Truncate(interval).Add(interval) - waitTime := cycleEnd.Sub(t) - for waitTime > interval { - // Example - with interval=1h and buffer=15m: - // - at t=14:12, next cycle starts at 14:15 (startup cycle ended at 13:15) - // - at t=14:17, next cycle starts at 15:15 (startup cycle ended at 14:15) - cycleEnd = cycleEnd.Add(-interval) - waitTime -= interval - } - return cycleEnd, waitTime -} - -func alignToSectionEndTime(t time.Time, interval time.Duration) time.Time { - return t.Truncate(interval).Add(interval) -} diff --git a/modules/blockbuilder/blockbuilder_test.go b/modules/blockbuilder/blockbuilder_test.go index 6e7d2e5cdcb..0e99eada347 100644 --- a/modules/blockbuilder/blockbuilder_test.go +++ b/modules/blockbuilder/blockbuilder_test.go @@ -302,82 +302,6 @@ func TestBlockbuilder_committingFails(t *testing.T) { requireLastCommitEquals(t, ctx, client, producedRecords[len(producedRecords)-1].Offset+1) } -func TestCycleEndAtStartup(t *testing.T) { - now := time.Date(1995, 8, 26, 0, 0, 0, 0, time.UTC) - - for _, tc := range []struct { - name string - now, cycleEnd, cycleEndAtStartup time.Time - waitDur, interval time.Duration - }{ - { - name: "now doesn't need to be truncated", - now: now, - cycleEnd: now.Add(5 * time.Minute), - cycleEndAtStartup: now, - waitDur: 5 * time.Minute, - interval: 5 * time.Minute, - }, - { - name: "now needs to be truncated", - now: now.Add(2 * time.Minute), - cycleEnd: now.Truncate(5 * time.Minute).Add(5 * time.Minute), - cycleEndAtStartup: now.Truncate(5 * time.Minute), - waitDur: 3 * time.Minute, - interval: 5 * time.Minute, - }, - } { - t.Run(tc.name, func(t *testing.T) { - cycleEndAtStartup := cycleEndAtStartup(tc.now, tc.interval) - require.Equal(t, tc.cycleEndAtStartup, cycleEndAtStartup) - - cycleEnd, waitDur := nextCycleEnd(tc.now, tc.interval) - require.Equal(t, tc.cycleEnd, cycleEnd) - require.Equal(t, tc.waitDur, waitDur) - }) - } -} - -func TestNextCycleEnd(t *testing.T) { - tests := []struct { - name string - t time.Time - interval time.Duration - expectedTime time.Time - expectedWait time.Duration - }{ - { - name: "ExactInterval", - t: time.Date(2023, 10, 1, 12, 0, 0, 0, time.UTC), - interval: time.Hour, - expectedTime: time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC), - expectedWait: time.Hour, - }, - { - name: "PastInterval", - t: time.Date(2023, 10, 1, 12, 30, 0, 0, time.UTC), - interval: time.Hour, - expectedTime: time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC), - expectedWait: 30 * time.Minute, - }, - { - name: "FutureInterval", - t: time.Date(2023, 10, 1, 12, 0, 0, 1, time.UTC), - interval: time.Hour, - expectedTime: time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC), - expectedWait: 59*time.Minute + 59*time.Second + 999999999*time.Nanosecond, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - resultTime, resultWait := nextCycleEnd(tc.t, tc.interval) - require.Equal(t, tc.expectedTime, resultTime) - require.Equal(t, tc.expectedWait, resultWait) - }) - } -} - func blockbuilderConfig(t *testing.T, address string) Config { cfg := Config{} flagext.DefaultValues(&cfg) @@ -390,7 +314,6 @@ func blockbuilderConfig(t *testing.T, address string) Config { cfg.IngestStorageConfig.Kafka.ConsumerGroup = testConsumerGroup cfg.AssignedPartitions = map[string][]int32{cfg.InstanceID: {0}} - cfg.LookbackOnNoCommit = 15 * time.Second cfg.ConsumeCycleDuration = 5 * time.Second cfg.WAL.Filepath = 
t.TempDir() @@ -517,7 +440,7 @@ func sendReq(t *testing.T, ctx context.Context, client *kgo.Client) []*kgo.Recor return records } -// nolint: revive +// nolint: revive,unparam func sendTracesFor(t *testing.T, ctx context.Context, client *kgo.Client, dur, interval time.Duration) []*kgo.Record { ticker := time.NewTicker(interval) defer ticker.Stop() diff --git a/modules/blockbuilder/commit_meta.go b/modules/blockbuilder/commit_meta.go deleted file mode 100644 index 142f58945e7..00000000000 --- a/modules/blockbuilder/commit_meta.go +++ /dev/null @@ -1,38 +0,0 @@ -package blockbuilder - -import "fmt" - -const ( - kafkaCommitMetaV1 = 1 -) - -// marshallCommitMeta generates the commit metadata string. -// commitRecTs: timestamp of the record which was committed (and not the commit time). -func marshallCommitMeta(commitRecTs int64) string { - return fmt.Sprintf("%d,%d", kafkaCommitMetaV1, commitRecTs) -} - -// unmarshallCommitMeta parses the commit metadata string. -// commitRecTs: timestamp of the record which was committed (and not the commit time). -func unmarshallCommitMeta(s string) (commitRecTs int64, err error) { - if s == "" { - return - } - var ( - version int - metaStr string - ) - _, err = fmt.Sscanf(s, "%d,%s", &version, &metaStr) - if err != nil { - return 0, fmt.Errorf("invalid commit metadata format: parse meta version: %w", err) - } - - if version != kafkaCommitMetaV1 { - return 0, fmt.Errorf("unsupported commit meta version %d", version) - } - _, err = fmt.Sscanf(metaStr, "%d", &commitRecTs) - if err != nil { - return 0, fmt.Errorf("invalid commit metadata format: %w", err) - } - return -} diff --git a/modules/blockbuilder/commit_meta_test.go b/modules/blockbuilder/commit_meta_test.go deleted file mode 100644 index 0317d8eec70..00000000000 --- a/modules/blockbuilder/commit_meta_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package blockbuilder - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestMarshallCommitMeta(t *testing.T) { - tests := []struct { - name string - commitRecTs int64 - expectedMeta string - }{ - {"ValidTimestamp", 1627846261, "1,1627846261"}, - {"ZeroTimestamp", 0, "1,0"}, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - meta := marshallCommitMeta(tc.commitRecTs) - assert.Equal(t, tc.expectedMeta, meta, "expected: %s, got: %s", tc.expectedMeta, meta) - }) - } -} - -func TestUnmarshallCommitMeta(t *testing.T) { - tests := []struct { - name string - meta string - expectedTs int64 - expectedError bool - }{ - {"ValidMeta", "1,1627846261", 1627846261, false}, - {"InvalidMetaFormat", "1,invalid", 0, true}, - {"UnsupportedVersion", "2,1627846261", 0, true}, - {"EmptyMeta", "", 0, false}, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ts, err := unmarshallCommitMeta(tc.meta) - assert.Equal(t, tc.expectedError, err != nil, "expected error: %v, got: %v", tc.expectedError, err) - assert.Equal(t, tc.expectedTs, ts, "expected: %d, got: %d", tc.expectedTs, ts) - }) - } -} diff --git a/modules/blockbuilder/config.go b/modules/blockbuilder/config.go index bf3060bec61..0fa9d6209d4 100644 --- a/modules/blockbuilder/config.go +++ b/modules/blockbuilder/config.go @@ -37,8 +37,6 @@ type Config struct { AssignedPartitions map[string][]int32 `yaml:"assigned_partitions" doc:"List of partitions assigned to this block builder."` ConsumeCycleDuration time.Duration `yaml:"consume_cycle_duration" doc:"Interval between consumption cycles."` - LookbackOnNoCommit time.Duration `yaml:"lookback_on_no_commit" 
category:"advanced"` - BlockConfig BlockConfig `yaml:"block" doc:"Configuration for the block builder."` WAL wal.Config `yaml:"wal" doc:"Configuration for the write ahead log."` @@ -76,8 +74,6 @@ func (c *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { f.StringVar(&c.InstanceID, "block-builder.instance-id", hostname, "Instance id.") f.Var(newPartitionAssignmentVar(&c.AssignedPartitions), prefix+".assigned-partitions", "List of partitions assigned to this block builder.") f.DurationVar(&c.ConsumeCycleDuration, prefix+".consume-cycle-duration", 5*time.Minute, "Interval between consumption cycles.") - // TODO - Review default - f.DurationVar(&c.LookbackOnNoCommit, prefix+".lookback-on-no-commit", 12*time.Hour, "How much of the historical records to look back when there is no kafka commit for a partition.") c.BlockConfig.RegisterFlagsAndApplyDefaults(prefix+".block", f) c.WAL.RegisterFlags(f)