Description
Environment Information
- Reproduced on Mac (Apple silicon) and confirmed in AWS ECS
- Node Version: v18.20.4
- NPM Version: 10.7.0
- Confluent Platform: `7.7.x`, `7.5.x`, `7.3.x`; also confirmed with Apache Kafka `3.3.x`
- confluent-kafka-javascript version: `^0.5.2`
Steps to Reproduce
- Launch Kafka in Docker or in AWS MSK
- Create a topic with many partitions
- Instantiate a consumer using the `kafkaJS` constructor config:
```javascript
const consumer = kafka.consumer({
  kafkaJS: {
    groupId: 'test-group',
  },
});
```
- Run the consumer with `partitionsConsumedConcurrently` > 1:
```javascript
await consumer.run({
  partitionsConsumedConcurrently: 2, // This causes the CPU load
  eachMessage: async ({ topic, partition, message }) => {
    console.log({
      topic,
      partition,
      offset: message.offset,
      value: message.value?.toString(), // value is null for tombstones
    });
  },
});
```
- Either don't write any messages to the topic, or wait for the consumer to catch up with the lag, so that the consumer is idle
- Wait for at least 30 seconds and observe the CPU usage
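One way to quantify the last step without external tools is to sample the process's own CPU time with Node's built-in `process.cpuUsage()`. The sketch below is a hypothetical helper (`startCpuMonitor` and `cpuPercent` are illustrative names, not part of confluent-kafka-javascript). It assumes it runs in the same process as the consumer and reports the CPU time of all the process's threads as a percentage of one core, so values above 100 are possible.

```javascript
// Hypothetical CPU-usage sampler built only on Node.js built-ins.
// Assumption: it runs inside the consumer's process, so the readings cover
// every thread of that process (JS thread plus native/librdkafka threads).

// Convert a CPU-time delta (microseconds) over a wall-clock interval
// (milliseconds) into a percentage of one core.
function cpuPercent(delta, elapsedMs) {
  const cpuMicros = delta.user + delta.system;
  return (cpuMicros / (elapsedMs * 1000)) * 100;
}

// Sample CPU usage every `intervalMs` and pass the percentage to `onSample`.
// Returns a function that stops sampling.
function startCpuMonitor(
  intervalMs = 1000,
  onSample = (pct) => console.log(`CPU: ${pct.toFixed(1)}%`),
) {
  let prevUsage = process.cpuUsage();
  let prevTime = process.hrtime.bigint();

  const timer = setInterval(() => {
    const usage = process.cpuUsage();
    const now = process.hrtime.bigint();
    const delta = {
      user: usage.user - prevUsage.user,
      system: usage.system - prevUsage.system,
    };
    const elapsedMs = Number(now - prevTime) / 1e6;
    prevUsage = usage;
    prevTime = now;
    onSample(cpuPercent(delta, elapsedMs));
  }, intervalMs);

  // Don't keep the process alive just for monitoring.
  timer.unref();
  return () => clearInterval(timer);
}
```

For example, call `startCpuMonitor()` just before `await consumer.run(...)` in the reproduction script and compare the readings with `partitionsConsumedConcurrently` set to 1 and to 2.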
confluent-kafka-javascript Configuration Settings
Here is the full minimal snippet we use to reproduce:
```javascript
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;

const topic = 'TestTopic';
const clientId = 'test-client-id';

const kafka = new Kafka({
  kafkaJS: {
    clientId: clientId,
    brokers: ['localhost:9092'],
    logLevel: 4, // optional, but hints at what is using the CPU
  },
});

const consumerRun = async () => {
  const consumer = kafka.consumer({
    kafkaJS: {
      groupId: 'test-group',
    },
  });

  await consumer.connect();
  console.log('Consumer connected');

  await consumer.subscribe({ topic });
  console.log('Consumer subscribed to topic');

  await consumer.run({
    partitionsConsumedConcurrently: 2, // This causes the CPU load
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        topic,
        partition,
        offset: message.offset,
        value: message.value?.toString(), // value is null for tombstones
      });
    },
  });
};

consumerRun().catch((err) => {
  console.error('Error running consumer:', err);
});
```
Additional context
When setting `logLevel: 4` in the `Kafka` constructor and using `partitionsConsumedConcurrently: 1`, the following log message is printed roughly once per second:
```
{
  message: 'Attempting to fetch 1 messages to the message cache',
  name: 'test-client-id#consumer-1',
  fac: 'BINDING',
  timestamp: 1733298033731
}
```
With `partitionsConsumedConcurrently: 2`, however, the console is flooded with this message.
Setting `logLevel: 3` helps somewhat because less I/O is spent on logging, but the code path that emits the message keeps running, so CPU usage stays high.
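To put a number on "roughly once per second" versus "flooded", captured log entries can be bucketed by their `timestamp` field (milliseconds since the epoch, as in the entry above). The sketch below uses hypothetical helpers (`messagesPerSecond`, `peakRate`) that are not part of confluent-kafka-javascript, and it assumes the log objects have already been collected into an array; how they are captured is left open.

```javascript
// Hypothetical helpers for measuring how often one log message repeats.
// Assumption: entries look like { message, name, fac, timestamp },
// with timestamp in milliseconds since the epoch.

// Count occurrences of `text` in each one-second window.
// Returns a Map of epoch second -> count.
function messagesPerSecond(entries, text) {
  const buckets = new Map();
  for (const entry of entries) {
    if (entry.message !== text) continue;
    const second = Math.floor(entry.timestamp / 1000);
    buckets.set(second, (buckets.get(second) ?? 0) + 1);
  }
  return buckets;
}

// Highest per-second count across all windows; 0 if the message never appears.
function peakRate(entries, text) {
  let peak = 0;
  for (const count of messagesPerSecond(entries, text).values()) {
    peak = Math.max(peak, count);
  }
  return peak;
}
```

With `partitionsConsumedConcurrently: 1` the peak for `'Attempting to fetch 1 messages to the message cache'` would be expected to stay near 1, so a much larger peak with a value of 2 makes the difference concrete.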