@@ -12,6 +12,7 @@ import (
1212
1313 trafficpb "github.com/akto-api-security/mirroring-api-logging/protobuf/traffic_payload"
1414 "github.com/segmentio/kafka-go"
15+ "github.com/segmentio/kafka-go/sasl/plain"
1516 "google.golang.org/protobuf/proto"
1617)
1718
@@ -29,12 +30,21 @@ var useTLS = false
2930var InsecureSkipVerify = true
3031var tlsCACertPath = "./ca.crt"
3132
33+ var isAuthImplemented = false
34+ var kafkaUsername = ""
35+ var kafkaPassword = ""
36+
3237func init () {
3338
3439 InitVar ("USE_TLS" , & useTLS )
3540 InitVar ("INSECURE_SKIP_VERIFY" , & InsecureSkipVerify )
3641 InitVar ("TLS_CA_CERT_PATH" , & tlsCACertPath )
3742
43+ // Initialize SASL authentication variables
44+ InitVar ("IS_AUTH_IMPLEMENTED" , & isAuthImplemented )
45+ InitVar ("KAFKA_USERNAME" , & kafkaUsername )
46+ InitVar ("KAFKA_PASSWORD" , & kafkaPassword )
47+
3848}
3949
4050func Produce (kafkaWriter * kafka.Writer , ctx context.Context , value * trafficpb.HttpResponseParam ) error {
@@ -132,12 +142,24 @@ func GetKafkaWriter(kafkaURL, topic string, batchSize int, batchTimeout time.Dur
132142 Compression : kafka .Zstd ,
133143 }
134144
145+ // Configure transport with TLS and/or SASL authentication
146+ transport := & kafka.Transport {}
147+
135148 if useTLS {
136149 tlsConfig , _ := NewTLSConfig (tlsCACertPath )
137- kafkaWriter .Transport = & kafka.Transport {
138- TLS : tlsConfig ,
150+ transport .TLS = tlsConfig
151+ }
152+
153+ // Add SASL authentication if enabled
154+ if isAuthImplemented && kafkaUsername != "" && kafkaPassword != "" {
155+ slog .Info ("Configuring SASL plain authentication" , "username" , kafkaUsername )
156+ transport .SASL = plain.Mechanism {
157+ Username : kafkaUsername ,
158+ Password : kafkaPassword ,
139159 }
140160 }
161+
162+ kafkaWriter .Transport = transport
141163 return & kafkaWriter
142164}
143165
@@ -152,13 +174,25 @@ func GetCredential(kafkaURL string, groupID string, topic string) Credential {
152174 MaxBytes : 10e6 , // 10MB
153175 }
154176
177+ // Configure dialer with TLS and/or SASL authentication
178+ dialer := & kafka.Dialer {}
179+
155180 if useTLS {
156181 tlsConfig , _ := NewTLSConfig (tlsCACertPath )
157- config .Dialer = & kafka.Dialer {
158- TLS : tlsConfig ,
182+ dialer .TLS = tlsConfig
183+ }
184+
185+ // Add SASL authentication if enabled
186+ if isAuthImplemented && kafkaUsername != "" && kafkaPassword != "" {
187+ slog .Info ("Configuring SASL plain authentication for reader" , "username" , kafkaUsername )
188+ dialer .SASLMechanism = plain.Mechanism {
189+ Username : kafkaUsername ,
190+ Password : kafkaPassword ,
159191 }
160192 }
161193
194+ config .Dialer = dialer
195+
162196 r := kafka .NewReader (config )
163197
164198 // Set up a context with a timeout
0 commit comments