[SPARK-56406][SS] Stream-stream join v4: skip writing secondary index if the operator will not evict from that side#55271
Open
HeartSaVioR wants to merge 2 commits intoapache:masterfrom
Open
[SPARK-56406][SS] Stream-stream join v4: skip writing secondary index if the operator will not evict from that side#55271HeartSaVioR wants to merge 2 commits intoapache:masterfrom
HeartSaVioR wants to merge 2 commits intoapache:masterfrom
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
This PR proposes to skip writing secondary index if the operator will not evict from that side. For simplicity, we keep creating column family and just skip writing to secondary index.
The way to understand whether the operator won't evict from that side is following:
It's very obvious for the case where both sides do not have event time column - both sides do not evict any state at all.
The tricky case is when one side has event time column and another side can deduce from it to evict the state row.
For the former, the logic is able to detect the ability for non-watermarked side and write secondary index for it. (See how joinKeyOrdinalForWatermark is constructed.)
For the latter, technically, we can skip "both" sides to skip writing secondary index, but that's fairly minor case and the logic only enables non-watermarked side to skip writing secondary index. (The PR left the potential optimization as TODO code comment, but we are not going to file a JIRA ticket since we don't know whether we ever demand it.)
The main coverage of this optimization is a regular join where both sides do not have event time column; the coverage of watermark on only one side is a sort of bonus.
For safety net,
evict***methods will raise an exception if the operator has skipped writing secondary index, since it is NOT expected for these methods to be called.Why are the changes needed?
There are several cases where the operator never leverages the secondary index on one (or both) side(s), so there is no value to write to the secondary index.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New UT.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude 4.6 Opus