Skip to content

Commit b9c0b2b

Browse files
feat: introduce retry configuration concurrency (#146)
* feat: introduce retry configuration concurrency * docs: add retry config concurrency to the readme
1 parent 81762bd commit b9c0b2b

File tree

3 files changed

+10
-1
lines changed

3 files changed

+10
-1
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
255255
| `retryConfiguration.topic` | Retry/Exception topic names | |
256256
| `retryConfiguration.brokers` | Retry topic brokers urls | |
257257
| `retryConfiguration.maxRetry` | Maximum retry value for attempting to retry a message | 3 |
258+
| `retryConfiguration.concurrency` | Number of goroutines used at listeners | 1 |
258259
| `retryConfiguration.tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" |
259260
| `retryConfiguration.tls.intermediateCAPath` | Same with rootCA, if you want to specify two rootca you can use it with rootCAPath | "" |
260261
| `retryConfiguration.sasl.authType` | `SCRAM` or `PLAIN` | |

consumer_config.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config {
125125
Duration: cfg.RetryConfiguration.WorkDuration,
126126
MaxRetry: cfg.RetryConfiguration.MaxRetry,
127127
VerifyTopicOnStartup: cfg.RetryConfiguration.VerifyTopicOnStartup,
128-
Concurrency: cfg.Concurrency,
128+
Concurrency: cfg.RetryConfiguration.Concurrency,
129129
MinBytes: cfg.Reader.MinBytes,
130130
MaxBytes: cfg.Reader.MaxBytes,
131131
MaxWait: cfg.Reader.MaxWait,
@@ -224,6 +224,7 @@ type RetryConfiguration struct {
224224
MaxRetry int
225225
WorkDuration time.Duration
226226
SkipMessageByHeaderFn SkipMessageByHeaderFn
227+
Concurrency int
227228
}
228229

229230
type BatchConfiguration struct {
@@ -284,6 +285,10 @@ func (cfg *ConsumerConfig) setDefaults() {
284285
cfg.Concurrency = 1
285286
}
286287

288+
if cfg.RetryConfiguration.Concurrency == 0 {
289+
cfg.RetryConfiguration.Concurrency = cfg.Concurrency
290+
}
291+
287292
if cfg.CommitInterval == 0 {
288293
cfg.CommitInterval = time.Second
289294
// Kafka-go library default value is 0, we need to also change this.

consumer_config_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ func TestConsumerConfig_validate(t *testing.T) {
3535
if cfg.BatchConfiguration != nil {
3636
t.Fatalf("Batch configuration not specified so it must stay as nil")
3737
}
38+
if cfg.RetryConfiguration.Concurrency != 1 {
39+
t.Fatal("Retry Configuration Concurrency must equal to 1")
40+
}
3841
})
3942
t.Run("Set_Defaults_For_BatchConfiguration", func(t *testing.T) {
4043
// Given

0 commit comments

Comments
 (0)