-
Couldn't load subscription status.
- Fork 1.9k
[Kernel]Provide api to return per commit action iterator in get change api #5290
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
| * <p>This will also close the actions iterator if it hasn't been fully consumed. | ||
| */ | ||
| @Override | ||
| void close() throws Exception; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this separate close()? is anything allocated other than the iterator?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or is this just so it can be AutoCloseable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CommitAction will own the action iterator and transfer the ownership to caller when getActions. I am trying to handle the logic of closing this action iterator if getActions not called.
I made AutoCloseable only for impl class as it is a bit strange if we put it in the interface
| * | ||
| * @return a {@link CloseableIterator} over columnar batches containing this commit's actions | ||
| */ | ||
| CloseableIterator<ColumnarBatch> getActions(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it needs to be clear whether you can re-call this multiple times (new iterator each time) or if it always returns the same one. I think maybe this is a potential design decision? From an implementation standpoint either should be do-able.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes this is a good point, I rethink a bit what will be the best to serve the use case ---
The pseudo code of the actual use case is
try (CommitActions commitActions = ...) {
// First pass: validate and decide
try (CloseableIterator iter = commitActions.getActions()) {
boolean shouldSkip = validateCommitAndDecideSkipping(iter);
}
// Second pass: process the data (re-reads from file)
if (!shouldSkip) {
try (CloseableIterator iter = commitActions.getActions()) {
processActions(iter);
}
}
}
So basically three options
Only allow 1 time call (this implementation)
+Simple in kernel
-Difficult in connector: Connector needs to create a list to collect all add files so that they could do a second pass. "Create a list to collect all add files" may lead to memory pressure if commit file is large and there's no way to get around it.
-The complexity/memory pressure is just pushed to the connector layer
2.1 Allow multiple time call, implementation: kernel collects all batches in a List in CommitAction
+Simple in connector
-Memory pressure in kernel: All batches for a commit must be kept in memory
- For large commits with many actions, this could be problematic
- The memory is held even if caller only needs one pass
2.2 Allow multiple time call, implementation: kernel re-reads the commit file on each getActions() call
+Memory efficient: No need to cache batches in memory
+Flexible: Caller can choose to call once or multiple times based on their needs
+Supports the two-pass use case naturally
-Slight I/O overhead: Re-reads file if getActions() called multiple times
-More code complex implementation in kernel
Slight I/O overhead could actually be further improved by connector implementing a cache or if needed, we could add a hint parameter like
CommitActions.Builder.withCaching(boolean) or size threshold to let kernel
decide whether to cache in memory or re-read from file.
With those --- seems that 2.2 is the preferred one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think 2.2 is preferred. We don't want to hold memory in Kernel, that's something the connector should implement.
In theory, 2.2 should be easy to support, but i'm not sure if it is actually that way considering we reuse internal classes that maybe don't make that sort of practice easy.
| engine, dataPath.toString(), getDeltaFiles(), actionSet); | ||
| } | ||
|
|
||
| private io.delta.kernel.CommitActions convertToCommitActions( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like a valid approach that will work. But doesn't it probably make more sense to wrap in the other direction? (just a larger refactor to existing code?)
common code - returns a Iterator<CommitActions>
former method - wraps this and flattens the iterators + adds the timestamp/version columns
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated the implementation to make underlying internal library returns Iterator.
Then let getActions depends on getCommits
| public CloseableIterator<ColumnarBatch> getActions( | ||
| Engine engine, Snapshot startSnapshot, Set<DeltaLogActionUtils.DeltaAction> actionSet) { | ||
| validateParameters(engine, startSnapshot, actionSet); | ||
| // Build on top of getCommits() by flattening and adding version/timestamp columns |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getActions now depends on getCommits for the future API consolidation.
| CloseableIterator<ActionWrapper> wrappers = | ||
| new ActionsIterator( | ||
| engine, Collections.singletonList(commitFile), readSchema, Optional.empty()); | ||
| ; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: extra semi-colon
| () -> new RuntimeException("timestamp should always exist for Delta File")); | ||
|
|
||
| // Process first batch and cache the iterator | ||
| ColumnarBatch firstBatch = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be done lazily in getActions? Just get a reference to the firstWrapper here but build out the batch in the getActions?
|
|
||
| @Override | ||
| public synchronized CloseableIterator<ColumnarBatch> getActions() { | ||
| if (!firstCallDone) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that this is essentially about caching the iterator and transferring ownership, should we use that terminology instead of firstCallDone and firstReadIterator? Perhaps something like cachedIterator and cachedIteratorTransferred ? Would be good to add comments to the member vars above as well as to what they mean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reorg this file, feel like using supplier might be cleaner
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall approach lgtm, left some suggestions
| } | ||
|
|
||
| @Override | ||
| public CloseableIterator<CommitActions> getCommits( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we name this getCommitActions or something actually? This is misleading, I thought maybe it was getting a list of commit files or something
| * @param shouldDropCommitInfoColumn whether to drop the commitInfo column | ||
| * @return the processed batch | ||
| */ | ||
| public static ColumnarBatch processAndDropColumns( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
validateProtocolAndDropInternalColumns? more descriptive
|
|
||
| // Validate protocol | ||
| int protocolIdx = batch.getSchema().indexOf("protocol"); | ||
| if (protocolIdx >= 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the scenario where the protocol column was not read? does that exist (before I think it was impossible)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think no, so I make this utils inside CommitActionsImpl and add precondition.
| * @param commits the iterator of CommitActions to flatten | ||
| * @return an iterator of ColumnarBatch with version and timestamp columns added | ||
| */ | ||
| public static CloseableIterator<ColumnarBatch> flattenCommitsAndAddMetadata( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this would be simpler if we added a util for CloseableIterator<CloseableIterator<T>>::flatMap (or something along those lines)? With unit tests. Then this code should be easier, and I think we could reuse this elsewhere. I really don't like us re-implementing common iterator concepts like flatMap in multiple places, seems dangerous for little bugs
| .withNewColumn(0, new StructField("version", LongType.LONG, false), commitVersionVector) | ||
| .withNewColumn( | ||
| 1, new StructField("timestamp", LongType.LONG, false), commitTimestampVector); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit - still use the static fields as before?
| testGetChangesVsSpark(tablePath, 0, 2, FULL_ACTION_SET) | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we are missing a lot of important test cases based on the code
- call
CommitActions::getCommitsmultiple times, we correctly get a new iterator each time and it contains the same rows - empty commit file? (test the code on CommitActionImpl line 90-95 where there is no actions), also make sure this works for the traditional getActions API to make sure the flatMap iterator is fine for empty iterators
- wondering if we should re-run some of the existing tests using this API? Or is it sufficient since we use it underneath for the current one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added cases for first 2.
For the third one, I think it is sufficient since we use it underneath for the current one
- Fix typo in comment: 'will null' -> 'with null' - Fix double period in JavaDoc - Add exception handling in hasNext() to ensure cleanup on error - Update test assertions to use === for consistency - Add test case for exception handling during iteration - Verify that outer iterator is closed when exception occurs
…ocumentation - Rename flatMap() to flatten() for more accurate semantics - Add important note in JavaDoc about calling close() even if iterator is not fully consumed - Remove unnecessary 'throws IOException' from close() method - Update all test case names from 'flatMap' to 'flatten' Addresses reviewer feedback: 1. Method name should be 'flatten' not 'flatMap' (semantics) 2. Callers need to know they must call close() for resource cleanup 3. Exception handling in next() is correct as-is
Sync with the rename from flatMap to flatten in Utils.java
| return new FilteredBatchToRowIter(sourceBatches); | ||
| } | ||
|
|
||
| /** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from #5388, will merge that one first
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [x] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> Adds a `Utils.flatMap` utility method for flattening nested `CloseableIterator<CloseableIterator<T>>` structures into `CloseableIterator<T>`. It will be helpful for use case like #5290 ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> Unit tests ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> No
| String tablePath, | ||
| List<FileStatus> commitFiles, | ||
| Set<DeltaLogActionUtils.DeltaAction> actionSet) { | ||
| // Create a new action set which is a super set of the requested actions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move to CommitAction's constructor
| .combine( | ||
| wrappers.map( | ||
| // Create supplier that will build the iterator on demand | ||
| this.iteratorSupplier = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I like this pattern any better, in fact I think I find it quite confusing. Maybe can we come up with some good internal class that abstracts all this away?
PeekableActionIterator(commitFile) {
getTimestamp
getVersion
getNewIterator // this can privately implement a peaking iterator?
}
WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PeekableIterator is a good idea and we could build commit action based on that.
PeekableActionIterator specifically seems a bit over kill.
Updated the impl
Which Delta project/connector is this regarding?
Description
This PR introduces api to return per commit action iterator in kernel get change api, it will help to implementing current structure streaming logic using kernel
Case 1: need to find commit boundary and inject offset

Case 2: skip commits without read through iterator: no need to iterate all action in 2.json and directly go to 3.json

How was this patch tested?
Unit
Does this PR introduce any user-facing changes?
No