Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@

package org.apache.hudi.source.reader;

import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;

import java.util.HashSet;
import java.util.Collections;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
Expand All @@ -33,7 +34,7 @@
*/
public class BatchRecords<T> implements RecordsWithSplitIds<HoodieRecordWithPosition<T>> {
private String splitId;
private String nextSprintId;
private String nextSplitId;
private final ClosableIterator<T> recordIterator;
private final Set<String> finishedSplits;
private final HoodieRecordWithPosition<T> recordAndPosition;
Expand All @@ -53,7 +54,7 @@ public class BatchRecords<T> implements RecordsWithSplitIds<HoodieRecordWithPosi
recordIterator != null, "recordIterator can be empty but not null");

this.splitId = splitId;
this.nextSprintId = splitId;
this.nextSplitId = splitId;
this.recordIterator = recordIterator;
this.finishedSplits = finishedSplits;
this.recordAndPosition = new HoodieRecordWithPosition<>();
Expand All @@ -64,13 +65,13 @@ public class BatchRecords<T> implements RecordsWithSplitIds<HoodieRecordWithPosi
@Nullable
@Override
public String nextSplit() {
if (splitId.equals(nextSprintId)) {
// set the nextSprintId to null to indicate no more splits
if (splitId.equals(nextSplitId)) {
// set the nextSplitId to null to indicate no more splits
// this class only contains record for one split
nextSprintId = null;
nextSplitId = null;
return splitId;
} else {
return nextSprintId;
return nextSplitId;
}
}

Expand All @@ -82,7 +83,6 @@ public HoodieRecordWithPosition<T> nextRecordFromSplit() {
position = position + 1;
return recordAndPosition;
} else {
finishedSplits.add(splitId);
recordIterator.close();
return null;
}
Expand Down Expand Up @@ -117,8 +117,15 @@ public void seek(long startingRecordOffset) {

public static <T> BatchRecords<T> forRecords(
String splitId, ClosableIterator<T> recordIterator, int fileOffset, long startingRecordOffset) {
return new BatchRecords<>(splitId, recordIterator, fileOffset, startingRecordOffset, Set.of());
}

return new BatchRecords<>(
splitId, recordIterator, fileOffset, startingRecordOffset, new HashSet<>());
/**
 * Creates the terminal, record-less batch for a split.
 *
 * <p>The returned batch carries an empty record map and a finished-splits set containing only
 * {@code splitId}. Reporting the split as finished up front lets FetchTask invoke
 * splitFinishedCallback right after the batch is enqueued; that removes the split from
 * SplitFetcher.assignedSplits, so the fetcher goes idle and calls
 * elementsQueue.notifyAvailable(), which SourceReaderBase needs in order to emit
 * END_OF_INPUT for bounded (batch) reads.
 *
 * @param splitId id of the split to report as finished
 * @param <T> record type
 * @return a batch with no records whose finished splits are exactly {@code splitId}
 */
public static <T> RecordsWithSplitIds<HoodieRecordWithPosition<T>> lastBatchRecords(String splitId) {
  Set<String> finishedSplitIds = Set.of(splitId);
  return new RecordsBySplits<>(Collections.emptyMap(), finishedSplitIds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.util.CloseableIterator;
import org.apache.hudi.metrics.FlinkStreamReadMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -43,38 +44,46 @@
/**
* The split reader of Hoodie source.
*
* <p>Each split is read by one call to {@link #fetch()}, which returns it as a single
* {@link RecordsWithSplitIds} batch; the following {@link #fetch()} call returns a record-less
* batch that only reports the split as finished (via {@code finishedSplits()}). Flink's
* {@code SourceReaderBase} is responsible for draining all records from each batch
* (via {@code nextRecordFromSplit()}) and for acting on the finished-split signal.
*
* @param <T> record type
*/
public class HoodieSourceSplitReader<T> implements SplitReader<HoodieRecordWithPosition<T>, HoodieSourceSplit> {
private static final Logger LOG = LoggerFactory.getLogger(HoodieSourceSplitReader.class);

private final SerializableComparator<HoodieSourceSplit> splitComparator;
private final SplitReaderFunction<T> readerFunction;
private final Queue<HoodieSourceSplit> splits;
private final SourceReaderContext context;
private final FlinkStreamReadMetrics readerMetrics;

private HoodieSourceSplit currentSplit;
private final SplitReaderFunction<T> readerFunction;
private transient HoodieSourceSplit currentSplit;
private transient CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<T>>> currentReader;

public HoodieSourceSplitReader(
String tableName,
SourceReaderContext context,
SplitReaderFunction<T> readerFunction,
SerializableComparator<HoodieSourceSplit> splitComparator) {
this.context = context;
this.splitComparator = splitComparator;
this.readerFunction = readerFunction;
this.splits = new ArrayDeque<>();
this.readerFunction = readerFunction;
this.readerMetrics = new FlinkStreamReadMetrics(context.metricGroup(), tableName);
this.readerMetrics.registerMetrics();
}

@Override
public RecordsWithSplitIds<HoodieRecordWithPosition<T>> fetch() throws IOException {
// finish current split.
if (currentSplit != null) {
return finishSplit();
}

HoodieSourceSplit nextSplit = splits.poll();
if (nextSplit != null) {
currentSplit = nextSplit;
return readerFunction.read(currentSplit);
return readerFunction.read(nextSplit);
} else {
// return an empty result, which will lead to split fetch to be idle.
// SplitFetcherManager will then close idle fetcher.
Expand Down Expand Up @@ -105,15 +114,10 @@ public void wakeUp() {
// Nothing to do
}

@Override
public void close() throws Exception {
readerFunction.close();
}

/**
* SourceSplitReader only reads splits sequentially. When waiting for watermark alignment
* the SourceOperator will stop processing and recycling the fetched batches. Based on this the
* `pauseOrResumeSplits` and the `wakeUp` are left empty.
* {@code pauseOrResumeSplits} and the {@code wakeUp} are left empty.
* @param splitsToPause splits to pause
* @param splitsToResume splits to resume
*/
Expand All @@ -122,4 +126,15 @@ public void pauseOrResumeSplits(
Collection<HoodieSourceSplit> splitsToPause,
Collection<HoodieSourceSplit> splitsToResume) {
}

/**
 * Closes this split reader by closing the underlying {@code readerFunction}.
 *
 * @throws Exception if closing the reader function fails
 */
@Override
public void close() throws Exception {
readerFunction.close();
}

/**
 * Builds the finish-signal batch for the split currently tracked in {@code currentSplit}
 * and then clears {@code currentSplit}.
 *
 * @return a record-less batch created by {@code BatchRecords.lastBatchRecords} for the
 *     current split's id
 */
private RecordsWithSplitIds<HoodieRecordWithPosition<T>> finishSplit() {
  final String finishedSplitId = currentSplit.splitId();
  // Build the signal batch before clearing the field so a failure leaves currentSplit intact.
  final RecordsWithSplitIds<HoodieRecordWithPosition<T>> finishSignal =
      BatchRecords.lastBatchRecords(finishedSplitId);
  currentSplit = null;
  return finishSignal;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

package org.apache.hudi.source.split;

import lombok.ToString;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.Option;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.flink.api.connector.source.SourceSplit;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -95,7 +95,7 @@ public HoodieSourceSplit(

@Override
public String splitId() {
return toString();
return String.join(":", String.valueOf(splitNum), fileId);
}

public void consume() {
Expand All @@ -110,5 +110,4 @@ public void updatePosition(int newFileOffset, long newRecordOffset) {
fileOffset = newFileOffset;
consumed = newRecordOffset;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public void testForRecordsWithEmptyIterator() {
assertNotNull(batchRecords);
assertEquals(splitId, batchRecords.nextSplit());
assertNull(batchRecords.nextRecordFromSplit(), "Should have no records");
assertTrue(batchRecords.finishedSplits().contains(splitId), "Should contain finished split");
assertNull(batchRecords.nextSplit(), "Second call to nextSplit should return null");
}

Expand Down Expand Up @@ -138,17 +137,6 @@ public void testFileOffsetPersistence() {
assertEquals(fileOffset, record2.fileOffset(), "File offset should remain constant");
}

@Test
public void testFinishedSplitsEmpty() {
String splitId = "test-split-6";
List<String> records = Arrays.asList("record1");
ClosableIterator<String> iterator = createClosableIterator(records);

BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);

assertTrue(batchRecords.finishedSplits().isEmpty(), "Should have empty finished splits by default");
}

@Test
public void testConstructorWithFinishedSplits() {
String splitId = "test-split-7";
Expand Down Expand Up @@ -333,25 +321,6 @@ public void testIteratorClosedAfterExhaustion() {
assertTrue(mockIterator.isClosed(), "Iterator should be closed after exhaustion");
}

@Test
public void testFinishedSplitsAddedAfterExhaustion() {
String splitId = "test-split-18";
List<String> records = Arrays.asList("record1");
ClosableIterator<String> iterator = createClosableIterator(records);

BatchRecords<String> batchRecords = BatchRecords.forRecords(splitId, iterator, 0, 0L);
batchRecords.nextSplit();

assertTrue(batchRecords.finishedSplits().isEmpty());

// Read all records
batchRecords.nextRecordFromSplit();

// After exhaustion, split should be added to finished splits
assertNull(batchRecords.nextRecordFromSplit());
assertTrue(batchRecords.finishedSplits().contains(splitId));
}

/**
* Helper method to create a ClosableIterator from a list of items.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,25 +104,12 @@ public void testFetchWithMultipleSplits() throws IOException {
new SplitsAddition<>(Arrays.asList(split1, split2, split3));
reader.handleSplitsChanges(splitsChange);

// Fetch first split
RecordsWithSplitIds<HoodieRecordWithPosition<String>> result1 = reader.fetch();
assertNotNull(result1);
assertEquals(split1.splitId(), result1.nextSplit());

// Fetch second split
RecordsWithSplitIds<HoodieRecordWithPosition<String>> result2 = reader.fetch();
assertNotNull(result2);
assertEquals(split2.splitId(), result2.nextSplit());

// Fetch third split
RecordsWithSplitIds<HoodieRecordWithPosition<String>> result3 = reader.fetch();
assertNotNull(result3);
assertEquals(split3.splitId(), result3.nextSplit());

// No more splits
RecordsWithSplitIds<HoodieRecordWithPosition<String>> result4 = reader.fetch();
assertNotNull(result4);
assertNull(result4.nextSplit());
// Each split produces a data batch followed by a finish-signal batch.
// fetchNextSplitId() skips finish-signal batches and returns the next split ID (or null).
assertEquals(split1.splitId(), fetchNextSplitId(reader));
assertEquals(split2.splitId(), fetchNextSplitId(reader));
assertEquals(split3.splitId(), fetchNextSplitId(reader));
assertNull(fetchNextSplitId(reader));
}

@Test
Expand All @@ -147,9 +134,9 @@ public void testHandleSplitsChangesWithComparator() throws IOException {
reader.handleSplitsChanges(splitsChange);

// Should fetch in reverse order due to comparator
assertEquals(split3.splitId(), reader.fetch().nextSplit());
assertEquals(split2.splitId(), reader.fetch().nextSplit());
assertEquals(split1.splitId(), reader.fetch().nextSplit());
assertEquals(split3.splitId(), fetchNextSplitId(reader));
assertEquals(split2.splitId(), fetchNextSplitId(reader));
assertEquals(split1.splitId(), fetchNextSplitId(reader));
}

@Test
Expand All @@ -170,10 +157,10 @@ public void testAddingSplitsInMultipleBatches() throws IOException {
reader.handleSplitsChanges(new SplitsAddition<>(Arrays.asList(split2, split3)));

// Verify all splits can be fetched
assertEquals(split1.splitId(), reader.fetch().nextSplit());
assertEquals(split2.splitId(), reader.fetch().nextSplit());
assertEquals(split3.splitId(), reader.fetch().nextSplit());
assertNull(reader.fetch().nextSplit());
assertEquals(split1.splitId(), fetchNextSplitId(reader));
assertEquals(split2.splitId(), fetchNextSplitId(reader));
assertEquals(split3.splitId(), fetchNextSplitId(reader));
assertNull(fetchNextSplitId(reader));
}

@Test
Expand Down Expand Up @@ -238,6 +225,7 @@ public void testReaderFunctionCalledCorrectly() throws IOException {
assertEquals(split, readerFunction.getLastReadSplit());
}

@Test
public void testReaderFunctionClosedOnReaderClose() throws Exception {
TestSplitReaderFunction readerFunction = new TestSplitReaderFunction();
HoodieSourceSplitReader<String> reader =
Expand Down Expand Up @@ -279,9 +267,9 @@ public void testSplitOrderPreservedWithoutComparator() throws IOException {
reader.handleSplitsChanges(splitsChange);

// Should fetch in insertion order: 3, 1, 2
assertEquals(split3.splitId(), reader.fetch().nextSplit());
assertEquals(split1.splitId(), reader.fetch().nextSplit());
assertEquals(split2.splitId(), reader.fetch().nextSplit());
assertEquals(split3.splitId(), fetchNextSplitId(reader));
assertEquals(split1.splitId(), fetchNextSplitId(reader));
assertEquals(split2.splitId(), fetchNextSplitId(reader));
}

@Test
Expand All @@ -302,6 +290,22 @@ public void testReaderIteratorClosedOnSplitFinish() throws IOException {
assertEquals(split1, readerFunction.getLastReadSplit());
}

/**
 * Keeps fetching from {@code reader} until it gets a batch whose {@code finishedSplits()} is
 * empty, then returns that batch's {@code nextSplit()}.
 *
 * <p>Batches with a non-empty {@code finishedSplits()} are split-finish signals that carry no
 * records, so they are discarded here. A {@code null} return means no split is left to read.
 */
private String fetchNextSplitId(HoodieSourceSplitReader<String> reader) throws IOException {
  RecordsWithSplitIds<HoodieRecordWithPosition<String>> batch = reader.fetch();
  while (!batch.finishedSplits().isEmpty()) {
    // Split-finish signal batch: drop it and ask for the next one.
    batch = reader.fetch();
  }
  return batch.nextSplit();
}

/**
* Helper method to create a test HoodieSourceSplit.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,8 @@ public void testSplitId() {

// splitId() returns "<splitNum>:<fileId>"
String splitId = split.splitId();
String toString = split.toString();

assertEquals(toString, splitId);
assertTrue(splitId.contains("HoodieSourceSplit"));
assertTrue(splitId.equals("1:file1"));
}

@Test
Expand All @@ -274,12 +272,14 @@ public void testToString() {
String result = split.toString();

assertTrue(result.contains("HoodieSourceSplit"));
assertTrue(result.contains("splitNum=1"));
assertTrue(result.contains("splitNum"));
assertTrue(result.contains("basePath"));
assertTrue(result.contains("logPaths"));
assertTrue(result.contains("tablePath"));
assertTrue(result.contains("partitionPath"));
assertTrue(result.contains("mergeType"));
assertTrue(result.contains("consumed"));
assertTrue(result.contains("fileOffset"));
}

@Test
Expand Down
Loading
Loading