Skip to content

Commit 63c8e61

Browse files
authored
Add avro_nested_schemas field to schema_registry_decode / encode (#90)
* add avro nested schema functionality Signed-off-by: Jem Davies <[email protected]>
1 parent 352a5d0 commit 63c8e61

10 files changed

+479
-38
lines changed

internal/impl/confluent/processor_schema_registry_decode.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ This processor decodes protobuf messages to JSON documents, you can read more ab
4848
Field(service.NewBoolField("avro_raw_json").
4949
Description("Whether Avro messages should be decoded into normal JSON (\"json that meets the expectations of regular internet json\") rather than [Avro JSON](https://avro.apache.org/docs/current/specification/_print/#json-encoding). If `true` the schema returned from the subject should be decoded as [standard json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull) instead of as [avro json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec). There is a [comment in goavro](https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249), the [underlining library used for avro serialization](https://github.com/linkedin/goavro), that explains in more detail the difference between the standard json and avro json.").
5050
Advanced().Default(false)).
51+
Field(service.NewBoolField("avro_nested_schemas").
52+
Description("Whether Avro Schemas are nested. If true bento will resolve schema references. (Up to a maximum depth of 100)").
53+
Advanced().Default(false).Version("1.2.0")).
5154
Field(service.NewURLField("url").Description("The base URL of the schema registry service."))
5255

5356
for _, f := range service.NewHTTPRequestAuthSignerFields() {
@@ -71,8 +74,9 @@ func init() {
7174
//------------------------------------------------------------------------------
7275

7376
type schemaRegistryDecoder struct {
74-
avroRawJSON bool
75-
client *schemaRegistryClient
77+
avroRawJSON bool
78+
avroNestedSchemas bool
79+
client *schemaRegistryClient
7680

7781
schemas map[int]*cachedSchemaDecoder
7882
cacheMut sync.RWMutex
@@ -100,22 +104,28 @@ func newSchemaRegistryDecoderFromConfig(conf *service.ParsedConfig, mgr *service
100104
if err != nil {
101105
return nil, err
102106
}
103-
return newSchemaRegistryDecoder(urlStr, authSigner, tlsConf, avroRawJSON, mgr)
107+
avroNestedSchemas, err := conf.FieldBool("avro_nested_schemas")
108+
if err != nil {
109+
return nil, err
110+
}
111+
return newSchemaRegistryDecoder(urlStr, authSigner, tlsConf, avroRawJSON, avroNestedSchemas, mgr)
104112
}
105113

106114
func newSchemaRegistryDecoder(
107115
urlStr string,
108116
reqSigner func(f fs.FS, req *http.Request) error,
109117
tlsConf *tls.Config,
110118
avroRawJSON bool,
119+
avroNestedSchemas bool,
111120
mgr *service.Resources,
112121
) (*schemaRegistryDecoder, error) {
113122
s := &schemaRegistryDecoder{
114-
avroRawJSON: avroRawJSON,
115-
schemas: map[int]*cachedSchemaDecoder{},
116-
shutSig: shutdown.NewSignaller(),
117-
logger: mgr.Logger(),
118-
mgr: mgr,
123+
avroRawJSON: avroRawJSON,
124+
avroNestedSchemas: avroNestedSchemas,
125+
schemas: map[int]*cachedSchemaDecoder{},
126+
shutSig: shutdown.NewSignaller(),
127+
logger: mgr.Logger(),
128+
mgr: mgr,
119129
}
120130
var err error
121131
if s.client, err = newSchemaRegistryClient(urlStr, reqSigner, tlsConf, mgr); err != nil {

internal/impl/confluent/processor_schema_registry_decode_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ func TestSchemaRegistryDecodeAvro(t *testing.T) {
227227
return nil, nil
228228
})
229229

230-
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources())
230+
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources())
231231
require.NoError(t, err)
232232

233233
tests := []struct {
@@ -330,7 +330,7 @@ func TestSchemaRegistryDecodeAvroRawJson(t *testing.T) {
330330
return nil, nil
331331
})
332332

333-
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources())
333+
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, false, service.MockResources())
334334
require.NoError(t, err)
335335

336336
tests := []struct {
@@ -408,7 +408,7 @@ func TestSchemaRegistryDecodeClearExpired(t *testing.T) {
408408
return nil, fmt.Errorf("nope")
409409
})
410410

411-
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources())
411+
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources())
412412
require.NoError(t, err)
413413
require.NoError(t, decoder.Close(context.Background()))
414414

@@ -455,7 +455,7 @@ func TestSchemaRegistryDecodeProtobuf(t *testing.T) {
455455
return nil, nil
456456
})
457457

458-
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources())
458+
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources())
459459
require.NoError(t, err)
460460

461461
tests := []struct {
@@ -518,7 +518,7 @@ func TestSchemaRegistryDecodeJson(t *testing.T) {
518518
return nil, nil
519519
})
520520

521-
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources())
521+
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, false, service.MockResources())
522522
require.NoError(t, err)
523523

524524
tests := []struct {

internal/impl/confluent/processor_schema_registry_encode.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,10 @@ We will be considering alternative approaches in future so please [get in touch]
7070
Example("1h")).
7171
Field(service.NewBoolField("avro_raw_json").
7272
Description("Whether messages encoded in Avro format should be parsed as normal JSON (\"json that meets the expectations of regular internet json\") rather than [Avro JSON](https://avro.apache.org/docs/current/specification/_print/#json-encoding). If `true` the schema returned from the subject should be parsed as [standard json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull) instead of as [avro json](https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec). There is a [comment in goavro](https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249), the [underlining library used for avro serialization](https://github.com/linkedin/goavro), that explains in more detail the difference between standard json and avro json.").
73-
Advanced().Default(false).Version("1.0.0"))
73+
Advanced().Default(false).Version("1.0.0")).
74+
Field(service.NewBoolField("avro_nested_schemas").
75+
Description("Whether Avro Schemas are nested. If true bento will resolve schema references. (Up to a maximum depth of 100)").
76+
Advanced().Default(false).Version("1.2.0"))
7477

7578
for _, f := range service.NewHTTPRequestAuthSignerFields() {
7679
spec = spec.Field(f.Version("1.0.0"))
@@ -96,6 +99,7 @@ type schemaRegistryEncoder struct {
9699
client *schemaRegistryClient
97100
subject *service.InterpolatedString
98101
avroRawJSON bool
102+
avroNestedSchemas bool
99103
schemaRefreshAfter time.Duration
100104

101105
schemas map[string]cachedSchemaEncoder
@@ -121,6 +125,10 @@ func newSchemaRegistryEncoderFromConfig(conf *service.ParsedConfig, mgr *service
121125
if err != nil {
122126
return nil, err
123127
}
128+
avroNestedSchemas, err := conf.FieldBool("avro_nested_schemas")
129+
if err != nil {
130+
return nil, err
131+
}
124132
refreshPeriodStr, err := conf.FieldString("refresh_period")
125133
if err != nil {
126134
return nil, err
@@ -141,7 +149,7 @@ func newSchemaRegistryEncoderFromConfig(conf *service.ParsedConfig, mgr *service
141149
if err != nil {
142150
return nil, err
143151
}
144-
return newSchemaRegistryEncoder(urlStr, authSigner, tlsConf, subject, avroRawJSON, refreshPeriod, refreshTicker, mgr)
152+
return newSchemaRegistryEncoder(urlStr, authSigner, tlsConf, subject, avroRawJSON, avroNestedSchemas, refreshPeriod, refreshTicker, mgr)
145153
}
146154

147155
func newSchemaRegistryEncoder(
@@ -150,12 +158,14 @@ func newSchemaRegistryEncoder(
150158
tlsConf *tls.Config,
151159
subject *service.InterpolatedString,
152160
avroRawJSON bool,
161+
avroNestedSchemas bool,
153162
schemaRefreshAfter, schemaRefreshTicker time.Duration,
154163
mgr *service.Resources,
155164
) (*schemaRegistryEncoder, error) {
156165
s := &schemaRegistryEncoder{
157166
subject: subject,
158167
avroRawJSON: avroRawJSON,
168+
avroNestedSchemas: avroNestedSchemas,
159169
schemaRefreshAfter: schemaRefreshAfter,
160170
schemas: map[string]cachedSchemaEncoder{},
161171
shutSig: shutdown.NewSignaller(),

internal/impl/confluent/processor_schema_registry_encode_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ func TestSchemaRegistryEncodeAvro(t *testing.T) {
129129
subj, err := service.NewInterpolatedString("foo")
130130
require.NoError(t, err)
131131

132-
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources())
132+
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources())
133133
require.NoError(t, err)
134134

135135
tests := []struct {
@@ -211,7 +211,7 @@ func TestSchemaRegistryEncodeAvroRawJSON(t *testing.T) {
211211
subj, err := service.NewInterpolatedString("foo")
212212
require.NoError(t, err)
213213

214-
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources())
214+
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, false, time.Minute*10, time.Minute, service.MockResources())
215215
require.NoError(t, err)
216216

217217
tests := []struct {
@@ -293,7 +293,7 @@ func TestSchemaRegistryEncodeAvroLogicalTypes(t *testing.T) {
293293
subj, err := service.NewInterpolatedString("foo")
294294
require.NoError(t, err)
295295

296-
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources())
296+
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources())
297297
require.NoError(t, err)
298298

299299
tests := []struct {
@@ -370,7 +370,7 @@ func TestSchemaRegistryEncodeAvroRawJSONLogicalTypes(t *testing.T) {
370370
subj, err := service.NewInterpolatedString("foo")
371371
require.NoError(t, err)
372372

373-
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources())
373+
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, false, time.Minute*10, time.Minute, service.MockResources())
374374
require.NoError(t, err)
375375

376376
tests := []struct {
@@ -435,7 +435,7 @@ func TestSchemaRegistryEncodeClearExpired(t *testing.T) {
435435
subj, err := service.NewInterpolatedString("foo")
436436
require.NoError(t, err)
437437

438-
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources())
438+
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources())
439439
require.NoError(t, err)
440440
require.NoError(t, encoder.Close(context.Background()))
441441

@@ -496,7 +496,7 @@ func TestSchemaRegistryEncodeRefresh(t *testing.T) {
496496
subj, err := service.NewInterpolatedString("foo")
497497
require.NoError(t, err)
498498

499-
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources())
499+
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources())
500500
require.NoError(t, err)
501501
require.NoError(t, encoder.Close(context.Background()))
502502

@@ -598,7 +598,7 @@ func TestSchemaRegistryEncodeJSON(t *testing.T) {
598598
subj, err := service.NewInterpolatedString("foo")
599599
require.NoError(t, err)
600600

601-
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources())
601+
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Minute*10, time.Minute, service.MockResources())
602602
require.NoError(t, err)
603603

604604
tests := []struct {
@@ -691,7 +691,7 @@ func TestSchemaRegistryEncodeJSONConstantRefreshes(t *testing.T) {
691691
subj, err := service.NewInterpolatedString("foo")
692692
require.NoError(t, err)
693693

694-
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Millisecond, time.Millisecond*10, service.MockResources())
694+
encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, false, time.Millisecond, time.Millisecond*10, service.MockResources())
695695
require.NoError(t, err)
696696

697697
input := `{"Address":{"City":"foo","State":"bar"},"Name":"foo","MaybeHobby":"dancing"}`

internal/impl/confluent/serde_avro.go

Lines changed: 96 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,27 @@ package confluent
33
import (
44
"context"
55
"encoding/json"
6+
"errors"
67
"fmt"
8+
"strings"
79

810
"github.com/linkedin/goavro/v2"
911

1012
"github.com/warpstreamlabs/bento/public/service"
1113
)
1214

15+
// field is the JSON shape of a single entry in a schema's "fields" array:
// the entry's name and its type.
//
// Type is declared as a string, so it only decodes when the JSON "type" value
// is itself a string; encoding/json reports a type error for an array or
// object value (for example a union or an inline record definition).
type field struct {
16+
Name string `json:"name"`
17+
Type string `json:"type"`
18+
}
19+
20+
// reference is the JSON shape of the top-level attributes of a schema
// document: its "type", "name", "namespace" and "fields" keys.
//
// Keys missing from the decoded document leave the corresponding struct field
// at whatever value it already held, so a reference value should not be
// reused across several json.Unmarshal calls without being reset.
type reference struct {
21+
Type string `json:"type"`
22+
Name string `json:"name"`
23+
Namespace string `json:"namespace"`
24+
Fields []field `json:"fields"`
25+
}
26+
1327
func resolveAvroReferences(ctx context.Context, client *schemaRegistryClient, info SchemaInfo) (string, error) {
1428
if len(info.References) == 0 {
1529
return info.Schema, nil
@@ -45,10 +59,76 @@ func resolveAvroReferences(ctx context.Context, client *schemaRegistryClient, in
4559
return string(schemaHydratedBytes), nil
4660
}
4761

48-
func (s *schemaRegistryEncoder) getAvroEncoder(ctx context.Context, info SchemaInfo) (schemaEncoder, error) {
49-
schema, err := resolveAvroReferences(ctx, s.client, info)
62+
func resolveAvroReferencesNested(ctx context.Context, client *schemaRegistryClient, info SchemaInfo) (string, error) {
63+
if len(info.References) == 0 {
64+
return info.Schema, nil
65+
}
66+
67+
refsMap := map[string]string{}
68+
if err := client.WalkReferences(ctx, info.References, func(ctx context.Context, name string, info SchemaInfo) error {
69+
refsMap[name] = info.Schema
70+
return nil
71+
}); err != nil {
72+
return "", err
73+
}
74+
75+
var schemaDry any
76+
if err := json.Unmarshal([]byte(info.Schema), &schemaDry); err != nil {
77+
return "", fmt.Errorf("failed to parse root schema as enum: %w", err)
78+
}
79+
80+
schemaRaw, _ := json.Marshal(schemaDry)
81+
schemaString := string(schemaRaw)
82+
83+
var schema reference
84+
err := json.Unmarshal(schemaRaw, &schema)
5085
if err != nil {
51-
return nil, err
86+
return "", fmt.Errorf("failed to unmarshal root schema: %w", err)
87+
}
88+
89+
var ref reference
90+
maxIterations := 100
91+
92+
for i := 0; i <= maxIterations; i++ {
93+
if i == maxIterations {
94+
return "", errors.New("maximum iteration limit reached trying to resolve Avro references: possible circular dependency detected")
95+
}
96+
initialSchemaDry := schemaString
97+
98+
for k, v := range refsMap {
99+
err := json.Unmarshal([]byte(refsMap[k]), &ref)
100+
if err != nil {
101+
return "", fmt.Errorf("failed to unmarshal refsMap value %s: %w", k, err)
102+
}
103+
104+
if schema.Namespace == ref.Namespace {
105+
schemaString = strings.ReplaceAll(schemaString, `"type":"`+ref.Name+`"`, `"type":`+v)
106+
} else {
107+
schemaString = strings.ReplaceAll(schemaString, `"type":"`+ref.Namespace+`.`+ref.Name+`"`, `"type":`+v)
108+
}
109+
}
110+
if schemaString == initialSchemaDry {
111+
break
112+
}
113+
}
114+
115+
return schemaString, nil
116+
}
117+
118+
func (s *schemaRegistryEncoder) getAvroEncoder(ctx context.Context, info SchemaInfo) (schemaEncoder, error) {
119+
var schema string
120+
var err error
121+
122+
if s.avroNestedSchemas {
123+
schema, err = resolveAvroReferencesNested(ctx, s.client, info)
124+
if err != nil {
125+
return nil, err
126+
}
127+
} else {
128+
schema, err = resolveAvroReferences(ctx, s.client, info)
129+
if err != nil {
130+
return nil, err
131+
}
52132
}
53133

54134
var codec *goavro.Codec
@@ -84,9 +164,19 @@ func (s *schemaRegistryEncoder) getAvroEncoder(ctx context.Context, info SchemaI
84164
}
85165

86166
func (s *schemaRegistryDecoder) getAvroDecoder(ctx context.Context, info SchemaInfo) (schemaDecoder, error) {
87-
schema, err := resolveAvroReferences(ctx, s.client, info)
88-
if err != nil {
89-
return nil, err
167+
var schema string
168+
var err error
169+
170+
if s.avroNestedSchemas {
171+
schema, err = resolveAvroReferencesNested(ctx, s.client, info)
172+
if err != nil {
173+
return nil, err
174+
}
175+
} else {
176+
schema, err = resolveAvroReferences(ctx, s.client, info)
177+
if err != nil {
178+
return nil, err
179+
}
90180
}
91181

92182
var codec *goavro.Codec

0 commit comments

Comments
 (0)