
Commit a9362e1

(chore): migrate to using confluent's kafka library
1 parent 0f718c9 commit a9362e1

File tree

6 files changed, +16 -15 lines changed


README.md

Lines changed: 1 addition & 1 deletion
@@ -1,2 +1,2 @@
 # kafka-wrapper
-A simple kafka wrapper for `node-rdkafka` client.
+A simple kafka wrapper for `@confluentinc/kafka-javascript` client.

package.json

Lines changed: 1 addition & 0 deletions
@@ -18,6 +18,7 @@
   ],
   "license": "ISC",
   "dependencies": {
+    "@confluentinc/kafka-javascript": "^1.3.1",
     "node-rdkafka": "^2.12.0"
   }
 }

src/admin.js

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 const Client = require('./client');
-const Kafka = require('node-rdkafka');
+const Kafka = require('@confluentinc/kafka-javascript');
 
 class KafkaAdmin extends Client {
 
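
Since `@confluentinc/kafka-javascript` ships a node-rdkafka-compatible API, the admin client should only need the `require` swap shown above. Below is a minimal sketch of how the underlying admin client could be created against the new package; it assumes the Confluent library preserves node-rdkafka's `AdminClient.create()` surface, and the client id, broker address, and topic spec are placeholders rather than values from this repo.

```js
const Kafka = require('@confluentinc/kafka-javascript');

// Assumption: the node-rdkafka-style AdminClient API is available unchanged.
const admin = Kafka.AdminClient.create({
    'client.id': 'kafka-wrapper-admin',        // placeholder client id
    'metadata.broker.list': 'localhost:9092',  // placeholder broker
});

// The NewTopic spec and callback mirror the node-rdkafka signature.
admin.createTopic(
    { topic: 'demo-topic', num_partitions: 1, replication_factor: 1 },
    (err) => {
        if (err) console.error('topic creation failed:', err);
        admin.disconnect();
    }
);
```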
src/consumer.js

Lines changed: 5 additions & 5 deletions
@@ -1,4 +1,4 @@
-const Kafka = require('node-rdkafka');
+const Kafka = require('@confluentinc/kafka-javascript');
 const Client = require('./client');
 
 class KafkaConsumer extends Client {
@@ -7,8 +7,8 @@ class KafkaConsumer extends Client {
      * Initializes a KafkaConsumer.
      * @param {String} clientId: id to identify a client consuming the message.
      * @param {String} groupId: consumer group id, the consumer belongs to.
-     * @param {import('node-rdkafka').ConsumerGlobalConfig} config: configs for consumer.
-     * @param {import('node-rdkafka').ConsumerTopicConfig} topicConfig: topic configs
+     * @param {import('@confluentinc/kafka-javascript').ConsumerGlobalConfig} config: configs for consumer.
+     * @param {import('@confluentinc/kafka-javascript').ConsumerTopicConfig} topicConfig: topic configs
      * @param {EventEmitter} emitter: to emit log events
      */
     constructor(clientId, groupId, config, topicConfig, emitter) {
@@ -74,7 +74,7 @@ class KafkaConsumer extends Client {
 
     /**
      * Subscribe to topics.
-     * @param {import('node-rdkafka').SubscribeTopicList} topics: array of topic names.
+     * @param {import('@confluentinc/kafka-javascript').SubscribeTopicList} topics: array of topic names.
      * @returns {KafkaConsumer}
      */
     subscribe(topics) {
@@ -179,7 +179,7 @@ class KafkaConsumer extends Client {
 
     /**
      * Parses message before passing it to consumer callback.
-     * @param {Object} msg - expects it to be in node-rdkafka msg format.
+     * @param {Object} msg - expects it to be in @confluentinc/kafka-javascript msg format.
      * @returns
     */
     _parseMessage(msg) {
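
For context on how the migrated consumer is used, here is a sketch based on the constructor and `subscribe()` signatures visible in the JSDoc above. The require path, broker address, topic name, config values, and the `'log'` event name are illustrative assumptions, not taken from this commit.

```js
const { EventEmitter } = require('events');
const KafkaConsumer = require('./src/consumer'); // path assumed from the file tree

const emitter = new EventEmitter();
emitter.on('log', (msg) => console.log(msg));    // event name assumed

const consumer = new KafkaConsumer(
    'example-client',                              // clientId
    'example-group',                               // groupId (consumer group)
    { 'metadata.broker.list': 'localhost:9092' },  // ConsumerGlobalConfig (librdkafka keys)
    { 'auto.offset.reset': 'earliest' },           // ConsumerTopicConfig
    emitter
);

// subscribe() takes an array of topic names and returns the consumer instance.
consumer.subscribe(['demo-topic']);
```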

src/producer.js

Lines changed: 7 additions & 7 deletions
@@ -1,13 +1,13 @@
-const Kafka = require('node-rdkafka');
+const Kafka = require('@confluentinc/kafka-javascript');
 const Client = require('./client');
 
 class KafkaProducer extends Client {
 
     /**
      * Initializes a KafkaProducer.
      * @param {String} clientId: id to identify a client producing the message.
-     * @param {import('node-rdkafka').ProducerGlobalConfig} config: configs for producer.
-     * @param {import('node-rdkafka').ProducerTopicConfig} topicConfig: topic configs.
+     * @param {import('@confluentinc/kafka-javascript').ProducerGlobalConfig} config: configs for producer.
+     * @param {import('@confluentinc/kafka-javascript').ProducerTopicConfig} topicConfig: topic configs.
      * @param {EventEmitter} emitter: to emit log messages
      */
     constructor(clientId, config, topicConfig, emitter) {
@@ -85,10 +85,10 @@ class KafkaProducer extends Client {
     /**
      * Produce a message to a topic-partition.
      * @param {String} topic: name of topic
-     * @param {import('node-rdkafka').NumberNullUndefined} partition: partition number to produce to.
+     * @param {import('@confluentinc/kafka-javascript').NumberNullUndefined} partition: partition number to produce to.
      * @param {any} message: message to be produced.
-     * @param {import('node-rdkafka').MessageKey} key: key associated with the message.
-     * @param {import('node-rdkafka').NumberNullUndefined} timestamp: timestamp to send with the message.
+     * @param {import('@confluentinc/kafka-javascript').MessageKey} key: key associated with the message.
+     * @param {import('@confluentinc/kafka-javascript').NumberNullUndefined} timestamp: timestamp to send with the message.
      * @returns {import('../types').BooleanOrNumber}: returns boolean or librdkafka error code.
      */
     produce({ topic, message, partition = null, key = null, timestamp = null }) {
@@ -105,7 +105,7 @@ class KafkaProducer extends Client {
     /**
      * Flush everything on the internal librdkafka buffer.
      * Good to perform before disconnect.
-     * @param {import('node-rdkafka').NumberNullUndefined}} timeout
+     * @param {import('@confluentinc/kafka-javascript').NumberNullUndefined}} timeout
      * @param {import('../types').ErrorHandlingFunction} postFlushAction
      * @returns {KafkaProducer}
      */
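
Likewise, a sketch of producing and flushing through the migrated wrapper, based on the destructured `produce()` signature and the `flush()` JSDoc in the hunks above. The require path, broker, topic, key, config values, and timeout are placeholders.

```js
const { EventEmitter } = require('events');
const KafkaProducer = require('./src/producer'); // path assumed from the file tree

const emitter = new EventEmitter();
emitter.on('log', (msg) => console.log(msg));    // event name assumed

const producer = new KafkaProducer(
    'example-client',                              // clientId
    { 'metadata.broker.list': 'localhost:9092' },  // ProducerGlobalConfig (librdkafka keys)
    { 'acks': -1 },                                // ProducerTopicConfig
    emitter
);

// produce() takes a single object; partition, key and timestamp default to null.
producer.produce({
    topic: 'demo-topic',
    message: { orderId: 42 },
    key: 'order-42',
});

// Flush the internal librdkafka buffer (good to do before disconnect, per the JSDoc).
producer.flush(5000, (err) => {
    if (err) console.error('flush failed:', err);
});
```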

types/index.d.ts

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-import { ClientMetrics, ConsumerGlobalConfig, ConsumerTopicConfig, GlobalConfig, LibrdKafkaError, Message, MessageKey, NewTopic, NumberNullUndefined, ProducerGlobalConfig, ProducerTopicConfig, SubscribeTopicList } from "node-rdkafka";
+import { ClientMetrics, ConsumerGlobalConfig, ConsumerTopicConfig, GlobalConfig, LibrdKafkaError, Message, MessageKey, NewTopic, NumberNullUndefined, ProducerGlobalConfig, ProducerTopicConfig, SubscribeTopicList } from "@confluentinc/kafka-javascript";
 
 export type ConsumeActionFunction = (err: LibrdKafkaError, messages: Message[]) => void;
 
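
Only the module specifier changes here; the named type imports stay identical, which is what lets the rest of `types/index.d.ts` keep compiling, assuming the Confluent package exports the same librdkafka type names (as the unchanged import list suggests). For illustration, a JSDoc-typed handler matching the exported `ConsumeActionFunction`; the relative import path and the handler body are assumptions.

```js
/** @type {import('./types').ConsumeActionFunction} */
const onMessages = (err, messages) => {
    if (err) {
        // err is a LibrdKafkaError
        console.error('consume error:', err);
        return;
    }
    for (const message of messages) {
        // messages are librdkafka-style Message objects with topic/offset/value fields
        console.log(message.topic, message.offset, message.value && message.value.toString());
    }
};
```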