
Commit b003650

Jacky Wang authored and Sandy Ryza committed

[SPARK-54191][SDP] Add once to DefineFlow Proto
### What changes were proposed in this pull request?

Add `once` to the `DefineFlow` proto, to allow creating a one-time backfill flow.

### Why are the changes needed?

Allow a new API argument for SDP flows.

### Does this PR introduce _any_ user-facing change?

No, no API change yet.

### How was this patch tested?

Proto changes.

Closes #52890 from JiaqiWang18/SPARK-54191-StandaloneFlowDetails-proto.

Authored-by: Jacky Wang <[email protected]>
Signed-off-by: Sandy Ryza <[email protected]>
(cherry picked from commit 1a724ba)
Signed-off-by: Sandy Ryza <[email protected]>
1 parent 7069858 · commit b003650

File tree: 6 files changed (+92 -35 lines)

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 6 additions & 0 deletions
@@ -1407,6 +1407,12 @@
     ],
     "sqlState" : "42623"
   },
+  "DEFINE_FLOW_ONCE_OPTION_NOT_SUPPORTED" : {
+    "message" : [
+      "Defining a one-time flow <flowName> with the 'once' option is not supported."
+    ],
+    "sqlState" : "0A000"
+  },
   "DESCRIBE_JSON_NOT_EXTENDED" : {
     "message" : [
       "DESCRIBE TABLE ... AS JSON only supported when [EXTENDED|FORMATTED] is specified.",

python/pyspark/sql/connect/proto/pipelines_pb2.py

Lines changed: 35 additions & 35 deletions
Large diffs are not rendered by default.

python/pyspark/sql/connect/proto/pipelines_pb2.pyi

Lines changed: 21 additions & 0 deletions
@@ -588,6 +588,7 @@ class PipelineCommand(google.protobuf.message.Message):
         SOURCE_CODE_LOCATION_FIELD_NUMBER: builtins.int
         RELATION_FLOW_DETAILS_FIELD_NUMBER: builtins.int
         EXTENSION_FIELD_NUMBER: builtins.int
+        ONCE_FIELD_NUMBER: builtins.int
         dataflow_graph_id: builtins.str
         """The graph to attach this flow to."""
         flow_name: builtins.str
@@ -612,6 +613,13 @@
         ) -> global___PipelineCommand.DefineFlow.WriteRelationFlowDetails: ...
         @property
         def extension(self) -> google.protobuf.any_pb2.Any: ...
+        once: builtins.bool
+        """If true, define the flow as a one-time flow, such as for backfill.
+        Setting it to true changes the flow in two ways:
+        - The flow is run one time by default. If the pipeline is run with a
+          full refresh, the flow will run again.
+        - The flow function must return a batch DataFrame, not a streaming DataFrame.
+        """
         def __init__(
             self,
             *,
@@ -624,6 +632,7 @@
             relation_flow_details: global___PipelineCommand.DefineFlow.WriteRelationFlowDetails
             | None = ...,
             extension: google.protobuf.any_pb2.Any | None = ...,
+            once: builtins.bool | None = ...,
         ) -> None: ...
         def HasField(
             self,
@@ -634,6 +643,8 @@
                 b"_dataflow_graph_id",
                 "_flow_name",
                 b"_flow_name",
+                "_once",
+                b"_once",
                 "_source_code_location",
                 b"_source_code_location",
                 "_target_dataset_name",
@@ -648,6 +659,8 @@
                 b"extension",
                 "flow_name",
                 b"flow_name",
+                "once",
+                b"once",
                 "relation_flow_details",
                 b"relation_flow_details",
                 "source_code_location",
@@ -665,6 +678,8 @@
                 b"_dataflow_graph_id",
                 "_flow_name",
                 b"_flow_name",
+                "_once",
+                b"_once",
                 "_source_code_location",
                 b"_source_code_location",
                 "_target_dataset_name",
@@ -679,6 +694,8 @@
                 b"extension",
                 "flow_name",
                 b"flow_name",
+                "once",
+                b"once",
                 "relation_flow_details",
                 b"relation_flow_details",
                 "source_code_location",
@@ -703,6 +720,10 @@
             self, oneof_group: typing_extensions.Literal["_flow_name", b"_flow_name"]
         ) -> typing_extensions.Literal["flow_name"] | None: ...
         @typing.overload
+        def WhichOneof(
+            self, oneof_group: typing_extensions.Literal["_once", b"_once"]
+        ) -> typing_extensions.Literal["once"] | None: ...
+        @typing.overload
         def WhichOneof(
             self,
             oneof_group: typing_extensions.Literal[
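Taken together, the stub changes make `once` a regular proto3 optional scalar on `DefineFlow`, so presence is tracked explicitly. A minimal sketch of constructing the command from the generated Python protos (the graph ID and flow name are placeholder values; `define_flow` as the `PipelineCommand` field name is inferred from the Scala builder's `setDefineFlow`):

```python
from pyspark.sql.connect.proto import pipelines_pb2 as pb2

# Build a DefineFlow command with the new one-time-flow flag set.
define_flow = pb2.PipelineCommand.DefineFlow(
    dataflow_graph_id="<graph-id>",  # placeholder
    flow_name="nightly_backfill",    # placeholder
    once=True,
)

# Because the field is proto3 optional, an explicit False is
# distinguishable from "unset":
assert define_flow.HasField("once")

command = pb2.PipelineCommand(define_flow=define_flow)
```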

sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto

Lines changed: 7 additions & 0 deletions
@@ -152,6 +152,13 @@ message PipelineCommand {
       optional spark.connect.Relation relation = 1;
     }

+    // If true, define the flow as a one-time flow, such as for backfill.
+    // Setting it to true changes the flow in two ways:
+    // - The flow is run one time by default. If the pipeline is run with a
+    //   full refresh, the flow will run again.
+    // - The flow function must return a batch DataFrame, not a streaming DataFrame.
+    optional bool once = 8;
+
     message Response {
       // Fully qualified flow name that uniquely identifies a flow in the dataflow graph.
       optional string flow_name = 1;

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala

Lines changed: 5 additions & 0 deletions
@@ -267,6 +267,11 @@ private[connect] object PipelinesHandler extends Logging {
       flow: proto.PipelineCommand.DefineFlow,
       transformRelationFunc: Relation => LogicalPlan,
       sessionHolder: SessionHolder): TableIdentifier = {
+    if (flow.hasOnce) {
+      throw new AnalysisException(
+        "DEFINE_FLOW_ONCE_OPTION_NOT_SUPPORTED",
+        Map("flowName" -> flow.getFlowName))
+    }
     val dataflowGraphId = flow.getDataflowGraphId
     val graphElementRegistry =
       sessionHolder.dataflowGraphRegistry.getDataflowGraphOrThrow(dataflowGraphId)
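Until execution support lands, the handler rejects any `DefineFlow` command that sets the field, regardless of its value. Purely for illustration, handling that rejection from PySpark might look like the sketch below; `send_pipeline_command` is a hypothetical helper wrapping the Connect RPC, and `getCondition` is assumed to be PySpark's accessor for the error-condition name:

```python
from pyspark.errors import AnalysisException

try:
    send_pipeline_command(command)  # hypothetical helper; not a real PySpark API
except AnalysisException as e:
    # The server raises the condition registered in error-conditions.json above.
    assert e.getCondition() == "DEFINE_FLOW_ONCE_OPTION_NOT_SUPPORTED"
```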

sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala

Lines changed: 18 additions & 0 deletions
@@ -71,6 +71,24 @@ class SparkDeclarativePipelinesServerSuite

   }

+  gridTest("Define flow 'once' argument not supported")(Seq(true, false)) { onceValue =>
+    val ex = intercept[Exception] {
+      withRawBlockingStub { implicit stub =>
+        val graphId = createDataflowGraph
+        sendPlan(
+          buildPlanFromPipelineCommand(
+            PipelineCommand
+              .newBuilder()
+              .setDefineFlow(DefineFlow
+                .newBuilder()
+                .setDataflowGraphId(graphId)
+                .setOnce(onceValue))
+              .build()))
+      }
+    }
+    assert(ex.getMessage.contains("DEFINE_FLOW_ONCE_OPTION_NOT_SUPPORTED"))
+  }
+
   test(
     "Cross dependency between SQL dataset and non-SQL dataset is valid and can be registered") {
     withRawBlockingStub { implicit stub =>
