Skip to content

fix(flink): enable batch read it for flink source v2#18325

Open
HuangZhenQiu wants to merge 2 commits intoapache:masterfrom
HuangZhenQiu:flink-source-v2-it
Open

fix(flink): enable batch read it for flink source v2#18325
HuangZhenQiu wants to merge 2 commits intoapache:masterfrom
HuangZhenQiu:flink-source-v2-it

Conversation

@HuangZhenQiu
Copy link
Collaborator

Describe the issue this Pull Request addresses

Fix the bug in Flink Hudi source v2 in batch mode

Summary and Changelog

  1. Fix the bug in Flink Hudi source v2 in batch mode
  2. Update test cases to reflect the change

Impact

none

Risk Level

none

Documentation Update

none

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@github-actions github-actions bot added the size:S PR with lines of changes in (10, 100] label Mar 15, 2026
// Pre-populate finishedSplits with splitId so that FetchTask calls splitFinishedCallback
// immediately after enqueueing the batch. This removes the split from
// SplitFetcher.assignedSplits, causing the fetcher to idle and invoke
// elementsQueue.notifyAvailable(), which is required to drive the END_OF_INPUT signal
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which is required to drive the END_OF_INPUT signal
// in SourceReaderBase for bounded (batch) reads.

The original logic seems more reasonabe? The END_OF_INPUT should be signaled after all the records in the split are handled(or iterate over)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. The better way is to have a fake batch reader, and return the an empty batch records with finished split id inside.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @cshuo did you know the standard way to handle the bounded source end up for source v2?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cause of the bug is that currently the SplitFetcher only deletes splits when the fetch result of a split reader is a RecordsWithSplitIds with empty records and a set of finished splits (to be deleted).

Actually, there is a callback onSplitFinished in HoodieSourceReader, where we can delete the finished splits explicitly.

  @Override
  protected void onSplitFinished(Map<String, HoodieSourceSplit> finishedSplitIds) {
    requestSplit(new ArrayList<>(finishedSplitIds.keySet()));
    // delete the finished splits.
    splitFetcherManager.removeSplits(new ArrayList<>(finishedSplitIds.values()));
  }

Note: there are some other related changes:

  • Bug: HoodieSourceSplit#splitId() is not idempotent. The id should not contain mutable fields consumed and fileOffset.
  • Handle SplitsRemoval event in HoodieSourceSplitReader#handleSplitsChanges, maybe just a log.

@HuangZhenQiu HuangZhenQiu force-pushed the flink-source-v2-it branch 2 times, most recently from d835741 to 8afc107 Compare March 16, 2026 04:35
@github-actions github-actions bot added size:M PR with lines of changes in (100, 300] and removed size:S PR with lines of changes in (10, 100] labels Mar 16, 2026
@hudi-bot
Copy link
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@codecov-commenter
Copy link

Codecov Report

❌ Patch coverage is 75.60976% with 10 lines in your changes missing coverage. Please review.
✅ Project coverage is 66.27%. Comparing base (967e456) to head (ad2b01e).

Files with missing lines Patch % Lines
...he/hudi/source/reader/HoodieSourceSplitReader.java 68.00% 5 Missing and 3 partials ⚠️
.../apache/hudi/source/reader/DefaultBatchReader.java 77.77% 2 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18325      +/-   ##
============================================
- Coverage     68.46%   66.27%   -2.20%     
+ Complexity    27472    26733     -739     
============================================
  Files          2427     2428       +1     
  Lines        132655   132681      +26     
  Branches      15994    15994              
============================================
- Hits          90820    87932    -2888     
- Misses        34784    37811    +3027     
+ Partials       7051     6938     -113     
Flag Coverage Δ
common-and-other-modules 37.33% <75.60%> (-7.10%) ⬇️
hadoop-mr-java-client 45.06% <ø> (-0.01%) ⬇️
spark-client-hadoop-common 48.20% <ø> (-0.01%) ⬇️
spark-java-tests 48.82% <ø> (-0.01%) ⬇️
spark-scala-tests 44.95% <ø> (+<0.01%) ⬆️
utilities 38.63% <ø> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...va/org/apache/hudi/source/reader/BatchRecords.java 97.14% <100.00%> (ø)
...rg/apache/hudi/source/split/HoodieSourceSplit.java 100.00% <100.00%> (ø)
.../apache/hudi/source/reader/DefaultBatchReader.java 77.77% <77.77%> (ø)
...he/hudi/source/reader/HoodieSourceSplitReader.java 76.08% <68.00%> (-13.92%) ⬇️

... and 119 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size:M PR with lines of changes in (100, 300]

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants