-
Notifications
You must be signed in to change notification settings - Fork 29.2k
[SPARK-56961][SQL] Pass all options while loading changelog #56044
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
998761f
d43dd71
050ea35
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.spark.sql.catalyst.plans.logical._ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.spark.sql.catalyst.rules.Rule | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogInfo} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogContext} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.spark.sql.errors.QueryCompilationErrors | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.spark.sql.execution.datasources.v2.{ChangelogTable, DataSourceV2Relation} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.spark.sql.streaming.{OutputMode, StatefulProcessor} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -123,7 +123,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case rel @ DataSourceV2Relation(table: ChangelogTable, _, _, _, _, _) if !table.resolved => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val changelog = table.changelog | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val req = evaluateRequirements(changelog, table.changelogInfo) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val req = evaluateRequirements(changelog, table.changelogContext) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val resolvedRel = rel.copy(table = table.copy(resolved = true)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var updatedRel: LogicalPlan = resolvedRel | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -140,14 +140,14 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val rowIdExprs = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| V2ExpressionUtils.resolveRefs[NamedExpression](changelog.rowId().toSeq, resolvedRel) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| updatedRel = injectNetChangeComputation( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| updatedRel, rowIdExprs, table.changelogInfo.computeUpdates()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| updatedRel, rowIdExprs, table.changelogContext.computeUpdates()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| updatedRel | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case rel @ StreamingRelationV2(_, _, table: ChangelogTable, _, _, _, _, _, _) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if !table.resolved => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val changelog = table.changelog | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val req = evaluateRequirements(changelog, table.changelogInfo) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val req = evaluateRequirements(changelog, table.changelogContext) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val resolvedRel = rel.copy(table = table.copy(resolved = true)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var updatedRel: LogicalPlan = resolvedRel | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (req.requiresCarryOverRemoval || req.requiresUpdateDetection) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -164,7 +164,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // output, so name-based resolution against `updatedRel` recovers the right | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // attributes regardless of any preceding wrapping. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| updatedRel = addStreamingNetChangeComputation( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| updatedRel, changelog, table.changelogInfo.computeUpdates()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| updatedRel, changelog, table.changelogContext.computeUpdates()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| updatedRel | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -175,7 +175,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Captures which post-processing passes a CDC query requires, derived from the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * user-provided [[ChangelogInfo]] options and the connector-declared [[Changelog]] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * user-provided [[ChangelogContext]] options and the connector-declared [[Changelog]] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * capability flags. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private case class PostProcessingRequirements( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -194,22 +194,22 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private def evaluateRequirements( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| changelog: Changelog, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| options: ChangelogInfo): PostProcessingRequirements = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| options: ChangelogContext): PostProcessingRequirements = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val requiresCarryOverRemoval = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| options.deduplicationMode() != ChangelogInfo.DeduplicationMode.NONE && | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| options.deduplicationMode() != ChangelogContext.DeduplicationMode.NONE && | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| changelog.containsCarryoverRows() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val requiresUpdateDetection = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| options.computeUpdates() && changelog.representsUpdateAsDeleteAndInsert() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val requiresNetChanges = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NET_CHANGES && | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| options.deduplicationMode() == ChangelogContext.DeduplicationMode.NET_CHANGES && | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| changelog.containsIntermediateChanges() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // If carry-overs are surfaced and update detection is enabled without carry-over | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // removal, carry-overs would be falsely classified as updates, leading to wrong | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // results. Hence we throw. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (requiresUpdateDetection && | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| changelog.containsCarryoverRows() && | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NONE) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| options.deduplicationMode() == ChangelogContext.DeduplicationMode.NONE) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
195
to
+212
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now that
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw QueryCompilationErrors.cdcUpdateDetectionRequiresCarryOverRemoval( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| changelog.name()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should rename this given that we will have to pass options to
loadTableas well. Right now, we already haveTableInfothat we use for CREATE table cases. We will NOT be able to useTableInfofor loading tables. SoChangelogInfoconflicts withTableInfoas it is used for loading, not creation.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another alternative name to consider is
ChangelogParametersbut context seems a better fit.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thoughts, @gengliangwang @cloud-fan @johanl-db?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My proposal would be:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We concluded that passing all options to load methods for tables and changelog is a requirement for external connectors like Delta and Iceberg.