Description
Description
It appears that the consume
method in Kafka.KafkaConsumer
does not adhere to the specified setDefaultConsumeTimeout
. Instead, it waits for the full batch to be filled before triggering the callback. This behaviour causes delays when consuming from low-traffic topics, as the consumer remains idle until the batch size is satisfied.
Expected Behaviour
The consume
method should respect the setDefaultConsumeTimeout
and return the available messages within the given time frame, even if the batch is not fully populated.
Actual Behaviour
The consumer only returns after the batch size (1000
in this case) is completely filled, ignoring the timeout.
Code to Reproduce
const Kafka = require("@confluentinc/kafka-javascript");
var consumer = new Kafka.KafkaConsumer({
"metadata.broker.list": "localhost:19092",
"group.id": "confluent-kafka-javascript-consumer-flow-example",
});
const topicName = "slowtopic";
const batch = 1000;
consumer.setDefaultConsumeTimeout(100);
consumer.on("ready", function () {
console.log("Consumer ready.");
consumer.subscribe([topicName]);
consume();
});
function consume() {
if (!consumer.isConnected()) return;
console.time("Consume batch timing");
consumer.consume(batch, (err, messages) => {
if (err) {
console.error(err);
return;
}
console.log(`Received ${messages.length} messages`);
console.timeEnd("Consume batch timing");
consume();
});
}
consumer.connect();
Received 1000 messages
Consume batch timing: 10.068s
Received 1000 messages
Consume batch timing: 10.083s
Received 1000 messages
Consume batch timing: 10.074s
Environment
- Node.js Version:
v22.4.0
- confluent-kafka-javascript version :
1.2.0
- OS:
Linux
Impact
This issue affects consumers in low-traffic environments, causing significant delays and inefficient processing during quiet periods.
Potential Workarounds
Currently, need to write code to dynamically adjust the batch size based on traffic.
Many thanks
Pat