Skip to content

Commit

Permalink
SASL Mechanism: AWS MSK IAM (making requested edits) (#798)
Browse files Browse the repository at this point in the history
Co-authored-by: Achille <[email protected]>
Co-authored-by: Christian Maher <[email protected]>
  • Loading branch information
3 people authored Nov 24, 2021
1 parent 2e02f37 commit e88d48a
Show file tree
Hide file tree
Showing 5 changed files with 308 additions and 3 deletions.
14 changes: 11 additions & 3 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,20 @@ jobs:
- checkout
- restore_cache:
key: kafka-go-mod-{{ checksum "go.sum" }}-1
- run: go mod download
- run:
name: Download dependencies
command: go mod download
- save_cache:
key: kafka-go-mod-{{ checksum "go.sum" }}-1
paths:
- /go/pkg/mod
- run: go test -race -cover ./...
- run:
name: Test kafka-go
command: go test -race -cover ./...
- run:
name: Test kafka-go/sasl/aws_msk_iam
working_directory: ./sasl/aws_msk_iam
command: go test -race -cover ./...

# Starting at version 0.11, the kafka features and configuration remained
# mostly stable, so we can use this CI job configuration as template for other
Expand Down Expand Up @@ -219,7 +227,7 @@ jobs:
- 9093:9093
environment: *environment
steps: *steps

workflows:
version: 2
run:
Expand Down
8 changes: 8 additions & 0 deletions sasl/aws_msk_iam/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module github.com/segmentio/kafka-go/sasl/aws_msk_iam

go 1.15

require (
github.com/aws/aws-sdk-go v1.41.3
github.com/segmentio/kafka-go v0.4.24
)
60 changes: 60 additions & 0 deletions sasl/aws_msk_iam/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
github.com/aws/aws-sdk-go v1.41.3 h1:deglLZ1jjHdhkd6Rbad1MZM4gL+1pfnTfjuFk6CGJFM=
github.com/aws/aws-sdk-go v1.41.3/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA=
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A=
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/segmentio/kafka-go v0.4.24 h1:R3tYSYxyLK3SknDIU15LtpDdq59gRg2/J0GKhDFXrBQ=
github.com/segmentio/kafka-go v0.4.24/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 h1:rlLehGeYg6jfoyz/eDqDU1iRXLKfR42nnNh57ytKEWo=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q=
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
124 changes: 124 additions & 0 deletions sasl/aws_msk_iam/msk_iam.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package aws_msk_iam

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"runtime"
"strings"
"time"

sigv4 "github.com/aws/aws-sdk-go/aws/signer/v4"
"github.com/segmentio/kafka-go/sasl"
)

const (
// These constants come from https://github.com/aws/aws-msk-iam-auth#details and
// https://github.com/aws/aws-msk-iam-auth/blob/main/src/main/java/software/amazon/msk/auth/iam/internals/AWS4SignedPayloadGenerator.java.
signVersion = "2020_10_22"
signService = "kafka-cluster"
signAction = "kafka-cluster:Connect"
signVersionKey = "version"
signHostKey = "host"
signUserAgentKey = "user-agent"
signActionKey = "action"
queryActionKey = "Action"
)

var signUserAgent = fmt.Sprintf("kafka-go/sasl/aws_msk_iam/%s", runtime.Version())

// Mechanism implements sasl.Mechanism for the AWS_MSK_IAM mechanism, based on the official java implementation:
// https://github.com/aws/aws-msk-iam-auth
type Mechanism struct {
// The sigv4.Signer to use when signing the request. Required.
Signer *sigv4.Signer
// The region where the msk cluster is hosted, e.g. "us-east-1". Required.
Region string
// The time the request is planned for. Optional, defaults to time.Now() at time of authentication.
SignTime time.Time
// The duration for which the presigned request is active. Optional, defaults to 5 minutes.
Expiry time.Duration
}

func (m *Mechanism) Name() string {
return "AWS_MSK_IAM"
}

// Start produces the authentication values required for AWS_MSK_IAM. It produces the following json as a byte array,
// making use of the aws-sdk to produce the signed output.
// {
// "version" : "2020_10_22",
// "host" : "<broker host>",
// "user-agent": "<user agent string from the client>",
// "action": "kafka-cluster:Connect",
// "x-amz-algorithm" : "<algorithm>",
// "x-amz-credential" : "<clientAWSAccessKeyID>/<date in yyyyMMdd format>/<region>/kafka-cluster/aws4_request",
// "x-amz-date" : "<timestamp in yyyyMMdd'T'HHmmss'Z' format>",
// "x-amz-security-token" : "<clientAWSSessionToken if any>",
// "x-amz-signedheaders" : "host",
// "x-amz-expires" : "<expiration in seconds>",
// "x-amz-signature" : "<AWS SigV4 signature computed by the client>"
// }
func (m *Mechanism) Start(ctx context.Context) (sess sasl.StateMachine, ir []byte, err error) {
saslMeta := sasl.MetadataFromContext(ctx)
if saslMeta == nil {
return nil, nil, errors.New("missing sasl metadata")
}

query := url.Values{
queryActionKey: {signAction},
}

signUrl := url.URL{
Scheme: "kafka",
Host: saslMeta.Host,
Path: "/",
RawQuery: query.Encode(),
}

req, err := http.NewRequest("GET", signUrl.String(), nil)
if err != nil {
return nil, nil, err
}

signTime := m.SignTime
if signTime.IsZero() {
signTime = time.Now()
}

expiry := m.Expiry
if expiry == 0 {
expiry = 5 * time.Minute
}

header, err := m.Signer.Presign(req, nil, signService, m.Region, expiry, signTime)
if err != nil {
return nil, nil, err
}
signedMap := map[string]string{
signVersionKey: signVersion,
signHostKey: signUrl.Host,
signUserAgentKey: signUserAgent,
signActionKey: signAction,
}
// The protocol requires lowercase keys.
for key, vals := range header {
signedMap[strings.ToLower(key)] = vals[0]
}
for key, vals := range req.URL.Query() {
signedMap[strings.ToLower(key)] = vals[0]
}

signedJson, err := json.Marshal(signedMap)
return m, signedJson, err
}

func (m *Mechanism) Next(ctx context.Context, challenge []byte) (bool, []byte, error) {
// After the initial step, the authentication is complete
// kafka will return error if it rejected the credentials, so we'll only
// arrive here on success.
return true, nil, nil
}
105 changes: 105 additions & 0 deletions sasl/aws_msk_iam/msk_iam_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package aws_msk_iam

import (
"bytes"
"context"
"encoding/json"
"testing"
"time"

"github.com/segmentio/kafka-go/sasl"

"github.com/aws/aws-sdk-go/aws/credentials"
sigv4 "github.com/aws/aws-sdk-go/aws/signer/v4"
)

const (
accessKeyId = "ACCESS_KEY"
secretAccessKey = "SECRET_KEY"
)

// using a fixed time allows the signature to be verifiable in a test
var signTime = time.Date(2021, 10, 14, 13, 5, 0, 0, time.UTC)

func TestAwsMskIamMechanism(t *testing.T) {
tests := []struct {
description string
ctx func() context.Context
shouldFail bool
}{
{
description: "with metadata",
ctx: func() context.Context {
return sasl.WithMetadata(context.Background(), &sasl.Metadata{
Host: "localhost",
Port: 9092,
})
},
},
{
description: "without metadata",
ctx: func() context.Context {
return context.Background()
},
shouldFail: true,
},
}

for _, tt := range tests {
t.Run(tt.description, func(t *testing.T) {
ctx := tt.ctx()

creds := credentials.NewStaticCredentials(accessKeyId, secretAccessKey, "")
mskMechanism := &Mechanism{
Signer: sigv4.NewSigner(creds),
Region: "us-east-1",
SignTime: signTime,
}

sess, auth, err := mskMechanism.Start(ctx)
if tt.shouldFail { // if error is expected
if err == nil { // but we don't find one
t.Fatal("error expected")
} else { // but we do find one
return // return early since the remaining assertions are irrelevant
}
} else { // if error is not expected (typical)
if err != nil { // but we do find one
t.Fatal(err)
}
}

if sess != mskMechanism {
t.Error(
"Unexpected session",
"expected", mskMechanism,
"got", sess,
)
}

expectedMap := map[string]string{
"version": "2020_10_22",
"action": "kafka-cluster:Connect",
"host": "localhost",
"user-agent": signUserAgent,
"x-amz-algorithm": "AWS4-HMAC-SHA256",
"x-amz-credential": "ACCESS_KEY/20211014/us-east-1/kafka-cluster/aws4_request",
"x-amz-date": "20211014T130500Z",
"x-amz-expires": "300",
"x-amz-signedheaders": "host",
"x-amz-signature": "6b8d25f9b45b9c7db9da855a49112d80379224153a27fd279c305a5b7940d1a7",
}
expectedAuth, err := json.Marshal(expectedMap)
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(expectedAuth, auth) {
t.Error("Unexpected authentication",
"expected", expectedAuth,
"got", auth,
)
}
})
}
}

0 comments on commit e88d48a

Please sign in to comment.