This repository contains examples of custom deserializers usable in Conduktor with plugin mechanism.
You can download the latest jar containing these deserializers my_custom_deserializers_2.13-2.3.0.jar
To learn how to use the "custom deserializer" feature see Conduktor documentation
Console requires a single jar containing all dependencies (also known as a "fat" jar) to be able to use a custom deserializer.
To build such a jar for the examples in this repository, you simply have to run:
sbt assembly
If you modify the examples to include new dependencies, please make sure that the assembly merge strategy is properly configured to include all the necessary files for your dependencies to work.
For example, if you make add a Custom deserializer using protobuf to this repository, you should modify the existing assembly merge strategy as following:
assembly / assemblyMergeStrategy := {
case PathList("META-INF", "versions", "9", "module-info.class") => MergeStrategy.first
case PathList("com", "google", "type", xs @ _*) => MergeStrategy.first // New line to properly merge Google libraries
case x => (assembly / assemblyMergeStrategy).value(x)
}
io.example.conduktor.custom.deserializers.MyCustomDeserializer
This deserializer transforms the data (bytes) it receives from Kafka into a String
(text),
then sees if it matches then following format:
-- this is the serialized data
-
If the message received from Kafka effectively starts with a
--<space>
characters sequence then followed by some text, it creates a new instance of a data structure namedMyMessage
, that contains only one field namedvalue
and is of typeString
, as following:MyMessage(value = "this is the serialized data")
In Conduktor, this data structure will be interpreted and displayed as JSON:
{ "value": "this is the serialized data" }
-
If the message received from Kafka doesn't match the expected format, then the deserializer fails with an error message:
Invalid format received for message: <the received message>
This simple example is here to demonstrate 2 things:
- What's happening when a custom deserializer fails to deserialize a message.
- Give a simplified example of "deserialization" (the message has to respect of certain format so that the deserializer can extract the data)
To see simple example around constants, jump here
io.example.conduktor.custom.deserializers.MyCustomProtobufDeserializer
This example allow to deserialize a protobuf payload corresponding to this schema :
message Person {
required string name = 1;
required int32 id = 2;
optional string email = 3;
enum PhoneType {
MOBILE = 0;
HOME = 1;
WORK = 2;
}
message PhoneNumber {
required string number = 1;
optional PhoneType type = 2 [default = HOME];
}
repeated PhoneNumber phones = 4;
}
io.example.conduktor.custom.deserializers.MyConfigurableDeserializer
This example allow to show deserializer configuration to change it's behavior.
To configure the behabor, the Deserializer check for a output
property in it's configuration.
With configuration :
output=passthrough
The data on record are not de coded and returned as-is in bytes array form.
With configuration :
output=config
The configuration is returned on each record deserialization. For example with configuration
output=config
other.property=some value
Will always return JSON like
{
"output": "config",
"other.property": "some value"
}
With configuration output defined to something else other than config
or passthrough
and not empty like:
output=some constant output
The Deserializer will always return String value like
"some constant output"