Conversation

@zikangh zikangh commented Oct 7, 2025

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

This PR is Part I of implementing SparkMicroBatchStream.getFileChanges() to support Kernel-based DSv2 Delta streaming (M1 milestone).

  • Reads Delta commit range and converts actions to KernelIndexedFile objects with proper indexing and sentinel values.
  • Basic 1-pass commit validation.

Follow-ups include schema evolution support and initial snapshot support (marked TODO(M1) in the code).
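
As a rough illustration of the indexing scheme (not the actual PR code): each data file in a commit gets an in-version index, and sentinel entries mark the start and end of the version. The class name, field names, and sentinel values below are illustrative assumptions rather than the real IndexedFile/DeltaSourceOffset definitions.

// Illustrative sketch only -- names and sentinel values are assumptions,
// not the actual IndexedFile / DeltaSourceOffset implementation.
public final class IndexedFileSketch {
  // Hypothetical sentinel indexes marking the start and end of a version.
  static final long BASE_INDEX_SENTINEL = -1L;
  static final long END_INDEX_SENTINEL = Long.MAX_VALUE;

  final long version;   // Delta commit version this entry belongs to
  final long index;     // position within the version, assigned in commit order
  final String addPath; // null for sentinel entries, which carry no AddFile

  IndexedFileSketch(long version, long index, String addPath) {
    this.version = version;
    this.index = index;
    this.addPath = addPath;
  }

  boolean isSentinel() {
    return addPath == null;
  }
}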

How was this patch tested?

Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2 (SparkMicroBatchStream).

Does this PR introduce any user-facing changes?

No

@zikangh zikangh mentioned this pull request Oct 7, 2025
@zikangh changed the title from "[kernel dsv2 streaming] Add logic that reads the delta commit log in preparation for streaming read (Part I)" to "[kernel dsv2 streaming] Add logic that reads the delta commit log to determine offsets for streaming read (Part I)" on Oct 7, 2025
@zikangh changed the title from "[kernel dsv2 streaming] Add logic that reads the delta commit log to determine offsets for streaming read (Part I)" to "[kernel-spark] Add getFileChanges() to support Kernel-based DSv2 streaming (Part I)" on Oct 7, 2025
zikangh commented Oct 8, 2025

Hi @huan233usc @gengliangwang @jerrypeng, could you please help review this PR?

*
* <p>Indexed: refers to the index in DeltaSourceOffset, assigned by the streaming engine.
*/
public class KernelIndexedFile {

Collaborator:
Nit: just call it IndexedFile? Kernel is just an implementation detail.

Contributor:
+1

Author:
Done.

* from the first row (rowId=0).
*/
public static long getVersion(ColumnarBatch batch) {
assert batch.getSize() > 0;

Contributor:
Let's follow https://github.com/delta-io/delta/blob/master/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/RowBackedAction.java#L46 and create a new helper function here:

protected int getFieldIndex(String fieldName) {
    int index = row.getSchema().indexOf(fieldName);
    checkArgument(index >= 0, "Field '%s' not found in schema: %s", fieldName, row.getSchema());
    return index;
  }

Author:
Done. Thank you!
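
For reference, a batch-level sketch of the same helper pattern; the "version" column name and the plain IllegalArgumentException are assumptions made for illustration, relying on the kernel ColumnarBatch accessors (getSchema(), getColumnVector(int), getSize()):

import io.delta.kernel.data.ColumnarBatch;

// Sketch only: checked field lookup in the style of RowBackedAction.getFieldIndex(),
// applied to a ColumnarBatch instead of a Row.
final class StreamingHelperSketch {
  static int getFieldIndex(ColumnarBatch batch, String fieldName) {
    int index = batch.getSchema().indexOf(fieldName);
    if (index < 0) {
      throw new IllegalArgumentException(
          String.format("Field '%s' not found in schema: %s", fieldName, batch.getSchema()));
    }
    return index;
  }

  static long getVersion(ColumnarBatch batch) {
    if (batch.getSize() == 0) {
      throw new IllegalArgumentException("Cannot read the version from an empty batch");
    }
    // Assumes a "version" column in the commit-actions batch; read it from the first row.
    return batch.getColumnVector(getFieldIndex(batch, "version")).getLong(0);
  }
}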

int id = i * 100 + j;
insertValues.append(String.format("(%d, 'User%d')", id, id));
}
spark.sql(String.format("INSERT INTO %s VALUES %s", testTableName, insertValues.toString()));

Contributor:

Author:
I think it might be overkill at this point, especially when there are so many unsupported table types. I agree we should eventually add a test like this. Added a TODO.

continue;
}
long version = StreamingHelper.getVersion(batch);
validateCommit(batch, version, endOffset);

Collaborator:
Should validation happen after processing the previous version?

Author:
Done, thanks!

}
CommitRange commitRange = builder.build(engine);
// Required by kernel: perform protocol validation by creating a snapshot at startVersion.
Snapshot startSnapshot =

Contributor:
Why do you need to get a snapshot even if we start reading from a specific delta log version?

Author:
It's required by the kernel to fetch actions:

* @param startSnapshot the snapshot for startVersion, required to ensure the table is readable by

Snapshot startSnapshot =
TableManager.loadSnapshot(tablePath).atVersion(startVersion).build(engine);
// TODO(M1): This is not working with ccv2 table
Set<DeltaAction> actionSet = new HashSet<>(Arrays.asList(DeltaAction.ADD, DeltaAction.REMOVE));

Contributor:
Ideally this would be a static class variable, so it is only allocated once per query run.

Contributor:
Why do we also need to get the "REMOVE" actions?

Author:
Done.

See validateCommit() -- the current behavior of the Delta connector is that we fail the pipeline if any commit contains a REMOVE (unless skipDeletes or skipChangeCommits is specified). Streaming jobs are meant to process append-only data.
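
A rough sketch of that check, for illustration only; the "remove" column name, the option names, and the exception type are assumptions, not the PR's exact code:

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;

// Sketch of the fail-on-REMOVE validation described above.
final class CommitValidationSketch {
  static void validateCommit(
      ColumnarBatch batch, long version, boolean ignoreDeletes, boolean ignoreChangeCommits) {
    if (ignoreDeletes || ignoreChangeCommits) {
      return; // planned M2 behavior: silently skip such commits
    }
    int removeOrdinal = batch.getSchema().indexOf("remove");
    if (removeOrdinal < 0) {
      return; // no REMOVE column projected in this batch
    }
    ColumnVector removeVector = batch.getColumnVector(removeOrdinal);
    for (int rowId = 0; rowId < batch.getSize(); rowId++) {
      if (!removeVector.isNullAt(rowId)) {
        throw new UnsupportedOperationException(
            "Detected a delete/update in commit version " + version
                + "; the streaming source requires append-only commits");
      }
    }
  }
}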

}
long version = StreamingHelper.getVersion(batch);
// TODO(M1): migrate to kernel's commit-level iterator (WIP).
// The current one-pass algorithm assumes REMOVE actions precede ADD actions

Contributor:
Where are you filtering out the "REMOVE" actions?

Author:
We throw an error whenever we encounter a REMOVE -- because ETL jobs should process append-only data. We fail explicitly to avoid correctness issues.
In M2, we'll also support ignoreChangedCommits and ignoreDeletes to skip these commits silently.
To properly handle update actions, users would need to use CDF.
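
For context, this is how a user already opts into those behaviors with the existing DSv1 source; ignoreDeletes and readChangeFeed are existing delta-spark read options, while the table path is a placeholder and `spark` is an existing SparkSession:

// Skip commits that only delete data, instead of failing the stream.
Dataset<Row> appendsOnly = spark.readStream()
    .format("delta")
    .option("ignoreDeletes", "true")
    .load("/path/to/table");

// Consume updates and deletes as change-data-feed rows instead.
Dataset<Row> changes = spark.readStream()
    .format("delta")
    .option("readChangeFeed", "true")
    .load("/path/to/table");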

@zikangh zikangh requested a review from jerrypeng October 9, 2025 18:03
.dependsOn(kernelApi)
.dependsOn(kernelDefaults)
.dependsOn(spark % "test->test")
.dependsOn(spark % "compile->compile;test->test")

Contributor:
is this necessary?

Author:
Without this, compilation would fail because the program cannot find org.apache.spark.sql.delta.DeltaErrors and org.apache.spark.sql.delta.sources.DeltaSourceOffset.

Contributor:
The dependency between V2 & V1 is still under discussion. However, I don't want to block your development because of this. We can still create a new copy of DeltaErrors and DeltaSourceOffset if we decide not to reuse code.

zikangh commented Oct 9, 2025

This comment is generated by AI.

Yes, this change is necessary. The main source code in SparkMicroBatchStream.java (not just tests) now uses classes from the spark module:

  • org.apache.spark.sql.delta.DeltaErrors (line 241 in validateCommit())
  • org.apache.spark.sql.delta.sources.DeltaSourceOffset (used throughout getFileChanges() and helper methods)

These are production dependencies used in the implementation, so we need compile->compile in addition to the existing test->test dependency.

@huan233usc huan233usc requested review from tdas and removed request for jerrypeng October 9, 2025 21:02
}

Row addFileRow = StructRow.fromStructVector(addVector, rowId);
if (addFileRow == null) {

Collaborator:
Should we throw here, given that addVector.isNullAt(rowId) is false?

Author:
We call this method even on REMOVE rows in extractIndexedFilesFromBatch

Collaborator:
if this is a remove row, iiuc the method will return on L175? Did I miss something?

Author:
Ah yes. You are right. Done.

// A version can be split across multiple batches.
long currentVersion = -1;
long currentIndex = 0;
List<IndexedFile> currentVersionFiles = new ArrayList<>();

Contributor:
It's more performant to use a linked list if we don't actually know what size the list will be.

Author:
I don't think so. The per-node overhead of a linked list outweighs the cost of resizing an ArrayList, especially for my use case (addAll(), add(), clear()). We would only see a performance benefit if we did a lot of deletes and inserts at the beginning or in the middle, which we don't.


// TODO(#5319): check trackingMetadataChange flag and compare with stream metadata.

result.addAll(dataFiles);

Contributor:
This is not very efficient. "dataFiles" should just be a linked list and you can append and prepend in constant time.

Author:
ditto -- I don't think linked lists would help here.

// The current one-pass algorithm assumes REMOVE actions precede ADD actions
// in a commit; we should implement a proper two-pass approach once kernel API is ready.

if (currentVersion != -1 && version != currentVersion) {

Contributor:
This logic here is kind of confusing. All you are trying to do is sandwich the index files between the BASE_INDEX sentinel file and END_INDEX sentinel file right? Why not simplify the logic to be

allIndexedFiles.add(beginSentinelFile)
allIndexedFiles.addAll(allIndexFilesInBatch)
allIndexedFiles.add(endSentinelFile)

Author:
We only insert sentinels before and after a version. The code is complex because the kernel breaks up a commit into batches (ColumnarBatch) to avoid overwhelming memory. I reorganized the code a bit to make this clear. Could you take another look?
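
To illustrate the point with a self-contained sketch (hypothetical types and sentinel values, not the PR code): sentinels wrap each version, and the boundary is detected when the version number changes, because one version's files may be split across several batches.

import java.util.ArrayList;
import java.util.List;

// Self-contained sketch of the per-version sentinel logic described above.
final class SentinelSandwichSketch {
  static final long BASE_INDEX = -1L;           // assumed begin-of-version sentinel index
  static final long END_INDEX = Long.MAX_VALUE; // assumed end-of-version sentinel index

  static final class File {
    final long version;
    final long index;
    final String path; // null => sentinel entry

    File(long version, long index, String path) {
      this.version = version;
      this.index = index;
      this.path = path;
    }
  }

  static List<File> sandwich(List<List<File>> batches) {
    List<File> result = new ArrayList<>();
    List<File> currentVersionFiles = new ArrayList<>();
    long currentVersion = -1;
    for (List<File> batch : batches) {
      for (File f : batch) {
        if (currentVersion != -1 && f.version != currentVersion) {
          flush(result, currentVersion, currentVersionFiles); // close the previous version
        }
        currentVersion = f.version;
        currentVersionFiles.add(f);
      }
    }
    if (currentVersion != -1) {
      flush(result, currentVersion, currentVersionFiles);     // close the last version
    }
    return result;
  }

  private static void flush(List<File> out, long version, List<File> files) {
    out.add(new File(version, BASE_INDEX, null));  // begin sentinel
    out.addAll(files);
    out.add(new File(version, END_INDEX, null));   // end sentinel
    files.clear();
  }
}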

*/
private List<IndexedFile> extractIndexedFilesFromBatch(
ColumnarBatch batch, long version, long startIndex) {
List<IndexedFile> indexedFiles = new ArrayList<>();

Contributor:
Use LinkedList.

Author:
Same rationale as above -- we are only doing addAll() and add(); an ArrayList would be faster and more memory-efficient.

Arguments.of(
0L, BASE_INDEX, isInitialSnapshot, Optional.of(2L), Optional.of(5L), "v0 to v2 id:5"),
Arguments.of(
1L, 5L, isInitialSnapshot, Optional.of(3L), Optional.of(10L), "v1 id:5 to v3 id:10"),

Contributor (@jerrypeng, Oct 11, 2025):
What about to and from END_INDEX?

Author:
Done. Thanks!
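
For context, the added cases mirror the parameter shape above, with the sentinel index on either side of the range; the values below are illustrative examples, not the exact test data:

Arguments.of(
    0L, END_INDEX, isInitialSnapshot, Optional.of(2L), Optional.of(5L), "v0 end to v2 id:5"),
Arguments.of(
    1L, 5L, isInitialSnapshot, Optional.of(3L), Optional.of(END_INDEX), "v1 id:5 to v3 end"),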

*/
@ParameterizedTest
@MethodSource("getFileChangesParameters")
public void testGetFileChanges(

Contributor:
Should we also test with other types of actions in the delta log?

Author:
We do test REMOVEs and ADDs; I added METADATA too (which will now yield empty commits).

"Index mismatch at index %d: dsv1=%d, dsv2=%d",
i, deltaFile.index(), kernelFile.getIndex()));

String deltaPath = deltaFile.add() != null ? deltaFile.add().path() : null;

Collaborator:
Nit: document that deltaFile.add() could be null when it is the starting/ending index.

Author:
Done.
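
A sketch of the resulting null-safe comparison; deltaFile.add().path() and kernelFile come from the snippet above, while the DSv2 accessors getAddFile()/getPath() are assumed names for illustration:

// For BASE_INDEX / END_INDEX sentinel entries neither side carries an AddFile,
// so both paths are expected to be null.
String deltaPath = deltaFile.add() != null ? deltaFile.add().path() : null;
String kernelPath = kernelFile.getAddFile() != null ? kernelFile.getAddFile().getPath() : null;
assertEquals(
    deltaPath,
    kernelPath,
    String.format("Path mismatch at index %d: dsv1=%s, dsv2=%s", i, deltaPath, kernelPath));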

Collaborator (@huan233usc) left a comment:
I think it is good as a starting point.

@zikangh zikangh requested a review from jerrypeng October 13, 2025 18:40