diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
index d50c30919ba8..03aaf284f070 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
@@ -114,6 +114,58 @@ case class Scd1BatchProcessor(
     )
   }
 
+  /**
+   * Project the user-defined column selection onto the microbatch. The input microbatch must
+   * already carry the CDC metadata column, because the user-defined column selection may drop
+   * columns that are otherwise necessary to compute the CDC metadata.
+   *
+   * @param microbatchWithCdcMetadataDf microbatch that already contains the CDC metadata column.
+   * @return the user-selected columns per [[ChangeArgs.columnSelection]], in microbatch schema
+   *         order, followed by the CDC metadata column as the last column.
+   */
+  def projectTargetColumnsOntoMicrobatch(microbatchWithCdcMetadataDf: DataFrame): DataFrame = {
+    val caseSensitiveColumnComparison =
+      microbatchWithCdcMetadataDf.sparkSession.sessionState.conf.caseSensitiveAnalysis
+
+    // The user schema is the microbatch schema after dropping the system CDC metadata column.
+    // We project out the system column before applying user selection and project it back in
+    // afterwards, so that users cannot control whether this [necessary] column shows up in the
+    // target table.
+    val userColumnsInMicrobatchSchema = ColumnSelection.applyToSchema(
+      schemaName = "microbatch",
+      schema = microbatchWithCdcMetadataDf.schema,
+      columnSelection = Some(
+        ColumnSelection.ExcludeColumns(
+          Seq(UnqualifiedColumnName(Scd1BatchProcessor.cdcMetadataColName))
+        )
+      ),
+      caseSensitive = caseSensitiveColumnComparison
+    )
+
+    val userSelectedColumnsInMicrobatchSchema =
+      ColumnSelection.applyToSchema(
+        schemaName = "microbatch",
+        schema = userColumnsInMicrobatchSchema,
+        columnSelection = changeArgs.columnSelection,
+        caseSensitive = caseSensitiveColumnComparison
+      )
+
+    // In addition to the explicit user-selected columns, re-project the operational CDC metadata
+    // column as the last column.
+    val finalColumnsInMicrobatchToSelect =
+      userSelectedColumnsInMicrobatchSchema.fieldNames.map(colName => {
+        // Spark drops backticks in the schema, so quote all identifiers for safety before
+        // executing select. Identifiers could have special characters such as '.'.
+        F.col(QuotingUtils.quoteIdentifier(colName))
+      }) :+ F.col(
+        Scd1BatchProcessor.cdcMetadataColName
+      )
+
+    microbatchWithCdcMetadataDf.select(
+      finalColumnsInMicrobatchToSelect.toImmutableArraySeq: _*
+    )
+  }
+
   private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit = {
     val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
     val resolver = microbatchSqlConf.resolver
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
index 1cb348316436..a49c89e35755 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
@@ -27,6 +27,20 @@ import org.apache.spark.sql.types._
 
 class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession {
 
+  /**
+   * Test schema for a microbatch that already has the SCD1 CDC metadata column projected.
+   */
+  private val microbatchWithCdcMetadataSchema: StructType = new StructType()
+    .add("id", IntegerType)
+    .add("name", StringType)
+    .add("age", IntegerType)
+    .add(
+      Scd1BatchProcessor.cdcMetadataColName,
+      new StructType()
+        .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType)
+        .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType)
+    )
+
   /** Build a microbatch [[DataFrame]] from explicit rows and an explicit schema. */
   private def microbatchOf(schema: StructType)(rows: Row*): DataFrame =
     spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
@@ -715,4 +729,207 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession {
       )
     }
   }
+
+  test("projectTargetColumnsOntoMicrobatch keeps every user column and the CDC metadata column " +
+    "when columnSelection is None") {
+    val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
+      Row(1, "alice", 30, Row(null, 10L)),
+      Row(2, "bob", 25, Row(20L, null))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1,
+        columnSelection = None
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.projectTargetColumnsOntoMicrobatch(batch)
+
+    // A None selection is a no-op on the user columns, and the CDC metadata column is
+    // unconditionally re-projected last, so the output shape exactly matches the input.
+    assert(result.schema.fieldNames.toSeq == microbatchWithCdcMetadataSchema.fieldNames.toSeq)
+    checkAnswer(
+      df = result,
+      expectedAnswer = Seq(
+        Row(1, "alice", 30, Row(null, 10L)),
+        Row(2, "bob", 25, Row(20L, null))
+      )
+    )
+  }
+
+  test("projectTargetColumnsOntoMicrobatch retains the CDC metadata column even when " +
+    "IncludeColumns does not contain it") {
+    val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
+      Row(1, "alice", 30, Row(null, 10L))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1,
+        columnSelection = Some(
+          ColumnSelection.IncludeColumns(
+            Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName("age"))
+          )
+        )
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.projectTargetColumnsOntoMicrobatch(batch)
+
+    assert(result.schema.fieldNames.toSeq ==
+      Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName))
+    checkAnswer(
+      df = result,
+      expectedAnswer = Row(1, 30, Row(null, 10L))
+    )
+  }
+
+  test("projectTargetColumnsOntoMicrobatch respects exclude column") {
+    val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
+      Row(1, "alice", 30, Row(null, 10L))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1,
+        columnSelection = Some(
+          ColumnSelection.ExcludeColumns(
+            Seq(UnqualifiedColumnName("age"))
+          )
+        )
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.projectTargetColumnsOntoMicrobatch(batch)
+
+    assert(
+      result.schema.fieldNames.toSeq ==
+        Seq("id", "name", Scd1BatchProcessor.cdcMetadataColName)
+    )
+    checkAnswer(
+      df = result,
+      expectedAnswer = Row(1, "alice", Row(null, 10L))
+    )
+  }
+
+  test("projectTargetColumnsOntoMicrobatch preserves the microbatch schema order") {
+    val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
+      Row(1, "alice", 30, Row(null, 10L))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1,
+        // User specifies (age, id) -- intentionally different from the schema order (id, age).
+        columnSelection = Some(ColumnSelection.IncludeColumns(
+          Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("id"))
+        ))
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.projectTargetColumnsOntoMicrobatch(batch)
+
+    // Output column order follows the original microbatch schema (id before age), not the order
+    // in which the user listed columns in IncludeColumns. The CDC metadata column is appended
+    // last as always.
+    assert(result.schema.fieldNames.toSeq ==
+      Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName))
+
+    checkAnswer(
+      df = result,
+      expectedAnswer = Row(1, 30, Row(null, 10L))
+    )
+  }
+
+  test("projectTargetColumnsOntoMicrobatch handles backticked column names containing a " +
+    "literal dot") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      // Even if a column is created with backticks via DDL, those backticks are consumed by Spark
+      // before resolving the schema; they won't show up in the schema field name.
+      .add("user.id", StringType)
+      .add(
+        Scd1BatchProcessor.cdcMetadataColName,
+        new StructType()
+          .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType)
+          .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType))
+
+    val batch = microbatchOf(schema)(
+      Row(1, "u-100", Row(null, 10L))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1,
+        columnSelection = Some(
+          ColumnSelection.IncludeColumns(
+            Seq(
+              UnqualifiedColumnName("id"),
+              UnqualifiedColumnName("`user.id`")
+            )
+          )
+        )
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.projectTargetColumnsOntoMicrobatch(batch)
+
+    assert(result.schema.fieldNames.toSeq ==
+      Seq("id", "user.id", Scd1BatchProcessor.cdcMetadataColName))
+    checkAnswer(
+      df = result,
+      expectedAnswer = Row(1, "u-100", Row(null, 10L))
+    )
+  }
+
+  test("projectTargetColumnsOntoMicrobatch resolves columnSelection case-insensitively " +
+    "when SQLConf.CASE_SENSITIVE=false") {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
+        Row(1, "alice", 30, Row(null, 10L))
+      )
+
+      val processor = Scd1BatchProcessor(
+        changeArgs = ChangeArgs(
+          keys = Seq(UnqualifiedColumnName("id")),
+          sequencing = F.col("seq"),
+          storedAsScdType = ScdType.Type1,
+          // User columns intentionally use a different case than the schema (id, age).
+          columnSelection = Some(
+            ColumnSelection.IncludeColumns(
+              Seq(UnqualifiedColumnName("ID"), UnqualifiedColumnName("AGE"))
+            )
+          )
+        ),
+        resolvedSequencingType = LongType
+      )
+
+      val result = processor.projectTargetColumnsOntoMicrobatch(batch)
+
+      // Output column names follow the microbatch schema's casing, not the casing in the user's
+      // columnSelection. The CDC metadata column is appended last as always.
+      assert(result.schema.fieldNames.toSeq ==
+        Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName))
+      checkAnswer(
+        df = result,
+        expectedAnswer = Row(1, 30, Row(null, 10L))
+      )
+    }
+  }
 }