Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka-bom/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ dependencies {
because("[https://nvd.nist.gov/vuln/detail/CVE-2023-34455] in 'org.apache.kafka:kafka-clients:*'")
because("[https://nvd.nist.gov/vuln/detail/CVE-2023-43642]")
}
api("com.google.protobuf:protobuf-java-util:3.21.7") {
api("com.google.protobuf:protobuf-java-util:$protobufVersion") {
because("https://nvd.nist.gov/vuln/detail/CVE-2022-3171")
}
api("com.squareup.okio:okio:3.4.0") {
Expand Down
1 change: 1 addition & 0 deletions kafka-streams-serdes/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies {

api("org.apache.kafka:kafka-clients")
api("org.apache.avro:avro")
api("com.google.protobuf:protobuf-java-util")

testImplementation("org.junit.jupiter:junit-jupiter:5.8.2")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package org.hypertrace.core.kafkastreams.framework.serdes.proto;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.Parser;
import org.apache.kafka.common.serialization.Deserializer;

/**
 * Custom Proto Deserializer for Kafka.
 *
 * <p>This class provides a deserialization mechanism for Kafka messages using Protocol Buffers
 * without schema validation. It extends the Kafka Deserializer interface and allows for direct
 * deserialization of byte arrays into Proto message objects by utilizing the provided Parser for
 * the specific Proto message type.
 *
 * <p>Motivation: In setups where both producers and consumers use the same Proto schemas, the need
 * for schema validation becomes redundant. The built-in {@code kafkaProtoSerdes} from Confluent
 * performs schema validation via the schema registry service, which introduces overhead. This
 * custom deserializer eliminates that overhead, simplifying the processing flow by bypassing schema
 * validation.
 *
 * <p>Usage: To use this class, create a subclass specifying the Proto message type, pass the
 * corresponding Parser to the superclass constructor, and configure Kafka to use the custom
 * deserializer.
 *
 * <p>Example:
 *
 * <pre>{@code
 * public class MyProtoMessageDeserializer extends ProtoDeserializer<MyProtoMessage> {
 *   public MyProtoMessageDeserializer() {
 *     super(MyProtoMessage.parser());
 *   }
 * }
 * }</pre>
 *
 * Then, configure Kafka to use this deserializer:
 *
 * <pre>{@code
 * key.deserializer=com.example.MyProtoMessageDeserializer
 * }</pre>
 *
 * @param <T> The Proto message type to be deserialized.
 */
public class ProtoDeserializer<T extends Message> implements Deserializer<T> {

  private final Parser<T> parser;

  /**
   * Creates a deserializer backed by the given proto parser.
   *
   * @param parser the parser for the concrete proto message type; must not be null
   */
  public ProtoDeserializer(Parser<T> parser) {
    this.parser = parser;
  }

  /**
   * Deserializes the given bytes into a proto message of type {@code T}.
   *
   * @param topic the topic the record was received from (unused, kept for the interface contract)
   * @param bytes the serialized proto payload; may be null for tombstone records
   * @return the parsed message, or null when {@code bytes} is null (per the Kafka Deserializer
   *     contract for null payloads)
   * @throws RuntimeException wrapping {@link InvalidProtocolBufferException} when the payload is
   *     not a valid serialization of {@code T}
   */
  @Override
  public T deserialize(String topic, byte[] bytes) {
    // Kafka delivers null payloads for tombstone records; the Deserializer contract
    // expects null back rather than an NPE from the parser.
    if (bytes == null) {
      return null;
    }
    try {
      return parser.parseFrom(bytes);
    } catch (InvalidProtocolBufferException e) {
      // Preserve the cause and add context so failures are attributable to a topic.
      throw new RuntimeException("Failed to deserialize proto message from topic: " + topic, e);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.hypertrace.core.kafkastreams.framework.serdes.proto;

import com.google.protobuf.Message;
import org.apache.kafka.common.serialization.Serializer;

/**
 * Custom Proto Serializer for Kafka.
 *
 * <p>Serializes a proto {@link Message} to its wire-format byte array directly, without the
 * schema-registry validation performed by Confluent's built-in proto serdes. Counterpart to
 * {@code ProtoDeserializer} in this package.
 *
 * @param <T> the proto message type to be serialized
 */
public class ProtoSerializer<T extends Message> implements Serializer<T> {

  /**
   * Serializes the given proto message.
   *
   * @param topic the destination topic (unused, kept for the interface contract)
   * @param data the message to serialize; may be null for tombstone records
   * @return the proto wire-format bytes, or null when {@code data} is null (per the Kafka
   *     Serializer contract for null values)
   */
  @Override
  public byte[] serialize(String topic, T data) {
    // The Kafka Serializer contract requires returning null for null data
    // (e.g. tombstone records) instead of throwing an NPE.
    if (data == null) {
      return null;
    }
    return data.toByteArray();
  }
}