-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HUDI-8518][HUDI-8741] Fix RLI and Secondary index with custom payload or merge mode #12525
base: master
Are you sure you want to change the base?
Conversation
// For log records, we only need to process deletes. However, deletes may or may not be part of delete blocks (delete using custom merge mode). | ||
// So, we need to process the log files to get the record keys that are deleted. We can then generate RLI records for those keys. | ||
// 1. Get all merged record keys - any custom merger which handles delete outside delete block should not be present in merged keys. | ||
// 2. Get all un-merged record keys - this will contain all valid and deleted keys, irrespective of delete block or merge mode. | ||
// 3. Get deleted record keys - this will be the difference of merged and un-merged keys. | ||
Set<String> mergedRecordKeys = getRecordKeys(logFilePaths, dataTableMetaClient, | ||
finalWriterSchemaOpt, maxBufferSize, instantTime, true, true, true, engineType); | ||
Set<String> unMergedRecordKeys = getRecordKeys(logFilePaths, dataTableMetaClient, | ||
finalWriterSchemaOpt, maxBufferSize, instantTime, true, true, false, engineType); | ||
Set<String> deletedRecordKeys = new HashSet<>(unMergedRecordKeys); | ||
deletedRecordKeys.removeAll(mergedRecordKeys); | ||
return deletedRecordKeys.stream().map(recordKey -> HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the logic to process deletes across all log files irrespective of merge mode. This is going to be a bit costly because we're doing both merged and unmerged log records scanning to figure out the deletes.
finalWriterSchemaOpt, maxBufferSize, instantTime, true, true, true, engineType); | ||
Set<String> unMergedRecordKeys = getRecordKeys(logFilePaths, dataTableMetaClient, | ||
finalWriterSchemaOpt, maxBufferSize, instantTime, true, true, false, engineType); | ||
Set<String> deletedRecordKeys = new HashSet<>(unMergedRecordKeys); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
won't this also give us deleted records from a previous log file even if not touched in this commit of interest?
Change Logs
Impact
Index correctness with custom mergers.
Risk level (write none, low medium or high below)
medium
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
ticket number here and follow the instruction to make
changes to the website.
Contributor's checklist