Skip to content

fix: Apply timeout correctly to consumeNum #330

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

andrewhessler
Copy link

@andrewhessler andrewhessler commented Jun 8, 2025

What

Applies timeout to the entire batch instead of on a per message basis.

How

The calls to the individual consume share the batch timeout and each call to consume can only use the timeout that the batch has remaining.

Why

The existing behavior is described in detail in this issue.

In short, for d millisecond delay, c count, and b blocking time: b = d * c, given a constant topic rpm 60000 / d.

Given any rpm > 60000 / d: b = c * (60 / rpm) seconds

For example, a 1000ms delay with a batch count of 100 will cause the consumeNum loop to block for up to 100 seconds given a constant topic rpm of 60.

References

Issue: #262
PR Introduced: Blizzard/node-rdkafka#34

PRs in node-rdkafka addressing the same issue:
Blizzard/node-rdkafka#1061
Blizzard/node-rdkafka#1053

Test & Review

With c = 100, d = 1000, rpm = 700, expectation is that pre-change we'd block for about 8.6 seconds (it seems like it takes longer in the example, though) before returning the 100 messages.

My testing environment has a 50ms cooldown on calling consume, so after the change we expect a batch of messages to be returned every 1050ms.

Pre-Change
confluent-node-rdkafka-pre2

Post-Change
confluent-node-rdkafka-post2

@andrewhessler andrewhessler marked this pull request as ready for review June 8, 2025 17:36
@Copilot Copilot AI review requested due to automatic review settings June 8, 2025 17:36
@andrewhessler andrewhessler requested review from a team as code owners June 8, 2025 17:36
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR updates consumeNum to apply the overall batch timeout rather than resetting per message, ensuring the loop only uses the remaining batch time for each Consume call.

  • Introduce steady_clock start timestamp and calculate remaining timeout each iteration
  • Guard minimum timeout to allow non-blocking/fast-exit scenarios
  • Add <chrono> include for timing functions
Comments suppressed due to low confidence (2)

src/workers.cc:821

  • [nitpick] Clarify what ‘early exits’ means and why the threshold is 1 ms. For example: // Skip recalculation for timeout ≤ 1 ms to allow immediate non-blocking or minimal-delay consumes.
// Allow timeout_ms = 1 early exits to work

src/workers.cc:824

  • [nitpick] Rename variable now to current_time or now_timepoint to make its role more explicit.
auto now = std::chrono::steady_clock::now();

@andrewhessler andrewhessler changed the title Apply timeout correctly to consumeNum fix: Apply timeout correctly to consumeNum Jun 8, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant