diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/BatchRecords.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/BatchRecords.java index 9f12b23b3a420..99e01e45aabf1 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/BatchRecords.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/BatchRecords.java @@ -18,10 +18,11 @@ package org.apache.hudi.source.reader; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.ClosableIterator; -import java.util.HashSet; +import java.util.Collections; import java.util.Set; import javax.annotation.Nullable; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; @@ -33,7 +34,7 @@ */ public class BatchRecords implements RecordsWithSplitIds> { private String splitId; - private String nextSprintId; + private String nextSplitId; private final ClosableIterator recordIterator; private final Set finishedSplits; private final HoodieRecordWithPosition recordAndPosition; @@ -53,7 +54,7 @@ public class BatchRecords implements RecordsWithSplitIds(); @@ -64,13 +65,13 @@ public class BatchRecords implements RecordsWithSplitIds nextRecordFromSplit() { position = position + 1; return recordAndPosition; } else { - finishedSplits.add(splitId); recordIterator.close(); return null; } @@ -117,8 +117,15 @@ public void seek(long startingRecordOffset) { public static BatchRecords forRecords( String splitId, ClosableIterator recordIterator, int fileOffset, long startingRecordOffset) { + return new BatchRecords<>(splitId, recordIterator, fileOffset, startingRecordOffset, Set.of()); + } - return new BatchRecords<>( - splitId, recordIterator, fileOffset, startingRecordOffset, new HashSet<>()); + public static RecordsWithSplitIds> lastBatchRecords(String splitId) { + // Pre-populate 
finishedSplits with splitId so that FetchTask calls splitFinishedCallback + // immediately after enqueueing the batch. This removes the split from + // SplitFetcher.assignedSplits, causing the fetcher to idle and invoke + // elementsQueue.notifyAvailable(), which is required to drive the END_OF_INPUT signal + // in SourceReaderBase for bounded (batch) reads. + return new RecordsBySplits<>(Collections.emptyMap(), Set.of(splitId)); } } \ No newline at end of file diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java index 7a3d9435c2821..615709f1131bd 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java @@ -24,6 +24,7 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.util.CloseableIterator; import org.apache.hudi.metrics.FlinkStreamReadMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,38 +44,46 @@ /** * The split reader of Hoodie source. * + *

Each call to {@link #fetch()} reads one split and returns it as a single + * {@link RecordsWithSplitIds} batch. Flink's {@code SourceReaderBase} is responsible for + * draining all records from the batch (via {@code nextRecordFromSplit()}) and marking + * the split finished (via {@code finishedSplits()}) before calling {@link #fetch()} again. + * * @param record type */ public class HoodieSourceSplitReader implements SplitReader, HoodieSourceSplit> { private static final Logger LOG = LoggerFactory.getLogger(HoodieSourceSplitReader.class); private final SerializableComparator splitComparator; - private final SplitReaderFunction readerFunction; private final Queue splits; - private final SourceReaderContext context; private final FlinkStreamReadMetrics readerMetrics; - - private HoodieSourceSplit currentSplit; + private final SplitReaderFunction readerFunction; + private transient HoodieSourceSplit currentSplit; + private transient CloseableIterator>> currentReader; public HoodieSourceSplitReader( String tableName, SourceReaderContext context, SplitReaderFunction readerFunction, SerializableComparator splitComparator) { - this.context = context; this.splitComparator = splitComparator; - this.readerFunction = readerFunction; this.splits = new ArrayDeque<>(); + this.readerFunction = readerFunction; this.readerMetrics = new FlinkStreamReadMetrics(context.metricGroup(), tableName); this.readerMetrics.registerMetrics(); } @Override public RecordsWithSplitIds> fetch() throws IOException { + // finish current split. + if (currentSplit != null) { + return finishSplit(); + } + HoodieSourceSplit nextSplit = splits.poll(); if (nextSplit != null) { currentSplit = nextSplit; - return readerFunction.read(currentSplit); + return readerFunction.read(nextSplit); } else { // return an empty result, which will lead to split fetch to be idle. // SplitFetcherManager will then close idle fetcher. 
@@ -105,15 +114,10 @@ public void wakeUp() { // Nothing to do } - @Override - public void close() throws Exception { - readerFunction.close(); - } - /** * SourceSplitReader only reads splits sequentially. When waiting for watermark alignment * the SourceOperator will stop processing and recycling the fetched batches. Based on this the - * `pauseOrResumeSplits` and the `wakeUp` are left empty. + * {@code pauseOrResumeSplits} and the {@code wakeUp} are left empty. * @param splitsToPause splits to pause * @param splitsToResume splits to resume */ @@ -122,4 +126,15 @@ public void pauseOrResumeSplits( Collection splitsToPause, Collection splitsToResume) { } + + @Override + public void close() throws Exception { + readerFunction.close(); + } + + private RecordsWithSplitIds> finishSplit() { + RecordsWithSplitIds> records = BatchRecords.lastBatchRecords(currentSplit.splitId()); + currentSplit = null; + return records; + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java index 2ba5db8427fc3..6b81ba31bd97e 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java @@ -18,13 +18,13 @@ package org.apache.hudi.source.split; +import lombok.ToString; import org.apache.hudi.common.table.log.InstantRange; import org.apache.hudi.common.util.Option; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; -import lombok.ToString; import org.apache.flink.api.connector.source.SourceSplit; import javax.annotation.Nullable; @@ -95,7 +95,7 @@ public HoodieSourceSplit( @Override public String splitId() { - return toString(); + return String.join(":", String.valueOf(splitNum), fileId); } public void consume() { @@ -110,5 +110,4 @@ public void 
updatePosition(int newFileOffset, long newRecordOffset) { fileOffset = newFileOffset; consumed = newRecordOffset; } - } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchRecords.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchRecords.java index 069d7c9aedb5c..69af5702f1f42 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchRecords.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestBatchRecords.java @@ -50,7 +50,6 @@ public void testForRecordsWithEmptyIterator() { assertNotNull(batchRecords); assertEquals(splitId, batchRecords.nextSplit()); assertNull(batchRecords.nextRecordFromSplit(), "Should have no records"); - assertTrue(batchRecords.finishedSplits().contains(splitId), "Should contain finished split"); assertNull(batchRecords.nextSplit(), "Second call to nextSplit should return null"); } @@ -138,17 +137,6 @@ public void testFileOffsetPersistence() { assertEquals(fileOffset, record2.fileOffset(), "File offset should remain constant"); } - @Test - public void testFinishedSplitsEmpty() { - String splitId = "test-split-6"; - List records = Arrays.asList("record1"); - ClosableIterator iterator = createClosableIterator(records); - - BatchRecords batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L); - - assertTrue(batchRecords.finishedSplits().isEmpty(), "Should have empty finished splits by default"); - } - @Test public void testConstructorWithFinishedSplits() { String splitId = "test-split-7"; @@ -333,25 +321,6 @@ public void testIteratorClosedAfterExhaustion() { assertTrue(mockIterator.isClosed(), "Iterator should be closed after exhaustion"); } - @Test - public void testFinishedSplitsAddedAfterExhaustion() { - String splitId = "test-split-18"; - List records = Arrays.asList("record1"); - ClosableIterator iterator = createClosableIterator(records); - - BatchRecords 
batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L); - batchRecords.nextSplit(); - - assertTrue(batchRecords.finishedSplits().isEmpty()); - - // Read all records - batchRecords.nextRecordFromSplit(); - - // After exhaustion, split should be added to finished splits - assertNull(batchRecords.nextRecordFromSplit()); - assertTrue(batchRecords.finishedSplits().contains(splitId)); - } - /** * Helper method to create a ClosableIterator from a list of items. */ diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java index 09c60e6fca2e0..bb8bebdb44262 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/TestHoodieSourceSplitReader.java @@ -104,25 +104,12 @@ public void testFetchWithMultipleSplits() throws IOException { new SplitsAddition<>(Arrays.asList(split1, split2, split3)); reader.handleSplitsChanges(splitsChange); - // Fetch first split - RecordsWithSplitIds> result1 = reader.fetch(); - assertNotNull(result1); - assertEquals(split1.splitId(), result1.nextSplit()); - - // Fetch second split - RecordsWithSplitIds> result2 = reader.fetch(); - assertNotNull(result2); - assertEquals(split2.splitId(), result2.nextSplit()); - - // Fetch third split - RecordsWithSplitIds> result3 = reader.fetch(); - assertNotNull(result3); - assertEquals(split3.splitId(), result3.nextSplit()); - - // No more splits - RecordsWithSplitIds> result4 = reader.fetch(); - assertNotNull(result4); - assertNull(result4.nextSplit()); + // Each split produces a data batch followed by a finish-signal batch. + // fetchNextSplitId() skips finish-signal batches and returns the next split ID (or null). 
+ assertEquals(split1.splitId(), fetchNextSplitId(reader)); + assertEquals(split2.splitId(), fetchNextSplitId(reader)); + assertEquals(split3.splitId(), fetchNextSplitId(reader)); + assertNull(fetchNextSplitId(reader)); } @Test @@ -147,9 +134,9 @@ public void testHandleSplitsChangesWithComparator() throws IOException { reader.handleSplitsChanges(splitsChange); // Should fetch in reverse order due to comparator - assertEquals(split3.splitId(), reader.fetch().nextSplit()); - assertEquals(split2.splitId(), reader.fetch().nextSplit()); - assertEquals(split1.splitId(), reader.fetch().nextSplit()); + assertEquals(split3.splitId(), fetchNextSplitId(reader)); + assertEquals(split2.splitId(), fetchNextSplitId(reader)); + assertEquals(split1.splitId(), fetchNextSplitId(reader)); } @Test @@ -170,10 +157,10 @@ public void testAddingSplitsInMultipleBatches() throws IOException { reader.handleSplitsChanges(new SplitsAddition<>(Arrays.asList(split2, split3))); // Verify all splits can be fetched - assertEquals(split1.splitId(), reader.fetch().nextSplit()); - assertEquals(split2.splitId(), reader.fetch().nextSplit()); - assertEquals(split3.splitId(), reader.fetch().nextSplit()); - assertNull(reader.fetch().nextSplit()); + assertEquals(split1.splitId(), fetchNextSplitId(reader)); + assertEquals(split2.splitId(), fetchNextSplitId(reader)); + assertEquals(split3.splitId(), fetchNextSplitId(reader)); + assertNull(fetchNextSplitId(reader)); } @Test @@ -238,6 +225,7 @@ public void testReaderFunctionCalledCorrectly() throws IOException { assertEquals(split, readerFunction.getLastReadSplit()); } + @Test public void testReaderFunctionClosedOnReaderClose() throws Exception { TestSplitReaderFunction readerFunction = new TestSplitReaderFunction(); HoodieSourceSplitReader reader = @@ -279,9 +267,9 @@ public void testSplitOrderPreservedWithoutComparator() throws IOException { reader.handleSplitsChanges(splitsChange); // Should fetch in insertion order: 3, 1, 2 - assertEquals(split3.splitId(), 
reader.fetch().nextSplit()); - assertEquals(split1.splitId(), reader.fetch().nextSplit()); - assertEquals(split2.splitId(), reader.fetch().nextSplit()); + assertEquals(split3.splitId(), fetchNextSplitId(reader)); + assertEquals(split1.splitId(), fetchNextSplitId(reader)); + assertEquals(split2.splitId(), fetchNextSplitId(reader)); } @Test @@ -302,6 +290,22 @@ public void testReaderIteratorClosedOnSplitFinish() throws IOException { assertEquals(split1, readerFunction.getLastReadSplit()); } + /** + * Fetches the next batch that contains actual split data, skipping split-finish signal batches. + * Split-finish batches have non-empty {@code finishedSplits()} but no records. + * Returns null when there are truly no more splits. + */ + private String fetchNextSplitId(HoodieSourceSplitReader reader) throws IOException { + while (true) { + RecordsWithSplitIds> result = reader.fetch(); + if (!result.finishedSplits().isEmpty()) { + // This is a split-finish signal batch; continue to get the next real batch. + continue; + } + return result.nextSplit(); + } + } + /** * Helper method to create a test HoodieSourceSplit. 
*/ diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java index 6355aa508f9a6..1cb244245ce82 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java @@ -259,10 +259,8 @@ public void testSplitId() { // splitId() returns toString() String splitId = split.splitId(); - String toString = split.toString(); - assertEquals(toString, splitId); - assertTrue(splitId.contains("HoodieSourceSplit")); + assertTrue(splitId.equals("1:file1")); } @Test @@ -274,12 +272,14 @@ public void testToString() { String result = split.toString(); assertTrue(result.contains("HoodieSourceSplit")); - assertTrue(result.contains("splitNum=1")); + assertTrue(result.contains("splitNum")); assertTrue(result.contains("basePath")); assertTrue(result.contains("logPaths")); assertTrue(result.contains("tablePath")); assertTrue(result.contains("partitionPath")); assertTrue(result.contains("mergeType")); + assertTrue(result.contains("consumed")); + assertTrue(result.contains("fileOffset")); } @Test diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 4a240490f7d4f..03fd014a9b781 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -296,14 +296,16 @@ void testStreamReadAppendData(HoodieTableType tableType) throws Exception { assertRowsEquals(rows, TestData.DATA_SET_SOURCE_MERGED); } - @Test - void testStreamWriteBatchRead() { + @ParameterizedTest + 
@ValueSource(booleans = {true, false}) + void testStreamWriteBatchRead(boolean useSourceV2) { // create filesystem table named source String createSource = TestConfigurations.getFileSourceDDL("source"); streamTableEnv.executeSql(createSource); String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.READ_SOURCE_V2_ENABLED, useSourceV2) .options(getDefaultKeys()) .end(); streamTableEnv.executeSql(hoodieTableDDL); @@ -354,13 +356,15 @@ void testStreamWriteBatchReadOptimized() throws Exception { } } - @Test - void testStreamWriteBatchReadOptimizedWithoutCompaction() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testStreamWriteBatchReadOptimizedWithoutCompaction(boolean useSourceV2) { String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) .options(getDefaultKeys()) .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ) .option(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED) + .option(FlinkOptions.READ_SOURCE_V2_ENABLED, useSourceV2) .end(); streamTableEnv.executeSql(hoodieTableDDL); final String insertInto = "insert into t1 values\n" @@ -372,8 +376,9 @@ void testStreamWriteBatchReadOptimizedWithoutCompaction() { assertTrue(rows.isEmpty()); } - @Test - void testStreamWriteReadSkippingCompaction() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testStreamWriteReadSkippingCompaction(boolean useSourceV2) throws Exception { // create filesystem table named source String createSource = TestConfigurations.getFileSourceDDL("source", 4); streamTableEnv.executeSql(createSource); @@ -385,6 +390,7 @@ void testStreamWriteReadSkippingCompaction() throws Exception { .option(FlinkOptions.READ_AS_STREAMING, true) .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1) .option(FlinkOptions.COMPACTION_TASKS, 1) + .option(FlinkOptions.READ_SOURCE_V2_ENABLED, useSourceV2) .end(); 
streamTableEnv.executeSql(hoodieTableDDL); String insertInto = "insert into t1 select * from source"; @@ -399,8 +405,9 @@ void testStreamWriteReadSkippingCompaction() throws Exception { assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT); } - @Test - void testAppendWriteReadSkippingClustering() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testAppendWriteReadSkippingClustering(boolean useSourceV2) throws Exception { // create filesystem table named source String createSource = TestConfigurations.getFileSourceDDL("source", 4); streamTableEnv.executeSql(createSource); @@ -414,6 +421,7 @@ void testAppendWriteReadSkippingClustering() throws Exception { .option(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true) .option(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1) .option(FlinkOptions.CLUSTERING_TASKS, 1) + .option(FlinkOptions.READ_SOURCE_V2_ENABLED, useSourceV2) .end(); streamTableEnv.executeSql(hoodieTableDDL); String insertInto = "insert into t1 select * from source"; @@ -553,8 +561,8 @@ void testStreamReadWithDeletes() throws Exception { } @ParameterizedTest - @MethodSource("tableTypeAndBooleanTrueFalseParams") - void testDataSkippingWithRecordLevelIndex(HoodieTableType tableType, boolean mdtCompactionEnabled) throws Exception { + @MethodSource("tableTypeAndSourceV2AndBooleanTrueFalseParams") + void testDataSkippingWithRecordLevelIndex(HoodieTableType tableType, boolean useSourceV2, boolean mdtCompactionEnabled) throws Exception { TableEnvironment tableEnv = batchTableEnv; String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) @@ -564,6 +572,7 @@ void testDataSkippingWithRecordLevelIndex(HoodieTableType tableType, boolean mdt .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true) .option(FlinkOptions.TABLE_TYPE, tableType.name()) .option(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, mdtCompactionEnabled ? 
1 : 10) + .option(FlinkOptions.READ_SOURCE_V2_ENABLED, useSourceV2) .end(); tableEnv.executeSql(hoodieTableDDL); execInsertSql(tableEnv, TestSQL.INSERT_T1); @@ -590,8 +599,8 @@ void testDataSkippingWithRecordLevelIndex(HoodieTableType tableType, boolean mdt } @ParameterizedTest - @MethodSource("tableTypeAndBooleanTrueFalseParams") - void testReadWithPartitionStatsPruning(HoodieTableType tableType, boolean hiveStylePartitioning) throws Exception { + @MethodSource("tableTypeAndSourceV2AndBooleanTrueFalseParams") + void testReadWithPartitionStatsPruning(HoodieTableType tableType, boolean useSourceV2, boolean hiveStylePartitioning) throws Exception { String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) .options(getDefaultKeys()) @@ -601,6 +610,7 @@ void testReadWithPartitionStatsPruning(HoodieTableType tableType, boolean hiveSt .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true) .option(FlinkOptions.TABLE_TYPE, tableType) .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning) + .option(FlinkOptions.READ_SOURCE_V2_ENABLED, useSourceV2) .end(); streamTableEnv.executeSql(hoodieTableDDL); Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); @@ -644,8 +654,8 @@ void testReadWithPartitionStatsPruning(HoodieTableType tableType, boolean hiveSt } @ParameterizedTest - @MethodSource("tableTypeAndBooleanTrueFalseParams") - void testStreamReadFilterByPartition(HoodieTableType tableType, boolean hiveStylePartitioning) throws Exception { + @MethodSource("tableTypeAndSourceV2AndBooleanTrueFalseParams") + void testStreamReadFilterByPartition(HoodieTableType tableType, boolean useSourceV2, boolean hiveStylePartitioning) throws Exception { Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); conf.set(FlinkOptions.TABLE_NAME, "t1"); conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid"); @@ -664,6 +674,7 @@ void testStreamReadFilterByPartition(HoodieTableType tableType, boolean 
hiveStyl .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2) .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, false) .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning) + .option(FlinkOptions.READ_SOURCE_V2_ENABLED, useSourceV2) .end(); streamTableEnv.executeSql(hoodieTableDDL); @@ -675,8 +686,9 @@ void testStreamReadFilterByPartition(HoodieTableType tableType, boolean hiveStyl assertRowsEquals(result, expected, true); } - @Test - void testStreamReadMorTableWithCompactionPlan() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testStreamReadMorTableWithCompactionPlan(boolean useSourceV2) throws Exception { String createSource = TestConfigurations.getFileSourceDDL("source"); streamTableEnv.executeSql(createSource); @@ -691,6 +703,7 @@ void testStreamReadMorTableWithCompactionPlan() throws Exception { .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, false) // generate compaction plan for each commit .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1) + .option(FlinkOptions.READ_SOURCE_V2_ENABLED, useSourceV2) .noPartition() .end(); streamTableEnv.executeSql(hoodieTableDDL); @@ -3192,6 +3205,23 @@ private static Stream tableTypeAndBooleanTrueFalseParams() { return Stream.of(data).map(Arguments::of); } + /** + * Return test params => (HoodieTableType, true/false, true/false). 
+ */ + private static Stream tableTypeAndSourceV2AndBooleanTrueFalseParams() { + Object[][] data = + new Object[][] { + {HoodieTableType.COPY_ON_WRITE, false, false}, + {HoodieTableType.COPY_ON_WRITE, true, true}, + {HoodieTableType.MERGE_ON_READ, false, false}, + {HoodieTableType.MERGE_ON_READ, true, true}, + {HoodieTableType.COPY_ON_WRITE, false, true}, + {HoodieTableType.COPY_ON_WRITE, true, false}, + {HoodieTableType.MERGE_ON_READ, false, true}, + {HoodieTableType.MERGE_ON_READ, true, false}}; + return Stream.of(data).map(Arguments::of); + } + public static List testBulkInsertWithPartitionBucketIndexParams() { return asList( Arguments.of("bulk_insert", COPY_ON_WRITE.name()),