fix(flink): enable batch read it for flink source v2#18325
fix(flink): enable batch read it for flink source v2#18325HuangZhenQiu wants to merge 2 commits intoapache:masterfrom
Conversation
b21af66 to
666586b
Compare
| // Pre-populate finishedSplits with splitId so that FetchTask calls splitFinishedCallback | ||
| // immediately after enqueueing the batch. This removes the split from | ||
| // SplitFetcher.assignedSplits, causing the fetcher to idle and invoke | ||
| // elementsQueue.notifyAvailable(), which is required to drive the END_OF_INPUT signal |
There was a problem hiding this comment.
which is required to drive the END_OF_INPUT signal
// in SourceReaderBase for bounded (batch) reads.
The original logic seems more reasonabe? The END_OF_INPUT should be signaled after all the records in the split are handled(or iterate over)?
There was a problem hiding this comment.
I agree. The better way is to have a fake batch reader, and return the an empty batch records with finished split id inside.
There was a problem hiding this comment.
cc @cshuo did you know the standard way to handle the bounded source end up for source v2?
There was a problem hiding this comment.
The cause of the bug is that currently the SplitFetcher only deletes splits when the fetch result of a split reader is a RecordsWithSplitIds with empty records and a set of finished splits (to be deleted).
Actually, there is a callback onSplitFinished in HoodieSourceReader, where we can delete the finished splits explicitly.
@Override
protected void onSplitFinished(Map<String, HoodieSourceSplit> finishedSplitIds) {
requestSplit(new ArrayList<>(finishedSplitIds.keySet()));
// delete the finished splits.
splitFetcherManager.removeSplits(new ArrayList<>(finishedSplitIds.values()));
}
Note: there are some other related changes:
- Bug:
HoodieSourceSplit#splitId()is not idempotent. The id should not contain mutable fieldsconsumedandfileOffset. - Handle
SplitsRemovalevent inHoodieSourceSplitReader#handleSplitsChanges, maybe just a log.
d835741 to
8afc107
Compare
8afc107 to
50937ad
Compare
b63cb4d to
ad2b01e
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18325 +/- ##
============================================
- Coverage 68.46% 66.27% -2.20%
+ Complexity 27472 26733 -739
============================================
Files 2427 2428 +1
Lines 132655 132681 +26
Branches 15994 15994
============================================
- Hits 90820 87932 -2888
- Misses 34784 37811 +3027
+ Partials 7051 6938 -113
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
Describe the issue this Pull Request addresses
Fix the bug in Flink Hudi source v2 in batch mode
Summary and Changelog
Impact
none
Risk Level
none
Documentation Update
none
Contributor's checklist