-
Notifications
You must be signed in to change notification settings - Fork 17
Avoid a race condition that causes 100% usage of a CPU core #300
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…uming with `partitionsConsumedConcurrently > 1` and all messages are consumed. Closes #195
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR fixes a race condition that was causing 100% CPU usage during message consumption when partitionsConsumedConcurrently > 1. The changes introduce improved synchronization via DeferredPromises, update test configurations to better stress the fix, and adjust worker termination logic.
- Increased the volume of test messages and added flush calls to ensure proper synchronization.
- Modified consumer internals to notify and await available partitions, and improved worker termination handling.
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
Show a summary per file
File | Description |
---|---|
test/promisified/producer/flush.spec.js | Increased the test message count to stress the condition. |
test/promisified/consumer/seek.spec.js | Added flush calls to avoid out-of-range errors during seeking. |
test/promisified/consumer/consumeMessages.spec.js | Adjusted message volume in tests to align with the new race condition fix. |
lib/kafkajs/_consumer_cache.js | Introduced DeferredPromise for available partitions and notification logic. |
lib/kafkajs/_consumer.js | Refactored consumer flow; updated error handling and worker termination logic. |
CHANGELOG.md | Updated changelog to document the race condition fix. |
Comments suppressed due to low confidence (2)
test/promisified/producer/flush.spec.js:78
- Increasing the number of messages from 100 to 1000 can impact test execution time; please ensure test timeouts and resource usage are adjusted accordingly.
producer.send({ topic: topicName, messages: Array(1000).fill(message) }).then(() => {
lib/kafkajs/_consumer.js:1678
- Replacing direct resolution with a dedicated method improves consistency; please double-check that all race conditions related to worker termination are adequately handled.
this.#resolveWorkerTerminationScheduled();
@@ -1316,7 +1321,7 @@ class Consumer { | |||
|
|||
/* If any message is unprocessed, either due to an error or due to the user not marking it processed, we must seek | |||
* back to get it so it can be reprocessed. */ | |||
if (lastOffsetProcessed.offset !== lastOffset) { | |||
if (!payload._seeked && lastOffsetProcessed.offset !== lastOffset) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The condition now checks for the _seeked flag to avoid redundant seeks; please ensure this logic meets the intended behavior for reprocessing messages.
Copilot uses AI. Check for mistakes.
@@ -348,6 +352,11 @@ describe('Consumer seek >', () => { | |||
}); | |||
|
|||
describe('batch staleness >', () => { | |||
beforeEach(async () => { | |||
// Theses tests expect a single partititon |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo: Theses
it's not possible that we await for a different reason than the one that caused to return null in the first place
…if all partitions are paused
dd9e69c
to
5dd12ee
Compare
This comment has been minimized.
This comment has been minimized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some review comments, doing another pass
@@ -410,6 +422,13 @@ describe('Consumer seek >', () => { | |||
|
|||
consumer.run({ | |||
eachBatch: async ({ batch, isStale, resolveOffset }) => { | |||
if (offsetsConsumed.length == 0 && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use ===
@@ -4,6 +4,7 @@ const { | |||
secureRandom, | |||
createTopic, | |||
createAdmin, | |||
sleep, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove as it is unused
lib/kafkajs/_consumer_cache.js
Outdated
* Promise that resolved when there are available partitions to take. | ||
*/ | ||
async availablePartitions() { | ||
await this.#availablePartitionsPromise; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can just return instead of await for identical behaviour
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the fix! I checked additionally to make sure we're not leaking promises etc.
when consuming with
partitionsConsumedConcurrently > 1
and all messages are consumed.Closes #195
What
When there are no messages available it's possible both workers alternate in fetching or waiting for
#fetchInProgress
and never reach the state where they await for#queueNonEmptyCb
. Solved by having a different promise when a non-empty event is needed and avoiding resolving it when a fetch is in progress to avoid a double callback causes one of the two events to be lost.Checklist
References
JIRA:
Test & Review
For reproducing before the fix or testing use the example code provided in #195
Open questions / Follow-ups