File tree Expand file tree Collapse file tree 3 files changed +37
-0
lines changed
src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto Expand file tree Collapse file tree 3 files changed +37
-0
lines changed Original file line number Diff line number Diff line change 11plugins {
22 `java- library`
33 jacoco
4+ id(" com.google.protobuf" ) version " 0.9.4"
45 id(" org.hypertrace.avro-plugin" )
56 id(" org.hypertrace.publish-plugin" )
67 id(" org.hypertrace.jacoco-report-plugin" )
@@ -15,6 +16,7 @@ dependencies {
1516
1617 api(" org.apache.kafka:kafka-clients" )
1718 api(" org.apache.avro:avro" )
19+ implementation(" com.google.protobuf:protobuf-java:3.23.0" ) // Adjust the version as needed
1820
1921 testImplementation(" org.junit.jupiter:junit-jupiter:5.8.2" )
2022}
Original file line number Diff line number Diff line change 1+ package org .hypertrace .core .kafkastreams .framework .serdes .proto ;
2+
3+ import com .google .protobuf .InvalidProtocolBufferException ;
4+ import com .google .protobuf .Message ;
5+ import com .google .protobuf .Parser ;
6+ import org .apache .kafka .common .serialization .Deserializer ;
7+
8+ public class ProtoDeserializer <T extends Message > implements Deserializer <T > {
9+
10+ private final Parser <T > parser ;
11+
12+ public ProtoDeserializer (Parser <T > parser ) {
13+ this .parser = parser ;
14+ }
15+
16+ @ Override
17+ public T deserialize (String s , byte [] bytes ) {
18+ try {
19+ return parser .parseFrom (bytes );
20+ } catch (InvalidProtocolBufferException e ) {
21+ throw new RuntimeException (e );
22+ }
23+ }
24+ }
Original file line number Diff line number Diff line change 1+ package org .hypertrace .core .kafkastreams .framework .serdes .proto ;
2+
3+ import com .google .protobuf .Message ;
4+ import org .apache .kafka .common .serialization .Serializer ;
5+
6+ public class ProtoSerializer <T extends Message > implements Serializer <T > {
7+ @ Override
8+ public byte [] serialize (String topic , T data ) {
9+ return data .toByteArray ();
10+ }
11+ }
You can’t perform that action at this time.
0 commit comments