See the library in action here!
The section below gives an overview on how to use the Kafka CDI library!
In a Maven managed project simply the following to your pom.xml:
...
<dependency>
<groupId>org.aerogear.kafka</groupId>
<artifactId>kafka-cdi-extension</artifactId>
<version>0.1.2</version>
</dependency>
...The @Producer annotation is used to configure and inject an instance of the SimpleKafkaProducer class, which is a simple extension of the original KafkaProducer class:
...
public class MyPublisherService {
private Logger logger = LoggerFactory.getLogger(MyPublisherService.class);
@Producer
SimpleKafkaProducer<Integer, String> producer;
/**
* A simple service method, that sends payload over the wire
*/
public void hello() {
producer.send("myTopic", "My Message");
}
}The @Consumer annotation is used to configure and declare an annotated method as a callback for the internal DelegationKafkaConsumer, which internally uses the vanilla KafkaConsumer:
public class MyListenerService {
private Logger logger = LoggerFactory.getLogger(MyListenerService.class);
/**
* Simple listener that receives messages from the Kafka broker
*/
@Consumer(topics = "myTopic", groupId = "myGroupID")
public void receiver(final String message) {
logger.info("That's what I got: " + message);
}
}Receiving the key and the value is also possible:
public class MyListenerService {
private Logger logger = LoggerFactory.getLogger(MyListenerService.class);
/**
* Simple listener that receives messages from the Kafka broker
*/
@Consumer(topics = "myTopic", groupId = "myGroupID")
public void receiver(final String key, final String value) {
logger.info("That's what I got: (key: " + key + " , value:" + value + ")");
}
}With the @KafkaStream annotation the libary supports the Kafka Streams API:
@KafkaStream(input = "push_messages_metrics", output = "successMessagesPerJob2")
public KTable<String, Long> successMessagesPerJobTransformer(final KStream<String, String> source) {
final KTable<String, Long> successCountsPerJob = source.filter((key, value) -> value.equals("Success"))
.groupByKey()
.count("successMessagesPerJob");
return successCountsPerJob;
}The method accepts a KStream instance from the input topic, inside the method body some (simple) stream processing can be done, and as return value we support either KStream or KTable. The entire setup is handled by the library itself.
A minimal of configuration is currently needed. For that there is a @KafkaConfig annotation. The first occurrence is used:
@KafkaConfig(bootstrapServers = "#{KAFKA_SERVICE_HOST}:#{KAFKA_SERVICE_PORT}")
public class MyService {
...
}Apache Kafka uses a binary message format, and comes with a handful of handy Serializers and Deserializers, available through the Serdes class. The Kafka CDI extension adds a Serde for the JsonObject:
To send serialize a JsonObject, simply specify the type, like:
...
@Producer
SimpleKafkaProducer<Integer, JsonObject> producer;
...
producer.send("myTopic", myJsonObj);For deserialization the argument on the annotation @Consumer method is used to setup the actual Deserializer
@Consumer(topic = "myTopic", groupId = "myGroupID", keyType = Integer.class)
public void receiveJsonObject(JsonObject message) {
logger.info("That's what I got: " + message);
}To setup Apache Kafka there are different ways to get started. This section quickly discusses pure Docker and Openshift.
Starting a Zookeeper cluster:
docker run -d --name zookeeper jplock/zookeeper:3.4.6Next, we need to start Kafka and link the Zookeeper Linux container to it:
docker run -d --name kafka --link zookeeper:zookeeper ches/kafkaNow, that the broker is running, we need to figure out the IP address of it:
docker inspect --format '{{ .NetworkSettings.IPAddress }}' kafkaWe use this IP address when inside our @KafkaConfig annotation that our Producers and Consumers can speak to Apache Kafka.
For Apache Kafka on Openshift please check this repository: