Avoid a race condition that causes 100% usage of a CPU core #300


Merged
merged 7 commits into master from dev_fix_race_condition_high_cpu_usage
May 16, 2025

Conversation

emasab
Contributor

@emasab emasab commented Apr 28, 2025

when consuming with partitionsConsumedConcurrently > 1 and all messages are consumed.

Closes #195

What

When no messages are available, it's possible for both workers to alternate between fetching and waiting on #fetchInProgress, never reaching the state where they await #queueNonEmptyCb. This is solved by using a separate promise for the non-empty event and by not resolving it while a fetch is in progress, so that a double callback cannot cause one of the two events to be lost.
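The fix relies on an externally resolvable promise for the non-empty event. A minimal sketch of that pattern (hypothetical names, not the library's actual implementation):

```javascript
// Minimal sketch of an externally resolvable ("deferred") promise,
// the pattern used to signal a queue-non-empty event to waiting workers.
class DeferredPromise {
  #resolve;
  promise = new Promise((resolve) => { this.#resolve = resolve; });

  resolve(value) { this.#resolve(value); }
}

// A worker awaits the deferred promise; the queue-non-empty callback
// resolves it, waking the worker exactly once.
async function demo() {
  const nonEmpty = new DeferredPromise();
  let state = 'waiting';
  const worker = nonEmpty.promise.then(() => { state = 'woken'; });
  nonEmpty.resolve(); // simulate the queue-non-empty callback firing
  await worker;
  return state;
}
```

Because a promise can only settle once, a fresh DeferredPromise must be created each time a new non-empty event is needed, which is what distinguishes this approach from reusing a single callback.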

Checklist

  • Contains customer facing changes? Including API/behavior changes
  • Did you add sufficient unit test and/or integration test coverage for this PR?

References

JIRA:

Test & Review

To reproduce the issue before the fix, or to test it, use the example code provided in #195.

Open questions / Follow-ups


Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
…uming with `partitionsConsumedConcurrently > 1` and all messages are consumed.

Closes #195
@Copilot Copilot AI review requested due to automatic review settings April 28, 2025 11:45
@emasab emasab requested review from a team as code owners April 28, 2025 11:45

@Copilot Copilot AI left a comment


Pull Request Overview

This PR fixes a race condition that was causing 100% CPU usage during message consumption when partitionsConsumedConcurrently > 1. The changes introduce improved synchronization via DeferredPromises, update test configurations to better stress the fix, and adjust worker termination logic.

  • Increased the volume of test messages and added flush calls to ensure proper synchronization.
  • Modified consumer internals to notify and await available partitions, and improved worker termination handling.

Reviewed Changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.

Summary per file:

  • test/promisified/producer/flush.spec.js: Increased the test message count to stress the condition.
  • test/promisified/consumer/seek.spec.js: Added flush calls to avoid out-of-range errors during seeking.
  • test/promisified/consumer/consumeMessages.spec.js: Adjusted message volume in tests to align with the new race condition fix.
  • lib/kafkajs/_consumer_cache.js: Introduced DeferredPromise for available partitions and notification logic.
  • lib/kafkajs/_consumer.js: Refactored consumer flow; updated error handling and worker termination logic.
  • CHANGELOG.md: Updated changelog to document the race condition fix.
Comments suppressed due to low confidence (2)

test/promisified/producer/flush.spec.js:78

  • Increasing the number of messages from 100 to 1000 can impact test execution time; please ensure test timeouts and resource usage are adjusted accordingly.
producer.send({ topic: topicName, messages: Array(1000).fill(message) }).then(() => {

lib/kafkajs/_consumer.js:1678

  • Replacing direct resolution with a dedicated method improves consistency; please double-check that all race conditions related to worker termination are adequately handled.
this.#resolveWorkerTerminationScheduled();
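Centralizing resolution in one dedicated method is a common way to keep a deferred promise's lifecycle consistent across call sites. A hypothetical sketch (names assumed, not the actual consumer code):

```javascript
// Sketch: a termination signal exposed as a deferred promise, with a
// single dedicated method responsible for resolving it.
class Worker {
  #resolveTermination;
  #terminationScheduled = new Promise((resolve) => {
    this.#resolveTermination = resolve;
  });

  // The only place the promise is resolved, so all call sites behave alike.
  scheduleTermination() { this.#resolveTermination(); }

  awaitTermination() { return this.#terminationScheduled; }
}

async function demoTermination() {
  const worker = new Worker();
  let terminated = false;
  const waiter = worker.awaitTermination().then(() => { terminated = true; });
  worker.scheduleTermination();
  await waiter;
  return terminated;
}
```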

@@ -1316,7 +1321,7 @@ class Consumer {

/* If any message is unprocessed, either due to an error or due to the user not marking it processed, we must seek
* back to get it so it can be reprocessed. */
if (lastOffsetProcessed.offset !== lastOffset) {
if (!payload._seeked && lastOffsetProcessed.offset !== lastOffset) {

Copilot AI Apr 28, 2025


The condition now checks for the _seeked flag to avoid redundant seeks; please ensure this logic meets the intended behavior for reprocessing messages.


@@ -348,6 +352,11 @@ describe('Consumer seek >', () => {
});

describe('batch staleness >', () => {
beforeEach(async () => {
// Theses tests expect a single partititon


Typo: Theses

emasab added 3 commits April 29, 2025 21:27
it's not possible that we await for a different reason
than the one that caused the return of null in the first place
@airlock-confluentinc airlock-confluentinc bot force-pushed the dev_fix_race_condition_high_cpu_usage branch from dd9e69c to 5dd12ee on April 30, 2025 16:23

@emasab emasab marked this pull request as ready for review May 1, 2025 08:20
@sonarqube-confluent

Passed

Analysis Details

3 Issues

  • Bugs: 0
  • Vulnerabilities: 0
  • Code Smells: 3

Coverage and Duplications

  • Coverage: 93.80% (47.80% estimated after merge)
  • Duplications: no duplication information (2.10% estimated after merge)

Project ID: confluent-kafka-javascript


Contributor

@milindl milindl left a comment


Left some review comments, doing another pass

@@ -410,6 +422,13 @@ describe('Consumer seek >', () => {

consumer.run({
eachBatch: async ({ batch, isStale, resolveOffset }) => {
if (offsetsConsumed.length == 0 &&


Use ===
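The suggestion reflects JavaScript's loose-equality coercion rules; a quick illustration of why `===` is the safer comparison:

```javascript
// Loose equality (==) applies type coercion before comparing;
// strict equality (===) compares type and value directly.
const checks = [
  0 == '',          // true: '' coerces to the number 0
  0 === '',         // false: number vs string
  [].length == 0,   // true
  [].length === 0,  // true, with no coercion involved
];
console.log(checks);
```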

@@ -4,6 +4,7 @@ const {
secureRandom,
createTopic,
createAdmin,
sleep,


Remove as it is unused

* Promise that resolves when there are available partitions to take.
*/
async availablePartitions() {
await this.#availablePartitionsPromise;

nit: can just return instead of await for identical behaviour
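The nit rests on the fact that, inside an async function, `return promise` and `await promise` are observably equivalent for the caller when the promise resolves to undefined, aside from microtask timing. A small sketch with hypothetical names:

```javascript
// Both functions' returned promises settle when `p` settles,
// resolving to undefined either way.
async function availableViaAwait(p) { await p; }
async function availableViaReturn(p) { return p; }

async function demo() {
  const p = Promise.resolve();
  const a = await availableViaAwait(p);
  const b = await availableViaReturn(p);
  return a === b; // both undefined
}
```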

@emasab emasab requested a review from milindl May 13, 2025 14:11
Contributor

@milindl milindl left a comment


Thanks for the fix! I checked additionally to make sure we're not leaking promises etc.

@emasab emasab merged commit af20c7c into master May 16, 2025
1 of 2 checks passed
@emasab emasab deleted the dev_fix_race_condition_high_cpu_usage branch May 16, 2025 08:42
Development

Successfully merging this pull request may close these issues.

🔥 Critical Performance Issue: partitionsConsumedConcurrently > 1 Causes CPU Overload Without Consumer Lag
3 participants