From b18a3cfce2932d0e86a9d2af01a320f62ed7ba9a Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Tue, 17 Dec 2024 23:10:45 +0800 Subject: [PATCH] fix: kafka use shared channel to receive produce result (#38532) issue: #38531 --------- Signed-off-by: chyezh --- .../msgstream/mqwrapper/kafka/kafka_client.go | 3 +- .../mqwrapper/kafka/kafka_producer.go | 29 ++++++++++--------- .../mqwrapper/kafka/kafka_producer_test.go | 10 ++----- 3 files changed, 19 insertions(+), 23 deletions(-) diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go index a2f41a885d1a9..f2d0bddb10df9 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go @@ -220,8 +220,7 @@ func (kc *kafkaClient) CreateProducer(ctx context.Context, options common.Produc metrics.MsgStreamRequestLatency.WithLabelValues(metrics.CreateProducerLabel).Observe(float64(elapsed.Milliseconds())) metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.SuccessLabel).Inc() - deliveryChan := make(chan kafka.Event, 128) - producer := &kafkaProducer{p: pp, deliveryChan: deliveryChan, topic: options.Topic} + producer := &kafkaProducer{p: pp, stopCh: make(chan struct{}), topic: options.Topic} return producer, nil } diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go index e199b0353a00c..e525244d5eae1 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go @@ -18,11 +18,11 @@ import ( ) type kafkaProducer struct { - p *kafka.Producer - topic string - deliveryChan chan kafka.Event - closeOnce sync.Once - isClosed bool + p *kafka.Producer + topic string + closeOnce sync.Once + isClosed bool + stopCh chan struct{} } func (kp *kafkaProducer) Topic() string { @@ -44,24 +44,28 @@ func (kp *kafkaProducer) Send(ctx context.Context, message *mqcommon.ProducerMes header := kafka.Header{Key: key, Value: []byte(value)} headers = append(headers, header) } + + resultCh := make(chan kafka.Event, 1) err := kp.p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx}, Value: message.Payload, Headers: headers, - }, kp.deliveryChan) + }, resultCh) if err != nil { metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc() return nil, err } - e, ok := <-kp.deliveryChan - if !ok { + var m *kafka.Message + select { + case <-kp.stopCh: metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc() - log.Error("kafka produce message fail because of delivery chan is closed", zap.String("topic", kp.topic)) - return nil, common.NewIgnorableError(fmt.Errorf("delivery chan of kafka producer is closed")) + log.Error("kafka produce message fail because of kafka producer is closed", zap.String("topic", kp.topic)) + return nil, common.NewIgnorableError(fmt.Errorf("kafka producer is closed")) + case e := <-resultCh: + m = e.(*kafka.Message) } - m := e.(*kafka.Message) if m.TopicPartition.Error != nil { metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc() return nil, m.TopicPartition.Error @@ -86,8 +90,7 @@ func (kp *kafkaProducer) Close() { log.Warn("There are still un-flushed outstanding events", zap.Int("event_num", i), zap.String("topic", kp.topic)) } - close(kp.deliveryChan) - + close(kp.stopCh) cost := time.Since(start).Milliseconds() if cost > 500 { log.Debug("kafka producer is closed", zap.String("topic", kp.topic), zap.Int64("time cost(ms)", cost)) diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer_test.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer_test.go index ab8dbad91a99b..ee7db58739d5c 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer_test.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/cockroachdb/errors" "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/stretchr/testify/assert" @@ -44,26 +43,21 @@ func TestKafkaProducer_SendSuccess(t *testing.T) { func TestKafkaProducer_SendFail(t *testing.T) { kafkaAddress := getKafkaBrokerList() { - deliveryChan := make(chan kafka.Event, 1) rand.Seed(time.Now().UnixNano()) topic := fmt.Sprintf("test-topic-%d", rand.Int()) pp, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": kafkaAddress}) assert.NoError(t, err) - producer := &kafkaProducer{p: pp, deliveryChan: deliveryChan, topic: topic} + producer := &kafkaProducer{p: pp, stopCh: make(chan struct{}), topic: topic} + close(producer.stopCh) msg := &common.ProducerMessage{ Payload: []byte{1}, Properties: map[string]string{}, } - var resultMsg kafka.Event = &kafka.Message{TopicPartition: kafka.TopicPartition{Error: errors.New("error")}} - deliveryChan <- resultMsg - ret, err := producer.Send(context.TODO(), msg) assert.Nil(t, ret) assert.Error(t, err) - - producer.Close() } }