diff --git a/docs/config.md b/docs/config.md index f7a0300..1364bc0 100644 --- a/docs/config.md +++ b/docs/config.md @@ -253,6 +253,10 @@ All connectors must specify Kafka connection properties, with a few optional set * `maxbytes` - (optional) used by a Kafka reader to set the maximum bytes for a read * `keytype` - (optional) defines the way keys are assigned to messages coming from NATS (see below) * `keyvalue` - (optional) extra data that may be used depending on the key type +* `schemaregistryurl` - (optional) URL of the Kafka schema registry instance +* `subjectname` - (exclusive with schemaregistryurl) Name of the subject in the schema registry to use for schema +* `schemaversion` - (optional, exclusive with schemaregistryurl) Version of the schema to use from the registry, uses the latest if unspecified +* `schematype` - (optional, exclusive with schemaregistryurl) Type of schema. Can be "avro", "json" or "protobuf", default is "avro" Available key types are: diff --git a/go.mod b/go.mod index 60388a0..c7be02f 100644 --- a/go.mod +++ b/go.mod @@ -4,12 +4,16 @@ go 1.17 require ( github.com/Shopify/sarama v1.29.1 + github.com/jhump/protoreflect v1.10.1 + github.com/linkedin/goavro/v2 v2.10.1 github.com/nats-io/nats-server/v2 v2.2.6 github.com/nats-io/nats-streaming-server v0.22.0 github.com/nats-io/nats.go v1.11.0 github.com/nats-io/nuid v1.0.1 github.com/nats-io/stan.go v0.9.0 github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc + github.com/riferrei/srclient v0.4.0 + github.com/santhosh-tekuri/jsonschema/v5 v5.0.0 github.com/stretchr/testify v1.7.0 ) @@ -21,7 +25,8 @@ require ( github.com/eapache/queue v1.1.0 // indirect github.com/fatih/color v1.12.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/snappy v0.0.3 // indirect + github.com/golang/protobuf v1.4.2 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/hashicorp/go-hclog v0.16.1 // indirect github.com/hashicorp/go-immutable-radix v1.3.0 // indirect github.com/hashicorp/go-msgpack v1.1.5 // indirect @@ -46,7 +51,10 @@ require ( go.etcd.io/bbolt v1.3.6 // indirect golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect + golang.org/x/sync v0.0.0-20201207232520-09787c993a3a // indirect golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 // indirect golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6 // indirect + google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect + google.golang.org/protobuf v1.25.1-0.20200805231151-a709e31e5d12 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) diff --git a/go.sum b/go.sum index 14f42da..2ea2e01 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/Shopify/sarama v1.29.1 h1:wBAacXbYVLmWieEA/0X/JagDdCZ8NVFOfS6l6+2u5S0= @@ -14,9 +16,11 @@ github.com/armon/go-metrics v0.3.9/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -27,6 +31,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.12.0 h1:mRhaKNwANqRgUBGKmnI5ZxEk7QXmjQeCcuYFMX2bfcc= github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= @@ -44,6 +50,8 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -52,16 +60,23 @@ github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:x github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/gordonklaus/ineffassign v0.0.0-20200309095847-7953dde2c7bf/go.mod h1:cuNKsD1zp2v6XfE/orVX2QE1LC+i254ceGcVeDT3pTU= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= @@ -95,6 +110,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.2 h1:6ZIM6b/JJN0X8UM43ZOM6Z4SJzla+a/u7scXFJz github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/jhump/protoreflect v1.10.1 h1:iH+UZfsbRE6vpyZH7asAjTPWJf7RJbpZ9j/N3lDlKs0= +github.com/jhump/protoreflect v1.10.1/go.mod h1:7GcYQDdMU/O/BBrl/cX6PNHpXh6cenjd8pneu5yW7Tg= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= @@ -115,6 +132,9 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8= github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/linkedin/goavro/v2 v2.9.7/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= +github.com/linkedin/goavro/v2 v2.10.1 h1:ExVurHDnf0eyUocILs48kiZ4pGvaEbDvBOQcfLruA/0= +github.com/linkedin/goavro/v2 v2.10.1/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8= github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= @@ -149,6 +169,7 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nats-io/stan.go v0.9.0 h1:TB73Y31au++0sU0VmnBy2pYkSrwH0zUFNRB9YePHqC4= github.com/nats-io/stan.go v0.9.0/go.mod h1:0jEuBXKauB1HHJswHM/lx05K48TJ1Yxj6VIfM4k+aB4= +github.com/nishanths/predeclared v0.0.0-20200524104333-86fad755b4d3/go.mod h1:nt3d53pc1VYcphSCIaYAJtnPYnr3Zyn8fMq2wvPGPso= github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc h1:Ak86L+yDSOzKFa7WM5bf5itSOo1e3Xh8bm5YCMUXIjQ= github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= @@ -166,6 +187,7 @@ github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5Fsn github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= @@ -178,6 +200,11 @@ github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3x github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/riferrei/srclient v0.4.0 h1:lms2bs8BXZNRlSEQioqXjMrPYlFeT9yoeCe22yb51rM= +github.com/riferrei/srclient v0.4.0/go.mod h1:SmCz0lrYQ1pLqXlYq0yPnRccHLGh+llDA0i6hecPeW8= +github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/santhosh-tekuri/jsonschema/v5 v5.0.0 h1:TToq11gyfNlrMFZiYujSekIsPd9AmsA2Bj/iv+s4JHE= +github.com/santhosh-tekuri/jsonschema/v5 v5.0.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -192,12 +219,14 @@ github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqri github.com/xdg/scram v1.0.3/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -206,26 +235,40 @@ golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q= golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -238,6 +281,7 @@ golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -250,32 +294,55 @@ golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6 h1:Vv0JUPWTyeqUq42B2WJ1FeIDjjvGKoA2Ss+Ts0lAVbs= golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190424220101-1e8e1cfdf96b/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200522201501-cb1345f3a375/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20200717024301-6ddee64345a6/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= -google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.1-0.20200805231151-a709e31e5d12 h1:OwhZOOMuf7leLaSCuxtQ9FW7ui2L2L6UKOtKAUqovUQ= +google.golang.org/protobuf v1.25.1-0.20200805231151-a709e31e5d12/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= @@ -283,3 +350,6 @@ gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= diff --git a/server/conf/conf.go b/server/conf/conf.go index 4d495e6..2a43182 100644 --- a/server/conf/conf.go +++ b/server/conf/conf.go @@ -223,4 +223,9 @@ type ConnectorConfig struct { KeyType string // what to do with the key, can be FixedKey, ... KeyValue string // extra data for handling the key based on the keytype, may be ignored + + SchemaRegistryURL string // Schema registry url for message schema validation + SubjectName string // Name of the subject in the schema registry for the value + SchemaVersion int // Version of the value schema to use. Default is latest. + SchemaType string // Can be avro, json, protobuf. Default is avro. } diff --git a/server/kafka/consumer.go b/server/kafka/consumer.go index 464b3a2..9e92650 100644 --- a/server/kafka/consumer.go +++ b/server/kafka/consumer.go @@ -19,11 +19,16 @@ package kafka import ( "context" "crypto/tls" + "encoding/binary" + "encoding/json" "fmt" + "strings" "time" "github.com/Shopify/sarama" "github.com/nats-io/nats-kafka/server/conf" + "github.com/riferrei/srclient" + "github.com/santhosh-tekuri/jsonschema/v5" ) // Message represents a Kafka message. @@ -61,6 +66,11 @@ type saramaConsumer struct { consumeErrCh chan error cancel context.CancelFunc + + schemaRegistryOn bool + schemaRegistryClient srclient.ISchemaRegistryClient + schemaType srclient.SchemaType + pbDeserializer pbDeserializer } // NewConsumer returns a new Kafka Consumer. @@ -76,6 +86,7 @@ func NewConsumer(cc conf.ConnectorConfig, dialTimeout time.Duration) (Consumer, sc.Net.SASL.User = cc.SASL.User sc.Net.SASL.Password = cc.SASL.Password } + if sc.Net.SASL.Enable && cc.SASL.InsecureSkipVerify { sc.Net.TLS.Enable = true sc.Net.TLS.Config = &tls.Config{ @@ -101,6 +112,23 @@ func NewConsumer(cc conf.ConnectorConfig, dialTimeout time.Duration) (Consumer, tlsSkipVerify: cc.SASL.InsecureSkipVerify, } + // If schema registry url and subject name both are set, enable schema registry integration + if cc.SchemaRegistryURL != "" && cc.SubjectName != "" { + cons.schemaRegistryClient = srclient.CreateSchemaRegistryClient(cc.SchemaRegistryURL) + + switch strings.ToUpper(cc.SchemaType) { + case srclient.Json.String(): + cons.schemaType = srclient.Json + case srclient.Protobuf.String(): + cons.schemaType = srclient.Protobuf + cons.pbDeserializer = newDeserializer() + default: + cons.schemaType = srclient.Avro + } + + cons.schemaRegistryOn = true + } + if cons.groupMode { cg, err := sarama.NewConsumerGroup(cc.Brokers, cc.GroupID, sc) if err != nil { @@ -165,14 +193,24 @@ func (c *saramaConsumer) Fetch(ctx context.Context) (Message, error) { case <-ctx.Done(): return Message{}, ctx.Err() case cmsg := <-c.fetchCh: - return Message{ - Topic: cmsg.Topic, - Partition: int(cmsg.Partition), - Offset: cmsg.Offset, + var deserializedValue = cmsg.Value + var err error + if c.schemaRegistryOn { + deserializedValue, err = c.deserializePayload(cmsg.Value) + } - Key: cmsg.Key, - Value: cmsg.Value, - }, nil + if err == nil { + return Message{ + Topic: cmsg.Topic, + Partition: int(cmsg.Partition), + Offset: cmsg.Offset, + + Key: cmsg.Key, + Value: deserializedValue, + }, nil + } else { + return Message{}, err + } case loopErr := <-c.consumeErrCh: return Message{}, loopErr } @@ -182,14 +220,24 @@ func (c *saramaConsumer) Fetch(ctx context.Context) (Message, error) { case <-ctx.Done(): return Message{}, ctx.Err() case cmsg := <-c.pc.Messages(): - return Message{ - Topic: cmsg.Topic, - Partition: int(cmsg.Partition), - Offset: cmsg.Offset, - - Key: cmsg.Key, - Value: cmsg.Value, - }, nil + var deserializedValue = cmsg.Value + var err error + if c.schemaRegistryOn { + deserializedValue, err = c.deserializePayload(cmsg.Value) + } + + if err == nil { + return Message{ + Topic: cmsg.Topic, + Partition: int(cmsg.Partition), + Offset: cmsg.Offset, + + Key: cmsg.Key, + Value: deserializedValue, + }, nil + } else { + return Message{}, err + } } } @@ -261,3 +309,68 @@ func (c *saramaConsumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sa return nil } + +// Retrieve the schema of the message and deserialize it. +func (c *saramaConsumer) deserializePayload(payload []byte) ([]byte, error) { + // first byte of the payload is 0 + if payload[0] != byte(0) { + return nil, fmt.Errorf("failed to deserialize payload: magic byte is not 0") + } + + // next 4 bytes contain the schema id + schemaID := binary.BigEndian.Uint32(payload[1:5]) + schema, err := c.schemaRegistryClient.GetSchema(int(schemaID)) + if err != nil { + return nil, err + } + + var value []byte + switch c.schemaType { + case srclient.Avro: + value, err = c.deserializeAvro(schema, payload[5:]) + case srclient.Json: + value, err = c.validateJsonSchema(schema, payload[5:]) + case srclient.Protobuf: + value, err = c.pbDeserializer.Deserialize(schema, payload[5:]) + } + + if err != nil { + return nil, err + } + + return value, nil +} + +func (c *saramaConsumer) deserializeAvro(schema *srclient.Schema, cleanPayload []byte) ([]byte, error) { + codec := schema.Codec() + native, _, err := codec.NativeFromBinary(cleanPayload) + if err != nil { + return nil, fmt.Errorf("unable to deserailize avro: %w", err) + } + value, err := codec.TextualFromNative(nil, native) + if err != nil { + return nil, fmt.Errorf("failed to convert to json: %w", err) + } + + return value, nil +} + +func (c *saramaConsumer) validateJsonSchema(schema *srclient.Schema, cleanPayload []byte) ([]byte, error) { + jsc, err := jsonschema.CompileString("schema.json", schema.Schema()) + if err != nil { + return nil, fmt.Errorf("unable to parse json schema: %w", err) + } + + var parsedMessage interface{} + err = json.Unmarshal(cleanPayload, &parsedMessage) + if err != nil { + return nil, fmt.Errorf("unable to parse json message: %w", err) + } + + err = jsc.Validate(parsedMessage) + if err != nil { + return nil, fmt.Errorf("json message validation failed: %w", err) + } + + return cleanPayload, nil +} diff --git a/server/kafka/consumer_test.go b/server/kafka/consumer_test.go new file mode 100644 index 0000000..1bdc863 --- /dev/null +++ b/server/kafka/consumer_test.go @@ -0,0 +1,169 @@ +/* + * Copyright 2019-2022 The NATS Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package kafka + +import ( + "encoding/binary" + "fmt" + "os" + "strconv" + "testing" + "time" + + "github.com/jhump/protoreflect/desc/protoparse" + "github.com/jhump/protoreflect/dynamic" + + "github.com/linkedin/goavro/v2" + "github.com/riferrei/srclient" + "github.com/stretchr/testify/assert" +) + +func TestDeserializePayloadAvro(t *testing.T) { + server := newMockSchemaServer(t) + defer server.close() + + consumer := saramaConsumer{ + schemaRegistryOn: true, + schemaRegistryClient: srclient.CreateSchemaRegistryClient(server.getServerURL()), + schemaType: srclient.Avro, + } + + avroCodec, err := goavro.NewCodec(avroSchema) + assert.Nil(t, err) + native, _, err := avroCodec.NativeFromTextual([]byte(avroMessage)) + assert.Nil(t, err) + avroBytes, err := avroCodec.BinaryFromNative(nil, native) + assert.Nil(t, err) + + schemaIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(schemaIDBytes, uint32(avroSchemaID)) + + var payload []byte + payload = append(payload, byte(0)) + payload = append(payload, schemaIDBytes...) + payload = append(payload, avroBytes...) + + _, err = consumer.deserializePayload(payload) + assert.Nil(t, err) +} + +func TestDeserializePayloadJson(t *testing.T) { + server := newMockSchemaServer(t) + defer server.close() + + consumer := &saramaConsumer{ + schemaRegistryOn: true, + schemaRegistryClient: srclient.CreateSchemaRegistryClient(server.getServerURL()), + schemaType: srclient.Json, + } + + schemaIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(schemaIDBytes, uint32(jsonSchemaID)) + + var payload []byte + payload = append(payload, byte(0)) + payload = append(payload, schemaIDBytes...) + payload = append(payload, []byte(jsonMessage)...) + + _, err := consumer.deserializePayload(payload) + assert.Nil(t, err) +} + +func TestDeserializePayloadProtobuf(t *testing.T) { + server := newMockSchemaServer(t) + defer server.close() + + consumer := &saramaConsumer{ + schemaRegistryOn: true, + schemaRegistryClient: srclient.CreateSchemaRegistryClient(server.getServerURL()), + schemaType: srclient.Protobuf, + pbDeserializer: newDeserializer(), + } + + schemaIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(schemaIDBytes, uint32(protobufSchemaID)) + + msgIndexBytes := make([]byte, 16) + length := binary.PutVarint(msgIndexBytes, 1) + binary.PutVarint(msgIndexBytes[length:], 0) + + protoBytes, err := createProtobufMessage() + assert.Nil(t, err) + + var payload []byte + payload = append(payload, byte(0)) + payload = append(payload, schemaIDBytes...) + payload = append(payload, msgIndexBytes...) + payload = append(payload, protoBytes...) + + _, err = consumer.deserializePayload(payload) + assert.Nil(t, err) +} + +func createProtobufMessage() ([]byte, error) { + errorReporter := func(err protoparse.ErrorWithPos) error { + position := err.GetPosition() + return fmt.Errorf("unable to parse file descriptor %s %d: %w", position.Filename, position.Line, err.Unwrap()) + } + + nanoTs := strconv.FormatInt(time.Now().UnixNano(), 10) + schemaFileName := "test-" + nanoTs + ".proto" + file, err := os.CreateTemp("", schemaFileName) + if err != nil { + return nil, err + } + + _, err = file.WriteString(protobufSchema) + if err != nil { + return nil, err + } + + err = file.Close() + if err != nil { + return nil, err + } + defer os.Remove(file.Name()) + + schemaMap := make(map[string]string, 1) + schemaMap[schemaFileName] = protobufSchema + var schemaFilePaths []string + schemaFilePaths = append(schemaFilePaths, schemaFileName) + protobufParser := &protoparse.Parser{ + Accessor: protoparse.FileContentsFromMap(schemaMap), + ImportPaths: []string{"."}, + InferImportPaths: true, + ValidateUnlinkedFiles: true, + ErrorReporter: errorReporter, + } + fds, err := protobufParser.ParseFiles(schemaFilePaths...) + if err != nil { + return nil, err + } + + dynamicMessage := dynamic.NewMessage(fds[0].GetMessageTypes()[0]) + err = dynamicMessage.UnmarshalJSON([]byte(protobufMessage)) + if err != nil { + return nil, err + } + + bytes, err := dynamicMessage.Marshal() + if err != nil { + return nil, err + } + + return bytes, nil +} diff --git a/server/kafka/mock_schema_registry.go b/server/kafka/mock_schema_registry.go new file mode 100644 index 0000000..17d300e --- /dev/null +++ b/server/kafka/mock_schema_registry.go @@ -0,0 +1,157 @@ +/* + * Copyright 2019-2022 The NATS Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package kafka + +import ( + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "strconv" + "testing" + + "github.com/riferrei/srclient" + "github.com/stretchr/testify/assert" +) + +// schemas for testing +const avroSchema = `{ + "type": "record", + "name": "snack", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "manufacturer", "type": "string"}, + {"name": "calories", "type": "float"}, + {"name": "color", "type": ["null", "string"], "default": null} + ] + }` +const jsonSchema = `{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "title": "Snack", + "type": "object", + "properties": { + "name": {"type": "string"}, + "manufacturer": {"type": "string"}, + "calories": {"type": "number"}, + "color": {"type": "string"} + }, + "required": ["name", "manufacturer", "calories"] + }` +const protobufSchema = `message Snack { + required string name = 1; + required string manufacturer = 2; + required float calories = 3; + optional string color = 4; + }` + +// sample messages for testing +const avroMessage = `{"name":"cookie", "manufacturer": "cadbury", "calories": 100.0}` +const jsonMessage = `{"name":"candy", "manufacturer": "nestle", "calories": 200.0}` +const protobufMessage = `{"name":"chocolate", "manufacturer": "hersheys", "calories": 300.0}` + +// schema registry subject names +const avroSubjectName = "snack_avro-value" +const jsonSubjectName = "snack_json-value" +const protobufSubjectName = "snack_protobuf-value" + +// schema ids +const avroSchemaID = 1 +const jsonSchemaID = 2 +const protobufSchemaID = 3 + +// schema versions +const avroSchemaVersion = 1 +const jsonSchemaVersion = 1 +const protobufSchemaVersion = 1 + +type schemaResponse struct { + Subject string `json:"subject"` + Version int `json:"version"` + Schema string `json:"schema"` + SchemaType *srclient.SchemaType `json:"schemaType"` + ID int `json:"id"` + References []srclient.Reference `json:"references"` +} + +type mockSchemaRegistry struct { + instance *httptest.Server + t *testing.T +} + +func newMockSchemaServer(t *testing.T) mockSchemaRegistry { + mcs := new(mockSchemaRegistry) + mcs.instance = httptest.NewServer(http.HandlerFunc(mcs.schemaRequestHandler)) + mcs.t = t + return *mcs +} + +func (s mockSchemaRegistry) close() { + s.instance.Close() +} + +func (s mockSchemaRegistry) schemaRequestHandler(rw http.ResponseWriter, req *http.Request) { + var responsePayload *schemaResponse + switch req.URL.String() { + case "/subjects/" + avroSubjectName + "/versions/latest", + "/subjects/" + avroSubjectName + "/versions/" + strconv.Itoa(avroSchemaVersion), + "/schemas/ids/" + strconv.Itoa(avroSchemaID): + avroSchemaType := srclient.Avro + responsePayload = &schemaResponse{ + Subject: avroSubjectName, + Version: avroSchemaVersion, + Schema: avroSchema, + ID: avroSchemaID, + SchemaType: &avroSchemaType, + } + + case "/subjects/" + jsonSubjectName + "/versions/latest", + "/subjects/" + jsonSubjectName + "/versions/" + strconv.Itoa(jsonSchemaVersion), + "/schemas/ids/" + strconv.Itoa(jsonSchemaID): + jsonSchemaType := srclient.Json + responsePayload = &schemaResponse{ + Subject: jsonSubjectName, + Version: jsonSchemaVersion, + Schema: jsonSchema, + ID: jsonSchemaID, + SchemaType: &jsonSchemaType, + } + + case "/subjects/" + protobufSubjectName + "/versions/latest", + "/subjects/" + protobufSubjectName + "/versions/" + strconv.Itoa(protobufSchemaVersion), + "/schemas/ids/" + strconv.Itoa(protobufSchemaID): + protobufSchemaType := srclient.Protobuf + responsePayload = &schemaResponse{ + Subject: protobufSubjectName, + Version: protobufSchemaVersion, + Schema: protobufSchema, + ID: protobufSchemaID, + SchemaType: &protobufSchemaType, + } + + default: + assert.Error(s.t, errors.New("unhandled request")) + } + + response, err := json.Marshal(responsePayload) + assert.Nil(s.t, err) + _, err = rw.Write(response) + assert.Nil(s.t, err) +} + +func (s mockSchemaRegistry) getServerURL() string { + return s.instance.URL +} diff --git a/server/kafka/packer_utils.go b/server/kafka/packer_utils.go new file mode 100644 index 0000000..27f8811 --- /dev/null +++ b/server/kafka/packer_utils.go @@ -0,0 +1,62 @@ +/* + * Copyright 2019-2022 The NATS Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package kafka + +import "unsafe" + +// Utility functions to use with cmap.ConcurrentMap +func packInt32InString(inputNum int32) string { + size := int(unsafe.Sizeof(inputNum)) + buffer := make([]byte, size) + for i := 0; i < size; i++ { + buffer[i] = *(*uint8)(unsafe.Pointer(uintptr(unsafe.Pointer(&inputNum)) + uintptr(i))) + } + + return string(buffer) +} + +func packIntInString(inputNum int) string { + size := int(unsafe.Sizeof(inputNum)) + buffer := make([]byte, size) + for i := 0; i < size; i++ { + buffer[i] = *(*uint8)(unsafe.Pointer(uintptr(unsafe.Pointer(&inputNum)) + uintptr(i))) + } + + return string(buffer) +} + +func unpackInt32FromString(inputString string) int32 { + outputValue := int32(0) + inputBytes := []byte(inputString) + size := len(inputBytes) + for i := 0; i < size; i++ { + *(*uint8)(unsafe.Pointer(uintptr(unsafe.Pointer(&outputValue)) + uintptr(i))) = inputBytes[i] + } + + return outputValue +} + +func unpackIntFromString(inputString string) int { + outputValue := 0 + inputBytes := []byte(inputString) + size := len(inputBytes) + for i := 0; i < size; i++ { + *(*uint8)(unsafe.Pointer(uintptr(unsafe.Pointer(&outputValue)) + uintptr(i))) = inputBytes[i] + } + + return outputValue +} diff --git a/server/kafka/packer_utils_test.go b/server/kafka/packer_utils_test.go new file mode 100644 index 0000000..608cdec --- /dev/null +++ b/server/kafka/packer_utils_test.go @@ -0,0 +1,39 @@ +/* + * Copyright 2019-2022 The NATS Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package kafka + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestPackIntInString(t *testing.T) { + require.Equal(t, "\u0002\u0000\u0000\u0000", packInt32InString(2)) +} + +func TestUnpackIntFromString(t *testing.T) { + require.Equal(t, 2, unpackIntFromString("\u0002\u0000\u0000\u0000")) +} + +func TestPackInt32InString(t *testing.T) { + require.Equal(t, "\u0002\u0000\u0000\u0000", packInt32InString(2)) +} + +func TestUnpackInt32FromString(t *testing.T) { + require.Equal(t, int32(2), unpackInt32FromString("\u0002\u0000\u0000\u0000")) +} diff --git a/server/kafka/partitioner.go b/server/kafka/partitioner.go index b0a29e4..ef15547 100644 --- a/server/kafka/partitioner.go +++ b/server/kafka/partitioner.go @@ -17,8 +17,6 @@ package kafka import ( - "unsafe" - "github.com/Shopify/sarama" cmap "github.com/orcaman/concurrent-map" ) @@ -45,23 +43,23 @@ func (lbp *leastBytesPartitioner) RequiresConsistency() bool { func (lbp *leastBytesPartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error) { // if partition count has reduced, remove the old entries for i := int32(lbp.byteCounters.Count() - 1); i >= numPartitions; i-- { - lbp.byteCounters.Remove(packIntInString(i)) + lbp.byteCounters.Remove(packInt32InString(i)) } // if the size has increased, add counters for new partitions for i := int32(lbp.byteCounters.Count()); i < numPartitions; i++ { - lbp.byteCounters.Set(packIntInString(i), uint64(0)) + lbp.byteCounters.Set(packInt32InString(i), uint64(0)) } // find the entry in the byteCounters with min bytes - minIndex := findPartitionWithMinBytes(lbp.byteCounters) + minIndex := lbp.findPartitionWithMinBytes(lbp.byteCounters) minBytes, _ := lbp.byteCounters.Get(minIndex) lbp.byteCounters.Set(minIndex, minBytes.(uint64)+uint64(message.Key.Length())+uint64(message.Value.Length())) - return unpackIntFromString(minIndex), nil + return unpackInt32FromString(minIndex), nil } -func findPartitionWithMinBytes(counters cmap.ConcurrentMap) string { +func (lbp *leastBytesPartitioner) findPartitionWithMinBytes(counters cmap.ConcurrentMap) string { var minPartition string var minBytes uint64 @@ -75,24 +73,3 @@ func findPartitionWithMinBytes(counters cmap.ConcurrentMap) string { return minPartition } - -func packIntInString(inputNum int32) string { - size := int(unsafe.Sizeof(inputNum)) - buffer := make([]byte, size) - for i := 0; i < size; i++ { - buffer[i] = *(*uint8)(unsafe.Pointer(uintptr(unsafe.Pointer(&inputNum)) + uintptr(i))) - } - - return string(buffer) -} - -func unpackIntFromString(inputString string) int32 { - outputValue := int32(0) - inputBytes := []byte(inputString) - size := len(inputBytes) - for i := 0; i < size; i++ { - *(*uint8)(unsafe.Pointer(uintptr(unsafe.Pointer(&outputValue)) + uintptr(i))) = inputBytes[i] - } - - return outputValue -} diff --git a/server/kafka/partitioner_test.go b/server/kafka/partitioner_test.go index 7102800..6bedf3e 100644 --- a/server/kafka/partitioner_test.go +++ b/server/kafka/partitioner_test.go @@ -31,14 +31,7 @@ func TestFindPartitionWithMinBytes(t *testing.T) { testBytes.Set("3", uint64(100)) testBytes.Set("4", uint64(600)) - minPartition := findPartitionWithMinBytes(testBytes) + partitioner := new(leastBytesPartitioner) + minPartition := partitioner.findPartitionWithMinBytes(testBytes) require.Equal(t, "3", minPartition) } - -func TestPackIntInString(t *testing.T) { - require.Equal(t, "\u0002\u0000\u0000\u0000", packIntInString(2)) -} - -func TestUnpackIntFromString(t *testing.T) { - require.Equal(t, int32(2), unpackIntFromString("\u0002\u0000\u0000\u0000")) -} diff --git a/server/kafka/pb_deserializer.go b/server/kafka/pb_deserializer.go new file mode 100644 index 0000000..ca23d9d --- /dev/null +++ b/server/kafka/pb_deserializer.go @@ -0,0 +1,134 @@ +/* + * Copyright 2019-2022 The NATS Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package kafka + +import ( + "bytes" + "encoding/binary" + "fmt" + + "github.com/jhump/protoreflect/desc" + + "github.com/jhump/protoreflect/dynamic" + "github.com/riferrei/srclient" +) + +type pbDeserializer interface { + Deserialize(*srclient.Schema, []byte) ([]byte, error) +} + +type protobufDeserializer struct { + schemaManager protobufSchemaManager +} + +type protobufWrapper struct { + Schema *srclient.Schema + MessageTypeIndex []int64 + CleanPayload []byte +} + +func newDeserializer() pbDeserializer { + return &protobufDeserializer{ + schemaManager: newProtobufSchemaManager(), + } +} + +func (pd *protobufDeserializer) Deserialize(schema *srclient.Schema, payload []byte) ([]byte, error) { + wrapper, err := pd.decodeProtobufStructures(schema, payload) + if err != nil { + return nil, err + } + + // Get the message descriptor from the message + messageDescriptor, err := pd.getMessageDescriptorFromMessage(wrapper) + if err != nil { + return nil, err + } + + // Deserialize the message from write format into protobuf JSON + message := dynamic.NewMessage(messageDescriptor) + err = message.Unmarshal(wrapper.CleanPayload) + if err != nil { + return nil, err + } + + jsonBytes, err := message.MarshalJSON() + if err != nil { + return nil, err + } + + return jsonBytes, nil +} + +func (pd *protobufDeserializer) decodeProtobufStructures(schema *srclient.Schema, payload []byte) (*protobufWrapper, error) { + payloadReader := bytes.NewReader(payload) + arrayLength, err := binary.ReadVarint(payloadReader) + if err != nil { + return nil, fmt.Errorf("unable to read arrayLength: %w", err) + } + + messageTypeIDs := make([]int64, arrayLength) + // The array won't be sent if there is only one message type with default index 0 + if arrayLength == 0 { + messageTypeIDs = append(messageTypeIDs, 0) + } + + for i := int64(0); i < arrayLength; i++ { + messageID, err := binary.ReadVarint(payloadReader) + if err != nil { + return nil, fmt.Errorf("unable to read messageTypeID: %w", err) + } + messageTypeIDs[i] = messageID + } + + remainingPayload := make([]byte, payloadReader.Len()) + _, err = payloadReader.Read(remainingPayload) + + if err != nil { + return nil, fmt.Errorf("unable to read remaining payload: %w", err) + } + + return &protobufWrapper{ + Schema: schema, + MessageTypeIndex: messageTypeIDs, + CleanPayload: remainingPayload, + }, nil +} + +func (pd *protobufDeserializer) getMessageDescriptorFromMessage(wrapper *protobufWrapper) (*desc.MessageDescriptor, error) { + fd, err := schemaManager.getFileDescriptor(wrapper.Schema) + if err != nil { + return nil, err + } + + // Traverse through the message types until we find the right type as pointed to by message array index. This array + // of ints with each type indexed level by level. + messageTypes := fd.GetMessageTypes() + messageTypesLen := int64(len(messageTypes)) + var messageDescriptor *desc.MessageDescriptor + + for _, i := range wrapper.MessageTypeIndex { + if i > messageTypesLen { + // This should never happen + return nil, fmt.Errorf("failed to decode message type: message index is larger than message types array length") + } + messageDescriptor = messageTypes[i] + messageTypes = messageDescriptor.GetNestedMessageTypes() + } + + return messageDescriptor, nil +} diff --git a/server/kafka/pb_schema_manager.go b/server/kafka/pb_schema_manager.go new file mode 100644 index 0000000..e992cd9 --- /dev/null +++ b/server/kafka/pb_schema_manager.go @@ -0,0 +1,106 @@ +/* + * Copyright 2019-2022 The NATS Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package kafka + +import ( + "fmt" + "os" + "strconv" + "sync" + "time" + + "github.com/jhump/protoreflect/desc" + "github.com/jhump/protoreflect/desc/protoparse" + cmap "github.com/orcaman/concurrent-map" + "github.com/riferrei/srclient" +) + +var once sync.Once + +var ( + schemaManager protobufSchemaManager +) + +type protobufSchemaManager struct { + protobufSchemaIDtoFDMappings cmap.ConcurrentMap // schema id to desc.FileDescriptor map +} + +func newProtobufSchemaManager() protobufSchemaManager { + once.Do(func() { + schemaManager = protobufSchemaManager{ + protobufSchemaIDtoFDMappings: cmap.New(), + } + }) + + return schemaManager +} + +func (protobufSchemaManager) getFileDescriptor(schema *srclient.Schema) (*desc.FileDescriptor, error) { + packedSchemaID := packIntInString(schema.ID()) + if !schemaManager.protobufSchemaIDtoFDMappings.Has(packedSchemaID) { + errorReporter := func(err protoparse.ErrorWithPos) error { + position := err.GetPosition() + return fmt.Errorf("unable to parse file descriptor %s %d: %w", position.Filename, position.Line, err.Unwrap()) + } + + nanoTs := strconv.FormatInt(time.Now().UnixNano(), 10) + schemaFileName := strconv.Itoa(schema.ID()) + "-" + nanoTs + ".proto" + schemaFile, err := os.CreateTemp("", schemaFileName) + if err != nil { + return nil, err + } + _, err = schemaFile.WriteString(schema.Schema()) + if err != nil { + return nil, err + } + err = schemaFile.Close() + if err != nil { + return nil, err + } + defer os.Remove(schemaFile.Name()) + + schemaMap := make(map[string]string, 1) + schemaMap[schemaFileName] = schema.Schema() + var schemaFilePaths []string + schemaFilePaths = append(schemaFilePaths, schemaFileName) + protobufParser := &protoparse.Parser{ + Accessor: protoparse.FileContentsFromMap(schemaMap), + ImportPaths: []string{"."}, + InferImportPaths: true, + ValidateUnlinkedFiles: true, + ErrorReporter: errorReporter, + } + + fds, err := protobufParser.ParseFiles(schemaFilePaths...) + if err != nil { + return nil, err + } + schemaManager.protobufSchemaIDtoFDMappings.Set(packedSchemaID, fds[0]) + } + + fd, _ := schemaManager.protobufSchemaIDtoFDMappings.Get(packedSchemaID) + return fd.(*desc.FileDescriptor), nil +} + +func (protobufSchemaManager) getMessageDescriptor(schema *srclient.Schema) (*desc.MessageDescriptor, error) { + fd, err := schemaManager.getFileDescriptor(schema) + if err != nil { + return nil, err + } + + return fd.GetMessageTypes()[0], nil +} diff --git a/server/kafka/pb_serializer.go b/server/kafka/pb_serializer.go new file mode 100644 index 0000000..59bc50d --- /dev/null +++ b/server/kafka/pb_serializer.go @@ -0,0 +1,105 @@ +/* + * Copyright 2019-2022 The NATS Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package kafka + +import ( + "bytes" + "encoding/binary" + "strings" + "unsafe" + + "github.com/jhump/protoreflect/dynamic" + "github.com/riferrei/srclient" +) + +type pbSerializer interface { + Serialize(*srclient.Schema, []byte) ([]byte, error) +} + +type protobufSerializer struct { + schemaManager protobufSchemaManager +} + +func newSerializer() pbSerializer { + return &protobufSerializer{ + schemaManager: newProtobufSchemaManager(), + } +} + +func (ps *protobufSerializer) Serialize(schema *srclient.Schema, payload []byte) ([]byte, error) { + // Get the message descriptor from cache or build it + messageDescriptor, err := schemaManager.getMessageDescriptor(schema) + if err != nil { + return nil, err + } + + // Parse the protobuf json sent as payload and convert it into wire format + message := dynamic.NewMessage(messageDescriptor) + err = message.UnmarshalJSON(payload) + if err != nil { + return nil, err + } + + indexBytes, err := ps.buildMessageIndexes(schema, messageDescriptor.GetFullyQualifiedName()) + if err != nil { + return nil, err + } + + protoBytes, err := message.Marshal() + if err != nil { + return nil, err + } + + serializedPayload := make([]byte, len(indexBytes)+len(protoBytes)+16) // 16 extra bytes for the array length + binary.PutVarint(serializedPayload, int64(len(indexBytes)/int(unsafe.Sizeof(int32(0))))) + if len(indexBytes) > 0 { + serializedPayload = append(serializedPayload, indexBytes...) + } + serializedPayload = append(serializedPayload, protoBytes...) + return serializedPayload, nil +} + +func (ps *protobufSerializer) buildMessageIndexes(schema *srclient.Schema, name string) ([]byte, error) { + fileDescriptor, err := schemaManager.getFileDescriptor(schema) + if err != nil { + return nil, err + } + + parts := strings.Split(name, ".") + messageTypes := fileDescriptor.GetMessageTypes() + + var indexes []byte + for _, part := range parts { + i := int32(0) + for _, mType := range messageTypes { + if mType.GetName() == part { + indexArr := make([]byte, 4) + indexBuf := bytes.NewBuffer(indexArr) + err = binary.Write(indexBuf, binary.BigEndian, &i) + if err != nil { + return nil, err + } + + indexes = append(indexes, indexArr...) + break + } + i++ + } + } + + return indexes, nil +} diff --git a/server/kafka/producer.go b/server/kafka/producer.go index 666ff5a..85dc862 100644 --- a/server/kafka/producer.go +++ b/server/kafka/producer.go @@ -18,10 +18,14 @@ package kafka import ( "crypto/tls" + "encoding/binary" "errors" "fmt" + "strings" "time" + "github.com/riferrei/srclient" + "github.com/Shopify/sarama" "github.com/nats-io/nats-kafka/server/conf" ) @@ -39,6 +43,13 @@ type saramaProducer struct { saslOn bool tlsOn bool tlsSkipVerify bool + + schemaRegistryOn bool + schemaRegistryClient srclient.ISchemaRegistryClient + subjectName string + schemaVersion int + schemaType srclient.SchemaType + pbSerializer pbSerializer } // IsTopicExist returns whether an error is caused by a topic already existing. @@ -83,13 +94,34 @@ func NewProducer(cc conf.ConnectorConfig, bc conf.NATSKafkaBridgeConfig, topic s return nil, err } - return &saramaProducer{ + prod := &saramaProducer{ sp: sp, topic: topic, saslOn: sc.Net.SASL.Enable, tlsOn: sc.Net.TLS.Enable, tlsSkipVerify: cc.SASL.InsecureSkipVerify, - }, nil + } + + // If schema registry url and subject name both are set, enable schema registry integration + if cc.SchemaRegistryURL != "" && cc.SubjectName != "" { + prod.schemaRegistryClient = srclient.CreateSchemaRegistryClient(cc.SchemaRegistryURL) + prod.subjectName = cc.SubjectName + prod.schemaVersion = cc.SchemaVersion + + switch strings.ToUpper(cc.SchemaType) { + case srclient.Json.String(): + prod.schemaType = srclient.Json + case srclient.Protobuf.String(): + prod.schemaType = srclient.Protobuf + prod.pbSerializer = newSerializer() + default: + prod.schemaType = srclient.Avro + } + + prod.schemaRegistryOn = true + } + + return prod, nil } // NetInfo returns information about whether SASL and TLS are enabled. @@ -112,9 +144,19 @@ func (p *saramaProducer) NetInfo() string { // Write sends an outgoing message. func (p *saramaProducer) Write(m Message) error { + var valueEncoder sarama.Encoder + if p.schemaRegistryOn { + encodedValue, err := p.serializePayload(m.Value) + if err != nil { + return err + } + valueEncoder = sarama.ByteEncoder(encodedValue) + } else { + valueEncoder = sarama.StringEncoder(m.Value) + } _, _, err := p.sp.SendMessage(&sarama.ProducerMessage{ Topic: p.topic, - Value: sarama.StringEncoder(m.Value), + Value: valueEncoder, Key: sarama.StringEncoder(m.Key), }) return err @@ -143,3 +185,58 @@ func (p *erroredProducer) Write(m Message) error { func (p *erroredProducer) Close() error { return p.err } + +// Retrieve the schema from the schema registry and serialize the message. This method expects data in Avro JSON format +// for cross language compatibility. +func (p *saramaProducer) serializePayload(jsonPayload []byte) ([]byte, error) { + var schema *srclient.Schema + var err error + if p.schemaRegistryOn && p.schemaVersion != 0 { + schema, err = p.schemaRegistryClient.GetSchemaByVersion(p.subjectName, p.schemaVersion) + } else { + // Version is not set, fetch and use the latest + schema, err = p.schemaRegistryClient.GetLatestSchema(p.subjectName) + } + + if err != nil { + return nil, err + } + + schemaIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.ID())) + + var valueBytes []byte + switch p.schemaType { + case srclient.Avro: + valueBytes, err = p.serializeAvro(schema, jsonPayload) + case srclient.Json: + valueBytes = jsonPayload + case srclient.Protobuf: + valueBytes, err = p.pbSerializer.Serialize(schema, jsonPayload) + } + + if err != nil { + return nil, err + } + + var recordValue []byte + recordValue = append(recordValue, byte(0)) + recordValue = append(recordValue, schemaIDBytes...) + recordValue = append(recordValue, valueBytes...) + + return recordValue, nil +} + +func (p *saramaProducer) serializeAvro(schema *srclient.Schema, payload []byte) ([]byte, error) { + codec := schema.Codec() + native, _, err := codec.NativeFromTextual(payload) + if err != nil { + return nil, fmt.Errorf("unable to serialize json: %w", err) + } + value, err := codec.BinaryFromNative(nil, native) + if err != nil { + return nil, fmt.Errorf("failed to convert to avro: %w", err) + } + + return value, err +} diff --git a/server/kafka/producer_test.go b/server/kafka/producer_test.go new file mode 100644 index 0000000..7303dd0 --- /dev/null +++ b/server/kafka/producer_test.go @@ -0,0 +1,73 @@ +/* + * Copyright 2019-2022 The NATS Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package kafka + +import ( + "testing" + + "github.com/riferrei/srclient" + "github.com/stretchr/testify/assert" +) + +func TestSerializePayloadAvro(t *testing.T) { + server := newMockSchemaServer(t) + defer server.close() + + producer := &saramaProducer{ + schemaRegistryOn: true, + schemaRegistryClient: srclient.CreateSchemaRegistryClient(server.getServerURL()), + subjectName: avroSubjectName, + schemaVersion: avroSchemaVersion, + schemaType: srclient.Avro, + } + + _, err := producer.serializePayload([]byte(avroMessage)) + assert.Nil(t, err) +} + +func TestSerializePayloadJson(t *testing.T) { + server := newMockSchemaServer(t) + defer server.close() + + producer := &saramaProducer{ + schemaRegistryOn: true, + schemaRegistryClient: srclient.CreateSchemaRegistryClient(server.getServerURL()), + subjectName: jsonSubjectName, + schemaVersion: jsonSchemaVersion, + schemaType: srclient.Json, + } + + _, err := producer.serializePayload([]byte(jsonMessage)) + assert.Nil(t, err) +} + +func TestSerializePayloadProtobuf(t *testing.T) { + server := newMockSchemaServer(t) + defer server.close() + + producer := &saramaProducer{ + schemaRegistryOn: true, + schemaRegistryClient: srclient.CreateSchemaRegistryClient(server.getServerURL()), + subjectName: protobufSubjectName, + schemaVersion: protobufSchemaVersion, + schemaType: srclient.Protobuf, + pbSerializer: newSerializer(), + } + + _, err := producer.serializePayload([]byte(protobufMessage)) + assert.Nil(t, err) +}