From 9a13db3b3f4483cce947077b684cd0467812ccb4 Mon Sep 17 00:00:00 2001 From: Andrew Hessler Date: Sun, 8 Jun 2025 09:51:47 -0500 Subject: [PATCH 1/3] apply timeout to batch --- src/workers.cc | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/workers.cc b/src/workers.cc index 4655458d..0eab51b5 100644 --- a/src/workers.cc +++ b/src/workers.cc @@ -9,6 +9,7 @@ */ #include "src/workers.h" +#include <chrono> #include #include @@ -811,10 +812,23 @@ KafkaConsumerConsumeNum::~KafkaConsumerConsumeNum() {} void KafkaConsumerConsumeNum::Execute() { std::size_t max = static_cast<std::size_t>(m_num_messages); bool looping = true; - int timeout_ms = m_timeout_ms; std::size_t eof_event_count = 0; + auto start_time = std::chrono::steady_clock::now(); + int timeout_ms = m_timeout_ms; + while (m_messages.size() - eof_event_count < max && looping) { + // Allow timeout_ms = 1 early exits to work + if (timeout_ms > 1) { + // Calc next single consume timeout remaining for batch + auto now = std::chrono::steady_clock::now(); + auto elapsed = + std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time) + .count(); + // timeout_ms of 0 triggers non-blocking behavior https://github.com/confluentinc/librdkafka/blob/3f52de491f8aae1d71a9a0b3f1c07bfd6df4aec3/src/rdkafka_int.h#L1189-L1190 + timeout_ms = std::max(1, m_timeout_ms - static_cast<int>(elapsed)); + } + // Get a message Baton b = m_consumer->Consume(timeout_ms); if (b.err() == RdKafka::ERR_NO_ERROR) { From 7d6d811a22aa019bfc75d41884ba98f5e494efb2 Mon Sep 17 00:00:00 2001 From: Andrew Hessler Date: Sun, 8 Jun 2025 10:11:26 -0500 Subject: [PATCH 2/3] allow 0 timeout in calc --- src/workers.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/workers.cc b/src/workers.cc index 0eab51b5..38b4caf7 100644 --- a/src/workers.cc +++ b/src/workers.cc @@ -825,8 +825,9 @@ void KafkaConsumerConsumeNum::Execute() { auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time) .count(); - // timeout_ms of 0 triggers non-blocking 
behavior https://github.com/confluentinc/librdkafka/blob/3f52de491f8aae1d71a9a0b3f1c07bfd6df4aec3/src/rdkafka_int.h#L1189-L1190 - timeout_ms = std::max(1, m_timeout_ms - static_cast<int>(elapsed)); + // `timeout_ms` of 0 triggers non-blocking behavior https://github.com/confluentinc/librdkafka/blob/3f52de491f8aae1d71a9a0b3f1c07bfd6df4aec3/src/rdkafka_int.h#L1189-L1190 + // This still returns ERR_TIMED_OUT if no message available + timeout_ms = std::max(0, m_timeout_ms - static_cast<int>(elapsed)); } // Get a message From adc61614a3e0a4ce8dc807017765e8e561e12ae6 Mon Sep 17 00:00:00 2001 From: Andrew Hessler Date: Sun, 8 Jun 2025 12:51:56 -0500 Subject: [PATCH 3/3] extract 1 into descriptive var --- src/workers.cc | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/workers.cc b/src/workers.cc index 38b4caf7..8bffcf65 100644 --- a/src/workers.cc +++ b/src/workers.cc @@ -816,10 +816,12 @@ void KafkaConsumerConsumeNum::Execute() { auto start_time = std::chrono::steady_clock::now(); int timeout_ms = m_timeout_ms; + int early_exit_ms = 1; while (m_messages.size() - eof_event_count < max && looping) { - // Allow timeout_ms = 1 early exits to work - if (timeout_ms > 1) { + // Allow timeout_ms = early_exit_ms to take precedence + // timeout_ms > 1 + if (timeout_ms > early_exit_ms) { // Calc next single consume timeout remaining for batch auto now = std::chrono::steady_clock::now(); auto elapsed = @@ -840,7 +842,7 @@ void KafkaConsumerConsumeNum::Execute() { // If partition EOF and have consumed messages, retry with timeout 1 // This allows getting ready messages, while not waiting for new ones if (m_messages.size() > eof_event_count) { - timeout_ms = 1; + timeout_ms = early_exit_ms; } // We will only go into this code path when `enable.partition.eof` @@ -863,7 +865,7 @@ void KafkaConsumerConsumeNum::Execute() { // within the timeout but not wait if we already have one or more // messages. 
if (m_timeout_only_for_first_message) { - timeout_ms = 1; + timeout_ms = early_exit_ms; } break;