RxKafka - Reactive wrapper for node-rdkafka, the Node.js binding for the well-known C++ Kafka library librdkafka
- Extends the excellent node-rdkafka library with a reactive RxJS layer: consume and produce through the familiar Observables and Subscriptions from ReactiveX.
- Written in TypeScript for cleaner handling of the API and configurations
A simple start consists of only three steps.
npm i rxkafka
const producer = new KafkaProducer({"metadata.broker.list": "localhost:9092"}, {});
producer.send({topic: "my-topic", message: "message"});
const consumer = new KafkaMessageConsumer({"metadata.broker.list": "localhost:9092", "group.id": "my-group"}, {});
consumer.connect(["my-topic"]).subscribe((next) => {
    console.log(next.value.toString());
}, (error) => {
    console.log(error);
});
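The value of each consumed message arrives as a Node.js Buffer, as delivered by node-rdkafka. A small helper like the following (hypothetical, not part of RxKafka) can decode it to a string or a typed JSON object before further processing:

```typescript
// Hypothetical helper: decode a consumed message value.
// Assumes the value is a UTF-8 encoded Buffer, as node-rdkafka delivers it.
function decodeMessage(value: Buffer): string {
    return value.toString("utf8");
}

// Variant for JSON payloads, typed via a generic parameter.
function decodeJsonMessage<T>(value: Buffer): T {
    return JSON.parse(decodeMessage(value)) as T;
}

// Example with an in-memory Buffer standing in for a consumed message:
const raw = Buffer.from(JSON.stringify({ id: 1, text: "hello" }));
const parsed = decodeJsonMessage<{ id: number; text: string }>(raw);
console.log(parsed.text); // "hello"
```

Such a helper slots naturally into the subscribe callback or an RxJS map operator.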
Some further examples of using the RxKafka library and its configuration options.
consumer.watermarkOffsets({topic: "my-topic", timeout: 5000, partition: 1}).subscribe((data) => {
    console.log("Watermark offsets:");
    console.log(data);
}, (error) => {
    console.log(error);
});
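Watermark offsets report the lowest and highest offsets currently available in a partition. Assuming the emitted data carries `lowOffset` and `highOffset` fields (the shape node-rdkafka's `queryWatermarkOffsets` returns; an assumption about what RxKafka forwards), a sketch of computing consumer lag from them:

```typescript
// Assumed shape of the emitted watermark data, mirroring node-rdkafka's
// queryWatermarkOffsets result: lowest and highest offsets in the partition.
interface WatermarkOffsets {
    lowOffset: number;
    highOffset: number;
}

// Lag = messages still to be consumed: the highest available offset minus
// the consumer's committed offset, clamped at zero.
function consumerLag(watermarks: WatermarkOffsets, committedOffset: number): number {
    return Math.max(0, watermarks.highOffset - committedOffset);
}

console.log(consumerLag({ lowOffset: 0, highOffset: 150 }, 120)); // 30
```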
consumer.metadata({topic: "my-topic", timeout: 5000}).subscribe((data) => {
    console.log("Metadata:");
    console.log(data);
}, (error) => {
    console.log(error);
});
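In node-rdkafka the metadata result contains a `topics` array, each topic listing its partitions; assuming RxKafka emits the same shape, a small helper (hypothetical, not part of the library) can pull the partition count for a topic out of it:

```typescript
// Assumed minimal shape of the metadata result (node-rdkafka style).
interface PartitionMetadata { id: number; }
interface TopicMetadata { name: string; partitions: PartitionMetadata[]; }
interface Metadata { topics: TopicMetadata[]; }

// Count the partitions of a given topic, or 0 if the topic is unknown.
function partitionCount(metadata: Metadata, topic: string): number {
    const entry = metadata.topics.find((t) => t.name === topic);
    return entry ? entry.partitions.length : 0;
}

// Example against a hand-built metadata object:
const meta: Metadata = {
    topics: [{ name: "my-topic", partitions: [{ id: 0 }, { id: 1 }] }],
};
console.log(partitionCount(meta, "my-topic")); // 2
```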
You can set up the same configuration options as described in the librdkafka configuration documentation, using prepared TypeScript interfaces for easier usage.
export interface TopicConfig {
    "request.required.acks"?: number;
    "request.timeout.ms"?: number;
    "message.timeout.ms"?: number;
    "queuing.strategy"?: string;
    "produce.offset.report"?: boolean;
    "auto.commit.enable"?: boolean;
    "compression.codec"?: string;
    "auto.commit.interval.ms"?: number;
    "auto.offset.reset"?: string;
    "offset.store.path"?: string;
    "offset.store.sync.interval.ms"?: number;
    "offset.store.method"?: string;
    "consume.callback.max.messages"?: number;
}
export interface ConsumerConfig extends BaseConfig {
    "group.id": string;
    "metadata.broker.list": string;
}
export interface ProducerConfig extends BaseConfig {
    "metadata.broker.list": string;
    bufferSize?: number;
}
export interface BaseConfig {
    "group.id"?: string;
    "metadata.broker.list"?: string;
    "bootstrap.servers"?: string;
    "message.max.bytes"?: number;
    "message.copy.max.bytes"?: number;
    "receive.message.max.bytes"?: number;
    "max.in.flight.requests.per.connection"?: number;
    "max.in.flight"?: number;
    "metadata.request.timeout.ms"?: number;
    "topic.metadata.refresh.interval.ms"?: number;
    "metadata.max.age.ms"?: number;
    "topic.metadata.refresh.fast.interval.ms"?: number;
    "topic.metadata.refresh.sparse"?: boolean;
    "debug"?: string;
    "socket.timeout.ms"?: number;
    "socket.blocking.max.ms"?: number;
    "socket.send.buffer.bytes"?: number;
    "socket.receive.buffer.bytes"?: number;
    "socket.keepalive.enable"?: boolean;
    "socket.nagle.disable"?: boolean;
    "socket.max.fails"?: number;
    "broker.address.ttl"?: number;
    "broker.address.family"?: string;
    "reconnect.backoff.jitter.ms"?: number;
    "statistics.interval.ms"?: number;
    "enabled_events"?: number;
    "log_level"?: number;
    "log.queue"?: boolean;
    "log.thread.name"?: boolean;
    "log.connection.close"?: boolean;
    "internal.termination.signal"?: number;
    "api.version.request"?: boolean;
    "api.version.request.timeout.ms"?: number;
    "api.version.fallback.ms"?: number;
    "broker.version.fallback"?: string;
    "security.protocol"?: string;
    "ssl.cipher.suites"?: string;
    "ssl.curves.list"?: string;
    "ssl.key.location"?: string;
    "ssl.key.password"?: string;
    "ssl.certificate.location"?: string;
    "ssl.ca.location"?: string;
    "ssl.crl.location"?: string;
    "ssl.keystore.location"?: string;
    "ssl.keystore.password"?: string;
    "sasl.mechanisms"?: string;
    "sasl.mechanism"?: string;
    "sasl.kerberos.service.name"?: string;
    "sasl.kerberos.principal"?: string;
    "sasl.kerberos.kinit.cmd"?: string;
    "sasl.kerberos.keytab"?: string;
    "sasl.kerberos.min.time.before.relogin"?: number;
    "sasl.username"?: string;
    "sasl.password"?: string;
    "plugin.library.paths"?: string;
    "partition.assignment.strategy"?: string;
    "session.timeout.ms"?: number;
    "heartbeat.interval.ms"?: number;
    "group.protocol.type"?: string;
    "coordinator.query.interval.ms"?: number;
    "enable.auto.commit"?: boolean;
    "auto.commit.interval.ms"?: number;
    "enable.auto.offset.store"?: boolean;
    "queued.min.messages"?: number;
    "queued.max.messages.kbytes"?: number;
    "fetch.wait.max.ms"?: number;
    "fetch.message.max.bytes"?: number;
    "max.partition.fetch.bytes"?: number;
    "fetch.max.bytes"?: number;
    "fetch.min.bytes"?: number;
    "fetch.error.backoff.ms"?: number;
    "offset.store.method"?: number;
    "enable.partition.eof"?: boolean;
    "check.crcs"?: boolean;
    "queue.buffering.max.messages"?: number;
    "queue.buffering.max.kbytes"?: number;
    "queue.buffering.max.ms"?: number;
    "linger.ms"?: number;
    "message.send.max.retries"?: number;
    "retries"?: number;
    "retry.backoff.ms"?: number;
    "queue.buffering.backpressure.threshold"?: number;
    "compression.codec"?: string;
    "compression.type"?: string;
    "batch.num.messages"?: number;
    "delivery.report.only.error"?: boolean;
}
export interface MetadataConfig {
    timeout: number;
    topic: string;
}
export interface StatsConsumerConfig extends ConsumerConfig {
    "statistics.interval.ms": number;
}
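With these interfaces, configuration objects are type-checked at compile time: a missing required key or a wrong value type fails the build instead of failing at the broker. A small sketch of building typed configs (the interfaces are copied inline here so the example is self-contained; broker addresses and values are illustrative):

```typescript
// Minimal local copies of the interfaces above, so this sketch compiles on
// its own; in a real project import them from rxkafka instead.
interface ConsumerConfig {
    "group.id": string;
    "metadata.broker.list": string;
    "enable.auto.commit"?: boolean;
}

interface ProducerConfig {
    "metadata.broker.list": string;
    "queue.buffering.max.ms"?: number;
}

// Illustrative values; the broker list and group id depend on your setup.
const consumerConfig: ConsumerConfig = {
    "group.id": "my-group",
    "metadata.broker.list": "localhost:9092",
    "enable.auto.commit": false,
};

const producerConfig: ProducerConfig = {
    "metadata.broker.list": "localhost:9092",
    "queue.buffering.max.ms": 100,
};

console.log(consumerConfig["group.id"]); // "my-group"
console.log(producerConfig["metadata.broker.list"]); // "localhost:9092"
```

Omitting `"group.id"` from `consumerConfig`, or passing a string where a number is expected, would be rejected by the TypeScript compiler.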
Requires git and docker.
git clone https://github.com/wurstmeister/kafka-docker.git
Override two settings in docker-compose-single-broker.yml:
KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
KAFKA_CREATE_TOPICS: "my-topic:1:1"
docker-compose -f docker-compose-single-broker.yml up -d
In KafkaIntegrationTests.test.ts, remove the skip flag from the tests (they are skipped because of Travis CI).
npm run test
All contributors are welcome. If you have never contributed to open source before, start by reading about the GitHub Flow.
- Create an issue
- Create a pull request with reference to the issue
- Rest and enjoy the great feeling of being a contributor.