Coherence integration with Apache Kafka
Supported on Coherence CE 21.06 and later.
The following components are provided:

- Kafka Entry Store - a CacheStore-type extension that copies data from a Coherence map to a Kafka topic upon modification. This is contained in the coherence-kafka-core project.
- Kafka Sink Connector - an Apache Kafka Connect plugin of type "sink", which copies data posted to a Kafka topic to a Coherence map. This is contained in the coherence-kafka-sink project.
Serializers and deserializers can be configured to handle data flowing to and from Kafka; alternatively, an optimized pass-through mode handles entries that are already in Coherence binary form.
When a cachestore-scheme is configured as follows, Coherence sends entries to the specified Kafka broker:
<?xml version="1.0"?>
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
              xmlns:kafka="class://com.oracle.coherence.kafka.KafkaNamespaceHandler"
              xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd">
  <caching-scheme-mapping>
    <cache-mapping>
      <cache-name>foo</cache-name>
      <scheme-name>partitioned-rwbm-kafka</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>

  <caching-schemes>
    <distributed-scheme>
      <scheme-name>partitioned-rwbm-kafka</scheme-name>
      <backing-map-scheme>
        <partitioned>true</partitioned>
        <read-write-backing-map-scheme>
          <internal-cache-scheme>
            <local-scheme>
            </local-scheme>
          </internal-cache-scheme>
          <cachestore-scheme>
            <class-scheme>
              <kafka:producer>
                <!-- use default class -->
                <kafka:topic-name>{cache-name}</kafka:topic-name>
                <!-- default for passthrough is false -->
                <kafka:bootstrap-servers>localhost:9092</kafka:bootstrap-servers>
                <kafka:key-serializer>org.apache.kafka.common.serialization.StringSerializer</kafka:key-serializer>
                <kafka:value-serializer>org.apache.kafka.common.serialization.StringSerializer</kafka:value-serializer>
                <kafka:max-block-ms>5000</kafka:max-block-ms>
              </kafka:producer>
            </class-scheme>
          </cachestore-scheme>
        </read-write-backing-map-scheme>
      </backing-map-scheme>
      <autostart>true</autostart>
    </distributed-scheme>
  </caching-schemes>
</cache-config>
The kafka:producer element accepts properties intended for a Kafka producer and passes them through, with each dash converted to a dot (for example, bootstrap-servers becomes bootstrap.servers).
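The dash-to-dot conversion described above can be sketched in plain Java. The class and method names below are hypothetical, chosen only to illustrate the mapping; the actual conversion is performed internally by the Kafka namespace handler.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ProducerPropertyNames {

    // Convert a dash-separated XML element name into the dotted
    // property name expected by a Kafka producer,
    // e.g. "bootstrap-servers" -> "bootstrap.servers".
    public static String toProducerKey(String elementName) {
        return elementName.replace('-', '.');
    }

    // Apply the conversion to a whole set of configured properties.
    public static Map<String, String> toProducerConfig(Map<String, String> xmlProps) {
        Map<String, String> config = new LinkedHashMap<>();
        xmlProps.forEach((k, v) -> config.put(toProducerKey(k), v));
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> xml = new LinkedHashMap<>();
        xml.put("bootstrap-servers", "localhost:9092");
        xml.put("max-block-ms", "5000");
        // Prints the dotted producer configuration
        System.out.println(toProducerConfig(xml));
    }
}
```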
The Kafka Entry Store maps CacheStore operations to Kafka as follows:

- load() and loadAll(): not supported; see Kafka Sink Connector for this.
- store(): corresponds to a put() operation on a NamedMap, and performs a producer.send() on the specified Kafka topic.
- storeAll(): corresponds to a putAll() operation on a NamedMap, where each element in the map is sent to the specified Kafka topic.
- erase(): corresponds to a remove() operation on a NamedMap; when a topic is configured with a compact policy, all records with the same key will be deleted (aka tombstone).
- eraseAll(): corresponds to a removeAll() operation on a NamedMap, and is equivalent to calling erase() on each element of the passed map.
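The store()/erase() semantics above can be sketched with an in-memory list standing in for the Kafka topic. This is an illustrative model only, with hypothetical names; the real entry store in coherence-kafka-core uses an actual Kafka producer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EntryStoreSketch {

    // A (key, value) pair; a null value represents a tombstone,
    // which on a compacted topic deletes records with the same key.
    record Record(String key, String value) {}

    // Stand-in for the Kafka topic the producer would send to.
    final List<Record> topic = new ArrayList<>();

    // store(): triggered by put(); sends the entry to the topic.
    void store(String key, String value) {
        topic.add(new Record(key, value));
    }

    // storeAll(): triggered by putAll(); sends each entry.
    void storeAll(Map<String, String> entries) {
        entries.forEach(this::store);
    }

    // erase(): triggered by remove(); sends a tombstone (null value).
    void erase(String key) {
        topic.add(new Record(key, null));
    }

    public static void main(String[] args) {
        EntryStoreSketch s = new EntryStoreSketch();
        s.store("order-1", "pending");
        Map<String, String> batch = new LinkedHashMap<>();
        batch.put("order-2", "shipped");
        s.storeAll(batch);
        s.erase("order-1"); // tombstone for order-1
        System.out.println(s.topic);
    }
}
```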
For more details see the coherence-kafka-core project.
The sink connector is built as a zip file intended to be deployed in a Connect process's environment. It provides:
- Coherence Entry creation/update: records published on a topic are forwarded to a Coherence NamedMap and corresponding entries are created
- Coherence Entry deletion: when Kafka records have a null value, the corresponding entry in Coherence is removed if it existed
Note: Kafka records with a null key cannot be represented in Coherence and are discarded with a logged message such as:
connect> [2021-10-20 19:21:35,680] ERROR The key is null for record: SinkRecord{kafkaOffset=1, timestampType=CreateTime} ConnectRecord{topic='MyTopic', kafkaPartition=0, key=null, keySchema=Schema{STRING}, value=User(NoKey, 20), valueSchema=Schema{STRUCT}, timestamp=1634757687699, headers=ConnectHeaders(headers=)} (com.oracle.coherence.kafka.connect.sink.CoherenceSinkTask)
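The three rules above (non-null value creates or updates an entry, null value removes it, null key is discarded) can be sketched in plain Java against an ordinary map standing in for the Coherence NamedMap. The names here are hypothetical, not the connector's actual classes.

```java
import java.util.HashMap;
import java.util.Map;

public class SinkApplySketch {

    // Apply one Kafka record to the target map, following the
    // sink connector's rules described above.
    static void apply(Map<String, String> cache, String key, String value) {
        if (key == null) {
            // A null key cannot be represented in Coherence;
            // the record is discarded with a logged error.
            System.err.println("The key is null for record; discarding");
            return;
        }
        if (value == null) {
            cache.remove(key);      // tombstone: delete the entry if it existed
        } else {
            cache.put(key, value);  // create or update the entry
        }
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        apply(cache, "id-1", "Alice");  // creates the entry
        apply(cache, "id-1", "Alicia"); // updates it
        apply(cache, null, "NoKey");    // discarded
        apply(cache, "id-1", null);     // removes the entry
        System.out.println(cache);
    }
}
```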
Configuration is done using a curl command such as the following (Confluent Kafka):
curl -X POST -H "Content-Type: application/json" \
http://localhost:8083/connectors \
-d '{"name":"coh-sink",
"config":
{
"connector.class":"com.oracle.coherence.kafka.connect.CoherenceSinkConnector",
"topics":"MyTopic",
"coherence.cache.mappings":"MyTopic->MyCache"
}
}'
or a properties file such as the following for Kafka OSS:
name=coh-sink
topics=MyTopic
tasks.max=1
connector.class=com.oracle.coherence.kafka.connect.CoherenceSinkConnector
coherence.cache.mappings=MyTopic->MyCache
key.converter=com.oracle.coherence.kafka.connect.util.CustomConverter
value.converter=com.oracle.coherence.kafka.connect.util.CustomConverter
value.converter.serializer=org.apache.kafka.common.serialization.StringSerializer
value.converter.deserializer=org.apache.kafka.common.serialization.StringDeserializer
key.converter.serializer=org.apache.kafka.common.serialization.StringSerializer
key.converter.deserializer=org.apache.kafka.common.serialization.StringDeserializer