diff --git a/server/kafka/pb_deserializer.go b/server/kafka/pb_deserializer.go index ca23d9d..20ecac9 100644 --- a/server/kafka/pb_deserializer.go +++ b/server/kafka/pb_deserializer.go @@ -36,9 +36,9 @@ type protobufDeserializer struct { } type protobufWrapper struct { - Schema *srclient.Schema - MessageTypeIndex []int64 - CleanPayload []byte + Schema *srclient.Schema + MessageTypeIndexes []int64 + CleanPayload []byte } func newDeserializer() pbDeserializer { @@ -103,9 +103,9 @@ func (pd *protobufDeserializer) decodeProtobufStructures(schema *srclient.Schema } return &protobufWrapper{ - Schema: schema, - MessageTypeIndex: messageTypeIDs, - CleanPayload: remainingPayload, + Schema: schema, + MessageTypeIndexes: messageTypeIDs, + CleanPayload: remainingPayload, }, nil } @@ -116,12 +116,12 @@ func (pd *protobufDeserializer) getMessageDescriptorFromMessage(wrapper *protobu } // Traverse through the message types until we find the right type as pointed to by message array index. This array - // of ints with each type indexed level by level. + // of varints with each type indexed level by level. messageTypes := fd.GetMessageTypes() messageTypesLen := int64(len(messageTypes)) var messageDescriptor *desc.MessageDescriptor - for _, i := range wrapper.MessageTypeIndex { + for _, i := range wrapper.MessageTypeIndexes { if i > messageTypesLen { // This should never happen return nil, fmt.Errorf("failed to decode message type: message index is larger than message types array length") diff --git a/server/kafka/pb_serializer.go b/server/kafka/pb_serializer.go index c95a184..885f942 100644 --- a/server/kafka/pb_serializer.go +++ b/server/kafka/pb_serializer.go @@ -17,7 +17,6 @@ package kafka import ( - "bytes" "encoding/binary" "strings" "unsafe" @@ -54,7 +53,7 @@ func (ps *protobufSerializer) Serialize(schema *srclient.Schema, payload []byte) return nil, err } - indexBytes, err := ps.buildMessageIndexes(schema, messageDescriptor.GetFullyQualifiedName()) + indexLenBytes, indexBytes, err := ps.buildMessageIndexes(schema, messageDescriptor.GetFullyQualifiedName()) if err != nil { return nil, err } @@ -64,8 +63,8 @@ func (ps *protobufSerializer) Serialize(schema *srclient.Schema, payload []byte) return nil, err } - serializedPayload := make([]byte, len(indexBytes)+len(protoBytes)+16) // 16 extra bytes for the array length - binary.PutVarint(serializedPayload, int64(len(indexBytes)/int(unsafe.Sizeof(int32(0))))) + var serializedPayload []byte + serializedPayload = append(serializedPayload, indexLenBytes...) if len(indexBytes) > 0 { serializedPayload = append(serializedPayload, indexBytes...) } @@ -73,32 +72,37 @@ func (ps *protobufSerializer) Serialize(schema *srclient.Schema, payload []byte) return serializedPayload, nil } -func (ps *protobufSerializer) buildMessageIndexes(schema *srclient.Schema, name string) ([]byte, error) { +func (ps *protobufSerializer) buildMessageIndexes(schema *srclient.Schema, name string) ([]byte, []byte, error) { fileDescriptor, err := schemaManager.getFileDescriptor(schema) if err != nil { - return nil, err + return nil, nil, err } parts := strings.Split(name, ".") messageTypes := fileDescriptor.GetMessageTypes() - var indexes []byte + var messageIndex []byte + indexesCount := int64(0) for _, part := range parts { - i := int32(0) + i := int64(0) for _, mType := range messageTypes { if mType.GetName() == part { - indexBuf := new(bytes.Buffer) - err = binary.Write(indexBuf, binary.BigEndian, i) + indexBuf := make([]byte, unsafe.Sizeof(i)) + bytesLen := int64(binary.PutVarint(indexBuf, i)) if err != nil { - return nil, err + return nil, nil, err } - indexes = append(indexes, indexBuf.Bytes()...) + messageIndex = append(messageIndex, indexBuf[:bytesLen]...) + indexesCount++ break } i++ } } - return indexes, nil + indexCountBytes := make([]byte, unsafe.Sizeof(indexesCount)) + indexCountBytesSize := binary.PutVarint(indexCountBytes, indexesCount) + + return indexCountBytes[:indexCountBytesSize], messageIndex, nil } diff --git a/server/kafka/producer_test.go b/server/kafka/producer_test.go index 7303dd0..8fe8308 100644 --- a/server/kafka/producer_test.go +++ b/server/kafka/producer_test.go @@ -58,16 +58,22 @@ func TestSerializePayloadJson(t *testing.T) { func TestSerializePayloadProtobuf(t *testing.T) { server := newMockSchemaServer(t) defer server.close() + srClient := srclient.CreateSchemaRegistryClient(server.getServerURL()) producer := &saramaProducer{ schemaRegistryOn: true, - schemaRegistryClient: srclient.CreateSchemaRegistryClient(server.getServerURL()), + schemaRegistryClient: srClient, subjectName: protobufSubjectName, schemaVersion: protobufSchemaVersion, schemaType: srclient.Protobuf, pbSerializer: newSerializer(), } + schema, err := srClient.GetSchema(protobufSchemaID) + assert.Nil(t, err) + + message, err := producer.serializePayload([]byte(protobufMessage)) + assert.Nil(t, err) - _, err := producer.serializePayload([]byte(protobufMessage)) + _, err = newDeserializer().Deserialize(schema, message[5:]) assert.Nil(t, err) }