fix: kafka use shared channel to receive produce result #38532

Merged 2 commits on Dec 17, 2024
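This PR removes the delivery channel that was shared by every Send call on a kafkaProducer. Previously, CreateProducer allocated a single deliveryChan (capacity 128) that all Send calls received from and that Close() closed; concurrent Send calls could therefore pick up one another's delivery reports, and a Send still waiting when Close() ran would read from a closed channel and fail with "delivery chan is closed". With this change, each Send allocates its own one-element result channel and passes it to Produce, while a new stopCh field is closed by Close() so that a Send still waiting for its delivery report returns an ignorable "kafka producer is closed" error instead of blocking forever.

For reference, a minimal standalone sketch of the per-message delivery-channel pattern the new Send follows, written directly against confluent-kafka-go. The broker address and topic name are placeholders and a reachable broker is assumed; this illustrates the pattern and is not code from the PR.

package main

import (
	"fmt"
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		log.Fatal(err)
	}
	defer p.Close()

	topic := "demo-topic"

	// One result channel per message: only this message's delivery report can
	// arrive here, so concurrent sends cannot consume each other's events.
	resultCh := make(chan kafka.Event, 1)
	if err := p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("hello"),
	}, resultCh); err != nil {
		log.Fatal(err)
	}

	// Wait for the delivery report of this message only.
	m := (<-resultCh).(*kafka.Message)
	if m.TopicPartition.Error != nil {
		log.Fatal(m.TopicPartition.Error)
	}
	fmt.Printf("delivered to partition %d at offset %v\n", m.TopicPartition.Partition, m.TopicPartition.Offset)
}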
3 changes: 1 addition & 2 deletions pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go
@@ -220,8 +220,7 @@ func (kc *kafkaClient) CreateProducer(ctx context.Context, options common.Produc
 	metrics.MsgStreamRequestLatency.WithLabelValues(metrics.CreateProducerLabel).Observe(float64(elapsed.Milliseconds()))
 	metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.SuccessLabel).Inc()
 
-	deliveryChan := make(chan kafka.Event, 128)
-	producer := &kafkaProducer{p: pp, deliveryChan: deliveryChan, topic: options.Topic}
+	producer := &kafkaProducer{p: pp, stopCh: make(chan struct{}), topic: options.Topic}
 	return producer, nil
 }
 
29 changes: 16 additions & 13 deletions pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go
@@ -18,11 +18,11 @@ import (
 )
 
 type kafkaProducer struct {
-	p            *kafka.Producer
-	topic        string
-	deliveryChan chan kafka.Event
-	closeOnce    sync.Once
-	isClosed     bool
+	p         *kafka.Producer
+	topic     string
+	closeOnce sync.Once
+	isClosed  bool
+	stopCh    chan struct{}
 }
 
 func (kp *kafkaProducer) Topic() string {
@@ -44,24 +44,28 @@ func (kp *kafkaProducer) Send(ctx context.Context, message *mqcommon.ProducerMes
 		header := kafka.Header{Key: key, Value: []byte(value)}
 		headers = append(headers, header)
 	}
+
+	resultCh := make(chan kafka.Event, 1)
 	err := kp.p.Produce(&kafka.Message{
 		TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx},
 		Value:          message.Payload,
 		Headers:        headers,
-	}, kp.deliveryChan)
+	}, resultCh)
 	if err != nil {
 		metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc()
 		return nil, err
 	}
 
-	e, ok := <-kp.deliveryChan
-	if !ok {
+	var m *kafka.Message
+	select {
+	case <-kp.stopCh:
 		metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc()
-		log.Error("kafka produce message fail because of delivery chan is closed", zap.String("topic", kp.topic))
-		return nil, common.NewIgnorableError(fmt.Errorf("delivery chan of kafka producer is closed"))
+		log.Error("kafka produce message fail because of kafka producer is closed", zap.String("topic", kp.topic))
+		return nil, common.NewIgnorableError(fmt.Errorf("kafka producer is closed"))
+	case e := <-resultCh:
+		m = e.(*kafka.Message)
 	}
 
-	m := e.(*kafka.Message)
 	if m.TopicPartition.Error != nil {
 		metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc()
 		return nil, m.TopicPartition.Error
@@ -86,8 +90,7 @@ func (kp *kafkaProducer) Close() {
 			log.Warn("There are still un-flushed outstanding events", zap.Int("event_num", i), zap.String("topic", kp.topic))
 		}
 
-		close(kp.deliveryChan)
-
+		close(kp.stopCh)
 		cost := time.Since(start).Milliseconds()
 		if cost > 500 {
 			log.Debug("kafka producer is closed", zap.String("topic", kp.topic), zap.Int64("time cost(ms)", cost))
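To make the shutdown path concrete, here is a simplified, broker-free model of how the new Send and Close interact: each send waits on its own result channel, and closing a producer-wide stop channel unblocks any send whose delivery report has not arrived yet. The producer and send names below are illustrative only and do not appear in the Milvus code.

package main

import (
	"errors"
	"fmt"
	"time"
)

type producer struct {
	stopCh chan struct{}
}

// send mirrors the select in kafkaProducer.Send: it returns either the
// delivery result for this message or an error once the producer is closed.
func (p *producer) send(resultCh <-chan error) error {
	select {
	case <-p.stopCh:
		return errors.New("producer is closed")
	case err := <-resultCh:
		return err
	}
}

func main() {
	p := &producer{stopCh: make(chan struct{})}

	// A send whose delivery report never arrives.
	pending := make(chan error, 1)
	done := make(chan error, 1)
	go func() { done <- p.send(pending) }()

	time.Sleep(50 * time.Millisecond)
	close(p.stopCh) // what Close() does: unblock the pending send

	fmt.Println(<-done) // prints "producer is closed"
}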
10 changes: 2 additions & 8 deletions pkg/mq/msgstream/mqwrapper/kafka/kafka_producer_test.go
@@ -7,7 +7,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/cockroachdb/errors"
 	"github.com/confluentinc/confluent-kafka-go/kafka"
 	"github.com/stretchr/testify/assert"
 
@@ -44,26 +43,21 @@ func TestKafkaProducer_SendSuccess(t *testing.T) {
 func TestKafkaProducer_SendFail(t *testing.T) {
 	kafkaAddress := getKafkaBrokerList()
 	{
-		deliveryChan := make(chan kafka.Event, 1)
 		rand.Seed(time.Now().UnixNano())
 		topic := fmt.Sprintf("test-topic-%d", rand.Int())
 
 		pp, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": kafkaAddress})
 		assert.NoError(t, err)
-		producer := &kafkaProducer{p: pp, deliveryChan: deliveryChan, topic: topic}
+		producer := &kafkaProducer{p: pp, stopCh: make(chan struct{}), topic: topic}
+		close(producer.stopCh)
 
 		msg := &common.ProducerMessage{
			Payload:    []byte{1},
			Properties: map[string]string{},
 		}
-		var resultMsg kafka.Event = &kafka.Message{TopicPartition: kafka.TopicPartition{Error: errors.New("error")}}
-		deliveryChan <- resultMsg
 
 		ret, err := producer.Send(context.TODO(), msg)
 		assert.Nil(t, ret)
 		assert.Error(t, err)
-
-		producer.Close()
 	}
 }
 