-
Notifications
You must be signed in to change notification settings - Fork 29.2k
[SPARK-56384][SS] Support stream-stream non-outer join in Update mode #55249
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
f772b2c
fab4a3a
7160f2b
808550e
25390da
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -370,9 +370,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { | |
| testBinaryOperationInStreamingPlan( | ||
| "inner join in update mode", | ||
| _.join(_, joinType = Inner), | ||
| outputMode = Update, | ||
| streamStreamSupported = false, | ||
| expectedMsg = "is not supported in Update output mode") | ||
| outputMode = Update) | ||
|
|
||
| // Full outer joins: stream-batch/batch-stream join are not allowed, | ||
| // and stream-stream join is allowed 'conditionally' - see below check | ||
|
|
@@ -403,16 +401,25 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { | |
| streamStreamSupported = false, | ||
| expectedMsg = "RightOuter join") | ||
|
|
||
| // Left outer, right outer, full outer, left semi joins | ||
| Seq(LeftOuter, RightOuter, FullOuter, LeftSemi).foreach { joinType => | ||
| // Update mode not allowed | ||
| // Left outer, right outer, full outer joins: Update mode not allowed | ||
| Seq(LeftOuter, RightOuter, FullOuter).foreach { joinType => | ||
| assertNotSupportedInStreamingPlan( | ||
| s"$joinType join with stream-stream relations and update mode", | ||
| streamRelation.join(streamRelation, joinType = joinType, | ||
| condition = Some(attribute === attribute)), | ||
| OutputMode.Update(), | ||
| Seq("is not supported in Update output mode")) | ||
| } | ||
|
|
||
| // LeftSemi join: Update mode allowed (equivalent to Append mode for non-outer joins) | ||
| assertSupportedInStreamingPlan( | ||
| s"LeftSemi join with stream-stream relations and update mode", | ||
| streamRelation.join(streamRelation, joinType = LeftSemi, | ||
| condition = Some(attributeWithWatermark === attribute)), | ||
| OutputMode.Update()) | ||
|
|
||
| // Left outer, right outer, full outer, left semi joins | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment seems a little redundant.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let me see if this is the consistent pattern within file or we just added this only here. If it's consistent over the file, I guess we can just leave it as it is. If not we can remove it.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess the pattern is consistent in the file (that's what Claude said), let's leave it as it is. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The comment is just duplicating the list on the next line. Please remove it. If you disagree, please say so, but I don't care what Claude thinks.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a misunderstanding - I think I have commented already about my rationale.
I just asked Claude to "check", not Claude to "judge". So it is my disagreement.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The test suite is large enough so people leverages code comment (not code) as index where the test is placed. With LLM it's going to be less and less necessary and maybe there are some ways to make it better, but I'd rather not break what existing code is trying to do - if we think we have better way, we should consider it as a refactor and review separately. |
||
| Seq(LeftOuter, RightOuter, FullOuter, LeftSemi).foreach { joinType => | ||
| // Complete mode not allowed | ||
| assertNotSupportedInStreamingPlan( | ||
| s"$joinType join with stream-stream relations and complete mode", | ||
|
|
@@ -671,6 +678,21 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { | |
| outputMode = Append) | ||
| } | ||
|
|
||
| assertPassOnGlobalWatermarkLimit( | ||
| "streaming aggregation after stream-stream inner join in Update mode", | ||
| streamRelation.join(streamRelation, joinType = Inner, | ||
| condition = Some(attributeWithWatermark === attribute)) | ||
| .groupBy("a")(count("*")), | ||
| outputMode = Update) | ||
|
|
||
| assertFailOnGlobalWatermarkLimit( | ||
| "streaming aggregation on both sides followed by stream-stream inner join in Update mode", | ||
| streamRelation.groupBy("a")(count("*")).join( | ||
| streamRelation.groupBy("a")(count("*")), | ||
| joinType = Inner, | ||
| condition = Some(attributeWithWatermark === attribute)), | ||
| outputMode = Update) | ||
|
|
||
| // Cogroup: only batch-batch is allowed | ||
| testBinaryOperationInStreamingPlan( | ||
| "cogroup", | ||
|
|
@@ -851,6 +873,26 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { | |
| null, att, att, Seq(att), Seq(att), att, null, Append, | ||
| isMapGroupsWithState = false, null, | ||
| Deduplicate(Seq(attribute), streamRelation)), outputMode = Append) | ||
|
|
||
| Seq(Append, Update).foreach { outputMode => | ||
| assertPassOnGlobalWatermarkLimit( | ||
| s"stream-stream inner join with deduplicate on both sides " + | ||
| s"(with event-time) in ${outputMode} mode", | ||
| Deduplicate(Seq(attributeWithWatermark), streamRelation).join( | ||
| Deduplicate(Seq(attributeWithWatermark), streamRelation), | ||
| joinType = Inner, | ||
| condition = Some(attributeWithWatermark === attribute)), | ||
| outputMode = outputMode) | ||
|
|
||
| assertPassOnGlobalWatermarkLimit( | ||
| s"stream-stream inner join with deduplicate on both sides " + | ||
| s"(without event-time) in ${outputMode} mode", | ||
| Deduplicate(Seq(attribute), streamRelation).join( | ||
| Deduplicate(Seq(attribute), streamRelation), | ||
| joinType = Inner, | ||
| condition = Some(attributeWithWatermark === attribute)), | ||
| outputMode = outputMode) | ||
| } | ||
| } | ||
|
|
||
| /* | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's good for explaining rationale, but this is a test. If we want to have this comment I'd say we should have it in the place where we block the operation. I'll do that in UnsupportedOperationChecker and keep this comment as it is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added that comment in UnsupportedOperationChecker.