[kernel-spark] Add getFileChanges() to support Kernel-based DSv2 streaming (Part I) #5313
Conversation
Hi @huan233usc @gengliangwang @jerrypeng, could you please help review this PR?
 *
 * <p>Indexed: refers to the index in DeltaSourceOffset, assigned by the streaming engine.
 */
public class KernelIndexedFile {
Nit: just call it IndexedFile? Kernel is just an impl detail.
+1
Done.
 * from the first row (rowId=0).
 */
public static long getVersion(ColumnarBatch batch) {
  assert batch.getSize() > 0;
let's follow https://github.com/delta-io/delta/blob/master/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/RowBackedAction.java#L46 and create a new helper function here
protected int getFieldIndex(String fieldName) {
int index = row.getSchema().indexOf(fieldName);
checkArgument(index >= 0, "Field '%s' not found in schema: %s", fieldName, row.getSchema());
return index;
}
Done. Thank you!
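As a reference, a minimal self-contained sketch (not the exact PR code) of how StreamingHelper.getVersion could adopt that helper pattern; the "version" column name and the class name below are assumptions for illustration:

import io.delta.kernel.data.ColumnarBatch;

// Sketch only: resolve the field by name and fail with a descriptive message instead of
// relying on a bare assert, mirroring RowBackedAction#getFieldIndex.
public final class StreamingHelperSketch {
  private StreamingHelperSketch() {}

  /** Reads the commit version from the first row of a non-empty batch. */
  public static long getVersion(ColumnarBatch batch) {
    if (batch.getSize() <= 0) {
      throw new IllegalArgumentException("Expected a non-empty batch");
    }
    return batch.getColumnVector(getFieldIndex(batch, "version")).getLong(0);
  }

  /** Resolves a top-level field index by name, throwing if the field is missing. */
  private static int getFieldIndex(ColumnarBatch batch, String fieldName) {
    int index = batch.getSchema().indexOf(fieldName);
    if (index < 0) {
      throw new IllegalArgumentException(
          String.format("Field '%s' not found in schema: %s", fieldName, batch.getSchema()));
    }
    return index;
  }
}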
int id = i * 100 + j;
insertValues.append(String.format("(%d, 'User%d')", id, id));
}
spark.sql(String.format("INSERT INTO %s VALUES %s", testTableName, insertValues.toString()));
I wonder if we can follow https://github.com/delta-io/delta/blob/master/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkGoldenTableTest.java#L579 and test many of the golden tables.
I think it might be overkill at this point, especially when there are so many unsupported table types. I agree we should eventually add a test like this. Added a TODO.
  continue;
}
long version = StreamingHelper.getVersion(batch);
validateCommit(batch, version, endOffset);
Should validation happen after processing the previous version?
Done, thanks!
}
CommitRange commitRange = builder.build(engine);
// Required by kernel: perform protocol validation by creating a snapshot at startVersion.
Snapshot startSnapshot =
Why do you need to get a snapshot even if we start reading from a specific delta log version?
It's required by the kernel to fetch actions:
* @param startSnapshot the snapshot for startVersion, required to ensure the table is readable by …
Snapshot startSnapshot =
    TableManager.loadSnapshot(tablePath).atVersion(startVersion).build(engine);
// TODO(M1): This is not working with ccv2 table
Set<DeltaAction> actionSet = new HashSet<>(Arrays.asList(DeltaAction.ADD, DeltaAction.REMOVE));
Ideally this would be a static class variable so it is only allocated once per query run.
Why do we also need to get the "REMOVE" actions?
Done.
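For illustration, a minimal sketch of that change (the constant name is an assumption; DeltaAction is the enum used in the diff above):

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Sketch only: hoist the action set into a class-level constant so it is allocated once
// per query run rather than on every getFileChanges() call.
private static final Set<DeltaAction> ACTION_SET =
    Collections.unmodifiableSet(
        new HashSet<>(Arrays.asList(DeltaAction.ADD, DeltaAction.REMOVE)));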
See validateCommit() -- the current behavior of the delta connector is that we fail the pipeline if any commit contains a REMOVE (unless skipDeletes or skipChangeCommits are specified). Streaming jobs are meant to process append-only data.
}
long version = StreamingHelper.getVersion(batch);
// TODO(M1): migrate to kernel's commit-level iterator (WIP).
// The current one-pass algorithm assumes REMOVE actions precede ADD actions
Where are you filtering out the "REMOVE" actions?
We throw an error whenever we encounter a REMOVE -- because ETL jobs should process append-only data. We fail explicitly to avoid correctness issues.
In M2, we'll also support ignoreChangedCommits and ignoreDeletes to skip these commits silently.
To properly handle update actions, users would need to use CDF.
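A hedged sketch of that fail-fast check, assuming the batch exposes a top-level "remove" column (the class name, method name, column name, and messages are illustrative, not the PR's exact code):

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;

// Sketch only: abort the stream when a commit carries a REMOVE action, since this
// streaming source only supports append-only tables; M2 is expected to add options
// for silently skipping such commits.
public final class RemoveActionCheck {
  private RemoveActionCheck() {}

  public static void failIfCommitHasRemoves(ColumnarBatch batch, long version) {
    int removeIdx = batch.getSchema().indexOf("remove"); // column name is an assumption
    if (removeIdx < 0) {
      return; // the scan did not select the remove column
    }
    ColumnVector removeVector = batch.getColumnVector(removeIdx);
    for (int rowId = 0; rowId < batch.getSize(); rowId++) {
      if (!removeVector.isNullAt(rowId)) {
        throw new UnsupportedOperationException(
            "Commit version " + version + " contains a REMOVE action; deletes and updates"
                + " are not supported by this source (use CDF for change data).");
      }
    }
  }
}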
.dependsOn(kernelApi)
.dependsOn(kernelDefaults)
.dependsOn(spark % "test->test")
.dependsOn(spark % "compile->compile;test->test")
is this necessary?
Without this, compilation would fail because the program cannot find org.apache.spark.sql.delta.DeltaErrors and org.apache.spark.sql.delta.sources.DeltaSourceOffset.
The dependency between V2 & V1 is still under discussion. However, I don't want to block your development because of this. We can still create a new copy of DeltaErrors and DeltaSourceOffset if we decide not to have code reuse.
This comment is generated by AI. Yes, this change is necessary. The main source code in …
These are production dependencies used in the implementation, so we need …
}

Row addFileRow = StructRow.fromStructVector(addVector, rowId);
if (addFileRow == null) {
Should we throw here, given that addVector.isNullAt(rowId) is false?
We call this method even on REMOVE rows in extractIndexedFilesFromBatch.
if this is a remove row, iiuc the method will return on L175? Did I miss something?
Ah yes. You are right. Done.
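For clarity, a small fragment sketching that resolution (identifiers taken from the diff above): since REMOVE rows already returned earlier, a null add struct here indicates a malformed batch and should fail loudly rather than be skipped.

// Sketch only: addVector.isNullAt(rowId) is known to be false for this row, so a null
// result would indicate a malformed batch; fail fast instead of silently skipping.
Row addFileRow = StructRow.fromStructVector(addVector, rowId);
if (addFileRow == null) {
  throw new IllegalStateException(
      "Unexpected null 'add' struct at row " + rowId + " despite a non-null add vector entry");
}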
// A version can be split across multiple batches.
long currentVersion = -1;
long currentIndex = 0;
List<IndexedFile> currentVersionFiles = new ArrayList<>();
It's more performant to use a linked list if we don't actually know the size the list will be.
I don't think so. The per-node overhead of a linked list outweighs the cost of resizing an ArrayList, especially for my use case (addAll(), add(), clear()). We would maybe get a performance benefit if we did a lot of deletes and inserts at the beginning or in the middle (which we do not).
// TODO(#5319): check trackingMetadataChange flag and compare with stream metadata.

result.addAll(dataFiles);
This is not very efficient. "dataFiles" should just be a linked list and you can append and prepend in constant time.
ditto -- I don't think linked lists would help here.
// The current one-pass algorithm assumes REMOVE actions precede ADD actions
// in a commit; we should implement a proper two-pass approach once kernel API is ready.

if (currentVersion != -1 && version != currentVersion) {
This logic here is kind of confusing. All you are trying to do is sandwich the indexed files between the BASE_INDEX sentinel file and the END_INDEX sentinel file, right? Why not simplify the logic to be:
allIndexedFiles.add(beginSentinelFile)
allIndexedFiles.addAll(allIndexFilesInBatch)
allIndexedFiles.add(endSentinelFile)
We only insert sentinels before and after a version. The code is complex because the kernel breaks up a commit into batches (ColumnarBatch) to avoid overwhelming memory. I reorganized the code a bit to make this clear. Could you take another look?
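To make the batching subtlety concrete, here is a self-contained toy sketch of the sentinel sandwiching described above, using plain strings instead of Kernel types (all names are illustrative, not the PR's API):

import java.util.ArrayList;
import java.util.List;

// A commit version may be split across several batches, so files are buffered per version
// and only wrapped in BASE/END sentinels once the version changes or the input ends.
public class SentinelSandwichSketch {
  public static void main(String[] args) {
    long[] versions = {1, 1, 1, 2, 2, 3}; // e.g. version 1 spans two "batches"
    String[] files = {"a", "b", "c", "d", "e", "f"};

    List<String> result = new ArrayList<>();
    List<String> currentVersionFiles = new ArrayList<>();
    long currentVersion = -1;

    for (int i = 0; i < versions.length; i++) {
      if (currentVersion != -1 && versions[i] != currentVersion) {
        flush(result, currentVersion, currentVersionFiles);
      }
      currentVersion = versions[i];
      currentVersionFiles.add(files[i]);
    }
    if (currentVersion != -1) {
      flush(result, currentVersion, currentVersionFiles);
    }
    // Prints: [v1:BASE, a, b, c, v1:END, v2:BASE, d, e, v2:END, v3:BASE, f, v3:END]
    System.out.println(result);
  }

  private static void flush(List<String> result, long version, List<String> current) {
    result.add("v" + version + ":BASE"); // begin sentinel, emitted once per version
    result.addAll(current);
    result.add("v" + version + ":END"); // end sentinel, emitted once per version
    current.clear();
  }
}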
 */
private List<IndexedFile> extractIndexedFilesFromBatch(
    ColumnarBatch batch, long version, long startIndex) {
  List<IndexedFile> indexedFiles = new ArrayList<>();
Use LinkedList.
Same rationale as above -- we are only doing addAll() and add(); ArrayList would be faster and more memory-efficient.
Arguments.of(
    0L, BASE_INDEX, isInitialSnapshot, Optional.of(2L), Optional.of(5L), "v0 to v2 id:5"),
Arguments.of(
    1L, 5L, isInitialSnapshot, Optional.of(3L), Optional.of(10L), "v1 id:5 to v3 id:10"),
What about to and from END_INDEX?
Done. Thanks!
 */
@ParameterizedTest
@MethodSource("getFileChangesParameters")
public void testGetFileChanges(
Should we also test with other types of actions in the delta log?
We do test REMOVEs & ADDs; I added METADATA too (which will now yield empty commits).
"Index mismatch at index %d: dsv1=%d, dsv2=%d", | ||
i, deltaFile.index(), kernelFile.getIndex())); | ||
|
||
String deltaPath = deltaFile.add() != null ? deltaFile.add().path() : null; |
nit: document that deltaFile.add() could be null when it is the starting/ending index
Done.
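For context, a hedged fragment of what the documented comparison might look like (the kernel-side getter names here are assumptions, not the test's actual API):

// Sketch only: deltaFile.add() is legitimately null for the BASE_INDEX/END_INDEX sentinel
// entries, so both sides are compared as nullable paths rather than dereferenced directly.
String deltaPath = deltaFile.add() != null ? deltaFile.add().path() : null;
String kernelPath =
    kernelFile.getAddFile() != null ? kernelFile.getAddFile().getPath() : null; // assumed getters
Assertions.assertEquals(
    deltaPath, kernelPath, String.format("Path mismatch at index %d", i));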
I think it is good as a starting point.
Which Delta project/connector is this regarding?

Description

This PR is Part I of implementing SparkMicroBatchStream.getFileChanges() to support Kernel-based DSv2 Delta streaming (M1 milestone). Follow-ups include schema evolution support and initial snapshot support (marked TODO(M1) in code).

How was this patch tested?

Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2 (SparkMicroBatchStream).

Does this PR introduce any user-facing changes?

No