Avoid a race condition that causes 100% usage of a CPU core #300

Merged · 7 commits · May 16, 2025
Changes from 5 commits
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,14 @@
# confluent-kafka-javascript v1.3.1

v1.3.1 is a maintenance release. It is supported for all usage.

## Fixes

1. Avoid a race condition that causes 100% usage of a CPU core when
consuming with `partitionsConsumedConcurrently > 1` and all messages
are consumed (#300)


# confluent-kafka-javascript v1.3.0

v1.3.0 is a feature release. It is supported for all usage.
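The changelog entry above names `partitionsConsumedConcurrently` as the triggering configuration. Below is a minimal sketch of such a consumer, assuming the library's KafkaJS-compatible promisified API; the broker address, group id, and topic name are placeholders, not values from this PR.

```js
const { KafkaJS } = require('@confluentinc/kafka-javascript');

async function main() {
  const kafka = new KafkaJS.Kafka({ kafkaJS: { brokers: ['localhost:9092'] } });
  const consumer = kafka.consumer({ kafkaJS: { groupId: 'example-group', fromBeginning: true } });

  await consumer.connect();
  await consumer.subscribe({ topics: ['example-topic'] });

  // With partitionsConsumedConcurrently > 1, several partitions are processed
  // by concurrent workers; once every cached message had been consumed, the
  // race described in the changelog could leave those workers spinning.
  await consumer.run({
    partitionsConsumedConcurrently: 2,
    eachMessage: async ({ topic, partition, message }) => {
      console.log(`${topic}[${partition}] @ ${message.offset}`);
    },
  });
}

main().catch(console.error);
```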
166 changes: 102 additions & 64 deletions lib/kafkajs/_consumer.js
@@ -24,7 +24,6 @@
const { Buffer } = require('buffer');
const MessageCache = require('./_consumer_cache');
const { hrtime } = require('process');
const { LinkedList } = require('./_linked-list');

const ConsumerState = Object.freeze({
INIT: 0,
@@ -203,11 +202,10 @@
* It's set to null when no fetch is in progress.
*/
#fetchInProgress;

/**
* List of DeferredPromises waiting on consumer queue to be non-empty.
* Are we waiting for the queue to be non-empty?
*/
#queueWaiters = new LinkedList();
#nonEmpty = null;

/**
* Whether any rebalance callback is in progress.
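The `#nonEmpty` field above replaces the linked list of per-worker waiters with a single `DeferredPromise`. That helper is not part of this diff; the following is a hypothetical sketch of the shape the diff relies on (awaitable, externally resolvable, exposing `resolved`), not the binding's actual implementation.

```js
// Hypothetical sketch of a DeferredPromise helper.
class DeferredPromise {
  constructor() {
    this.resolved = false;
    this.promise = new Promise((resolve) => {
      this._resolve = resolve;
    });
  }

  // Settle the promise and wake everything awaiting it.
  resolve(value) {
    this.resolved = true;
    this._resolve(value);
  }

  // Being "thenable" lets callers simply `await deferred`.
  then(onFulfilled, onRejected) {
    return this.promise.then(onFulfilled, onRejected);
  }
}

// Usage mirroring the diff: one shared promise that many workers can await,
// resolved once by the queue-non-empty callback.
const nonEmpty = new DeferredPromise();
setTimeout(() => nonEmpty.resolve(), 10);
nonEmpty.then(() => console.log('queue became non-empty'));
```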
@@ -363,7 +361,6 @@
*/
async #rebalanceCallback(err, assignment) {
const isLost = this.#internalClient.assignmentLost();
this.#rebalanceCbInProgress = new DeferredPromise();
let assignmentFnCalled = false;
this.#logger.info(
`Received rebalance event with message: '${err.message}' and ${assignment.length} partition(s), isLost: ${isLost}`,
@@ -468,7 +465,7 @@
*/
const workersToSpawn = Math.max(1, Math.min(this.#concurrency, this.#partitionCount));
if (workersToSpawn !== this.#workers.length) {
this.#workerTerminationScheduled.resolve();
this.#resolveWorkerTerminationScheduled();
/* We don't need to await the workers here. We are OK if the termination and respawning
* occurs later, since even if we have a few more or few less workers for a while, it's
* not a big deal. */
@@ -639,11 +636,14 @@
/* Certain properties that the user has set are overridden. We use trampolines to accommodate the user's callbacks.
* TODO: add trampoline method for offset commit callback. */
rdKafkaConfig['offset_commit_cb'] = true;
rdKafkaConfig['rebalance_cb'] = (err, assignment) => this.#rebalanceCallback(err, assignment).catch(e =>
rdKafkaConfig['rebalance_cb'] = (err, assignment) => {
this.#rebalanceCbInProgress = new DeferredPromise();
this.#rebalanceCallback(err, assignment).catch(e =>
{
if (this.#logger)
this.#logger.error(`Error from rebalance callback: ${e.stack}`);
});
};

/* We handle offset storage within the promisified API by ourselves. Thus we don't allow the user to change this
* setting and set it to false. */
@@ -904,6 +904,7 @@
const returnPayload = {
batch,
_stale: false,
_seeked: false,
_lastResolvedOffset: { offset: -1, leaderEpoch: -1 },
heartbeat: async () => { /* no op */ },
pause: this.pause.bind(this, [{ topic, partitions: [partition] }]),
@@ -922,9 +923,25 @@

async #fetchAndResolveWith(takeFromCache, size) {
if (this.#fetchInProgress) {
await this.#fetchInProgress;
/* Restart with the checks as we might have
* a new fetch in progress already. */
return null;
}

if (this.#nonEmpty) {
await this.#nonEmpty;
/* Restart with the checks as we might have
* a new fetch in progress already. */
return null;
}

if (this.#workerTerminationScheduled.resolved) {
/* Return without fetching. */
return null;
}

let err, messages, processedRebalance = false;
try {
this.#fetchInProgress = new DeferredPromise();
const fetchResult = new DeferredPromise();
@@ -933,8 +950,9 @@
this.#internalClient.consume(size, (err, messages) =>
fetchResult.resolve([err, messages]));

let [err, messages] = await fetchResult;
[err, messages] = await fetchResult;

SonarQube-Confluent check failure on lib/kafkajs/_consumer.js#L953: Refactor this redundant 'await' on a non-promise.
if (this.#rebalanceCbInProgress) {
processedRebalance = true;
await this.#rebalanceCbInProgress;
this.#rebalanceCbInProgress = null;
}
@@ -956,6 +974,8 @@
} finally {
this.#fetchInProgress.resolve();
this.#fetchInProgress = null;
if (!err && !processedRebalance && this.#messageCache.assignedSize === 0)
this.#nonEmpty = new DeferredPromise();
}
}

@@ -973,10 +993,13 @@
}

/* It's possible that we get msg = null, but that's because partitionConcurrency
* exceeds the number of partitions containing messages. So in this case,
* we should not call for new fetches, rather, try to focus on what we have left.
* exceeds the number of partitions containing messages. So
* we should wait for a new partition to be available.
*/
if (!msg && this.#messageCache.assignedSize !== 0) {
await this.#messageCache.availablePartitions();
/* Restart with the checks as we might have
* the cache full. */
return null;
}

@@ -1000,10 +1023,13 @@
}

/* It's possible that we get msgs = null, but that's because partitionConcurrency
* exceeds the number of partitions containing messages. So in this case,
* we should not call for new fetches, rather, try to focus on what we have left.
* exceeds the number of partitions containing messages. So
* we should wait for a new partition to be available.
*/
if (!msgs && this.#messageCache.assignedSize !== 0) {
await this.#messageCache.availablePartitions();
/* Restart with the checks as we might have
* the cache full. */
return null;
}

@@ -1316,7 +1342,7 @@

/* If any message is unprocessed, either due to an error or due to the user not marking it processed, we must seek
* back to get it so it can be reprocessed. */
if (lastOffsetProcessed.offset !== lastOffset) {
if (!payload._seeked && lastOffsetProcessed.offset !== lastOffset) {
Copilot AI commented on Apr 28, 2025: The condition now checks for the _seeked flag to avoid redundant seeks; please ensure this logic meets the intended behavior for reprocessing messages.

const offsetToSeekTo = lastOffsetProcessed.offset === -1 ? firstMessage.offset : (lastOffsetProcessed.offset + 1);
const leaderEpoch = lastOffsetProcessed.offset === -1 ? firstMessage.leaderEpoch : lastOffsetProcessed.leaderEpoch;
this.seek({
@@ -1348,36 +1374,27 @@
return ppc;
}

#queueNonEmptyCb() {
for (const waiter of this.#queueWaiters) {
waiter.resolve();
#notifyNonEmpty() {
if (this.#nonEmpty) {
this.#nonEmpty.resolve();
this.#nonEmpty = null;
}
if (this.#messageCache)
this.#messageCache.notifyAvailablePartitions();
}

async #nextFetchRetry() {
if (this.#fetchInProgress) {
await this.#fetchInProgress;
} else {
/* Backoff a little. If m is null, we might be without messages
* or in available partition starvation, and calling consumeSingleCached
* in a tight loop will help no one.
* In case there is any message in the queue, we'll be woken up before the
* timer expires.
* We have a per-worker promise, otherwise we end up awakening
* other workers when they've already looped and just restarted awaiting.
* The `Promise` passed to `Timer.withTimeout` cannot be reused
* in next call to this method, to avoid memory leaks caused
* by `Promise.race`. */
const waiter = new DeferredPromise();
const waiterNode = this.#queueWaiters.addLast(waiter);
await Timer.withTimeout(1000, waiter);

/* Resolves the "extra" promise that has been spawned when creating the timer. */
waiter.resolve();
this.#queueWaiters.remove(waiterNode);
}
}
#queueNonEmptyCb() {
const nonEmptyAction = async () => {
if (this.#fetchInProgress)
await this.#fetchInProgress;

this.#notifyNonEmpty();
};
nonEmptyAction().catch((e) => {
this.#logger.error(`Error in queueNonEmptyCb: ${e}`,
this.#createConsumerBindingMessageMetadata());
});
}
/**
* Starts a worker to fetch messages/batches from the internal consumer and process them.
*
@@ -1393,27 +1410,24 @@
*/
async #worker(config, perMessageProcessor, fetcher) {
let ppc = null;

while (!this.#workerTerminationScheduled.resolved) {
try {
const ms = await fetcher(ppc);
if (!ms)
continue;

const ms = await fetcher(ppc).catch(e => {
if (this.#pendingOperations.length) {
ppc = this.#discardMessages(ms, ppc);
break;
}

ppc = await perMessageProcessor(ms, config);
} catch (e) {
/* Since this error cannot be exposed to the user in the current situation, just log and retry.
* This is due to restartOnFailure being set to always true. */
if (this.#logger)
this.#logger.error(`Consumer encountered error while consuming. Retrying. Error details: ${e} : ${e.stack}`, this.#createConsumerBindingMessageMetadata());
});

if (this.#pendingOperations.length) {
ppc = this.#discardMessages(ms, ppc);
break;
}

if (!ms) {
await this.#nextFetchRetry();
continue;
}

ppc = await perMessageProcessor(ms, config);
}

if (ppc)
@@ -1447,19 +1461,32 @@
* @private
*/
async #cacheExpirationLoop() {
const cacheExpirationInterval = BigInt(this.#cacheExpirationTimeoutMs * 1e6);
const maxFetchInterval = BigInt(1000 * 1e6);
while (!this.#workerTerminationScheduled.resolved) {
let now = hrtime.bigint();
const cacheExpiration = this.#lastFetchClockNs +
BigInt(this.#cacheExpirationTimeoutMs * 1e6);
const cacheExpirationTimeout = this.#lastFetchClockNs +
cacheExpirationInterval;
const maxFetchTimeout = this.#lastFetchClockNs +
maxFetchInterval;

if (now > cacheExpiration) {
if (now > cacheExpirationTimeout) {
this.#addPendingOperation(() =>
this.#clearCacheAndResetPositions());
await this.#checkMaxPollIntervalNotExceeded(now);
break;
}
if (now > maxFetchTimeout) {
/* We need to continue fetching even when we're
* not getting any messages, for example when all partitions are
* paused. */
this.#notifyNonEmpty();
}

let interval = Number(cacheExpiration - now) / 1e6;
const awakeTime = maxFetchTimeout < cacheExpirationTimeout ?
maxFetchTimeout : cacheExpirationTimeout;

let interval = Number(awakeTime - now) / 1e6;
if (interval < 100)
interval = 100;
await Timer.withTimeout(interval, this.#maxPollIntervalRestart);
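The expiration loop above tracks two deadlines derived from `#lastFetchClockNs`, a `process.hrtime.bigint()` timestamp in nanoseconds: cache expiry and a one-second maximum fetch interval. A small worked sketch of the same arithmetic, using a hypothetical 30-second expiration timeout:

```js
const { hrtime } = require('process');

// Intervals kept as BigInt nanoseconds, as in the loop above.
const cacheExpirationInterval = BigInt(30000 * 1e6); // hypothetical 30 s cache expiry
const maxFetchInterval = BigInt(1000 * 1e6);         // 1 s maximum gap between fetch attempts

const lastFetchClockNs = hrtime.bigint();
const now = hrtime.bigint();

const cacheExpirationTimeout = lastFetchClockNs + cacheExpirationInterval;
const maxFetchTimeout = lastFetchClockNs + maxFetchInterval;

// Sleep until whichever deadline comes first, converted back to milliseconds,
// clamped to at least 100 ms so the loop never spins.
const awakeTime = maxFetchTimeout < cacheExpirationTimeout ? maxFetchTimeout : cacheExpirationTimeout;
let intervalMs = Number(awakeTime - now) / 1e6;
if (intervalMs < 100) intervalMs = 100;
console.log(`next wake-up in ~${intervalMs.toFixed(1)} ms`);
```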
@@ -1481,6 +1508,13 @@
this.#pendingOperations = [];
}

#resolveWorkerTerminationScheduled() {
if (this.#workerTerminationScheduled) {
this.#workerTerminationScheduled.resolve();
this.#queueNonEmptyCb();
}
}

/**
* Internal polling loop.
* Spawns and awaits workers until disconnect is initiated.
@@ -1662,7 +1696,7 @@

#addPendingOperation(fun) {
if (this.#pendingOperations.length === 0) {
this.#workerTerminationScheduled.resolve();
this.#resolveWorkerTerminationScheduled();
}
this.#pendingOperations.push(fun);
}
@@ -1727,11 +1761,15 @@
}
}

#markBatchPayloadsStale(topicPartitions) {
#markBatchPayloadsStale(topicPartitions, isSeek) {
for (const topicPartition of topicPartitions) {
const key = partitionKey(topicPartition);
if (this.#topicPartitionToBatchPayload.has(key))
this.#topicPartitionToBatchPayload.get(key)._stale = true;
if (this.#topicPartitionToBatchPayload.has(key)) {
const payload = this.#topicPartitionToBatchPayload.get(key);
payload._stale = true;
if (isSeek)
payload._seeked = true;
}
}
}

@@ -1757,7 +1795,7 @@
}
}
if (seekOffsets.length) {
await this.#seekInternal(seekOffsets, false);
await this.#seekInternal(seekOffsets);
}
}

@@ -1801,7 +1839,7 @@
}

/* If anyone's using eachBatch, mark the batch as stale. */
this.#markBatchPayloadsStale([rdKafkaTopicPartitionOffset]);
this.#markBatchPayloadsStale([rdKafkaTopicPartitionOffset], true);

this.#addPendingOperation(() =>
this.#seekInternal([rdKafkaTopicPartitionOffset]));
@@ -2010,7 +2048,7 @@
}

this.#disconnectStarted = true;
this.#workerTerminationScheduled.resolve();
this.#resolveWorkerTerminationScheduled();
this.#logger.debug("Signalling disconnection attempt to workers", this.#createConsumerBindingMessageMetadata());
await this.#lock.write(async () => {

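Taken together, an idle worker now blocks on one of three promises (an in-flight fetch, the `#nonEmpty` promise, or `availablePartitions()`) and is woken by the queue-non-empty callback or the expiration loop, instead of polling on a one-second per-worker timer. A compressed, illustrative restatement of that decision order, not the binding's actual code:

```js
// Illustrative decision order for a worker that currently has nothing to do.
async function idleUntilWorkAvailable(state) {
  if (state.fetchInProgress) {
    // Another fetch is already in flight; piggyback on its result.
    await state.fetchInProgress;
  } else if (state.nonEmpty) {
    // The underlying queue was drained on the last fetch; wait for the
    // "queue non-empty" event (or the periodic wake-up) instead of re-fetching.
    await state.nonEmpty;
  } else if (state.cache.assignedSize !== 0) {
    // Messages exist, but every partition is claimed by another worker.
    await state.cache.availablePartitions();
  }
  // In every case the worker then re-runs the fetch/consume checks from the top.
}
```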