kafka client (consumer + producer) polite out of the box
make it about them, not about you
- Simon Sinek
- promise based api
- core builds
module (checkout for options & tweaking) - uses ConsumerGroup(s) means your kafka needs to be > 0.9.x ( - 0.10.2+)
- provides an incoming message flow control for consumers
- provides a drain once for consumers
- provides an easy api for producers
- Documentation is still wip; checkout
npm install --save sinek
//requires a localhost kafka broker + zookeeper @ localhost:2181
npm test
const {Kafka, Drainer, Publisher, PartitionDrainer} = require("sinek");
const kafkaClient = new Kafka(ZK_CON_STR, LOGGER);
kafkaClient.becomeProducer([TEST_TOPIC], CLIENT_NAME, OPTIONS);
kafkaClient.on("ready", () => {
producer = new Publisher(kafkaClient, PARTITION_COUNT); //partition count should be the default count on your brokers partiitons e.g. 30
producer.send(topic, messages, partitionKey, partition, compressionType)
producer.batch(topic, [])
producer.appendBuffer(topic, identifier, object, compressionType)
//easy api that uses a KeyedPartitioner Type and identifies the
//target partition for the object's identifier by itself
//it also brings your payload (object) in perfect shape for
//a nicely consumeable topic
//call producer.flushBuffer(topic) to batch send the payloads
producer.bufferPublishMessage(topic, identifier, object, version, compressionType)
producer.bufferUnpublishMessage(topic, identifier, object, version, compressionType)
producer.bufferUpdatehMessage(topic, identifier, object, version, compressionType)
kafkaClient.on("error", err => console.log("producer error: " + err));
const kafkaClient = new Kafka(ZK_CON_STR, LOGGER);
kafkaClient.becomeConsumer([TEST_TOPIC], GROUP_ID, OPTIONS);
kafkaClient.on("ready", () => {
consumer = new Drainer(kafkaClient, 1); //1 = thread/worker/parallel count
consumer.drain((message, done) => {
consumer.drainOnce((message, done) => {
console.log("drain done: " + r);
}).catch(e => {
console.log("drain timeout: " + e);
consumer.resetConsumer([TEST_TOPIC]).then(_ => {});
kafkaClient.on("error", err => console.log("consumer error: " + err));
const kafkaClient = new Kafka(ZK_CON_STR, LOGGER);
kafkaClient.becomeConsumer([TEST_TOPIC], GROUP_ID, OPTIONS);
kafkaClient.on("ready", () => {
consumer = new PartitionDrainer(kafkaClient, 1); //1 = thread/worker/parallel count per partition
//drain requires a topic-name and returns a promise
consumer.drain(TEST_TOPIC, (message, done) => {
}).then(_ => ..).catch(e => console.log(e));
//drainOnce requires a topic-name
consumer.drainOnce(TEST_TOPIC, (message, done) => {
console.log("drain done: " + r);
}).catch(e => {
console.log("drain timeout: " + e);
consumer.resetConsumer([TEST_TOPIC]).then(_ => {});
kafkaClient.on("error", err => console.log("consumer error: " + err));
- interesting options for tweaking consumers
const OPTIONS = {
sessionTimeout: 12500,
protocol: ["roundrobin"],
fromOffset: "latest", //earliest
fetchMaxBytes: 1024 * 100,
fetchMinBytes: 1,
fetchMaxWaitMs: 100,
autoCommit: true,
autoCommitIntervalMs: 5000
- remove and create topic api will require a special broker configuration or these will just result in nothing at all
using the
functions on Drainer, Publisher or PartitionDrainer you can get some valueable insights into whats currently going on in your client -
when using "Drainer" to consume and write upserts into a database that require ACID functionality and a build-up of models/message-payloads you must set the AsyncLimit of new Drainer(.., 1) to "1" or you will have trouble with data integrity
if your data is spread entity wise above partitions you can use the "PartitionDrainer" to drain multiple partitions at the same time
the "Publisher" offers a simple API to create such (keyed) partitioned topics
it is probably a good idea to spawn a Consumer per Topic