Skip to content

Commit a4fac27

Browse files
feat: add kafka rack to consumer configurations
1 parent d965996 commit a4fac27

File tree

2 files changed

+5
-0
lines changed

2 files changed

+5
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ func consumeFn(message kafka.Message) error {
8787
| `logLevel` | Describes log level; valid options are `debug`, `info`, `warn`, and `error` | info | |
8888
| `concurrency` | Number of goroutines used at listeners | runtime.NumCPU() |
8989
| `retryEnabled` | Retry/Exception consumer is working or not | false |
90+
| `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | |
9091
| `retryConfiguration.startTimeCron` | Cron expression when retry consumer ([kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer#configurations)) starts to work at | |
9192
| `retryConfiguration.workDuration` | Work duration exception consumer actively consuming messages | |
9293
| `retryConfiguration.topic` | Retry/Exception topic names | |

consumer_config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ type ConsumeFn func(Message) error
1414
type ConsumerConfig struct {
1515
Reader ReaderConfig
1616

17+
Rack string
1718
SASL *SASLConfig
1819
TLS *TLSConfig
1920

@@ -61,6 +62,9 @@ func (c *ConsumerConfig) newKafkaReader() (*kafka.Reader, error) {
6162

6263
reader := kafka.ReaderConfig(c.Reader)
6364
reader.Dialer = dialer
65+
if c.Rack != "" {
66+
reader.GroupBalancers = []kafka.GroupBalancer{kafka.RackAffinityGroupBalancer{Rack: c.Rack}}
67+
}
6468

6569
return kafka.NewReader(reader), nil
6670
}

0 commit comments

Comments
 (0)