
Question: Produce reordering protection #852

Open
YellowCataclysm opened this issue Nov 1, 2024 · 3 comments
Labels
enhancement New feature or request

Comments

@YellowCataclysm

YellowCataclysm commented Nov 1, 2024

Hello and thank you for your great work!

Let's say we have the following scenario:

  • One background process continuously produces records with keys.
  • At some point, sending a batch to a specific partition fails (for example, all retry attempts are exceeded).
  • As far as I understand, the buffer for the failed partition is then discarded to maintain ordering.
  • After that, the promises for the failed records (and all records discarded from the buffer) are called in a separate goroutine.

So now is the moment to decide what to do with the failed message. If a message has a key, the producing code expects records to be sent to Kafka in the order they were produced.
But if the promise-invoking goroutine is scheduled with a significant delay, the producing thread can keep producing after the partition buffer was discarded and before the error promises were called (which could, for example, have stopped producing altogether).
So if failed/discarded messages and newly pushed messages have the same keys, ordering can be broken.

The same problem can occur if I try to send two messages with the same key but the first one is too big: in that case its promise is scheduled asynchronously, and the second message could be produced before the promise for the first one fires.
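
To make the window concrete, here is a rough sketch of the pattern I mean (broker, topic, and the oversized value are made up for illustration). The promise for the first record may run only after the second record with the same key has already been buffered:

```go
package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.DefaultProduceTopic("events"),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	ctx := context.Background()
	promise := func(r *kgo.Record, err error) {
		if err != nil {
			// By the time this runs, later records with the same key may
			// already be buffered or even delivered: the window described above.
			log.Printf("record with key %q failed: %v", r.Key, err)
		}
	}

	// First record: imagine it is too large to buffer, or its partition's
	// buffer is later discarded after retries are exhausted.
	cl.Produce(ctx, &kgo.Record{Key: []byte("k"), Value: make([]byte, 10<<20)}, promise)

	// Second record with the same key, produced before the first promise fires.
	cl.Produce(ctx, &kgo.Record{Key: []byte("k"), Value: []byte("v2")}, promise)

	cl.Flush(ctx)
}
```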

I thought there might be some option/callback that reliably stops producing/buffering to prevent reordering, but I didn't find one.

Am I wrong, and is there some way to handle such cases? Or maybe such scenarios are simply not possible at all?
I really think I'm missing something and would appreciate your help!

@twmb
Owner

twmb commented Nov 4, 2024

I think the summary is:

  • You're producing
  • Record fails, causing all buffered records on the partition to fail
  • While you are still producing, you produce a new (later) record after all the buffered records have failed; that later record can be produced successfully, causing things to be out of order

This is not preventable currently.

To make it preventable, a new feature would need to be added where, once a record on a partition fails, the partition is set to a failed state until you manually recover it. You would then also need to take a lot of care instrumenting your application to (a rough sketch follows the proposed API below):

  • Stall your producer pipeline on any record failure
  • Ensure that you are not producing anymore (you have to count your own inflight before producing; you cannot rely on the library internal inflight count)
  • Handle all record promises
  • Wait for your own inflight count to reach zero
  • Call the API to un-fail the partition
  • Finally, resume producing

This would require a new option and a new client method:

```go
func FailPartitionOnProduceFailure() ProducerOpt

func (*Client) RecoverFailedProducePartitions()
```

(perhaps better names are possible). Is this what you'd like? (and perhaps, if you want to dig in / implement, let me know)
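
Roughly, the application-side instrumentation could look like the sketch below. The commented-out option and method are the proposed names above and do not exist today; the inflight/stall bookkeeping only illustrates the ordering of the steps, not a production-ready design:

```go
package main

import (
	"context"
	"log"
	"sync/atomic"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

var (
	inflight atomic.Int64 // our own count; the library's internal count cannot be relied on
	stalled  atomic.Bool  // set on any record failure to stall the pipeline
)

func produce(ctx context.Context, cl *kgo.Client, r *kgo.Record) {
	for stalled.Load() { // stall the producer pipeline on any record failure
		time.Sleep(10 * time.Millisecond)
	}
	inflight.Add(1)
	cl.Produce(ctx, r, func(r *kgo.Record, err error) {
		defer inflight.Add(-1)
		if err != nil {
			stalled.Store(true)
		}
	})
}

func recoverAndResume(cl *kgo.Client) {
	for inflight.Load() > 0 { // wait for our own inflight count to reach zero,
		time.Sleep(10 * time.Millisecond) // i.e. all record promises have been handled
	}
	// Un-fail the partition via the proposed (not yet existing) method:
	// cl.RecoverFailedProducePartitions()
	stalled.Store(false) // finally, resume producing
}

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.DefaultProduceTopic("events"),
		// Proposed (not yet existing) option:
		// kgo.FailPartitionOnProduceFailure(),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	ctx := context.Background()
	produce(ctx, cl, &kgo.Record{Key: []byte("k"), Value: []byte("v")})
	cl.Flush(ctx)
	if stalled.Load() {
		recoverAndResume(cl)
	}
}
```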

twmb added the enhancement label Nov 4, 2024
@YellowCataclysm
Author

YellowCataclysm commented Nov 6, 2024

Thank you for your reply!
I will dig in and try to implement this.

Your suggestion sounds reasonable. I think I see three important points here:

  1. Using just the options you suggested leaves the user in the dark about the actual cause of the failure. The user can only guess why a partition was stopped and assume which one was actually stopped.
  2. As a consequence of the previous point, if the user has no information about the actual message that caused the failure, they cannot do any reliable handling. As an example, the user may decide to:
    • just skip the failed message if the partitioner does not require consistency for this message
    • filter out input messages whose key equals the failed one and/or send them somewhere else (a deferred/failed queue, maybe, or whatever)
    • stop producing completely and cry
  3. In the case of "pre-buffering" errors (like MessageTooLarge/NoTopic/NotInTransaction), we can currently fail even before a partition is picked, so there is no chance to "pause" producing for a specific one.

Taking this into account, I think the solution could be:

  1. Move the code that clears sink buffers on non-retriable errors into a separate function.
  2. Add a callback option (or a "sync-promise-call" option) to catch the message that causes the partition failure and call it synchronously, without clearing the buffers in the response-handling goroutine (if the option is set, of course). This gives the user the option to simply ignore the message failure and continue producing (see the sketch after this list).
  3. Give the user an interface to:
    • pause producing on a specific partition (which also clears the sink buffers, failing the records with an ErrAborted error or something similar)
    • resume producing on a specific partition
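
As a hypothetical illustration of points 2 and 3 (names invented here; nothing like this exists in kgo today), the shapes could look roughly like:

```go
// Package kgosketch only sketches the proposed surface; it is not part of kgo.
package kgosketch

import "github.com/twmb/franz-go/pkg/kgo"

// OnPartitionFailFn would be a synchronous callback, invoked before buffers
// are cleared, with the record whose failure would otherwise fail the
// partition. Returning true would mean "skip this record and keep producing";
// returning false would fail the partition as it does today.
type OnPartitionFailFn func(r *kgo.Record, err error) (skip bool)

// PartitionPauser sketches a per-partition pause/resume surface the client
// could grow. Pausing would also clear the partition's sink buffers, failing
// the buffered records with an "aborted"-style error as suggested above.
type PartitionPauser interface {
	PauseProducePartition(topic string, partition int32)
	ResumeProducePartition(topic string, partition int32)
}
```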

Also, I have a question about handling "pre-buffering" errors: why did you decide not to return an error from Produce()? I guess currently they can also lead to reordering.
It looks like such errors could be handled effectively right in place. The other option is to use the callback from point 2 and finish the promise synchronously.

@twmb
Owner

twmb commented Nov 11, 2024

I'm not sure how (2) addresses the reordering protection problem brought up above. I thought the problem is:

  • You produced A and B, and are producing C
  • A fails, failing B
  • You produce C before A and B have failed
  • Now C is at the front, rather than A and B

My response is about how it is possible to instrument code surrounding kgo library calls such that you can re-produce A and B before C; however, it is a hassle.

The sync callback option allows you to skip A while preserving B, such that B then C ordering is preserved, even though A is failed. This is starting to sound like you're trying to have a dead letter queue for A, while ordering is preserved for all non-failing records (rather than reproducing A, assuming something went wrong and we can make it successful the second time). Is that your goal here?

Note that A, B, and C are actually batches rather than individual records. Entire batches fail at a time.

(3) sounds like my proposal above.

As for the final question, Produce originally did return an error. I found it more awkward to use: you had to have nearly/exactly the same logic to handle produce failures at the Produce callsite that you used in your promise. It forced you to handle the same thing in two locations, rather than unifying into always handling record failures in the promise.
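
For contrast, a small sketch of that duplication versus today's promise-only handling (broker and topic invented; the error-returning variant is hypothetical and only shown in comments):

```go
package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.DefaultProduceTopic("events"),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	ctx := context.Background()
	rec := &kgo.Record{Key: []byte("k"), Value: []byte("v")}

	handle := func(r *kgo.Record, err error) {
		if err != nil {
			log.Printf("record with key %q failed: %v", r.Key, err)
		}
	}

	// If Produce returned an error (it once did), the callsite would need the
	// same failure logic as the promise:
	//
	//	if err := cl.Produce(ctx, rec, handle); err != nil {
	//		handle(rec, err) // duplicated handling
	//	}

	// Actual API: every failure, including "pre-buffering" ones such as a
	// record too large to buffer, is delivered through the promise, so
	// failure handling lives in exactly one place.
	cl.Produce(ctx, rec, handle)
	cl.Flush(ctx)
}
```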
