
Commit 593edeb

Abdulsametileri, elif.bayrakdar, and ElifBayrakdar authored
feat: send direct to dead letter feature (#178)
* feat: add SendDirectToDeadLetter with producer
* feat: implement deadletter feature on single and batch consumers
* fix: deadletter producer close only if it initialized
* fix: producer initialize topic not set message topic and refactor example
* refactor: retryWithBackoff variadic func
* docs: add documentation
* test: add integration tests
* test: add unit tests
* fix: linter

---------

Co-authored-by: elif.bayrakdar <[email protected]>
Co-authored-by: ElifBayrakdar <[email protected]>
1 parent 0804108 commit 593edeb

File tree

13 files changed: +789 / -82 lines


README.md

Lines changed: 11 additions & 0 deletions
@@ -222,6 +222,15 @@ Under the [examples](examples) - [with-sasl-plaintext](examples/with-sasl-plaint
 of a consumer integration with SASL/PLAIN mechanism. To try the example, you can run the command `docker compose up`
 under [the specified folder](examples/with-sasl-plaintext) and then start the application.
 
+#### With Send Direct To Dead Letter
+
+This feature lets you send a message directly to a dead-letter topic by setting `message.SendDirectToDeadLetter = true` inside your `ConsumeFn` (or selectively for items in `BatchConsumeFn`). When this flag is set and your `ConsumeFn` returns an error, the message is produced to the configured dead-letter topic with an `x-error-message` header.
+
+- Dead letter topic resolution order: if `consumer.DeadLetterTopic` is set, it is used; otherwise `retryConfiguration.deadLetterTopic` is used.
+- You can set `message.ErrDescription` to override the error header value written as `x-error-message`.
+
+Please refer to [Send Direct To Dead Letter Example](examples/with-send-direct-to-deadletter) to run both single and batch scenarios.
+
 ## Configurations
 
 | config | description | default |
@@ -233,6 +242,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
 | `concurrency` | Number of goroutines used at listeners | 1 |
 | `retryEnabled` | Retry/Exception consumer is working or not | false |
 | `transactionalRetry` | Set false if you want to use exception/retry strategy to only failed messages | true |
+| `deadLetterTopic` | Dead-letter topic name to produce messages when `message.SendDirectToDeadLetter` is true. If empty, `retryConfiguration.deadLetterTopic` will be used. | "" |
 | `commitInterval` | indicates the interval at which offsets are committed to the broker. | 1s |
 | `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | |
 | `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | |
@@ -255,6 +265,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
 | `retryConfiguration.producerBatchSize` | [see doc](https://pkg.go.dev/github.com/segmentio/[email protected]#WriterConfig.BatchSize) | 100 |
 | `retryConfiguration.producerBatchTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/[email protected]#WriterConfig.BatchTimeout) | 100 |
 | `retryConfiguration.topic` | Retry/Exception topic names | |
+| `retryConfiguration.deadLetterTopic` | Dead-letter topic name used by the retry/exception pipeline; also used as a fallback for direct dead-lettering when top-level `deadLetterTopic` is empty. | |
 | `retryConfiguration.brokers` | Retry topic brokers urls | |
 | `retryConfiguration.maxRetry` | Maximum retry value for attempting to retry a message | 3 |
 | `retryConfiguration.concurrency` | Number of goroutines used at listeners | 1 |
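
For orientation, here is a minimal, hypothetical `ConsumeFn` matching the README description above. It is not part of this commit: the import path is assumed from the repository layout, the helper names are made up, and the authoritative version lives in `examples/with-send-direct-to-deadletter`.

```go
package main

import (
	"errors"

	kafka "github.com/Trendyol/kafka-konsumer/v2" // assumed import path, not shown in this diff
)

// consumeFn flags unrecoverable messages for direct dead-lettering instead of
// the retry/exception pipeline.
func consumeFn(message *kafka.Message) error {
	if err := validate(message.Value); err != nil {
		message.SendDirectToDeadLetter = true                        // bypass retries, produce to the dead-letter topic
		message.ErrDescription = "validation failed: " + err.Error() // overrides the x-error-message header value
		return err                                                   // a non-nil error triggers the dead-letter produce
	}
	return nil
}

// validate is a stand-in for application-specific handling.
func validate(value []byte) error {
	if len(value) == 0 {
		return errors.New("empty payload")
	}
	return nil
}

func main() {
	// Consumer wiring (brokers, topic, deadLetterTopic, ConsumeFn) is omitted here;
	// see the linked example folder for a complete, runnable setup.
	_ = consumeFn
}
```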

batch_consumer.go

Lines changed: 55 additions & 39 deletions
@@ -252,39 +252,70 @@ func (b *batchConsumer) process(chunkMessages []*Message) {
     consumeErr := b.consumeFn(chunkMessages)
 
     if consumeErr != nil {
-        if b.transactionalRetry {
-            b.logger.Warnf("Consume Function Err %s, Messages will be retried", consumeErr.Error())
-            // Try to process same messages again for resolving transient network errors etc.
-            if consumeErr = b.consumeFn(chunkMessages); consumeErr != nil {
-                b.logger.Warnf("Consume Function Again Err %s, messages are sending to exception/retry topic %s", consumeErr.Error(), b.retryTopic)
-                b.metric.IncrementTotalUnprocessedMessagesCounter(int64(len(chunkMessages)))
+        // Handle SendDirectToDeadLetter messages first
+        deadLetterMessages := make([]Message, 0, len(chunkMessages))
+        remainingMessages := make([]*Message, 0, len(chunkMessages))
+
+        for _, msg := range chunkMessages {
+            if msg.SendDirectToDeadLetter {
+                msg.AddHeader(Header{
+                    Key:   errMessageKey,
+                    Value: []byte(getErrorMessage(consumeErr, msg)),
+                })
+                msg.Topic = "" // we set on initialize for dead letter producer
+                deadLetterMessages = append(deadLetterMessages, *msg)
+            } else {
+                remainingMessages = append(remainingMessages, msg)
             }
-        } else {
-            failedCount := countFailedMessages(chunkMessages)
-            b.metric.IncrementTotalUnprocessedMessagesCounter(failedCount)
-            b.metric.IncrementTotalProcessedMessagesCounter(int64(len(chunkMessages)) - failedCount)
         }
 
-        if consumeErr != nil && b.retryEnabled {
-            cronsumerMessages := make([]kcronsumer.Message, 0, len(chunkMessages))
-            errorMessage := consumeErr.Error()
+        if len(deadLetterMessages) > 0 {
+            // Send SendDirectToDeadLetter=true messages to dead letter topic
+            if err := b.sendToDeadLetterWithBackoff(deadLetterMessages...); err != nil {
+                errorMsg := fmt.Sprintf(
+                    "Error producing messages to dead letter topic. Error: %s", err.Error())
+                b.logger.Error(errorMsg)
+                panic(errorMsg)
+            }
+
+            b.metric.IncrementTotalUnprocessedMessagesCounter(int64(len(deadLetterMessages)))
+        }
+
+        // Process remaining messages with normal logic if any
+        if len(remainingMessages) > 0 {
             if b.transactionalRetry {
-                for i := range chunkMessages {
-                    cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic, errorMessage))
+                b.logger.Warnf("Consume Function Err %s, Messages will be retried", consumeErr.Error())
+                // Try to process same messages again for resolving transient network errors etc.
+                if consumeErr = b.consumeFn(remainingMessages); consumeErr != nil {
+                    b.logger.Warnf("Consume Function Again Err %s, messages are sending to exception/retry topic %s", consumeErr.Error(), b.retryTopic)
+                    b.metric.IncrementTotalUnprocessedMessagesCounter(int64(len(remainingMessages)))
                 }
             } else {
-                for i := range chunkMessages {
-                    if chunkMessages[i].IsFailed {
-                        cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic, errorMessage))
+                failedCount := countFailedMessages(remainingMessages)
+                b.metric.IncrementTotalUnprocessedMessagesCounter(failedCount)
+                b.metric.IncrementTotalProcessedMessagesCounter(int64(len(remainingMessages)) - failedCount)
+            }
+
+            if b.retryEnabled {
+                cronsumerMessages := make([]kcronsumer.Message, 0, len(remainingMessages))
+                if b.transactionalRetry {
+                    for i := range remainingMessages {
+                        cronsumerMessages = append(cronsumerMessages, remainingMessages[i].toRetryableMessage(b.retryTopic, consumeErr))
+                    }
+                } else {
+                    for i := range remainingMessages {
+                        if remainingMessages[i].IsFailed {
+                            cronsumerMessages = append(cronsumerMessages, remainingMessages[i].toRetryableMessage(b.retryTopic, consumeErr))
+                        }
                     }
                 }
-            }
 
-            if err := b.retryBatchWithBackoff(cronsumerMessages); err != nil {
-                errorMsg := fmt.Sprintf(
-                    "Error producing messages to exception/retry topic: %s. Error: %s", b.retryTopic, err.Error())
-                b.logger.Error(errorMsg)
-                panic(errorMsg)
+                if err := b.retryWithBackoff(cronsumerMessages...); err != nil {
+                    errorMsg := fmt.Sprintf(
+                        "Error producing messages to exception/retry topic: %s. Error: %s", b.retryTopic, err.Error())
+                    b.logger.Error(errorMsg)
+                    panic(errorMsg)
+                }
             }
         }
     }
@@ -293,18 +324,3 @@ func (b *batchConsumer) process(chunkMessages []*Message) {
         b.metric.IncrementTotalProcessedMessagesCounter(int64(len(chunkMessages)))
     }
 }
-
-func (b *batchConsumer) retryBatchWithBackoff(retryableMessages []kcronsumer.Message) error {
-    var produceErr error
-
-    for attempt := 1; attempt <= 5; attempt++ {
-        produceErr = b.base.cronsumer.ProduceBatch(retryableMessages)
-        if produceErr == nil {
-            return nil
-        }
-        b.logger.Warnf("Error producing message (attempt %d/%d): %v", attempt, 5, produceErr)
-        time.Sleep((50 * time.Millisecond) * time.Duration(1<<attempt))
-    }
-
-    return produceErr
-}
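
The batch path above splits each chunk in two: flagged messages go to the dead-letter producer, and the rest flow through the existing retry logic, where, with `transactionalRetry` disabled, only messages marked `IsFailed` are sent to the retry topic. Below is a hypothetical `BatchConsumeFn` showing how a caller might drive that split; the `SendDirectToDeadLetter`, `ErrDescription`, and `IsFailed` field names come from this diff, while the import path and helper functions are assumptions.

```go
package main

import (
	"errors"

	kafka "github.com/Trendyol/kafka-konsumer/v2" // assumed import path, not shown in this diff
)

var errUnrecoverable = errors.New("unrecoverable payload")

// batchConsumeFn marks poison messages for direct dead-lettering and transient
// failures as IsFailed, then returns an error so process() takes the failure path.
func batchConsumeFn(messages []*kafka.Message) error {
	var batchErr error
	for _, m := range messages {
		if err := handle(m.Value); err != nil {
			if errors.Is(err, errUnrecoverable) {
				m.SendDirectToDeadLetter = true // goes straight to the dead-letter topic
				m.ErrDescription = err.Error()  // written as the x-error-message header
			} else {
				m.IsFailed = true // retried only when transactionalRetry is false
			}
			batchErr = err
		}
	}
	return batchErr
}

// handle is a stand-in for application-specific processing.
func handle(value []byte) error {
	if len(value) == 0 {
		return errUnrecoverable
	}
	return nil
}

func main() { _ = batchConsumeFn }
```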

batch_consumer_test.go

Lines changed: 105 additions & 0 deletions
@@ -346,6 +346,111 @@ func Test_batchConsumer_process(t *testing.T) {
         // When && Then
         bc.process([]*Message{{}, {}, {}})
     })
+
+    t.Run("When_SendDirectToDeadLetter_True_And_RetryEnabled_Should_Not_Call_Retry", func(t *testing.T) {
+        // Given
+        mdlp := &mockDeadLetterProducer{}
+        mc := mockCronsumer{wantErr: true, retryBehaviorOpen: true, maxRetry: 5}
+        bc := batchConsumer{
+            base: &base{
+                metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), deadLetterProducer: mdlp,
+                retryEnabled: true, cronsumer: &mc,
+            },
+            consumeFn: func(_ []*Message) error { return errors.New("err occurred") },
+        }
+        msgs := []*Message{
+            {Key: []byte("1"), Value: []byte("foo"), SendDirectToDeadLetter: true},
+            {Key: []byte("2"), Value: []byte("bar"), SendDirectToDeadLetter: true},
+        }
+
+        // When
+        bc.process(msgs)
+
+        // Then
+        if mdlp.produceCalled != 1 {
+            t.Fatalf("dead letter producer must be called once, got %d", mdlp.produceCalled)
+        }
+        if mc.times != 0 {
+            t.Fatalf("retry cronsumer must not be called, got %d calls", mc.times)
+        }
+    })
+
+    t.Run("When_SendDirectToDeadLetter_True_Should_Add_Error_Header_And_Metrics", func(t *testing.T) {
+        // Given
+        mdlp := &mockDeadLetterProducer{}
+        bc := batchConsumer{
+            base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), deadLetterProducer: mdlp},
+            consumeFn: func(_ []*Message) error { return errors.New("err occurred") },
+        }
+        msgs := []*Message{{Key: []byte("1"), Value: []byte("foo"), SendDirectToDeadLetter: true}}
+
+        // When
+        bc.process(msgs)
+
+        // Then
+        if mdlp.produceCalled != 1 {
+            t.Fatalf("dead letter producer must be called once, got %d", mdlp.produceCalled)
+        }
+        if len(mdlp.received) != 1 {
+            t.Fatalf("dead letter received length must be 1, got %d", len(mdlp.received))
+        }
+        produced := mdlp.received[0]
+        if produced.Topic != "" {
+            t.Fatalf("produced message Topic must be empty, got %q", produced.Topic)
+        }
+        assertErrHeader(t, produced, "err occurred")
+        if bc.metric.totalUnprocessedMessagesCounter != 1 {
+            t.Fatalf("totalUnprocessedMessagesCounter must be 1, got %d", bc.metric.totalUnprocessedMessagesCounter)
+        }
+        if bc.metric.totalProcessedMessagesCounter != 0 {
+            t.Fatalf("totalProcessedMessagesCounter must be 0, got %d", bc.metric.totalProcessedMessagesCounter)
+        }
+    })
+
+    t.Run("When_SendDirectToDeadLetter_True_Should_Use_ErrDescription_In_Header", func(t *testing.T) {
+        // Given
+        mdlp := &mockDeadLetterProducer{}
+        bc := batchConsumer{
+            base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), deadLetterProducer: mdlp},
+            consumeFn: func(_ []*Message) error { return errors.New("ignored by ErrDescription") },
+        }
+        msgs := []*Message{{Key: []byte("2"), Value: []byte("bar"), SendDirectToDeadLetter: true, ErrDescription: "custom direct error"}}
+
+        // When
+        bc.process(msgs)
+
+        // Then
+        if mdlp.produceCalled != 1 {
+            t.Fatalf("dead letter producer must be called once, got %d", mdlp.produceCalled)
+        }
+        if len(mdlp.received) != 1 {
+            t.Fatalf("dead letter received length must be 1, got %d", len(mdlp.received))
+        }
+        produced := mdlp.received[0]
+        assertErrHeader(t, produced, "custom direct error")
+    })
+
+    t.Run("When_DeadLetter_Producer_Fails_Should_Panic_After_Backoff", func(t *testing.T) {
+        // Given
+        fdlp := &failingDeadLetterProducer{}
+        bc := batchConsumer{
+            base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), deadLetterProducer: fdlp},
+            consumeFn: func(_ []*Message) error { return errors.New("err occurred") },
+        }
+        msgs := []*Message{{Key: []byte("1"), Value: []byte("foo"), SendDirectToDeadLetter: true}}
+
+        defer func() {
+            if r := recover(); r == nil {
+                t.Errorf("The code did not panic")
+            }
+            if fdlp.called != 5 {
+                t.Fatalf("dead letter producer must be called 5 times with backoff, got %d", fdlp.called)
+            }
+        }()
+
+        // When && Then
+        bc.process(msgs)
+    })
 }
 
 func Test_batchConsumer_Pause(t *testing.T) {
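
The tests above rely on `mockDeadLetterProducer` and `failingDeadLetterProducer`, which are defined elsewhere in the test package and are not included in this diff. A rough sketch of what they could look like, consistent with the assertions (one produce call per processed chunk, five calls under backoff); the method name and signature are assumptions, not the library's real interface, and the fragment assumes it sits in the package's test files where `Message` and an `errors` import are available.

```go
// Hypothetical test doubles; shapes inferred from the assertions above.
type mockDeadLetterProducer struct {
	produceCalled int
	received      []Message
}

// produceBatch is an assumed method name; the real producer interface is not shown in this diff.
func (m *mockDeadLetterProducer) produceBatch(messages ...Message) error {
	m.produceCalled++ // one call per chunk, regardless of how many messages it carries
	m.received = append(m.received, messages...)
	return nil
}

type failingDeadLetterProducer struct {
	called int
}

func (f *failingDeadLetterProducer) produceBatch(_ ...Message) error {
	f.called++ // expected to reach 5 after the backoff attempts
	return errors.New("dead letter produce failed")
}
```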

consumer.go

Lines changed: 19 additions & 16 deletions
@@ -161,6 +161,24 @@ func (c *consumer) process(message *Message) {
     consumeErr := c.consumeFn(message)
 
     if consumeErr != nil {
+        if message.SendDirectToDeadLetter {
+            message.AddHeader(Header{
+                Key:   errMessageKey,
+                Value: []byte(getErrorMessage(consumeErr, message)),
+            })
+            message.Topic = "" // we set on initialize for dead letter producer
+            if err := c.sendToDeadLetterWithBackoff(*message); err != nil {
+                errorMessage := fmt.Sprintf(
+                    "Error producing message %s to dead letter topic. Error: %s",
+                    string(message.Value), err.Error())
+                c.logger.Error(errorMessage)
+                panic(err.Error())
+            }
+
+            c.metric.IncrementTotalUnprocessedMessagesCounter(1)
+            return
+        }
+
         if c.transactionalRetry {
             c.logger.Warnf("Consume Function Err %s, Message will be retried", consumeErr.Error())
             // Try to process same message again
@@ -174,7 +192,7 @@ func (c *consumer) process(message *Message) {
         }
 
         if consumeErr != nil && c.retryEnabled {
-            retryableMsg := message.toRetryableMessage(c.retryTopic, consumeErr.Error())
+            retryableMsg := message.toRetryableMessage(c.retryTopic, consumeErr)
             if err := c.retryWithBackoff(retryableMsg); err != nil {
                 errorMessage := fmt.Sprintf(
                     "Error producing message %s to exception/retry topic %s. Error: %s",
@@ -188,18 +206,3 @@ func (c *consumer) process(message *Message) {
         c.metric.IncrementTotalProcessedMessagesCounter(1)
     }
 }
-
-func (c *consumer) retryWithBackoff(retryableMsg kcronsumer.Message) error {
-    var produceErr error
-
-    for attempt := 1; attempt <= 5; attempt++ {
-        produceErr = c.cronsumer.Produce(retryableMsg)
-        if produceErr == nil {
-            return nil
-        }
-        c.logger.Warnf("Error producing message (attempt %d/%d): %v", attempt, 5, produceErr)
-        time.Sleep((50 * time.Millisecond) * time.Duration(1<<attempt))
-    }
-
-    return produceErr
-}
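
The last hunk removes the single-message `retryWithBackoff`; per the commit message it was refactored into a variadic helper, and the consumers now call `retryWithBackoff(...)` and `sendToDeadLetterWithBackoff(...)`, neither of which appears in this excerpt. Below is a self-contained sketch of the retry pattern the removed code used (5 attempts, sleeping 50ms * 2^attempt between failures); the callback and logger parameters are stand-ins for the cronsumer or dead-letter produce call, not the library's actual signatures.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// produceWithBackoff retries a produce callback up to 5 times, mirroring the
// backoff loop of the removed retryWithBackoff / retryBatchWithBackoff helpers.
func produceWithBackoff(produce func() error, warnf func(format string, args ...any)) error {
	var produceErr error
	for attempt := 1; attempt <= 5; attempt++ {
		if produceErr = produce(); produceErr == nil {
			return nil
		}
		warnf("Error producing message (attempt %d/%d): %v", attempt, 5, produceErr)
		time.Sleep((50 * time.Millisecond) * time.Duration(1<<attempt))
	}
	return produceErr
}

func main() {
	failing := func() error { return errors.New("broker unavailable") }
	warnf := func(format string, args ...any) { fmt.Printf(format+"\n", args...) }

	if err := produceWithBackoff(failing, warnf); err != nil {
		fmt.Println("giving up after 5 attempts:", err)
	}
}
```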
