diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index c6410547..c937961f 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -104,6 +104,11 @@ class Consumer { */ #state = ConsumerState.INIT; + /** + * @returns {ConsumerState} the client state. + */ + getState() { return this.#state } + /** * Contains a mapping of topic+partition to an offset that the user wants to seek to. * The keys are of the type "<topic>|<partition>". @@ -414,8 +419,8 @@ class Consumer { try { alternateAssignment = await userSpecifiedRebalanceCb(err, assignment, assignmentFns); } catch (e) { - this.#logger.error(`Error from user's rebalance callback: ${e.stack}, `+ - 'continuing with the default rebalance behavior.'); + this.#logger.error(`Error from user's rebalance callback: ${e.stack}, ` + + 'continuing with the default rebalance behavior.'); } if (alternateAssignment) { @@ -639,11 +644,10 @@ class Consumer { /* Certain properties that the user has set are overridden. We use trampolines to accommodate the user's callbacks. * TODO: add trampoline method for offset commit callback. */ rdKafkaConfig['offset_commit_cb'] = true; - rdKafkaConfig['rebalance_cb'] = (err, assignment) => this.#rebalanceCallback(err, assignment).catch(e => - { - if (this.#logger) - this.#logger.error(`Error from rebalance callback: ${e.stack}`); - }); + rdKafkaConfig['rebalance_cb'] = (err, assignment) => this.#rebalanceCallback(err, assignment).catch(e => { + if (this.#logger) + this.#logger.error(`Error from rebalance callback: ${e.stack}`); + }); /* We handle offset storage within the promisified API by ourselves. Thus we don't allow the user to change this * setting and set it to false. */ @@ -660,16 +664,16 @@ class Consumer { this.#autoCommit = rdKafkaConfig['enable.auto.commit']; } - /** - * Actual max poll interval is twice the configured max poll interval, - * because we want to ensure that when we ask for worker termination, - * and there is one last message to be processed, we can process it in - * the configured max poll interval time. - * This will cause the rebalance callback timeout to be double - * the value of the configured max poll interval. - * But it's expected otherwise we cannot have a cache and need to consider - * max poll interval reached on processing the very first message. - */ + /** + * Actual max poll interval is twice the configured max poll interval, + * because we want to ensure that when we ask for worker termination, + * and there is one last message to be processed, we can process it in + * the configured max poll interval time. + * This will cause the rebalance callback timeout to be double + * the value of the configured max poll interval. + * But it's expected otherwise we cannot have a cache and need to consider + * max poll interval reached on processing the very first message. + */ this.#maxPollIntervalMs = rdKafkaConfig['max.poll.interval.ms'] ?? 300000; this.#cacheExpirationTimeoutMs = this.#maxPollIntervalMs; rdKafkaConfig['max.poll.interval.ms'] = this.#maxPollIntervalMs * 2; @@ -833,14 +837,14 @@ class Consumer { * only if the size has been requested to be increased twice in a row. * @private */ - #increaseMaxSize() { - if (this.#messageCacheMaxSize === 1024) - return; - this.#increaseCount++; - if (this.#increaseCount <= 1) - return; - this.#messageCacheMaxSize = Math.min(this.#messageCacheMaxSize << 1, 1024); - this.#increaseCount = 0; + #increaseMaxSize() { + if (this.#messageCacheMaxSize === 1024) + return; + this.#increaseCount++; + if (this.#increaseCount <= 1) + return; + this.#messageCacheMaxSize = Math.min(this.#messageCacheMaxSize << 1, 1024); + this.#increaseCount = 0; } /** @@ -850,8 +854,8 @@ class Consumer { * @private */ #decreaseMaxSize(recvdSize) { - this.#messageCacheMaxSize = Math.max(Math.floor((recvdSize * 8) / 10), 1); - this.#increaseCount = 0; + this.#messageCacheMaxSize = Math.max(Math.floor((recvdSize * 8) / 10), 1); + this.#increaseCount = 0; } /** @@ -1008,7 +1012,7 @@ class Consumer { } return this.#fetchAndResolveWith(() => - this.#messageCache.nextN(null, size), + this.#messageCache.nextN(null, size), this.#messageCacheMaxSize); } @@ -1404,7 +1408,7 @@ class Consumer { }); if (this.#pendingOperations.length) { - ppc = this.#discardMessages(ms, ppc); + ppc = this.#discardMessages(ms, ppc); break; } @@ -1423,7 +1427,7 @@ class Consumer { async #checkMaxPollIntervalNotExceeded(now) { const maxPollExpiration = this.#lastFetchClockNs + BigInt((this.#cacheExpirationTimeoutMs + this.#maxPollIntervalMs) - * 1e6); + * 1e6); let interval = Number(maxPollExpiration - now) / 1e6; if (interval < 1) @@ -1717,7 +1721,7 @@ class Consumer { await Promise.allSettled(librdkafkaSeekPromises); await Promise.all(librdkafkaSeekPromises); - for (const [key, ] of seekedPartitions) { + for (const [key,] of seekedPartitions) { this.#pendingSeeks.delete(key); } @@ -1804,7 +1808,7 @@ class Consumer { this.#markBatchPayloadsStale([rdKafkaTopicPartitionOffset]); this.#addPendingOperation(() => - this.#seekInternal([rdKafkaTopicPartitionOffset])); + this.#seekInternal([rdKafkaTopicPartitionOffset])); } async describeGroup() { @@ -1895,7 +1899,7 @@ class Consumer { topicPartition)); this.#addPendingOperation(() => - this.#pauseInternal(flattenedToppars)); + this.#pauseInternal(flattenedToppars)); /* Note: we don't use flattenedToppars here because resume flattens them again. */ return () => this.resume(toppars); @@ -2034,4 +2038,4 @@ class Consumer { } } -module.exports = { Consumer, PartitionAssigners: Object.freeze(PartitionAssigners), }; +module.exports = { Consumer, PartitionAssigners: Object.freeze(PartitionAssigners), ConsumerState, }; diff --git a/lib/kafkajs/_kafka.js b/lib/kafkajs/_kafka.js index e671d94a..525c0469 100644 --- a/lib/kafkajs/_kafka.js +++ b/lib/kafkajs/_kafka.js @@ -1,5 +1,5 @@ -const { Producer, CompressionTypes } = require('./_producer'); -const { Consumer, PartitionAssigners } = require('./_consumer'); +const { Producer, CompressionTypes, ProducerState } = require('./_producer'); +const { Consumer, PartitionAssigners, ConsumerState } = require('./_consumer'); const { Admin, ConsumerGroupStates, AclOperationTypes, IsolationLevel } = require('./_admin'); const error = require('./_error'); const { logLevel, checkIfKafkaJsKeysPresent, CompatibilityErrorMessages } = require('./_common'); @@ -118,7 +118,10 @@ module.exports = { ...error, logLevel, PartitionAssigners, PartitionAssignors: PartitionAssigners, + ConsumerState, + ProducerState, CompressionTypes, ConsumerGroupStates, AclOperationTypes, - IsolationLevel}; + IsolationLevel +}; diff --git a/lib/kafkajs/_producer.js b/lib/kafkajs/_producer.js index bf1e058d..b1a2e0d3 100644 --- a/lib/kafkajs/_producer.js +++ b/lib/kafkajs/_producer.js @@ -96,6 +96,11 @@ class Producer { */ #state = ProducerState.INIT; + /** + * @returns {ProducerState} the client state. + */ + getState() { return this.#state } + /** * ongoingTransaction is true if there is an ongoing transaction. * @type {boolean} @@ -845,4 +850,4 @@ class Producer { } } -module.exports = { Producer, CompressionTypes: Object.freeze(CompressionTypes) }; +module.exports = { Producer, CompressionTypes: Object.freeze(CompressionTypes), ProducerState }; diff --git a/types/kafkajs.d.ts b/types/kafkajs.d.ts index 58d03a5b..6f157753 100644 --- a/types/kafkajs.d.ts +++ b/types/kafkajs.d.ts @@ -186,6 +186,16 @@ export type PartitionMetadata = { export type Transaction = Producer; +export enum ProducerState { + INIT = 0, + CONNECTING = 1, + INITIALIZING_TRANSACTIONS = 2, + INITIALIZED_TRANSACTIONS = 3, + CONNECTED = 4, + DISCONNECTING = 5, + DISCONNECTED = 6, +} + export type Producer = Client & { send(record: ProducerRecord): Promise<RecordMetadata[]> sendBatch(batch: ProducerBatch): Promise<RecordMetadata[]> @@ -197,6 +207,7 @@ export type Producer = Client & { abort(): Promise<void> sendOffsets(args: { consumer: Consumer, topics: TopicOffsets[] }): Promise<void> isActive(): boolean + getState(): ProducerState } export enum PartitionAssigners { @@ -352,6 +363,14 @@ export type ITopicMetadata = { authorizedOperations?: AclOperationTypes[] } +export enum ConsumerState { + INIT = 0, + CONNECTING = 1, + CONNECTED = 2, + DISCONNECTING = 3, + DISCONNECTED = 4, +} + export type Consumer = Client & { subscribe(subscription: ConsumerSubscribeTopics | ConsumerSubscribeTopic): Promise<void> stop(): Promise<void> @@ -364,6 +383,7 @@ export type Consumer = Client & { paused(): TopicPartitions[] resume(topics: Array<{ topic: string; partitions?: number[] }>): void assignment(): TopicPartition[] + getState(): ConsumerState } export interface AdminConfig { @@ -414,8 +434,9 @@ export type Admin = { groupId: string, topics?: TopicInput, timeout?: number, - requireStableOffsets?: boolean }): - Promise<Array<{topic: string; partitions:FetchOffsetsPartition[]}>> + requireStableOffsets?: boolean + }): + Promise<Array<{ topic: string; partitions: FetchOffsetsPartition[] }>> deleteTopicRecords(options: { topic: string; partitions: SeekEntry[]; timeout?: number; operationTimeout?: number