From 0f5c4777c5d7369397c28a0160ca1c700c51ef25 Mon Sep 17 00:00:00 2001
From: Owen Niles
Date: Mon, 15 Jul 2024 11:19:37 -0400
Subject: [PATCH] Add options for librdkafka partitioning compatibility

---
 builders.go | 19 +++++++++++++++++++
 options.go  | 32 +++++++++++++++++++++++++-------
 view.go     | 25 ++++++++++++++++++-------
 3 files changed, 62 insertions(+), 14 deletions(-)

diff --git a/builders.go b/builders.go
index 65339d36..28293537 100644
--- a/builders.go
+++ b/builders.go
@@ -26,6 +26,25 @@ func ProducerBuilderWithConfig(config *sarama.Config) ProducerBuilder {
 	}
 }
 
+// ProducerBuilderWithHashPartitionerOptions creates a Kafka producer using the
+// Sarama library. It can be used to configure the partitioner. If both
+// sarama.WithCustomHashFunction and goka.WithHasher are used to set the hasher,
+// the former will take precedence.
+func ProducerBuilderWithHashPartitionerOptions(opts ...sarama.HashPartitionerOption) ProducerBuilder {
+	return func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
+		config := globalConfig
+		defaults := []sarama.HashPartitionerOption{
+			// hasher may be goka.DefaultHasher or it may have been modified by
+			// goka.WithHasher. It may be overridden by opts.
+			sarama.WithCustomHashFunction(hasher),
+		}
+		opts = append(defaults, opts...)
+		config.ClientID = clientID
+		config.Producer.Partitioner = sarama.NewCustomPartitioner(opts...)
+		return NewProducer(brokers, &config)
+	}
+}
+
 // TopicManagerBuilder creates a TopicManager to check partition counts and
 // create tables.
 type TopicManagerBuilder func(brokers []string) (TopicManager, error)
diff --git a/options.go b/options.go
index 1e0771cf..29d5568b 100644
--- a/options.go
+++ b/options.go
@@ -401,17 +401,26 @@ func WithRebalanceCallback(cb RebalanceCallback) ProcessorOption {
 // view options
 ///////////////////////////////////////////////////////////////////////////////
 
+type viewPartitionerCompat int
+
+const (
+	// Interpret the bytes of the key digest as an unsigned integer to match
+	// librdkafka's partitioning behavior.
+	librdkafkaCompat viewPartitionerCompat = iota + 1
+)
+
 // ViewOption defines a configuration option to be used when creating a view.
 type ViewOption func(*voptions, Table, Codec)
 
 type voptions struct {
-	log              logger
-	clientID         string
-	tableCodec       Codec
-	updateCallback   UpdateCallback
-	hasher           func() hash.Hash32
-	autoreconnect    bool
-	backoffResetTime time.Duration
+	log               logger
+	clientID          string
+	tableCodec        Codec
+	updateCallback    UpdateCallback
+	hasher            func() hash.Hash32
+	partitionerCompat viewPartitionerCompat
+	autoreconnect     bool
+	backoffResetTime  time.Duration
 
 	builders struct {
 		storage storage.Builder
@@ -476,6 +485,15 @@ func WithViewHasher(hasher func() hash.Hash32) ViewOption {
 	}
 }
 
+// WithViewHashUnsigned instructs the partitioner to interpret the key digest
+// as an unsigned integer when partitioning. Combine this option with the
+// CRC-32 hash algorithm for compatibility with librdkafka.
+func WithViewHashUnsigned() ViewOption {
+	return func(o *voptions, table Table, codec Codec) {
+		o.partitionerCompat = librdkafkaCompat
+	}
+}
+
 // WithViewClientID defines the client ID used to identify with Kafka.
 func WithViewClientID(clientID string) ViewOption {
 	return func(o *voptions, table Table, codec Codec) {
diff --git a/view.go b/view.go
index a614a219..1697ae7c 100644
--- a/view.go
+++ b/view.go
@@ -295,6 +295,11 @@ func (v *View) close() error {
 }
 
 func (v *View) hash(key string) (int32, error) {
+	numPartitions := len(v.partitions)
+	if numPartitions < 1 {
+		return 0, errors.New("no partitions found")
+	}
+
 	// create a new hasher every time. Alternative would be to store the hash in
 	// view and every time reset the hasher (ie, hasher.Reset()). But that would
 	// also require us to protect the access of the hasher with a mutex.
@@ -304,14 +309,20 @@ func (v *View) hash(key string) (int32, error) {
 	if err != nil {
 		return -1, err
 	}
-	hash := int32(hasher.Sum32())
-	if hash < 0 {
-		hash = -hash
-	}
-	if len(v.partitions) == 0 {
-		return 0, errors.New("no partitions found")
+
+	var partition int32
+	hash := hasher.Sum32()
+	if v.opts.partitionerCompat == librdkafkaCompat {
+		partition = int32(hash % uint32(numPartitions))
+	} else {
+		partition = int32(hash) % int32(numPartitions)
+
+		if partition < 0 {
+			partition = -partition
+		}
 	}
-	return hash % int32(len(v.partitions)), nil
+
+	return partition, nil
 }
 
 func (v *View) find(key string) (*PartitionTable, error) {
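
Usage note (not part of the patch): a minimal sketch of how the new options might be combined on both the view and the producer side. It assumes goka's public NewView/NewEmitter APIs, the github.com/IBM/sarama import path, and Sarama's WithHashUnsigned partitioner option; the broker address, table, and stream names are placeholders.

package main

import (
	"hash"
	"hash/crc32"

	"github.com/IBM/sarama"
	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

// crc32Hasher returns a CRC-32 (IEEE) hasher, matching the hash algorithm
// librdkafka uses for key-based partitioning.
func crc32Hasher() hash.Hash32 { return crc32.NewIEEE() }

func main() {
	brokers := []string{"localhost:9092"} // placeholder broker

	// View side: partition lookups the same way a librdkafka producer
	// partitioned the table topic (CRC-32 digest treated as unsigned).
	view, err := goka.NewView(brokers,
		goka.Table("example-table"), // placeholder table
		new(codec.String),
		goka.WithViewHasher(crc32Hasher),
		goka.WithViewHashUnsigned(),
	)
	if err != nil {
		panic(err)
	}
	_ = view // e.g. run with view.Run(ctx) and query with view.Get(key)

	// Producer side: emit with the matching partitioning scheme by combining
	// the CRC-32 hasher with Sarama's unsigned-hash partitioner option.
	emitter, err := goka.NewEmitter(brokers,
		goka.Stream("example-stream"), // placeholder stream
		new(codec.String),
		goka.WithEmitterHasher(crc32Hasher),
		goka.WithEmitterProducerBuilder(
			goka.ProducerBuilderWithHashPartitionerOptions(sarama.WithHashUnsigned()),
		),
	)
	if err != nil {
		panic(err)
	}
	defer emitter.Finish()
}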