Skip to content

Commit f772b2c

Browse files
committed
[SPARK-XXXXX] Support stream-stream non-outer join in Update mode
1 parent 590b0d5 commit f772b2c

File tree

5 files changed

+120
-13
lines changed

5 files changed

+120
-13
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -441,9 +441,21 @@ object UnsupportedOperationChecker extends Logging {
441441
}
442442

443443
case j @ Join(left, right, joinType, condition, _) =>
444-
if (left.isStreaming && right.isStreaming && outputMode != InternalOutputModes.Append) {
445-
throwError("Join between two streaming DataFrames/Datasets is not supported" +
446-
s" in ${outputMode} output mode, only in Append output mode")
444+
if (left.isStreaming && right.isStreaming) {
445+
joinType match {
446+
case LeftOuter | RightOuter | FullOuter =>
447+
if (outputMode != InternalOutputModes.Append) {
448+
throwError(s"$joinType join between two streaming DataFrames/Datasets" +
449+
s" is not supported in ${outputMode} output mode, only in Append output mode")
450+
}
451+
case _ =>
452+
if (outputMode != InternalOutputModes.Append &&
453+
outputMode != InternalOutputModes.Update) {
454+
throwError(s"$joinType join between two streaming DataFrames/Datasets" +
455+
s" is not supported in ${outputMode} output mode, only in Append and Update " +
456+
"output modes")
457+
}
458+
}
447459
}
448460

449461
joinType match {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -370,9 +370,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
370370
testBinaryOperationInStreamingPlan(
371371
"inner join in update mode",
372372
_.join(_, joinType = Inner),
373-
outputMode = Update,
374-
streamStreamSupported = false,
375-
expectedMsg = "is not supported in Update output mode")
373+
outputMode = Update)
376374

377375
// Full outer joins: stream-batch/batch-stream join are not allowed,
378376
// and stream-stream join is allowed 'conditionally' - see below check
@@ -403,16 +401,25 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
403401
streamStreamSupported = false,
404402
expectedMsg = "RightOuter join")
405403

406-
// Left outer, right outer, full outer, left semi joins
407-
Seq(LeftOuter, RightOuter, FullOuter, LeftSemi).foreach { joinType =>
408-
// Update mode not allowed
404+
// Left outer, right outer, full outer joins: Update mode not allowed
405+
Seq(LeftOuter, RightOuter, FullOuter).foreach { joinType =>
409406
assertNotSupportedInStreamingPlan(
410407
s"$joinType join with stream-stream relations and update mode",
411408
streamRelation.join(streamRelation, joinType = joinType,
412409
condition = Some(attribute === attribute)),
413410
OutputMode.Update(),
414411
Seq("is not supported in Update output mode"))
412+
}
413+
414+
// LeftSemi join: Update mode allowed (equivalent to Append mode for non-outer joins)
415+
assertSupportedInStreamingPlan(
416+
s"LeftSemi join with stream-stream relations and update mode",
417+
streamRelation.join(streamRelation, joinType = LeftSemi,
418+
condition = Some(attributeWithWatermark === attribute)),
419+
OutputMode.Update())
415420

421+
// Left outer, right outer, full outer, left semi joins
422+
Seq(LeftOuter, RightOuter, FullOuter, LeftSemi).foreach { joinType =>
416423
// Complete mode not allowed
417424
assertNotSupportedInStreamingPlan(
418425
s"$joinType join with stream-stream relations and complete mode",

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.StreamingJoinHelper
2727
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, GenericInternalRow, JoinedRow, Literal, Predicate, UnsafeProjection, UnsafeRow}
2828
import org.apache.spark.sql.catalyst.plans._
2929
import org.apache.spark.sql.catalyst.plans.physical._
30+
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
3031
import org.apache.spark.sql.catalyst.types.DataTypeUtils
3132
import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
3233
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -35,6 +36,7 @@ import org.apache.spark.sql.execution.streaming.operators.stateful.join.Streamin
3536
import org.apache.spark.sql.execution.streaming.operators.stateful.join.SymmetricHashJoinStateManager.KeyToValuePair
3637
import org.apache.spark.sql.execution.streaming.state._
3738
import org.apache.spark.sql.internal.{SessionState, SQLConf}
39+
import org.apache.spark.sql.streaming.OutputMode
3840
import org.apache.spark.sql.types.StructType
3941
import org.apache.spark.util.{CompletionIterator, SerializableConfiguration}
4042

@@ -142,7 +144,9 @@ case class StreamingSymmetricHashJoinExec(
142144
stateWatermarkPredicates: JoinStateWatermarkPredicates,
143145
stateFormatVersion: Int,
144146
left: SparkPlan,
145-
right: SparkPlan) extends BinaryExecNode with StateStoreWriter with SchemaValidationUtils {
147+
right: SparkPlan,
148+
outputMode: Option[OutputMode] = None)
149+
extends BinaryExecNode with StateStoreWriter with SchemaValidationUtils {
146150

147151
def this(
148152
leftKeys: Seq[Expression],
@@ -184,6 +188,13 @@ case class StreamingSymmetricHashJoinExec(
184188
joinType == LeftSemi,
185189
errorMessageForJoinType)
186190

191+
outputMode.foreach { mode =>
192+
if (mode == InternalOutputModes.Update) {
193+
require(joinType == Inner || joinType == LeftSemi,
194+
s"Update output mode is not supported for stream-stream $joinType join")
195+
}
196+
}
197+
187198
// The assertion against join keys is same as hash join for batch query.
188199
require(leftKeys.length == rightKeys.length &&
189200
leftKeys.map(_.dataType)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,8 @@ class IncrementalExecution(
411411
j.copy(
412412
stateInfo = Some(nextStatefulOperationStateInfo()),
413413
eventTimeWatermarkForLateEvents = None,
414-
eventTimeWatermarkForEviction = None
414+
eventTimeWatermarkForEviction = None,
415+
outputMode = Some(outputMode)
415416
)
416417

417418
case l: StreamingGlobalLimitExec =>

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import org.scalatest.{BeforeAndAfter, Tag}
3030

3131
import org.apache.spark.SparkUnsupportedOperationException
3232
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
33-
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
33+
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
3434
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
3535
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
3636
import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions
@@ -352,6 +352,28 @@ abstract class StreamingInnerJoinBase extends StreamingJoinSuite {
352352
)
353353
}
354354

355+
// Stream-stream non-outer join produces the same behavior between Append mode and Update mode.
356+
// We only run a sanity test here rather than replicating the full Append mode test suite.
357+
test("stream stream inner join with Update mode on non-time column") {
358+
val input1 = MemoryStream[Int]
359+
val input2 = MemoryStream[Int]
360+
361+
val df1 = input1.toDF().select($"value" as "key", ($"value" * 2) as "leftValue")
362+
val df2 = input2.toDF().select($"value" as "key", ($"value" * 3) as "rightValue")
363+
val joined = df1.join(df2, "key")
364+
365+
testStream(joined, OutputMode.Update())(
366+
AddData(input1, 1),
367+
CheckAnswer(),
368+
AddData(input2, 1, 10),
369+
CheckNewAnswer((1, 2, 3)),
370+
AddData(input1, 10),
371+
CheckNewAnswer((10, 20, 30)),
372+
AddData(input2, 1),
373+
CheckNewAnswer((1, 2, 3))
374+
)
375+
}
376+
355377
test("stream stream inner join on windows - without watermark") {
356378
val input1 = MemoryStream[Int]
357379
val input2 = MemoryStream[Int]
@@ -669,7 +691,7 @@ abstract class StreamingInnerJoinBase extends StreamingJoinSuite {
669691
assert(query.lastExecution.executedPlan.collect {
670692
case j @ StreamingSymmetricHashJoinExec(_, _, _, _, _, _, _, _, _,
671693
ShuffleExchangeExec(opA: HashPartitioning, _, _, _),
672-
ShuffleExchangeExec(opB: HashPartitioning, _, _, _))
694+
ShuffleExchangeExec(opB: HashPartitioning, _, _, _), _)
673695
if partitionExpressionsColumns(opA.expressions) === Seq("a", "b")
674696
&& partitionExpressionsColumns(opB.expressions) === Seq("a", "b")
675697
&& opA.numPartitions == numPartitions && opB.numPartitions == numPartitions => j
@@ -1242,6 +1264,25 @@ abstract class StreamingOuterJoinBase extends StreamingJoinSuite {
12421264
import testImplicits._
12431265
import org.apache.spark.sql.functions._
12441266

1267+
Seq("left_outer", "right_outer").foreach { joinType =>
1268+
test(s"stream-stream $joinType join does not support Update mode") {
1269+
val input1 = MemoryStream[Int]
1270+
val input2 = MemoryStream[Int]
1271+
1272+
val df1 = input1.toDF().select($"value" as "key", ($"value" * 2) as "leftValue")
1273+
val df2 = input2.toDF().select($"value" as "key", ($"value" * 3) as "rightValue")
1274+
val joined = df1.join(df2, Seq("key"), joinType)
1275+
1276+
val e = intercept[AnalysisException] {
1277+
testStream(joined, OutputMode.Update())(
1278+
AddData(input1, 1),
1279+
CheckAnswer()
1280+
)
1281+
}
1282+
assert(e.getMessage.contains("is not supported in Update output mode"))
1283+
}
1284+
}
1285+
12451286
test("left outer early state exclusion on left") {
12461287
withTempDir { checkpointDir =>
12471288
val (leftInput, rightInput, joined) = setupWindowedJoinWithLeftCondition("left_outer")
@@ -1954,6 +1995,25 @@ abstract class StreamingOuterJoinSuite extends StreamingOuterJoinBase {
19541995
@SlowSQLTest
19551996
abstract class StreamingFullOuterJoinBase extends StreamingJoinSuite {
19561997

1998+
import testImplicits._
1999+
2000+
test("stream-stream full outer join does not support Update mode") {
2001+
val input1 = MemoryStream[Int]
2002+
val input2 = MemoryStream[Int]
2003+
2004+
val df1 = input1.toDF().select($"value" as "key", ($"value" * 2) as "leftValue")
2005+
val df2 = input2.toDF().select($"value" as "key", ($"value" * 3) as "rightValue")
2006+
val joined = df1.join(df2, Seq("key"), "full_outer")
2007+
2008+
val e = intercept[AnalysisException] {
2009+
testStream(joined, OutputMode.Update())(
2010+
AddData(input1, 1),
2011+
CheckAnswer()
2012+
)
2013+
}
2014+
assert(e.getMessage.contains("is not supported in Update output mode"))
2015+
}
2016+
19572017
test("windowed full outer join") {
19582018
withTempDir { checkpointDir =>
19592019
val (leftInput, rightInput, joined) = setupWindowedJoin("full_outer")
@@ -2176,6 +2236,22 @@ abstract class StreamingLeftSemiJoinBase extends StreamingJoinSuite {
21762236

21772237
import testImplicits._
21782238

2239+
// Stream-stream non-outer join produces the same behavior between Append mode and Update mode.
2240+
// We only run a sanity test here rather than replicating the full Append mode test suite.
2241+
test("windowed left semi join with Update mode") {
2242+
withTempDir { checkpointDir =>
2243+
val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi")
2244+
2245+
testStream(joined, OutputMode.Update())(
2246+
StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
2247+
MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
2248+
CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
2249+
MultiAddData(leftInput, 21)(rightInput, 22),
2250+
CheckNewAnswer()
2251+
)
2252+
}
2253+
}
2254+
21792255
test("windowed left semi join") {
21802256
withTempDir { checkpointDir =>
21812257
val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi")

0 commit comments

Comments (0)