🔥 Critical Performance Issue: partitionsConsumedConcurrently > 1 Causes CPU Overload Without Consumer Lag #195

Closed
@maksym-opanasenko-ft

Description

Environment Information

  • Reproduced on Mac (Apple silicon) and confirmed in AWS ECS:
  • Node Version: v18.20.4
  • NPM Version: 10.7.0
  • Confluent Platform: 7.7.x, 7.5.x, 7.3.x; also confirmed with Apache Kafka 3.3.x
  • confluent-kafka-javascript version: ^0.5.2

Steps to Reproduce

  1. Launch Kafka in docker or in AWS MSK
  2. Create a topic with many partitions
  3. Instantiate a consumer using the kafkaJS constructor:
	const consumer = kafka.consumer({
		kafkaJS: {
			groupId: 'test-group',
		},
	});
  4. Run the consumer with partitionsConsumedConcurrently > 1
	await consumer.run({
		partitionsConsumedConcurrently: 2, // This causes the CPU load
		eachMessage: async ({ topic, partition, message }) => {
			console.log({
				topic,
				partition,
				offset: message.offset,
				value: message.value.toString(),
			});
		},
	});
  5. Don't write any messages to the topic, or wait for the consumer to catch up with the lag
  6. Wait at least 30 seconds and observe the CPU usage
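
To put a number on the "observe the CPU usage" step, a small sampler can run inside the same Node process as the consumer. This is only a sketch and not part of the original reproduction: cpuPercent and sampleCpuPercent are hypothetical helper names, and it assumes that process.cpuUsage() reports CPU time for the whole process, which should include the client's native threads.

```javascript
// Hypothetical helpers (not from the original report) for measuring the
// CPU usage of the current Node process over a fixed time window.

// Pure calculation: CPU time spent (microseconds) divided by wall-clock
// time elapsed (microseconds), expressed as a percentage of one core.
const cpuPercent = ({ user, system }, elapsedUs) =>
	((user + system) / elapsedUs) * 100;

// Samples process.cpuUsage() at the start and end of the window.
// Resolves with a percentage; values above 100 are possible on
// multi-core hosts because several threads can be busy at once.
const sampleCpuPercent = (windowMs) =>
	new Promise((resolve) => {
		const startUsage = process.cpuUsage();
		const startTime = process.hrtime.bigint();

		setTimeout(() => {
			// Passing the earlier reading returns the delta since then.
			const delta = process.cpuUsage(startUsage);
			// hrtime.bigint() is in nanoseconds; convert to microseconds.
			const elapsedUs = Number(process.hrtime.bigint() - startTime) / 1000;
			resolve(cpuPercent(delta, elapsedUs));
		}, windowMs);
	});
```

One way to use it is to call sampleCpuPercent(5000) in a loop after consumer.run() resolves and log each reading, then compare the readings for partitionsConsumedConcurrently: 1 and partitionsConsumedConcurrently: 2 on an idle topic.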

confluent-kafka-javascript Configuration Settings

Here is the full minimal snippet we use to reproduce:

const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;

const topic = 'TestTopic';
const clientId = 'test-client-id';

const kafka = new Kafka({
	kafkaJS: {
		clientId: clientId,
		brokers: ['localhost:9092'],
		logLevel: 4, // optional, but gives a hint what overuses the CPU
	},
});

const consumerRun = async () => {
	const consumer = kafka.consumer({
		kafkaJS: {
			groupId: 'test-group',
		},
	});

	await consumer.connect();
	console.log('Consumer connected');

	await consumer.subscribe({ topic });
	console.log('Consumer subscribed to topic');

	await consumer.run({
		partitionsConsumedConcurrently: 2, // This causes the CPU load
		eachMessage: async ({ topic, partition, message }) => {
			console.log({
				topic,
				partition,
				offset: message.offset,
				value: message.value.toString(),
			});
		},
	});
};

consumerRun().catch((err) => {
	console.error('Error running producer or consumer:', err);
});

Additional context

When logLevel: 4 is set in the Kafka constructor and partitionsConsumedConcurrently is 1, the following log message is printed about once per second:

{
  message: 'Attempting to fetch 1 messages to the message cache',
  name: 'test-client-id#consumer-1',
  fac: 'BINDING',
  timestamp: 1733298033731
}

With partitionsConsumedConcurrently: 2, however, the console is flooded with this message.
Setting logLevel: 3 improves things because less I/O is performed, but the code that prints the message still runs, causing high CPU usage.
