Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Two Way Handling for Standard Json #249

Merged
merged 4 commits into from
Aug 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 88 additions & 1 deletion codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,98 @@ func NewCodec(schemaSpecification string) (*Codec, error) {
})
}

// NewCodecForStandardJSON returns a codec that uses a special union
// processing code that allows normal json to be ingested via an
// avro schema, by inferring the "type" intended for union types.
//
// This is the one-way code to get such json into the avro system
// and the deserialization is not supported in this codec - its
// json into avro-json one-way and one-way only for this codec.
//
// The "type" inference is done by using the types specified as
// potentially acceptable types for the union, and trying to
// unpack the incomin json into each of the specified types for
// the union type. See union.go +/Standard JSON/ for a general
// description of the problem and details of the solution
// are in union.go +/nativeAvroFromTextualJson/
//
// For a general description of a codex seen the comment for NewCodec
// above.
//
// The following is the exact same schema used in the above
// code for NewCodec:
//
// codec, err := goavro.NewCodecForStandardJSON(`
// {
// "type": "record",
// "name": "LongList",
// "fields" : [
// {"name": "next", "type": ["null", "LongList"], "default": null}
// ]
// }`)
// if err != nil {
// fmt.Println(err)
// }
//
// The above will take json of this sort:
//
// {"next": null}
//
// {"next":{"next":null}}
//
// {"next":{"next":{"next":null}}}
//
// For more examples see the test cases in union_test.go
func NewCodecForStandardJSON(schemaSpecification string) (*Codec, error) {
return NewCodecFrom(schemaSpecification, &codecBuilder{
buildCodecForTypeDescribedByMap,
buildCodecForTypeDescribedByString,
buildCodecForTypeDescribedBySliceJSON,
buildCodecForTypeDescribedBySliceOneWayJson,
})
}

// NewCodecForStandardJSONOneWay is an alias for NewCodecForStandardJSON
// added to make the transition to two-way json handling more smooth
//
// This will unambiguously provide OneWay avro encoding for standard
// internet json. This takes in internet json, and brings it into
// the avro world, but the deserialization retains the unique
// form of normal avro-friendly json where unions have their
// types types specified in stream like this example from
// the official docs // https://avro.apache.org/docs/1.11.1/api/c/
//
// `{"string": "Follow your bliss."}`
//
// To be clear this means the incoming json string:
//
// "Follow your bliss."
//
// would deserialize according to the avro-json expectations to:
//
// `{"string": "Follow your bliss."}`
//
// To get full two-way support see the below NewCodecForStandardJSONFull
func NewCodecForStandardJSONOneWay(schemaSpecification string) (*Codec, error) {
return NewCodecForStandardJSON(schemaSpecification)
}

// NewCodecForStandardJSONFull provides full serialization/deserialization
// for json that meets the expectations of regular internet json, viewed as
// something distinct from avro-json which has special handling for union
// types. For details see the above comments.
//
// With this `codec` you can expect to see a json string like this:
//
// "Follow your bliss."
//
// to deserialize into the same json structure
//
// "Follow your bliss."
func NewCodecForStandardJSONFull(schemaSpecification string) (*Codec, error) {
return NewCodecFrom(schemaSpecification, &codecBuilder{
buildCodecForTypeDescribedByMap,
buildCodecForTypeDescribedByString,
buildCodecForTypeDescribedBySliceTwoWayJson,
})
}

Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ module github.com/linkedin/goavro/v2

go 1.12

require github.com/golang/snappy v0.0.1
require (
github.com/golang/snappy v0.0.1
github.com/stretchr/testify v1.7.5
)
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,16 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.5 h1:s5PTfem8p8EbKQOctVV53k6jCJt3UX4IEJzwh+C324Q=
github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
45 changes: 43 additions & 2 deletions text_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ package goavro

import (
"bytes"
"encoding/json"
"fmt"
"math"
"testing"

"github.com/stretchr/testify/assert"
)

func testTextDecodeFail(t *testing.T, schema string, buf []byte, errorMessage string) {
Expand Down Expand Up @@ -63,18 +66,56 @@ func testTextDecodePass(t *testing.T, schema string, datum interface{}, encoded
}
toNativeAndCompare(t, schema, datum, encoded, codec)
}
func testJSONDecodePass(t *testing.T, schema string, datum interface{}, encoded []byte) {
func testJsonDecodePass(t *testing.T, schema string, datum interface{}, encoded []byte) {
t.Helper()
codec, err := NewCodecFrom(schema, &codecBuilder{
buildCodecForTypeDescribedByMap,
buildCodecForTypeDescribedByString,
buildCodecForTypeDescribedBySliceJSON,
buildCodecForTypeDescribedBySliceOneWayJson,
})
if err != nil {
t.Fatalf("schema: %s; %s", schema, err)
}
toNativeAndCompare(t, schema, datum, encoded, codec)
}
func testNativeToTextualJsonPass(t *testing.T, schema string, datum interface{}, encoded []byte) {
t.Helper()
codec, err := NewCodecFrom(schema, &codecBuilder{
buildCodecForTypeDescribedByMap,
buildCodecForTypeDescribedByString,
buildCodecForTypeDescribedBySliceTwoWayJson,
})
if err != nil {
t.Fatalf("schema: %s; %s", schema, err)
}
toTextualAndCompare(t, schema, datum, encoded, codec)
}

func toTextualAndCompare(t *testing.T, schema string, datum interface{}, expected []byte, codec *Codec) {
t.Helper()
decoded, err := codec.TextualFromNative(nil, datum)
if err != nil {
t.Fatalf("schema: %s; %s", schema, err)
}

// do extra stuff to to the challenge equality of maps
var want interface{}

if err := json.Unmarshal(expected, &want); err != nil {
t.Errorf("Could not unmarshal the expected data into a go struct:%#v:", string(expected))
}

var got interface{}

if err := json.Unmarshal(decoded, &got); err != nil {
t.Errorf("Could not unmarshal the received data into a go struct:%#v:", string(decoded))
}

if !assert.Equal(t, want, got) {
t.Errorf("GOT: %v; WANT: %v", string(decoded), string(expected))
}
}

func toNativeAndCompare(t *testing.T, schema string, datum interface{}, encoded []byte, codec *Codec) {
t.Helper()
decoded, remaining, err := codec.NativeFromTextual(encoded)
Expand Down
79 changes: 67 additions & 12 deletions union.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func makeCodecInfo(st map[string]*Codec, enclosingNamespace string, schemaArray

}

func nativeFromBinary(cr *codecInfo) func(buf []byte) (interface{}, []byte, error) {
func unionNativeFromBinary(cr *codecInfo) func(buf []byte) (interface{}, []byte, error) {

return func(buf []byte) (interface{}, []byte, error) {
var decoded interface{}
Expand All @@ -114,7 +114,7 @@ func nativeFromBinary(cr *codecInfo) func(buf []byte) (interface{}, []byte, erro
return Union(cr.allowedTypes[index], decoded), buf, nil
}
}
func binaryFromNative(cr *codecInfo) func(buf []byte, datum interface{}) ([]byte, error) {
func unionBinaryFromNative(cr *codecInfo) func(buf []byte, datum interface{}) ([]byte, error) {
return func(buf []byte, datum interface{}) ([]byte, error) {
switch v := datum.(type) {
case nil:
Expand All @@ -141,7 +141,7 @@ func binaryFromNative(cr *codecInfo) func(buf []byte, datum interface{}) ([]byte
return nil, fmt.Errorf("cannot encode binary union: non-nil Union values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum)
}
}
func nativeFromTextual(cr *codecInfo) func(buf []byte) (interface{}, []byte, error) {
func unionNativeFromTextual(cr *codecInfo) func(buf []byte) (interface{}, []byte, error) {
return func(buf []byte) (interface{}, []byte, error) {
if len(buf) >= 4 && bytes.Equal(buf[:4], []byte("null")) {
if _, ok := cr.indexFromName["null"]; ok {
Expand All @@ -159,7 +159,7 @@ func nativeFromTextual(cr *codecInfo) func(buf []byte) (interface{}, []byte, err
return datum, buf, nil
}
}
func textualFromNative(cr *codecInfo) func(buf []byte, datum interface{}) ([]byte, error) {
func unionTextualFromNative(cr *codecInfo) func(buf []byte, datum interface{}) ([]byte, error) {
return func(buf []byte, datum interface{}) ([]byte, error) {
switch v := datum.(type) {
case nil:
Expand Down Expand Up @@ -196,6 +196,37 @@ func textualFromNative(cr *codecInfo) func(buf []byte, datum interface{}) ([]byt
return nil, fmt.Errorf("cannot encode textual union: non-nil values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum)
}
}
func textualJsonFromNativeAvro(cr *codecInfo) func(buf []byte, datum interface{}) ([]byte, error) {
return func(buf []byte, datum interface{}) ([]byte, error) {
switch v := datum.(type) {
case nil:
_, ok := cr.indexFromName["null"]
if !ok {
return nil, fmt.Errorf("cannot encode textual union: no member schema types support datum: allowed types: %v; received: %T", cr.allowedTypes, datum)
}
return append(buf, "null"...), nil
case map[string]interface{}:
if len(v) != 1 {
return nil, fmt.Errorf("cannot encode textual union: non-nil Union values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum)
}
// will execute exactly once
for key, value := range v {
index, ok := cr.indexFromName[key]
if !ok {
return nil, fmt.Errorf("cannot encode textual union: no member schema types support datum: allowed types: %v; received: %T", cr.allowedTypes, datum)
}
var err error
c := cr.codecFromIndex[index]
buf, err = c.textualFromNative(buf, value)
if err != nil {
return nil, fmt.Errorf("cannot encode textual union: %s", err)
}
return buf, nil
}
}
return nil, fmt.Errorf("cannot encode textual union: non-nil values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum)
}
}
func buildCodecForTypeDescribedBySlice(st map[string]*Codec, enclosingNamespace string, schemaArray []interface{}, cb *codecBuilder) (*Codec, error) {
if len(schemaArray) == 0 {
return nil, errors.New("Union ought to have one or more members")
Expand All @@ -213,10 +244,10 @@ func buildCodecForTypeDescribedBySlice(st map[string]*Codec, enclosingNamespace
schemaOriginal: cr.codecFromIndex[0].typeName.fullName,

typeName: &name{"union", nullNamespace},
nativeFromBinary: nativeFromBinary(&cr),
binaryFromNative: binaryFromNative(&cr),
nativeFromTextual: nativeFromTextual(&cr),
textualFromNative: textualFromNative(&cr),
nativeFromBinary: unionNativeFromBinary(&cr),
binaryFromNative: unionBinaryFromNative(&cr),
nativeFromTextual: unionNativeFromTextual(&cr),
textualFromNative: unionTextualFromNative(&cr),
}
return rv, nil
}
Expand Down Expand Up @@ -246,7 +277,31 @@ func buildCodecForTypeDescribedBySlice(st map[string]*Codec, enclosingNamespace
// and then it will remain avro-json object
// avro data is not serialized back into standard json
// the data goes to avro-json and stays that way
func buildCodecForTypeDescribedBySliceJSON(st map[string]*Codec, enclosingNamespace string, schemaArray []interface{}, cb *codecBuilder) (*Codec, error) {
func buildCodecForTypeDescribedBySliceOneWayJson(st map[string]*Codec, enclosingNamespace string, schemaArray []interface{}, cb *codecBuilder) (*Codec, error) {
if len(schemaArray) == 0 {
return nil, errors.New("Union ought to have one or more members")
}

cr, err := makeCodecInfo(st, enclosingNamespace, schemaArray, cb)
if err != nil {
return nil, err
}

rv := &Codec{
// NOTE: To support record field default values, union schema set to the
// type name of first member
// TODO: add/change to schemaCanonical below
schemaOriginal: cr.codecFromIndex[0].typeName.fullName,

typeName: &name{"union", nullNamespace},
nativeFromBinary: unionNativeFromBinary(&cr),
binaryFromNative: unionBinaryFromNative(&cr),
nativeFromTextual: nativeAvroFromTextualJson(&cr),
textualFromNative: unionTextualFromNative(&cr),
}
return rv, nil
}
func buildCodecForTypeDescribedBySliceTwoWayJson(st map[string]*Codec, enclosingNamespace string, schemaArray []interface{}, cb *codecBuilder) (*Codec, error) {
if len(schemaArray) == 0 {
return nil, errors.New("Union ought to have one or more members")
}
Expand All @@ -263,10 +318,10 @@ func buildCodecForTypeDescribedBySliceJSON(st map[string]*Codec, enclosingNamesp
schemaOriginal: cr.codecFromIndex[0].typeName.fullName,

typeName: &name{"union", nullNamespace},
nativeFromBinary: nativeFromBinary(&cr),
binaryFromNative: binaryFromNative(&cr),
nativeFromBinary: unionNativeFromBinary(&cr),
binaryFromNative: unionBinaryFromNative(&cr),
nativeFromTextual: nativeAvroFromTextualJson(&cr),
textualFromNative: textualFromNative(&cr),
textualFromNative: textualJsonFromNativeAvro(&cr),
}
return rv, nil
}
Expand Down
Loading