Skip to content

Commit 0910c53

Browse files
feat: add broker, sasl, tls configuration to retry configuration
1 parent df41605 commit 0910c53

File tree

3 files changed

+41
-33
lines changed

3 files changed

+41
-33
lines changed

README.md

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -81,29 +81,34 @@ func consumeFn(message kafka.Message) error {
8181

8282
## Configurations
8383

84-
| config | description | default |
85-
|------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------|------------------|
86-
| `reader` | [Describes all segmentio kafka reader configurations](https://pkg.go.dev/github.com/segmentio/[email protected]#ReaderConfig) | |
87-
| `consumeFn` | Kafka consumer function, if retry enabled it, is also used to consume retriable messages | |
88-
| `logLevel` | Describes log level; valid options are `debug`, `info`, `warn`, and `error` | info |
89-
| `concurrency` | Number of goroutines used at listeners | runtime.NumCPU() |
90-
| `retryEnabled` | Retry/Exception consumer is working or not | false |
91-
| `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | |
92-
| `retryConfiguration.startTimeCron` | Cron expression when retry consumer ([kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer#configurations)) starts to work at | |
93-
| `retryConfiguration.workDuration` | Work duration exception consumer actively consuming messages | |
94-
| `retryConfiguration.topic` | Retry/Exception topic names | |
95-
| `retryConfiguration.brokers` | Retry topic brokers urls | |
96-
| `retryConfiguration.maxRetry` | Maximum retry value for attempting to retry a message | 3 |
97-
| `tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" |
98-
| `tls.intermediateCAPath` | Same with rootCA, if you want to specify two rootca you can use it with rootCAPath | "" |
99-
| `sasl.authType` | `SCRAM` or `PLAIN` | |
100-
| `sasl.username` | SCRAM OR PLAIN username | |
101-
| `sasl.password` | SCRAM OR PLAIN password | |
102-
| `logger` | If you want to custom logger | info |
103-
| `apiEnabled` | Enabled metrics | false |
104-
| `apiConfiguration.port` | Set API port | 8090 |
105-
| `apiConfiguration.healtCheckPath` | Set Health check path | healthcheck |
106-
| `metricConfiguration.path` | Set metric endpoint path | /metrics |
84+
| config | description | default |
85+
|---------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------|------------------|
86+
| `reader` | [Describes all segmentio kafka reader configurations](https://pkg.go.dev/github.com/segmentio/[email protected]#ReaderConfig) | |
87+
| `consumeFn` | Kafka consumer function, if retry enabled it, is also used to consume retriable messages | |
88+
| `logLevel` | Describes log level; valid options are `debug`, `info`, `warn`, and `error` | info |
89+
| `concurrency` | Number of goroutines used at listeners | runtime.NumCPU() |
90+
| `retryEnabled` | Retry/Exception consumer is working or not | false |
91+
| `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | |
92+
| `retryConfiguration.startTimeCron` | Cron expression when retry consumer ([kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer#configurations)) starts to work at | |
93+
| `retryConfiguration.workDuration` | Work duration exception consumer actively consuming messages | |
94+
| `retryConfiguration.topic` | Retry/Exception topic names | |
95+
| `retryConfiguration.brokers` | Retry topic brokers urls | |
96+
| `retryConfiguration.maxRetry` | Maximum retry value for attempting to retry a message | 3 |
97+
| `retryConfiguration.tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" |
98+
| `retryConfiguration.tls.intermediateCAPath` | Same with rootCA, if you want to specify two rootca you can use it with rootCAPath | "" |
99+
| `retryConfiguration.sasl.authType` | `SCRAM` or `PLAIN` | |
100+
| `retryConfiguration.sasl.username` | SCRAM OR PLAIN username | |
101+
| `retryConfiguration.sasl.password` | SCRAM OR PLAIN password | |
102+
| `tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" |
103+
| `tls.intermediateCAPath` | Same with rootCA, if you want to specify two rootca you can use it with rootCAPath | "" |
104+
| `sasl.authType` | `SCRAM` or `PLAIN` | |
105+
| `sasl.username` | SCRAM OR PLAIN username | |
106+
| `sasl.password` | SCRAM OR PLAIN password | |
107+
| `logger` | If you want to custom logger | info |
108+
| `apiEnabled` | Enabled metrics | false |
109+
| `apiConfiguration.port` | Set API port | 8090 |
110+
| `apiConfiguration.healtCheckPath` | Set Health check path | healthcheck |
111+
| `metricConfiguration.path` | Set metric endpoint path | /metrics |
107112

108113
## Monitoring
109114

consumer.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -86,19 +86,19 @@ func NewConsumer(cfg *ConsumerConfig) (Consumer, error) {
8686
LogLevel: "info",
8787
}
8888

89-
if !cfg.SASL.IsEmpty() {
89+
if !cfg.RetryConfiguration.SASL.IsEmpty() {
9090
c.logger.Debug("Setting cronsumer SASL configurations...")
9191
kcronsumerCfg.SASL.Enabled = true
92-
kcronsumerCfg.SASL.AuthType = string(cfg.SASL.Type)
93-
kcronsumerCfg.SASL.Username = cfg.SASL.Username
94-
kcronsumerCfg.SASL.Password = cfg.SASL.Password
95-
kcronsumerCfg.SASL.Rack = cfg.Rack
92+
kcronsumerCfg.SASL.AuthType = string(cfg.RetryConfiguration.SASL.Type)
93+
kcronsumerCfg.SASL.Username = cfg.RetryConfiguration.SASL.Username
94+
kcronsumerCfg.SASL.Password = cfg.RetryConfiguration.SASL.Password
95+
kcronsumerCfg.SASL.Rack = cfg.RetryConfiguration.Rack
9696
}
9797

98-
if !cfg.TLS.IsEmpty() {
98+
if !cfg.RetryConfiguration.TLS.IsEmpty() {
9999
c.logger.Debug("Setting cronsumer TLS configurations...")
100-
kcronsumerCfg.SASL.RootCAPath = cfg.TLS.RootCAPath
101-
kcronsumerCfg.SASL.IntermediateCAPath = cfg.TLS.IntermediateCAPath
100+
kcronsumerCfg.SASL.RootCAPath = cfg.RetryConfiguration.TLS.RootCAPath
101+
kcronsumerCfg.SASL.IntermediateCAPath = cfg.RetryConfiguration.TLS.IntermediateCAPath
102102
}
103103

104104
c.retryTopic = cfg.RetryConfiguration.Topic

consumer_config.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,14 @@ type MetricConfiguration struct {
4949
}
5050

5151
type RetryConfiguration struct {
52-
Brokers []string
53-
Topic string
5452
MaxRetry int
5553
StartTimeCron string
5654
WorkDuration time.Duration
55+
Brokers []string
56+
Topic string
57+
SASL *SASLConfig
58+
TLS *TLSConfig
59+
Rack string
5760
}
5861

5962
func (c *ConsumerConfig) newKafkaDialer() (*kafka.Dialer, error) {

0 commit comments

Comments
 (0)