Skip to content

Commit

Permalink
added avro serde
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Dmytrenko <[email protected]>
  • Loading branch information
erka committed Jun 25, 2024
1 parent a3a7ce2 commit e465ecd
Show file tree
Hide file tree
Showing 17 changed files with 273 additions and 92 deletions.
2 changes: 1 addition & 1 deletion config/flipt.schema.cue
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ import "strings"
enabled?: bool | *false
topic: string
bootstrap_servers: [...string]
encoding?: string | *"protobuf"
encoding?: *"protobuf" | "avro"
}
}
buffer?: {
Expand Down
2 changes: 1 addition & 1 deletion config/flipt.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1362,7 +1362,7 @@
},
"encoding": {
"type": "string",
"enum": ["protobuf"],
"enum": ["protobuf", "avro"],
"default": "protobuf"
}
},
Expand Down
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0
github.com/h2non/gock v1.2.0
github.com/hamba/avro v1.8.0
github.com/hashicorp/cap v0.6.0
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/go-retryablehttp v0.7.7
Expand All @@ -67,7 +68,6 @@ require (
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.31.0
github.com/twmb/franz-go v1.17.0
github.com/twmb/franz-go/pkg/sr v1.0.0
github.com/xeipuuv/gojsonschema v1.2.0
github.com/xo/dburl v0.23.1
go.flipt.io/flipt/core v0.0.0-00010101000000-000000000000
Expand Down Expand Up @@ -105,6 +105,7 @@ require (
gopkg.in/segmentio/analytics-go.v3 v3.1.0
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
gotest.tools/v3 v3.5.1
oras.land/oras-go/v2 v2.5.0
)

Expand Down Expand Up @@ -214,6 +215,7 @@ require (
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/kevinburke/ssh_config v1.2.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
Expand All @@ -233,6 +235,8 @@ require (
github.com/moby/sys/sequential v0.5.0 // indirect
github.com/moby/sys/user v0.1.0 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/onsi/ginkgo/v2 v2.17.3 // indirect
github.com/openzipkin/zipkin-go v0.4.3 // indirect
Expand Down Expand Up @@ -302,7 +306,6 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gotest.tools/v3 v3.5.1 // indirect
k8s.io/apimachinery v0.30.0 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,8 @@ github.com/h2non/gock v1.2.0 h1:K6ol8rfrRkUOefooBC8elXoaNGYkpp7y2qcxGG6BzUE=
github.com/h2non/gock v1.2.0/go.mod h1:tNhoxHYW2W42cYkYb1WqzdbYIieALC99kpYr7rH/BQk=
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw=
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI=
github.com/hamba/avro v1.8.0 h1:eCVrLX7UYThA3R3yBZ+rpmafA5qTc3ZjpTz6gYJoVGU=
github.com/hamba/avro v1.8.0/go.mod h1:NiGUcrLLT+CKfGu5REWQtD9OVPPYUGMVFiC+DE0lQfY=
github.com/hashicorp/cap v0.6.0 h1:uOSdbtXu8zsbRyjwpiTy6QiuX3+5paAbNkYlop7QexM=
github.com/hashicorp/cap v0.6.0/go.mod h1:DwzHkoG6pxSARiqwvAgxmCPUpTTCCw2wVuPrIFOzpe0=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand Down Expand Up @@ -780,8 +782,6 @@ github.com/twmb/franz-go v1.17.0 h1:hawgCx5ejDHkLe6IwAtFWwxi3OU4OztSTl7ZV5rwkYk=
github.com/twmb/franz-go v1.17.0/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA=
github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU=
github.com/twmb/franz-go/pkg/sr v1.0.0 h1:4FUatTSTEuG2xievT0iDrgnpErgRg7kFLNioJYqfrqs=
github.com/twmb/franz-go/pkg/sr v1.0.0/go.mod h1:aUFRRLI5WYKpKzmWDztzZFecx5eOkCNuuamd91jUV5c=
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
Expand Down
4 changes: 3 additions & 1 deletion internal/cmd/protoc-gen-go-flipt-sdk/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ func generateSubSDK(gen *protogen.Plugin, file *protogen.File) (typ, client stri

// define client structure
typ = strings.Title(string(file.GoPackageName))
client = relativeImport(g, file, typ+"Client")
if len(file.Services) > 0 {
relativeImport(g, file, typ+"Client")
}

// We generate an interface which conjoins all the client interfaces
// generated by the gRPC protoc generator.
Expand Down
26 changes: 13 additions & 13 deletions internal/server/audit/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@ const (

// Event holds information that represents an action that was attempted in the system.
type Event struct {
Version string `json:"version"`
Version string `json:"version" avro:"version"`

Type string `json:"type"`
Type string `json:"type" avro:"type"`

Action string `json:"action"`
Action string `json:"action" avro:"action"`

Metadata Metadata `json:"metadata"`
Metadata Metadata `json:"metadata" avro:"metadata"`

Payload interface{} `json:"payload"`
Payload interface{} `json:"payload" avro:"payload"`

Timestamp string `json:"timestamp"`
Timestamp string `json:"timestamp" avro:"timestamp"`

Status string `json:"status"`
Status string `json:"status" avro:"status"`
}

// NewEvent is the constructor for an event.
Expand Down Expand Up @@ -199,14 +199,14 @@ func decodeToEvent(kvs []attribute.KeyValue) (*Event, error) {
}

type Actor struct {
Authentication string `json:"authentication,omitempty"`
IP string `json:"ip,omitempty"`
Email string `json:"email,omitempty"`
Name string `json:"name,omitempty"`
Picture string `json:"picture,omitempty"`
Authentication string `json:"authentication,omitempty" avro:"authentication"`
IP string `json:"ip,omitempty" avro:"ip"`
Email string `json:"email,omitempty" avro:"email"`
Name string `json:"name,omitempty" avro:"name"`
Picture string `json:"picture,omitempty" avro:"picture"`
}

// Metadata holds information of what metadata an event will contain.
type Metadata struct {
Actor *Actor `json:"actor,omitempty"`
Actor *Actor `json:"actor,omitempty" avro:"actor"`
}
37 changes: 37 additions & 0 deletions internal/server/audit/kafka/avro.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package kafka

import (
"bytes"
"encoding/json"
"errors"

"github.com/hamba/avro"
"go.flipt.io/flipt/internal/server/audit"
rpcaudit "go.flipt.io/flipt/rpc/flipt/audit"
)

func toAvro(v any) ([]byte, error) {
if e, ok := v.(audit.Event); ok {
if e.Payload != nil {
//FIXME: this modifies the origin payload
payloadString, err := json.Marshal(e.Payload)
if err != nil {
return nil, err
}
var payload map[string]any
err = json.Unmarshal(payloadString, &payload)
if err != nil {
return nil, err
}
e.Payload = payload
}
buf := &bytes.Buffer{}
enc, err := avro.NewEncoder(rpcaudit.AvroSchema, buf)
if err != nil {
return nil, err
}
err = enc.Encode(e)
return buf.Bytes(), err
}
return nil, errors.New("unsupported struct")
}
43 changes: 43 additions & 0 deletions internal/server/audit/kafka/encoding_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package kafka

import (
"testing"

"github.com/stretchr/testify/require"
"go.flipt.io/flipt/internal/server/audit"
"go.flipt.io/flipt/rpc/flipt"
)

func TestEncoding(t *testing.T) {
tests := []struct {
name string
f func(any) ([]byte, error)
}{
{"protobuf", toProtobuf},
{"avro", toAvro},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := flipt.NewRequest(flipt.ResourceFlag, flipt.ActionCreate, flipt.WithSubject(flipt.SubjectRule))
e := audit.NewEvent(
r,
&audit.Actor{
Authentication: "token",
IP: "127.0.0.1",
},
&audit.Flag{
Key: "this-flag",
Name: "this-flag",
Description: "this description",
Enabled: false,
NamespaceKey: "default",
},
)

b, err := tt.f(*e)
require.NoError(t, err)
require.NotEmpty(t, b)
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"go.flipt.io/flipt/internal/server/audit"
protoaudit "go.flipt.io/flipt/rpc/flipt/audit"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -36,12 +37,21 @@ func toProtobuf(v any) ([]byte, error) {
r.Timestamp = timestamppb.New(t)

if e.Payload != nil {
// FIXME: payload should be added here
payload, err := json.Marshal(e.Payload)
//FIXME: this modifies the origin payload
payloadString, err := json.Marshal(e.Payload)
if err != nil {
return nil, err
}
r.Payload = proto.String(string(payload))
var payloadMap map[string]any
err = json.Unmarshal(payloadString, &payloadMap)
if err != nil {
return nil, err
}
payload, err := structpb.NewStruct(payloadMap)
if err != nil {
return nil, err
}
r.Payload = payload
}

b, err := proto.Marshal(r)
Expand Down
74 changes: 74 additions & 0 deletions rpc/flipt/audit/event.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
{
"name": "event",
"type": "record",
"fields": [
{
"name": "version",
"type": "string"
},
{
"name": "type",
"type": "string"
},
{
"name": "action",
"type": "string"
},
{
"name": "metadata",
"type": {
"type": "record",
"name": "metadata",
"fields": [
{
"name": "actor",
"type": [
"null",
{
"type": "record",
"name": "actor",
"fields": [
{
"name": "authentication",
"type": "string"
},
{
"name": "ip",
"type": "string"
},
{
"name": "email",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "picture",
"type": "string"
}
]
}
]
}
]
}
},
{
"name": "timestamp",
"type": "string"
},
{
"name": "status",
"type": "string"
},
{
"name": "payload",
"type": {
"type":"map",
"values": ["null", "string", "int", "boolean"]
}
}
]
}
Loading

0 comments on commit e465ecd

Please sign in to comment.