Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,16 @@ class SymmetricHashJoinStateManagerV4(
// pass the information. The information is in SQLConf.
allowMultipleEventTimeColumns = false)

// The TsWithKey secondary index exists solely to serve timestamp-based eviction.
// Without an event time column in the value rows and without a watermark ordinal in
// the join key, eviction can never consult that index, so writes to it (and the
// associated RocksDB merge overhead) can be skipped entirely.
// TODO: This could be refined further by also checking whether the state watermark
// predicate is defined: even when an event time column exists, the index goes
// unused if eviction is impossible (e.g., only one side defines a watermark in a
// time-interval join). Doing so would require propagating the predicate info here.
private val hasEventTime: Boolean =
  joinKeyOrdinalForWatermark.isDefined || eventTimeColIdxOpt.isDefined

private val random = new scala.util.Random(System.currentTimeMillis())
private val bucketCountForNoEventTime = 1024
private val extractEventTimeFn: UnsafeRow => Long = { row =>
Expand Down Expand Up @@ -353,7 +363,9 @@ class SymmetricHashJoinStateManagerV4(
val eventTime = extractEventTimeFnFromKey(key).getOrElse(extractEventTimeFn(value))
// We always do blind merge for appending new value.
keyWithTsToValues.append(key, eventTime, value, matched)
tsWithKey.add(eventTime, key)
if (hasEventTime) {
tsWithKey.add(eventTime, key)
}
}

override def getJoinedRows(
Expand Down Expand Up @@ -508,6 +520,8 @@ class SymmetricHashJoinStateManagerV4(
}

override def evictByTimestamp(endTimestamp: Long): Long = {
require(hasEventTime,
"evictByTimestamp requires event time; secondary index was not populated")
var removed = 0L
tsWithKey.scanEvictedKeys(endTimestamp).foreach { evicted =>
val key = evicted.key
Expand All @@ -524,6 +538,8 @@ class SymmetricHashJoinStateManagerV4(
}

override def evictAndReturnByTimestamp(endTimestamp: Long): Iterator[KeyToValuePair] = {
require(hasEventTime,
"evictAndReturnByTimestamp requires event time; secondary index was not populated")
val reusableKeyToValuePair = KeyToValuePair()

tsWithKey.scanEvictedKeys(endTimestamp).flatMap { evicted =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.streaming
import org.apache.hadoop.fs.Path
import org.scalatest.Tag

import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions
import org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager
import org.apache.spark.sql.execution.streaming.operators.stateful.join.StreamingSymmetricHashJoinExec
import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
Expand Down Expand Up @@ -184,6 +185,143 @@ class StreamingInnerJoinV4Suite
)
}
}

// Loads the named state store under the given checkpoint location via the
// state data source reader and returns how many rows it currently holds.
private def readStateStore(checkpointLoc: String, storeName: String): Long = {
  val stateDf = spark.read
    .format("statestore")
    .option(StateSourceOptions.PATH, checkpointLoc)
    .option(StateSourceOptions.STORE_NAME, storeName)
    .load()
  stateDf.count()
}

// Equality inner join with no event time column and no watermark anywhere:
// the manager's hasEventTime flag should be false on both sides, so the
// TsWithKey secondary index must stay empty while the primary
// keyWithTsToValues store accumulates buffered rows.
testWithVirtualColumnFamilyJoins(
"SPARK-56406: secondary index is not populated for join without event time") {
withTempDir { checkpointDir =>
val input1 = MemoryStream[Int]
val input2 = MemoryStream[Int]

val df1 = input1.toDF()
.select($"value" as "key", ($"value" * 2) as "leftValue")
val df2 = input2.toDF()
.select($"value" as "key", ($"value" * 3) as "rightValue")
val joined = df1.join(df2, "key")

testStream(joined)(
StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
// Left rows alone produce no inner-join output.
AddData(input1, 1, 2, 3),
CheckAnswer(),
// Matching right rows: (key, key*2, key*3) for keys 1 and 2.
AddData(input2, 1, 2),
CheckNewAnswer((1, 2, 3), (2, 4, 6)),
Execute { _ =>
val checkpointLoc = checkpointDir.getCanonicalPath

// Primary stores must have buffered the input rows on both sides.
assert(readStateStore(checkpointLoc, "left-keyWithTsToValues") > 0,
"left primary store should have rows")
assert(readStateStore(checkpointLoc, "right-keyWithTsToValues") > 0,
"right primary store should have rows")

// Core assertion of SPARK-56406: with no event time, the secondary
// (eviction) index is never written.
assert(readStateStore(checkpointLoc, "left-tsWithKey") === 0,
"left secondary index should be empty without event time")
assert(readStateStore(checkpointLoc, "right-tsWithKey") === 0,
"right secondary index should be empty without event time")
},
StopStream
)
}
}

// Watermark column participates in the join key: even though only the left
// side calls withWatermark, the key-embedded watermark ordinal
// (joinKeyOrdinalForWatermark) applies to both state managers, so both
// secondary indexes should be populated.
testWithVirtualColumnFamilyJoins(
"SPARK-56406: secondary index populated on both sides when watermark is on join key") {
withTempDir { checkpointDir =>
val input1 = MemoryStream[(Int, Int)]
val input2 = MemoryStream[(Int, Int)]

val df1 = input1.toDF().toDF("key", "time")
.select($"key", timestamp_seconds($"time") as "ts", ($"key" * 2) as "leftValue")
.withWatermark("ts", "10 seconds")
val df2 = input2.toDF().toDF("key", "time")
.select($"key", timestamp_seconds($"time") as "ts", ($"key" * 3) as "rightValue")
// Only left side has watermark; ts is part of the join key, so
// joinKeyOrdinalForWatermark is defined → hasEventTime = true for both sides.

val joined = df1.join(df2, Seq("key", "ts"))
.select($"key", $"ts".cast("long"), $"leftValue", $"rightValue")

testStream(joined)(
StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
AddData(input1, (1, 10), (2, 20)),
CheckAnswer(),
// Only (key=1, ts=10) matches on both key columns:
// output is (key, ts, key*2, key*3) = (1, 10, 2, 3).
AddData(input2, (1, 10)),
CheckNewAnswer((1, 10, 2, 3)),
Execute { _ =>
val checkpointLoc = checkpointDir.getCanonicalPath

assert(readStateStore(checkpointLoc, "left-keyWithTsToValues") > 0,
"left primary store should have rows")
assert(readStateStore(checkpointLoc, "right-keyWithTsToValues") > 0,
"right primary store should have rows")

// Both secondary indexes should be populated because joinKeyOrdinalForWatermark
// is defined (watermark on join key applies to both sides).
assert(readStateStore(checkpointLoc, "left-tsWithKey") > 0,
"left secondary index should be populated when watermark is on join key")
assert(readStateStore(checkpointLoc, "right-tsWithKey") > 0,
"right secondary index should be populated when watermark is on join key")
},
StopStream
)
}
}

// Time-interval join where only the left side is watermarked, and the
// watermark sits on a value column rather than the join key: hasEventTime is
// expected to diverge per side (true on left via eventTimeColIdxOpt, false on
// right), so exactly one secondary index gets populated.
testWithVirtualColumnFamilyJoins(
"SPARK-56406: secondary index only populated on watermarked side for time interval join") {
withTempDir { checkpointDir =>
val leftInput = MemoryStream[(Int, Int)]
val rightInput = MemoryStream[(Int, Int)]

val df1 = leftInput.toDF().toDF("leftKey", "time")
.select($"leftKey", timestamp_seconds($"time") as "leftTime",
($"leftKey" * 2) as "leftValue")
.withWatermark("leftTime", "10 seconds")
val df2 = rightInput.toDF().toDF("rightKey", "time")
.select($"rightKey", timestamp_seconds($"time") as "rightTime",
($"rightKey" * 3) as "rightValue")
// Only left side has watermark; watermark is on a value column, not the join key.
// joinKeyOrdinalForWatermark is None → only left has hasEventTime = true.
// Neither side can actually evict: the left state watermark is derived from the right
// side's watermark via the join condition, which is absent here. The left secondary
// index is populated but never used for eviction.

val joined = df1.join(df2,
expr("leftKey = rightKey AND " +
"leftTime BETWEEN rightTime - interval 5 seconds AND rightTime + interval 5 seconds"))
.select($"leftKey", $"leftTime".cast("int"), $"rightTime".cast("int"))

testStream(joined)(
StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
AddData(leftInput, (1, 10), (2, 20)),
CheckAnswer(),
// Right row (1, 12): key 1 matches and |10 - 12| <= 5 seconds, so the
// joined output is (leftKey, leftTime, rightTime) = (1, 10, 12).
AddData(rightInput, (1, 12)),
CheckNewAnswer((1, 10, 12)),
Execute { _ =>
val checkpointLoc = checkpointDir.getCanonicalPath

assert(readStateStore(checkpointLoc, "left-keyWithTsToValues") > 0,
"left primary store should have rows")
assert(readStateStore(checkpointLoc, "right-keyWithTsToValues") > 0,
"right primary store should have rows")

// Left has watermark on a value column → hasEventTime = true, secondary index populated.
assert(readStateStore(checkpointLoc, "left-tsWithKey") > 0,
"left secondary index should be populated (watermark on left value column)")
// Right has no watermark → hasEventTime = false, secondary index empty.
assert(readStateStore(checkpointLoc, "right-tsWithKey") === 0,
"right secondary index should be empty (no watermark on right side)")
},
StopStream
)
}
}
}

@SlowSQLTest
Expand Down