The Kafka River plugin allows you to read messages from Kafka and index bulked messages into elasticsearch. The bulk size (the number of messages to be indexed in one request) and concurrent request number is configurable. The Kafka River also supports consuming messages from multiple Kafka brokers and multiple partitions.
The plugin uses the latest Kafka and Elasticsearch version.
- Kafka version 0.8.2.1
- Elasticsearch version 1.6.0
The plugin is periodically updated, if there are newer versions of any dependencies. It is available in the ElasticSearch's official website.
-
Install Kafka if you are working on local environment (See Apache Kafka Quick Start Guide for instructions on how to Download and Build.)
-
Install the plugin
cd $ELASTICSEARCH_HOME
.bin/plugin -install <plugin-name> -url https://github.com/mariamhakobyan/elasticsearch-river-kafka/releases/download/v1.2.1/elasticsearch-river-kafka-1.2.1-plugin.zip
Example:
cd $ELASTICSEARCH_HOME
.bin/plugin -install kafka-river -url https://github.com/mariamhakobyan/elasticsearch-river-kafka/releases/download/v1.2.1/elasticsearch-river-kafka-1.2.1-plugin.zip
If it doesn't work, clone git repository and build plugin manually.
- Build the plugin - it will create a zip file here: $PROJECT-PATH/target/elasticsearch-river-kafka-1.2.1-SNAPSHOT-plugin.zip
- Install the plugin from target into elasticsearch
cd $ELASTICSEARCH_HOME
.bin/plugin --install <plugin-name> --url file:////$PLUGIN-PATH/elasticsearch-river-kafka-1.2.1-SNAPSHOT-plugin.zip
Update installed plugin
cd $ELASTICSEARCH_HOME
./bin/plugin -remove <plugin-name>
./bin/plugin -url file:/$PLUGIN_PATH -install <plugin-name>
To deploy Kafka river into Elasticsearch as a plugin, execute:
curl -XPUT 'localhost:9200/_river/<river-name>/_meta' -d '
{
"type" : "kafka",
"kafka" : {
"zookeeper.connect" : <zookeeper.connect>,
"zookeeper.connection.timeout.ms" : <zookeeper.connection.timeout.ms>,
"topic" : <topic.name>,
"message.type" : <message.type>
},
"index" : {
"index" : <index.name>,
"type" : <mapping.type.name>,
"bulk.size" : <bulk.size>,
"concurrent.requests" : <concurrent.requests>,
"action.type" : <action.type>,
"flush.interval" : <flush.interval>
},
"statsd" : {
"host" : <statsd.host>,
"prefix" : <statsd.prefix>,
"port" : <statsd.port>,
"log.interval" : <statsd.log.interval>
}
}'
- NOTE: Type "kafka" is required and must not be changed. It corresponds the type, given in the source code, by which elasticsearch is able to associate created river with the installed plugin.
Example:
curl -XPUT 'localhost:9200/_river/kafka-river/_meta' -d '
{
"type" : "kafka",
"kafka" : {
"zookeeper.connect" : "localhost",
"zookeeper.connection.timeout.ms" : 10000,
"topic" : "river",
"message.type" : "json"
},
"index" : {
"index" : "kafka-index",
"type" : "status",
"bulk.size" : 100,
"concurrent.requests" : 1,
"action.type" : "index",
"flush.interval" : "12h"
},
"statsd": {
"host" : "localhost",
"prefix" : "kafka.river",
"port" : 8125,
"log.interval" : 10
}
}'
The detailed description of each parameter:
-
river-name
(required) - The river name to be created in elasticsearch. -
zookeeper.connect
(optional) - Zookeeper server host. Default is:localhost
-
zookeeper.connection.timeout.ms
(optional) - Zookeeper server connection timeout in milliseconds. Default is:10000
-
topic
(optional) - The name of the topic where you want to send Kafka message. Default is:elasticsearch-river-kafka
-
message.type
(optional) - The kafka message type, which then will be inserted into ES keeping the same type. Default is:json
. The following options are available:-
json
: Inserts json message into ES separating each json property into ES document property. example:"_source": { "name": "John", "age": 28 }
-
string
: Inserts string message into ES as a documet, where the key name isvalue
, and the value is the received message. example:
"_source": { "value": "received text message" }
-
-
index
(optional) - The name of elasticsearch index. Default is:kafka-index
-
type
(optional) - The mapping type of elasticsearch index. Default is:status
-
bulk.size
(optional) - The number of messages to be bulk indexed into elasticsearch. Default is:100
-
concurrent.requests
(optional) - The number of concurrent requests of indexing that will be allowed. A value of 0 means that only a single request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests. Default is:1
-
action.type
(optional) - The action type against how the messages should be processed. Default is:index
. The following options are available:index
: Creates documents in ES with thevalue
field set to the received message.delete
: Deletes documents from ES based onid
field set in the received message.raw.execute
: Execute incoming messages as a raw query.
-
flush.interval
(optional) - The number of seconds/minutes/hours after which any remaining messages get flushed to elasticsearch, even if the number of messages has not reached. The time values are represented like: "12h", "3m", "5s". Default is:12h
(12 hours)
host
(optional) - The statsd server host name. Default is:localhost
port
(optional) - The statsd server port number. Default is: 8125prefix
(optional) - Prefix to be added to all statsd metric keys. Default is:kafka.river
log.interval
(optional) - The interval, in seconds, in which to report metrics to the statsd server. Default is:10
(10 seconds)
To delete the existing river, execute:
curl -XDELETE 'localhost:9200/_river/<river-name>/'
Example:
curl -XDELETE 'localhost:9200/_river/kafka-river/'
To see the indexed data:
curl -XGET 'localhost:9200/kafka-index/_search?pretty=1'
To delete the index:
curl -XDELETE 'localhost:9200/kafka-index'
Currently Consumer Group Java API (high level api) is used to create the Kafka Consumer, which keeps track of offset automatically. This enables the River to read kafka messages from multiple brokers and multiple partitions.
Copyright (C) 2014 Mariam Hakobyan. See LICENSE
- Fork the repository on Github
- Create a named feature branch
- Develop your changes in a branch
- Write tests for your change (if applicable)
- Ensure all the tests are passing
- Submit a Pull Request using Github