diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 889ecf9f7b08a..8536c6385f2bd 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -191,6 +191,24 @@ ], "sqlState" : "0A000" }, + "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA" : { + "message" : [ + "Using <caseSensitivity> column name comparison, the following columns are not present in the <schemaName> schema: <missingColumns>. Available columns: <availableColumns>." + ], + "sqlState" : "42703" + }, + "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER" : { + "message" : [ + "Expected a single column identifier; got the multi-part identifier <columnName> (parts: <nameParts>)." + ], + "sqlState" : "42703" + }, + "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT" : { + "message" : [ + "Using <caseSensitivity> column name comparison, the column `<columnName>` in the <schemaName> schema conflicts with the reserved AutoCDC column name `<reservedColumnName>`. Rename or remove the column." + ], + "sqlState" : "42710" + }, "AVRO_CANNOT_WRITE_NULL_FIELD" : { "message" : [ "Cannot write null value for field defined as non-null Avro data type .", diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala new file mode 100644 index 0000000000000..1c87068fca291 --- /dev/null +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.sql.{AnalysisException, Column} +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.util.QuotingUtils +import org.apache.spark.sql.types.StructType + +/** + * A single, unqualified column identifier (no nested path or table/alias qualifier). Backticks + * are consumed: "`a.b`" is stored as "a.b" in [[name]]. Use [[name]] for direct schema-fieldName + * comparison and [[quoted]] for APIs that re-parse identifier strings. + */ +case class UnqualifiedColumnName private (name: String) { + def quoted: String = QuotingUtils.quoteIdentifier(name) +} + +object UnqualifiedColumnName { + def apply(input: String): UnqualifiedColumnName = { + val nameParts = CatalystSqlParser.parseMultipartIdentifier(input) + if (nameParts.length != 1) { + throw multipartColumnIdentifierError(input, nameParts) + } + new UnqualifiedColumnName(nameParts.head) + } + + private def multipartColumnIdentifierError( + columnName: String, + nameParts: Seq[String] + ): AnalysisException = + new AnalysisException( + errorClass = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER", + messageParameters = Map( + "columnName" -> columnName, + "nameParts" -> nameParts.mkString(", ") + ) + ) +} + +sealed trait ColumnSelection +object ColumnSelection { + + case class IncludeColumns(columns: Seq[UnqualifiedColumnName]) extends ColumnSelection + case class ExcludeColumns(columns: Seq[UnqualifiedColumnName]) + extends ColumnSelection + + /** + * Applies [[ColumnSelection]] to a 
[[StructType]] and returns the filtered schema. Field + * order follows the original schema; a new [[StructType]] is built, the input is not mutated. + */ + def applyToSchema( + schemaName: String, + schema: StructType, + columnSelection: Option[ColumnSelection], + ignoreCase: Boolean): StructType = columnSelection match { + case None => + // A None column selection is interpreted as a no-op. + schema + case Some(IncludeColumns(cols)) => + val keepIndices = lookupFieldIndices(schemaName, schema, cols, ignoreCase) + StructType(schema.fields.zipWithIndex.collect { + case (field, idx) if keepIndices.contains(idx) => field + }) + case Some(ExcludeColumns(cols)) => + val dropIndices = lookupFieldIndices(schemaName, schema, cols, ignoreCase) + StructType(schema.fields.zipWithIndex.collect { + case (field, idx) if !dropIndices.contains(idx) => field + }) + } + + private def lookupFieldIndices( + schemaName: String, + schema: StructType, + fields: Seq[UnqualifiedColumnName], + ignoreCase: Boolean): Set[Int] = { + val caseAwareGetFieldIndex: String => Option[Int] = + if (ignoreCase) schema.getFieldIndexCaseInsensitive else schema.getFieldIndex + + val fieldIndexResolutions = fields.map(f => f -> caseAwareGetFieldIndex(f.name)) + val missingFieldNames = fieldIndexResolutions.collect { case (f, None) => f.name }.distinct + if (missingFieldNames.nonEmpty) { + throw new AnalysisException( + errorClass = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA", + messageParameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.of(ignoreCase), + "schemaName" -> schemaName, + "missingColumns" -> missingFieldNames.mkString(", "), + "availableColumns" -> schema.fieldNames.mkString(", ") + ) + ) + } + fieldIndexResolutions.flatMap { case (_, idx) => idx }.toSet + } +} + +/** User-facing case-sensitivity labels surfaced in AutoCDC error messages. 
*/ +private[autocdc] object CaseSensitivityLabels { + val CaseSensitive: String = "case-sensitive" + val CaseInsensitive: String = "case-insensitive" + + def of(ignoreCase: Boolean): String = + if (ignoreCase) CaseInsensitive else CaseSensitive +} + +/** The SCD (Slowly Changing Dimension) strategy for a CDC flow. */ +sealed trait ScdType + +object ScdType { + /** Representation for the standard SCD1 strategy. */ + case object Type1 extends ScdType + /** Representation for the standard SCD2 strategy. */ + case object Type2 extends ScdType +} + +/** + * Configuration for an AutoCDC flow. + * + * @param keys The column(s) that uniquely identify a row in the source data. + * @param sequencing Expression ordering CDC events to correctly resolve out-of-order + * arrivals. Must be a sortable type. + * @param deleteCondition Expression that marks a source row as a DELETE. When None, all + * rows are treated as upserts. + * @param storedAsScdType The SCD strategy these args should be applied to. + * @param columnSelection Which source columns to select in the target table. None means + * all columns. + */ +case class ChangeArgs( + keys: Seq[UnqualifiedColumnName], + sequencing: Column, + storedAsScdType: ScdType, + deleteCondition: Option[Column] = None, + columnSelection: Option[ColumnSelection] = None +) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala new file mode 100644 index 0000000000000..50b635c4ba2b8 --- /dev/null +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +/** Shared helpers for the out-of-order CDC merge implementations (SCD Type 1 and Type 2). */ +private[autocdc] object OutOfOrderCdcMergeUtils { + + /** + * Build a synthetic column name with a UUID suffix so it cannot collide with any user + * column. Intended for transient columns attached during merge processing (e.g. holding + * intermediate aggregation outputs, carrying per-key state through a join, etc.). + * + * Each invocation produces a fresh name, so callers should remember the returned string if + * they need to reference the same column from multiple sites within a single merge plan. + */ + def tempColName(prefix: String): String = + s"${prefix}_${java.util.UUID.randomUUID().toString.replace("-", "_")}" +} diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala new file mode 100644 index 0000000000000..0c0dae6391b4a --- /dev/null +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.SparkException +import org.apache.spark.sql.{functions => F, AnalysisException} +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.util.QuotingUtils +import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.util.ArrayImplicits._ + +/** + * Per-microbatch processor for SCD Type 1 AutoCDC flows, complying with the specified + * [[changeArgs]] configuration. + * + * @param changeArgs The CDC flow configuration. + * @param resolvedSequencingType The post-analysis [[DataType]] of the sequencing column, derived + * from the flow's resolved DataFrame at flow setup time. + */ +case class Scd1BatchProcessor( + changeArgs: ChangeArgs, + resolvedSequencingType: DataType) { + + /** + * Deduplicate the incoming CDC microbatch by key, keeping the most recent event per key + * as ordered by [[ChangeArgs.sequencing]]. + * + * For SCD1 we only care about the most recent (by sequence value) event per key. When + * multiple events share the same key and the same sequence value, the row selected is + * non-deterministic and undefined. + * + * The schema of the returned dataframe matches the schema of the microbatch exactly. 
+ */ + def deduplicateMicrobatch(microbatchDf: DataFrame): DataFrame = { + // The `max_by` API can only return a single column, so pack/unpack the entire row into a + // temporary column before and after the `max_by` operation. + val winningRowCol = OutOfOrderCdcMergeUtils.tempColName("__winning_row") + + val allMicrobatchColumns = + microbatchDf.columns + .map(colName => F.col(QuotingUtils.quoteIdentifier(colName))) + .toImmutableArraySeq + + microbatchDf + .groupBy(changeArgs.keys.map(k => F.col(k.quoted)): _*) + .agg( + F.max_by(F.struct(allMicrobatchColumns: _*), changeArgs.sequencing) + .as(winningRowCol) + ) + .select(F.col(s"$winningRowCol.*")) + } + + /** + * Project the CDC metadata column onto the microbatch. + * + * The returned dataframe has all of the columns in the input microbatch + the CDC metadata + * column. + */ + def extendMicrobatchRowsWithCdcMetadata(microbatchDf: DataFrame): DataFrame = { + // Proactively validate the reserved CDC metadata column does not exist in the microbatch. + validateCdcMetadataColumnNotPresent(microbatchDf) + + val rowDeleteSequence: Column = changeArgs.deleteCondition match { + case Some(deleteCondition) => + F.when(deleteCondition, changeArgs.sequencing).otherwise(F.lit(null)) + case None => + F.lit(null) + } + + val rowUpsertSequence: Column = + // A row that is not a delete must be an upsert, these are mutually exclusive and a complete + // set of CDC event types. + F.when(rowDeleteSequence.isNull, changeArgs.sequencing).otherwise(F.lit(null)) + + microbatchDf.withColumn( + Scd1BatchProcessor.cdcMetadataColName, + Scd1BatchProcessor.constructCdcMetadataCol( + deleteSequence = rowDeleteSequence, + upsertSequence = rowUpsertSequence, + sequencingType = resolvedSequencingType + ) + ) + } + + /** + * Left anti-join the microbatch with the auxiliary table on tombstones that match against and + * effectively delete late-arriving upserts (or stale deletes). 
+ * + * @param microbatchDf The incoming microbatch dataframe with at minimum all of the key + * columns + CDC metadata column. + * @param auxiliaryTableDf Dataframe representing the auxiliary table, with at minimum the key + * columns + CDC metadata column. + * + * The returned filtered dataframe has the same schema as the input microbatch, but with only + * the rows that remain unaffected by any known tombstones. + */ + def applyTombstonesToMicrobatch( + microbatchDf: DataFrame, + auxiliaryTableDf: DataFrame): DataFrame = { + val aliasedMicrobatchDf = microbatchDf.alias("microbatch") + val aliasedAuxiliaryTableDf = auxiliaryTableDf.alias("auxiliaryTable") + + val cdcMetadata = Scd1BatchProcessor.cdcMetadataColName + + val microbatchCdcMetadata = F.col(s"microbatch.$cdcMetadata") + val effectiveSeq = F.greatest( + Scd1BatchProcessor.deleteSequenceOf(microbatchCdcMetadata), + Scd1BatchProcessor.upsertSequenceOf(microbatchCdcMetadata) + ) + val tombstoneDeleteSeq = + Scd1BatchProcessor.deleteSequenceOf(F.col(s"auxiliaryTable.$cdcMetadata")) + + val keysMatch = changeArgs.keys + .map { k => + F.col(s"microbatch.${k.quoted}") === F.col(s"auxiliaryTable.${k.quoted}") + } + .reduce(_ && _) + + // A microbatch row is considered late-arriving (and therefore deleted by the tombstone) when + // the auxiliary table holds a tombstone for the same key with a strictly larger delete + // sequence. Both late-arriving upserts and deletes are dropped. + val microbatchRowDeletedByTombstone = effectiveSeq < tombstoneDeleteSeq + + aliasedMicrobatchDf.join( + right = aliasedAuxiliaryTableDf, + joinExprs = keysMatch && microbatchRowDeletedByTombstone, + joinType = "left_anti" + ) + } + + /** + * Merge the reconciled (deduplicated per key) microbatch onto the auxiliary table, + * advancing or deleting existing tombstones and inserting new tombstones for previously + * untracked keys. 
+ * + * After the merge, the auxiliary table has the same schema as before, but with the latest + * tombstone data per key. + * + * @param reconciledMicrobatchDf The deduplicated microbatch. + * @param auxiliaryTableIdentifier The identifier (not a [[DataFrame]]) of the auxiliary + * table, as required by the `mergeInto(table, condition)` + * API. + */ + def mergeMicrobatchOntoAuxiliaryTable( + reconciledMicrobatchDf: DataFrame, + auxiliaryTableIdentifier: TableIdentifier + ): Unit = { + val auxIdentQuoted = auxiliaryTableIdentifier.quotedString + val meta = Scd1BatchProcessor.cdcMetadataColName + + // Project the reconciled microbatch down to just keys + `_cdc_metadata`; data columns are + // irrelevant for the auxiliary table and should not be persisted. + val reducedMicrobatch = reconciledMicrobatchDf + .select(changeArgs.keys.map(k => F.col(k.quoted)) :+ F.col(meta): _*) + .as("reducedMicrobatch") + + val microbatchCdcMetadata: Column = F.col(s"reducedMicrobatch.`$meta`") + val incomingDelete: Column = Scd1BatchProcessor.deleteSequenceOf(microbatchCdcMetadata) + val incomingUpsert: Column = Scd1BatchProcessor.upsertSequenceOf(microbatchCdcMetadata) + + val auxCdcMetadata: Column = F.col(s"$auxIdentQuoted.`$meta`") + val auxDelete: Column = Scd1BatchProcessor.deleteSequenceOf(auxCdcMetadata) + + val doKeysMatch = changeArgs.keys + .map(k => F.col(s"reducedMicrobatch.${k.quoted}") === F.col(s"$auxIdentQuoted.${k.quoted}")) + .reduce(_ && _) + + val incomingRowRepresentsDeleteEvent = + incomingDelete.isNotNull && (incomingUpsert.isNull || incomingDelete > incomingUpsert) + + reducedMicrobatch + .mergeInto(auxIdentQuoted, doKeysMatch) + // Incoming delete is newer than the stored one: advance the high-water mark. 
+ .whenMatched( + incomingRowRepresentsDeleteEvent && incomingDelete > auxDelete + ) + .update(Map(s"$auxIdentQuoted.`$meta`" -> microbatchCdcMetadata)) + // Incoming upsert is not older than the stored delete (ties included): the key was + // re-inserted, so the aux tombstone is stale - remove it to prevent unbounded growth. + .whenMatched( + !incomingRowRepresentsDeleteEvent && incomingUpsert >= auxDelete + ) + .delete() + // New delete for a key not yet tracked: add it to the auxiliary table. Note that in the + // reconciled microbatch, there is at most one event per key, which represents the latest + // known event for the key. If the latest known event is a delete, it must be a tombstone. + .whenNotMatched(incomingRowRepresentsDeleteEvent) + .insertAll() + .merge() + } + + private def validateCdcMetadataColumnNotPresent(microbatchDf: DataFrame): Unit = { + val sqlConf = microbatchDf.sparkSession.sessionState.conf + val resolver = sqlConf.resolver + + microbatchDf.schema.fieldNames + .find(resolver(_, Scd1BatchProcessor.cdcMetadataColName)) + .foreach { conflictingColumnName => + throw new AnalysisException( + errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", + messageParameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.of(!sqlConf.caseSensitiveAnalysis), + "columnName" -> conflictingColumnName, + "schemaName" -> "microbatch", + "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName + ) + ) + } + } +} + +object Scd1BatchProcessor { + private[autocdc] val cdcMetadataColName: String = "_cdc_metadata" + + private[autocdc] val cdcDeleteSequenceFieldName: String = "deleteSequence" + private[autocdc] val cdcUpsertSequenceFieldName: String = "upsertSequence" + + /** Project the delete sequence out of the CDC metadata column. */ + private[autocdc] def deleteSequenceOf(cdcMetadataCol: Column): Column = + cdcMetadataCol.getField(cdcDeleteSequenceFieldName) + + /** Project the upsert sequence out of the CDC metadata column. 
*/ + private[autocdc] def upsertSequenceOf(cdcMetadataCol: Column): Column = + cdcMetadataCol.getField(cdcUpsertSequenceFieldName) + + /** + * Schema of the CDC metadata struct column for SCD1. + */ + private def cdcMetadataColSchema(sequencingType: DataType): StructType = + StructType( + Seq( + // The sequencing of the event if it represents a delete, null otherwise. + StructField(cdcDeleteSequenceFieldName, sequencingType, nullable = true), + // The sequencing of the event if it represents an upsert, null otherwise. + StructField(cdcUpsertSequenceFieldName, sequencingType, nullable = true) + ) + ) + + /** + * Construct the CDC metadata struct column for SCD1, following the exact schema and field + * ordering defined by [[cdcMetadataColSchema]]. + */ + private[autocdc] def constructCdcMetadataCol( + deleteSequence: Column, + upsertSequence: Column, + sequencingType: DataType): Column = { + val cdcMetadataFieldsInOrder = cdcMetadataColSchema(sequencingType).fields.map { field => + val value = field.name match { + case `cdcDeleteSequenceFieldName` => deleteSequence + case `cdcUpsertSequenceFieldName` => upsertSequence + case other => + throw SparkException.internalError( + s"Unable to construct SCD1 CDC metadata column due to unknown `${other}` field." + ) + } + value.cast(field.dataType).as(field.name) + } + F.struct(cdcMetadataFieldsInOrder.toImmutableArraySeq: _*) + } +} diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala new file mode 100644 index 0000000000000..e5a602b5e84e4 --- /dev/null +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.{functions => F, AnalysisException, Row} +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.{IntegerType, StringType, StructType} + +class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { + + private val sourceSchema = new StructType() + .add("id", IntegerType, nullable = false) + .add("Name", StringType) + .add("age", IntegerType) + + test("ColumnSelection None leaves schema unchanged") { + assert( + ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + columnSelection = None, + ignoreCase = false + ) == sourceSchema) + } + + test("ColumnSelection IncludeColumns(Seq()) returns an empty schema") { + // An explicit empty include-list is semantically distinct from None: it means "select + // no columns" and produces an empty StructType, not the original schema. 
+ assert( + ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + columnSelection = Some(ColumnSelection.IncludeColumns(Seq.empty)), + ignoreCase = false + ) == new StructType()) + } + + test("ColumnSelection ExcludeColumns(Seq()) leaves schema unchanged") { + // An empty exclude-list is a no-op: nothing to remove, so the original schema is + // returned unchanged (same observable behavior as None for this case). + assert( + ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + columnSelection = Some(ColumnSelection.ExcludeColumns(Seq.empty)), + ignoreCase = false + ) == sourceSchema) + } + + test("ColumnSelection IncludeColumns filters by exact name in schema order") { + val filteredSchema = ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("Name")) + ) + ), + ignoreCase = false + ) + + assert(filteredSchema == new StructType() + .add("Name", StringType) + .add("age", IntegerType)) + } + + test("ColumnSelection ExcludeColumns filters by exact name") { + val filteredSchema = ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + columnSelection = Some( + ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("id"))) + ), + ignoreCase = false + ) + + assert(filteredSchema == new StructType() + .add("Name", StringType) + .add("age", IntegerType)) + } + + test("ColumnSelection IncludeColumns fails for columns not present in schema") { + checkError( + exception = intercept[AnalysisException] { + ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + // Under ignoreCase = false, "name" will not match the schema field "Name". 
+ columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("name"), UnqualifiedColumnName("missing")) + ) + ), + ignoreCase = false + ) + }, + condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA", + sqlState = "42703", + parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseSensitive, + "schemaName" -> "test", + "missingColumns" -> "name, missing", + "availableColumns" -> "id, Name, age" + ) + ) + } + + test("ColumnSelection ExcludeColumns fails for columns not present in schema") { + checkError( + exception = intercept[AnalysisException] { + ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + // Under ignoreCase = false, "NAME" will not match the schema field "Name". + columnSelection = Some( + ColumnSelection.ExcludeColumns( + Seq(UnqualifiedColumnName("NAME"), UnqualifiedColumnName("missing")) + ) + ), + ignoreCase = false + ) + }, + condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA", + sqlState = "42703", + parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseSensitive, + "schemaName" -> "test", + "missingColumns" -> "NAME, missing", + "availableColumns" -> "id, Name, age" + ) + ) + } + + test("ColumnSelection IncludeColumns matches case-insensitively under ignoreCase=true") { + // "NAME" and "AGE" do not exactly match the schema fields "Name" and "age", but + // ignoreCase = true folds both sides to lowercase before comparing. + val filteredSchema = ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("AGE"), UnqualifiedColumnName("NAME")) + ) + ), + ignoreCase = true + ) + + // The retained fields keep their original casing from the schema, not the user's input. 
+ assert(filteredSchema == new StructType() + .add("Name", StringType) + .add("age", IntegerType)) + } + + test("ColumnSelection deduplicates user-provided columns that normalize to the same name") { + // Under ignoreCase = true, "name" and "NAME" both fold to "name" and refer to the same + // schema field. The returned schema must include "Name" once, not twice. Output ordering + // and casing follow the schema, not the user's input. + val filteredSchema = ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("name"), UnqualifiedColumnName("NAME")) + ) + ), + ignoreCase = true + ) + + assert(filteredSchema == new StructType().add("Name", StringType)) + } + + test("ColumnSelection ExcludeColumns matches case-insensitively under ignoreCase=true") { + val filteredSchema = ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + columnSelection = Some( + ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("name"))) + ), + ignoreCase = true + ) + + assert(filteredSchema == new StructType() + .add("id", IntegerType, nullable = false) + .add("age", IntegerType)) + } + + test("ColumnSelection missing-column error under ignoreCase=true preserves user casing") { + checkError( + exception = intercept[AnalysisException] { + ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + // "NAME" matches "Name" under ignoreCase=true, but "Missing" has no schema match. + // The error message reports the user's original casing for the missing column and + // the schema's original casing for the available columns. 
+ columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("NAME"), UnqualifiedColumnName("Missing")) + ) + ), + ignoreCase = true + ) + }, + condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA", + sqlState = "42703", + parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive, + "schemaName" -> "test", + "missingColumns" -> "Missing", + "availableColumns" -> "id, Name, age" + ) + ) + } + + test("UnqualifiedColumnName accepts a simple single-part identifier") { + assert(UnqualifiedColumnName("col").name == "col") + // .quoted always wraps in back-ticks, even when the input had none. + assert(UnqualifiedColumnName("col").quoted == "`col`") + } + + test("UnqualifiedColumnName accepts a backtick-quoted name containing a literal dot") { + // Backticks make the dot part of a single name part, so this passes validation. The + // stored name is the parsed (unquoted) form so it matches the actual schema field name. + assert(UnqualifiedColumnName("`a.b`").name == "a.b") + // .quoted re-wraps the parsed name in back-ticks, round-tripping back to the input form. + assert(UnqualifiedColumnName("`a.b`").quoted == "`a.b`") + } + + test("UnqualifiedColumnName accepts redundant backticks around a single-part name") { + // Backticks around an already-single-part identifier are decorative; the parser strips them + // so the stored name has no surrounding back-ticks. + assert(UnqualifiedColumnName("`col`").name == "col") + // .quoted re-wraps the parsed name in back-ticks, round-tripping back to the input form. 
+ assert(UnqualifiedColumnName("`col`").quoted == "`col`") + } + + test("UnqualifiedColumnName.quoted is safe to pass to functions.col for literal-dot names") { + val schema = new StructType() + .add("a.b", IntegerType) + .add("c", IntegerType) + + val df = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, 2), Row(3, 4))), + schema + ) + + val key = UnqualifiedColumnName("`a.b`") + + // Sanity-check: the unquoted `name` is not safe to pass to `functions.col`. The string is + // re-parsed and the literal dot is interpreted as a nested-field path separator, so the + // analyzer fails to resolve `a`.`b` against the available top-level columns. + checkError( + exception = intercept[AnalysisException] { + df.select(F.col(key.name)).collect() + }, + condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION", + sqlState = "42703", + parameters = Map( + "objectName" -> "`a`.`b`", + "proposal" -> "`a.b`, `c`" + ), + context = ExpectedContext( + fragment = "col", + callSitePattern = "" + ) + ) + + // The `quoted` form wraps the name in back-ticks so the re-parser treats the whole thing + // as a single identifier, resolving to the top-level "a.b" column. + assert(df.select(F.col(key.quoted)).collect().toSeq == Seq(Row(1), Row(3))) + } + + test("IncludeColumns correctly matches a backtick-quoted literal-dot column") { + val schema = new StructType() + .add("a.b", IntegerType) + .add("c", StringType) + + // The user writes `a.b` to refer to the literal-dot column "a.b" in the schema. After + // construction, the [[UnqualifiedColumnName]] holds "a.b", which matches the field name + // exactly and the column is included in the filtered schema. 
+ val filteredSchema = ColumnSelection.applyToSchema( + schemaName = "test", + schema = schema, + columnSelection = Some( + ColumnSelection.IncludeColumns(Seq(UnqualifiedColumnName("`a.b`"))) + ), + ignoreCase = false + ) + + assert(filteredSchema == new StructType().add("a.b", IntegerType)) + } + + test("UnqualifiedColumnName rejects a dotted (multi-part) identifier") { + checkError( + exception = intercept[AnalysisException] { + UnqualifiedColumnName("a.b") + }, + condition = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER", + sqlState = "42703", + parameters = Map( + "columnName" -> "a.b", + "nameParts" -> "a, b" + ) + ) + } + + test("UnqualifiedColumnName rejects a qualified column reference") { + checkError( + exception = intercept[AnalysisException] { + UnqualifiedColumnName("src.x") + }, + condition = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER", + sqlState = "42703", + parameters = Map( + "columnName" -> "src.x", + "nameParts" -> "src, x" + ) + ) + } + + test("UnqualifiedColumnName rejects an identifier with three or more parts") { + checkError( + exception = intercept[AnalysisException] { + UnqualifiedColumnName("a.b.c") + }, + condition = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER", + sqlState = "42703", + parameters = Map( + "columnName" -> "a.b.c", + "nameParts" -> "a, b, c" + ) + ) + } + + test("UnqualifiedColumnName lets a ParseException from the SQL parser propagate") { + checkError( + exception = intercept[ParseException] { + UnqualifiedColumnName("") + }, + condition = "PARSE_EMPTY_STATEMENT", + sqlState = Some("42617") + ) + } +} diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala new file mode 100644 index 0000000000000..f2f80a640ae79 --- /dev/null +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.scalatest.BeforeAndAfter + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.{functions => F, Row} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryRowLevelOperationTableCatalog, TableInfo} +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ + +/** + * Tests for [[Scd1BatchProcessor]] methods that perform a `MERGE INTO` against a registered + * v2 table. These tests require a v2 catalog that supports row-level operations + * ([[InMemoryRowLevelOperationTableCatalog]]) and run actual writes through Catalyst's + * row-level-operations machinery, so they are kept separate from the pure-DataFrame-transform + * tests in [[Scd1BatchProcessorSuite]]. 
+ */ +class Scd1BatchProcessorMergeSuite + extends SparkFunSuite with SharedSparkSession with BeforeAndAfter { + + private val auxCatalogName = "cat" + private val auxNamespace = "ns1" + private val auxTableName = "aux_table" + + /** v2 [[Identifier]] used for direct catalog API calls (CREATE TABLE). */ + private val auxIdent = Identifier.of(Array(auxNamespace), auxTableName) + + /** Three-part [[TableIdentifier]] passed to the function under test. */ + private val auxTableIdentifier = TableIdentifier( + table = auxTableName, + database = Some(auxNamespace), + catalog = Some(auxCatalogName) + ) + + /** + * Minimal valid shape for both the auxiliary table and microbatch inputs in these tests: + * a single key column `id` plus the CDC metadata struct. The auxiliary table genuinely + * has only this shape in production, and the merge function reduces its microbatch input + * down to keys + `_cdc_metadata` regardless of incoming data columns -- so most tests can + * use this single schema for both ends. + */ + private val minimalSchema: StructType = new StructType() + .add("id", IntegerType) + .add( + Scd1BatchProcessor.cdcMetadataColName, + new StructType() + .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType) + .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType) + ) + + /** + * A processor with a single key column `id`. `sequencing` is irrelevant for + * `mergeMicrobatchOntoAuxiliaryTable`: that function operates entirely on the already- + * computed CDC metadata column, never on the raw sequencing expression. 
+ */ + private val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.lit(0L), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + before { + spark.conf.set( + s"spark.sql.catalog.$auxCatalogName", + classOf[InMemoryRowLevelOperationTableCatalog].getName + ) + } + + after { + spark.sessionState.catalogManager.reset() + spark.sessionState.conf.unsetConf(s"spark.sql.catalog.$auxCatalogName") + } + + /** + * Create an auxiliary table in the test catalog using [[minimalSchema]] and seed it with + * `seedRows`. The table is unpartitioned and supports row-level operations. Pass no rows + * to create an empty table. + */ + private def createAuxTable(seedRows: Row*): Unit = { + val tableInfo = new TableInfo.Builder() + .withSchema(minimalSchema) + .build() + spark.sessionState.catalogManager + .catalog(auxCatalogName) + .asTableCatalog + .createTable(auxIdent, tableInfo) + + if (seedRows.nonEmpty) { + val df = spark.createDataFrame(spark.sparkContext.parallelize(seedRows), minimalSchema) + df.writeTo(auxTableIdentifier.quotedString).append() + } + } + + /** Read the current contents of the auxiliary table. */ + private def readAuxTable(): DataFrame = spark.read.table(auxTableIdentifier.quotedString) + + /** Build a microbatch [[DataFrame]] from explicit `rows` and an explicit `schema`. */ + private def microbatchOf(schema: StructType)(rows: Row*): DataFrame = + spark.createDataFrame(spark.sparkContext.parallelize(rows), schema) + + /** + * Build a row matching the [[Scd1BatchProcessor.cdcMetadataColName]] struct's two fields, in + * the order produced by [[Scd1BatchProcessor.constructCdcMetadataCol]]: + * `(deleteSequence, upsertSequence)`. Pass `None` for the side that doesn't apply. 
+ */ + private def cdcMetadataRow(deleteSeq: Option[Long], upsertSeq: Option[Long]): Row = + Row(deleteSeq.getOrElse(null), upsertSeq.getOrElse(null)) + + /** + * `(name, dataType)` pairs of `schema`'s fields, used to compare two schemas for structural + * equivalence while deliberately ignoring nullability and metadata. + */ + private def columnNamesAndDataTypes(schema: StructType): Seq[(String, DataType)] = + schema.fields.map(f => (f.name, f.dataType)).toSeq + + // =============== mergeMicrobatchOntoAuxiliaryTable tests =============== + + test("mergeMicrobatchOntoAuxiliaryTable replaces an existing tombstone with a newer " + + "microbatch tombstone, dropping any microbatch-only data columns") { + createAuxTable(Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None))) + + // The microbatch carries an extra `value` data column that has no place in the auxiliary + // table. mergeMicrobatchOntoAuxiliaryTable must project it away before merging, both to + // satisfy MergeIntoTable's schema requirements and to keep the auxiliary table free of + // unrelated columns. + val microbatchSchema = new StructType() + .add("id", IntegerType) + .add("value", StringType) + .add( + Scd1BatchProcessor.cdcMetadataColName, + new StructType() + .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType) + .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType) + ) + val microbatch = microbatchOf(microbatchSchema)( + Row(1, "data-leak", cdcMetadataRow(deleteSeq = Some(20), upsertSeq = None)) + ) + + processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, auxTableIdentifier) + + val result = readAuxTable() + // Tombstone advanced to delete=20, with exactly one row per key (no duplicate tombstones). + checkAnswer(result, Row(1, Row(20L, null))) + // Schema strictly matches minimalSchema; the `value` column was dropped, not smuggled in. 
+ assert(columnNamesAndDataTypes(result.schema) == columnNamesAndDataTypes(minimalSchema)) + } + + test("mergeMicrobatchOntoAuxiliaryTable deletes an existing tombstone when superseded by a " + + "newer microbatch upsert") { + createAuxTable(Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None))) + + val microbatch = microbatchOf(minimalSchema)( + Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20))) + ) + + processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, auxTableIdentifier) + + assert(readAuxTable().collect().isEmpty) + } + + test("mergeMicrobatchOntoAuxiliaryTable inserts a new tombstone for a previously-untracked " + + "key") { + createAuxTable() + + val microbatch = microbatchOf(minimalSchema)( + Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None)) + ) + + processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, auxTableIdentifier) + + checkAnswer(readAuxTable(), Row(1, Row(10L, null))) + } + + test("mergeMicrobatchOntoAuxiliaryTable leaves rows for unrelated keys untouched") { + createAuxTable(Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None))) + + // Microbatch event affects a different key entirely; the existing tombstone for id=1 must + // not be touched even though the new tombstone's sequence is much larger. + val microbatch = microbatchOf(minimalSchema)( + Row(2, cdcMetadataRow(deleteSeq = Some(100), upsertSeq = None)) + ) + + processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, auxTableIdentifier) + + checkAnswer(readAuxTable(), Seq( + Row(1, Row(10L, null)), + Row(2, Row(100L, null)) + )) + } + + test("mergeMicrobatchOntoAuxiliaryTable ignores microbatch deletes whose sequence is older " + + "than the existing tombstone") { + // This documents that mergeMicrobatchOntoAuxiliaryTable's contract is stronger than just + // relying on applyTombstonesToMicrobatch having filtered out stale events upstream: even + // an unfiltered stale incoming delete must not regress the high-water mark. 
+ createAuxTable(Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None))) + + val microbatch = microbatchOf(minimalSchema)( + Row(1, cdcMetadataRow(deleteSeq = Some(5), upsertSeq = None)) + ) + + processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, auxTableIdentifier) + + checkAnswer(readAuxTable(), Row(1, Row(10L, null))) + } + + test("mergeMicrobatchOntoAuxiliaryTable ignores microbatch upserts whose sequence is older " + + "than the existing tombstone") { + createAuxTable(Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None))) + + val microbatch = microbatchOf(minimalSchema)( + Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5))) + ) + + processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, auxTableIdentifier) + + checkAnswer(readAuxTable(), Row(1, Row(10L, null))) + } + + test("mergeMicrobatchOntoAuxiliaryTable applies the tied-sequence asymmetry: equal deletes " + + "are kept, equal upserts delete the tombstone") { + // On a delete<->upsert sequencing tie, upsert events are given priority over deletes; + // therefore an incoming upsert with the same sequence as a tombstone should delete the + // tombstone. On a delete<->delete sequencing tie, the effect is a no-op. This is an + // internal SCD1 tie-breaking convention, not a publicly documented contract. + createAuxTable( + Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None)), + Row(2, cdcMetadataRow(deleteSeq = Some(20), upsertSeq = None)) + ) + + val microbatch = microbatchOf(minimalSchema)( + Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None)), + Row(2, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20))) + ) + + processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, auxTableIdentifier) + + // Row 1's tombstone remains the same, but row 2's tombstone should be marked as stale and + // deleted. 
+ checkAnswer(readAuxTable(), Row(1, Row(10L, null))) + } + + test("mergeMicrobatchOntoAuxiliaryTable upsert event for different key does not affect " + + "tombstone") { + createAuxTable(Row(2, cdcMetadataRow(deleteSeq = Some(5), upsertSeq = None))) + + val microbatch = microbatchOf(minimalSchema)( + // Although the upsert seq is 10, this is for key=1; tombstone for key=2 should be unaffected. + Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10))) + ) + + processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, auxTableIdentifier) + + checkAnswer(readAuxTable(), Row(2, Row(5L, null))) + } + + test("mergeMicrobatchOntoAuxiliaryTable is idempotent across a microbatch that exercises " + + "every merge clause") { + // The auxiliary table starts with three tombstones; the microbatch then exercises every + // merge clause simultaneously: + // - id=1: aux tombstone superseded by a microbatch upsert + // - id=2: aux tombstone advanced by a newer microbatch delete + // - id=3: untouched by the microbatch + // - id=4: new tombstone for an untracked key + createAuxTable( + Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None)), + Row(2, cdcMetadataRow(deleteSeq = Some(20), upsertSeq = None)), + Row(3, cdcMetadataRow(deleteSeq = Some(30), upsertSeq = None)) + ) + + val microbatch = microbatchOf(minimalSchema)( + Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(15))), + Row(2, cdcMetadataRow(deleteSeq = Some(25), upsertSeq = None)), + Row(4, cdcMetadataRow(deleteSeq = Some(40), upsertSeq = None)) + ) + + val expectedAfterMerge = Seq( + Row(2, Row(25L, null)), + Row(3, Row(30L, null)), + Row(4, Row(40L, null)) + ) + + // First merge applies all three clauses exactly once. + processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, auxTableIdentifier) + checkAnswer(readAuxTable(), expectedAfterMerge) + + // Re-applying the same microbatch is a no-op: + // - id=1 is absent from aux; whenNotMatched is gated on delete events => skipped. 
+ // - id=2 has tied delete (incoming==aux); strict `>` in the update clause fails. + // - id=4 has tied delete (incoming==aux); same reason. + processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, auxTableIdentifier) + checkAnswer(readAuxTable(), expectedAfterMerge) + } +} diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala new file mode 100644 index 0000000000000..7a15ec38f04aa --- /dev/null +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -0,0 +1,665 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.{functions => F, AnalysisException, Row} +import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ + +class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { + + /** DataType for the CDC metadata column, where sequencing type is Long. 
*/ + private val cdcMetadataColSchemaType: DataType = new StructType() + .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType) + .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType) + + /** + * Helper to construct a CDC metadata column row, following [[cdcMetadataColSchemaType]]. + */ + private def cdcMetadataRow(deleteSeq: Option[Long], upsertSeq: Option[Long]): Row = + Row(deleteSeq.getOrElse(null), upsertSeq.getOrElse(null)) + + /** Build a microbatch [[DataFrame]] from explicit rows and an explicit schema. */ + private def microbatchOf(schema: StructType)(rows: Row*): DataFrame = + spark.createDataFrame(spark.sparkContext.parallelize(rows), schema) + + /** + * Returns the `(name, dataType)` pairs of `schema`'s fields. Used to compare two schemas for + * structural equivalence while deliberately ignoring nullability and metadata, which can shift + * benignly when columns are unpacked from a struct. + */ + private def columnNamesAndDataTypes(schema: StructType): Seq[(String, DataType)] = + schema.fields.map(f => (f.name, f.dataType)).toSeq + + // =============== deduplicateMicrobatch tests =============== + + test("deduplicateMicrobatch keeps only the row with the largest sequence value per key") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "first"), + Row(1, 30L, "winner"), + Row(1, 20L, "middle") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 30L, "winner") + ) + } + + test("deduplicateMicrobatch processes multiple keys independently") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = 
microbatchOf(schema)( + Row(1, 10L, "a1"), + Row(2, 50L, "b1-winner"), + Row(1, 20L, "a2-winner"), + Row(2, 40L, "b2-loser"), + Row(3, 1L, "c1-only") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Seq( + Row(1, 20L, "a2-winner"), + Row(2, 50L, "b1-winner"), + Row(3, 1L, "c1-only") + ) + ) + } + + test("deduplicateMicrobatch carries non-key, non-sequence columns from the winning row") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("name", StringType) + .add("amount", IntegerType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "old-name", 100), + Row(1, 20L, "winning-name", 200) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + // All non-key columns must come from the row with the largest sequence value, never + // a mix of values from multiple rows. + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 20L, "winning-name", 200) + ) + } + + test("deduplicateMicrobatch supports composite (multi-column) keys") { + val schema = new StructType() + .add("region", StringType) + .add("customer_id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row("US", 1, 10L, "us1-old"), + Row("US", 1, 20L, "us1-new"), + // Same customer_id as above but different region: independent group. + Row("EU", 1, 5L, "eu1-only"), + // Same region as above but different customer_id: independent group. 
+ Row("US", 2, 99L, "us2-only") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("region"), UnqualifiedColumnName("customer_id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Seq( + Row("US", 1, 20L, "us1-new"), + Row("EU", 1, 5L, "eu1-only"), + Row("US", 2, 99L, "us2-only") + ) + ) + } + + test("deduplicateMicrobatch supports literal-dot column names") { + val schema = new StructType() + .add("user.id", IntegerType) + .add("seq", LongType) + .add("event.value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "old"), + Row(1, 20L, "new") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("`user.id`")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 20L, "new") + ) + } + + test("deduplicateMicrobatch preserves the input column names, types, and ordering") { + val schema = new StructType() + .add("a", StringType) + .add("id", IntegerType) + .add("z", DoubleType) + .add("seq", LongType) + .add("flag", BooleanType) + + val batch = microbatchOf(schema)( + Row("a1", 1, 1.5, 10L, true), + Row("a2", 1, 2.5, 20L, false) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + // Field names and dataTypes must match the input exactly, in the original order. 
+ assert( + columnNamesAndDataTypes(processor.deduplicateMicrobatch(batch).schema) == + columnNamesAndDataTypes(schema)) + } + + test("deduplicateMicrobatch returns an empty DataFrame with preserved schema") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)() + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + val result = processor.deduplicateMicrobatch(batch) + assert(result.collect().isEmpty) + assert(columnNamesAndDataTypes(result.schema) == columnNamesAndDataTypes(schema)) + } + + // =============== extendMicrobatchRowsWithCdcMetadata tests =============== + + test("extendMicrobatchRowsWithCdcMetadata classifies each row as a delete or an upsert " + + "per deleteCondition") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("is_delete", BooleanType) + + val batch = microbatchOf(schema)( + Row(1, 10L, false), + Row(2, 20L, true), + Row(3, 30L, false), + Row(4, 40L, true) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + deleteCondition = Some(F.col("is_delete") === true) + ), + resolvedSequencingType = LongType + ) + + // Mutual-exclusivity invariant: each row's _cdc_metadata struct has exactly one of + // (deleteSequence, upsertSequence) non-null, and the non-null side carries the row's + // sequence value. 
+ checkAnswer( + df = processor.extendMicrobatchRowsWithCdcMetadata(batch), + expectedAnswer = Seq( + Row(1, 10L, false, Row(null, 10L)), + Row(2, 20L, true, Row(20L, null)), + Row(3, 30L, false, Row(null, 30L)), + Row(4, 40L, true, Row(40L, null)) + ) + ) + } + + test("extendMicrobatchRowsWithCdcMetadata treats every row as an upsert " + + "when deleteCondition is None") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "a"), + Row(2, 20L, "b") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + deleteCondition = None + ), + resolvedSequencingType = LongType + ) + + checkAnswer( + df = processor.extendMicrobatchRowsWithCdcMetadata(batch), + expectedAnswer = Seq( + Row(1, 10L, "a", Row(null, 10L)), + Row(2, 20L, "b", Row(null, 20L)) + ) + ) + } + + test("extendMicrobatchRowsWithCdcMetadata appends CDC metadata as the last column") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "a") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + val result = processor.extendMicrobatchRowsWithCdcMetadata(batch) + + // Original columns are preserved in their original order, with CDC metadata appended at + // the very end. 
+ assert(result.schema.fieldNames.toSeq == + schema.fieldNames.toSeq :+ Scd1BatchProcessor.cdcMetadataColName) + } + + test("extendMicrobatchRowsWithCdcMetadata casts delete / upsert sequence fields to " + + "resolvedSequencingType") { + val schema = new StructType() + .add("id", IntegerType) + // Microbatch's sequencing column is IntegerType, but the flow's resolved sequencing type + // will be LongType. This should be upcast in the projected CDC metadata column. + .add("seq", IntegerType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10, "a") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + val resultDf = processor.extendMicrobatchRowsWithCdcMetadata(batch) + + val cdcMetadataDataType = + resultDf.schema(Scd1BatchProcessor.cdcMetadataColName).dataType.asInstanceOf[StructType] + assert(columnNamesAndDataTypes(cdcMetadataDataType) == Seq( + Scd1BatchProcessor.cdcDeleteSequenceFieldName -> LongType, + Scd1BatchProcessor.cdcUpsertSequenceFieldName -> LongType)) + + // The cast must also succeed at runtime: upsertSequence is materialized as a Long value, not + // an Int. + checkAnswer( + df = resultDf, + expectedAnswer = Row(1, 10, "a", Row(null, 10L)) + ) + } + + test("extendMicrobatchRowsWithCdcMetadata fails fast when the microbatch's sequencing column " + + "is incompatible with resolvedSequencingType") { + val schema = new StructType() + .add("id", IntegerType) + // Microbatch's sequencing column is a struct, whereas the flow's resolved sequencing type + // will be LongType. These are incompatible and should throw.
+ .add( + "seq", + new StructType() + .add("major", LongType) + .add("minor", LongType)) + + val batch = microbatchOf(schema)( + Row(1, Row(1L, 0L)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + val ex = intercept[AnalysisException] { + // .schema forces analysis of the underlying logical plan, surfacing the invalid cast. + processor.extendMicrobatchRowsWithCdcMetadata(batch).schema + } + assert(ex.getCondition == "DATATYPE_MISMATCH.CAST_WITHOUT_SUGGESTION") + } + + test("extendMicrobatchRowsWithCdcMetadata rejects a microbatch that already contains the " + + "reserved CDC metadata column") { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add(Scd1BatchProcessor.cdcMetadataColName, StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "user-supplied") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + checkError( + exception = intercept[AnalysisException] { + processor.extendMicrobatchRowsWithCdcMetadata(batch) + }, + condition = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", + sqlState = "42710", + parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseSensitive, + "columnName" -> Scd1BatchProcessor.cdcMetadataColName, + "schemaName" -> "microbatch", + "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName + ) + ) + } + } + + // =============== applyTombstonesToMicrobatch tests =============== + + /** + * Schema for the microbatch input to [[Scd1BatchProcessor.applyTombstonesToMicrobatch]] + * tests. 
+ */ + private val applyTombstonesToMicrobatchTestMicrobatchSchema: StructType = new StructType() + // Key column. + .add("id", IntegerType) + // Data column. + .add("value", StringType) + // CDC metadata column. + .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType) + + /** + * Schema for the auxiliary input to [[Scd1BatchProcessor.applyTombstonesToMicrobatch]] tests. + * + * In practice for SCD1 the auxiliary table only carries key columns and the CDC metadata + * column -- never user data columns -- so we mirror that production-side asymmetry here, + * even though the function's API contract would allow a single shared schema. + */ + private val applyTombstonesToMicrobatchTestAuxiliarySchema: StructType = new StructType() + // Key column. + .add("id", IntegerType) + // CDC metadata column. + .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType) + + test("applyTombstonesToMicrobatch drops late-arriving deletes and upserts when a matching " + + "tombstone exists for the same key") { + // Both microbatch events have an effective sequence strictly less than the tombstone's + // delete sequence, so they must be anti-joined out of the microbatch regardless of whether + // they are deletes or upserts. + val microbatch = microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)( + Row(1, "stale-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5))), + Row(1, "stale-delete", cdcMetadataRow(deleteSeq = Some(7), upsertSeq = None)) + ) + val auxiliary = microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)( + Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is already encoded + // into the CDC metadata column. 
+        sequencing = F.lit(0L),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.applyTombstonesToMicrobatch(microbatch, auxiliary)
+    assert(result.collect().isEmpty)
+  }
+
+  test("applyTombstonesToMicrobatch keeps a microbatch row whose effective sequence ties the " +
+    "tombstone's delete sequence") {
+    // Tie: upsert sequence 10 equals the tombstone's delete sequence 10. The join uses strict
+    // `<`, so the tied microbatch row is kept. This is an internal tie-breaking convention for
+    // SCD1 only, *not* a publicly documented contract: if external callers ever start relying
+    // on it, this test and the join condition in applyTombstonesToMicrobatch must move together.
+    val microbatch = microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)(
+      Row(1, "tied-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10)))
+    )
+    val auxiliary = microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is already encoded
+        // into the CDC metadata column.
+        sequencing = F.lit(0L),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    checkAnswer(
+      df = processor.applyTombstonesToMicrobatch(microbatch, auxiliary),
+      expectedAnswer = Row(1, "tied-upsert", Row(null, 10L))
+    )
+  }
+
+  test("applyTombstonesToMicrobatch keeps microbatch rows whose effective sequence exceeds the " +
+    "tombstone's delete sequence") {
+    val microbatch = microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)(
+      Row(1, "fresher-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(15))),
+      Row(1, "fresher-delete", cdcMetadataRow(deleteSeq = Some(20), upsertSeq = None))
+    )
+    val auxiliary = microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None)) // older than 15 and 20
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is already encoded
+        // into the CDC metadata column.
+        sequencing = F.lit(0L),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    checkAnswer(
+      df = processor.applyTombstonesToMicrobatch(microbatch, auxiliary),
+      expectedAnswer = Seq( // both rows are expected back, the upsert and the delete alike
+        Row(1, "fresher-upsert", Row(null, 15L)),
+        Row(1, "fresher-delete", Row(20L, null))
+      )
+    )
+  }
+
+  test("applyTombstonesToMicrobatch leaves microbatch rows untouched when the tombstone targets " +
+    "a different key") {
+    val microbatch = microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)(
+      Row(1, "stays", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5)))
+    )
+    // Tombstone targets key 2 with delete sequence 1000, far above key 1's upsert sequence 5;
+    // the key match must block cross-key application however stale the microbatch row is.
+    val auxiliary = microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)(
+      Row(2, cdcMetadataRow(deleteSeq = Some(1000), upsertSeq = None))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is already encoded
+        // into the CDC metadata column.
+        sequencing = F.lit(0L),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    checkAnswer(
+      df = processor.applyTombstonesToMicrobatch(microbatch, auxiliary),
+      expectedAnswer = Row(1, "stays", Row(null, 5L))
+    )
+  }
+
+  test("applyTombstonesToMicrobatch with composite keys requires every key column to match") {
+    val schema = new StructType()
+      .add("region", StringType)
+      .add("customer_id", IntegerType)
+      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
+
+    val microbatch = microbatchOf(schema)(
+      Row("US", 1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5))),
+      Row("US", 2, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5)))
+    )
+    // Tombstone matches on `region` only; its `customer_id` (99) differs from every microbatch
+    // row (1 and 2). The join condition is the AND of all key column equalities, so neither
+    // row should be dropped even though the delete sequence (1000) exceeds their sequence (5).
+    val auxiliary = microbatchOf(schema)(
+      Row("US", 99, cdcMetadataRow(deleteSeq = Some(1000), upsertSeq = None))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("region"), UnqualifiedColumnName("customer_id")),
+        // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is already encoded
+        // into the CDC metadata column.
+        sequencing = F.lit(0L),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    checkAnswer(
+      df = processor.applyTombstonesToMicrobatch(microbatch, auxiliary),
+      expectedAnswer = Seq(
+        Row("US", 1, Row(null, 5L)),
+        Row("US", 2, Row(null, 5L))
+      )
+    )
+  }
+
+  test("applyTombstonesToMicrobatch supports backticked key names containing a literal dot") {
+    val schema = new StructType()
+      .add("user.id", IntegerType) // the field name itself contains a dot
+      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
+
+    val microbatch = microbatchOf(schema)(
+      Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5)))
+    )
+    val auxiliary = microbatchOf(schema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("`user.id`")), // parsed as the single name user.id
+        // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is already encoded
+        // into the CDC metadata column.
+        sequencing = F.lit(0L),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.applyTombstonesToMicrobatch(microbatch, auxiliary)
+    assert(result.collect().isEmpty) // the seq-5 upsert loses to the seq-10 tombstone
+  }
+}