Skip to content

Commit

Permalink
Write message indexes correctly in protobuf (#62)
Browse files Browse the repository at this point in the history
An issue exists in the protobuf serializer where the message indexes
array length and contents are not written correctly causing the
deserialization to fail. This PR correctly sets the array length as well
as the message indexes array contents using varints and also changes the
serializer test to decode the message and check for validity.
  • Loading branch information
drnushooz committed Jan 10, 2022
1 parent d10eb43 commit 13ea138
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 23 deletions.
16 changes: 8 additions & 8 deletions server/kafka/pb_deserializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ type protobufDeserializer struct {
}

type protobufWrapper struct {
Schema *srclient.Schema
MessageTypeIndex []int64
CleanPayload []byte
Schema *srclient.Schema
MessageTypeIndexes []int64
CleanPayload []byte
}

func newDeserializer() pbDeserializer {
Expand Down Expand Up @@ -103,9 +103,9 @@ func (pd *protobufDeserializer) decodeProtobufStructures(schema *srclient.Schema
}

return &protobufWrapper{
Schema: schema,
MessageTypeIndex: messageTypeIDs,
CleanPayload: remainingPayload,
Schema: schema,
MessageTypeIndexes: messageTypeIDs,
CleanPayload: remainingPayload,
}, nil
}

Expand All @@ -116,12 +116,12 @@ func (pd *protobufDeserializer) getMessageDescriptorFromMessage(wrapper *protobu
}

// Traverse through the message types until we find the right type as pointed to by message array index. This array
// of ints with each type indexed level by level.
// of varints with each type indexed level by level.
messageTypes := fd.GetMessageTypes()
messageTypesLen := int64(len(messageTypes))
var messageDescriptor *desc.MessageDescriptor

for _, i := range wrapper.MessageTypeIndex {
for _, i := range wrapper.MessageTypeIndexes {
if i > messageTypesLen {
// This should never happen
return nil, fmt.Errorf("failed to decode message type: message index is larger than message types array length")
Expand Down
30 changes: 17 additions & 13 deletions server/kafka/pb_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package kafka

import (
"bytes"
"encoding/binary"
"strings"
"unsafe"
Expand Down Expand Up @@ -54,7 +53,7 @@ func (ps *protobufSerializer) Serialize(schema *srclient.Schema, payload []byte)
return nil, err
}

indexBytes, err := ps.buildMessageIndexes(schema, messageDescriptor.GetFullyQualifiedName())
indexLenBytes, indexBytes, err := ps.buildMessageIndexes(schema, messageDescriptor.GetFullyQualifiedName())
if err != nil {
return nil, err
}
Expand All @@ -64,41 +63,46 @@ func (ps *protobufSerializer) Serialize(schema *srclient.Schema, payload []byte)
return nil, err
}

serializedPayload := make([]byte, len(indexBytes)+len(protoBytes)+16) // 16 extra bytes for the array length
binary.PutVarint(serializedPayload, int64(len(indexBytes)/int(unsafe.Sizeof(int32(0)))))
var serializedPayload []byte
serializedPayload = append(serializedPayload, indexLenBytes...)
if len(indexBytes) > 0 {
serializedPayload = append(serializedPayload, indexBytes...)
}
serializedPayload = append(serializedPayload, protoBytes...)
return serializedPayload, nil
}

func (ps *protobufSerializer) buildMessageIndexes(schema *srclient.Schema, name string) ([]byte, error) {
func (ps *protobufSerializer) buildMessageIndexes(schema *srclient.Schema, name string) ([]byte, []byte, error) {
fileDescriptor, err := schemaManager.getFileDescriptor(schema)
if err != nil {
return nil, err
return nil, nil, err
}

parts := strings.Split(name, ".")
messageTypes := fileDescriptor.GetMessageTypes()

var indexes []byte
var messageIndex []byte
indexesCount := int64(0)
for _, part := range parts {
i := int32(0)
i := int64(0)
for _, mType := range messageTypes {
if mType.GetName() == part {
indexBuf := new(bytes.Buffer)
err = binary.Write(indexBuf, binary.BigEndian, i)
indexBuf := make([]byte, unsafe.Sizeof(i))
bytesLen := int64(binary.PutVarint(indexBuf, i))
if err != nil {
return nil, err
return nil, nil, err
}

indexes = append(indexes, indexBuf.Bytes()...)
messageIndex = append(messageIndex, indexBuf[:bytesLen]...)
indexesCount++
break
}
i++
}
}

return indexes, nil
indexCountBytes := make([]byte, unsafe.Sizeof(indexesCount))
indexCountBytesSize := binary.PutVarint(indexCountBytes, indexesCount)

return indexCountBytes[:indexCountBytesSize], messageIndex, nil
}
10 changes: 8 additions & 2 deletions server/kafka/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,22 @@ func TestSerializePayloadJson(t *testing.T) {
func TestSerializePayloadProtobuf(t *testing.T) {
server := newMockSchemaServer(t)
defer server.close()
srClient := srclient.CreateSchemaRegistryClient(server.getServerURL())

producer := &saramaProducer{
schemaRegistryOn: true,
schemaRegistryClient: srclient.CreateSchemaRegistryClient(server.getServerURL()),
schemaRegistryClient: srClient,
subjectName: protobufSubjectName,
schemaVersion: protobufSchemaVersion,
schemaType: srclient.Protobuf,
pbSerializer: newSerializer(),
}
schema, err := srClient.GetSchema(protobufSchemaID)
assert.Nil(t, err)

message, err := producer.serializePayload([]byte(protobufMessage))
assert.Nil(t, err)

_, err := producer.serializePayload([]byte(protobufMessage))
_, err = newDeserializer().Deserialize(schema, message[5:])
assert.Nil(t, err)
}

0 comments on commit 13ea138

Please sign in to comment.