-
Notifications
You must be signed in to change notification settings - Fork 95
/
reader_generic.go
163 lines (151 loc) · 3.49 KB
/
reader_generic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package avro
import (
"fmt"
"reflect"
"time"
)
// ReadNext reads the next Avro element as a generic interface.
func (r *Reader) ReadNext(schema Schema) any {
var ls LogicalSchema
lts, ok := schema.(LogicalTypeSchema)
if ok {
ls = lts.Logical()
}
switch schema.Type() {
case Boolean:
return r.ReadBool()
case Int:
if ls != nil {
switch ls.Type() {
case Date:
i := r.ReadInt()
sec := int64(i) * int64(24*time.Hour/time.Second)
return time.Unix(sec, 0).UTC()
case TimeMillis:
return time.Duration(r.ReadInt()) * time.Millisecond
}
}
return int(r.ReadInt())
case Long:
if ls != nil {
switch ls.Type() {
case TimeMicros:
return time.Duration(r.ReadLong()) * time.Microsecond
case TimestampMillis:
i := r.ReadLong()
sec := i / 1e3
nsec := (i - sec*1e3) * 1e6
return time.Unix(sec, nsec).UTC()
case TimestampMicros:
i := r.ReadLong()
sec := i / 1e6
nsec := (i - sec*1e6) * 1e3
return time.Unix(sec, nsec).UTC()
}
}
return r.ReadLong()
case Float:
return r.ReadFloat()
case Double:
return r.ReadDouble()
case String:
return r.ReadString()
case Bytes:
if ls != nil && ls.Type() == Decimal {
dec := ls.(*DecimalLogicalSchema)
return ratFromBytes(r.ReadBytes(), dec.Scale())
}
return r.ReadBytes()
case Record:
fields := schema.(*RecordSchema).Fields()
obj := make(map[string]any, len(fields))
for _, field := range fields {
obj[field.Name()] = r.ReadNext(field.Type())
}
return obj
case Ref:
return r.ReadNext(schema.(*RefSchema).Schema())
case Enum:
symbols := schema.(*EnumSchema).Symbols()
idx := int(r.ReadInt())
if idx < 0 || idx >= len(symbols) {
r.ReportError("Read", "unknown enum symbol")
return nil
}
return symbols[idx]
case Array:
arr := []any{}
r.ReadArrayCB(func(r *Reader) bool {
elem := r.ReadNext(schema.(*ArraySchema).Items())
arr = append(arr, elem)
return true
})
return arr
case Map:
obj := map[string]any{}
r.ReadMapCB(func(r *Reader, field string) bool {
elem := r.ReadNext(schema.(*MapSchema).Values())
obj[field] = elem
return true
})
return obj
case Union:
types := schema.(*UnionSchema).Types()
idx := int(r.ReadLong())
if idx < 0 || idx > len(types)-1 {
r.ReportError("Read", "unknown union type")
return nil
}
schema = types[idx]
if schema.Type() == Null {
return nil
}
key := schemaTypeName(schema)
obj := map[string]any{}
obj[key] = r.ReadNext(types[idx])
return obj
case Fixed:
size := schema.(*FixedSchema).Size()
obj := make([]byte, size)
r.Read(obj)
if ls != nil && ls.Type() == Decimal {
dec := ls.(*DecimalLogicalSchema)
return ratFromBytes(obj, dec.Scale())
}
return byteSliceToArray(obj, size)
default:
r.ReportError("Read", fmt.Sprintf("unexpected schema type: %v", schema.Type()))
return nil
}
}
// ReadArrayCB reads an array with a callback per item.
func (r *Reader) ReadArrayCB(fn func(*Reader) bool) {
for {
l, _ := r.ReadBlockHeader()
if l == 0 {
break
}
for range l {
fn(r)
}
}
}
// ReadMapCB reads an array with a callback per item.
func (r *Reader) ReadMapCB(fn func(*Reader, string) bool) {
for {
l, _ := r.ReadBlockHeader()
if l == 0 {
break
}
for range l {
field := r.ReadString()
fn(r, field)
}
}
}
var byteType = reflect.TypeOf((*byte)(nil)).Elem()
func byteSliceToArray(b []byte, size int) any {
vArr := reflect.New(reflect.ArrayOf(size, byteType)).Elem()
reflect.Copy(vArr, reflect.ValueOf(b))
return vArr.Interface()
}