Skip to content

Commit b9de82f

Browse files
committed
validation
1 parent 8253416 commit b9de82f

File tree

3 files changed

+29
-0
lines changed

3 files changed

+29
-0
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1412,6 +1412,12 @@
14121412
],
14131413
"sqlState" : "42623"
14141414
},
1415+
"DEFINE_FLOW_ONCE_OPTION_NOT_SUPPORTED" : {
1416+
"message" : [
1417+
"Defining a one-time flow <flowName> with the 'once' option is not supported."
1418+
],
1419+
"sqlState" : "0A000"
1420+
},
14151421
"DESCRIBE_JSON_NOT_EXTENDED" : {
14161422
"message" : [
14171423
"DESCRIBE TABLE ... AS JSON only supported when [EXTENDED|FORMATTED] is specified.",

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,11 @@ private[connect] object PipelinesHandler extends Logging {
265265
flow: proto.PipelineCommand.DefineFlow,
266266
transformRelationFunc: Relation => LogicalPlan,
267267
sessionHolder: SessionHolder): TableIdentifier = {
268+
if (flow.hasOnce) {
269+
throw new AnalysisException(
270+
"DEFINE_FLOW_ONCE_OPTION_NOT_SUPPORTED",
271+
Map("flowName" -> flow.getFlowName))
272+
}
268273
val dataflowGraphId = flow.getDataflowGraphId
269274
val graphElementRegistry =
270275
sessionHolder.dataflowGraphRegistry.getDataflowGraphOrThrow(dataflowGraphId)

sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,24 @@ class SparkDeclarativePipelinesServerSuite
7171

7272
}
7373

74+
gridTest("Define flow 'once' argument not supported")(Seq(true, false)) { onceValue =>
75+
val ex = intercept[Exception] {
76+
withRawBlockingStub { implicit stub =>
77+
val graphId = createDataflowGraph
78+
sendPlan(
79+
buildPlanFromPipelineCommand(
80+
PipelineCommand
81+
.newBuilder()
82+
.setDefineFlow(DefineFlow
83+
.newBuilder()
84+
.setDataflowGraphId(graphId)
85+
.setOnce(onceValue))
86+
.build()))
87+
}
88+
}
89+
assert(ex.getMessage.contains("DEFINE_FLOW_ONCE_OPTION_NOT_SUPPORTED"))
90+
}
91+
7492
test(
7593
"Cross dependency between SQL dataset and non-SQL dataset is valid and can be registered") {
7694
withRawBlockingStub { implicit stub =>

0 commit comments

Comments
 (0)