
expose client states #287


Open · wants to merge 1 commit into base: master
72 changes: 38 additions & 34 deletions lib/kafkajs/_consumer.js
@@ -104,6 +104,11 @@ class Consumer {
*/
#state = ConsumerState.INIT;

/**
* @returns {ConsumerState} the client state.
*/
getState() { return this.#state }

/**
* Contains a mapping of topic+partition to an offset that the user wants to seek to.
* The keys are of the type "<topic>|<partition>".
@@ -414,8 +419,8 @@ class Consumer {
try {
alternateAssignment = await userSpecifiedRebalanceCb(err, assignment, assignmentFns);
} catch (e) {
this.#logger.error(`Error from user's rebalance callback: ${e.stack}, `+
'continuing with the default rebalance behavior.');
this.#logger.error(`Error from user's rebalance callback: ${e.stack}, ` +
'continuing with the default rebalance behavior.');
}

if (alternateAssignment) {
@@ -639,11 +644,10 @@ class Consumer {
/* Certain properties that the user has set are overridden. We use trampolines to accommodate the user's callbacks.
* TODO: add trampoline method for offset commit callback. */
rdKafkaConfig['offset_commit_cb'] = true;
rdKafkaConfig['rebalance_cb'] = (err, assignment) => this.#rebalanceCallback(err, assignment).catch(e =>
{
if (this.#logger)
this.#logger.error(`Error from rebalance callback: ${e.stack}`);
});
rdKafkaConfig['rebalance_cb'] = (err, assignment) => this.#rebalanceCallback(err, assignment).catch(e => {
if (this.#logger)
this.#logger.error(`Error from rebalance callback: ${e.stack}`);
});

/* We handle offset storage within the promisified API by ourselves. Thus we don't allow the user to change this
* setting and set it to false. */
@@ -660,16 +664,16 @@
this.#autoCommit = rdKafkaConfig['enable.auto.commit'];
}

/**
* Actual max poll interval is twice the configured max poll interval,
* because we want to ensure that when we ask for worker termination,
* and there is one last message to be processed, we can process it in
* the configured max poll interval time.
* This will cause the rebalance callback timeout to be double
* the value of the configured max poll interval.
* But it's expected otherwise we cannot have a cache and need to consider
* max poll interval reached on processing the very first message.
*/
this.#maxPollIntervalMs = rdKafkaConfig['max.poll.interval.ms'] ?? 300000;
this.#cacheExpirationTimeoutMs = this.#maxPollIntervalMs;
rdKafkaConfig['max.poll.interval.ms'] = this.#maxPollIntervalMs * 2;
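To make the doubling concrete, here is a minimal sketch of the resulting values, assuming the 300000 ms default shown above (variable names are illustrative, not the module's API):

// Illustration only: the arithmetic performed by the hunk above.
const configuredMaxPollIntervalMs = 300000;                    // user-facing value (default)
const cacheExpirationTimeoutMs = configuredMaxPollIntervalMs;  // cache expiry uses the same budget
const rdKafkaConfig = {};
rdKafkaConfig['max.poll.interval.ms'] = configuredMaxPollIntervalMs * 2;  // librdkafka sees 600000 ms
// A worker that picks up one last message still has the full configured
// interval to process it, at the cost of a rebalance-callback timeout that
// is twice the configured max poll interval.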
@@ -833,14 +837,14 @@ class Consumer {
* only if the size has been requested to be increased twice in a row.
* @private
*/
#increaseMaxSize() {
if (this.#messageCacheMaxSize === 1024)
return;
this.#increaseCount++;
if (this.#increaseCount <= 1)
return;
this.#messageCacheMaxSize = Math.min(this.#messageCacheMaxSize << 1, 1024);
this.#increaseCount = 0;
}

/**
@@ -850,8 +854,8 @@
* @private
*/
#decreaseMaxSize(recvdSize) {
this.#messageCacheMaxSize = Math.max(Math.floor((recvdSize * 8) / 10), 1);
this.#increaseCount = 0;
}
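Taken together, #increaseMaxSize and #decreaseMaxSize implement a grow-slowly / shrink-quickly policy for the message cache. A self-contained sketch of that policy, using standalone names in place of the private fields (an illustration, not the module's API):

// Standalone model of the dynamic cache sizing shown above.
let messageCacheMaxSize = 1;  // assumed starting size; the initial value is not part of this diff
let increaseCount = 0;

function increaseMaxSize() {
  if (messageCacheMaxSize === 1024) return;  // hard upper bound
  if (++increaseCount <= 1) return;          // grow only after two consecutive requests
  messageCacheMaxSize = Math.min(messageCacheMaxSize << 1, 1024);  // double, capped at 1024
  increaseCount = 0;
}

function decreaseMaxSize(recvdSize) {
  messageCacheMaxSize = Math.max(Math.floor((recvdSize * 8) / 10), 1);  // 80% of received, at least 1
  increaseCount = 0;
}

Doubling on growth and cutting to 80% of the actually received batch on shrink keeps the cache size close to the consumer's real throughput.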

/**
@@ -1008,7 +1012,7 @@ class Consumer {
}

return this.#fetchAndResolveWith(() =>
this.#messageCache.nextN(null, size),
this.#messageCacheMaxSize);
}

@@ -1404,7 +1408,7 @@ class Consumer {
});

if (this.#pendingOperations.length) {
ppc = this.#discardMessages(ms, ppc);
break;
}

@@ -1423,7 +1427,7 @@ class Consumer {
async #checkMaxPollIntervalNotExceeded(now) {
const maxPollExpiration = this.#lastFetchClockNs +
BigInt((this.#cacheExpirationTimeoutMs + this.#maxPollIntervalMs)
* 1e6);

let interval = Number(maxPollExpiration - now) / 1e6;
if (interval < 1)
@@ -1717,7 +1721,7 @@ class Consumer {
await Promise.allSettled(librdkafkaSeekPromises);
await Promise.all(librdkafkaSeekPromises);

for (const [key, ] of seekedPartitions) {
for (const [key,] of seekedPartitions) {
this.#pendingSeeks.delete(key);
}

@@ -1804,7 +1808,7 @@ class Consumer {
this.#markBatchPayloadsStale([rdKafkaTopicPartitionOffset]);

this.#addPendingOperation(() =>
this.#seekInternal([rdKafkaTopicPartitionOffset]));
}

async describeGroup() {
@@ -1895,7 +1899,7 @@ class Consumer {
topicPartition));

this.#addPendingOperation(() =>
this.#pauseInternal(flattenedToppars));

/* Note: we don't use flattenedToppars here because resume flattens them again. */
return () => this.resume(toppars);
@@ -2034,4 +2038,4 @@ class Consumer {
}
}

module.exports = { Consumer, PartitionAssigners: Object.freeze(PartitionAssigners), };
module.exports = { Consumer, PartitionAssigners: Object.freeze(PartitionAssigners), ConsumerState, };
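With ConsumerState now exported alongside the Consumer, callers can branch on the client lifecycle without tracking it themselves. A minimal usage sketch (the require path mirrors this file's location and is an assumption for illustration):

// Sketch: gate a disconnect on the consumer's lifecycle state.
const { ConsumerState } = require('./lib/kafkajs/_consumer');  // path assumed for illustration

async function disconnectIfConnected(consumer) {
  // Only a fully connected consumer is disconnected gracefully;
  // in any other state the call is skipped.
  if (consumer.getState() === ConsumerState.CONNECTED) {
    await consumer.disconnect();
  }
}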
9 changes: 6 additions & 3 deletions lib/kafkajs/_kafka.js
@@ -1,5 +1,5 @@
const { Producer, CompressionTypes } = require('./_producer');
const { Consumer, PartitionAssigners } = require('./_consumer');
const { Producer, CompressionTypes, ProducerState } = require('./_producer');
const { Consumer, PartitionAssigners, ConsumerState } = require('./_consumer');
const { Admin, ConsumerGroupStates, AclOperationTypes, IsolationLevel } = require('./_admin');
const error = require('./_error');
const { logLevel, checkIfKafkaJsKeysPresent, CompatibilityErrorMessages } = require('./_common');
@@ -118,7 +118,10 @@ module.exports = {
...error, logLevel,
PartitionAssigners,
PartitionAssignors: PartitionAssigners,
ConsumerState,
ProducerState,
CompressionTypes,
ConsumerGroupStates,
AclOperationTypes,
IsolationLevel};
IsolationLevel
};
7 changes: 6 additions & 1 deletion lib/kafkajs/_producer.js
@@ -96,6 +96,11 @@ class Producer {
*/
#state = ProducerState.INIT;

/**
* @returns {ProducerState} the client state.
*/
getState() { return this.#state }

/**
* ongoingTransaction is true if there is an ongoing transaction.
* @type {boolean}
@@ -845,4 +850,4 @@ class Producer {
}
}

module.exports = { Producer, CompressionTypes: Object.freeze(CompressionTypes) };
module.exports = { Producer, CompressionTypes: Object.freeze(CompressionTypes), ProducerState };
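The producer gets the same treatment. Since ProducerState (see the type definitions below) distinguishes the transactional-initialization states from CONNECTED, a conservative readiness check might look like this sketch (state semantics beyond the enum values are not specified in this diff):

// Sketch: treat CONNECTED as the only state in which sends are issued.
const { ProducerState } = require('./lib/kafkajs/_producer');  // path assumed for illustration

function isReadyToSend(producer) {
  // INIT, CONNECTING and the INITIALIZING/INITIALIZED_TRANSACTIONS states
  // all precede CONNECTED; DISCONNECTING and DISCONNECTED follow it.
  return producer.getState() === ProducerState.CONNECTED;
}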
25 changes: 23 additions & 2 deletions types/kafkajs.d.ts
@@ -186,6 +186,16 @@ export type PartitionMetadata = {

export type Transaction = Producer;

export enum ProducerState {
INIT = 0,
CONNECTING = 1,
INITIALIZING_TRANSACTIONS = 2,
INITIALIZED_TRANSACTIONS = 3,
CONNECTED = 4,
DISCONNECTING = 5,
DISCONNECTED = 6,
}

export type Producer = Client & {
send(record: ProducerRecord): Promise<RecordMetadata[]>
sendBatch(batch: ProducerBatch): Promise<RecordMetadata[]>
@@ -197,6 +207,7 @@ export type Producer = Client & {
abort(): Promise<void>
sendOffsets(args: { consumer: Consumer, topics: TopicOffsets[] }): Promise<void>
isActive(): boolean
getState(): ProducerState
}

export enum PartitionAssigners {
@@ -352,6 +363,14 @@ export type ITopicMetadata = {
authorizedOperations?: AclOperationTypes[]
}

export enum ConsumerState {
INIT = 0,
CONNECTING = 1,
CONNECTED = 2,
DISCONNECTING = 3,
DISCONNECTED = 4,
}

export type Consumer = Client & {
subscribe(subscription: ConsumerSubscribeTopics | ConsumerSubscribeTopic): Promise<void>
stop(): Promise<void>
@@ -364,6 +383,7 @@ export type Consumer = Client & {
paused(): TopicPartitions[]
resume(topics: Array<{ topic: string; partitions?: number[] }>): void
assignment(): TopicPartition[]
getState(): ConsumerState
}

export interface AdminConfig {
@@ -414,8 +434,9 @@ export type Admin = {
groupId: string,
topics?: TopicInput,
timeout?: number,
requireStableOffsets?: boolean }):
Promise<Array<{topic: string; partitions:FetchOffsetsPartition[]}>>
requireStableOffsets?: boolean
}):
Promise<Array<{ topic: string; partitions: FetchOffsetsPartition[] }>>
deleteTopicRecords(options: {
topic: string; partitions: SeekEntry[];
timeout?: number; operationTimeout?: number
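Because both enums are plain numeric values and both client types now expose getState(), a single polling helper can wait on either client. A closing sketch (the poll cadence and timeout are arbitrary choices, not part of the API):

// Sketch: poll getState() until a client reaches the desired state.
async function waitForState(client, targetState, timeoutMs = 10000) {
  const deadline = Date.now() + timeoutMs;
  while (client.getState() !== targetState) {
    if (Date.now() > deadline)
      throw new Error(`client did not reach state ${targetState} within ${timeoutMs} ms`);
    await new Promise(resolve => setTimeout(resolve, 100));  // check every 100 ms
  }
}

// e.g. await waitForState(consumer, ConsumerState.CONNECTED);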