Skip to content

[SPARK-52095][SQL] Alter table alter column to pass V2Expression to DSV2 #50864

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,28 @@ static TableChange updateColumnPosition(String[] fieldNames, ColumnPosition newP
* @param fieldNames field names of the column to update
* @param newDefaultValue the new default value SQL string (Spark SQL dialect).
* @return a TableChange for the update
*
* @deprecated Please use {@link #updateColumnDefaultValue(String[], DefaultValue)} instead.
*/
@Deprecated(since = "4.1.0")
static TableChange updateColumnDefaultValue(String[] fieldNames, String newDefaultValue) {
  // Legacy contract: an empty string means dropping the column default value. The new API
  // models "drop" as a null DefaultValue, so translate here; otherwise callers of the
  // deprecated overload that pass "" would end up SETTING an empty default instead of
  // clearing it when the change is applied (see CatalogV2Util's null check).
  return new UpdateColumnDefaultValue(fieldNames,
    newDefaultValue.isEmpty() ? null : new ColumnDefaultValue(newDefaultValue, null));
}

/**
 * Create a TableChange for updating the default value of a field.
 * <p>
 * The name is used to find the field to update.
 * <p>
 * If the field does not exist, the change will result in an {@link IllegalArgumentException}.
 *
 * @param fieldNames field names of the column to update
 * @param newDefaultValue the new default value, carrying the SQL string (Spark SQL dialect)
 *                        and, when it can be converted, the equivalent V2 expression
 *                        representation. {@code null} indicates dropping the column
 *                        default value.
 * @return a TableChange for the update
 */
static TableChange updateColumnDefaultValue(String[] fieldNames, DefaultValue newDefaultValue) {
  return new UpdateColumnDefaultValue(fieldNames, newDefaultValue);
}

Expand Down Expand Up @@ -709,11 +729,11 @@ public int hashCode() {
*/
final class UpdateColumnDefaultValue implements ColumnChange {
private final String[] fieldNames;
private final String newDefaultValue;
private final DefaultValue newCurrentDefault;

private UpdateColumnDefaultValue(String[] fieldNames, String newDefaultValue) {
private UpdateColumnDefaultValue(String[] fieldNames, DefaultValue newCurrentDefault) {
this.fieldNames = fieldNames;
this.newDefaultValue = newDefaultValue;
this.newCurrentDefault = newCurrentDefault;
}

@Override
Expand All @@ -725,22 +745,33 @@ public String[] fieldNames() {
* Returns the column default value SQL string (Spark SQL dialect). The default value literal
* is not provided as updating column default values does not need to back-fill existing data.
* Empty string means dropping the column default value.
*
* @deprecated Use {@link #newCurrentDefault()} instead.
*/
@Deprecated(since = "4.1.0")
public String newDefaultValue() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we mark this function as deprecated and encourage users to call newCurrentDefault?

return newCurrentDefault == null ? "" : newCurrentDefault.getSql();
}

/**
 * Returns the column default value as {@link DefaultValue}. The default value literal
 * is not provided as updating column default values does not need to back-fill existing data.
 * Null means dropping the column default value.
 */
public DefaultValue newCurrentDefault() { return newCurrentDefault; }

@Override
public boolean equals(Object o) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you generate equals and hashCode using IDEA?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, whats the recommendation?

Copy link
Contributor Author

@szehon-ho szehon-ho May 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah sorry i used wrong template in Intellij, fixed

if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
UpdateColumnDefaultValue that = (UpdateColumnDefaultValue) o;
return Arrays.equals(fieldNames, that.fieldNames) &&
newDefaultValue.equals(that.newDefaultValue());
Objects.equals(newCurrentDefault, that.newCurrentDefault);
}

@Override
public int hashCode() {
  // Must stay consistent with equals(): content hash of fieldNames combined with the
  // null-safe hash of newCurrentDefault.
  int result = Arrays.hashCode(fieldNames);
  result = 31 * result + Objects.hashCode(newCurrentDefault);
  return result;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3927,7 +3927,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor

case a @ AlterColumns(table: ResolvedTable, specs) =>
val resolvedSpecs = specs.map {
case s @ AlterColumnSpec(ResolvedFieldName(path, field), dataType, _, _, position, _) =>
case s @ AlterColumnSpec(
ResolvedFieldName(path, field), dataType, _, _, position, _, _) =>
val newDataType = dataType.flatMap { dt =>
// Hive style syntax provides the column type, even if it may not have changed.
val existing = CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
}
}
specs.foreach {
case AlterColumnSpec(col: ResolvedFieldName, dataType, nullable, _, _, _) =>
case AlterColumnSpec(col: ResolvedFieldName, dataType, nullable, _, _, _, _) =>
val fieldName = col.name.quoted
if (dataType.isDefined) {
val field = CharVarcharUtils.getRawType(col.field.metadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5243,28 +5243,26 @@ class AstBuilder extends DataTypeAstBuilder
} else {
None
}
val setDefaultExpression: Option[String] =
if (action.defaultExpression != null) {
Option(action.defaultExpression()).map(visitDefaultExpression).map(_.originalSQL)
} else if (action.dropDefault != null) {
Some("")
} else {
None
}
val setDefaultExpression: Option[DefaultValueExpression] =
Option(action.defaultExpression()).map(visitDefaultExpression)

if (setDefaultExpression.isDefined && !conf.getConf(SQLConf.ENABLE_DEFAULT_COLUMNS)) {
throw QueryParsingErrors.defaultColumnNotEnabledError(ctx)
}

val dropDefault = action.dropDefault != null

assert(Seq(dataType, nullable, comment, position, setDefaultExpression)
.count(_.nonEmpty) == 1)
.count(_.nonEmpty) == 1 || dropDefault)

AlterColumnSpec(
UnresolvedFieldName(typedVisit[Seq[String]](spec.column)),
dataType,
nullable,
comment,
position,
setDefaultExpression)
setDefaultExpression,
dropDefault)
}
AlterColumns(
createUnresolvedTable(ctx.table, s"ALTER TABLE ... $verb COLUMN"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{ANALYSIS_AWARE_EXPRESSIO
import org.apache.spark.sql.catalyst.util.{GeneratedColumn, IdentityColumn, V2ExpressionBuilder}
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.validateDefaultValueExpr
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils.{CURRENT_DEFAULT_COLUMN_METADATA_KEY, EXISTS_DEFAULT_COLUMN_METADATA_KEY}
import org.apache.spark.sql.connector.catalog.{Column => V2Column, ColumnDefaultValue, IdentityColumnSpec}
import org.apache.spark.sql.connector.catalog.{Column => V2Column, ColumnDefaultValue, DefaultValue, IdentityColumnSpec}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
import org.apache.spark.sql.connector.expressions.LiteralValue
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.connector.ColumnImpl
Expand Down Expand Up @@ -184,6 +185,23 @@ object ColumnDefinition {
}
}

case cmd: AlterColumns if cmd.specs.exists(_.newDefaultExpression.isDefined) =>
  // Wrap analysis errors for default values in a more user-friendly message.
  cmd.specs.foreach { c =>
    c.newDefaultExpression.foreach { d =>
      // Unresolved default expressions get a dedicated error instead of a generic
      // analysis failure.
      if (!d.resolved) {
        throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
          "ALTER TABLE ALTER COLUMN", c.column.name.quoted, d.originalSQL, null)
      }
      validateDefaultValueExpr(d, "ALTER TABLE ALTER COLUMN",
        c.column.name.quoted, d.dataType)
      // Default values must be deterministic so they evaluate consistently.
      if (!d.deterministic) {
        throw QueryCompilationErrors.defaultValueNonDeterministicError(
          "ALTER TABLE ALTER COLUMN", c.column.name.quoted, d.originalSQL)
      }
    }
  }

case _ =>
}
}
Expand Down Expand Up @@ -241,4 +259,13 @@ case class DefaultValueExpression(
case _ =>
throw QueryCompilationErrors.defaultValueNotConstantError(statement, colName, originalSQL)
}

/**
 * Converts this default expression to a [[DefaultValue]], as required by DS v2 APIs.
 * ALTER TABLE ALTER COLUMN only changes the "current default", not the "exists default",
 * so the narrower `DefaultValue` (without an evaluated literal) is sufficient here.
 * Non-constant expressions are rejected with a user-facing error.
 */
def toV2CurrentDefault(statement: String, colName: String): DefaultValue = child match {
  case Literal(_, _) =>
    // Best-effort conversion to a V2 expression; falls back to SQL-string-only when
    // the builder cannot represent the expression.
    val currentDefault = analyzedChild.flatMap(new V2ExpressionBuilder(_).build())
    new DefaultValue(originalSQL, currentDefault.orNull)
  case _ =>
    throw QueryCompilationErrors.defaultValueNotConstantError(statement, colName, originalSQL)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition, ResolvedFieldName, UnresolvedException}
import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition, UnresolvedException}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
import org.apache.spark.sql.catalyst.expressions.{CheckConstraint, Expression, TableConstraint, Unevaluable}
import org.apache.spark.sql.catalyst.util.{ResolveDefaultColumns, TypeUtils}
import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.connector.catalog.{DefaultValue, TableCatalog, TableChange}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.ArrayImplicits._
Expand Down Expand Up @@ -202,19 +203,46 @@ case class RenameColumn(
copy(table = newChild)
}

/**
 * The spec of the ALTER TABLE ... ALTER COLUMN command.
 * @param column column to alter
 * @param newDataType new data type of column if set
 * @param newNullability new nullability of column if set
 * @param newComment new comment of column if set
 * @param newPosition new position of column if set
 * @param newDefaultExpression new default expression if set
 * @param dropDefault whether to drop the default expression
 */
case class AlterColumnSpec(
    column: FieldName,
    newDataType: Option[DataType],
    newNullability: Option[Boolean],
    newComment: Option[String],
    newPosition: Option[FieldPosition],
    newDefaultExpression: Option[DefaultValueExpression],
    dropDefault: Boolean = false) extends Expression with Unevaluable {

  // Children order matters for withNewChildrenInternal: column first, then the
  // optional position, then the optional default expression.
  override def children: Seq[Expression] = Seq(column) ++ newPosition.toSeq ++
    newDefaultExpression.toSeq
  override def nullable: Boolean = false
  override def dataType: DataType = throw new UnresolvedException("dataType")
  override protected def withNewChildrenInternal(
      newChildren: IndexedSeq[Expression]): Expression = {
    val newColumn = newChildren(0).asInstanceOf[FieldName]
    // Position, when present, is always the second child.
    val newPos = newPosition.map(_ => newChildren(1).asInstanceOf[FieldPosition])
    // The default expression, when present, is always the last child (index 1 when
    // there is no position, index 2 otherwise).
    val newDefault =
      newDefaultExpression.map(_ => newChildren.last.asInstanceOf[DefaultValueExpression])
    copy(column = newColumn, newPosition = newPos, newDefaultExpression = newDefault)
  }
}

/**
Expand Down Expand Up @@ -242,18 +270,18 @@ case class AlterColumns(
"FieldPosition should be resolved before it's converted to TableChange.")
TableChange.updateColumnPosition(colName, newPosition.position)
}
val defaultValueChange = spec.newDefaultExpression.map { newDefaultExpression =>
if (newDefaultExpression.nonEmpty) {
// SPARK-45075: We call 'ResolveDefaultColumns.analyze' here to make sure that the default
// value parses successfully, and return an error otherwise
val newDataType = spec.newDataType.getOrElse(
column.asInstanceOf[ResolvedFieldName].field.dataType)
ResolveDefaultColumns.analyze(column.name.last, newDataType, newDefaultExpression,
"ALTER TABLE ALTER COLUMN")
}
TableChange.updateColumnDefaultValue(colName, newDefaultExpression)
val defaultValueChange = spec.newDefaultExpression.map { newDefault =>
TableChange.updateColumnDefaultValue(colName,
newDefault.toV2CurrentDefault("ALTER TABLE", column.name.quoted))
}
typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange ++ defaultValueChange
val dropDefaultValue = if (spec.dropDefault) {
Some(TableChange.updateColumnDefaultValue(colName, null: DefaultValue))
} else {
None
}

typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange ++
defaultValueChange ++ dropDefaultValue
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,16 +276,14 @@ private[sql] object CatalogV2Util {
}

case update: UpdateColumnDefaultValue =>
  replace(schema, update.fieldNames.toImmutableArraySeq, field => {
    val newDefault = update.newCurrentDefault()
    // A null DefaultValue signals "ALTER TABLE t ALTER COLUMN c DROP DEFAULT";
    // a non-null value carries the new default's SQL string to set on the field.
    if (newDefault != null) {
      Some(field.withCurrentDefaultValue(newDefault.getSql))
    } else {
      Some(field.clearCurrentDefaultValue())
    }
  })

case delete: DeleteColumn =>
replace(schema, delete.fieldNames.toImmutableArraySeq, _ => None, delete.ifExists)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2735,7 +2735,7 @@ class DDLParserSuite extends AnalysisTest {
None,
None,
None,
Some("42")))))
Some(DefaultValueExpression(Literal(42), "42"))))))
// It is possible to pass an empty string default value using quotes.
comparePlans(
parsePlan("ALTER TABLE t1 ALTER COLUMN a.b.c SET DEFAULT ''"),
Expand All @@ -2747,7 +2747,7 @@ class DDLParserSuite extends AnalysisTest {
None,
None,
None,
Some("''")))))
Some(DefaultValueExpression(Literal(""), "''"))))))
// It is not possible to pass an empty string default value without using quotes.
// This results in a parsing error.
val sql1 = "ALTER TABLE t1 ALTER COLUMN a.b.c SET DEFAULT "
Expand All @@ -2773,7 +2773,8 @@ class DDLParserSuite extends AnalysisTest {
None,
None,
None,
Some("")))))
None,
dropDefault = true))))
// Make sure that the parser returns an exception when the feature is disabled.
withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
val sql = "CREATE TABLE my_tab(a INT, b STRING NOT NULL DEFAULT \"abc\") USING parquet"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,12 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}
}
// Add the current default column value string (if any) to the column metadata.
s.newDefaultExpression.map { c => builder.putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, c) }
s.newDefaultExpression.map { c => builder.putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY,
c.originalSQL) }
if (s.dropDefault) {
// for legacy reasons, "" means clearing default value
builder.putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, "")
}
val newColumn = StructField(
colName,
dataType,
Expand Down
Loading