Skip to content

Commit

Permalink
Support connections with SASL and TLS (#30)
Browse files Browse the repository at this point in the history
Currently, users can only choose between SASL and TLS insecure skip verify and
full TLS. This change allows users to use SASL with full TLS.
  • Loading branch information
variadico authored Feb 10, 2021
1 parent 1bbd85e commit 90e4e26
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 27 deletions.
3 changes: 3 additions & 0 deletions server/core/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,9 @@ func (conn *BridgeConnector) writer(msg interface{}) kafka.Producer {
} else {
w = sp
}
if s, ok := w.(interface{ NetInfo() string }); ok {
conn.bridge.Logger().Noticef(s.NetInfo())
}

conn.writers.Store(h, w)
}
Expand Down
3 changes: 3 additions & 0 deletions server/core/kafka2nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ func (conn *Kafka2NATSConnector) Start() error {
return err
}
conn.reader = r
if s, ok := conn.reader.(interface{ NetInfo() string }); ok {
conn.bridge.Logger().Noticef(s.NetInfo())
}

cb, err := conn.setUpListener(conn.reader, conn.natsMessageHandler)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions server/core/kafka2stan.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ func (conn *Kafka2StanConnector) Start() error {
if err != nil {
return fmt.Errorf("failed to create consumer: %w", err)
}
if s, ok := conn.reader.(interface{ NetInfo() string }); ok {
conn.bridge.Logger().Noticef(s.NetInfo())
}

cb, err := conn.setUpListener(conn.reader, conn.stanMessageHandler)
if err != nil {
Expand Down
43 changes: 33 additions & 10 deletions server/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ type saramaConsumer struct {
groupMode bool
topic string

saslOn bool
tlsOn bool
tlsSkipVerify bool

c sarama.Consumer
pc sarama.PartitionConsumer

Expand All @@ -66,15 +70,14 @@ func NewConsumer(cc conf.ConnectorConfig, dialTimeout time.Duration) (Consumer,
sc.Net.SASL.Mechanism = sarama.SASLTypePlaintext
sc.Net.SASL.User = cc.SASL.User
sc.Net.SASL.Password = cc.SASL.Password

if cc.SASL.InsecureSkipVerify {
sc.Net.TLS.Enable = true
sc.Net.TLS.Config = &tls.Config{
InsecureSkipVerify: cc.SASL.InsecureSkipVerify,
}
}
if sc.Net.SASL.Enable && cc.SASL.InsecureSkipVerify {
sc.Net.TLS.Enable = true
sc.Net.TLS.Config = &tls.Config{
InsecureSkipVerify: cc.SASL.InsecureSkipVerify,
}
} else if tlsC, err := cc.TLS.MakeTLSConfig(); err == nil {
sc.Net.TLS.Enable = (tlsC != nil)
} else if tlsC, err := cc.TLS.MakeTLSConfig(); tlsC != nil && err == nil {
sc.Net.TLS.Enable = true
sc.Net.TLS.Config = tlsC
}

Expand All @@ -86,8 +89,11 @@ func NewConsumer(cc conf.ConnectorConfig, dialTimeout time.Duration) (Consumer,
}

cons := &saramaConsumer{
groupMode: cc.GroupID != "",
topic: cc.Topic,
groupMode: cc.GroupID != "",
topic: cc.Topic,
saslOn: sc.Net.SASL.Enable,
tlsOn: sc.Net.TLS.Enable,
tlsSkipVerify: cc.SASL.InsecureSkipVerify,

fetchCh: make(chan *sarama.ConsumerMessage),
commitCh: make(chan *sarama.ConsumerMessage),
Expand Down Expand Up @@ -127,6 +133,23 @@ func NewConsumer(cc conf.ConnectorConfig, dialTimeout time.Duration) (Consumer,
return cons, nil
}

func (c *saramaConsumer) NetInfo() string {
saslInfo := "SASL disabled"
if c.saslOn {
saslInfo = "SASL enabled"
}

tlsInfo := "TLS disabled"
if c.tlsOn {
tlsInfo = "TLS enabled"
}
if c.tlsSkipVerify {
tlsInfo += " (insecure skip verify)"
}

return fmt.Sprintf("%s, %s", saslInfo, tlsInfo)
}

func (c *saramaConsumer) Fetch(ctx context.Context) (Message, error) {
if c.groupMode {
select {
Expand Down
15 changes: 7 additions & 8 deletions server/kafka/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,14 @@ func NewManager(cc conf.ConnectorConfig, bc conf.NATSKafkaBridgeConfig) (Manager
sc.Net.SASL.Mechanism = sarama.SASLTypePlaintext
sc.Net.SASL.User = cc.SASL.User
sc.Net.SASL.Password = cc.SASL.Password

if cc.SASL.InsecureSkipVerify {
sc.Net.TLS.Enable = true
sc.Net.TLS.Config = &tls.Config{
InsecureSkipVerify: cc.SASL.InsecureSkipVerify,
}
}
if sc.Net.SASL.Enable && cc.SASL.InsecureSkipVerify {
sc.Net.TLS.Enable = true
sc.Net.TLS.Config = &tls.Config{
InsecureSkipVerify: cc.SASL.InsecureSkipVerify,
}
} else if tlsC, err := cc.TLS.MakeTLSConfig(); err == nil {
sc.Net.TLS.Enable = (tlsC != nil)
} else if tlsC, err := cc.TLS.MakeTLSConfig(); tlsC != nil && err == nil {
sc.Net.TLS.Enable = true
sc.Net.TLS.Config = tlsC
}

Expand Down
45 changes: 36 additions & 9 deletions server/kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package kafka
import (
"crypto/tls"
"errors"
"fmt"
"time"

"github.com/Shopify/sarama"
Expand All @@ -33,6 +34,10 @@ type Producer interface {
type saramaProducer struct {
sp sarama.SyncProducer
topic string

saslOn bool
tlsOn bool
tlsSkipVerify bool
}

func IsTopicExist(err error) bool {
Expand All @@ -55,15 +60,14 @@ func NewProducer(cc conf.ConnectorConfig, bc conf.NATSKafkaBridgeConfig, topic s
sc.Net.SASL.Mechanism = sarama.SASLTypePlaintext
sc.Net.SASL.User = cc.SASL.User
sc.Net.SASL.Password = cc.SASL.Password

if cc.SASL.InsecureSkipVerify {
sc.Net.TLS.Enable = true
sc.Net.TLS.Config = &tls.Config{
InsecureSkipVerify: cc.SASL.InsecureSkipVerify,
}
}
if sc.Net.SASL.Enable && cc.SASL.InsecureSkipVerify {
sc.Net.TLS.Enable = true
sc.Net.TLS.Config = &tls.Config{
InsecureSkipVerify: cc.SASL.InsecureSkipVerify,
}
} else if tlsC, err := cc.TLS.MakeTLSConfig(); err == nil {
sc.Net.TLS.Enable = (tlsC != nil)
} else if tlsC, err := cc.TLS.MakeTLSConfig(); tlsC != nil && err == nil {
sc.Net.TLS.Enable = true
sc.Net.TLS.Config = tlsC
}

Expand All @@ -72,7 +76,30 @@ func NewProducer(cc conf.ConnectorConfig, bc conf.NATSKafkaBridgeConfig, topic s
return nil, err
}

return &saramaProducer{sp: sp, topic: topic}, nil
return &saramaProducer{
sp: sp,
topic: topic,
saslOn: sc.Net.SASL.Enable,
tlsOn: sc.Net.TLS.Enable,
tlsSkipVerify: cc.SASL.InsecureSkipVerify,
}, nil
}

func (p *saramaProducer) NetInfo() string {
saslInfo := "SASL disabled"
if p.saslOn {
saslInfo = "SASL enabled"
}

tlsInfo := "TLS disabled"
if p.tlsOn {
tlsInfo = "TLS enabled"
}
if p.tlsSkipVerify {
tlsInfo += " (insecure skip verify)"
}

return fmt.Sprintf("%s, %s", saslInfo, tlsInfo)
}

func (p *saramaProducer) Write(m Message) error {
Expand Down

0 comments on commit 90e4e26

Please sign in to comment.