From ba2accb2bf654312e0a6adb6f42ffdb1df70ce1e Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Tue, 15 Apr 2025 08:55:57 -0700 Subject: [PATCH 1/9] Alter table alter column to pass V2Expression to DSV2 --- .../sql/connector/catalog/TableChange.java | 39 +++++- .../sql/catalyst/parser/AstBuilder.scala | 6 +- .../plans/logical/ColumnDefinition.scala | 24 +++- .../plans/logical/v2AlterTableCommands.scala | 39 +++--- .../sql/catalyst/parser/DDLParserSuite.scala | 6 +- .../analysis/ResolveSessionCatalog.scala | 3 +- .../DataSourceV2DataFrameSuite.scala | 126 +++++++++++++++++- .../command/PlanResolutionSuite.scala | 4 +- 8 files changed, 212 insertions(+), 35 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java index a53962f8f3008..abd6e7d981149 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java @@ -22,9 +22,12 @@ import javax.annotation.Nullable; import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.catalyst.plans.logical.DefaultValueExpression; import org.apache.spark.sql.connector.catalog.constraints.Constraint; +import org.apache.spark.sql.connector.expressions.Expression; import org.apache.spark.sql.connector.expressions.NamedReference; import org.apache.spark.sql.types.DataType; +import scala.Option; /** * TableChange subclasses represent requested changes to a table. These are passed to @@ -230,10 +233,29 @@ static TableChange updateColumnPosition(String[] fieldNames, ColumnPosition newP * If the field does not exist, the change will result in an {@link IllegalArgumentException}. * * @param fieldNames field names of the column to update - * @param newDefaultValue the new default value SQL string (Spark SQL dialect). + * @param newDefaultValue the new default value, or null if it is to be removed * @return a TableChange for the update + * + * @deprecated This is deprecated. Please use {@link #updateColumnDefaultValue(String[], DefaultValue)} + * instead. */ + @Deprecated(since = "4.1.0") static TableChange updateColumnDefaultValue(String[] fieldNames, String newDefaultValue) { + return new UpdateColumnDefaultValue(fieldNames, new ColumnDefaultValue(newDefaultValue, null)); + } + + /** + * Create a TableChange for updating the default value of a field. + *
+   * The name is used to find the field to update.
+   *
+ * If the field does not exist, the change will result in an {@link IllegalArgumentException}. + * + * @param fieldNames field names of the column to update + * @param newDefaultValue the new default value SQL string (Spark SQL dialect). + * @return a TableChange for the update + */ + static TableChange updateColumnDefaultValue(String[] fieldNames, DefaultValue newDefaultValue) { return new UpdateColumnDefaultValue(fieldNames, newDefaultValue); } @@ -709,9 +731,9 @@ public int hashCode() { */ final class UpdateColumnDefaultValue implements ColumnChange { private final String[] fieldNames; - private final String newDefaultValue; + private final DefaultValue newDefaultValue; - private UpdateColumnDefaultValue(String[] fieldNames, String newDefaultValue) { + private UpdateColumnDefaultValue(String[] fieldNames, DefaultValue newDefaultValue) { this.fieldNames = fieldNames; this.newDefaultValue = newDefaultValue; } @@ -722,11 +744,18 @@ public String[] fieldNames() { } /** - * Returns the column default value SQL string (Spark SQL dialect). The default value literal + * Returns the column default value SQL string. The default value literal * is not provided as updating column default values does not need to back-fill existing data. * Empty string means dropping the column default value. */ - public String newDefaultValue() { return newDefaultValue; } + public String newDefaultValue() { + return newDefaultValue == null ? "" : newDefaultValue.getSql(); + } + + /** + * Returns the column default value as {@link DefaultValue}. + */ + public DefaultValue newModelDefaultValue() { return newDefaultValue; } @Override public boolean equals(Object o) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index d6be188d8a0f1..caa059d109089 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -5243,11 +5243,11 @@ class AstBuilder extends DataTypeAstBuilder } else { None } - val setDefaultExpression: Option[String] = + val setDefaultExpression: Option[DefaultValueExpression] = if (action.defaultExpression != null) { - Option(action.defaultExpression()).map(visitDefaultExpression).map(_.originalSQL) + Option(action.defaultExpression()).map(visitDefaultExpression) } else if (action.dropDefault != null) { - Some("") + Some(DefaultValueExpression(Literal(""), "")) } else { None } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala index 8b33163a8d30c..040b6b2911067 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala @@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{ANALYSIS_AWARE_EXPRESSIO import org.apache.spark.sql.catalyst.util.{GeneratedColumn, IdentityColumn, V2ExpressionBuilder} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.validateDefaultValueExpr import org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils.{CURRENT_DEFAULT_COLUMN_METADATA_KEY, EXISTS_DEFAULT_COLUMN_METADATA_KEY} -import org.apache.spark.sql.connector.catalog.{Column => V2Column, ColumnDefaultValue, IdentityColumnSpec} +import 
org.apache.spark.sql.connector.catalog.{Column => V2Column, ColumnDefaultValue, DefaultValue, IdentityColumnSpec} +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper import org.apache.spark.sql.connector.expressions.LiteralValue import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.connector.ColumnImpl @@ -184,6 +185,18 @@ object ColumnDefinition { } } + case cmd: AlterColumns if cmd.specs.exists(_.newDefaultExpression.isDefined) => + // Wrap analysis errors for default values in a more user-friendly message. + cmd.specs.foreach { c => + c.newDefaultExpression.foreach { d => + if (!d.resolved) { + throw QueryCompilationErrors.defaultValuesUnresolvedExprError( + "ALTER TABLE ALTER COLUMN", c.column.name.quoted, d.originalSQL, null) + } + validateDefaultValueExpr(d, "ALTER TABLE", c.column.name.quoted, d.dataType) + } + } + case _ => } } @@ -241,4 +254,13 @@ case class DefaultValueExpression( case _ => throw QueryCompilationErrors.defaultValueNotConstantError(statement, colName, originalSQL) } + + // Convert the default expression to DefaultValue, which is required by DS v2 APIs. + def toV2CurrentDefault(statement: String, colName: String): DefaultValue = child match { + case Literal(_, _) => + val currentDefault = analyzedChild.flatMap(new V2ExpressionBuilder(_).build()) + new DefaultValue(originalSQL, currentDefault.orNull) + case _ => + throw QueryCompilationErrors.defaultValueNotConstantError(statement, colName, originalSQL) + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala index 25a48d60585cc..5ace272bbcb30 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala @@ -17,12 +17,13 @@ package org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition, ResolvedFieldName, UnresolvedException} +import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition, UnresolvedException} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.ClusterBySpec import org.apache.spark.sql.catalyst.expressions.{CheckConstraint, Expression, TableConstraint, Unevaluable} -import org.apache.spark.sql.catalyst.util.{ResolveDefaultColumns, TypeUtils} +import org.apache.spark.sql.catalyst.util.{TypeUtils} import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange} +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.DataType import org.apache.spark.util.ArrayImplicits._ @@ -208,13 +209,25 @@ case class AlterColumnSpec( newNullability: Option[Boolean], newComment: Option[String], newPosition: Option[FieldPosition], - newDefaultExpression: Option[String]) extends Expression with Unevaluable { + newDefaultExpression: Option[DefaultValueExpression]) extends Expression with Unevaluable { - override def children: Seq[Expression] = Seq(column) ++ newPosition.toSeq + override def children: Seq[Expression] = Seq(column) ++ newPosition.toSeq ++ + newDefaultExpression.toSeq override def nullable: Boolean = false override def dataType: DataType = 
throw new UnresolvedException("dataType") - override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = - copy(column = newChildren(0).asInstanceOf[FieldName]) + override protected def withNewChildrenInternal( + newChildren: IndexedSeq[Expression]): Expression = { + val newColumn = newChildren(0).asInstanceOf[FieldName] + val newPosition = newChildren collectFirst { + case p: FieldPosition => p + } + val newDefault = newChildren collectFirst { + case d: DefaultValueExpression => d + } + copy(column = newColumn, newPosition = newPosition, newDefaultExpression = newDefault) + } + + } /** @@ -242,17 +255,11 @@ case class AlterColumns( "FieldPosition should be resolved before it's converted to TableChange.") TableChange.updateColumnPosition(colName, newPosition.position) } - val defaultValueChange = spec.newDefaultExpression.map { newDefaultExpression => - if (newDefaultExpression.nonEmpty) { - // SPARK-45075: We call 'ResolveDefaultColumns.analyze' here to make sure that the default - // value parses successfully, and return an error otherwise - val newDataType = spec.newDataType.getOrElse( - column.asInstanceOf[ResolvedFieldName].field.dataType) - ResolveDefaultColumns.analyze(column.name.last, newDataType, newDefaultExpression, - "ALTER TABLE ALTER COLUMN") - } - TableChange.updateColumnDefaultValue(colName, newDefaultExpression) + val defaultValueChange = spec.newDefaultExpression.map { newDefault => + TableChange.updateColumnDefaultValue(colName, + newDefault.toV2CurrentDefault("ALTER TABLE", column.name.quoted)) } + typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange ++ defaultValueChange } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 9e8e63dae9e19..e493b57b2c27b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -2735,7 +2735,7 @@ class DDLParserSuite extends AnalysisTest { None, None, None, - Some("42"))))) + Some(DefaultValueExpression(Literal(42), "42")))))) // It is possible to pass an empty string default value using quotes. comparePlans( parsePlan("ALTER TABLE t1 ALTER COLUMN a.b.c SET DEFAULT ''"), @@ -2747,7 +2747,7 @@ class DDLParserSuite extends AnalysisTest { None, None, None, - Some("''"))))) + Some(DefaultValueExpression(Literal(""), "''")))))) // It is not possible to pass an empty string default value without using quotes. // This results in a parsing error. val sql1 = "ALTER TABLE t1 ALTER COLUMN a.b.c SET DEFAULT " @@ -2773,7 +2773,7 @@ class DDLParserSuite extends AnalysisTest { None, None, None, - Some(""))))) + Some(DefaultValueExpression(Literal(""), "")))))) // Make sure that the parser returns an exception when the feature is disabled. 
withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") { val sql = "CREATE TABLE my_tab(a INT, b STRING NOT NULL DEFAULT \"abc\") USING parquet" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 64cc9da1f1890..27368aa5ad1e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -105,7 +105,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) } } // Add the current default column value string (if any) to the column metadata. - s.newDefaultExpression.map { c => builder.putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, c) } + s.newDefaultExpression.map { c => builder.putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, + c.originalSQL) } val newColumn = StructField( colName, dataType, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 5574814db625d..28c35b5b7ce5d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -23,8 +23,8 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode} import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect} -import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, Identifier, InMemoryTableCatalog} -import org.apache.spark.sql.connector.catalog.TableChange.AddColumn +import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, DefaultValue, Identifier, InMemoryTableCatalog} +import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue} import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, GeneralScalarExpression, LiteralValue, Transform} import org.apache.spark.sql.execution.{QueryExecution, SparkPlan} import org.apache.spark.sql.execution.ExplainUtils.stripAQEPlan @@ -441,7 +441,7 @@ class DataSourceV2DataFrameSuite } } - test("alter table with complex foldable default values") { + test("alter table add column with complex foldable default values") { val tableName = "testcat.ns1.ns2.tbl" withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { withTable(tableName) { @@ -484,6 +484,83 @@ class DataSourceV2DataFrameSuite } } + test("alter table alter column with complex foldable default values") { + val tableName = "testcat.ns1.ns2.tbl" + withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { + withTable(tableName) { + sql( + s""" + |CREATE TABLE $tableName ( + | salary INT DEFAULT (100 + 23), + | dep STRING DEFAULT ('h' || 'r'), + | active BOOLEAN DEFAULT CAST(1 AS BOOLEAN) + |) USING foo + |""".stripMargin) + + val alterExecCol1 = executeAndKeepPhysicalPlan[AlterTableExec] { + sql(s"ALTER TABLE $tableName ALTER COLUMN salary SET DEFAULT (123 + 56)") + } + checkDefaultValue( + alterExecCol1.changes.collect { + case u: UpdateColumnDefaultValue => u + }.head, + new DefaultValue( + "(123 + 56)", + new GeneralScalarExpression( + "+", + Array(LiteralValue(123, IntegerType), LiteralValue(56, 
IntegerType))))) + + val alterExecCol2 = executeAndKeepPhysicalPlan[AlterTableExec] { + sql(s"ALTER TABLE $tableName ALTER COLUMN salary SET DEFAULT ('r' || 'l')") + } + checkDefaultValue( + alterExecCol2.changes.collect { + case u: UpdateColumnDefaultValue => u + }.head, + new DefaultValue( + "('r' || 'l')", + new GeneralScalarExpression( + "CONCAT", + Array( + LiteralValue(UTF8String.fromString("r"), StringType), + LiteralValue(UTF8String.fromString("l"), StringType))))) + + val alterExecCol3 = executeAndKeepPhysicalPlan[AlterTableExec] { + sql(s"ALTER TABLE $tableName ALTER COLUMN salary SET DEFAULT CAST(0 AS BOOLEAN)") + } + checkDefaultValue( + alterExecCol3.changes.collect { + case u: UpdateColumnDefaultValue => u + }.head, + new DefaultValue( + "CAST(0 AS BOOLEAN)", + new V2Cast(LiteralValue(0, IntegerType), IntegerType, BooleanType))) + } + } + } + + test("alter table alter column drop default") { + val tableName = "testcat.ns1.ns2.tbl" + withTable(tableName) { + sql( + s""" + |CREATE TABLE $tableName ( + | salary INT DEFAULT (100 + 23) + |) USING foo + |""".stripMargin) + + val alterExecCol = executeAndKeepPhysicalPlan[AlterTableExec] { + sql(s"ALTER TABLE $tableName ALTER COLUMN salary DROP DEFAULT") + } + checkDefaultValue( + alterExecCol.changes.collect { + case u: UpdateColumnDefaultValue => u + }.head, + new DefaultValue( + "", LiteralValue(UTF8String.fromString(""), StringType))) + } + } + test("create/replace table with current like default values") { val tableName = "testcat.ns1.ns2.tbl" withTable(tableName) { @@ -529,7 +606,7 @@ class DataSourceV2DataFrameSuite } } - test("alter table with current like default values") { + test("alter table add columns with current like default values") { val tableName = "testcat.ns1.ns2.tbl" withTable(tableName) { sql( @@ -560,6 +637,38 @@ class DataSourceV2DataFrameSuite } } + test("alter table alter column with current like default values") { + val tableName = "testcat.ns1.ns2.tbl" + withTable(tableName) { + sql( + s""" + |CREATE TABLE $tableName ( + | dummy INT, + | cat STRING + |) USING foo + |""".stripMargin) + + val alterExec = executeAndKeepPhysicalPlan[AlterTableExec] { + sql(s"ALTER TABLE $tableName ALTER COLUMN cat SET DEFAULT current_catalog()") + } + + checkDefaultValue( + alterExec.changes.collect { + case u: UpdateColumnDefaultValue => u + }.head, + new DefaultValue( + "current_catalog()", + null /* No V2 Expression */)) + + val df1 = Seq(1).toDF("dummy") + df1.writeTo(tableName).append() + + checkAnswer( + sql(s"SELECT * FROM $tableName"), + Seq(Row(1, "spark_catalog"))) + } + } + private def executeAndKeepPhysicalPlan[T <: SparkPlan](func: => Unit): T = { val qe = withQueryExecutionsCaptured(spark) { func @@ -594,4 +703,13 @@ class DataSourceV2DataFrameSuite s"expected $expectedDefault but found ${column.defaultValue}") } } + + private def checkDefaultValue( + column: UpdateColumnDefaultValue, + expectedDefault: DefaultValue): Unit = { + assert( + column.newModelDefaultValue() == expectedDefault, + s"Default value mismatch for column '${column.toString}': " + + s"expected $expectedDefault but found ${column.newDefaultValue()}") + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index b4b3bff86471e..af948d9e99061 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, Literal, StringLiteral} import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.catalyst.plans.logical.{AlterColumns, AlterColumnSpec, AnalysisOnlyCommand, AppendData, Assignment, CreateTable, CreateTableAsSelect, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, InsertIntoStatement, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, OverwriteByExpression, OverwritePartitionsDynamic, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable} +import org.apache.spark.sql.catalyst.plans.logical.{AlterColumns, AlterColumnSpec, AnalysisOnlyCommand, AppendData, Assignment, CreateTable, CreateTableAsSelect, DefaultValueExpression, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, InsertIntoStatement, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, OverwriteByExpression, OverwritePartitionsDynamic, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId import org.apache.spark.sql.connector.FakeV2Provider @@ -1428,7 +1428,7 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest { None, None, None, - Some("'value'")))) => + Some(DefaultValueExpression(_, _, _))))) => assert(column1.name == Seq("i")) assert(column2.name == Seq("s")) case _ => fail("expect AlterColumns") From 3539dd0579ef4afc2386b30dd108b24b40743df3 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Mon, 12 May 2025 22:06:24 -0700 Subject: [PATCH 2/9] Lint --- .../sql/connector/catalog/TableChange.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java index abd6e7d981149..795a310865b48 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java @@ -22,12 +22,9 @@ import javax.annotation.Nullable; import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.catalyst.plans.logical.DefaultValueExpression; import org.apache.spark.sql.connector.catalog.constraints.Constraint; -import org.apache.spark.sql.connector.expressions.Expression; import org.apache.spark.sql.connector.expressions.NamedReference; import org.apache.spark.sql.types.DataType; -import scala.Option; /** * TableChange subclasses represent requested changes to a table. These are passed to @@ -233,11 +230,11 @@ static TableChange updateColumnPosition(String[] fieldNames, ColumnPosition newP * If the field does not exist, the change will result in an {@link IllegalArgumentException}. 
* * @param fieldNames field names of the column to update - * @param newDefaultValue the new default value, or null if it is to be removed + * @param newDefaultValue the new default value SQL string (Spark SQL dialect). * @return a TableChange for the update * - * @deprecated This is deprecated. Please use {@link #updateColumnDefaultValue(String[], DefaultValue)} - * instead. + * @deprecated This is deprecated. Please use {@link #updateColumnDefaultValue( + * String[],DefaultValue)} instead. */ @Deprecated(since = "4.1.0") static TableChange updateColumnDefaultValue(String[] fieldNames, String newDefaultValue) { @@ -252,7 +249,8 @@ static TableChange updateColumnDefaultValue(String[] fieldNames, String newDefau * If the field does not exist, the change will result in an {@link IllegalArgumentException}. * * @param fieldNames field names of the column to update - * @param newDefaultValue the new default value SQL string (Spark SQL dialect). + * @param newDefaultValue the new default value SQL (Spark SQL dialect and + * V2 expression representation if it can be converted). * @return a TableChange for the update */ static TableChange updateColumnDefaultValue(String[] fieldNames, DefaultValue newDefaultValue) { @@ -744,7 +742,7 @@ public String[] fieldNames() { } /** - * Returns the column default value SQL string. The default value literal + * Returns the column default value SQL string (Spark SQL dialect). The default value literal * is not provided as updating column default values does not need to back-fill existing data. * Empty string means dropping the column default value. */ @@ -753,7 +751,9 @@ public String newDefaultValue() { } /** - * Returns the column default value as {@link DefaultValue}. + * Returns the column default value as {@link DefaultValue}. The default value literal + * is not provided as updating column default values does not need to back-fill existing data. + * Empty string means dropping the column default value. 
*/ public DefaultValue newModelDefaultValue() { return newDefaultValue; } From dbd29ee9b5c0c3d5776d6e2514db70f66c83b1f3 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Tue, 13 May 2025 13:36:34 -0700 Subject: [PATCH 3/9] review comments --- .../plans/logical/v2AlterTableCommands.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala index 5ace272bbcb30..b29b2e5bc48b2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala @@ -218,13 +218,17 @@ case class AlterColumnSpec( override protected def withNewChildrenInternal( newChildren: IndexedSeq[Expression]): Expression = { val newColumn = newChildren(0).asInstanceOf[FieldName] - val newPosition = newChildren collectFirst { - case p: FieldPosition => p + val newPos = if (newPosition.isDefined) { + Some(newChildren(1).asInstanceOf[FieldPosition]) + } else { + None } - val newDefault = newChildren collectFirst { - case d: DefaultValueExpression => d + val newDefault = if (newDefaultExpression.isDefined) { + Some(newChildren.last.asInstanceOf[DefaultValueExpression]) + } else { + None } - copy(column = newColumn, newPosition = newPosition, newDefaultExpression = newDefault) + copy(column = newColumn, newPosition = newPos, newDefaultExpression = newDefault) } From f53ecec018e117531b5a23d730dd1338839b0d7d Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Wed, 14 May 2025 11:55:05 -0700 Subject: [PATCH 4/9] Review comments --- .../sql/connector/catalog/TableChange.java | 2 +- .../sql/catalyst/parser/AstBuilder.scala | 2 +- .../plans/logical/ColumnDefinition.scala | 7 +++++- .../sql/catalyst/parser/DDLParserSuite.scala | 2 +- .../DataSourceV2DataFrameSuite.scala | 4 ++-- .../sql/connector/DataSourceV2SQLSuite.scala | 23 +++++++++++++++++++ 6 files changed, 34 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java index 795a310865b48..1f3f4c7654aba 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java @@ -753,7 +753,7 @@ public String newDefaultValue() { /** * Returns the column default value as {@link DefaultValue}. The default value literal * is not provided as updating column default values does not need to back-fill existing data. - * Empty string means dropping the column default value. + * Empty string and Null literal means dropping the column default value. 
*/ public DefaultValue newModelDefaultValue() { return newDefaultValue; } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index caa059d109089..247dc6d8210a2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -5247,7 +5247,7 @@ class AstBuilder extends DataTypeAstBuilder if (action.defaultExpression != null) { Option(action.defaultExpression()).map(visitDefaultExpression) } else if (action.dropDefault != null) { - Some(DefaultValueExpression(Literal(""), "")) + Some(DefaultValueExpression(Literal(null), "")) } else { None } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala index 040b6b2911067..b4a58bf2c85ea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala @@ -193,7 +193,12 @@ object ColumnDefinition { throw QueryCompilationErrors.defaultValuesUnresolvedExprError( "ALTER TABLE ALTER COLUMN", c.column.name.quoted, d.originalSQL, null) } - validateDefaultValueExpr(d, "ALTER TABLE", c.column.name.quoted, d.dataType) + validateDefaultValueExpr(d, "ALTER TABLE ALTER COLUMN", + c.column.name.quoted, d.dataType) + if (!d.deterministic) { + throw QueryCompilationErrors.defaultValueNonDeterministicError( + "ALTER TABLE ALTER COLUMN", c.column.name.quoted, d.originalSQL) + } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index e493b57b2c27b..5c47739f41074 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -2773,7 +2773,7 @@ class DDLParserSuite extends AnalysisTest { None, None, None, - Some(DefaultValueExpression(Literal(""), "")))))) + Some(DefaultValueExpression(Literal(null), "")))))) // Make sure that the parser returns an exception when the feature is disabled. 
withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") { val sql = "CREATE TABLE my_tab(a INT, b STRING NOT NULL DEFAULT \"abc\") USING parquet" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 28c35b5b7ce5d..0873ebec5438f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.{QueryExecution, SparkPlan} import org.apache.spark.sql.execution.ExplainUtils.stripAQEPlan import org.apache.spark.sql.execution.datasources.v2.{AlterTableExec, CreateTableExec, DataSourceV2Relation, ReplaceTableExec} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{BooleanType, CalendarIntervalType, IntegerType, StringType} +import org.apache.spark.sql.types.{BooleanType, CalendarIntervalType, IntegerType, NullType, StringType} import org.apache.spark.sql.util.QueryExecutionListener import org.apache.spark.unsafe.types.UTF8String @@ -557,7 +557,7 @@ class DataSourceV2DataFrameSuite case u: UpdateColumnDefaultValue => u }.head, new DefaultValue( - "", LiteralValue(UTF8String.fromString(""), StringType))) + "", LiteralValue(null, NullType))) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 179aa0e0940fa..3e51e4b373716 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -3606,6 +3606,29 @@ class DataSourceV2SQLSuiteV1Filter } } + test("SPARK-52116: alter column with default value which is not deterministic") { + val foldableExpressions = Seq("1", "2 + 1") + withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> v2Source) { + withTable("tab") { + spark.sql(s"CREATE TABLE tab (col1 DOUBLE DEFAULT 0) USING $v2Source") + val exception = analysisException( + // Rand function is not deterministic + s"ALTER TABLE tab ALTER COLUMN col1 SET DEFAULT rand()") + assert(exception.getSqlState == "42623") + assert(exception.errorClass.get == "INVALID_DEFAULT_VALUE.NON_DETERMINISTIC") + assert(exception.messageParameters("statement") == "ALTER TABLE ALTER COLUMN") + assert(exception.messageParameters("colName") == "`col1`") + assert(exception.messageParameters("defaultValue") == "rand()") + } + foldableExpressions.foreach(expr => { + withTable("tab") { + spark.sql(s"CREATE TABLE tab (col1 INT DEFAULT 100) USING $v2Source") + spark.sql(s"ALTER TABLE tab ADD COLUMN col2 DOUBLE DEFAULT $expr") + } + }) + } + } + test("SPARK-49099: Switch current schema with custom spark_catalog") { // Reset CatalogManager to clear the materialized `spark_catalog` instance, so that we can // configure a new implementation. 
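
Note on the end-to-end behavior at this point in the series, sketched against the
suites above (assumes a SparkSession `spark` with a DSv2 catalog registered as
"testcat", e.g. InMemoryTableCatalog, and the test provider "foo"):

  // Sketch only; table/catalog names mirror the test suites above.
  spark.sql(
    "CREATE TABLE testcat.ns1.ns2.tbl (salary INT DEFAULT (100 + 23), cat STRING) USING foo")

  // Analysis resolves the default eagerly, and the connector receives a
  // TableChange.UpdateColumnDefaultValue carrying a DefaultValue with both the
  // original SQL "(123 + 56)" and its V2 expression (GeneralScalarExpression "+").
  spark.sql("ALTER TABLE testcat.ns1.ns2.tbl ALTER COLUMN salary SET DEFAULT (123 + 56)")

  // Defaults with no V2 translation (e.g. current_catalog()) still carry the
  // SQL text; the expression side of the DefaultValue is simply null.
  spark.sql("ALTER TABLE testcat.ns1.ns2.tbl ALTER COLUMN cat SET DEFAULT current_catalog()")

  // Non-deterministic defaults now fail analysis (SPARK-52116) with
  // INVALID_DEFAULT_VALUE.NON_DETERMINISTIC, SQLSTATE 42623:
  //   spark.sql("ALTER TABLE testcat.ns1.ns2.tbl ALTER COLUMN salary SET DEFAULT rand()")
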
From 0877f023e659fcbf7d957109d20a8250a4d2bfcd Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Wed, 14 May 2025 18:04:46 -0700 Subject: [PATCH 5/9] Rename new method and regenerate equals/hashcode --- .../sql/connector/catalog/TableChange.java | 19 ++++++++----------- .../DataSourceV2DataFrameSuite.scala | 2 +- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java index 1f3f4c7654aba..9f5ea4078ae28 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java @@ -729,11 +729,11 @@ public int hashCode() { */ final class UpdateColumnDefaultValue implements ColumnChange { private final String[] fieldNames; - private final DefaultValue newDefaultValue; + private final DefaultValue newCurrentDefault; - private UpdateColumnDefaultValue(String[] fieldNames, DefaultValue newDefaultValue) { + private UpdateColumnDefaultValue(String[] fieldNames, DefaultValue newCurrentDefault) { this.fieldNames = fieldNames; - this.newDefaultValue = newDefaultValue; + this.newCurrentDefault = newCurrentDefault; } @Override @@ -747,7 +747,7 @@ public String[] fieldNames() { * Empty string means dropping the column default value. */ public String newDefaultValue() { - return newDefaultValue == null ? "" : newDefaultValue.getSql(); + return newCurrentDefault == null ? "" : newCurrentDefault.getSql(); } /** @@ -755,22 +755,19 @@ public String newDefaultValue() { * is not provided as updating column default values does not need to back-fill existing data. * Empty string and Null literal means dropping the column default value. 
*/ - public DefaultValue newModelDefaultValue() { return newDefaultValue; } + public DefaultValue newCurrentDefault() { return newCurrentDefault; } @Override public boolean equals(Object o) { - if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; UpdateColumnDefaultValue that = (UpdateColumnDefaultValue) o; - return Arrays.equals(fieldNames, that.fieldNames) && - newDefaultValue.equals(that.newDefaultValue()); + return Objects.deepEquals(fieldNames, that.fieldNames) && + Objects.equals(newCurrentDefault, that.newCurrentDefault); } @Override public int hashCode() { - int result = Objects.hash(newDefaultValue); - result = 31 * result + Arrays.hashCode(fieldNames); - return result; + return Objects.hash(Arrays.hashCode(fieldNames), newCurrentDefault); } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 0873ebec5438f..677f6368d7cff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -708,7 +708,7 @@ class DataSourceV2DataFrameSuite column: UpdateColumnDefaultValue, expectedDefault: DefaultValue): Unit = { assert( - column.newModelDefaultValue() == expectedDefault, + column.newCurrentDefault() == expectedDefault, s"Default value mismatch for column '${column.toString}': " + s"expected $expectedDefault but found ${column.newDefaultValue()}") } From e8ea5ad27ed2e6c7218ca662b7fe6796c9cadcc2 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Thu, 15 May 2025 11:43:45 -0700 Subject: [PATCH 6/9] Properly model drop default with a boolean instead of using "" --- .../sql/connector/catalog/TableChange.java | 5 ++- .../sql/catalyst/analysis/Analyzer.scala | 3 +- .../sql/catalyst/analysis/CheckAnalysis.scala | 2 +- .../sql/catalyst/parser/AstBuilder.scala | 16 +++---- .../plans/logical/v2AlterTableCommands.scala | 25 +++++++++-- .../sql/catalyst/parser/DDLParserSuite.scala | 3 +- .../DataSourceV2DataFrameSuite.scala | 45 ++++++++++++++++--- .../command/PlanResolutionSuite.scala | 18 +++++--- 8 files changed, 86 insertions(+), 31 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java index 9f5ea4078ae28..4d9756e12cd3c 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java @@ -751,9 +751,10 @@ public String newDefaultValue() { } /** - * Returns the column default value as {@link DefaultValue}. The default value literal + * Returns the column default value as {@link DefaultValue} with a + * {@link org.apache.spark.sql.connector.expressions.Expression}. The default value literal * is not provided as updating column default values does not need to back-fill existing data. - * Empty string and Null literal means dropping the column default value. + * Null means dropping the column default value. 
*/ public DefaultValue newCurrentDefault() { return newCurrentDefault; } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 0a5e46d98728f..34dd8af74e40a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -3927,7 +3927,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor case a @ AlterColumns(table: ResolvedTable, specs) => val resolvedSpecs = specs.map { - case s @ AlterColumnSpec(ResolvedFieldName(path, field), dataType, _, _, position, _) => + case s @ AlterColumnSpec( + ResolvedFieldName(path, field), dataType, _, _, position, _, _) => val newDataType = dataType.flatMap { dt => // Hive style syntax provides the column type, even if it may not have changed. val existing = CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 431be1dbe8e86..f34cf8ce53498 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -1063,7 +1063,7 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString } } specs.foreach { - case AlterColumnSpec(col: ResolvedFieldName, dataType, nullable, _, _, _) => + case AlterColumnSpec(col: ResolvedFieldName, dataType, nullable, _, _, _, _) => val fieldName = col.name.quoted if (dataType.isDefined) { val field = CharVarcharUtils.getRawType(col.field.metadata) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 247dc6d8210a2..4efb22e474f23 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -5244,19 +5244,16 @@ class AstBuilder extends DataTypeAstBuilder None } val setDefaultExpression: Option[DefaultValueExpression] = - if (action.defaultExpression != null) { - Option(action.defaultExpression()).map(visitDefaultExpression) - } else if (action.dropDefault != null) { - Some(DefaultValueExpression(Literal(null), "")) - } else { - None - } + Option(action.defaultExpression()).map(visitDefaultExpression) + if (setDefaultExpression.isDefined && !conf.getConf(SQLConf.ENABLE_DEFAULT_COLUMNS)) { throw QueryParsingErrors.defaultColumnNotEnabledError(ctx) } + val dropDefault = action.dropDefault != null + assert(Seq(dataType, nullable, comment, position, setDefaultExpression) - .count(_.nonEmpty) == 1) + .count(_.nonEmpty) == 1 || dropDefault) AlterColumnSpec( UnresolvedFieldName(typedVisit[Seq[String]](spec.column)), @@ -5264,7 +5261,8 @@ class AstBuilder extends DataTypeAstBuilder nullable, comment, position, - setDefaultExpression) + setDefaultExpression, + dropDefault) } AlterColumns( createUnresolvedTable(ctx.table, s"ALTER TABLE ... 
$verb COLUMN"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala index b29b2e5bc48b2..d1eb561f3add1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala @@ -21,8 +21,8 @@ import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition, Unresol import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.ClusterBySpec import org.apache.spark.sql.catalyst.expressions.{CheckConstraint, Expression, TableConstraint, Unevaluable} -import org.apache.spark.sql.catalyst.util.{TypeUtils} -import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange} +import org.apache.spark.sql.catalyst.util.TypeUtils +import org.apache.spark.sql.connector.catalog.{DefaultValue, TableCatalog, TableChange} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.DataType @@ -203,13 +203,24 @@ case class RenameColumn( copy(table = newChild) } +/** + * The spec of the ALTER TABLE ... ALTER COLUMN command. + * @param column column to alter + * @param newDataType new data type of column if set + * @param newNullability new nullability of column if set + * @param newComment new comment of column if set + * @param newPosition new position of column if set + * @param newDefaultExpression new default expression if set + * @param dropDefault whether to drop the default expression + */ case class AlterColumnSpec( column: FieldName, newDataType: Option[DataType], newNullability: Option[Boolean], newComment: Option[String], newPosition: Option[FieldPosition], - newDefaultExpression: Option[DefaultValueExpression]) extends Expression with Unevaluable { + newDefaultExpression: Option[DefaultValueExpression], + dropDefault: Boolean = false) extends Expression with Unevaluable { override def children: Seq[Expression] = Seq(column) ++ newPosition.toSeq ++ newDefaultExpression.toSeq @@ -263,8 +274,14 @@ case class AlterColumns( TableChange.updateColumnDefaultValue(colName, newDefault.toV2CurrentDefault("ALTER TABLE", column.name.quoted)) } + val dropDefaultValue = if (spec.dropDefault) { + Some(TableChange.updateColumnDefaultValue(colName, null: DefaultValue)) + } else { + None + } - typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange ++ defaultValueChange + typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange ++ + defaultValueChange ++ dropDefaultValue } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 5c47739f41074..66eedde8e80dd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -2773,7 +2773,8 @@ class DDLParserSuite extends AnalysisTest { None, None, None, - Some(DefaultValueExpression(Literal(null), "")))))) + None, + dropDefault = true)))) // Make sure that the parser returns an exception when the feature is disabled. 
withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") { val sql = "CREATE TABLE my_tab(a INT, b STRING NOT NULL DEFAULT \"abc\") USING parquet" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 677f6368d7cff..e68642c86c933 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.{QueryExecution, SparkPlan} import org.apache.spark.sql.execution.ExplainUtils.stripAQEPlan import org.apache.spark.sql.execution.datasources.v2.{AlterTableExec, CreateTableExec, DataSourceV2Relation, ReplaceTableExec} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{BooleanType, CalendarIntervalType, IntegerType, NullType, StringType} +import org.apache.spark.sql.types.{BooleanType, CalendarIntervalType, IntegerType, StringType} import org.apache.spark.sql.util.QueryExecutionListener import org.apache.spark.unsafe.types.UTF8String @@ -552,12 +552,26 @@ class DataSourceV2DataFrameSuite val alterExecCol = executeAndKeepPhysicalPlan[AlterTableExec] { sql(s"ALTER TABLE $tableName ALTER COLUMN salary DROP DEFAULT") } - checkDefaultValue( - alterExecCol.changes.collect { + checkDropDefaultValue(alterExecCol.changes.collect { case u: UpdateColumnDefaultValue => u - }.head, - new DefaultValue( - "", LiteralValue(null, NullType))) + }.head) + } + } + + test("alter table alter column should not produce default value if unchanged") { + val tableName = "testcat.ns1.ns2.tbl" + withTable(tableName) { + sql( + s""" + |CREATE TABLE $tableName ( + | salary INT DEFAULT (100 + 23) + |) USING foo + |""".stripMargin) + + val alterExecCol = executeAndKeepPhysicalPlan[AlterTableExec] { + sql(s"ALTER TABLE $tableName ALTER COLUMN salary COMMENT 'new comment'") + } + assert(!alterExecCol.changes.exists(_.isInstanceOf[UpdateColumnDefaultValue])) } } @@ -710,6 +724,23 @@ class DataSourceV2DataFrameSuite assert( column.newCurrentDefault() == expectedDefault, s"Default value mismatch for column '${column.toString}': " + - s"expected $expectedDefault but found ${column.newDefaultValue()}") + s"expected $expectedDefault but found ${column.newCurrentDefault()}") + assert( + column.newDefaultValue() == expectedDefault.getSql, + s"Default value mismatch for column '${column.toString}': " + + s"expected ${expectedDefault.getSql} but found ${column.newDefaultValue()}") + } + + private def checkDropDefaultValue( + column: UpdateColumnDefaultValue): Unit = { + assert( + column.newCurrentDefault() == null, + s"Default value mismatch for column '${column.toString}': " + + s"expected empty but found ${column.newCurrentDefault()}") + + assert( + column.newDefaultValue() == "", + s"Default value mismatch for column '${column.toString}': " + + s"expected empty but found ${column.newDefaultValue()}") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index af948d9e99061..ccf502d79c00d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -1391,7 +1391,8 @@ class PlanResolutionSuite extends 
SharedSparkSession with AnalysisTest { None, None, None, - None))) => + None, + false))) => assert(column.name == Seq("i")) case _ => fail("expect AlterColumns") } @@ -1405,7 +1406,8 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest { None, Some("new comment"), None, - None))) => + None, + false))) => assert(column.name == Seq("i")) case _ => fail("expect AlterColumns") } @@ -1421,14 +1423,16 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest { None, None, None, - None), + None, + false), AlterColumnSpec( column2: ResolvedFieldName, None, None, None, None, - Some(DefaultValueExpression(_, _, _))))) => + Some(DefaultValueExpression(_, _, _)), + false))) => assert(column1.name == Seq("i")) assert(column2.name == Seq("s")) case _ => fail("expect AlterColumns") @@ -1515,7 +1519,8 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest { None, Some(comment), None, - None))) => + None, + false))) => assert(comment == "an index") case _ => fail("expect AlterTableAlterColumn with comment change only") } @@ -1529,7 +1534,8 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest { None, Some(comment), None, - None))) => + None, + false))) => assert(comment == "an index") assert(dataType == LongType) case _ => fail("expect AlterTableAlterColumn with type and comment changes") From e176c62094c8733e2fea769a5d265c9f21a727dd Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Thu, 15 May 2025 17:30:36 -0700 Subject: [PATCH 7/9] Fix v1 --- .../spark/sql/catalyst/analysis/ResolveSessionCatalog.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 27368aa5ad1e5..d7d8fc07b872f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -107,6 +107,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) // Add the current default column value string (if any) to the column metadata. s.newDefaultExpression.map { c => builder.putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, c.originalSQL) } + if (s.dropDefault) { + // for legacy reasons, "" means clearing default value + builder.putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, "") + } val newColumn = StructField( colName, dataType, From ec5c930945dce0615e811d477ca7de3f7247ff04 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Fri, 16 May 2025 10:12:37 -0700 Subject: [PATCH 8/9] Deprecate old method and regenerate equals/hashcode --- .../spark/sql/connector/catalog/TableChange.java | 12 ++++++++---- .../sql/connector/catalog/CatalogV2Util.scala | 14 ++++++-------- .../sql/connector/DataSourceV2DataFrameSuite.scala | 9 --------- 3 files changed, 14 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java index 4d9756e12cd3c..6644feea9f600 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java @@ -745,14 +745,16 @@ public String[] fieldNames() { * Returns the column default value SQL string (Spark SQL dialect). 
The default value literal * is not provided as updating column default values does not need to back-fill existing data. * Empty string means dropping the column default value. + * + * @deprecated Use {@link #newCurrentDefault()} instead. */ + @Deprecated(since = "4.1.0") public String newDefaultValue() { return newCurrentDefault == null ? "" : newCurrentDefault.getSql(); } /** - * Returns the column default value as {@link DefaultValue} with a - * {@link org.apache.spark.sql.connector.expressions.Expression}. The default value literal + * Returns the column default value as {@link DefaultValue}. The default value literal * is not provided as updating column default values does not need to back-fill existing data. * Null means dropping the column default value. */ @@ -762,13 +764,15 @@ public String newDefaultValue() { public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; UpdateColumnDefaultValue that = (UpdateColumnDefaultValue) o; - return Objects.deepEquals(fieldNames, that.fieldNames) && + return Arrays.equals(fieldNames, that.fieldNames) && Objects.equals(newCurrentDefault, that.newCurrentDefault); } @Override public int hashCode() { - return Objects.hash(Arrays.hashCode(fieldNames), newCurrentDefault); + int result = Arrays.hashCode(fieldNames); + result = 31 * result + Objects.hashCode(newCurrentDefault); + return result; } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index d6fa7f58d61cf..ce7fe9c0590df 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -276,16 +276,14 @@ private[sql] object CatalogV2Util { } case update: UpdateColumnDefaultValue => - replace(schema, update.fieldNames.toImmutableArraySeq, field => - // The new DEFAULT value string will be non-empty for any DDL commands that set the - // default value, such as "ALTER TABLE t ALTER COLUMN c SET DEFAULT ..." (this is - // enforced by the parser). On the other hand, commands that drop the default value such - // as "ALTER TABLE t ALTER COLUMN c DROP DEFAULT" will set this string to empty. 
- if (update.newDefaultValue().nonEmpty) { - Some(field.withCurrentDefaultValue(update.newDefaultValue())) + replace(schema, update.fieldNames.toImmutableArraySeq, field => { + val newDefault = update.newCurrentDefault() + if (newDefault != null) { + Some(field.withCurrentDefaultValue(newDefault.getSql)) } else { Some(field.clearCurrentDefaultValue()) - }) + } + }) case delete: DeleteColumn => replace(schema, delete.fieldNames.toImmutableArraySeq, _ => None, delete.ifExists) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index e68642c86c933..2513ac98a9f6d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -725,10 +725,6 @@ class DataSourceV2DataFrameSuite column.newCurrentDefault() == expectedDefault, s"Default value mismatch for column '${column.toString}': " + s"expected $expectedDefault but found ${column.newCurrentDefault()}") - assert( - column.newDefaultValue() == expectedDefault.getSql, - s"Default value mismatch for column '${column.toString}': " + - s"expected ${expectedDefault.getSql} but found ${column.newDefaultValue()}") } private def checkDropDefaultValue( @@ -737,10 +733,5 @@ class DataSourceV2DataFrameSuite column.newCurrentDefault() == null, s"Default value mismatch for column '${column.toString}': " + s"expected empty but found ${column.newCurrentDefault()}") - - assert( - column.newDefaultValue() == "", - s"Default value mismatch for column '${column.toString}': " + - s"expected empty but found ${column.newDefaultValue()}") } } From d5a3b0ce4cd12e95b9db677a47fd5cb5bfae32fc Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Fri, 16 May 2025 10:16:34 -0700 Subject: [PATCH 9/9] Clarify a javadoc comment --- .../org/apache/spark/sql/connector/catalog/TableChange.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java index 6644feea9f600..08754b18d6ebd 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java @@ -233,8 +233,7 @@ static TableChange updateColumnPosition(String[] fieldNames, ColumnPosition newP * @param newDefaultValue the new default value SQL string (Spark SQL dialect). * @return a TableChange for the update * - * @deprecated This is deprecated. Please use {@link #updateColumnDefaultValue( - * String[],DefaultValue)} instead. + * @deprecated Please use {@link #updateColumnDefaultValue(String[], DefaultValue)} instead. */ @Deprecated(since = "4.1.0") static TableChange updateColumnDefaultValue(String[] fieldNames, String newDefaultValue) { @@ -251,6 +250,7 @@ static TableChange updateColumnDefaultValue(String[] fieldNames, String newDefau * @param fieldNames field names of the column to update * @param newDefaultValue the new default value SQL (Spark SQL dialect and * V2 expression representation if it can be converted). + * Null indicates dropping column default value * @return a TableChange for the update */ static TableChange updateColumnDefaultValue(String[] fieldNames, DefaultValue newDefaultValue) {
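
Taken together, the series leaves connectors with a single contract for column
default changes. A minimal sketch against the final API shape (MyTable, setDefault
and clearDefault are hypothetical connector-side names; only the TableChange and
DefaultValue calls come from the patches above):

  import org.apache.spark.sql.connector.catalog.{DefaultValue, TableChange}
  import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnDefaultValue

  // Hypothetical connector table abstraction, for illustration only.
  trait MyTable {
    def setDefault(fieldNames: Array[String], value: DefaultValue): MyTable
    def clearDefault(fieldNames: Array[String]): MyTable
  }

  def applyDefaultChanges(table: MyTable, changes: Array[TableChange]): MyTable =
    changes.foldLeft(table) {
      case (t, u: UpdateColumnDefaultValue) =>
        Option(u.newCurrentDefault()) match {
          // Null models ALTER COLUMN ... DROP DEFAULT; the deprecated
          // newDefaultValue() returns "" in this case.
          case None => t.clearDefault(u.fieldNames())
          // Otherwise the DefaultValue carries the Spark SQL text (getSql)
          // and, when the default could be converted, a V2 expression.
          case Some(d) => t.setDefault(u.fieldNames(), d)
        }
      case (t, _) => t // other TableChange kinds are handled elsewhere
    }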