Skip to content

Commit

Permalink
Merge pull request #101 from 1047Games/main
Browse files Browse the repository at this point in the history
Add support for AWS IAM Authenticated Kafka Instances
  • Loading branch information
wallyqs committed Jan 4, 2024
2 parents bd232ef + 50f1e5e commit a2281b5
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 0 deletions.
11 changes: 11 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,16 @@ connect: [
id: "zap",
topic: "test2",
subject: "two",
},{
type: "NATSToKafka",
brokers: ["localhost:9098"]
id: "zap",
topic: "test2",
subject: "two",
iam: {
enable: true,
region: "us-west-2"
},
},
],
```
Expand Down Expand Up @@ -269,6 +279,7 @@ All connectors must specify Kafka connection properties, with a few optional set
* `balancer` - required for a writer, should be "hash" or "leastbytes"
* `groupid` - (exclusive with partition) used by the reader to set a group id
* `partition` - (exclusive with groupid) used by the reader to set a partition
* `iam` - (optional) a dictionary object containing two keys `enable` and `region` which is a boolean and AWS region respectively
* `minbytes` - (optional) used by Kafka readers to set the minimum bytes for a read
* `maxbytes` - (optional) used by a Kafka reader to set the maximum bytes for a read
* `keytype` - (optional) defines the way keys are assigned to messages coming from NATS (see below)
Expand Down
14 changes: 14 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,20 @@ require (

require (
github.com/armon/go-metrics v0.3.9 // indirect
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 // indirect
github.com/aws/aws-sdk-go-v2 v1.24.0 // indirect
github.com/aws/aws-sdk-go-v2/config v1.26.2 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.16.13 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.10 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.9 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.9 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.9 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.18.5 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.5 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.26.6 // indirect
github.com/aws/smithy-go v1.19.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
Expand Down
29 changes: 29 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,34 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
github.com/armon/go-metrics v0.3.9 h1:O2sNqxBdvq8Eq5xmzljcYzAORli6RWCvEym4cJf9m18=
github.com/armon/go-metrics v0.3.9/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 h1:UyjtGmO0Uwl/K+zpzPwLoXzMhcN9xmnR2nrqJoBrg3c=
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0/go.mod h1:TJAXuFs2HcMib3sN5L0gUC+Q01Qvy3DemvA55WuC+iA=
github.com/aws/aws-sdk-go-v2 v1.24.0 h1:890+mqQ+hTpNuw0gGP6/4akolQkSToDJgHfQE7AwGuk=
github.com/aws/aws-sdk-go-v2 v1.24.0/go.mod h1:LNh45Br1YAkEKaAqvmE1m8FUx6a5b/V0oAKV7of29b4=
github.com/aws/aws-sdk-go-v2/config v1.26.2 h1:+RWLEIWQIGgrz2pBPAUoGgNGs1TOyF4Hml7hCnYj2jc=
github.com/aws/aws-sdk-go-v2/config v1.26.2/go.mod h1:l6xqvUxt0Oj7PI/SUXYLNyZ9T/yBPn3YTQcJLLOdtR8=
github.com/aws/aws-sdk-go-v2/credentials v1.16.13 h1:WLABQ4Cp4vXtXfOWOS3MEZKr6AAYUpMczLhgKtAjQ/8=
github.com/aws/aws-sdk-go-v2/credentials v1.16.13/go.mod h1:Qg6x82FXwW0sJHzYruxGiuApNo31UEtJvXVSZAXeWiw=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.10 h1:w98BT5w+ao1/r5sUuiH6JkVzjowOKeOJRHERyy1vh58=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.10/go.mod h1:K2WGI7vUvkIv1HoNbfBA1bvIZ+9kL3YVmWxeKuLQsiw=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.9 h1:v+HbZaCGmOwnTTVS86Fleq0vPzOd7tnJGbFhP0stNLs=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.9/go.mod h1:Xjqy+Nyj7VDLBtCMkQYOw1QYfAEZCVLrfI0ezve8wd4=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.9 h1:N94sVhRACtXyVcjXxrwK1SKFIJrA9pOJ5yu2eSHnmls=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.9/go.mod h1:hqamLz7g1/4EJP+GH5NBhcUMLjW+gKLQabgyz6/7WAU=
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.2 h1:GrSw8s0Gs/5zZ0SX+gX4zQjRnRsMJDJ2sLur1gRBhEM=
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.2/go.mod h1:6fQQgfuGmw8Al/3M2IgIllycxV7ZW7WCdVSqfBeUiCY=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.4 h1:/b31bi3YVNlkzkBrm9LfpaKoaYZUxIAj4sHfOTmLfqw=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.4/go.mod h1:2aGXHFmbInwgP9ZfpmdIfOELL79zhdNYNmReK8qDfdQ=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.9 h1:Nf2sHxjMJR8CSImIVCONRi4g0Su3J+TSTbS7G0pUeMU=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.9/go.mod h1:idky4TER38YIjr2cADF1/ugFMKvZV7p//pVeV5LZbF0=
github.com/aws/aws-sdk-go-v2/service/sso v1.18.5 h1:ldSFWz9tEHAwHNmjx2Cvy1MjP5/L9kNoR0skc6wyOOM=
github.com/aws/aws-sdk-go-v2/service/sso v1.18.5/go.mod h1:CaFfXLYL376jgbP7VKC96uFcU8Rlavak0UlAwk1Dlhc=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.5 h1:2k9KmFawS63euAkY4/ixVNsYYwrwnd5fIvgEKkfZFNM=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.5/go.mod h1:W+nd4wWDVkSUIox9bacmkBP5NMFQeTJ/xqNabpzSR38=
github.com/aws/aws-sdk-go-v2/service/sts v1.26.6 h1:HJeiuZ2fldpd0WqngyMR6KW7ofkXNLyOaHwEIGm39Cs=
github.com/aws/aws-sdk-go-v2/service/sts v1.26.6/go.mod h1:XX5gh4CB7wAs4KhcF46G6C8a2i7eupU19dcAAE+EydU=
github.com/aws/smithy-go v1.19.0 h1:KWFKQV80DpP3vJrrA9sVAHQ5gc2z8i4EzrLhLlWXcBM=
github.com/aws/smithy-go v1.19.0/go.mod h1:NukqUGpCZIILqqiV0NIjeFh24kd/FAa4beRb6nbIUPE=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down Expand Up @@ -111,6 +139,7 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/jhump/protoreflect v1.10.1 h1:iH+UZfsbRE6vpyZH7asAjTPWJf7RJbpZ9j/N3lDlKs0=
github.com/jhump/protoreflect v1.10.1/go.mod h1:7GcYQDdMU/O/BBrl/cX6PNHpXh6cenjd8pneu5yW7Tg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
Expand Down
6 changes: 6 additions & 0 deletions server/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ type SASL struct {
TLS bool
}

type IAM struct {
Enable bool
Region string
}

// MakeTLSConfig creates a tls.Config from a TLSConf, setting up the key pairs and certs
func (tlsConf *TLSConf) MakeTLSConfig() (*tls.Config, error) {
if tlsConf.Cert == "" || tlsConf.Key == "" {
Expand Down Expand Up @@ -216,6 +221,7 @@ type ConnectorConfig struct {
Topic string // kafka topic
TLS TLSConf // tls config for connecting to the kafka brokers
SASL SASL // SASL config for connecting to the kafka brokers, specifically EventHub
IAM IAM // IAM config for connecting to kafka brokers

MinBytes int64 // used by the Kafka reader (for kafka->nats connectors)
MaxBytes int64 // used by the Kafka reader (for kafka->nats connectors)
Expand Down
8 changes: 8 additions & 0 deletions server/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func NewConsumer(cc conf.ConnectorConfig, dialTimeout time.Duration) (Consumer,
}
sc.Net.SASL.User = cc.SASL.User
sc.Net.SASL.Password = cc.SASL.Password
} else if cc.IAM.Enable && cc.IAM.Region != "" {
sc.Net.SASL.Enable = true
sc.Net.SASL.Mechanism = sarama.SASLTypeOAuth
sc.Net.SASL.TokenProvider = &MSKAccessTokenProvider{Region: cc.IAM.Region}
}

if sc.Net.SASL.Enable && cc.SASL.InsecureSkipVerify {
Expand All @@ -105,6 +109,10 @@ func NewConsumer(cc conf.ConnectorConfig, dialTimeout time.Duration) (Consumer,
} else if tlsC, err := cc.TLS.MakeTLSConfig(); tlsC != nil && err == nil {
sc.Net.TLS.Enable = true
sc.Net.TLS.Config = tlsC
} else if cc.IAM.Enable {
tlsConfig := tls.Config{}
sc.Net.TLS.Enable = true
sc.Net.TLS.Config = &tlsConfig
}

if cc.MinBytes > 0 {
Expand Down
17 changes: 17 additions & 0 deletions server/kafka/iam_token_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package kafka

import (
"context"

"github.com/Shopify/sarama"
"github.com/aws/aws-msk-iam-sasl-signer-go/signer"
)

type MSKAccessTokenProvider struct {
Region string // AWS IAM region
}

func (m *MSKAccessTokenProvider) Token() (*sarama.AccessToken, error) {
token, _, err := signer.GenerateAuthToken(context.TODO(), m.Region)
return &sarama.AccessToken{Token: token}, err
}
9 changes: 9 additions & 0 deletions server/kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,12 @@ func NewProducer(cc conf.ConnectorConfig, bc conf.NATSKafkaBridgeConfig, topic s
}
sc.Net.SASL.User = cc.SASL.User
sc.Net.SASL.Password = cc.SASL.Password
} else if cc.IAM.Enable && cc.IAM.Region != "" {
sc.Net.SASL.Enable = true
sc.Net.SASL.Mechanism = sarama.SASLTypeOAuth
sc.Net.SASL.TokenProvider = &MSKAccessTokenProvider{Region: cc.IAM.Region}
}

if sc.Net.SASL.Enable && cc.SASL.InsecureSkipVerify {
sc.Net.TLS.Enable = true
sc.Net.TLS.Config = &tls.Config{
Expand All @@ -95,6 +100,10 @@ func NewProducer(cc conf.ConnectorConfig, bc conf.NATSKafkaBridgeConfig, topic s
} else if tlsC, err := cc.TLS.MakeTLSConfig(); tlsC != nil && err == nil {
sc.Net.TLS.Enable = true
sc.Net.TLS.Config = tlsC
} else if cc.IAM.Enable {
tlsConfig := tls.Config{}
sc.Net.TLS.Enable = true
sc.Net.TLS.Config = &tlsConfig
}
if cc.SASL.TLS {
sc.Net.TLS.Enable = cc.SASL.TLS
Expand Down

0 comments on commit a2281b5

Please sign in to comment.