Skip to content

Commit

Permalink
fix: unittest
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Dec 17, 2024
1 parent 8ca30b6 commit 03ad51d
Showing 1 changed file with 2 additions and 8 deletions.
10 changes: 2 additions & 8 deletions pkg/mq/msgstream/mqwrapper/kafka/kafka_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"testing"
"time"

"github.com/cockroachdb/errors"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/stretchr/testify/assert"

Expand Down Expand Up @@ -44,26 +43,21 @@ func TestKafkaProducer_SendSuccess(t *testing.T) {
func TestKafkaProducer_SendFail(t *testing.T) {
kafkaAddress := getKafkaBrokerList()
{
deliveryChan := make(chan kafka.Event, 1)
rand.Seed(time.Now().UnixNano())
topic := fmt.Sprintf("test-topic-%d", rand.Int())

pp, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": kafkaAddress})
assert.NoError(t, err)
producer := &kafkaProducer{p: pp, deliveryChan: deliveryChan, topic: topic}
producer := &kafkaProducer{p: pp, stopCh: make(chan struct{}), topic: topic}
close(producer.stopCh)

msg := &common.ProducerMessage{
Payload: []byte{1},
Properties: map[string]string{},
}
var resultMsg kafka.Event = &kafka.Message{TopicPartition: kafka.TopicPartition{Error: errors.New("error")}}
deliveryChan <- resultMsg

ret, err := producer.Send(context.TODO(), msg)
assert.Nil(t, ret)
assert.Error(t, err)

producer.Close()
}
}

Expand Down

0 comments on commit 03ad51d

Please sign in to comment.