Skip to content

Commit

Permalink
feat(audit): implement sink for kafka (#3204)
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Dmytrenko <[email protected]>
  • Loading branch information
erka committed Jul 5, 2024
1 parent 8046e27 commit dae061c
Show file tree
Hide file tree
Showing 29 changed files with 1,472 additions and 48 deletions.
41 changes: 41 additions & 0 deletions build/testing/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@ package testing

import (
"context"
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"encoding/pem"
"errors"
"fmt"
"math/big"
"regexp"
"strings"
"time"

"github.com/google/go-cmp/cmp"
"go.flipt.io/build/internal/dagger"
Expand Down Expand Up @@ -135,3 +141,38 @@ func assertExec(ctx context.Context, flipt *dagger.Container, args []string, opt

return container, nil
}

// generateTLSCert generates a TLS certificate and private key.
func generateTLSCert(dnsname ...string) (keyBytes, certBytes []byte, err error) {
serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128)
serialNumber, err := rand.Int(rand.Reader, serialNumberLimit)
if err != nil {
return nil, nil, err
}
template := &x509.Certificate{
SerialNumber: serialNumber,
IsCA: true,
NotBefore: time.Now(),
NotAfter: time.Now().Add(time.Hour),
DNSNames: dnsname,
}

key, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
return nil, nil, err
}
bytes, err := x509.CreateCertificate(rand.Reader, template, template, &key.PublicKey, key)
if err != nil {
return nil, nil, err
}
certBytes = pem.EncodeToMemory(&pem.Block{
Type: "CERTIFICATE",
Bytes: bytes,
})

keyBytes = pem.EncodeToMemory(&pem.Block{
Type: "RSA PRIVATE KEY",
Bytes: x509.MarshalPKCS1PrivateKey(key),
})
return
}
29 changes: 1 addition & 28 deletions build/testing/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,37 +345,10 @@ func cache(ctx context.Context, _ *dagger.Client, base, flipt *dagger.Container,
}

func cacheWithTLS(ctx context.Context, client *dagger.Client, base, flipt *dagger.Container, conf testConfig) func() error {
serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128)
serialNumber, err := rand.Int(rand.Reader, serialNumberLimit)
keyBytes, crtBytes, err := generateTLSCert("redis")
if err != nil {
return func() error { return err }
}
template := &x509.Certificate{
SerialNumber: serialNumber,
IsCA: true,
NotBefore: time.Now(),
NotAfter: time.Now().Add(time.Hour),
DNSNames: []string{"redis"},
}

key, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
return func() error { return err }
}
bytes, err := x509.CreateCertificate(rand.Reader, template, template, &key.PublicKey, key)
if err != nil {
return func() error { return err }
}
crtBytes := pem.EncodeToMemory(&pem.Block{
Type: "CERTIFICATE",
Bytes: bytes,
})

keyBytes := pem.EncodeToMemory(&pem.Block{
Type: "RSA PRIVATE KEY",
Bytes: x509.MarshalPKCS1PrivateKey(key),
})

redis := client.Container().
From("redis:alpine").
WithExposedPort(6379).
Expand Down
93 changes: 93 additions & 0 deletions build/testing/redpanda.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package testing

import (
"context"
"fmt"

_ "embed"

"go.flipt.io/build/internal/dagger"
)

func redpandaTLSService(ctx context.Context, client *dagger.Client, hostAlias, superuser string) (*dagger.Service, error) {
key, cert, err := generateTLSCert(hostAlias)
if err != nil {
return nil, err
}
kafka := client.Container().
From("redpandadata/redpanda").
WithNewFile("/etc/redpanda/.bootstrap.yaml", dagger.ContainerWithNewFileOpts{
Contents: fmt.Sprintf(redpandaBoostrapConfigurationTpl, superuser),
}).
WithNewFile("/etc/redpanda/redpanda.yaml", dagger.ContainerWithNewFileOpts{
Contents: fmt.Sprintf(redpandaConfigurationTpl, hostAlias),
}).
WithNewFile("/etc/redpanda/key.pem", dagger.ContainerWithNewFileOpts{
Contents: string(key),
}).
WithNewFile("/etc/redpanda/cert.pem", dagger.ContainerWithNewFileOpts{
Contents: string(cert),
}).
WithExposedPort(9092, dagger.ContainerWithExposedPortOpts{
Description: "kafka endpoint",
}).
WithExposedPort(8081, dagger.ContainerWithExposedPortOpts{
Description: "schema registry endpoint",
}).
WithExposedPort(9644, dagger.ContainerWithExposedPortOpts{
Description: "admin api endpoint",
}).
AsService()
return kafka, nil
}

var redpandaBoostrapConfigurationTpl = `
superusers:
- %s
kafka_enable_authorization: true
`

var redpandaConfigurationTpl = `
redpanda:
developer_mode: true
admin:
address: 0.0.0.0
port: 9644
kafka_api:
- address: 0.0.0.0
name: external
port: 9092
authentication_method: sasl
- address: 127.0.0.1
name: internal
port: 29092
authentication_method: sasl
advertised_kafka_api:
- address: %s
name: external
port: 9092
- address: localhost
name: internal
port: 29092
kafka_api_tls:
- name: external
enabled: true
cert_file: /etc/redpanda/cert.pem
key_file: /etc/redpanda/key.pem
require_client_auth: false
schema_registry:
schema_registry_api:
- address: "0.0.0.0"
name: main
port: 8081
authentication_method: sasl
schema_registry_client:
brokers:
- address: localhost
port: 29092
`
9 changes: 9 additions & 0 deletions build/testing/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ func Unit(ctx context.Context, client *dagger.Client, flipt *dagger.Container) (
WithEnvVariable("AZURE_STORAGE_ACCOUNT", "devstoreaccount1").
WithEnvVariable("AZURE_STORAGE_KEY", "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==")

// Kafka unit testing
kafka, err := redpandaTLSService(ctx, client, "kafka", "admin")
if err != nil {
return nil, err
}
flipt = flipt.
WithEnvVariable("KAFKA_BOOTSTRAP_SERVER", "kafka").
WithServiceBinding("kafka", kafka)

if goFlags := os.Getenv("GOFLAGS"); goFlags != "" {
flipt = flipt.WithEnvVariable("GOFLAGS", goFlags)
}
Expand Down
13 changes: 13 additions & 0 deletions config/flipt.schema.cue
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,19 @@ import "strings"
cloud?: {
enabled?: bool | *false
}
kafka?: {
enabled?: bool | *false
topic: string
bootstrap_servers: [...string]
encoding?: *"protobuf" | "avro"
schema_registry?: string
require_tls?: bool | *false
insecure_skip_tls?: bool | *false
authentication?: {
username: string
password: string
} | null
}
}
buffer?: {
capacity?: int | *2
Expand Down
42 changes: 42 additions & 0 deletions config/flipt.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1344,6 +1344,48 @@
}
},
"title": "Cloud"
},
"kafka": {
"type": "object",
"additionalProperties": false,
"properties": {
"enabled": {
"type": "boolean",
"default": false
},
"topic": {
"type": "string"
},
"bootstrap_servers": {
"type": ["array", "null"],
"items": { "type": "string" }
},
"encoding": {
"type": "string",
"enum": ["protobuf", "avro"],
"default": "protobuf"
},
"schema_registry": {
"type": "string"
},
"require_tls": {
"type": "boolean",
"default": false
},
"insecure_skip_tls": {
"type": "boolean",
"default": false
},
"authentication": {
"type": ["object", "null"],
"additionalProperties": false,
"properties": {
"username": { "type": "string" },
"password": { "type": "string" }
}
}
},
"title": "Kafka"
}
}
},
Expand Down
10 changes: 10 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0
github.com/h2non/gock v1.2.0
github.com/hamba/avro v1.8.0
github.com/hashicorp/cap v0.6.0
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/go-retryablehttp v0.7.7
Expand All @@ -66,6 +67,10 @@ require (
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.31.0
github.com/twmb/franz-go v1.17.0
github.com/twmb/franz-go/pkg/kadm v1.12.0
github.com/twmb/franz-go/pkg/sr v1.0.0
github.com/twmb/franz-go/plugin/kzap v1.1.2
github.com/xeipuuv/gojsonschema v1.2.0
github.com/xo/dburl v0.23.2
go.flipt.io/flipt/core v0.0.0-00010101000000-000000000000
Expand Down Expand Up @@ -212,6 +217,7 @@ require (
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/kevinburke/ssh_config v1.2.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
Expand All @@ -228,9 +234,12 @@ require (
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/locker v1.0.1 // indirect
github.com/moby/patternmatcher v0.6.0 // indirect
github.com/moby/sys/mountinfo v0.7.1 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect
github.com/moby/sys/user v0.1.0 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/onsi/ginkgo/v2 v2.17.3 // indirect
github.com/openzipkin/zipkin-go v0.4.3 // indirect
Expand Down Expand Up @@ -268,6 +277,7 @@ require (
github.com/tchap/go-patricia/v2 v2.3.1 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.8.0 // indirect
github.com/vmihailenco/go-tinylfu v0.2.2 // indirect
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
Expand Down
16 changes: 14 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,8 @@ github.com/h2non/gock v1.2.0 h1:K6ol8rfrRkUOefooBC8elXoaNGYkpp7y2qcxGG6BzUE=
github.com/h2non/gock v1.2.0/go.mod h1:tNhoxHYW2W42cYkYb1WqzdbYIieALC99kpYr7rH/BQk=
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw=
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI=
github.com/hamba/avro v1.8.0 h1:eCVrLX7UYThA3R3yBZ+rpmafA5qTc3ZjpTz6gYJoVGU=
github.com/hamba/avro v1.8.0/go.mod h1:NiGUcrLLT+CKfGu5REWQtD9OVPPYUGMVFiC+DE0lQfY=
github.com/hashicorp/cap v0.6.0 h1:uOSdbtXu8zsbRyjwpiTy6QiuX3+5paAbNkYlop7QexM=
github.com/hashicorp/cap v0.6.0/go.mod h1:DwzHkoG6pxSARiqwvAgxmCPUpTTCCw2wVuPrIFOzpe0=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand Down Expand Up @@ -592,8 +594,8 @@ github.com/moby/locker v1.0.1 h1:fOXqR41zeveg4fFODix+1Ch4mj/gT0NE1XJbp/epuBg=
github.com/moby/locker v1.0.1/go.mod h1:S7SDdo5zpBK84bzzVlKr2V0hz+7x9hWbYC/kq7oQppc=
github.com/moby/patternmatcher v0.6.0 h1:GmP9lR19aU5GqSSFko+5pRqHi+Ohk1O69aFiKkVGiPk=
github.com/moby/patternmatcher v0.6.0/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YOUYxkhApJIxc=
github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78=
github.com/moby/sys/mountinfo v0.6.2/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI=
github.com/moby/sys/mountinfo v0.7.1 h1:/tTvQaSJRr2FshkhXiIpux6fQ2Zvc4j7tAhMTStAG2g=
github.com/moby/sys/mountinfo v0.7.1/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI=
github.com/moby/sys/sequential v0.5.0 h1:OPvI35Lzn9K04PBbCLW0g4LcFAJgHsvXsRyewg5lXtc=
github.com/moby/sys/sequential v0.5.0/go.mod h1:tH2cOOs5V9MlPiXcQzRC+eEyab644PWKGRYaaV5ZZlo=
github.com/moby/sys/user v0.1.0 h1:WmZ93f5Ux6het5iituh9x2zAG7NFY9Aqi49jjE1PaQg=
Expand Down Expand Up @@ -776,6 +778,16 @@ github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFA
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
github.com/twmb/franz-go v1.17.0 h1:hawgCx5ejDHkLe6IwAtFWwxi3OU4OztSTl7ZV5rwkYk=
github.com/twmb/franz-go v1.17.0/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
github.com/twmb/franz-go/pkg/kadm v1.12.0 h1:I8P/gpXFzhl73QcAYmJu+1fOXvrynyH/MAotr2udEg4=
github.com/twmb/franz-go/pkg/kadm v1.12.0/go.mod h1:VMvpfjz/szpH9WB+vGM+rteTzVv0djyHFimci9qm2C0=
github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA=
github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU=
github.com/twmb/franz-go/pkg/sr v1.0.0 h1:4FUatTSTEuG2xievT0iDrgnpErgRg7kFLNioJYqfrqs=
github.com/twmb/franz-go/pkg/sr v1.0.0/go.mod h1:aUFRRLI5WYKpKzmWDztzZFecx5eOkCNuuamd91jUV5c=
github.com/twmb/franz-go/plugin/kzap v1.1.2 h1:0arX5xJ0soUPX1LlDay6ZZoxuWkWk1lggQ5M/IgRXAE=
github.com/twmb/franz-go/plugin/kzap v1.1.2/go.mod h1:53Cl9Uz1pbdOPDvUISIxLrZIWSa2jCuY1bTMauRMBmo=
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
Expand Down
Loading

0 comments on commit dae061c

Please sign in to comment.