diff --git a/CHANGELOG.md b/CHANGELOG.md
index dd2bbcc9..95275824 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,14 @@
+# confluent-kafka-javascript v1.3.1
+
+v1.3.1 is a maintenance release. It is supported for all usage.
+
+## Fixes
+
+1. Avoid a race condition that causes 100% usage of a CPU core when
+   consuming with `partitionsConsumedConcurrently > 1` and all messages
+   are consumed (#300)
+
+
 # confluent-kafka-javascript v1.3.0
 
 v1.3.0 is a feature release. It is supported for all usage.
diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js
index c6410547..8de46c6e 100644
--- a/lib/kafkajs/_consumer.js
+++ b/lib/kafkajs/_consumer.js
@@ -24,7 +24,6 @@ const {
 const { Buffer } = require('buffer');
 const MessageCache = require('./_consumer_cache');
 const { hrtime } = require('process');
-const { LinkedList } = require('./_linked-list');
 
 const ConsumerState = Object.freeze({
   INIT: 0,
@@ -203,11 +202,10 @@ class Consumer {
    * It's set to null when no fetch is in progress.
    */
   #fetchInProgress;
-
   /**
-   * List of DeferredPromises waiting on consumer queue to be non-empty.
+   * Are we waiting for the queue to be non-empty?
    */
-  #queueWaiters = new LinkedList();
+  #nonEmpty = null;
 
   /**
    * Whether any rebalance callback is in progress.
@@ -363,7 +361,6 @@
    */
   async #rebalanceCallback(err, assignment) {
     const isLost = this.#internalClient.assignmentLost();
-    this.#rebalanceCbInProgress = new DeferredPromise();
     let assignmentFnCalled = false;
     this.#logger.info(
       `Received rebalance event with message: '${err.message}' and ${assignment.length} partition(s), isLost: ${isLost}`,
@@ -468,7 +465,7 @@
      */
     const workersToSpawn = Math.max(1, Math.min(this.#concurrency, this.#partitionCount));
     if (workersToSpawn !== this.#workers.length) {
-      this.#workerTerminationScheduled.resolve();
+      this.#resolveWorkerTerminationScheduled();
       /* We don't need to await the workers here. We are OK if the termination and respawning
        * occurs later, since even if we have a few more or few less workers for a while, it's
        * not a big deal. */
@@ -639,11 +636,14 @@
     /* Certain properties that the user has set are overridden. We use trampolines to accommodate the user's callbacks.
      * TODO: add trampoline method for offset commit callback. */
     rdKafkaConfig['offset_commit_cb'] = true;
-    rdKafkaConfig['rebalance_cb'] = (err, assignment) => this.#rebalanceCallback(err, assignment).catch(e =>
+    rdKafkaConfig['rebalance_cb'] = (err, assignment) => {
+      this.#rebalanceCbInProgress = new DeferredPromise();
+      this.#rebalanceCallback(err, assignment).catch(e =>
       {
        if (this.#logger)
          this.#logger.error(`Error from rebalance callback: ${e.stack}`);
       });
+    };
 
     /* We handle offset storage within the promisified API by ourselves. Thus we don't allow the user to change this
      * setting and set it to false. */
@@ -904,6 +904,7 @@
     const returnPayload = {
       batch,
       _stale: false,
+      _seeked: false,
       _lastResolvedOffset: { offset: -1, leaderEpoch: -1 },
       heartbeat: async () => { /* no op */ },
       pause: this.pause.bind(this, [{ topic, partitions: [partition] }]),
@@ -922,9 +923,25 @@
 
   async #fetchAndResolveWith(takeFromCache, size) {
     if (this.#fetchInProgress) {
+      await this.#fetchInProgress;
+      /* Restart with the checks as we might have
+       * a new fetch in progress already. */
+      return null;
+    }
+
+    if (this.#nonEmpty) {
+      await this.#nonEmpty;
+      /* Restart with the checks as we might have
+       * a new fetch in progress already. */
       return null;
     }
 
+    if (this.#workerTerminationScheduled.resolved) {
+      /* Return without fetching. */
+      return null;
+    }
+
+    let err, messages, processedRebalance = false;
     try {
       this.#fetchInProgress = new DeferredPromise();
       const fetchResult = new DeferredPromise();
@@ -933,8 +950,9 @@
 
       this.#internalClient.consume(size, (err, messages) =>
         fetchResult.resolve([err, messages]));
-      let [err, messages] = await fetchResult;
+      [err, messages] = await fetchResult;
       if (this.#rebalanceCbInProgress) {
+        processedRebalance = true;
         await this.#rebalanceCbInProgress;
         this.#rebalanceCbInProgress = null;
       }
@@ -956,6 +974,8 @@
     } finally {
       this.#fetchInProgress.resolve();
       this.#fetchInProgress = null;
+      if (!err && !processedRebalance && this.#messageCache.assignedSize === 0)
+        this.#nonEmpty = new DeferredPromise();
     }
   }
 
@@ -973,10 +993,13 @@
     }
 
     /* It's possible that we get msg = null, but that's because partitionConcurrency
-     * exceeds the number of partitions containing messages. So in this case,
-     * we should not call for new fetches, rather, try to focus on what we have left.
+     * exceeds the number of partitions containing messages. So
+     * we should wait for a new partition to be available.
      */
     if (!msg && this.#messageCache.assignedSize !== 0) {
+      await this.#messageCache.availablePartitions();
+      /* Restart with the checks as we might have
+       * the cache full. */
       return null;
     }
 
@@ -1000,10 +1023,13 @@
     }
 
     /* It's possible that we get msgs = null, but that's because partitionConcurrency
-     * exceeds the number of partitions containing messages. So in this case,
-     * we should not call for new fetches, rather, try to focus on what we have left.
+     * exceeds the number of partitions containing messages. So
+     * we should wait for a new partition to be available.
      */
     if (!msgs && this.#messageCache.assignedSize !== 0) {
+      await this.#messageCache.availablePartitions();
+      /* Restart with the checks as we might have
+       * the cache full. */
       return null;
     }
 
@@ -1316,7 +1342,7 @@
 
     /* If any message is unprocessed, either due to an error or due to the user not marking it processed, we must seek
      * back to get it so it can be reprocessed. */
-    if (lastOffsetProcessed.offset !== lastOffset) {
+    if (!payload._seeked && lastOffsetProcessed.offset !== lastOffset) {
      const offsetToSeekTo = lastOffsetProcessed.offset === -1 ? firstMessage.offset : (lastOffsetProcessed.offset + 1);
      const leaderEpoch = lastOffsetProcessed.offset === -1 ? firstMessage.leaderEpoch : lastOffsetProcessed.leaderEpoch;
      this.seek({
@@ -1348,36 +1374,27 @@
     return ppc;
   }
 
-  #queueNonEmptyCb() {
-    for (const waiter of this.#queueWaiters) {
-      waiter.resolve();
+  #notifyNonEmpty() {
+    if (this.#nonEmpty) {
+      this.#nonEmpty.resolve();
+      this.#nonEmpty = null;
     }
+    if (this.#messageCache)
+      this.#messageCache.notifyAvailablePartitions();
   }
 
-  async #nextFetchRetry() {
-    if (this.#fetchInProgress) {
-      await this.#fetchInProgress;
-    } else {
-      /* Backoff a little. If m is null, we might be without messages
-       * or in available partition starvation, and calling consumeSingleCached
-       * in a tight loop will help no one.
-       * In case there is any message in the queue, we'll be woken up before the
-       * timer expires.
-       * We have a per-worker promise, otherwise we end up awakening
-       * other workers when they've already looped and just restarted awaiting.
-       * The `Promise` passed to `Timer.withTimeout` cannot be reused
-       * in next call to this method, to avoid memory leaks caused
-       * by `Promise.race`. */
-      const waiter = new DeferredPromise();
-      const waiterNode = this.#queueWaiters.addLast(waiter);
-      await Timer.withTimeout(1000, waiter);
-
-      /* Resolves the "extra" promise that has been spawned when creating the timer. */
-      waiter.resolve();
-      this.#queueWaiters.remove(waiterNode);
-    }
-  }
+  #queueNonEmptyCb() {
+    const nonEmptyAction = async () => {
+      if (this.#fetchInProgress)
+        await this.#fetchInProgress;
+      this.#notifyNonEmpty();
+    };
+    nonEmptyAction().catch((e) => {
+      this.#logger.error(`Error in queueNonEmptyCb: ${e}`,
+        this.#createConsumerBindingMessageMetadata());
+    });
+  }
 
   /**
    * Starts a worker to fetch messages/batches from the internal consumer and process them.
    *
@@ -1393,27 +1410,24 @@
    */
   async #worker(config, perMessageProcessor, fetcher) {
     let ppc = null;
-
     while (!this.#workerTerminationScheduled.resolved) {
+      try {
+        const ms = await fetcher(ppc);
+        if (!ms)
+          continue;
 
-      const ms = await fetcher(ppc).catch(e => {
+        if (this.#pendingOperations.length) {
+          ppc = this.#discardMessages(ms, ppc);
+          break;
+        }
+
+        ppc = await perMessageProcessor(ms, config);
+      } catch (e) {
         /* Since this error cannot be exposed to the user in the current situation, just log and retry.
          * This is due to restartOnFailure being set to always true. */
         if (this.#logger)
          this.#logger.error(`Consumer encountered error while consuming. Retrying. Error details: ${e} : ${e.stack}`,
            this.#createConsumerBindingMessageMetadata());
-      });
-
-      if (this.#pendingOperations.length) {
-        ppc = this.#discardMessages(ms, ppc);
-        break;
-      }
-
-      if (!ms) {
-        await this.#nextFetchRetry();
-        continue;
-      }
-
-      ppc = await perMessageProcessor(ms, config);
+      }
     }
 
     if (ppc)
@@ -1447,19 +1461,32 @@
    * @private
    */
   async #cacheExpirationLoop() {
+    const cacheExpirationInterval = BigInt(this.#cacheExpirationTimeoutMs * 1e6);
+    const maxFetchInterval = BigInt(1000 * 1e6);
 
     while (!this.#workerTerminationScheduled.resolved) {
       let now = hrtime.bigint();
-      const cacheExpiration = this.#lastFetchClockNs +
-        BigInt(this.#cacheExpirationTimeoutMs * 1e6);
+      const cacheExpirationTimeout = this.#lastFetchClockNs +
+        cacheExpirationInterval;
+      const maxFetchTimeout = this.#lastFetchClockNs +
+        maxFetchInterval;
 
-      if (now > cacheExpiration) {
+      if (now > cacheExpirationTimeout) {
         this.#addPendingOperation(() => this.#clearCacheAndResetPositions());
         await this.#checkMaxPollIntervalNotExceeded(now);
         break;
       }
+      if (now > maxFetchTimeout) {
+        /* We need to continue fetching even when we're
+         * not getting any messages, for example when all partitions are
+         * paused. */
+        this.#notifyNonEmpty();
+      }
 
-      let interval = Number(cacheExpiration - now) / 1e6;
+      const awakeTime = maxFetchTimeout < cacheExpirationTimeout ?
+        maxFetchTimeout : cacheExpirationTimeout;
+
+      let interval = Number(awakeTime - now) / 1e6;
       if (interval < 100)
        interval = 100;
       await Timer.withTimeout(interval, this.#maxPollIntervalRestart);
@@ -1481,6 +1508,13 @@
     this.#pendingOperations = [];
   }
 
+  #resolveWorkerTerminationScheduled() {
+    if (this.#workerTerminationScheduled) {
+      this.#workerTerminationScheduled.resolve();
+      this.#queueNonEmptyCb();
+    }
+  }
+
   /**
    * Internal polling loop.
    * Spawns and awaits workers until disconnect is initiated.
@@ -1662,7 +1696,7 @@
 
   #addPendingOperation(fun) {
     if (this.#pendingOperations.length === 0) {
-      this.#workerTerminationScheduled.resolve();
+      this.#resolveWorkerTerminationScheduled();
     }
     this.#pendingOperations.push(fun);
   }
@@ -1727,11 +1761,15 @@
     }
   }
 
-  #markBatchPayloadsStale(topicPartitions) {
+  #markBatchPayloadsStale(topicPartitions, isSeek) {
     for (const topicPartition of topicPartitions) {
       const key = partitionKey(topicPartition);
-      if (this.#topicPartitionToBatchPayload.has(key))
-        this.#topicPartitionToBatchPayload.get(key)._stale = true;
+      if (this.#topicPartitionToBatchPayload.has(key)) {
+        const payload = this.#topicPartitionToBatchPayload.get(key);
+        payload._stale = true;
+        if (isSeek)
+          payload._seeked = true;
+      }
     }
   }
 
@@ -1757,7 +1795,7 @@
       }
     }
     if (seekOffsets.length) {
-      await this.#seekInternal(seekOffsets, false);
+      await this.#seekInternal(seekOffsets);
     }
   }
 
@@ -1801,7 +1839,7 @@
     }
 
     /* If anyone's using eachBatch, mark the batch as stale. */
-    this.#markBatchPayloadsStale([rdKafkaTopicPartitionOffset]);
+    this.#markBatchPayloadsStale([rdKafkaTopicPartitionOffset], true);
 
     this.#addPendingOperation(() =>
       this.#seekInternal([rdKafkaTopicPartitionOffset]));
@@ -2010,7 +2048,7 @@
     }
 
     this.#disconnectStarted = true;
-    this.#workerTerminationScheduled.resolve();
+    this.#resolveWorkerTerminationScheduled();
     this.#logger.debug("Signalling disconnection attempt to workers", this.#createConsumerBindingMessageMetadata());
 
     await this.#lock.write(async () => {
diff --git a/lib/kafkajs/_consumer_cache.js b/lib/kafkajs/_consumer_cache.js
index f0ad8a00..9047688e 100644
--- a/lib/kafkajs/_consumer_cache.js
+++ b/lib/kafkajs/_consumer_cache.js
@@ -1,5 +1,6 @@
 const {
   partitionKey,
+  DeferredPromise,
 } = require('./_common');
 const { LinkedList } = require('./_linked-list');
 
@@ -74,6 +75,8 @@ class MessageCache {
   /* LinkedList of assigned partitions. */
   #assignedPartitions;
 
+  /* Promise that is resolved when there are available partitions. */
+  #availablePartitionsPromise = new DeferredPromise();
 
   constructor(logger) {
     this.logger = logger ?? console;
@@ -130,6 +133,7 @@
       cache = new PerPartitionMessageCache(key);
       this.#tpToPpc.set(key, cache);
       cache._node = this.#availablePartitions.addLast(cache);
+      this.notifyAvailablePartitions();
     }
     cache._add(message);
   }
@@ -197,6 +201,7 @@
       this.#assignedPartitions.remove(ppc._node);
       ppc._node = this.#availablePartitions.addLast(ppc);
       ppc._assigned = false;
+      this.notifyAvailablePartitions();
     }
   }
 
@@ -260,6 +265,21 @@
     }
     this.#reinit();
   }
+
+  /**
+   * Notifies awaiters that there are available partitions to take.
+   */
+  notifyAvailablePartitions() {
+    this.#availablePartitionsPromise.resolve();
+    this.#availablePartitionsPromise = new DeferredPromise();
+  }
+
+  /**
+   * Promise that resolves when there are available partitions to take.
+   */
+  async availablePartitions() {
+    return this.#availablePartitionsPromise;
+  }
 }
 
 module.exports = MessageCache;
diff --git a/test/promisified/admin/list_topics.spec.js b/test/promisified/admin/list_topics.spec.js
index 77a3447b..25fa6c36 100644
--- a/test/promisified/admin/list_topics.spec.js
+++ b/test/promisified/admin/list_topics.spec.js
@@ -26,10 +26,20 @@ describe('Admin > listTopics', () => {
 
   it('should timeout', async () => {
     await admin.connect();
-    await expect(admin.listTopics({ timeout: 1 })).rejects.toHaveProperty(
-      'code',
-      ErrorCodes.ERR__TIMED_OUT
-    );
+    while (true) {
+      try {
+        await admin.listTopics({ timeout: 0.00001 });
+        jest.fail('Should have thrown an error');
+      } catch (e) {
+        if (e.code === ErrorCodes.ERR__TRANSPORT)
+          continue;
+        expect(e).toHaveProperty(
+          'code',
+          ErrorCodes.ERR__TIMED_OUT
+        );
+        break;
+      }
+    }
   });
 
   it('should list consumer topics', async () => {
diff --git a/test/promisified/consumer/consumeMessages.spec.js b/test/promisified/consumer/consumeMessages.spec.js
index 93a3e694..c34a04db 100644
--- a/test/promisified/consumer/consumeMessages.spec.js
+++ b/test/promisified/consumer/consumeMessages.spec.js
@@ -412,19 +412,6 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit
       partitions: partitions,
     });
 
-    /* Reconfigure producer and consumer in such a way that batches are likely
-     * to be small and we get multiple partitions in the cache at once.
-     * This is to reduce flakiness. */
-    producer = createProducer({}, {
-      'batch.num.messages': 1,
-    });
-
-    consumer = createConsumer({
-      'groupId': groupId,
-    }, {
-      'fetch.message.max.bytes': 1,
-    });
-
     await producer.connect();
     await consumer.connect();
     await consumer.subscribe({ topic: topicName });
@@ -451,7 +438,7 @@
 
     await waitFor(() => consumer.assignment().length > 0, () => { }, 100);
 
-    const messages = Array(1024*9)
+    const messages = Array(4096 * 3)
       .fill()
       .map((_, i) => {
         const value = secureRandom(512);
diff --git a/test/promisified/consumer/rebalanceCallback.spec.js b/test/promisified/consumer/rebalanceCallback.spec.js
index 9269e19b..2659f538 100644
--- a/test/promisified/consumer/rebalanceCallback.spec.js
+++ b/test/promisified/consumer/rebalanceCallback.spec.js
@@ -18,6 +18,7 @@ describe('Consumer', () => {
     groupId = `consumer-group-id-${secureRandom()}`;
     consumerConfig = {
       groupId,
+      fromBeginning: true,
     };
     consumer = null;
     await createTopic({ topic: topicName, partitions: 3 });
diff --git a/test/promisified/consumer/seek.spec.js b/test/promisified/consumer/seek.spec.js
index 17e1c749..93de9596 100644
--- a/test/promisified/consumer/seek.spec.js
+++ b/test/promisified/consumer/seek.spec.js
@@ -179,6 +179,9 @@ describe('Consumer seek >', () => {
         topic: topicName,
         messages: [message1, message2, message3, message4],
       });
+      // Avoids a validation that resets the offset
+      // with subsequent seek
+      await producer.flush();
 
       await consumer.subscribe({ topic: topicName });
 
@@ -305,6 +308,10 @@
       const message3 = { key: `key-0`, value: `value-${value3}`, partition: 0 };
 
       await producer.send({ topic: topicName, messages: [message1, message2, message3] });
+      // Avoids a validation that resets the offset
+      // with subsequent seek
+      await producer.flush();
+
       await consumer.subscribe({ topic: topicName, });
 
       const messagesConsumed = [];
@@ -348,6 +355,11 @@
   });
 
   describe('batch staleness >', () => {
+    beforeEach(async () => {
+      // These tests expect a single partition
+      await createTopic({ topic: topicName, partitions: 1 });
+    });
+
     it('stops consuming messages after staleness', async () => {
       consumer = createConsumer({
         groupId,
@@ -410,6 +422,13 @@
 
     consumer.run({
       eachBatch: async ({ batch, isStale, resolveOffset }) => {
+        if (offsetsConsumed.length === 0 &&
+          batch.messages.length === 1) {
+          // Await a batch of at least two messages
+          resolveOffset(batch.messages[0].offset);
+          return;
+        }
+
         for (const message of batch.messages) {
           if (isStale())
             break;
diff --git a/test/promisified/producer/flush.spec.js b/test/promisified/producer/flush.spec.js
index c4f7daf9..64a15b9f 100644
--- a/test/promisified/producer/flush.spec.js
+++ b/test/promisified/producer/flush.spec.js
@@ -75,7 +75,7 @@ describe('Producer > Flush', () => {
     let messageSent = false;
 
     /* Larger number of messages */
-    producer.send({ topic: topicName, messages: Array(100).fill(message) }).then(() => {
+    producer.send({ topic: topicName, messages: Array(1000).fill(message) }).then(() => {
       messageSent = true;
     });