6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -1412,6 +1412,12 @@
],
"sqlState" : "42623"
},
"DEFINE_FLOW_ONCE_OPTION_NOT_SUPPORTED" : {
"message" : [
"Defining a one-time flow <flowName> with the 'once' option is not supported."
],
"sqlState" : "0A000"
},
"DESCRIBE_JSON_NOT_EXTENDED" : {
"message" : [
"DESCRIBE TABLE ... AS JSON only supported when [EXTENDED|FORMATTED] is specified.",
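For context, the server surfaces this condition as an AnalysisException, with the <flowName> placeholder in the message template filled from the exception's message parameters. A minimal sketch, mirroring the handler change further down; the flow name "backfill_sales" and the wrapper object are illustrative only:

import org.apache.spark.sql.AnalysisException

object OnceNotSupportedExample {
  // Rejects a flow defined with the 'once' option; mirrors how the error
  // condition is raised in PipelinesHandler below.
  def rejectOnceFlow(flowName: String): Nothing =
    throw new AnalysisException(
      "DEFINE_FLOW_ONCE_OPTION_NOT_SUPPORTED",
      Map("flowName" -> flowName))

  def main(args: Array[String]): Unit =
    try rejectOnceFlow("backfill_sales")
    catch {
      case e: AnalysisException =>
        // The message is rendered from the template above and carries SQLSTATE 0A000.
        println(e.getMessage)
    }
}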
70 changes: 35 additions & 35 deletions python/pyspark/sql/connect/proto/pipelines_pb2.py

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions python/pyspark/sql/connect/proto/pipelines_pb2.pyi
@@ -588,6 +588,7 @@ class PipelineCommand(google.protobuf.message.Message):
SOURCE_CODE_LOCATION_FIELD_NUMBER: builtins.int
RELATION_FLOW_DETAILS_FIELD_NUMBER: builtins.int
EXTENSION_FIELD_NUMBER: builtins.int
ONCE_FIELD_NUMBER: builtins.int
dataflow_graph_id: builtins.str
"""The graph to attach this flow to."""
flow_name: builtins.str
@@ -612,6 +613,13 @@
) -> global___PipelineCommand.DefineFlow.WriteRelationFlowDetails: ...
@property
def extension(self) -> google.protobuf.any_pb2.Any: ...
once: builtins.bool
"""If true, define the flow as a one-time flow, such as for backfill.
Set to true changes the flow in two ways:
- The flow is run one time by default. If the pipeline is ran with a full refresh,
the flow will run again.
- The flow function must be a batch DataFrame, not a streaming DataFrame.
"""
def __init__(
self,
*,
@@ -624,6 +632,7 @@
relation_flow_details: global___PipelineCommand.DefineFlow.WriteRelationFlowDetails
| None = ...,
extension: google.protobuf.any_pb2.Any | None = ...,
once: builtins.bool | None = ...,
) -> None: ...
def HasField(
self,
@@ -634,6 +643,8 @@
b"_dataflow_graph_id",
"_flow_name",
b"_flow_name",
"_once",
b"_once",
"_source_code_location",
b"_source_code_location",
"_target_dataset_name",
@@ -648,6 +659,8 @@
b"extension",
"flow_name",
b"flow_name",
"once",
b"once",
"relation_flow_details",
b"relation_flow_details",
"source_code_location",
@@ -665,6 +678,8 @@
b"_dataflow_graph_id",
"_flow_name",
b"_flow_name",
"_once",
b"_once",
"_source_code_location",
b"_source_code_location",
"_target_dataset_name",
@@ -679,6 +694,8 @@
b"extension",
"flow_name",
b"flow_name",
"once",
b"once",
"relation_flow_details",
b"relation_flow_details",
"source_code_location",
@@ -703,6 +720,10 @@
self, oneof_group: typing_extensions.Literal["_flow_name", b"_flow_name"]
) -> typing_extensions.Literal["flow_name"] | None: ...
@typing.overload
def WhichOneof(
self, oneof_group: typing_extensions.Literal["_once", b"_once"]
) -> typing_extensions.Literal["once"] | None: ...
@typing.overload
def WhichOneof(
self,
oneof_group: typing_extensions.Literal[
@@ -152,6 +152,13 @@ message PipelineCommand {
optional spark.connect.Relation relation = 1;
}

// If true, defines the flow as a one-time flow, such as for backfill.
// Setting it to true changes the flow in two ways:
// - The flow runs only once by default. If the pipeline is run with a full refresh,
//   the flow will run again.
// - The flow function must return a batch DataFrame, not a streaming DataFrame.
optional bool once = 8;

message Response {
// Fully qualified flow name that uniquely identify a flow in the Dataflow graph.
optional string flow_name = 1;
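On the client side, the new field is populated through the generated protobuf builder. A minimal sketch, assuming the generated Java classes live under org.apache.spark.connect.proto and using placeholder graph-id and flow-name values; as the handler change below shows, the server currently rejects any DefineFlow command that sets this field:

import org.apache.spark.connect.proto.PipelineCommand
import org.apache.spark.connect.proto.PipelineCommand.DefineFlow

object DefineOnceFlowExample {
  // Builds a DefineFlow command that marks the flow as a one-time (backfill) flow.
  def buildCommand(graphId: String): PipelineCommand =
    PipelineCommand
      .newBuilder()
      .setDefineFlow(
        DefineFlow
          .newBuilder()
          .setDataflowGraphId(graphId)    // id of a previously created dataflow graph
          .setFlowName("backfill_sales")  // hypothetical flow name
          .setOnce(true))                 // the new 'once' flag (field 8 in the proto)
      .build()
}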
@@ -267,6 +267,11 @@ private[connect] object PipelinesHandler extends Logging {
flow: proto.PipelineCommand.DefineFlow,
transformRelationFunc: Relation => LogicalPlan,
sessionHolder: SessionHolder): TableIdentifier = {
if (flow.hasOnce) {
throw new AnalysisException(
"DEFINE_FLOW_ONCE_OPTION_NOT_SUPPORTED",
Map("flowName" -> flow.getFlowName))
}
val dataflowGraphId = flow.getDataflowGraphId
val graphElementRegistry =
sessionHolder.dataflowGraphRegistry.getDataflowGraphOrThrow(dataflowGraphId)
@@ -71,6 +71,24 @@ class SparkDeclarativePipelinesServerSuite

}

gridTest("Define flow 'once' argument not supported")(Seq(true, false)) { onceValue =>
val ex = intercept[Exception] {
withRawBlockingStub { implicit stub =>
val graphId = createDataflowGraph
sendPlan(
buildPlanFromPipelineCommand(
PipelineCommand
.newBuilder()
.setDefineFlow(DefineFlow
.newBuilder()
.setDataflowGraphId(graphId)
.setOnce(onceValue))
.build()))
}
}
assert(ex.getMessage.contains("DEFINE_FLOW_ONCE_OPTION_NOT_SUPPORTED"))
}

test(
"Cross dependency between SQL dataset and non-SQL dataset is valid and can be registered") {
withRawBlockingStub { implicit stub =>