Skip to content

Commit

Permalink
switch to redpanda and cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Dmytrenko <[email protected]>
  • Loading branch information
erka committed Jul 1, 2024
1 parent 883e13a commit a0ecc19
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 139 deletions.
35 changes: 15 additions & 20 deletions build/testing/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,6 @@ func Unit(ctx context.Context, client *dagger.Client, flipt *dagger.Container) e
WithExec([]string{"-scheme", "http", "-public-host", "gcs:4443"}).
AsService()

kafka := client.Container().
From("bitnami/kafka:3.3.1").
WithExposedPort(9092).
WithEnvVariable("KAFKA_ENABLE_KRAFT", "yes").
WithEnvVariable("KAFKA_CFG_PROCESS_ROLES", "controller,broker").
WithEnvVariable("KAFKA_CFG_CONTROLLER_LISTENER_NAMES", "CONTROLLER").
WithEnvVariable("KAFKA_CFG_LISTENERS", "PLAINTEXT://:9092,CONTROLLER://:9093").
WithEnvVariable("KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP", "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT").
WithEnvVariable("KAFKA_CFG_CONTROLLER_QUORUM_VOTERS", "[email protected]:9093").
WithEnvVariable("KAFKA_CFG_TRANSACTION_PARTITION_VERIFICATION_ENABLE", "false").
WithEnvVariable("KAFKA_CFG_ADVERTISED_LISTENERS", "PLAINTEXT://kafka-1:9092").
WithEnvVariable("KAFKA_CFG_NODE_ID", "1").
WithEnvVariable("KAFKA_CFG_BROKER_ID", "1").
WithEnvVariable("ALLOW_PLAINTEXT_LISTENER", "yes").
WithEnvVariable("KAFKA_CFG_BROKER_ID", "1").
WithEnvVariable("KAFKA_KRAFT_CLUSTER_ID", "XkpGZQ27R3eTl3OdTm2LYA").
WithExec(nil).
AsService()

// S3 unit testing

flipt = flipt.
Expand All @@ -95,15 +76,29 @@ func Unit(ctx context.Context, client *dagger.Client, flipt *dagger.Container) e
WithEnvVariable("AZURE_STORAGE_ACCOUNT", "devstoreaccount1").
WithEnvVariable("AZURE_STORAGE_KEY", "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==")

// Kafka unit testing

kafka := client.Container().
From("redpandadata/redpanda").
WithExposedPort(9092, dagger.ContainerWithExposedPortOpts{
Description: "kafka endpoint",
}).
WithExposedPort(8081, dagger.ContainerWithExposedPortOpts{
Description: "schema registry endpoint",
}).
WithEnvVariable("REDPANDA_ADVERTISE_KAFKA_ADDRESS", "kafka-1:9092").
WithExec(nil).
AsService()
flipt = flipt.
WithEnvVariable("KAFKA_BOOTSTRAP_SERVER", "kafka-1").
WithServiceBinding("kafka-1", kafka)

if goFlags := os.Getenv("GOFLAGS"); goFlags != "" {
flipt = flipt.WithEnvVariable("GOFLAGS", goFlags)
}

flipt, err = flipt.
WithServiceBinding("redis", redisSrv.AsService()).
WithEnvVariable("KAFKA_BOOTSTRAP_SERVER", "kafka-1:9092").
WithEnvVariable("REDIS_HOST", "redis:6379").
WithEnvVariable("TEST_GIT_REPO_URL", "http://gitea:3000/root/features.git").
WithEnvVariable("TEST_GIT_REPO_HEAD", push["HEAD"]).
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ require (
github.com/twmb/franz-go v1.17.0
github.com/twmb/franz-go/pkg/kadm v1.12.0
github.com/twmb/franz-go/pkg/sr v1.0.0
github.com/twmb/franz-go/plugin/kzap v1.1.2
github.com/xeipuuv/gojsonschema v1.2.0
github.com/xo/dburl v0.23.2
go.flipt.io/flipt/core v0.0.0-00010101000000-000000000000
Expand Down Expand Up @@ -107,7 +108,6 @@ require (
gopkg.in/segmentio/analytics-go.v3 v3.1.0
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
gotest.tools/v3 v3.5.1
oras.land/oras-go/v2 v2.5.0
)

Expand Down Expand Up @@ -309,6 +309,7 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gotest.tools/v3 v3.5.1 // indirect
k8s.io/apimachinery v0.30.0 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,8 @@ github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1C
github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU=
github.com/twmb/franz-go/pkg/sr v1.0.0 h1:4FUatTSTEuG2xievT0iDrgnpErgRg7kFLNioJYqfrqs=
github.com/twmb/franz-go/pkg/sr v1.0.0/go.mod h1:aUFRRLI5WYKpKzmWDztzZFecx5eOkCNuuamd91jUV5c=
github.com/twmb/franz-go/plugin/kzap v1.1.2 h1:0arX5xJ0soUPX1LlDay6ZZoxuWkWk1lggQ5M/IgRXAE=
github.com/twmb/franz-go/plugin/kzap v1.1.2/go.mod h1:53Cl9Uz1pbdOPDvUISIxLrZIWSa2jCuY1bTMauRMBmo=
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
Expand Down
19 changes: 19 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ code.gitea.io/sdk/gitea v0.18.0/go.mod h1:IG9xZJoltDNeDSW0qiF2Vqx5orMWa7OhVWrjvr
contrib.go.opencensus.io/exporter/aws v0.0.0-20230502192102-15967c811cec/go.mod h1:uu1P0UCM/6RbsMrgPa98ll8ZcHM858i/AD06a9aLRCA=
contrib.go.opencensus.io/exporter/stackdriver v0.13.14/go.mod h1:5pSSGY0Bhuk7waTHuDf4aQ8D2DrhgETRo9fy6k3Xlzc=
contrib.go.opencensus.io/integrations/ocsql v0.1.7/go.mod h1:8DsSdjz3F+APR+0z0WkU1aRorQCFfRxvqjUUPMbF3fE=
cuelang.org/go v0.8.2/go.mod h1:CoDbYolfMms4BhWUlhD+t5ORnihR7wvjcfgyO9lL5FI=
dagger.io/dagger v0.11.8/go.mod h1:JcoHw0aCCk9vZLnl9qx6Q/3IXhP92ZUw/hb8/za+vsc=
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4/go.mod h1:hN7oaIRCjzsZ2dE+yG5k+rsdt3qcwykqK6HVGcKwsw4=
github.com/99designs/gqlgen v0.17.49/go.mod h1:tC8YFVZMed81x7UJ7ORUwXF4Kn6SXuucFqQBhN8+BU0=
Expand Down Expand Up @@ -168,13 +169,27 @@ github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/aws/aws-sdk-go-v2 v1.27.2/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2/go.mod h1:lPprDr1e6cJdyYeGXnRaJoP4Md+cDBvi2eOj00BlGmg=
github.com/aws/aws-sdk-go-v2/config v1.27.17/go.mod h1:MzM3balLZeaafYcPz8IihAmam/aCz6niPQI0FdprxW0=
github.com/aws/aws-sdk-go-v2/credentials v1.17.17/go.mod h1:e4khg9iY08LnFK/HXQDWMf9GDaiMari7jWPnXvKAuBU=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.4/go.mod h1:Wjn5O9eS7uSi7vlPKt/v0MLTncANn9EMmoDvnzJli6o=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.9/go.mod h1:CZBXGLaJnEZI6EVNcPd7a6B5IC5cA/GkRWtu9fp3S6Y=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.9/go.mod h1:5jJcHuwDagxN+ErjQ3PU3ocf6Ylc/p9x+BLO/+X4iXw=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.8/go.mod h1:hD5YwHLOy6k7d6kqcn3me1bFWHOtzhaXstMd6BpdB68=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2/go.mod h1:5CsjAbs3NlGQyZNFACh+zztPDI7fU6eW9QsxjfnuBKg=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.10/go.mod h1:/WNsBOlKWZCG3PMh2aSp8vkyyT/clpMZqOtrnIKqGfk=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.10/go.mod h1:gYVF3nM1ApfTRDj9pvdhootBb8WbiIejuqn4w8ruMes=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.8/go.mod h1:yUQPRlWqGG0lfNsmjbRWKVwgilfBtZTOFSLEYALlAig=
github.com/aws/aws-sdk-go-v2/service/kms v1.29.2/go.mod h1:elLDaj+1RNl9Ovn3dB6dWLVo5WQ+VLSUMKegl7N96fY=
github.com/aws/aws-sdk-go-v2/service/s3 v1.54.4/go.mod h1:oSkRFuHVWmUY4Ssk16ErGzBqvYEbvORJFzFXzWhTB2s=
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.28.2/go.mod h1:GvNHKQAAOSKjmlccE/+Ww2gDbwYP9EewIuvWiQSquQs=
github.com/aws/aws-sdk-go-v2/service/sns v1.29.2/go.mod h1:ZIs7/BaYel9NODoYa8PW39o15SFAXDEb4DxOG2It15U=
github.com/aws/aws-sdk-go-v2/service/sqs v1.31.2/go.mod h1:J3XhTE+VsY1jDsdDY+ACFAppZj/gpvygzC5JE0bTLbQ=
github.com/aws/aws-sdk-go-v2/service/ssm v1.49.2/go.mod h1:loBAHYxz7JyucJvq4xuW9vunu8iCzjNYfSrQg2QEczA=
github.com/aws/aws-sdk-go-v2/service/sso v1.20.10/go.mod h1:5XKooCTi9VB/xZmJDvh7uZ+v3uQ7QdX6diOyhvPA+/w=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.24.4/go.mod h1:MZ/PVYU/mRbmSF6WK3ybCYHjA2mig8utVokDEVLDgE0=
github.com/aws/aws-sdk-go-v2/service/sts v1.28.11/go.mod h1:QXnthRM35zI92048MMwfFChjFmoufTdhtHmouwNfhhU=
github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bmizerany/perks v0.0.0-20230307044200-03f9df79da1e/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
github.com/bradleyjkemp/cupaloy/v2 v2.6.0/go.mod h1:bm7JXdkRd4BHJk9HpwqAI8BoAY1lps46Enkdqw6aRX0=
Expand Down Expand Up @@ -487,13 +502,15 @@ golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA=
golang.org/x/oauth2 v0.18.0/go.mod h1:Wf7knwG0MPoWIMMBgFlEaSUDaKskp0dCfrlJRJXbBi8=
golang.org/x/oauth2 v0.19.0/go.mod h1:vYi7skDa1x015PmRRYZ7+s1cWyPgrPiSYRe4rnsexc8=
Expand All @@ -507,7 +524,9 @@ golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/telemetry v0.0.0-20240521205824-bda55230c457/go.mod h1:pRgIJT+bRLFKnoM1ldnzKoxTIn14Yxz928LQRYYgIN0=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.21.0/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
Expand Down
8 changes: 7 additions & 1 deletion internal/server/audit/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/pkg/sr"
"github.com/twmb/franz-go/plugin/kzap"
"go.flipt.io/flipt/internal/config"
"go.flipt.io/flipt/internal/server/audit"
"go.uber.org/zap"
Expand Down Expand Up @@ -37,10 +38,15 @@ type Sink struct {
// NewSink is the constructor for a Sink.
func NewSink(ctx context.Context, logger *zap.Logger, cfg config.KafkaSinkConfig) (audit.Sink, error) {
logger = logger.With(zap.String("sink", sinkType))
logLevel := logger.Level()
if logLevel == zap.DebugLevel {
// kgo produces a lot of debug logs
logLevel = zap.InfoLevel
}
opts := []kgo.Opt{
kgo.SeedBrokers(cfg.BootstrapServers...),
kgo.DefaultProduceTopic(cfg.Topic),
kgo.WithLogger(&klogger{logger: logger}),
kgo.WithLogger(kzap.New(logger, kzap.AtomicLevel(zap.NewAtomicLevelAt(logLevel)))),
}

if cfg.Authentication != nil {
Expand Down
53 changes: 44 additions & 9 deletions internal/server/audit/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
Expand All @@ -22,7 +23,6 @@ func createTopic(t testing.TB, bootstrapServers []string, topic string) {
kgo.SeedBrokers(bootstrapServers...),
)
require.NoError(t, err)
defer client.Close()

adminClient := kadm.NewClient(client)

Expand All @@ -31,17 +31,23 @@ func createTopic(t testing.TB, bootstrapServers []string, topic string) {

_, err = adminClient.CreateTopic(ctx, 1, 1, nil, topic)
require.NoError(t, err)
t.Cleanup(func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := adminClient.DeleteTopic(ctx, topic)
assert.NoError(t, err)
adminClient.Close()
})
}

func TestNewSink(t *testing.T) {
func TestNewSinkAndSend(t *testing.T) {
srv := os.Getenv("KAFKA_BOOTSTRAP_SERVER")
if srv == "" {
t.Skip("no kafka servers provided")
}

bootstrapServers := []string{srv}
topic := fmt.Sprintf("default-%d", time.Now().Unix())
createTopic(t, bootstrapServers, topic)

e := audit.NewEvent(
flipt.NewRequest(flipt.ResourceFlag, flipt.ActionCreate, flipt.WithSubject(flipt.SubjectRule)),
&audit.Actor{
Expand All @@ -57,14 +63,30 @@ func TestNewSink(t *testing.T) {
}),
)

for _, enc := range []string{encodingAvro, encodingProto} {
t.Run(enc, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
sg := fmt.Sprintf("http://%s:8081", srv)

tests := []struct {
name string
enc string
schemaRegistry string
}{
{"avro-basic", encodingAvro, ""},
{"avro-schema-registry", encodingAvro, sg},
{"proto-basic", encodingProto, ""},
{"proto-schema-registry", encodingProto, sg},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
topic := fmt.Sprintf("default-%s", tt.name)
createTopic(t, bootstrapServers, topic)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
t.Cleanup(cancel)
cfg := config.KafkaSinkConfig{
BootstrapServers: bootstrapServers,
Topic: topic,
Encoding: enc,
Encoding: tt.enc,
SchemaRegistry: tt.schemaRegistry,
}
s, err := NewSink(context.Background(), zaptest.NewLogger(t), cfg)
require.NoError(t, err)
Expand All @@ -81,10 +103,23 @@ func TestNewSink(t *testing.T) {
t.Run("unsupported", func(t *testing.T) {
cfg := config.KafkaSinkConfig{
BootstrapServers: bootstrapServers,
Topic: topic,
Topic: "unsupported",
Encoding: "unknown",
}
_, err := NewSink(context.Background(), zaptest.NewLogger(t), cfg)
require.ErrorContains(t, err, "unsupported encoding:")
})

t.Run("auth-failure", func(t *testing.T) {
topic := "default-auth-failure"
createTopic(t, bootstrapServers, topic)
cfg := config.KafkaSinkConfig{
BootstrapServers: bootstrapServers,
Topic: topic,
Encoding: encodingProto,
Authentication: &config.KafkaAuthentication{Username: "user", Password: "pass"},
}
_, err := NewSink(context.Background(), zaptest.NewLogger(t), cfg)
require.Error(t, err)
})
}
50 changes: 0 additions & 50 deletions internal/server/audit/kafka/klogger.go

This file was deleted.

58 changes: 0 additions & 58 deletions internal/server/audit/kafka/klogger_test.go

This file was deleted.

0 comments on commit a0ecc19

Please sign in to comment.