Skip to content

Commit 4f90253

Browse files
committed
Fixing kafka sasl auth
1 parent f273bfe commit 4f90253

File tree

1 file changed

+26
-1
lines changed

1 file changed

+26
-1
lines changed

trafficUtil/kafkaUtil/kafka.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,26 @@ import (
1010

1111
"github.com/akto-api-security/api-gateway-logging/trafficUtil/utils"
1212
"github.com/segmentio/kafka-go"
13+
"github.com/segmentio/kafka-go/sasl/plain"
1314
)
1415

1516
var kafkaWriter *kafka.Writer
1617
var KafkaErrMsgCount = 0
1718
var KafkaErrMsgEpoch = time.Now()
1819
var BytesInThreshold = 500 * 1024 * 1024
1920

21+
var isAuthImplemented = false
22+
var kafkaUsername = ""
23+
var kafkaPassword = ""
24+
25+
func init() {
26+
27+
utils.InitVar("IS_AUTH_IMPLEMENTED", &isAuthImplemented)
28+
utils.InitVar("KAFKA_USERNAME", &kafkaUsername)
29+
utils.InitVar("KAFKA_PASSWORD", &kafkaPassword)
30+
31+
}
32+
2033
func InitKafka() {
2134
kafka_url := os.Getenv("AKTO_KAFKA_BROKER_MAL")
2235

@@ -126,7 +139,7 @@ func Produce(ctx context.Context, message string) error {
126139
}
127140

128141
func getKafkaWriter(kafkaURL, topic string, batchSize int, batchTimeout time.Duration) *kafka.Writer {
129-
return &kafka.Writer{
142+
kafkaWriter := kafka.Writer{
130143
Addr: kafka.TCP(kafkaURL),
131144
Topic: topic,
132145
BatchSize: batchSize,
@@ -136,4 +149,16 @@ func getKafkaWriter(kafkaURL, topic string, batchSize int, batchTimeout time.Dur
136149
WriteTimeout: batchTimeout,
137150
Async: true,
138151
}
152+
153+
transport := &kafka.Transport{}
154+
155+
if isAuthImplemented && kafkaUsername != "" && kafkaPassword != "" {
156+
transport.SASL = plain.Mechanism{
157+
Username: kafkaUsername,
158+
Password: kafkaPassword,
159+
}
160+
}
161+
162+
kafkaWriter.Transport = transport
163+
return &kafkaWriter
139164
}

0 commit comments

Comments
 (0)