From b121bc11ddac7dc774e02d8509630614be3866ec Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Fri, 15 May 2026 21:30:02 +0000 Subject: [PATCH 1/4] project target columns onto microbatch --- .../autocdc/Scd1BatchProcessor.scala | 50 +++++ .../autocdc/Scd1BatchProcessorSuite.scala | 182 ++++++++++++++++++ 2 files changed, 232 insertions(+) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index d50c30919ba8..6f6ca67dbf1e 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -114,6 +114,56 @@ case class Scd1BatchProcessor( ) } + /** + * Project the user-defined column selection onto the microbatch. By this point the input + * microbatch should already have projected its CDC metadata, because it's possible that the + * user-defined column selection drops columns that are otherwise necessary to compute the + * CDC metadata. + * + * Returned dataframe's schema is: all of the user-selected columns in the input dataframe as per + * [[ChangeArgs.columnSelection]] + the CDC metadata column. + */ + def projectTargetColumnsOntoMicrobatch(microbatchWithCdcMetadataDf: DataFrame): DataFrame = { + val ignoreColumnNameCase = + !microbatchWithCdcMetadataDf.sparkSession.sessionState.conf.caseSensitiveAnalysis + + // The schema of the microbatch less the system-projected CDC metadata column, i.e. the + // original microbatch schema. + val userColumnsInMicrobatchSchema = + StructType( + microbatchWithCdcMetadataDf.schema.fields.filterNot { field => + if (ignoreColumnNameCase) { + field.name.equalsIgnoreCase(Scd1BatchProcessor.cdcMetadataColName) + } else { + field.name.equals(Scd1BatchProcessor.cdcMetadataColName) + } + } + ) + + val userSelectedColumnsInMicrobatchSchema = + ColumnSelection.applyToSchema( + schemaName = "microbatch", + schema = userColumnsInMicrobatchSchema, + columnSelection = changeArgs.columnSelection, + ignoreCase = ignoreColumnNameCase + ) + + // In addition to the explicit user-selected columns, re-project the operational CDC metadata + // column as the last column. + val finalColumnsInMicrobatchToSelect = + userSelectedColumnsInMicrobatchSchema.fieldNames.map(colName => { + // Spark drops backticks in the schema, quote all identifiers for safety before executing + // select. Identifiers could have special characters such as '.'. + F.col(QuotingUtils.quoteIdentifier(colName)) + }) :+ F.col( + Scd1BatchProcessor.cdcMetadataColName + ) + + microbatchWithCdcMetadataDf.select( + finalColumnsInMicrobatchToSelect.toImmutableArraySeq: _* + ) + } + private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit = { val microbatchSqlConf = microbatch.sparkSession.sessionState.conf val resolver = microbatchSqlConf.resolver diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index 1cb348316436..2de9b2fdc400 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -27,6 +27,20 @@ import org.apache.spark.sql.types._ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { + /** + * Test Schema for a microbatch that already has the SCD1 CDC metadata column projected. + */ + private val microbatchWithCdcMetadataSchema: StructType = new StructType() + .add("id", IntegerType) + .add("name", StringType) + .add("age", IntegerType) + .add( + Scd1BatchProcessor.cdcMetadataColName, + new StructType() + .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType) + .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType) + ) + /** Build a microbatch [[DataFrame]] from explicit rows and an explicit schema. */ private def microbatchOf(schema: StructType)(rows: Row*): DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema) @@ -715,4 +729,172 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { ) } } + + test("projectTargetColumnsOntoMicrobatch keeps every user column and the CDC metadata column " + + "when columnSelection is None") { + val batch = microbatchOf(microbatchWithCdcMetadataSchema)( + Row(1, "alice", 30, Row(null, 10L)), + Row(2, "bob", 25, Row(20L, null)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + columnSelection = None + ), + resolvedSequencingType = LongType + ) + + val result = processor.projectTargetColumnsOntoMicrobatch(batch) + + // None selection is no-op on the user columns, and the CDC metadata column is unconditionally + // re-projected last, so the output shape exactly matches the input. + assert(result.schema.fieldNames.toSeq == microbatchWithCdcMetadataSchema.fieldNames.toSeq) + checkAnswer( + df = result, + expectedAnswer = Seq( + Row(1, "alice", 30, Row(null, 10L)), + Row(2, "bob", 25, Row(20L, null)) + ) + ) + } + + test("projectTargetColumnsOntoMicrobatch retains the CDC metadata column even when " + + "IncludeColumns does not contain it") { + val batch = microbatchOf(microbatchWithCdcMetadataSchema)( + Row(1, "alice", 30, Row(null, 10L)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName("age")) + ) + ) + ), + resolvedSequencingType = LongType + ) + + val result = processor.projectTargetColumnsOntoMicrobatch(batch) + + assert(result.schema.fieldNames.toSeq == + Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName)) + checkAnswer( + df = result, + expectedAnswer = Row(1, 30, Row(null, 10L)) + ) + } + + test("projectTargetColumnsOntoMicrobatch respects exclude column") { + val batch = microbatchOf(microbatchWithCdcMetadataSchema)( + Row(1, "alice", 30, Row(null, 10L)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + columnSelection = Some( + ColumnSelection.ExcludeColumns( + Seq(UnqualifiedColumnName("age")) + ) + ) + ), + resolvedSequencingType = LongType + ) + + val result = processor.projectTargetColumnsOntoMicrobatch(batch) + + assert( + result.schema.fieldNames.toSeq == + Seq("id", "name", Scd1BatchProcessor.cdcMetadataColName) + ) + checkAnswer( + df = result, + expectedAnswer = Row(1, "alice", Row(null, 10L)) + ) + } + + test("projectTargetColumnsOntoMicrobatch preserves the microbatch schema order") { + val batch = microbatchOf(microbatchWithCdcMetadataSchema)( + Row(1, "alice", 30, Row(null, 10L)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + // User specifies (age, id) -- intentionally different from the schema order (id, age). + columnSelection = Some(ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("id")) + )) + ), + resolvedSequencingType = LongType + ) + + val result = processor.projectTargetColumnsOntoMicrobatch(batch) + + // Output column order follows the original microbatch schema (id before age), not the order + // in which the user listed columns in IncludeColumns. The CDC metadata column is appended + // last as always. + assert(result.schema.fieldNames.toSeq == + Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName)) + + checkAnswer( + df = result, + expectedAnswer = Row(1, 30, Row(null, 10L)) + ) + } + + test("projectTargetColumnsOntoMicrobatch handles backticked column names containing a " + + "literal dot") { + val schema = new StructType() + .add("id", IntegerType) + // Even if a column is created with backticks via DDL, those backticks are consumed by Spark + // before resolving the schema; they won't show up in the schema field. + .add("user.id", StringType) + .add( + Scd1BatchProcessor.cdcMetadataColName, + new StructType() + .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType) + .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType)) + + val batch = microbatchOf(schema)( + Row(1, "u-100", Row(null, 10L)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq( + UnqualifiedColumnName("id"), + UnqualifiedColumnName("`user.id`") + ) + ) + ) + ), + resolvedSequencingType = LongType + ) + + val result = processor.projectTargetColumnsOntoMicrobatch(batch) + + assert(result.schema.fieldNames.toSeq == + Seq("id", "user.id", Scd1BatchProcessor.cdcMetadataColName)) + checkAnswer( + df = result, + expectedAnswer = Row(1, "u-100", Row(null, 10L)) + ) + } } From 2094551bad163b0edc026c4f94a093843dff6d92 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 17:31:55 +0000 Subject: [PATCH 2/4] reuse applyToSchema --- .../autocdc/Scd1BatchProcessor.scala | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 6f6ca67dbf1e..0a28e1d39cf0 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -127,18 +127,23 @@ case class Scd1BatchProcessor( val ignoreColumnNameCase = !microbatchWithCdcMetadataDf.sparkSession.sessionState.conf.caseSensitiveAnalysis - // The schema of the microbatch less the system-projected CDC metadata column, i.e. the - // original microbatch schema. - val userColumnsInMicrobatchSchema = - StructType( - microbatchWithCdcMetadataDf.schema.fields.filterNot { field => - if (ignoreColumnNameCase) { - field.name.equalsIgnoreCase(Scd1BatchProcessor.cdcMetadataColName) - } else { - field.name.equals(Scd1BatchProcessor.cdcMetadataColName) - } - } - ) + // Calculate the schema of the microbatch less the system-projected CDC metadata column, i.e. + // the The user schema is the microbatch's schema after dropping the system columns - i.e the + // CDC metadata column. + + // We project out the system columns before applying user selection and project back in + // afterwards, so that users cannot control whether these [necessary] columns show up in the + // target table. + val userColumnsInMicrobatchSchema = ColumnSelection.applyToSchema( + schemaName = "microbatch", + schema = microbatchWithCdcMetadataDf.schema, + columnSelection = Some( + ColumnSelection.ExcludeColumns( + Seq(UnqualifiedColumnName(Scd1BatchProcessor.cdcMetadataColName)) + ) + ), + ignoreCase = ignoreColumnNameCase + ) val userSelectedColumnsInMicrobatchSchema = ColumnSelection.applyToSchema( From 10f8f0c3b8e6b029778d3ac556879e7d2a8ff506 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 18:18:50 +0000 Subject: [PATCH 3/4] rebase conflict --- .../spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 0a28e1d39cf0..122d6447df9e 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -124,8 +124,8 @@ case class Scd1BatchProcessor( * [[ChangeArgs.columnSelection]] + the CDC metadata column. */ def projectTargetColumnsOntoMicrobatch(microbatchWithCdcMetadataDf: DataFrame): DataFrame = { - val ignoreColumnNameCase = - !microbatchWithCdcMetadataDf.sparkSession.sessionState.conf.caseSensitiveAnalysis + val caseSensitiveColumnComparison = + microbatchWithCdcMetadataDf.sparkSession.sessionState.conf.caseSensitiveAnalysis // Calculate the schema of the microbatch less the system-projected CDC metadata column, i.e. // the The user schema is the microbatch's schema after dropping the system columns - i.e the @@ -142,7 +142,7 @@ case class Scd1BatchProcessor( Seq(UnqualifiedColumnName(Scd1BatchProcessor.cdcMetadataColName)) ) ), - ignoreCase = ignoreColumnNameCase + caseSensitive = caseSensitiveColumnComparison ) val userSelectedColumnsInMicrobatchSchema = @@ -150,7 +150,7 @@ case class Scd1BatchProcessor( schemaName = "microbatch", schema = userColumnsInMicrobatchSchema, columnSelection = changeArgs.columnSelection, - ignoreCase = ignoreColumnNameCase + caseSensitive = caseSensitiveColumnComparison ) // In addition to the explicit user-selected columns, re-project the operational CDC metadata From e53bcf49d21f8a9cdd23d9a7f9ea39695fd1a305 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Thu, 21 May 2026 19:05:39 +0000 Subject: [PATCH 4/4] PR feedback --- .../autocdc/Scd1BatchProcessor.scala | 9 ++--- .../autocdc/Scd1BatchProcessorSuite.scala | 35 +++++++++++++++++++ 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 122d6447df9e..03aaf284f070 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -127,12 +127,9 @@ case class Scd1BatchProcessor( val caseSensitiveColumnComparison = microbatchWithCdcMetadataDf.sparkSession.sessionState.conf.caseSensitiveAnalysis - // Calculate the schema of the microbatch less the system-projected CDC metadata column, i.e. - // the The user schema is the microbatch's schema after dropping the system columns - i.e the - // CDC metadata column. - - // We project out the system columns before applying user selection and project back in - // afterwards, so that users cannot control whether these [necessary] columns show up in the + // The user schema is the microbatch schema after dropping the system CDC metadata column. + // We project out the system column before applying user selection and project it back in + // afterwards, so that users cannot control whether this [necessary] column shows up in the // target table. val userColumnsInMicrobatchSchema = ColumnSelection.applyToSchema( schemaName = "microbatch", diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index 2de9b2fdc400..a49c89e35755 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -897,4 +897,39 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { expectedAnswer = Row(1, "u-100", Row(null, 10L)) ) } + + test("projectTargetColumnsOntoMicrobatch resolves columnSelection case-insensitively " + + "when SQLConf.CASE_SENSITIVE=false") { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val batch = microbatchOf(microbatchWithCdcMetadataSchema)( + Row(1, "alice", 30, Row(null, 10L)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + // User columns intentionally use a different case than the schema (id, age). + columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("ID"), UnqualifiedColumnName("AGE")) + ) + ) + ), + resolvedSequencingType = LongType + ) + + val result = processor.projectTargetColumnsOntoMicrobatch(batch) + + // Output column names follow the microbatch schema's casing, not the casing in the user's + // columnSelection. The CDC metadata column is appended last as always. + assert(result.schema.fieldNames.toSeq == + Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName)) + checkAnswer( + df = result, + expectedAnswer = Row(1, 30, Row(null, 10L)) + ) + } + } }