diff --git a/modules/blockbuilder/blockbuilder_test.go b/modules/blockbuilder/blockbuilder_test.go
index bef82771aaf..6e7d2e5cdcb 100644
--- a/modules/blockbuilder/blockbuilder_test.go
+++ b/modules/blockbuilder/blockbuilder_test.go
@@ -30,7 +30,11 @@ import (
 	"go.uber.org/atomic"
 )
 
-const testTopic = "test-topic"
+const (
+	testTopic         = "test-topic"
+	testConsumerGroup = "test-consumer-group"
+	testPartition     = int32(0)
+)
 
 // When the partition starts with no existing commit,
 // the block-builder looks back to consume all available records from the start and ensures they are committed and flushed into a block.
@@ -38,12 +42,11 @@ func TestBlockbuilder_lookbackOnNoCommit(t *testing.T) {
 	ctx, cancel := context.WithCancelCause(context.Background())
 	t.Cleanup(func() { cancel(errors.New("test done")) })
 
-	k, address := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "test-topic")
-	t.Cleanup(k.Close)
+	k, address := testkafka.CreateCluster(t, 1, testTopic)
 
 	kafkaCommits := atomic.NewInt32(0)
-	k.ControlKey(kmsg.OffsetCommit.Int16(), func(kmsg.Request) (kmsg.Response, error, bool) {
-		kafkaCommits.Add(1)
+	k.ControlKey(kmsg.OffsetCommit, func(kmsg.Request) (kmsg.Response, error, bool) {
+		kafkaCommits.Inc()
 		return nil, nil, false
 	})
 
@@ -57,7 +60,7 @@ func TestBlockbuilder_lookbackOnNoCommit(t *testing.T) {
 	})
 
 	client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka)
-	sendReq(t, ctx, client)
+	producedRecords := sendReq(t, ctx, client)
 
 	// Wait for record to be consumed and committed.
 	require.Eventually(t, func() bool {
@@ -66,8 +69,11 @@ func TestBlockbuilder_lookbackOnNoCommit(t *testing.T) {
 
 	// Wait for the block to be flushed.
 	require.Eventually(t, func() bool {
-		return len(store.BlockMetas(util.FakeTenantID)) == 1
+		return len(store.BlockMetas(util.FakeTenantID)) == 1 && countFlushedTraces(store) == 1
 	}, time.Minute, time.Second)
+
+	// Check committed offset
+	requireLastCommitEquals(t, ctx, client, producedRecords[len(producedRecords)-1].Offset+1)
 }
 
 // Starting with a pre-existing commit,
@@ -77,12 +83,11 @@ func TestBlockbuilder_startWithCommit(t *testing.T) {
 	ctx, cancel := context.WithCancelCause(context.Background())
 	t.Cleanup(func() { cancel(errors.New("test done")) })
 
-	k, address := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "test-topic")
-	t.Cleanup(k.Close)
+	k, address := testkafka.CreateCluster(t, 1, testTopic)
 
 	kafkaCommits := atomic.NewInt32(0)
-	k.ControlKey(kmsg.OffsetCommit.Int16(), func(kmsg.Request) (kmsg.Response, error, bool) {
-		kafkaCommits.Add(1)
+	k.ControlKey(kmsg.OffsetCommit, func(kmsg.Request) (kmsg.Response, error, bool) {
+		kafkaCommits.Inc()
 		return nil, nil, false
 	})
 
@@ -121,6 +126,9 @@ func TestBlockbuilder_startWithCommit(t *testing.T) {
 	require.Eventually(t, func() bool {
 		return countFlushedTraces(store) == len(producedRecords)-commitedAt
 	}, time.Minute, time.Second)
+
+	// Check committed offset
+	requireLastCommitEquals(t, ctx, client, producedRecords[len(producedRecords)-1].Offset+1)
 }
 
 // In case a block flush initially fails, the system retries until it succeeds.
@@ -128,19 +136,18 @@ func TestBlockbuilder_flushingFails(t *testing.T) {
 	ctx, cancel := context.WithCancelCause(context.Background())
 	t.Cleanup(func() { cancel(errors.New("test done")) })
 
-	k, address := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "test-topic")
-	t.Cleanup(k.Close)
+	k, address := testkafka.CreateCluster(t, 1, "test-topic")
 
 	kafkaCommits := atomic.NewInt32(0)
-	k.ControlKey(kmsg.OffsetCommit.Int16(), func(kmsg.Request) (kmsg.Response, error, bool) {
-		kafkaCommits.Add(1)
+	k.ControlKey(kmsg.OffsetCommit, func(kmsg.Request) (kmsg.Response, error, bool) {
+		kafkaCommits.Inc()
 		return nil, nil, false
 	})
 
 	storageWrites := atomic.NewInt32(0)
 	store := newStoreWrapper(newStore(ctx, t), func(ctx context.Context, block tempodb.WriteableBlock, store storage.Store) error {
 		// Fail the first block write
-		if storageWrites.Add(1) == 1 {
+		if storageWrites.Inc() == 1 {
 			return errors.New("failed to write block")
 		}
 		return store.WriteBlock(ctx, block)
@@ -149,7 +156,7 @@ func TestBlockbuilder_flushingFails(t *testing.T) {
 	logger := test.NewTestingLogger(t)
 
 	client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka)
-	sendTracesFor(t, ctx, client, time.Second, 100*time.Millisecond) // Send for 1 second, <1 consumption cycles
+	producedRecords := sendTracesFor(t, ctx, client, time.Second, 100*time.Millisecond) // Send for 1 second, <1 consumption cycles
 
 	b := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store)
 	require.NoError(t, services.StartAndAwaitRunning(ctx, b))
@@ -164,6 +171,9 @@ func TestBlockbuilder_flushingFails(t *testing.T) {
 	require.Eventually(t, func() bool {
 		return len(store.BlockMetas(util.FakeTenantID)) >= 1
 	}, time.Minute, time.Second)
+
+	// Check committed offset
+	requireLastCommitEquals(t, ctx, client, producedRecords[len(producedRecords)-1].Offset+1)
 }
 
 // Receiving records with older timestamps the block-builder processes them in the current cycle,
@@ -172,13 +182,11 @@ func TestBlockbuilder_receivesOldRecords(t *testing.T) {
 	ctx, cancel := context.WithCancelCause(context.Background())
 	t.Cleanup(func() { cancel(errors.New("test done")) })
 
-	k, address := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "test-topic")
-	t.Cleanup(k.Close)
+	k, address := testkafka.CreateCluster(t, 1, "test-topic")
 
 	kafkaCommits := atomic.NewInt32(0)
-	k.ControlKey(kmsg.OffsetCommit.Int16(), func(kmsg.Request) (kmsg.Response, error, bool) {
-		k.KeepControl()
-		kafkaCommits.Add(1)
+	k.ControlKey(kmsg.OffsetCommit, func(kmsg.Request) (kmsg.Response, error, bool) {
+		kafkaCommits.Inc()
 		return nil, nil, false
 	})
 
@@ -221,6 +229,9 @@ func TestBlockbuilder_receivesOldRecords(t *testing.T) {
 	require.Eventually(t, func() bool {
 		return len(store.BlockMetas(util.FakeTenantID)) == 2
 	}, time.Minute, time.Second)
+
+	// Check committed offset
+	requireLastCommitEquals(t, ctx, client, producedRecords[len(producedRecords)-1].Offset+1)
 }
 
 // FIXME - Test is unstable and will fail if records cross two consumption cycles,
@@ -238,15 +249,13 @@ func TestBlockbuilder_committingFails(t *testing.T) {
 	ctx, cancel := context.WithCancelCause(context.Background())
 	t.Cleanup(func() { cancel(errors.New("test done")) })
 
-	k, address := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "test-topic")
-	t.Cleanup(k.Close)
+	k, address := testkafka.CreateCluster(t, 1, "test-topic")
 
 	kafkaCommits := atomic.NewInt32(0)
-	k.ControlKey(kmsg.OffsetCommit.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
-		k.KeepControl()
-		defer kafkaCommits.Add(1)
+	k.ControlKey(kmsg.OffsetCommit, func(req kmsg.Request) (kmsg.Response, error, bool) {
+		kafkaCommits.Inc()
 
-		if kafkaCommits.Load() == 0 { // First commit fails
+		if kafkaCommits.Load() == 1 { // First commit fails
 			res := kmsg.NewOffsetCommitResponse()
 			res.Version = req.GetVersion()
 			res.Topics = []kmsg.OffsetCommitResponseTopic{
@@ -271,7 +280,7 @@ func TestBlockbuilder_committingFails(t *testing.T) {
 	logger := test.NewTestingLogger(t)
 
 	client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka)
-	sendTracesFor(t, ctx, client, time.Second, 100*time.Millisecond) // Send for 1 second, <1 consumption cycles
+	producedRecords := sendTracesFor(t, ctx, client, time.Second, 100*time.Millisecond) // Send for 1 second, <1 consumption cycles
 
 	b := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store)
 	require.NoError(t, services.StartAndAwaitRunning(ctx, b))
@@ -288,6 +297,9 @@ func TestBlockbuilder_committingFails(t *testing.T) {
 	require.Eventually(t, func() bool {
 		return len(store.BlockMetas(util.FakeTenantID)) == 1 // Only one block should have been written
 	}, time.Minute, time.Second)
+
+	// Check committed offset
+	requireLastCommitEquals(t, ctx, client, producedRecords[len(producedRecords)-1].Offset+1)
 }
 
 func TestCycleEndAtStartup(t *testing.T) {
@@ -375,7 +387,7 @@ func blockbuilderConfig(t *testing.T, address string) Config {
 	flagext.DefaultValues(&cfg.IngestStorageConfig.Kafka)
 	cfg.IngestStorageConfig.Kafka.Address = address
 	cfg.IngestStorageConfig.Kafka.Topic = testTopic
-	cfg.IngestStorageConfig.Kafka.ConsumerGroup = "test-consumer-group"
+	cfg.IngestStorageConfig.Kafka.ConsumerGroup = testConsumerGroup
 	cfg.AssignedPartitions = map[string][]int32{cfg.InstanceID: {0}}
 	cfg.LookbackOnNoCommit = 15 * time.Second
 
@@ -491,6 +503,7 @@ func countFlushedTraces(store storage.Store) int {
 	return count
 }
 
+// nolint: revive
 func sendReq(t *testing.T, ctx context.Context, client *kgo.Client) []*kgo.Record {
 	traceID := generateTraceID(t)
 
@@ -504,6 +517,7 @@ func sendReq(t *testing.T, ctx context.Context, client *kgo.Client) []*kgo.Recor
 	return records
 }
 
+// nolint: revive
 func sendTracesFor(t *testing.T, ctx context.Context, client *kgo.Client, dur, interval time.Duration) []*kgo.Record {
 	ticker := time.NewTicker(interval)
 	defer ticker.Stop()
@@ -532,3 +546,12 @@ func generateTraceID(t *testing.T) []byte {
 	require.NoError(t, err)
 	return traceID
 }
+
+// nolint: revive
+func requireLastCommitEquals(t testing.TB, ctx context.Context, client *kgo.Client, expectedOffset int64) {
+	offsets, err := kadm.NewClient(client).FetchOffsetsForTopics(ctx, testConsumerGroup, testTopic)
+	require.NoError(t, err)
+	offset, ok := offsets.Lookup(testTopic, testPartition)
+	require.True(t, ok)
+	require.Equal(t, expectedOffset, offset.At)
+}
diff --git a/pkg/ingest/testkafka/cluster.go b/pkg/ingest/testkafka/cluster.go
index 08002d87bee..b13a1ed1d2d 100644
--- a/pkg/ingest/testkafka/cluster.go
+++ b/pkg/ingest/testkafka/cluster.go
@@ -4,148 +4,166 @@ package testkafka
 
 import (
 	"testing"
-	"time"
 
-	"github.com/grafana/dskit/flagext"
-	"github.com/grafana/tempo/pkg/ingest"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/twmb/franz-go/pkg/kfake"
 	"github.com/twmb/franz-go/pkg/kmsg"
 )
 
-// CreateCluster returns a fake Kafka cluster for unit testing.
-func CreateCluster(t testing.TB, numPartitions int32, topicName string) (*kfake.Cluster, ingest.KafkaConfig) {
-	cluster, addr := CreateClusterWithoutCustomConsumerGroupsSupport(t, numPartitions, topicName)
-	addSupportForConsumerGroups(t, cluster, topicName, numPartitions)
+type controlFn func(kmsg.Request) (kmsg.Response, error, bool)
 
-	return cluster, createTestKafkaConfig(addr, topicName)
+type Cluster struct {
+	t                testing.TB
+	fake             *kfake.Cluster
+	topic            string
+	numPartitions    int
+	committedOffsets map[string][]int64
+	controlFuncs     map[kmsg.Key]controlFn
 }
 
-func createTestKafkaConfig(clusterAddr, topicName string) ingest.KafkaConfig {
-	cfg := ingest.KafkaConfig{}
-	flagext.DefaultValues(&cfg)
+// CreateCluster returns a fake Kafka cluster for unit testing.
+func CreateCluster(t testing.TB, numPartitions int32, topicName string) (*Cluster, string) {
+	fake, err := kfake.NewCluster(kfake.NumBrokers(1), kfake.SeedTopics(numPartitions, topicName))
+	require.NoError(t, err)
+	t.Cleanup(fake.Close)
+
+	addrs := fake.ListenAddrs()
+	require.Len(t, addrs, 1)
+
+	c := &Cluster{
+		t:                t,
+		fake:             fake,
+		topic:            topicName,
+		numPartitions:    int(numPartitions),
+		committedOffsets: map[string][]int64{},
+		controlFuncs:     map[kmsg.Key]controlFn{},
+	}
 
-	cfg.Address = clusterAddr
-	cfg.Topic = topicName
-	cfg.WriteTimeout = 2 * time.Second
+	// Add support for consumer groups
+	c.fake.ControlKey(kmsg.OffsetCommit.Int16(), c.offsetCommit)
+	c.fake.ControlKey(kmsg.OffsetFetch.Int16(), c.offsetFetch)
 
-	return cfg
+	return c, addrs[0]
 }
 
-func CreateClusterWithoutCustomConsumerGroupsSupport(t testing.TB, numPartitions int32, topicName string) (*kfake.Cluster, string) {
-	cluster, err := kfake.NewCluster(kfake.NumBrokers(1), kfake.SeedTopics(numPartitions, topicName))
-	require.NoError(t, err)
-	t.Cleanup(cluster.Close)
+func (c *Cluster) ControlKey(key kmsg.Key, fn controlFn) {
+	switch key {
+	case kmsg.OffsetCommit:
+		// These are called by us for deterministic order
+		c.controlFuncs[key] = fn
+	default:
+		// These are passed through
+		c.fake.ControlKey(int16(key), fn)
+	}
+}
 
-	addrs := cluster.ListenAddrs()
-	require.Len(t, addrs, 1)
+func (c *Cluster) ensureConsumerGroupExists(consumerGroup string) {
+	if _, ok := c.committedOffsets[consumerGroup]; ok {
+		return
+	}
+	c.committedOffsets[consumerGroup] = make([]int64, c.numPartitions+1)
 
-	return cluster, addrs[0]
+	// Initialise the partition offsets with the special value -1 which means "no offset committed".
+	for i := 0; i < len(c.committedOffsets[consumerGroup]); i++ {
+		c.committedOffsets[consumerGroup][i] = -1
+	}
 }
 
-// addSupportForConsumerGroups adds very bare-bones support for one consumer group.
-// It expects that only one partition is consumed at a time.
-func addSupportForConsumerGroups(t testing.TB, cluster *kfake.Cluster, topicName string, numPartitions int32) {
-	committedOffsets := map[string][]int64{}
+// nolint: revive
+func (c *Cluster) offsetCommit(request kmsg.Request) (kmsg.Response, error, bool) {
+	c.fake.KeepControl()
 
-	ensureConsumerGroupExists := func(consumerGroup string) {
-		if _, ok := committedOffsets[consumerGroup]; ok {
-			return
+	if fn := c.controlFuncs[kmsg.OffsetCommit]; fn != nil {
+		res, err, handled := fn(request)
+		if handled {
+			return res, err, handled
 		}
-		committedOffsets[consumerGroup] = make([]int64, numPartitions+1)
+	}
 
-		// Initialise the partition offsets with the special value -1 which means "no offset committed".
-		for i := 0; i < len(committedOffsets[consumerGroup]); i++ {
-			committedOffsets[consumerGroup][i] = -1
-		}
+	commitR := request.(*kmsg.OffsetCommitRequest)
+	consumerGroup := commitR.Group
+	c.ensureConsumerGroupExists(consumerGroup)
+	require.Len(c.t, commitR.Topics, 1, "test only has support for one topic per request")
+	topic := commitR.Topics[0]
+	require.Equal(c.t, c.topic, topic.Topic)
+	require.Len(c.t, topic.Partitions, 1, "test only has support for one partition per request")
+
+	partitionID := topic.Partitions[0].Partition
+	c.committedOffsets[consumerGroup][partitionID] = topic.Partitions[0].Offset
+
+	resp := request.ResponseKind().(*kmsg.OffsetCommitResponse)
+	resp.Default()
+	resp.Topics = []kmsg.OffsetCommitResponseTopic{
+		{
+			Topic:      c.topic,
+			Partitions: []kmsg.OffsetCommitResponseTopicPartition{{Partition: partitionID}},
+		},
 	}
 
-	cluster.ControlKey(kmsg.OffsetCommit.Int16(), func(request kmsg.Request) (kmsg.Response, error, bool) {
-		cluster.KeepControl()
-		commitR := request.(*kmsg.OffsetCommitRequest)
-		consumerGroup := commitR.Group
-		ensureConsumerGroupExists(consumerGroup)
-		assert.Len(t, commitR.Topics, 1, "test only has support for one topic per request")
-		topic := commitR.Topics[0]
-		assert.Equal(t, topicName, topic.Topic)
-		assert.Len(t, topic.Partitions, 1, "test only has support for one partition per request")
-
-		partitionID := topic.Partitions[0].Partition
-		committedOffsets[consumerGroup][partitionID] = topic.Partitions[0].Offset
-
-		resp := request.ResponseKind().(*kmsg.OffsetCommitResponse)
-		resp.Default()
-		resp.Topics = []kmsg.OffsetCommitResponseTopic{
-			{
-				Topic:      topicName,
-				Partitions: []kmsg.OffsetCommitResponseTopicPartition{{Partition: partitionID}},
-			},
-		}
+	return resp, nil, true
+}
 
-		return resp, nil, true
-	})
-
-	cluster.ControlKey(kmsg.OffsetFetch.Int16(), func(kreq kmsg.Request) (kmsg.Response, error, bool) {
-		cluster.KeepControl()
-		req := kreq.(*kmsg.OffsetFetchRequest)
-		assert.Len(t, req.Groups, 1, "test only has support for one consumer group per request")
-		consumerGroup := req.Groups[0].Group
-		ensureConsumerGroupExists(consumerGroup)
-
-		const allPartitions = -1
-		var partitionID int32
-
-		if len(req.Groups[0].Topics) == 0 {
-			// An empty request means fetch all topic-partitions for this group.
-			partitionID = allPartitions
-		} else {
-			partitionID = req.Groups[0].Topics[0].Partitions[0]
-			assert.Len(t, req.Groups[0].Topics, 1, "test only has support for one partition per request")
-			assert.Len(t, req.Groups[0].Topics[0].Partitions, 1, "test only has support for one partition per request")
-		}
+// nolint: revive
+func (c *Cluster) offsetFetch(kreq kmsg.Request) (kmsg.Response, error, bool) {
+	c.fake.KeepControl()
+	req := kreq.(*kmsg.OffsetFetchRequest)
+	require.Len(c.t, req.Groups, 1, "test only has support for one consumer group per request")
+	consumerGroup := req.Groups[0].Group
+	c.ensureConsumerGroupExists(consumerGroup)
+
+	const allPartitions = -1
+	var partitionID int32
+
+	if len(req.Groups[0].Topics) == 0 {
+		// An empty request means fetch all topic-partitions for this group.
+		partitionID = allPartitions
+	} else {
+		partitionID = req.Groups[0].Topics[0].Partitions[0]
+		assert.Len(c.t, req.Groups[0].Topics, 1, "test only has support for one partition per request")
+		assert.Len(c.t, req.Groups[0].Topics[0].Partitions, 1, "test only has support for one partition per request")
+	}
 
-		// Prepare the list of partitions for which the offset has been committed.
-		// This mimics the real Kafka behaviour.
-		var partitionsResp []kmsg.OffsetFetchResponseGroupTopicPartition
-		if partitionID == allPartitions {
-			for i := int32(1); i < numPartitions+1; i++ {
-				if committedOffsets[consumerGroup][i] >= 0 {
-					partitionsResp = append(partitionsResp, kmsg.OffsetFetchResponseGroupTopicPartition{
-						Partition: i,
-						Offset:    committedOffsets[consumerGroup][i],
-					})
-				}
-			}
-		} else {
-			if committedOffsets[consumerGroup][partitionID] >= 0 {
+	// Prepare the list of partitions for which the offset has been committed.
+	// This mimics the real Kafka behaviour.
+	var partitionsResp []kmsg.OffsetFetchResponseGroupTopicPartition
+	if partitionID == allPartitions {
+		for i := 0; i < c.numPartitions; i++ {
+			if c.committedOffsets[consumerGroup][i] >= 0 {
 				partitionsResp = append(partitionsResp, kmsg.OffsetFetchResponseGroupTopicPartition{
-					Partition: partitionID,
-					Offset:    committedOffsets[consumerGroup][partitionID],
+					Partition: int32(i),
+					Offset:    c.committedOffsets[consumerGroup][i],
 				})
 			}
 		}
-
-		// Prepare the list topics for which there are some committed offsets.
-		// This mimics the real Kafka behaviour.
-		var topicsResp []kmsg.OffsetFetchResponseGroupTopic
-		if len(partitionsResp) > 0 {
-			topicsResp = []kmsg.OffsetFetchResponseGroupTopic{
-				{
-					Topic:      topicName,
-					Partitions: partitionsResp,
-				},
-			}
+	} else {
+		if c.committedOffsets[consumerGroup][partitionID] >= 0 {
+			partitionsResp = append(partitionsResp, kmsg.OffsetFetchResponseGroupTopicPartition{
+				Partition: partitionID,
+				Offset:    c.committedOffsets[consumerGroup][partitionID],
+			})
 		}
+	}
 
-		resp := kreq.ResponseKind().(*kmsg.OffsetFetchResponse)
-		resp.Default()
-		resp.Groups = []kmsg.OffsetFetchResponseGroup{
+	// Prepare the list topics for which there are some committed offsets.
+	// This mimics the real Kafka behaviour.
+	var topicsResp []kmsg.OffsetFetchResponseGroupTopic
+	if len(partitionsResp) > 0 {
+		topicsResp = []kmsg.OffsetFetchResponseGroupTopic{
 			{
-				Group:  consumerGroup,
-				Topics: topicsResp,
+				Topic:      c.topic,
+				Partitions: partitionsResp,
 			},
 		}
-		return resp, nil, true
-	})
+	}
+
+	resp := kreq.ResponseKind().(*kmsg.OffsetFetchResponse)
+	resp.Default()
+	resp.Groups = []kmsg.OffsetFetchResponseGroup{
+		{
+			Group:  consumerGroup,
+			Topics: topicsResp,
+		},
+	}
+	return resp, nil, true
 }