Add more robust commit strategy to Kafka.Consumer #149

Merged: 8 commits merged into main from pb/offsets on Feb 29, 2024

Conversation

@pbrisbin (Member) commented Feb 29, 2024

Autocommit is nice and simple, but it can cause both dropped messages
and duplicate processing.

Concretely, we found that deployment of a new consumer will start and,
if the existing consumer has not (auto)committed in a bit, reprocess a
backlog of messages that it doesn't need to. This results in an increase
in lag and alerts on every deployment.

Of the various strategies available, we're implementing this one[^1]:

- When a message is processed, commit its offset asynchronously

  This is fast enough to do on every loop, but it trades away some
  reliability and may not succeed every time -- particularly if we're
  crashing or shutting down. So...

- On shutdown, commit the offset of the last-processed message (for
  every partition) synchronously

  This will be robust and cover for any failures we had from the
  asynchronous commits.

  This required that we track the last message we processed on each
  partition as we go. We could've used the "commit for all messages of
  the last poll" function instead, but that would risk dropping messages
  we've not yet processed. We seem to only poll for 1 message at a time,
  so this risk is low, but I suspect we'll want to increase that and
  gain free performance, so it's good to be ready.

  I attempted to do the tracking with `StateT`, but it lacks instances
  like `MonadUnliftIO`, so I couldn't. Instead, I went with an `IORef`.
  It is encapsulated, so we could move to something else (e.g. `TVar`
  or `Chan`) without disrupting the calling code at all, provided it
  only requires `MonadIO`. A sketch of the resulting loop is below.
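
Here is a minimal sketch of the loop described above, written against
hw-kafka-client (an assumption about the underlying library; the
`process` callback and the `Timeout` value are likewise placeholders):

```haskell
import Control.Exception (finally)
import Data.ByteString (ByteString)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import qualified Data.Map.Strict as Map
import Kafka.Consumer

type Record = ConsumerRecord (Maybe ByteString) (Maybe ByteString)

-- Poll/process forever, committing asynchronously after each message;
-- on the way out (including via exception), synchronously commit the
-- last-processed record for every partition.
consumeLoop :: KafkaConsumer -> (Record -> IO ()) -> IO ()
consumeLoop consumer process = do
  lastProcessed <- newIORef (Map.empty :: Map.Map PartitionId Record)
  go lastProcessed `finally` commitFinal lastProcessed
 where
  go ref = do
    er <- pollMessage consumer (Timeout 1000)
    case er of
      Left _err -> go ref -- e.g. a poll timeout; just poll again
      Right r -> do
        process r
        -- Track the newest processed record for this partition
        atomicModifyIORef' ref $ \m -> (Map.insert (crPartition r) r m, ())
        -- Cheap fire-and-forget commit; may be lost if we crash
        _ <- commitOffsetMessage OffsetCommitAsync consumer r
        go ref

  commitFinal ref = do
    m <- readIORef ref
    -- Robust synchronous commit, covering any failed async commits
    mapM_ (commitOffsetMessage OffsetCommit consumer) (Map.elems m)
```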

[^1]: https://medium.com/@rramiz.rraza/kafka-programming-different-ways-to-commit-offsets-7bcd179b225a#ac4e
We handle commits intelligently now and can be opinionated about how it
works.
@pbrisbin changed the title from pb/offsets to Add more robust commit strategy to Kafka.Consumer on Feb 29, 2024
@pbrisbin requested a review from @z0isch on February 29, 2024 20:16
@pbrisbin marked this pull request as ready for review on February 29, 2024 20:17
It's all we need, and converting it on insert instead of read means we
don't have to build the `ConsumerRecord () ()` in the meantime.
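
For illustration, converting at insert might look something like the
following (the `TopicPartition` target and the per-partition `Map` are
assumptions on my part, since the actual type isn't shown here):

```haskell
import Data.IORef (IORef, atomicModifyIORef')
import qualified Data.Map.Strict as Map
import Kafka.Consumer

-- Convert at insert: keep only the coordinates needed to commit later,
-- so we never materialize a `ConsumerRecord () ()`.
trackOffset
  :: IORef (Map.Map PartitionId TopicPartition)
  -> ConsumerRecord k v
  -> IO ()
trackOffset ref r =
  let Offset o = crOffset r
      -- Committed offsets name the *next* message to read, hence + 1
      tp = TopicPartition (crTopic r) (crPartition r) (PartitionOffset (o + 1))
   in atomicModifyIORef' ref $ \m -> (Map.insert (crPartition r) tp m, ())
```

On shutdown, the map's values could then be flushed in one call via
`commitPartitionsOffsets OffsetCommit consumer . Map.elems` -- though,
as the next commits show, this manual tracking was ultimately dropped.
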
By disabling `auto.offset.store`, the `commitAllOffsets` function will
only commit the offsets we explicitly store (vs the max of the last
poll). This means we can call `storeOffsetMessage` after processing and
be sure we don't commit anything we haven't processed.

Basically, it's the same (desired) semantics, without the manual
tracking. Win.
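
Under this scheme, the consumer wiring might look like the following
sketch (hw-kafka-client assumed as the underlying library; the broker
address, group id, and `process` callback are placeholders, and the raw
librdkafka key is set via `extraProps` rather than assuming a dedicated
helper exists for it):

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Control.Exception (finally)
import Data.ByteString (ByteString)
import qualified Data.Map.Strict as Map
import Kafka.Consumer

-- With auto.offset.store disabled, only offsets we explicitly store
-- via storeOffsetMessage are eligible for commit. Pass these props to
-- newConsumer along with a Subscription.
consumerProps :: ConsumerProperties
consumerProps =
  brokersList [BrokerAddress "localhost:9092"]
    <> groupId (ConsumerGroupId "example-group")
    <> noAutoCommit
    <> extraProps (Map.singleton "enable.auto.offset.store" "false")

consumeLoop
  :: KafkaConsumer
  -> (ConsumerRecord (Maybe ByteString) (Maybe ByteString) -> IO ())
  -> IO ()
consumeLoop consumer process =
  -- Final synchronous commit covers any async commits that were lost
  go `finally` commitAllOffsets OffsetCommit consumer
 where
  go = do
    er <- pollMessage consumer (Timeout 1000)
    case er of
      Left _err -> go -- e.g. a poll timeout; just poll again
      Right r -> do
        process r
        -- Only now is this offset marked as safe to commit
        _ <- storeOffsetMessage consumer r
        -- Commits only the stored offsets, never unprocessed ones
        _ <- commitAllOffsets OffsetCommitAsync consumer
        go
```

The shutdown commit no longer needs any `IORef` bookkeeping: librdkafka
itself tracks what has been stored.
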
@z0isch (Contributor) left a comment

❤️

@pbrisbin merged commit ccee6ea into main on Feb 29, 2024
6 of 7 checks passed
@pbrisbin deleted the pb/offsets branch on February 29, 2024 22:05