
Commit 7069858

jerrypeng authored and dongjoon-hyun committed
[SPARK-53823][SS] Implement allow list for real time mode
### What changes were proposed in this pull request?

Add an allowlist and some guardrails to help users understand what is and is not supported in Real-time Mode. This should improve the user experience of real-time mode and minimize the chance of a new user using it in a way that is unexpected or untested, which may produce undesirable outcomes.

### Why are the changes needed?

To help guide users on what is currently supported and what is not.

### Does this PR introduce _any_ user-facing change?

Yes, running a query with currently unsupported features in RTM will throw an exception.

### How was this patch tested?

Several tests are added.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #52891 from jerrypeng/SPARK-53823.

Authored-by: Jerry Peng <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 178f0f4)
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 7b7bca8 commit 7069858
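
In practice the patch layers three guardrails; a quick sketch of where each fires (the error subclasses are taken from the diffs below; `spark` is an assumed SparkSession, and constructing the real-time trigger itself is outside this commit):

```scala
// 1. Analysis time (ResolveWriteToStream): the output mode must be Update,
//    else STREAMING_REAL_TIME_MODE.OUTPUT_MODE_NOT_SUPPORTED is thrown.
// 2. Query start (DataStreamWriter): the sink class must be on the sink
//    allowlist, else STREAMING_REAL_TIME_MODE.OPERATOR_OR_SINK_NOT_IN_ALLOWLIST.
// 3. Batch execution (MicroBatchExecution): every physical operator on a
//    real-time scan path must be on the operator allowlist (same error class,
//    checked per batch because the physical plan exists only then).
//
// Checks 2 and 3 degrade to log warnings when the user opts out; the
// output-mode check is not governed by this conf:
spark.conf.set("spark.sql.streaming.realTimeMode.allowlistCheck", "false")
```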

File tree

11 files changed, +433 −7 lines changed


common/utils/src/main/resources/error/error-conditions.json

Lines changed: 15 additions & 0 deletions
@@ -5629,6 +5629,21 @@
         "message" : [
           "The input stream <className> is not supported in Real-time Mode."
         ]
+      },
+      "OPERATOR_OR_SINK_NOT_IN_ALLOWLIST" : {
+        "message" : [
+          "The <errorType>(s): <message> not in the <errorType> allowlist for Real-Time Mode. To bypass this check, set spark.sql.streaming.realTimeMode.allowlistCheck to false. By changing this, you agree to run the query at your own risk."
+        ]
+      },
+      "OUTPUT_MODE_NOT_SUPPORTED" : {
+        "message" : [
+          "The output mode <outputMode> is not supported. To work around this limitation, set the output mode to Update. In the future, <outputMode> may be supported."
+        ]
+      },
+      "SINK_NOT_SUPPORTED" : {
+        "message" : [
+          "The <className> sink is currently not supported. See the Real-Time Mode User Guide for a list of supported sinks."
+        ]
       }
     },
     "sqlState" : "0A000"

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala

Lines changed: 19 additions & 0 deletions
@@ -584,6 +584,25 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  // Verifies that a query using real-time mode is valid. It is meant to be used in addition to
+  // the checkForStreaming method: for this reason, we call this method check *additional*
+  // real-time mode constraints.
+  //
+  // It should be called during resolution of the WriteToStreamStatement if and only if
+  // the query is using the real-time trigger.
+  def checkAdditionalRealTimeModeConstraints(plan: LogicalPlan, outputMode: OutputMode): Unit = {
+    if (outputMode != InternalOutputModes.Update) {
+      throwRealTimeError("OUTPUT_MODE_NOT_SUPPORTED", Map("outputMode" -> outputMode.toString))
+    }
+  }
+
+  private def throwRealTimeError(subClass: String, args: Map[String, String]): Unit = {
+    throw new AnalysisException(
+      errorClass = s"STREAMING_REAL_TIME_MODE.$subClass",
+      messageParameters = args
+    )
+  }
+
   private def throwErrorIf(
       condition: Boolean,
       msg: String)(implicit operator: LogicalPlan): Unit = {
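
A minimal sketch of calling the new check directly (assuming `plan` is some resolved streaming LogicalPlan; `OutputMode.Update()` returns the same singleton the method compares against):

```scala
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.streaming.OutputMode

// Update is the only output mode that passes the additional real-time checks.
UnsupportedOperationChecker.checkAdditionalRealTimeModeConstraints(plan, OutputMode.Update())

// Append (or Complete) throws an AnalysisException whose error class is
// STREAMING_REAL_TIME_MODE.OUTPUT_MODE_NOT_SUPPORTED.
UnsupportedOperationChecker.checkAdditionalRealTimeModeConstraints(plan, OutputMode.Append())
```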

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala

Lines changed: 5 additions & 3 deletions
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
-import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.streaming.{OutputMode, Trigger}
 
 /**
  * A statement for Stream writing. It contains all neccessary param and will be resolved in the
@@ -39,7 +39,9 @@ import org.apache.spark.sql.streaming.OutputMode
  * @param sink Sink to write the streaming outputs.
  * @param outputMode Output mode for the sink.
  * @param hadoopConf The Hadoop Configuration to get a FileSystem instance
- * @param isContinuousTrigger Whether the statement is triggered by a continuous query or not.
+ * @param trigger The trigger being used for this streaming query. It is not used to create the
+ *                resolved [[WriteToStream]] node; rather, it is only used while checking the plan
+ *                for unsupported operations, which happens during resolution.
  * @param inputQuery The analyzed query plan from the streaming DataFrame.
  * @param catalogAndIdent Catalog and identifier for the sink, set when it is a V2 catalog table
  */
@@ -51,7 +53,7 @@ case class WriteToStreamStatement(
     sink: Table,
     outputMode: OutputMode,
     hadoopConf: Configuration,
-    isContinuousTrigger: Boolean,
+    trigger: Trigger,
     inputQuery: LogicalPlan,
     catalogAndIdent: Option[(TableCatalog, Identifier)] = None,
     catalogTable: Option[CatalogTable] = None) extends UnaryNode {

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 7 additions & 0 deletions
@@ -3039,6 +3039,13 @@ object SQLConf {
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefault(5000)
 
+  val STREAMING_REAL_TIME_MODE_ALLOWLIST_CHECK = buildConf(
+    "spark.sql.streaming.realTimeMode.allowlistCheck")
+    .doc("Whether to check all operators, sinks used in real-time mode are in the allowlist.")
+    .version("4.1.0")
+    .booleanConf
+    .createWithDefault(true)
+
   val VARIABLE_SUBSTITUTE_ENABLED =
     buildConf("spark.sql.variable.substitute")
       .doc("This enables substitution using syntax like `${var}`, `${system:var}`, " +
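
For reference, a sketch of both sides of this conf (assuming a SparkSession named `spark`; the entry name and default come from the diff above):

```scala
// Internal callers read the typed entry; it defaults to true.
val checkEnabled = spark.sessionState.conf.getConf(
  SQLConf.STREAMING_REAL_TIME_MODE_ALLOWLIST_CHECK)

// Users opt out per session, accepting the "at your own risk" caveat
// spelled out in the error message:
spark.conf.set("spark.sql.streaming.realTimeMode.allowlistCheck", "false")
```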

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala

Lines changed: 46 additions & 0 deletions
@@ -853,6 +853,27 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
       Deduplicate(Seq(attribute), streamRelation)), outputMode = Append)
   }
 
+  /*
+    =======================================================================================
+    REAL-TIME STREAMING
+    =======================================================================================
+   */
+
+  {
+    assertNotSupportedForRealTime(
+      "real-time without operators - append mode",
+      streamRelation,
+      Append,
+      "STREAMING_REAL_TIME_MODE.OUTPUT_MODE_NOT_SUPPORTED"
+    )
+
+    assertSupportedForRealTime(
+      "real-time with stream-batch join - update mode",
+      streamRelation.join(batchRelation, joinType = Inner),
+      Update
+    )
+  }
+
   /*
     =======================================================================================
     TESTING FUNCTIONS
@@ -1017,6 +1038,31 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
     }
   }
 
+  /** Assert that the logical plan is supported for real-time mode */
+  def assertSupportedForRealTime(name: String, plan: LogicalPlan, outputMode: OutputMode): Unit = {
+    test(s"real-time trigger - $name: supported") {
+      UnsupportedOperationChecker.checkAdditionalRealTimeModeConstraints(plan, outputMode)
+    }
+  }
+
+  /**
+   * Assert that the logical plan is not supported inside a streaming plan with the
+   * real-time trigger.
+   */
+  def assertNotSupportedForRealTime(
+      name: String,
+      plan: LogicalPlan,
+      outputMode: OutputMode,
+      condition: String): Unit = {
+    testError(
+      s"real-time trigger - $name: not supported",
+      Seq("Streaming real-time mode"),
+      condition
+    ) {
+      UnsupportedOperationChecker.checkAdditionalRealTimeModeConstraints(plan, outputMode)
+    }
+  }
+
   /**
    * Assert that the logical plan is not supported inside a streaming plan.
    *

sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala

Lines changed: 9 additions & 0 deletions
@@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, FileDat
 import org.apache.spark.sql.execution.datasources.v2.python.PythonDataSourceV2
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.ArrayImplicits._
@@ -299,6 +300,14 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D
       recoverFromCheckpoint: Boolean = true,
       catalogAndIdent: Option[(TableCatalog, Identifier)] = None,
       catalogTable: Option[CatalogTable] = None): StreamingQuery = {
+    if (trigger.isInstanceOf[RealTimeTrigger]) {
+      RealTimeModeAllowlist.checkAllowedSink(
+        sink,
+        ds.sparkSession.sessionState.conf.getConf(
+          SQLConf.STREAMING_REAL_TIME_MODE_ALLOWLIST_CHECK)
+      )
+    }
+
     val useTempCheckpointLocation = DataStreamWriter.SOURCES_ALLOW_ONE_TIME_QUERY.contains(source)
 
     ds.sparkSession.sessionState.streamingQueryManager.startQuery(
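
The effect of the block added above, sketched as a direct call (`sinkTable` is a hypothetical resolved sink `Table`):

```scala
// Throws SparkIllegalArgumentException with error class
// STREAMING_REAL_TIME_MODE.OPERATOR_OR_SINK_NOT_IN_ALLOWLIST when the sink
// class is not on the allowlist; with throwException = false it only logs
// a warning and lets the query proceed.
RealTimeModeAllowlist.checkAllowedSink(sinkTable, throwException = true)
```

Because the check runs before the call to `startQuery`, an unsupported sink is rejected before the query is handed to the StreamingQueryManager.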

sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala

Lines changed: 1 addition & 1 deletion
@@ -213,7 +213,7 @@ class StreamingQueryManager private[sql] (
       sink,
       outputMode,
       df.sparkSession.sessionState.newHadoopConf(),
-      trigger.isInstanceOf[ContinuousTrigger],
+      trigger,
       analyzedPlan,
       catalogAndIdent,
       catalogTable)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala

Lines changed: 13 additions & 2 deletions
@@ -41,7 +41,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, RealTimeStreamScanExec, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
-import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeTrigger, Sink, Source, StreamingQueryPlanTraverseHelper}
+import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeModeAllowlist, RealTimeTrigger, Sink, Source, StreamingQueryPlanTraverseHelper}
 import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetSeq, OffsetSeqMetadata}
 import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, StateStoreWriter}
 import org.apache.spark.sql.execution.streaming.runtime.AcceptsLatestSeenOffsetHandler
@@ -436,7 +436,10 @@ class MicroBatchExecution(
       }
     }
 
-    if (containsStatefulOperator(analyzedPlan)) {
+    if (trigger.isInstanceOf[RealTimeTrigger]) {
+      logWarning(log"Disabling AQE since AQE is not supported for Real-time Mode.")
+      sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
+    } else if (containsStatefulOperator(analyzedPlan)) {
       // SPARK-53941: We disable AQE for stateful workloads as of now.
       logWarning(log"Disabling AQE since AQE is not supported in stateful workloads.")
       sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
@@ -1042,6 +1045,14 @@ class MicroBatchExecution(
 
     markMicroBatchExecutionStart(execCtx)
 
+    if (trigger.isInstanceOf[RealTimeTrigger]) {
+      RealTimeModeAllowlist.checkAllowedPhysicalOperator(
+        execCtx.executionPlan.executedPlan,
+        sparkSession.sessionState.conf.getConf(
+          SQLConf.STREAMING_REAL_TIME_MODE_ALLOWLIST_CHECK)
+      )
+    }
+
     if (execCtx.previousContext.isEmpty) {
       purgeStatefulMetadataAsync(execCtx.executionPlan.executedPlan)
     }
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RealTimeModeAllowlist.scala

Lines changed: 148 additions & 0 deletions
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.internal.{Logging, LogKeys, MDC, MessageWithContext}
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.RealTimeStreamScanExec
+import org.apache.spark.sql.execution.streaming.operators.stateful._
+
+object RealTimeModeAllowlist extends Logging {
+  private val allowedSinks = Set(
+    "org.apache.spark.sql.execution.streaming.ConsoleTable$",
+    "org.apache.spark.sql.execution.streaming.sources.ContinuousMemorySink",
+    "org.apache.spark.sql.execution.streaming.sources.ForeachWriterTable",
+    "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable"
+  )
+
+  private val allowedOperators = Set(
+    "org.apache.spark.sql.execution.AppendColumnsExec",
+    "org.apache.spark.sql.execution.CollectMetricsExec",
+    "org.apache.spark.sql.execution.ColumnarToRowExec",
+    "org.apache.spark.sql.execution.DeserializeToObjectExec",
+    "org.apache.spark.sql.execution.ExpandExec",
+    "org.apache.spark.sql.execution.FileSourceScanExec",
+    "org.apache.spark.sql.execution.FilterExec",
+    "org.apache.spark.sql.execution.GenerateExec",
+    "org.apache.spark.sql.execution.InputAdapter",
+    "org.apache.spark.sql.execution.LocalTableScanExec",
+    "org.apache.spark.sql.execution.MapElementsExec",
+    "org.apache.spark.sql.execution.MapPartitionsExec",
+    "org.apache.spark.sql.execution.PlanLater",
+    "org.apache.spark.sql.execution.ProjectExec",
+    "org.apache.spark.sql.execution.RangeExec",
+    "org.apache.spark.sql.execution.SerializeFromObjectExec",
+    "org.apache.spark.sql.execution.UnionExec",
+    "org.apache.spark.sql.execution.WholeStageCodegenExec",
+    "org.apache.spark.sql.execution.datasources.v2.RealTimeStreamScanExec",
+    "org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec",
+    "org.apache.spark.sql.execution.exchange.BroadcastExchangeExec",
+    "org.apache.spark.sql.execution.exchange.ReusedExchangeExec",
+    "org.apache.spark.sql.execution.joins.BroadcastHashJoinExec",
+    classOf[EventTimeWatermarkExec].getName
+  )
+
+  private def classNamesString(classNames: Seq[String]): MessageWithContext = {
+    val sortedClassNames = classNames.sorted
+    var message = log"${MDC(LogKeys.CLASS_NAME, sortedClassNames.head)}"
+    sortedClassNames.tail.foreach(
+      name => message += log", ${MDC(LogKeys.CLASS_NAME, name)}"
+    )
+    if (sortedClassNames.size > 1) {
+      message + log" are"
+    } else {
+      message + log" is"
+    }
+  }
+
+  private def notInRTMAllowlistException(
+      errorType: String,
+      classNames: Seq[String]): SparkIllegalArgumentException = {
+    assert(classNames.nonEmpty)
+    new SparkIllegalArgumentException(
+      errorClass = "STREAMING_REAL_TIME_MODE.OPERATOR_OR_SINK_NOT_IN_ALLOWLIST",
+      messageParameters = Map(
+        "errorType" -> errorType,
+        "message" -> classNamesString(classNames).message
+      )
+    )
+  }
+
+  def checkAllowedSink(sink: Table, throwException: Boolean): Unit = {
+    if (!allowedSinks.contains(sink.getClass.getName)) {
+      if (throwException) {
+        throw notInRTMAllowlistException("sink", Seq(sink.getClass.getName))
+      } else {
+        logWarning(
+          log"The sink: " + classNamesString(Seq(sink.getClass.getName)) +
+            log" not in the sink allowlist for Real-Time Mode."
+        )
+      }
+    }
+  }
+
+  // Collect ALL nodes whose entire subtree contains RealTimeStreamScanExec.
+  private def collectRealtimeNodes(root: SparkPlan): Seq[SparkPlan] = {
+
+    def collectNodesWhoseSubtreeHasRTS(n: SparkPlan): (Boolean, List[SparkPlan]) = {
+      n match {
+        case _: RealTimeStreamScanExec =>
+          // Subtree has RTS, but we don't collect the RTS node itself.
+          (true, Nil)
+        case _ if n.children.isEmpty =>
+          (false, Nil)
+        case _ =>
+          val kidResults = n.children.map(collectNodesWhoseSubtreeHasRTS)
+          val anyChildHasRTS = kidResults.exists(_._1)
+          val collectedKids = kidResults.iterator.flatMap(_._2).toList
+          val collectedHere = if (anyChildHasRTS) n :: collectedKids else collectedKids
+          (anyChildHasRTS, collectedHere)
+      }
+    }
+
+    collectNodesWhoseSubtreeHasRTS(root)._2
+  }
+
+  def checkAllowedPhysicalOperator(operator: SparkPlan, throwException: Boolean): Unit = {
+    val nodesToCheck = collectRealtimeNodes(operator)
+    val violations = nodesToCheck
+      .collect {
+        case node =>
+          if (allowedOperators.contains(node.getClass.getName)) {
+            None
+          } else {
+            Some(node.getClass.getName)
+          }
+      }
+      .flatten
+      .distinct
+
+    if (violations.nonEmpty) {
+      if (throwException) {
+        throw notInRTMAllowlistException("operator", violations.toSet.toSeq)
+      } else {
+        logWarning(
+          log"The operator(s): " + classNamesString(violations.toSet.toSeq) +
+            log" not in the operator allowlist for Real-Time Mode."
+        )
+      }
+    }
+  }
+}
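
The traversal in `collectRealtimeNodes` is the subtle part of this file: a node is collected only when its subtree contains a real-time scan, so pure batch subtrees (for example, the broadcast side of a stream-batch join) are exempt from the operator check. A self-contained toy version of the same recursion (illustrative names, not Spark code):

```scala
// Collect every node whose subtree contains a marker leaf, excluding the
// marker itself -- the same shape as collectNodesWhoseSubtreeHasRTS above.
sealed trait Node { def children: Seq[Node] }
case object Marker extends Node { val children: Seq[Node] = Nil }
final case class Inner(label: String, children: Node*) extends Node

def collect(n: Node): (Boolean, List[Node]) = n match {
  case Marker => (true, Nil)                 // marker found; not collected itself
  case _ if n.children.isEmpty => (false, Nil)
  case _ =>
    val results = n.children.map(collect)
    val hasMarker = results.exists(_._1)     // does any child subtree hold a marker?
    val collected = results.flatMap(_._2).toList
    (hasMarker, if (hasMarker) n :: collected else collected)
}

// "a" and "b" sit on the marker path and are collected; the markerless
// subtree under "c" is skipped, mirroring the batch-side exemption.
val plan = Inner("a", Inner("b", Marker), Inner("c"))
assert(collect(plan)._2.map { case Inner(l, _*) => l } == List("a", "b"))
```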

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ResolveWriteToStream.scala

Lines changed: 7 additions & 1 deletion
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.streaming.{WriteToStream, WriteToStreamStatement}
 import org.apache.spark.sql.connector.catalog.SupportsWrite
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
+import org.apache.spark.sql.execution.streaming.{ContinuousTrigger, RealTimeTrigger}
 import org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
@@ -48,7 +49,12 @@ object ResolveWriteToStream extends Rule[LogicalPlan] {
     }
 
     if (conf.isUnsupportedOperationCheckEnabled) {
-      if (s.sink.isInstanceOf[SupportsWrite] && s.isContinuousTrigger) {
+      if (s.trigger.isInstanceOf[RealTimeTrigger]) {
+        UnsupportedOperationChecker.
+          checkAdditionalRealTimeModeConstraints(s.inputQuery, s.outputMode)
+      }
+
+      if (s.sink.isInstanceOf[SupportsWrite] && s.trigger.isInstanceOf[ContinuousTrigger]) {
        UnsupportedOperationChecker.checkForContinuous(s.inputQuery, s.outputMode)
      } else {
        UnsupportedOperationChecker.checkForStreaming(s.inputQuery, s.outputMode)
