Skip to content

Commit

Permalink
ingest: manually create Kafka topic instead of configuring num.partit…
Browse files Browse the repository at this point in the history
…ions (#10101)

* ingest: don't fail on missing topics, run in e2e tests

This PR does three things:
1. handles unknown topic/partition errors when seeking to the right offset when starting concurrentFetchers.
2. handles unknown topic/partition errors when getting the topic ID when starting concurrentFetchers
3. enables concurrent fetching and ingestion in e2e tests

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Create topic instead of relying on num.partitions

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix comment

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Remove auto-create client config

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Update config descriptors

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Revert "Remove auto-create client config"

This reverts commit 98a542c.

* Reapply "Remove auto-create client config"

This reverts commit 3eb78b1.

* Add lint rule

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Export createTopic

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Remove other uses of AllowAutoTopicCreation

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
  • Loading branch information
dimitarvdimitrov authored Dec 6, 2024
1 parent 1bf2549 commit 36da907
Show file tree
Hide file tree
Showing 15 changed files with 147 additions and 95 deletions.
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,12 @@ lint: check-makefiles
"google.golang.org/grpc/metadata.{FromIncomingContext}=google.golang.org/grpc/metadata.ValueFromIncomingContext" \
./pkg/... ./cmd/... ./integration/...

# We don't use the Kafka client's topic auto-creation because we don't control the brokers' num.partitions setting.
# As a result, an auto-created topic could end up with the wrong number of partitions.
faillint -paths \
"github.com/twmb/franz-go/pkg/kgo.{AllowAutoTopicCreation}" \
./pkg/... ./cmd/... ./tools/... ./integration/...

format: ## Run gofmt and goimports.
find . $(DONT_FIND) -name '*.pb.go' -prune -o -type f -name '*.go' -exec gofmt -w -s {} \;
find . $(DONT_FIND) -name '*.pb.go' -prune -o -type f -name '*.go' -exec goimports -w -local github.com/grafana/mimir {} \;
Expand Down
6 changes: 3 additions & 3 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -6711,7 +6711,7 @@
"kind": "field",
"name": "auto_create_topic_enabled",
"required": false,
"desc": "Enable auto-creation of Kafka topic if it doesn't exist.",
"desc": "Enable auto-creation of Kafka topic on startup if it doesn't exist. If creating the topic fails and the topic doesn't already exist, Mimir will fail to start.",
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "ingest-storage.kafka.auto-create-topic-enabled",
Expand All @@ -6721,9 +6721,9 @@
"kind": "field",
"name": "auto_create_topic_default_partitions",
"required": false,
"desc": "When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when Mimir component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. If the setting of num.partitions fails, Mimir proceeds anyways, but auto-created topics could have an incorrect number of partitions.",
"desc": "When auto-creation of Kafka topic is enabled and this value is positive, Mimir will create the topic with this number of partitions. When the value is -1 the Kafka broker will use the default number of partitions (num.partitions configuration).",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldDefaultValue": -1,
"fieldFlag": "ingest-storage.kafka.auto-create-topic-default-partitions",
"fieldType": "int"
},
Expand Down
4 changes: 2 additions & 2 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1498,9 +1498,9 @@ Usage of ./cmd/mimir/mimir:
-ingest-storage.kafka.address string
The Kafka backend address.
-ingest-storage.kafka.auto-create-topic-default-partitions int
When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when Mimir component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. If the setting of num.partitions fails, Mimir proceeds anyways, but auto-created topics could have an incorrect number of partitions.
When auto-creation of Kafka topic is enabled and this value is positive, Mimir will create the topic with this number of partitions. When the value is -1 the Kafka broker will use the default number of partitions (num.partitions configuration). (default -1)
-ingest-storage.kafka.auto-create-topic-enabled
Enable auto-creation of Kafka topic if it doesn't exist. (default true)
Enable auto-creation of Kafka topic on startup if it doesn't exist. If creating the topic fails and the topic doesn't already exist, Mimir will fail to start. (default true)
-ingest-storage.kafka.client-id string
The Kafka client ID.
-ingest-storage.kafka.consume-from-position-at-startup string
Expand Down
4 changes: 2 additions & 2 deletions cmd/mimir/help.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -398,9 +398,9 @@ Usage of ./cmd/mimir/mimir:
-ingest-storage.kafka.address string
The Kafka backend address.
-ingest-storage.kafka.auto-create-topic-default-partitions int
When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when Mimir component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. If the setting of num.partitions fails, Mimir proceeds anyways, but auto-created topics could have an incorrect number of partitions.
When auto-creation of Kafka topic is enabled and this value is positive, Mimir will create the topic with this number of partitions. When the value is -1 the Kafka broker will use the default number of partitions (num.partitions configuration). (default -1)
-ingest-storage.kafka.auto-create-topic-enabled
Enable auto-creation of Kafka topic if it doesn't exist. (default true)
Enable auto-creation of Kafka topic on startup if it doesn't exist. If creating the topic fails and the topic doesn't already exist, Mimir will fail to start. (default true)
-ingest-storage.kafka.client-id string
The Kafka client ID.
-ingest-storage.kafka.consume-from-position-at-startup string
Expand Down
16 changes: 7 additions & 9 deletions docs/sources/mimir/configure/configuration-parameters/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -3914,20 +3914,18 @@ kafka:
# CLI flag: -ingest-storage.kafka.max-consumer-lag-at-startup
[max_consumer_lag_at_startup: <duration> | default = 15s]
# Enable auto-creation of Kafka topic if it doesn't exist.
# Enable auto-creation of Kafka topic on startup if it doesn't exist. If
# creating the topic fails and the topic doesn't already exist, Mimir will
# fail to start.
# CLI flag: -ingest-storage.kafka.auto-create-topic-enabled
[auto_create_topic_enabled: <boolean> | default = true]
# When auto-creation of Kafka topic is enabled and this value is positive,
# Kafka's num.partitions configuration option is set on Kafka brokers with
# this value when Mimir component that uses Kafka starts. This configuration
# option specifies the default number of partitions that the Kafka broker uses
# for auto-created topics. Note that this is a Kafka-cluster wide setting, and
# applies to any auto-created topic. If the setting of num.partitions fails,
# Mimir proceeds anyways, but auto-created topics could have an incorrect
# number of partitions.
# Mimir will create the topic with this number of partitions. When the value
# is -1 the Kafka broker will use the default number of partitions
# (num.partitions configuration).
# CLI flag: -ingest-storage.kafka.auto-create-topic-default-partitions
[auto_create_topic_default_partitions: <int> | default = 0]
[auto_create_topic_default_partitions: <int> | default = -1]
# The maximum size of a Kafka record data that should be generated by the
# producer. An incoming write request larger than this size is split into
Expand Down
6 changes: 6 additions & 0 deletions integration/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,12 @@ blocks_storage:
// Do not wait before switching an INACTIVE partition to ACTIVE.
"-ingester.partition-ring.min-partition-owners-count": "0",
"-ingester.partition-ring.min-partition-owners-duration": "0s",

// Enable ingestion and fetch concurrency, and create the topic with an explicit number of partitions.
"-ingest-storage.kafka.ongoing-fetch-concurrency": "12",
"-ingest-storage.kafka.startup-fetch-concurrency": "12",
"-ingest-storage.kafka.ingestion-concurrency-max": "8",
"-ingest-storage.kafka.auto-create-topic-default-partitions": "10",
}
}
)
Expand Down
1 change: 0 additions & 1 deletion pkg/blockbuilder/kafkautil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ func TestKafkaMarshallingCommitMeta(t *testing.T) {
func mustKafkaClient(t *testing.T, addrs ...string) *kgo.Client {
writeClient, err := kgo.NewClient(
kgo.SeedBrokers(addrs...),
kgo.AllowAutoTopicCreation(),
// We will choose the partition of each record.
kgo.RecordPartitioner(kgo.ManualPartitioner()),
)
Expand Down
1 change: 0 additions & 1 deletion pkg/blockbuilder/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
func mustKafkaClient(t *testing.T, addrs ...string) *kgo.Client {
writeClient, err := kgo.NewClient(
kgo.SeedBrokers(addrs...),
kgo.AllowAutoTopicCreation(),
// We will choose the partition of each record.
kgo.RecordPartitioner(kgo.ManualPartitioner()),
)
Expand Down
9 changes: 7 additions & 2 deletions pkg/storage/ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var (
ErrInconsistentSASLCredentials = fmt.Errorf("the SASL username and password must be both configured to enable SASL authentication")
ErrInvalidIngestionConcurrencyMax = errors.New("ingest-storage.kafka.ingestion-concurrency-max must either be set to 0 or to a value greater than 0")
ErrInvalidIngestionConcurrencyParams = errors.New("ingest-storage.kafka.ingestion-concurrency-queue-capacity, ingest-storage.kafka.ingestion-concurrency-estimated-bytes-per-sample, ingest-storage.kafka.ingestion-concurrency-batch-size and ingest-storage.kafka.ingestion-concurrency-target-flushes-per-shard must be greater than 0")
// ErrInvalidAutoCreateTopicParams is returned by KafkaConfig.Validate when topic auto-creation is enabled
// but the configured number of partitions is neither -1 nor a positive value.
ErrInvalidAutoCreateTopicParams = errors.New("ingest-storage.kafka.auto-create-topic-default-partitions must be -1 or greater than 0 when ingest-storage.kafka.auto-create-topic-enabled=true")

consumeFromPositionOptions = []string{consumeFromLastOffset, consumeFromStart, consumeFromEnd, consumeFromTimestamp}

Expand Down Expand Up @@ -168,8 +169,8 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
f.DurationVar(&cfg.TargetConsumerLagAtStartup, targetConsumerLagAtStartupFlag, 2*time.Second, "The best-effort maximum lag a consumer tries to achieve at startup. "+howToDisableConsumerLagAtStartup)
f.DurationVar(&cfg.MaxConsumerLagAtStartup, maxConsumerLagAtStartupFlag, 15*time.Second, "The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. "+howToDisableConsumerLagAtStartup)

f.BoolVar(&cfg.AutoCreateTopicEnabled, prefix+".auto-create-topic-enabled", true, "Enable auto-creation of Kafka topic if it doesn't exist.")
f.IntVar(&cfg.AutoCreateTopicDefaultPartitions, prefix+".auto-create-topic-default-partitions", 0, "When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when Mimir component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. If the setting of num.partitions fails, Mimir proceeds anyways, but auto-created topics could have an incorrect number of partitions.")
f.BoolVar(&cfg.AutoCreateTopicEnabled, prefix+".auto-create-topic-enabled", true, "Enable auto-creation of Kafka topic on startup if it doesn't exist. If creating the topic fails and the topic doesn't already exist, Mimir will fail to start.")
f.IntVar(&cfg.AutoCreateTopicDefaultPartitions, prefix+".auto-create-topic-default-partitions", -1, "When auto-creation of Kafka topic is enabled and this value is positive, Mimir will create the topic with this number of partitions. When the value is -1 the Kafka broker will use the default number of partitions (num.partitions configuration).")

f.IntVar(&cfg.ProducerMaxRecordSizeBytes, prefix+".producer-max-record-size-bytes", maxProducerRecordDataBytesLimit, "The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes.")
f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.")
Expand Down Expand Up @@ -247,6 +248,10 @@ func (cfg *KafkaConfig) Validate() error {
}
}

if partitions := cfg.AutoCreateTopicDefaultPartitions; cfg.AutoCreateTopicEnabled && (partitions != -1 && partitions < 1) {
return ErrInvalidAutoCreateTopicParams
}

return nil
}

Expand Down
19 changes: 19 additions & 0 deletions pkg/storage/ingest/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,25 @@ func TestConfig_Validate(t *testing.T) {
},
expectedErr: ErrInvalidIngestionConcurrencyParams,
},
"should fail when auto create topic default partitions is lower than 1": {
setup: func(cfg *Config) {
cfg.Enabled = true
cfg.KafkaConfig.Address = "localhost"
cfg.KafkaConfig.Topic = "test"
cfg.KafkaConfig.AutoCreateTopicEnabled = true
cfg.KafkaConfig.AutoCreateTopicDefaultPartitions = -100
},
expectedErr: ErrInvalidAutoCreateTopicParams,
},
"should pass when auto create topic default partitions is -1 (using Kafka broker's default)": {
setup: func(cfg *Config) {
cfg.Enabled = true
cfg.KafkaConfig.Address = "localhost"
cfg.KafkaConfig.Topic = "test"
cfg.KafkaConfig.AutoCreateTopicEnabled = true
cfg.KafkaConfig.AutoCreateTopicDefaultPartitions = -1
},
},
}

for testName, testData := range tests {
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/ingest/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ func (r *PartitionReader) EstimatedBytesPerRecord() int64 {

func (r *PartitionReader) start(ctx context.Context) (returnErr error) {
if r.kafkaCfg.AutoCreateTopicEnabled {
setDefaultNumberOfPartitionsForAutocreatedTopics(r.kafkaCfg, r.logger)
if err := CreateTopic(r.kafkaCfg, r.logger); err != nil {
return err
}
}

// Stop dependencies if the start() fails.
Expand Down
61 changes: 29 additions & 32 deletions pkg/storage/ingest/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package ingest

import (
"context"
"errors"
"fmt"
"strconv"
"time"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/regexp"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/pkg/sasl/plain"
Expand Down Expand Up @@ -94,10 +96,6 @@ func commonKafkaClientOptions(cfg KafkaConfig, metrics *kprom.Metrics, logger lo
}),
}

if cfg.AutoCreateTopicEnabled {
opts = append(opts, kgo.AllowAutoTopicCreation())
}

// SASL plain auth.
if cfg.SASLUsername != "" && cfg.SASLPassword.String() != "" {
opts = append(opts, kgo.SASL(plain.Plain(func(_ context.Context) (plain.Auth, error) {
Expand Down Expand Up @@ -154,45 +152,44 @@ func (w *resultPromise[T]) wait(ctx context.Context) (T, error) {
}
}

// setDefaultNumberOfPartitionsForAutocreatedTopics tries to set num.partitions config option on brokers.
// This is best-effort, if setting the option fails, error is logged, but not returned.
func setDefaultNumberOfPartitionsForAutocreatedTopics(cfg KafkaConfig, logger log.Logger) {
if cfg.AutoCreateTopicDefaultPartitions <= 0 {
return
}
// CreateTopic creates the topic in the Kafka cluster. If creating the topic fails, then an error is returned.
// If the topic already exists, then the function logs a message and returns nil.
func CreateTopic(cfg KafkaConfig, logger log.Logger) error {
logger = log.With(logger, "task", "autocreate_topic")

cl, err := kgo.NewClient(commonKafkaClientOptions(cfg, nil, logger)...)
if err != nil {
level.Error(logger).Log("msg", "failed to create kafka client", "err", err)
return
return fmt.Errorf("failed to create kafka client: %w", err)
}

adm := kadm.NewClient(cl)
defer adm.Close()
ctx := context.Background()

defaultNumberOfPartitions := fmt.Sprintf("%d", cfg.AutoCreateTopicDefaultPartitions)
responses, err := adm.AlterBrokerConfigsState(context.Background(), []kadm.AlterConfig{
{
Op: kadm.SetConfig,
Name: "num.partitions",
Value: &defaultNumberOfPartitions,
},
})

// Check if any error has been returned as part of the response.
// As of Kafka 2.4 we can pass -1 as the replication factor and the broker will use its default configuration.
const defaultReplication = -1
resp, err := adm.CreateTopic(ctx, int32(cfg.AutoCreateTopicDefaultPartitions), defaultReplication, nil, cfg.Topic)
if err == nil {
for _, res := range responses {
if res.Err != nil {
err = res.Err
break
}
}
err = resp.Err
}

if err != nil {
level.Error(logger).Log("msg", "failed to alter default number of partitions", "err", err)
return
if errors.Is(err, kerr.TopicAlreadyExists) {
level.Info(logger).Log(
"msg", "topic already exists",
"topic", resp.Topic,
"num_partitions", resp.NumPartitions,
"replication_factor", resp.ReplicationFactor,
)
return nil
}
return fmt.Errorf("failed to create topic %s: %w", cfg.Topic, err)
}

level.Info(logger).Log("msg", "configured Kafka-wide default number of partitions for auto-created topics (num.partitions)", "value", cfg.AutoCreateTopicDefaultPartitions)
level.Info(logger).Log(
"msg", "successfully created topic",
"topic", resp.Topic,
"num_partitions", resp.NumPartitions,
"replication_factor", resp.ReplicationFactor,
)
return nil
}
Loading

0 comments on commit 36da907

Please sign in to comment.