Skip to content

Conversation

@szehon-ho
Copy link
Member

What changes were proposed in this pull request?

Change MERGE INTO schema evolution scope. Limit the scope of schema evolution to only add columns/nested fields that are referenced in the MERGE INTO query via UPDATE or INSERT statements.

Why are the changes needed?

#51698 added schema evolution support for MERGE INTO statements. However, it is a bit too broad. In some instances, source table may have many more fields than target tables. But user may only need a few new ones to be added to the target for the MERGE INTO statement.

Does this PR introduce any user-facing change?

No, MERGE INTO schema evolution is not yet released in Spark 4.1.

How was this patch tested?

Added many unit tests in MergeIntoTableSuiteBase

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the SQL label Nov 4, 2025
@szehon-ho szehon-ho force-pushed the merge_schema_evolution_limit_cols branch 3 times, most recently from 41731d2 to 6c6de51 Compare November 4, 2025 20:02
notMatchedActions = newNotMatchedActions,
notMatchedBySourceActions = newNotMatchedBySourceActions)
notMatchedBySourceActions = newNotMatchedBySourceActions,
originalSourceActions = newMatchedActions ++ newNotMatchedActions)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: because (not)matchedActions actions get changed by rule: ResolveRowLevelCommandAssignments, i need to preserve the original user actions/assignments so that MergeIntoTable.referencedSourceSchema and needsSchemaEvolution is idempotent

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's turn it into a code comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we keep referencedSourceSchema directly?

Copy link
Member Author

@szehon-ho szehon-ho Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, i tried referencedSchema, like:

  def apply(
      targetTable: LogicalPlan,
      sourceTable: LogicalPlan,
      mergeCondition: Expression,
      matchedActions: Seq[MergeAction],
      notMatchedActions: Seq[MergeAction],
      notMatchedBySourceActions: Seq[MergeAction],
      withSchemaEvolution: Boolean): MergeIntoTable = {
    MergeIntoTable(
      targetTable,
      sourceTable,
      mergeCondition,
      matchedActions,
      notMatchedActions,
      notMatchedBySourceActions,
      withSchemaEvolution,
      referencedSourceSchema(
        matchedActions ++ notMatchedActions,
        sourceTable.schema))

However, it tries to call the schema method a bit too early, before the "stars" are resolved:

[INTERNAL_ERROR] Invalid call to toAttribute on unresolved object SQLSTATE: XX000
org.apache.spark.sql.catalyst.analysis.UnresolvedException: [INTERNAL_ERROR] Invalid call to toAttribute on unresolved object SQLSTATE: XX000
	at org.apache.spark.sql.catalyst.analysis.Star.toAttribute(unresolved.scala:460)
	at org.apache.spark.sql.catalyst.analysis.Star.toAttribute$(unresolved.scala:460)
	at org.apache.spark.sql.catalyst.analysis.UnresolvedStar.toAttribute(unresolved.scala:864)
	at org.apache.spark.sql.catalyst.plans.logical.Project.$anonfun$output$1(basicLogicalOperators.scala:75)
	at scala.collection.immutable.List.map(List.scala:236)
	at scala.collection.immutable.List.map(List.scala:79)
	at org.apache.spark.sql.catalyst.plans.logical.Project.output(basicLogicalOperators.scala:75)
	at org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias.output(basicLogicalOperators.scala:1739)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$_schema$1(QueryPlan.scala:467)
	at org.apache.spark.util.BestEffortLazyVal.apply(BestEffortLazyVal.scala:53)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:464)
	at org.apache.spark.sql.catalyst.plans.logical.MergeIntoTable$.apply(v2Commands.scala:939)
	at org.apache.spark.sql.catalyst.parser.AstBuilder.$anonfun$visitMergeIntoTable$1(AstBuilder.scala:1152)

@szehon-ho
Copy link
Member Author

@cloud-fan @aokolnychyi can you take a look? i think this is an important improvement to get in before we release MERGE INTO WITH SCHEMA EVOLUTION feature in Spark 4.1, thanks!

@szehon-ho szehon-ho force-pushed the merge_schema_evolution_limit_cols branch from 6c6de51 to 24b1a51 Compare November 4, 2025 20:06
case _ => false
}

def filterSchema(sourceSchema: StructType, basePath: Seq[String]): StructType =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya do you know any existing util functions from nested column pruning to do this work?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this looks like particular for merge action, right?

|USING source s
|ON t.pk = s.pk
|WHEN MATCHED THEN
| UPDATE SET dep='software'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is weird, dep is an existing column in the target table, and we for sure do not need to do schema evolution. What was the behavior before this PR?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh its because source table has more colunns but they are not used..

|ON t.pk = s.pk
|WHEN NOT MATCHED THEN
| INSERT (pk, info, dep) VALUES (s.pk,
| named_struct('salary', s.info.salary, 'status', 'active'), 'marketing')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we trigger schema evolution for this case?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants