From 67d977b9197bd3993b1dfc606f46b5fa097f5bba Mon Sep 17 00:00:00 2001
From: Xargin
Date: Thu, 22 Dec 2022 19:31:54 +0800
Subject: [PATCH] fix(producer): return errors for every message in retryBatch to avoid producer hang forever (#2378)

* fix(producer): return errors for every message in retryBatch to avoid the producer hang forever

* chore: add test

Co-authored-by: zcong1993
---
 async_producer.go      |  2 +-
 async_producer_test.go | 69 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 70 insertions(+), 1 deletion(-)

diff --git a/async_producer.go b/async_producer.go
index 61746c8fd..50f226f8e 100644
--- a/async_producer.go
+++ b/async_producer.go
@@ -1161,7 +1161,7 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio
 	produceSet.bufferCount += len(pSet.msgs)
 	for _, msg := range pSet.msgs {
 		if msg.retries >= p.conf.Producer.Retry.Max {
-			p.returnError(msg, kerr)
+			p.returnErrors(pSet.msgs, kerr) // report the whole batch: we return right after, so no message in pSet is retried
 			return
 		}
 		msg.retries++
diff --git a/async_producer_test.go b/async_producer_test.go
index 7e0c66068..fb895e84f 100644
--- a/async_producer_test.go
+++ b/async_producer_test.go
@@ -1302,6 +1302,75 @@ func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) {
 	}
 }
 
+// TestAsyncProducerIdempotentRetryCheckBatch_2378 is a regression test for https://github.com/Shopify/sarama/pull/2378: the mock broker fails every produce request and the test waits in expectResults for results covering all 10 messages.
+func TestAsyncProducerIdempotentRetryCheckBatch_2378(t *testing.T) {
+	broker := NewMockBroker(t, 1)
+
+	metadataResponse := &MetadataResponse{
+		Version:      1,
+		ControllerID: 1,
+	}
+	metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
+
+	initProducerIDResponse := &InitProducerIDResponse{
+		ThrottleTime:  0,
+		ProducerID:    1000,
+		ProducerEpoch: 1,
+	}
+
+	prodNotLeaderResponse := &ProduceResponse{
+		Version:      3,
+		ThrottleTime: 0,
+	}
+	prodNotLeaderResponse.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas) // despite the variable name, the error carried is ErrNotEnoughReplicas
+
+	handlerFailBeforeWrite := func(req *request) (res encoderWithHeader) {
+		switch req.body.key() {
+		case 3: // request key 3: reply with the metadata response
+			return metadataResponse
+		case 22: // request key 22: reply with the InitProducerID response
+			return initProducerIDResponse
+		case 0: // produce request (key 0): always reply with the error response to trigger retryBatch
+			return prodNotLeaderResponse
+		}
+		return nil
+	}
+
+	config := NewTestConfig()
+	config.Version = V0_11_0_0
+	config.Producer.Idempotent = true
+	config.Net.MaxOpenRequests = 1
+	config.Producer.Retry.Max = 1 // allow only one retry, so retryBatch's msg.retries >= Retry.Max check is reached
+	config.Producer.RequiredAcks = WaitForAll
+	config.Producer.Return.Successes = true
+	config.Producer.Flush.Frequency = 50 * time.Millisecond
+	config.Producer.Retry.Backoff = 100 * time.Millisecond
+
+	broker.setHandler(handlerFailBeforeWrite)
+	producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 3; i++ { // 3 messages sent up front from the test goroutine
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+	}
+
+	go func() {
+		for i := 0; i < 7; i++ { // 7 more messages, with a 100ms pause after each send
+			producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("goroutine")}
+			time.Sleep(100 * time.Millisecond)
+		}
+	}()
+
+	// Before PR 2378 was merged, this call blocked until the 5-minute test timeout.
+	expectResults(t, producer, 0, 10) // 10 = 3 sent above + 7 from the goroutine; presumably (successes=0, errors=10) — confirm against expectResults
+
+	broker.Close()
+	closeProducer(t, producer)
+}
+
 func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) {
 	broker := NewMockBroker(t, 1)