Skip to content

Commit f9bc1d6

Browse files
authored
Merge pull request #122 from Trendyol/fix/issue-nontransactional-errormessage
fix: Transactional Retry false x-error-message bug
2 parents 474c038 + 532d1a4 commit f9bc1d6

File tree

10 files changed

+263
-42
lines changed

10 files changed

+263
-42
lines changed

batch_consumer.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package kafka
22

33
import (
4+
"errors"
45
"time"
56

67
"github.com/prometheus/client_golang/prometheus"
@@ -41,9 +42,7 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
4142
}
4243

4344
if cfg.RetryEnabled {
44-
c.base.setupCronsumer(cfg, func(message kcronsumer.Message) error {
45-
return c.consumeFn([]*Message{toMessage(message)})
46-
})
45+
c.base.setupCronsumer(cfg, c.runKonsumerFn)
4746
}
4847

4948
if cfg.APIEnabled {
@@ -53,6 +52,16 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
5352
return &c, nil
5453
}
5554

55+
func (b *batchConsumer) runKonsumerFn(message kcronsumer.Message) error {
56+
msgList := []*Message{toMessage(message)}
57+
58+
err := b.consumeFn(msgList)
59+
if msgList[0].ErrDescription != "" {
60+
err = errors.New(msgList[0].ErrDescription)
61+
}
62+
return err
63+
}
64+
5665
func (b *batchConsumer) GetMetricCollectors() []prometheus.Collector {
5766
return b.base.GetMetricCollectors()
5867
}
@@ -176,14 +185,15 @@ func (b *batchConsumer) process(chunkMessages []*Message) {
176185

177186
if consumeErr != nil && b.retryEnabled {
178187
cronsumerMessages := make([]kcronsumer.Message, 0, len(chunkMessages))
188+
errorMessage := consumeErr.Error()
179189
if b.transactionalRetry {
180190
for i := range chunkMessages {
181-
cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic))
191+
cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic, errorMessage))
182192
}
183193
} else {
184194
for i := range chunkMessages {
185195
if chunkMessages[i].IsFailed {
186-
cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic))
196+
cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic, errorMessage))
187197
}
188198
}
189199
}

batch_consumer_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,41 @@ func Test_batchConsumer_Resume(t *testing.T) {
404404
}
405405
}
406406

407+
func Test_batchConsumer_runKonsumerFn(t *testing.T) {
408+
t.Run("Should_Return_Default_Error_When_Error_Description_Does_Not_Exist", func(t *testing.T) {
409+
// Given
410+
expectedError := errors.New("default error")
411+
bc := batchConsumer{consumeFn: func(messages []*Message) error {
412+
return expectedError
413+
}}
414+
415+
// When
416+
actualError := bc.runKonsumerFn(kcronsumer.Message{})
417+
418+
// Then
419+
if actualError.Error() != expectedError.Error() {
420+
t.Fatalf("actual error = %s should be equal to expected error = %s", actualError.Error(), expectedError.Error())
421+
}
422+
})
423+
424+
t.Run("Should_Return_Message_Error_Description_When_Error_Description_Exist", func(t *testing.T) {
425+
// Given
426+
expectedError := errors.New("message error description")
427+
bc := batchConsumer{consumeFn: func(messages []*Message) error {
428+
messages[0].ErrDescription = "message error description"
429+
return errors.New("default error")
430+
}}
431+
432+
// When
433+
actualError := bc.runKonsumerFn(kcronsumer.Message{})
434+
435+
// Then
436+
if actualError.Error() != expectedError.Error() {
437+
t.Fatalf("actual error = %s should be equal to expected error = %s", actualError.Error(), expectedError.Error())
438+
}
439+
})
440+
}
441+
407442
func createMessages(partitionStart int, partitionEnd int) []*Message {
408443
messages := make([]*Message, 0)
409444
for i := partitionStart; i < partitionEnd; i++ {

consumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func (c *consumer) process(message *Message) {
146146
}
147147

148148
if consumeErr != nil && c.retryEnabled {
149-
retryableMsg := message.toRetryableMessage(c.retryTopic)
149+
retryableMsg := message.toRetryableMessage(c.retryTopic, consumeErr.Error())
150150
if produceErr := c.cronsumer.Produce(retryableMsg); produceErr != nil {
151151
c.logger.Errorf("Error producing message %s to exception/retry topic %s",
152152
string(retryableMsg.Value), produceErr.Error())

examples/with-kafka-transactional-retry-disabled/main.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"github.com/Trendyol/kafka-konsumer/v2"
@@ -10,6 +11,16 @@ import (
1011
)
1112

1213
func main() {
14+
producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
15+
Writer: kafka.WriterConfig{Brokers: []string{"localhost:29092"}, Topic: "standart-topic"},
16+
})
17+
18+
producer.ProduceBatch(context.Background(), []kafka.Message{
19+
{Key: []byte("key1"), Value: []byte("message1")},
20+
{Key: []byte("key2"), Value: []byte("message2")},
21+
{Key: []byte("key3"), Value: []byte("message3")},
22+
})
23+
1324
consumerCfg := &kafka.ConsumerConfig{
1425
Reader: kafka.ReaderConfig{
1526
Brokers: []string{"localhost:29092"},
@@ -25,11 +36,11 @@ func main() {
2536
RetryConfiguration: kafka.RetryConfiguration{
2637
Brokers: []string{"localhost:29092"},
2738
Topic: "retry-topic",
28-
StartTimeCron: "*/5 * * * *",
29-
WorkDuration: 4 * time.Minute,
39+
StartTimeCron: "*/1 * * * *",
40+
WorkDuration: 20 * time.Second,
3041
MaxRetry: 3,
3142
},
32-
MessageGroupDuration: time.Second,
43+
MessageGroupDuration: 5 * time.Second,
3344
}
3445

3546
consumer, _ := kafka.NewConsumer(consumerCfg)
@@ -43,13 +54,19 @@ func main() {
4354
<-c
4455
}
4556

46-
// In order to load topic with data, use:
47-
// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/load.txt
4857
func batchConsumeFn(messages []*kafka.Message) error {
4958
// you can add custom error handling here & flag messages
5059
for i := range messages {
51-
if i%2 == 0 {
60+
if i < 2 {
5261
messages[i].IsFailed = true
62+
63+
var retryCount string
64+
retryCountHeader := messages[i].Header("x-retry-count")
65+
if retryCountHeader != nil {
66+
retryCount = string(retryCountHeader.Value)
67+
}
68+
69+
messages[i].ErrDescription = fmt.Sprintf("Key = %s error, retry count %s", string(messages[i].Key), retryCount)
5370
}
5471
}
5572

message.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ import (
99
"github.com/segmentio/kafka-go/protocol"
1010
)
1111

12+
const (
13+
errMessageKey = "x-error-message"
14+
)
15+
1216
type Header = protocol.Header
1317

1418
type Message struct {
@@ -27,6 +31,12 @@ type Message struct {
2731

2832
// IsFailed Is only used on transactional retry disabled
2933
IsFailed bool
34+
35+
// ErrDescription specifies the IsFailed message's error
36+
37+
// If available, kafka-konsumer writes this description into the failed message's
38+
// headers as `x-error-message` key when producing retry topic
39+
ErrDescription string
3040
}
3141

3242
type IncomingMessage struct {
@@ -63,7 +73,7 @@ func fromKafkaMessage(kafkaMessage *kafka.Message) *Message {
6373
return message
6474
}
6575

66-
func (m *Message) toRetryableMessage(retryTopic string) kcronsumer.Message {
76+
func (m *Message) toRetryableMessage(retryTopic, consumeError string) kcronsumer.Message {
6777
headers := make([]kcronsumer.Header, 0, len(m.Headers))
6878
for i := range m.Headers {
6979
headers = append(headers, kcronsumer.Header{
@@ -72,6 +82,18 @@ func (m *Message) toRetryableMessage(retryTopic string) kcronsumer.Message {
7282
})
7383
}
7484

85+
if m.ErrDescription == "" {
86+
headers = append(headers, kcronsumer.Header{
87+
Key: errMessageKey,
88+
Value: []byte(consumeError),
89+
})
90+
} else {
91+
headers = append(headers, kcronsumer.Header{
92+
Key: errMessageKey,
93+
Value: []byte(m.ErrDescription),
94+
})
95+
}
96+
7597
return kcronsumer.NewMessageBuilder().
7698
WithKey(m.Key).
7799
WithValue(m.Value).

message_test.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"bytes"
55
"testing"
66

7+
kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
8+
79
"github.com/segmentio/kafka-go"
810
)
911

@@ -113,3 +115,135 @@ func TestMessage_RemoveHeader(t *testing.T) {
113115
t.Fatalf("Header length must be equal to 0")
114116
}
115117
}
118+
119+
func TestMessage_toRetryableMessage(t *testing.T) {
120+
t.Run("When_error_description_exist", func(t *testing.T) {
121+
// Given
122+
message := Message{
123+
Key: []byte("key"),
124+
Value: []byte("value"),
125+
Headers: []Header{
126+
{
127+
Key: "x-custom-client-header",
128+
Value: []byte("bar"),
129+
},
130+
},
131+
ErrDescription: "some error description",
132+
}
133+
expected := kcronsumer.Message{
134+
Topic: "retry-topic",
135+
Key: []byte("key"),
136+
Value: []byte("value"),
137+
Headers: []kcronsumer.Header{
138+
{
139+
Key: "x-custom-client-header",
140+
Value: []byte("bar"),
141+
},
142+
{
143+
Key: "x-error-message",
144+
Value: []byte("some error description"),
145+
},
146+
},
147+
}
148+
149+
// When
150+
actual := message.toRetryableMessage("retry-topic", "consumeFn error")
151+
152+
// Then
153+
if actual.Topic != expected.Topic {
154+
t.Errorf("topic must be %q", expected.Topic)
155+
}
156+
157+
if !bytes.Equal(actual.Key, expected.Key) {
158+
t.Errorf("Key must be equal to %q", string(expected.Key))
159+
}
160+
161+
if !bytes.Equal(actual.Value, expected.Value) {
162+
t.Errorf("Value must be equal to %q", string(expected.Value))
163+
}
164+
165+
if len(actual.Headers) != 2 {
166+
t.Error("Header length must be equal to 2")
167+
}
168+
169+
if actual.Headers[0].Key != expected.Headers[0].Key {
170+
t.Errorf("First Header key must be equal to %q", expected.Headers[0].Key)
171+
}
172+
173+
if !bytes.Equal(actual.Headers[0].Value, expected.Headers[0].Value) {
174+
t.Errorf("First Header value must be equal to %q", expected.Headers[0].Value)
175+
}
176+
177+
if actual.Headers[1].Key != expected.Headers[1].Key {
178+
t.Errorf("Second Header key must be equal to %q", expected.Headers[1].Key)
179+
}
180+
181+
if !bytes.Equal(actual.Headers[1].Value, expected.Headers[1].Value) {
182+
t.Errorf("Second Header value must be equal to %q", expected.Headers[1].Value)
183+
}
184+
})
185+
t.Run("When_error_description_does_not_exist", func(t *testing.T) {
186+
// Given
187+
message := Message{
188+
Key: []byte("key"),
189+
Value: []byte("value"),
190+
Headers: []Header{
191+
{
192+
Key: "x-custom-client-header",
193+
Value: []byte("bar"),
194+
},
195+
},
196+
}
197+
expected := kcronsumer.Message{
198+
Topic: "retry-topic",
199+
Key: []byte("key"),
200+
Value: []byte("value"),
201+
Headers: []kcronsumer.Header{
202+
{
203+
Key: "x-custom-client-header",
204+
Value: []byte("bar"),
205+
},
206+
{
207+
Key: "x-error-message",
208+
Value: []byte("consumeFn error"),
209+
},
210+
},
211+
}
212+
213+
// When
214+
actual := message.toRetryableMessage("retry-topic", "consumeFn error")
215+
216+
// Then
217+
if actual.Topic != expected.Topic {
218+
t.Errorf("topic must be %q", expected.Topic)
219+
}
220+
221+
if !bytes.Equal(actual.Key, expected.Key) {
222+
t.Errorf("Key must be equal to %q", string(expected.Key))
223+
}
224+
225+
if !bytes.Equal(actual.Value, expected.Value) {
226+
t.Errorf("Value must be equal to %q", string(expected.Value))
227+
}
228+
229+
if len(actual.Headers) != 2 {
230+
t.Error("Header length must be equal to 2")
231+
}
232+
233+
if actual.Headers[0].Key != expected.Headers[0].Key {
234+
t.Errorf("First Header key must be equal to %q", expected.Headers[0].Key)
235+
}
236+
237+
if !bytes.Equal(actual.Headers[0].Value, expected.Headers[0].Value) {
238+
t.Errorf("First Header value must be equal to %q", expected.Headers[0].Value)
239+
}
240+
241+
if actual.Headers[1].Key != expected.Headers[1].Key {
242+
t.Errorf("Second Header key must be equal to %q", expected.Headers[1].Key)
243+
}
244+
245+
if !bytes.Equal(actual.Headers[1].Value, expected.Headers[1].Value) {
246+
t.Errorf("Second Header value must be equal to %q", expected.Headers[1].Value)
247+
}
248+
})
249+
}

test/integration/docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
version: "3.8"
22
services:
33
redpanda:
4-
image: docker.redpanda.com/redpandadata/redpanda:v23.2.4
4+
image: docker.redpanda.com/redpandadata/redpanda:v23.3.9-amd64 #for m1 => v23.3.9-arm64
55
container_name: redpanda-1
66
command:
77
- redpanda

0 commit comments

Comments
 (0)