Skip to content

Commit

Permalink
Add schema registry integration (#59)
Browse files Browse the repository at this point in the history
This adds an integration of the NATS-Kafka bridge with Confluent-compatible
schema registries. The functionality uses srclient as the schema
registry client and adds support for sending and receiving Avro, JSON
and Protobuf encoded messages. Unit tests are added as well.

Resolves #13
  • Loading branch information
drnushooz committed Jan 3, 2022
1 parent 0cc4445 commit 9e878e1
Show file tree
Hide file tree
Showing 16 changed files with 1,170 additions and 58 deletions.
4 changes: 4 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ All connectors must specify Kafka connection properties, with a few optional set
* `maxbytes` - (optional) used by a Kafka reader to set the maximum bytes for a read
* `keytype` - (optional) defines the way keys are assigned to messages coming from NATS (see below)
* `keyvalue` - (optional) extra data that may be used depending on the key type
* `schemaregistryurl` - (optional) URL of the Kafka schema registry instance
* `subjectname` - (required when schemaregistryurl is set) Name of the subject in the schema registry to use for the schema
* `schemaversion` - (optional, only used with schemaregistryurl) Version of the schema to use from the registry; uses the latest if unspecified
* `schematype` - (optional, only used with schemaregistryurl) Type of schema. Can be "avro", "json" or "protobuf"; default is "avro"

Available key types are:

Expand Down
10 changes: 9 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ go 1.17

require (
github.com/Shopify/sarama v1.29.1
github.com/jhump/protoreflect v1.10.1
github.com/linkedin/goavro/v2 v2.10.1
github.com/nats-io/nats-server/v2 v2.2.6
github.com/nats-io/nats-streaming-server v0.22.0
github.com/nats-io/nats.go v1.11.0
github.com/nats-io/nuid v1.0.1
github.com/nats-io/stan.go v0.9.0
github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc
github.com/riferrei/srclient v0.4.0
github.com/santhosh-tekuri/jsonschema/v5 v5.0.0
github.com/stretchr/testify v1.7.0
)

Expand All @@ -21,7 +25,8 @@ require (
github.com/eapache/queue v1.1.0 // indirect
github.com/fatih/color v1.12.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/golang/protobuf v1.4.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/go-hclog v0.16.1 // indirect
github.com/hashicorp/go-immutable-radix v1.3.0 // indirect
github.com/hashicorp/go-msgpack v1.1.5 // indirect
Expand All @@ -46,7 +51,10 @@ require (
go.etcd.io/bbolt v1.3.6 // indirect
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect
golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a // indirect
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 // indirect
golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
google.golang.org/protobuf v1.25.1-0.20200805231151-a709e31e5d12 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
74 changes: 72 additions & 2 deletions go.sum

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions server/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,9 @@ type ConnectorConfig struct {

KeyType string // what to do with the key, can be FixedKey, ...
KeyValue string // extra data for handling the key based on the keytype, may be ignored

SchemaRegistryURL string // Schema registry url for message schema validation
SubjectName string // Name of the subject in the schema registry for the value
SchemaVersion int // Version of the value schema to use. Default is latest.
SchemaType string // Can be avro, json, protobuf. Default is avro.
}
143 changes: 128 additions & 15 deletions server/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@ package kafka
import (
"context"
"crypto/tls"
"encoding/binary"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/Shopify/sarama"
"github.com/nats-io/nats-kafka/server/conf"
"github.com/riferrei/srclient"
"github.com/santhosh-tekuri/jsonschema/v5"
)

// Message represents a Kafka message.
Expand Down Expand Up @@ -61,6 +66,11 @@ type saramaConsumer struct {
consumeErrCh chan error

cancel context.CancelFunc

schemaRegistryOn bool
schemaRegistryClient srclient.ISchemaRegistryClient
schemaType srclient.SchemaType
pbDeserializer pbDeserializer
}

// NewConsumer returns a new Kafka Consumer.
Expand All @@ -76,6 +86,7 @@ func NewConsumer(cc conf.ConnectorConfig, dialTimeout time.Duration) (Consumer,
sc.Net.SASL.User = cc.SASL.User
sc.Net.SASL.Password = cc.SASL.Password
}

if sc.Net.SASL.Enable && cc.SASL.InsecureSkipVerify {
sc.Net.TLS.Enable = true
sc.Net.TLS.Config = &tls.Config{
Expand All @@ -101,6 +112,23 @@ func NewConsumer(cc conf.ConnectorConfig, dialTimeout time.Duration) (Consumer,
tlsSkipVerify: cc.SASL.InsecureSkipVerify,
}

// If schema registry url and subject name both are set, enable schema registry integration
if cc.SchemaRegistryURL != "" && cc.SubjectName != "" {
cons.schemaRegistryClient = srclient.CreateSchemaRegistryClient(cc.SchemaRegistryURL)

switch strings.ToUpper(cc.SchemaType) {
case srclient.Json.String():
cons.schemaType = srclient.Json
case srclient.Protobuf.String():
cons.schemaType = srclient.Protobuf
cons.pbDeserializer = newDeserializer()
default:
cons.schemaType = srclient.Avro
}

cons.schemaRegistryOn = true
}

if cons.groupMode {
cg, err := sarama.NewConsumerGroup(cc.Brokers, cc.GroupID, sc)
if err != nil {
Expand Down Expand Up @@ -165,14 +193,24 @@ func (c *saramaConsumer) Fetch(ctx context.Context) (Message, error) {
case <-ctx.Done():
return Message{}, ctx.Err()
case cmsg := <-c.fetchCh:
return Message{
Topic: cmsg.Topic,
Partition: int(cmsg.Partition),
Offset: cmsg.Offset,
var deserializedValue = cmsg.Value
var err error
if c.schemaRegistryOn {
deserializedValue, err = c.deserializePayload(cmsg.Value)
}

Key: cmsg.Key,
Value: cmsg.Value,
}, nil
if err == nil {
return Message{
Topic: cmsg.Topic,
Partition: int(cmsg.Partition),
Offset: cmsg.Offset,

Key: cmsg.Key,
Value: deserializedValue,
}, nil
} else {
return Message{}, err
}
case loopErr := <-c.consumeErrCh:
return Message{}, loopErr
}
Expand All @@ -182,14 +220,24 @@ func (c *saramaConsumer) Fetch(ctx context.Context) (Message, error) {
case <-ctx.Done():
return Message{}, ctx.Err()
case cmsg := <-c.pc.Messages():
return Message{
Topic: cmsg.Topic,
Partition: int(cmsg.Partition),
Offset: cmsg.Offset,

Key: cmsg.Key,
Value: cmsg.Value,
}, nil
var deserializedValue = cmsg.Value
var err error
if c.schemaRegistryOn {
deserializedValue, err = c.deserializePayload(cmsg.Value)
}

if err == nil {
return Message{
Topic: cmsg.Topic,
Partition: int(cmsg.Partition),
Offset: cmsg.Offset,

Key: cmsg.Key,
Value: deserializedValue,
}, nil
} else {
return Message{}, err
}
}
}

Expand Down Expand Up @@ -261,3 +309,68 @@ func (c *saramaConsumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sa

return nil
}

// deserializePayload decodes a message in the Confluent wire format:
// 1 magic byte (0) followed by a 4-byte big-endian schema ID, then the
// encoded value. It fetches the schema from the registry and
// deserializes (Avro, Protobuf) or validates (JSON) the remaining bytes
// according to the configured schema type, returning the value as JSON.
func (c *saramaConsumer) deserializePayload(payload []byte) ([]byte, error) {
	// The wire format header is 5 bytes; anything shorter cannot carry a
	// schema ID and would otherwise panic the index/slice operations below.
	if len(payload) < 5 {
		return nil, fmt.Errorf("failed to deserialize payload: payload is too short (%d bytes)", len(payload))
	}

	// first byte of the payload must be the magic byte 0
	if payload[0] != byte(0) {
		return nil, fmt.Errorf("failed to deserialize payload: magic byte is not 0")
	}

	// next 4 bytes contain the schema id
	schemaID := binary.BigEndian.Uint32(payload[1:5])
	schema, err := c.schemaRegistryClient.GetSchema(int(schemaID))
	if err != nil {
		return nil, err
	}

	var value []byte
	switch c.schemaType {
	case srclient.Avro:
		value, err = c.deserializeAvro(schema, payload[5:])
	case srclient.Json:
		value, err = c.validateJsonSchema(schema, payload[5:])
	case srclient.Protobuf:
		value, err = c.pbDeserializer.Deserialize(schema, payload[5:])
	}
	if err != nil {
		return nil, err
	}

	return value, nil
}

// deserializeAvro decodes an Avro-encoded payload (wire format header
// already stripped) using the schema's codec and returns the message
// re-encoded as textual JSON bytes.
func (c *saramaConsumer) deserializeAvro(schema *srclient.Schema, cleanPayload []byte) ([]byte, error) {
	codec := schema.Codec()
	// Binary Avro -> Go native representation.
	native, _, err := codec.NativeFromBinary(cleanPayload)
	if err != nil {
		// Fix typo in the original error message ("deserailize").
		return nil, fmt.Errorf("unable to deserialize avro: %w", err)
	}
	// Go native representation -> textual JSON.
	value, err := codec.TextualFromNative(nil, native)
	if err != nil {
		return nil, fmt.Errorf("failed to convert to json: %w", err)
	}

	return value, nil
}

// validateJsonSchema verifies that a JSON payload (wire format header
// already stripped) conforms to the registry schema, and returns the
// payload unchanged when validation succeeds.
func (c *saramaConsumer) validateJsonSchema(schema *srclient.Schema, cleanPayload []byte) ([]byte, error) {
	// NOTE(review): the schema is compiled on every message; caching the
	// compiled schema per schema ID would avoid repeated work.
	compiled, err := jsonschema.CompileString("schema.json", schema.Schema())
	if err != nil {
		return nil, fmt.Errorf("unable to parse json schema: %w", err)
	}

	var doc interface{}
	if err = json.Unmarshal(cleanPayload, &doc); err != nil {
		return nil, fmt.Errorf("unable to parse json message: %w", err)
	}

	if err = compiled.Validate(doc); err != nil {
		return nil, fmt.Errorf("json message validation failed: %w", err)
	}

	return cleanPayload, nil
}
Loading

0 comments on commit 9e878e1

Please sign in to comment.