Add more robust commit strategy to Kafka.Consumer #149
Merged
Conversation
Autocommit is nice and simple, but it can cause both dropped messages and duplicate processing. Concretely, we found that a newly deployed consumer will start and, if the outgoing consumer has not (auto)committed in a while, reprocess a backlog of messages it doesn't need to. This results in an increase in lag, and alerts, on every deployment.

Of the various strategies available, we're implementing this one[^1]:

- When a message is processed, commit its offset asynchronously.

  This is fast enough to do on every loop, but of course trades on reliability and may not succeed every time -- particularly if we're crashing or shutting down. So...

- On shutdown, synchronously commit the offset of the last-processed message (for every partition).

  This is robust and covers for any failures from the asynchronous commits.

This required that we track the last message we processed (per partition) as we go. We could've used the "commit for all messages of the last poll" function instead, but that would risk dropping messages we've not yet processed. We seem to poll for only one message at a time, so this risk is low, but I suspect we'll want to increase that and gain free performance, so it's good to be ready.

I attempted to do the tracking with `StateT`, but it lacks instances like `MonadUnliftIO`, so I couldn't. Instead, I went with an `IORef`. It is encapsulated, so we could move to something else (e.g. `TVar` or `Chan`) without disrupting the calling code at all, provided it only requires `MonadIO`.

[^1]: https://medium.com/@rramiz.rraza/kafka-programming-different-ways-to-commit-offsets-7bcd179b225a#ac4e
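In rough shape, the loop looks like the sketch below. This assumes hw-kafka-client's `pollMessage` and `commitOffsetMessage`; `processMessage` is a hypothetical handler, and a single `IORef` stands in for the per-partition tracking the real code would need:

```haskell
import Control.Exception (finally)
import Data.ByteString (ByteString)
import Data.Foldable (traverse_)
import Data.IORef (newIORef, readIORef, writeIORef)
import Kafka.Consumer

consumeLoop
  :: KafkaConsumer
  -> (ConsumerRecord (Maybe ByteString) (Maybe ByteString) -> IO ())
  -> IO ()
consumeLoop consumer processMessage = do
  -- Encapsulated tracking of the last message we processed
  lastProcessed <- newIORef Nothing
  pollLoop lastProcessed `finally` commitFinal lastProcessed
 where
  pollLoop ref = do
    emsg <- pollMessage consumer (Timeout 1000)
    case emsg of
      -- e.g. a poll timeout: nothing to do, poll again
      Left _err -> pollLoop ref
      Right record -> do
        processMessage record
        writeIORef ref (Just record)
        -- Fast, best-effort commit on every iteration
        _ <- commitOffsetMessage OffsetCommitAsync consumer record
        pollLoop ref

  -- On shutdown, synchronously commit the last-processed offset to
  -- cover for any asynchronous commits that didn't succeed
  commitFinal ref =
    readIORef ref >>= traverse_ (commitOffsetMessage OffsetCommit consumer)
```

On a clean shutdown, `finally` runs the synchronous commit; after a hard crash we fall back to whatever the asynchronous commits had already persisted.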
We handle commits intelligently now and can be opinionated about how it works.
It's all we need, and converting it on insert instead of read means we don't have to build the `ConsumerRecord () ()` in the meantime.
z0isch reviewed Feb 29, 2024
By disabling `auto.offset.store`, the `commitAllOffsets` function will only commit the offsets we explicitly store (vs. the max of the last poll). This means we can then call `storeOffsetMessage` after processing and be sure we don't commit things we haven't processed. Basically, it's the same (desired) semantics, without the manual tracking. Win.
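As a sketch of that shape (assuming hw-kafka-client's `noAutoOffsetStore` property along with `storeOffsetMessage` and `commitAllOffsets`; the broker, group id, and `processMessage` handler are placeholders):

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Control.Exception (finally)
import Data.ByteString (ByteString)
import Kafka.Consumer

-- With auto.offset.store disabled, only offsets we explicitly store are
-- eligible for commit, so there is no manual tracking to maintain
consumerProps :: ConsumerProperties
consumerProps =
  brokersList [BrokerAddress "localhost:9092"]
    <> groupId (ConsumerGroupId "example-group")
    <> noAutoCommit
    <> noAutoOffsetStore

consumeLoop
  :: KafkaConsumer
  -> (ConsumerRecord (Maybe ByteString) (Maybe ByteString) -> IO ())
  -> IO ()
consumeLoop consumer processMessage =
  -- Synchronously flush the stored offsets on shutdown, as before
  pollLoop `finally` commitAllOffsets OffsetCommit consumer
 where
  pollLoop = do
    emsg <- pollMessage consumer (Timeout 1000)
    case emsg of
      Left _err -> pollLoop
      Right record -> do
        processMessage record
        -- Only now does this offset become eligible for commit
        _ <- storeOffsetMessage consumer record
        -- Commits only what has been stored, asynchronously
        _ <- commitAllOffsets OffsetCommitAsync consumer
        pollLoop
```

Since only processed records are ever stored, the at-least-once semantics hold even if we later poll in larger batches.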
z0isch approved these changes Feb 29, 2024
❤️