From 90e4e26d2e2f3afb346c568db59a5e9a1a7769ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaime=20Pi=C3=B1a?= Date: Tue, 9 Feb 2021 17:30:20 -0800 Subject: [PATCH] Support connections with SASL and TLS (#30) Currently, users can only choose between SASL and TLS insecure skip verify and full TLS. This change allows users to use SASL with full TLS. --- server/core/connector.go | 3 +++ server/core/kafka2nats.go | 3 +++ server/core/kafka2stan.go | 3 +++ server/kafka/consumer.go | 43 ++++++++++++++++++++++++++++--------- server/kafka/manager.go | 15 ++++++------- server/kafka/producer.go | 45 +++++++++++++++++++++++++++++++-------- 6 files changed, 85 insertions(+), 27 deletions(-) diff --git a/server/core/connector.go b/server/core/connector.go index 6f13a8c..c790cf2 100644 --- a/server/core/connector.go +++ b/server/core/connector.go @@ -392,6 +392,9 @@ func (conn *BridgeConnector) writer(msg interface{}) kafka.Producer { } else { w = sp } + if s, ok := w.(interface{ NetInfo() string }); ok { + conn.bridge.Logger().Noticef(s.NetInfo()) + } conn.writers.Store(h, w) } diff --git a/server/core/kafka2nats.go b/server/core/kafka2nats.go index 2e371dc..27487db 100644 --- a/server/core/kafka2nats.go +++ b/server/core/kafka2nats.go @@ -55,6 +55,9 @@ func (conn *Kafka2NATSConnector) Start() error { return err } conn.reader = r + if s, ok := conn.reader.(interface{ NetInfo() string }); ok { + conn.bridge.Logger().Noticef(s.NetInfo()) + } cb, err := conn.setUpListener(conn.reader, conn.natsMessageHandler) if err != nil { diff --git a/server/core/kafka2stan.go b/server/core/kafka2stan.go index cf988e3..dc09309 100644 --- a/server/core/kafka2stan.go +++ b/server/core/kafka2stan.go @@ -55,6 +55,9 @@ func (conn *Kafka2StanConnector) Start() error { if err != nil { return fmt.Errorf("failed to create consumer: %w", err) } + if s, ok := conn.reader.(interface{ NetInfo() string }); ok { + conn.bridge.Logger().Noticef(s.NetInfo()) + } cb, err := conn.setUpListener(conn.reader, conn.stanMessageHandler) if err != nil { diff --git a/server/kafka/consumer.go b/server/kafka/consumer.go index e07ab9c..b113af7 100644 --- a/server/kafka/consumer.go +++ b/server/kafka/consumer.go @@ -46,6 +46,10 @@ type saramaConsumer struct { groupMode bool topic string + saslOn bool + tlsOn bool + tlsSkipVerify bool + c sarama.Consumer pc sarama.PartitionConsumer @@ -66,15 +70,14 @@ func NewConsumer(cc conf.ConnectorConfig, dialTimeout time.Duration) (Consumer, sc.Net.SASL.Mechanism = sarama.SASLTypePlaintext sc.Net.SASL.User = cc.SASL.User sc.Net.SASL.Password = cc.SASL.Password - - if cc.SASL.InsecureSkipVerify { - sc.Net.TLS.Enable = true - sc.Net.TLS.Config = &tls.Config{ - InsecureSkipVerify: cc.SASL.InsecureSkipVerify, - } + } + if sc.Net.SASL.Enable && cc.SASL.InsecureSkipVerify { + sc.Net.TLS.Enable = true + sc.Net.TLS.Config = &tls.Config{ + InsecureSkipVerify: cc.SASL.InsecureSkipVerify, } - } else if tlsC, err := cc.TLS.MakeTLSConfig(); err == nil { - sc.Net.TLS.Enable = (tlsC != nil) + } else if tlsC, err := cc.TLS.MakeTLSConfig(); tlsC != nil && err == nil { + sc.Net.TLS.Enable = true sc.Net.TLS.Config = tlsC } @@ -86,8 +89,11 @@ func NewConsumer(cc conf.ConnectorConfig, dialTimeout time.Duration) (Consumer, } cons := &saramaConsumer{ - groupMode: cc.GroupID != "", - topic: cc.Topic, + groupMode: cc.GroupID != "", + topic: cc.Topic, + saslOn: sc.Net.SASL.Enable, + tlsOn: sc.Net.TLS.Enable, + tlsSkipVerify: cc.SASL.InsecureSkipVerify, fetchCh: make(chan *sarama.ConsumerMessage), commitCh: make(chan *sarama.ConsumerMessage), @@ -127,6 +133,23 @@ func NewConsumer(cc conf.ConnectorConfig, dialTimeout time.Duration) (Consumer, return cons, nil } +func (c *saramaConsumer) NetInfo() string { + saslInfo := "SASL disabled" + if c.saslOn { + saslInfo = "SASL enabled" + } + + tlsInfo := "TLS disabled" + if c.tlsOn { + tlsInfo = "TLS enabled" + } + if c.tlsSkipVerify { + tlsInfo += " (insecure skip verify)" + } + + return fmt.Sprintf("%s, %s", saslInfo, tlsInfo) +} + func (c *saramaConsumer) Fetch(ctx context.Context) (Message, error) { if c.groupMode { select { diff --git a/server/kafka/manager.go b/server/kafka/manager.go index 51b58da..c646cde 100644 --- a/server/kafka/manager.go +++ b/server/kafka/manager.go @@ -44,15 +44,14 @@ func NewManager(cc conf.ConnectorConfig, bc conf.NATSKafkaBridgeConfig) (Manager sc.Net.SASL.Mechanism = sarama.SASLTypePlaintext sc.Net.SASL.User = cc.SASL.User sc.Net.SASL.Password = cc.SASL.Password - - if cc.SASL.InsecureSkipVerify { - sc.Net.TLS.Enable = true - sc.Net.TLS.Config = &tls.Config{ - InsecureSkipVerify: cc.SASL.InsecureSkipVerify, - } + } + if sc.Net.SASL.Enable && cc.SASL.InsecureSkipVerify { + sc.Net.TLS.Enable = true + sc.Net.TLS.Config = &tls.Config{ + InsecureSkipVerify: cc.SASL.InsecureSkipVerify, } - } else if tlsC, err := cc.TLS.MakeTLSConfig(); err == nil { - sc.Net.TLS.Enable = (tlsC != nil) + } else if tlsC, err := cc.TLS.MakeTLSConfig(); tlsC != nil && err == nil { + sc.Net.TLS.Enable = true sc.Net.TLS.Config = tlsC } diff --git a/server/kafka/producer.go b/server/kafka/producer.go index 55adda6..95c0679 100644 --- a/server/kafka/producer.go +++ b/server/kafka/producer.go @@ -19,6 +19,7 @@ package kafka import ( "crypto/tls" "errors" + "fmt" "time" "github.com/Shopify/sarama" @@ -33,6 +34,10 @@ type Producer interface { type saramaProducer struct { sp sarama.SyncProducer topic string + + saslOn bool + tlsOn bool + tlsSkipVerify bool } func IsTopicExist(err error) bool { @@ -55,15 +60,14 @@ func NewProducer(cc conf.ConnectorConfig, bc conf.NATSKafkaBridgeConfig, topic s sc.Net.SASL.Mechanism = sarama.SASLTypePlaintext sc.Net.SASL.User = cc.SASL.User sc.Net.SASL.Password = cc.SASL.Password - - if cc.SASL.InsecureSkipVerify { - sc.Net.TLS.Enable = true - sc.Net.TLS.Config = &tls.Config{ - InsecureSkipVerify: cc.SASL.InsecureSkipVerify, - } + } + if sc.Net.SASL.Enable && cc.SASL.InsecureSkipVerify { + sc.Net.TLS.Enable = true + sc.Net.TLS.Config = &tls.Config{ + InsecureSkipVerify: cc.SASL.InsecureSkipVerify, } - } else if tlsC, err := cc.TLS.MakeTLSConfig(); err == nil { - sc.Net.TLS.Enable = (tlsC != nil) + } else if tlsC, err := cc.TLS.MakeTLSConfig(); tlsC != nil && err == nil { + sc.Net.TLS.Enable = true sc.Net.TLS.Config = tlsC } @@ -72,7 +76,30 @@ func NewProducer(cc conf.ConnectorConfig, bc conf.NATSKafkaBridgeConfig, topic s return nil, err } - return &saramaProducer{sp: sp, topic: topic}, nil + return &saramaProducer{ + sp: sp, + topic: topic, + saslOn: sc.Net.SASL.Enable, + tlsOn: sc.Net.TLS.Enable, + tlsSkipVerify: cc.SASL.InsecureSkipVerify, + }, nil +} + +func (p *saramaProducer) NetInfo() string { + saslInfo := "SASL disabled" + if p.saslOn { + saslInfo = "SASL enabled" + } + + tlsInfo := "TLS disabled" + if p.tlsOn { + tlsInfo = "TLS enabled" + } + if p.tlsSkipVerify { + tlsInfo += " (insecure skip verify)" + } + + return fmt.Sprintf("%s, %s", saslInfo, tlsInfo) } func (p *saramaProducer) Write(m Message) error {