[QUERY] Managing event consumption from EventHub in batch mode with multiple consumers #47654
Comments
Thank you for your feedback. Tagging and routing to the team member best able to assist.
@davidedg87: Thanks for reaching out, and we regret that you're experiencing difficulties. Event Hubs has an at-least-once delivery guarantee; your application must be tolerant of duplicates and idempotent in its processing.
General information
When you "kill" a processor, whether by graceful stop or by terminating the process, any partitions that it owned will be claimed by other processors in the group, which restart processing from the last recorded checkpoint. When a partition moves between processors, there is a potential for 1-2 batches of overlap in which the new owner has taken control, but the old owner is not yet aware and has a batch held in memory that is being dispatched for processing. When the old owner's load balancing loop runs (every 30 seconds by default) or it attempts to read the next batch from the partition, it recognizes the new owner and updates its state to reflect the change. During this period of overlap, there will be two processor instances processing data from the same partition. Either the old or the new owner may write a checkpoint during this time.
Checkpointing guidance
Offsets are opaque data, and they do not change in a predictable pattern from one event to another. You cannot safely assume that a higher offset is later in the stream than a lower one; it is quite possible that the opposite is true. Sequence numbers, on the other hand, do change in a predictable pattern, and it is safe to infer ordering from them.
Snippet analysis
The Event Hubs clients prioritize data integrity above all else; they will always resend events unless your application has explicitly created a checkpoint. There is no client scenario where data loss takes place. Unless your application is creating checkpoints for events that it has not yet processed or has not fully processed, the stream will always rewind for ownership changes or recovery, and you would see duplicates.
In the snippet that you shared, your error handling seems problematic when processing events. On L119, you're catching all exceptions as you process the payload of an event. If any exception takes place, you ignore it and move on. You don't account for this later in the logic, so you will create a checkpoint for the batch (L139) and a cache entry for every event in the batch, including those with errors (L148-152). As a result, you will skip any events that triggered an exception and would not see an entry in your database for them.
Likewise, when an event does not have a payload, you'll explicitly skip over that event (L106) and still write the checkpoint and cache entry. Those events will also not appear in your database.
I'm going to assume that your publisher is explicitly setting the
Next steps
There's not much else that I can offer with the available context. If you'd like to collect a +/- 5-minute slice of Azure SDK logs around the behavior that you're asking about, we'd be happy to take a look and offer thoughts. You'll want to capture logs at the "Verbose" level and filter to the ids discussed in this section of the Troubleshooting Guide. Discussion of capturing logs can be found in this sample and in the article Logging with the Azure SDK for .NET.
I'm going to mark this as addressed. If you'd like us to take a look at the logs, please unresolve once they are available.
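The checkpointing and error-handling points above can be sketched independently of the SDK. The following is a minimal illustration in Python, not the .NET API: the `Event` type, `process_batch`, and the `handle` callback are hypothetical names introduced only for this example. It assumes the batch comes from a single partition, orders events by sequence number rather than offset, and stops at the first failure so that the returned checkpoint candidate never sits past an event that was not fully processed.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Optional


@dataclass(frozen=True)
class Event:
    """Hypothetical stand-in for a received event; not an SDK type."""
    sequence_number: int      # predictable ordering within a partition
    offset: int               # treated as opaque; deliberately never compared
    payload: Optional[bytes]


def process_batch(
    events: Iterable[Event],
    handle: Callable[[Event], None],
) -> Optional[Event]:
    """Process one partition's batch in sequence-number order.

    Returns the last event that was handled successfully before any
    failure, or None if nothing was handled. Only that event is a safe
    checkpoint position: everything after it must be delivered again.
    """
    last_done: Optional[Event] = None
    for event in sorted(events, key=lambda e: e.sequence_number):
        try:
            handle(event)
        except Exception:
            # Stop instead of swallowing the error; a real consumer would
            # also log and decide whether to retry or dead-letter here.
            break
        last_done = event
    return last_done
```

If the function returns `None`, no checkpoint would be written for the batch. An event without a payload counts as processed only if `handle` decides to accept it and returns normally, which makes that choice explicit rather than a silent skip.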
Hi @davidedg87. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text "/unresolve" to remove the "issue-addressed" label and continue the conversation.
/unresolve
Hi @jsquire, first of all, thank you for the time you dedicated to my issue.
Hi @davidedg87. Thank you for opening this issue and giving us the opportunity to assist. To help our team better understand your issue and the details of your scenario, please provide a response to the question asked above or the information requested above. This will help us more accurately address your issue.
Hi @davidedg87, we're sending this friendly reminder because we haven't heard back from you in 7 days. We need more information about this issue to help address it. Please be sure to give us your input. If we don't hear back from you within 14 days of this comment, the issue will be automatically closed. Thank you!
Hi @jsquire. Attached, you’ll find a ZIP file containing the logs collected over approximately 7 minutes at the verbose level, filtered for the exceptions you indicated. During the test, I used a maximum parallelism of 4 consumers for 4 partitions, and I repeatedly killed running consumers and restarted them. You’ll find several .txt files because a new file was generated each time a consumer restarted. In this version, I corrected how the event used for checkpointing is selected, so that it is the one with the maximum sequenceNumber instead of the maximum offset. Could you take a look? Thanks!
Library name and version
Azure.Messaging.EventHubs.Processor 5.11.5
Query/Question
In my usage scenario, I need to consume events from an EventHub with 4 partitions.
The goal is to process the events in batch mode to optimize the underlying business logic by aggregating queries to handle 'batches' of events rather than individual events.
To achieve this, I created a CustomEventProcessor by extending the EventProcessor class because the EventProcessorClient does not allow processing multiple events at once.
The issues I am currently facing are as follows:
1. When I kill a consumer and restart it, there is a moment when it seems that multiple consumers read the same data from the same partition. This forces me to perform a check to verify whether the event has already been processed or not.
   [Currently, this check is done in the database by verifying a unique property of the message.] Is this approach correct, or am I managing it incorrectly?
2. When the situation described in point 1 occurs, I skip that event and process only the events that have not been processed. At the end, I update the checkpoint using UpdateCheckpointAsync for the event [not skipped] with the highest offset.
   This approach seems to work, but occasionally, I have noticed that some events are 'lost,' as if a checkpoint for a subsequent offset was performed, causing some events to not be correctly processed.
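The duplicate check described in point 1 can be sketched as a filter applied to each batch before processing. This is a minimal Python illustration of the idea and not the attached consumer code: `filter_unprocessed`, the `(message_id, payload)` tuples, and `already_processed` are hypothetical names. It assumes each message carries a unique identifier and that `already_processed` holds the identifiers returned by a single aggregated database lookup for the batch.

```python
from typing import Iterable, List, Set, Tuple

Message = Tuple[str, bytes]  # (unique message id, payload); hypothetical shape


def filter_unprocessed(
    batch: Iterable[Message],
    already_processed: Set[str],
) -> List[Message]:
    """Return the messages that still need processing, in their original order.

    Drops messages whose id is already recorded as processed, and also
    drops repeats of the same id within the batch itself.
    """
    seen = set(already_processed)  # copy so the caller's set is not mutated
    fresh: List[Message] = []
    for message_id, payload in batch:
        if message_id in seen:
            continue  # duplicate delivery; safe to skip
        seen.add(message_id)
        fresh.append((message_id, payload))
    return fresh
```

Because two consumers can briefly read the same partition, a lookup followed by an insert may still race; a unique constraint on the identifier in the database would likely be needed as the final guard. Skipped duplicates have, by definition, already been processed, so they should not need to be excluded when choosing the checkpoint position.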
This is the consumer code:
EventBatchProcessor.txt
The goal is to create a robust consumer that can scale with multiple replicas of the consumers (always a maximum equal to the number of partitions) and to correctly manage checkpoints and duplicate control.
Thanks in advance for your help.
Environment
No response