[SPARK-54172][SQL] Merge Into Schema Evolution should only add referenced columns #52866
base: master
41731d2 to 6c6de51
  notMatchedActions = newNotMatchedActions,
- notMatchedBySourceActions = newNotMatchedBySourceActions)
+ notMatchedBySourceActions = newNotMatchedBySourceActions,
+ originalSourceActions = newMatchedActions ++ newNotMatchedActions)
Note: because the (not)matched actions are rewritten by the ResolveRowLevelCommandAssignments rule, I need to preserve the original user actions/assignments so that MergeIntoTable.referencedSourceSchema and needsSchemaEvolution are idempotent.
let's turn it into a code comment.
shall we keep referencedSourceSchema directly?
Hm, I tried referencedSourceSchema, like:
def apply(
    targetTable: LogicalPlan,
    sourceTable: LogicalPlan,
    mergeCondition: Expression,
    matchedActions: Seq[MergeAction],
    notMatchedActions: Seq[MergeAction],
    notMatchedBySourceActions: Seq[MergeAction],
    withSchemaEvolution: Boolean): MergeIntoTable = {
  MergeIntoTable(
    targetTable,
    sourceTable,
    mergeCondition,
    matchedActions,
    notMatchedActions,
    notMatchedBySourceActions,
    withSchemaEvolution,
    referencedSourceSchema(
      matchedActions ++ notMatchedActions,
      sourceTable.schema))
}
However, it tries to call the schema method a bit too early, before the "stars" are resolved:
[INTERNAL_ERROR] Invalid call to toAttribute on unresolved object SQLSTATE: XX000
org.apache.spark.sql.catalyst.analysis.UnresolvedException: [INTERNAL_ERROR] Invalid call to toAttribute on unresolved object SQLSTATE: XX000
at org.apache.spark.sql.catalyst.analysis.Star.toAttribute(unresolved.scala:460)
at org.apache.spark.sql.catalyst.analysis.Star.toAttribute$(unresolved.scala:460)
at org.apache.spark.sql.catalyst.analysis.UnresolvedStar.toAttribute(unresolved.scala:864)
at org.apache.spark.sql.catalyst.plans.logical.Project.$anonfun$output$1(basicLogicalOperators.scala:75)
at scala.collection.immutable.List.map(List.scala:236)
at scala.collection.immutable.List.map(List.scala:79)
at org.apache.spark.sql.catalyst.plans.logical.Project.output(basicLogicalOperators.scala:75)
at org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias.output(basicLogicalOperators.scala:1739)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$_schema$1(QueryPlan.scala:467)
at org.apache.spark.util.BestEffortLazyVal.apply(BestEffortLazyVal.scala:53)
at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:464)
at org.apache.spark.sql.catalyst.plans.logical.MergeIntoTable$.apply(v2Commands.scala:939)
at org.apache.spark.sql.catalyst.parser.AstBuilder.$anonfun$visitMergeIntoTable$1(AstBuilder.scala:1152)
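The failure above comes from asking the source plan for its schema at parse time. A minimal sketch of the alternative, keeping the raw inputs on the node and deriving the referenced schema only on demand, might look like the following. `ToyPlan` and `ToyMerge` are hypothetical stand-ins invented for illustration, not Spark classes, and this is not the code in the PR.

```scala
// Hypothetical toy model; these are NOT Spark's LogicalPlan or MergeIntoTable.
final case class ToyPlan(resolved: Boolean, columns: Seq[String]) {
  // Mimics the failure mode in the stack trace: an unresolved plan
  // cannot report its schema yet.
  def schema: Seq[String] =
    if (resolved) columns
    else throw new IllegalStateException("schema requested on unresolved plan")
}

// The node stores only the names referenced by the user's actions.
// Constructing it never touches source.schema.
final case class ToyMerge(source: ToyPlan, referencedNames: Seq[String]) {
  // Derived on demand, so it is only evaluated once a caller asks for it,
  // which in this sketch is assumed to happen after resolution.
  def referencedSourceSchema: Seq[String] =
    source.schema.filter(name => referencedNames.contains(name))
}
```

In this sketch, building a `ToyMerge` over an unresolved `ToyPlan` succeeds, and `referencedSourceSchema` can be called safely once the plan has been replaced by a resolved copy.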
@cloud-fan @aokolnychyi can you take a look? I think this is an important improvement to get in before we release the MERGE INTO WITH SCHEMA EVOLUTION feature in Spark 4.1, thanks!
6c6de51 to 24b1a51
case _ => false
}

def filterSchema(sourceSchema: StructType, basePath: Seq[String]): StructType =
@viirya do you know any existing util functions from nested column pruning to do this work?
No, this looks specific to merge actions, right?
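For readers following this thread, here is a rough sketch of what pruning a nested schema down to referenced field paths could look like. It uses toy types (`DType`, `StructT`, `Field`) rather than Spark's `StructType`, takes the referenced paths as an explicit `referenced` parameter (an assumption; the PR's `filterSchema` signature differs), and matches names case-sensitively for simplicity.

```scala
// Toy stand-ins for a nested schema; NOT Spark's StructType/StructField.
sealed trait DType
case object StringT extends DType
case object IntT extends DType
final case class StructT(fields: Seq[Field]) extends DType
final case class Field(name: String, dataType: DType)

object SchemaPruning {
  // Keep only the fields that lie on, or underneath, a referenced path.
  // `referenced` holds full paths such as Seq("info", "salary").
  def filterSchema(
      schema: StructT,
      referenced: Set[Seq[String]],
      basePath: Seq[String] = Nil): StructT = {
    val kept = schema.fields.flatMap { f =>
      val path = basePath :+ f.name
      if (referenced.exists(ref => path.startsWith(ref))) {
        // The field itself (or an ancestor) is referenced: keep it whole.
        Some(f)
      } else if (referenced.exists(ref => ref.startsWith(path))) {
        // Only something deeper is referenced: recurse into structs.
        f.dataType match {
          case s: StructT =>
            Some(f.copy(dataType = filterSchema(s, referenced, path)))
          case _ => None // a leaf has no nested fields to match
        }
      } else {
        None // not referenced at all
      }
    }
    StructT(kept)
  }
}
```

With a source schema of `pk`, `info` (a struct of `salary`, `status`, `bonus`), `dep`, and `extra`, and referenced paths `pk` and `info.salary`, the sketch keeps only `pk` and `info.salary`.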
|USING source s
|ON t.pk = s.pk
|WHEN MATCHED THEN
| UPDATE SET dep='software'
This test is weird: dep is an existing column in the target table, so we certainly do not need schema evolution. What was the behavior before this PR?
Oh, it's because the source table has more columns, but they are not used.
|ON t.pk = s.pk
|WHEN NOT MATCHED THEN
| INSERT (pk, info, dep) VALUES (s.pk,
| named_struct('salary', s.info.salary, 'status', 'active'), 'marketing')
why do we trigger schema evolution for this case?
What changes were proposed in this pull request?
Change MERGE INTO schema evolution scope. Limit the scope of schema evolution to only add columns/nested fields that are referenced in the MERGE INTO query via UPDATE or INSERT statements.
Why are the changes needed?
#51698 added schema evolution support for MERGE INTO statements. However, it is a bit too broad. In some cases, the source table may have many more fields than the target table, but the user may only need a few of the new ones added to the target by the MERGE INTO statement.
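To make the difference in scope concrete, here is a small sketch comparing the two behaviors on flat column names. The object and method names (`EvolutionScope`, `broadScope`, `referencedScope`) are hypothetical, and how the referenced set is derived from the UPDATE/INSERT assignments is not modeled here.

```scala
// Hypothetical illustration on plain column names; not Spark code.
object EvolutionScope {
  // Columns present in the source but missing from the target.
  def missingInTarget(source: Seq[String], target: Seq[String]): Seq[String] =
    source.filterNot(c => target.contains(c))

  // Broad scope: every missing source column would be added.
  def broadScope(source: Seq[String], target: Seq[String]): Seq[String] =
    missingInTarget(source, target)

  // Narrow scope: only missing columns that the merge actions reference.
  def referencedScope(
      source: Seq[String],
      target: Seq[String],
      referenced: Set[String]): Seq[String] =
    missingInTarget(source, target).filter(c => referenced.contains(c))
}
```

For a source with columns `pk, salary, dep, bonus, region` and a target with `pk, salary, dep`, the broad scope would add both `bonus` and `region`, while the narrow scope adds only those named in the referenced set, and nothing at all if the actions reference no new source column.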
Does this PR introduce any user-facing change?
No, MERGE INTO schema evolution is not yet released in Spark 4.1.
How was this patch tested?
Added many unit tests in MergeIntoTableSuiteBase
Was this patch authored or co-authored using generative AI tooling?
No