Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -441,9 +441,24 @@ object UnsupportedOperationChecker extends Logging {
}

case j @ Join(left, right, joinType, condition, _) =>
if (left.isStreaming && right.isStreaming && outputMode != InternalOutputModes.Append) {
throwError("Join between two streaming DataFrames/Datasets is not supported" +
s" in ${outputMode} output mode, only in Append output mode")
if (left.isStreaming && right.isStreaming) {
joinType match {
// The behavior for unmatched rows in outer joins with update mode
// hasn't been defined yet.
case LeftOuter | RightOuter | FullOuter =>
if (outputMode != InternalOutputModes.Append) {
throwError(s"$joinType join between two streaming DataFrames/Datasets" +
s" is not supported in ${outputMode} output mode, only in Append output mode")
}
case _: InnerLike | LeftSemi =>
if (outputMode != InternalOutputModes.Append &&
outputMode != InternalOutputModes.Update) {
throwError(s"$joinType join between two streaming DataFrames/Datasets" +
s" is not supported in ${outputMode} output mode, only in Append and Update " +
"output modes")
}
case _ => // we will throw an error in the next pattern match
}
}

joinType match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,9 +370,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
testBinaryOperationInStreamingPlan(
"inner join in update mode",
_.join(_, joinType = Inner),
outputMode = Update,
streamStreamSupported = false,
expectedMsg = "is not supported in Update output mode")
outputMode = Update)

// Full outer joins: stream-batch/batch-stream join are not allowed,
// and stream-stream join is allowed 'conditionally' - see below check
Expand Down Expand Up @@ -403,16 +401,25 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
streamStreamSupported = false,
expectedMsg = "RightOuter join")

// Left outer, right outer, full outer, left semi joins
Seq(LeftOuter, RightOuter, FullOuter, LeftSemi).foreach { joinType =>
// Update mode not allowed
// Left outer, right outer, full outer joins: Update mode not allowed
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Left outer, right outer, full outer joins: Update mode not allowed
// The behavior for unmatched rows in outer joins with update mode hasn't been defined yet.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's good for explaining rationale, but this is a test. If we want to have this comment I'd say we should have it in the place where we block the operation. I'll do that in UnsupportedOperationChecker and keep this comment as it is.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added that comment in UnsupportedOperationChecker.

Seq(LeftOuter, RightOuter, FullOuter).foreach { joinType =>
assertNotSupportedInStreamingPlan(
s"$joinType join with stream-stream relations and update mode",
streamRelation.join(streamRelation, joinType = joinType,
condition = Some(attribute === attribute)),
OutputMode.Update(),
Seq("is not supported in Update output mode"))
}

// LeftSemi join: Update mode allowed (equivalent to Append mode for non-outer joins)
assertSupportedInStreamingPlan(
s"LeftSemi join with stream-stream relations and update mode",
streamRelation.join(streamRelation, joinType = LeftSemi,
condition = Some(attributeWithWatermark === attribute)),
OutputMode.Update())

// Left outer, right outer, full outer, left semi joins
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment seems a little redundant.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me see if this is a consistent pattern within the file or if we just added it here. If it's consistent across the file, I guess we can leave it as it is. If not, we can remove it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the pattern is consistent in the file (that's what Claude said), let's leave it as it is.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment is just duplicating the list on the next line. Please remove it.

If you disagree, please say so, but I don't care what Claude thinks.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a misunderstanding - I think I have commented already about my rationale.

Let me see if this is the consistent pattern within file or we just added this only here. If it's consistent over the file, I guess we can just leave it as it is. If not we can remove it.

I just asked Claude to "check", not to "judge" — so the disagreement is mine.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test suite is large enough that people leverage code comments (not the code itself) as an index to where each test is placed. With LLMs this is going to become less and less necessary, and maybe there are ways to make it better, but I'd rather not break what the existing code is trying to do — if we think we have a better way, we should treat it as a refactor and review it separately.

Seq(LeftOuter, RightOuter, FullOuter, LeftSemi).foreach { joinType =>
// Complete mode not allowed
assertNotSupportedInStreamingPlan(
s"$joinType join with stream-stream relations and complete mode",
Expand Down Expand Up @@ -671,6 +678,21 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
outputMode = Append)
}

assertPassOnGlobalWatermarkLimit(
"streaming aggregation after stream-stream inner join in Update mode",
streamRelation.join(streamRelation, joinType = Inner,
condition = Some(attributeWithWatermark === attribute))
.groupBy("a")(count("*")),
outputMode = Update)

assertFailOnGlobalWatermarkLimit(
"streaming aggregation on both sides followed by stream-stream inner join in Update mode",
streamRelation.groupBy("a")(count("*")).join(
streamRelation.groupBy("a")(count("*")),
joinType = Inner,
condition = Some(attributeWithWatermark === attribute)),
outputMode = Update)

// Cogroup: only batch-batch is allowed
testBinaryOperationInStreamingPlan(
"cogroup",
Expand Down Expand Up @@ -851,6 +873,26 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
null, att, att, Seq(att), Seq(att), att, null, Append,
isMapGroupsWithState = false, null,
Deduplicate(Seq(attribute), streamRelation)), outputMode = Append)

Seq(Append, Update).foreach { outputMode =>
assertPassOnGlobalWatermarkLimit(
s"stream-stream inner join with deduplicate on both sides " +
s"(with event-time) in ${outputMode} mode",
Deduplicate(Seq(attributeWithWatermark), streamRelation).join(
Deduplicate(Seq(attributeWithWatermark), streamRelation),
joinType = Inner,
condition = Some(attributeWithWatermark === attribute)),
outputMode = outputMode)

assertPassOnGlobalWatermarkLimit(
s"stream-stream inner join with deduplicate on both sides " +
s"(without event-time) in ${outputMode} mode",
Deduplicate(Seq(attribute), streamRelation).join(
Deduplicate(Seq(attribute), streamRelation),
joinType = Inner,
condition = Some(attributeWithWatermark === attribute)),
outputMode = outputMode)
}
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.StreamingJoinHelper
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, GenericInternalRow, JoinedRow, Literal, Predicate, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetric
Expand All @@ -35,6 +36,7 @@ import org.apache.spark.sql.execution.streaming.operators.stateful.join.Streamin
import org.apache.spark.sql.execution.streaming.operators.stateful.join.SymmetricHashJoinStateManager.KeyToValuePair
import org.apache.spark.sql.execution.streaming.state._
import org.apache.spark.sql.internal.{SessionState, SQLConf}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{CompletionIterator, SerializableConfiguration}

Expand Down Expand Up @@ -142,7 +144,9 @@ case class StreamingSymmetricHashJoinExec(
stateWatermarkPredicates: JoinStateWatermarkPredicates,
stateFormatVersion: Int,
left: SparkPlan,
right: SparkPlan) extends BinaryExecNode with StateStoreWriter with SchemaValidationUtils {
right: SparkPlan,
outputMode: Option[OutputMode])
extends BinaryExecNode with StateStoreWriter with SchemaValidationUtils {

def this(
leftKeys: Seq[Expression],
Expand All @@ -157,7 +161,8 @@ case class StreamingSymmetricHashJoinExec(
leftKeys, rightKeys, joinType, JoinConditionSplitPredicates(condition, left, right),
stateInfo = None,
eventTimeWatermarkForLateEvents = None, eventTimeWatermarkForEviction = None,
stateWatermarkPredicates = JoinStateWatermarkPredicates(), stateFormatVersion, left, right)
stateWatermarkPredicates = JoinStateWatermarkPredicates(), stateFormatVersion, left, right,
None)
}

if (stateFormatVersion < 2 && joinType != Inner) {
Expand All @@ -184,6 +189,13 @@ case class StreamingSymmetricHashJoinExec(
joinType == LeftSemi,
errorMessageForJoinType)

outputMode.foreach { mode =>
if (mode == InternalOutputModes.Update) {
require(joinType == Inner || joinType == LeftSemi,
s"Update output mode is not supported for stream-stream $joinType join")
}
}

// The assertion against join keys is same as hash join for batch query.
require(leftKeys.length == rightKeys.length &&
leftKeys.map(_.dataType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,8 @@ class IncrementalExecution(
j.copy(
stateInfo = Some(nextStatefulOperationStateInfo()),
eventTimeWatermarkForLateEvents = None,
eventTimeWatermarkForEviction = None
eventTimeWatermarkForEviction = None,
outputMode = Some(outputMode)
)

case l: StreamingGlobalLimitExec =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ class MultiStatefulOperatorsSuite
StateStore.stop()
}

/**
 * Registers the given body as two tests, one per streaming output mode
 * (Append and Update). The mode is appended to the test name so the two
 * registrations stay distinct.
 */
private def testWithAppendAndUpdate(testName: String)(
    testBody: OutputMode => Any): Unit = {
  for (mode <- Seq(OutputMode.Append(), OutputMode.Update())) {
    test(s"$testName - $mode") {
      testBody(mode)
    }
  }
}

test("window agg -> window agg, append mode") {
val inputData = MemoryStream[Int]

Expand Down Expand Up @@ -934,6 +941,68 @@ class MultiStatefulOperatorsSuite
)
}

// Deduplication on each input stream feeding a stream-stream inner join.
// Both dedup operators and the join key off the event-time column; the
// surrounding helper runs this in both Append and Update output mode.
testWithAppendAndUpdate("dedup on both sides -> stream-stream inner join") { outputMode =>
  val leftInput = MemoryStream[Int]
  val leftDeduped = leftInput.toDF()
    .select($"value".as("value1"), timestamp_seconds($"value").as("eventTime1"))
    .withWatermark("eventTime1", "10 seconds")
    .dropDuplicates("value1", "eventTime1")

  val rightInput = MemoryStream[Int]
  val rightDeduped = rightInput.toDF()
    .select($"value".as("value2"), timestamp_seconds($"value").as("eventTime2"))
    .withWatermark("eventTime2", "10 seconds")
    .dropDuplicates("value2", "eventTime2")

  val joined = leftDeduped.join(rightDeduped, expr("eventTime1 = eventTime2"), "inner")
    .select($"value1", $"value2")

  testStream(joined, outputMode)(
    // The repeated 1 on the left and repeated 2 on the right are dropped by
    // dedup before the join, so each match is emitted exactly once.
    MultiAddData(leftInput, 1, 2, 3, 1)(rightInput, 1, 2, 3, 2),
    CheckNewAnswer((1, 1), (2, 2), (3, 3)),

    // 1 and 2 (left) and 2 and 3 (right) duplicate earlier rows; only the
    // fresh value 4 on both sides yields a new joined row.
    MultiAddData(leftInput, 1, 2, 4)(rightInput, 2, 3, 4),
    CheckNewAnswer((4, 4))
  )
}

// Stream-stream inner join followed by a tumbling-window aggregation, run in
// Update output mode: each batch re-emits only the windows whose counts changed.
test("stream-stream inner join -> window agg, update mode") {
val input1 = MemoryStream[Int]
// Left side: the value doubles as the event time; 0-second watermark delay
// so the watermark tracks the max observed event time directly.
val inputDF1 = input1.toDF()
.select($"value".as("value1"), timestamp_seconds($"value").as("eventTime1"))
.withWatermark("eventTime1", "0 seconds")

val input2 = MemoryStream[Int]
// Right side mirrors the left with its own event-time column.
val inputDF2 = input2.toDF()
.select($"value".as("value2"), timestamp_seconds($"value").as("eventTime2"))
.withWatermark("eventTime2", "0 seconds")

// Join on equal event times, then count joined rows per 5-second tumbling
// window, projecting (window start as epoch seconds, count).
val stream = inputDF1.join(inputDF2, expr("eventTime1 = eventTime2"), "inner")
.groupBy(window($"eventTime1", "5 seconds").as("window"))
.agg(count("*").as("count"))
.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])

testStream(stream, OutputMode.Update())(
MultiAddData(input1, 1, 2)(input2, 1, 2),
// join output: (1, 1), (2, 2)
// agg: [0, 5) count = 2
CheckNewAnswer((0, 2)),

// Add more data to the same window [0, 5)
MultiAddData(input1, 3, 4)(input2, 3, 4),
// join output: (3, 3), (4, 4)
// agg: [0, 5) count = 2 + 2 = 4
// Update mode re-emits the window with updated count
CheckNewAnswer((0, 4)),

MultiAddData(input1, 5 to 8: _*)(input2, 5 to 8: _*),
// join output: (5, 5), (6, 6), (7, 7), (8, 8)
// agg: [5, 10) count = 4
// Only the new/updated window is emitted
CheckNewAnswer((5, 4))
)
}

private def assertNumStateRows(numTotalRows: Seq[Long]): AssertOnQuery = AssertOnQuery { q =>
q.processAllAvailable()
val progressWithData = q.recentProgress.lastOption.get
Expand Down
Loading