/**
 * Folds a reconciled microbatch (at most one event per key) into the auxiliary tombstone table.
 *
 * Depending on the incoming event for a key, one of three things happens:
 *  - an incoming delete that is strictly newer than the stored delete overwrites the stored
 *    CDC metadata (the tombstone's high-water mark advances);
 *  - an incoming upsert sequenced at or after the stored delete removes the tombstone, since
 *    the key has been re-inserted and the tombstone is stale;
 *  - an incoming delete for a key with no auxiliary row is inserted as a new tombstone.
 * Rows that satisfy none of these clauses are left as they are.
 *
 * @param reconciledMicrobatchDf The deduplicated microbatch. Only its key columns and its CDC
 *                               metadata column are read; all other columns are projected away.
 * @param auxiliaryTableIdentifier Identifier of the auxiliary table. A name rather than a
 *                                 [[DataFrame]] is needed because `mergeInto(table, condition)`
 *                                 addresses its target by name.
 */
def mergeMicrobatchOntoAuxiliaryTable(
    reconciledMicrobatchDf: DataFrame,
    auxiliaryTableIdentifier: TableIdentifier
): Unit = {
  val sourceAlias = "reducedMicrobatch"
  val auxTableRef = auxiliaryTableIdentifier.quotedString
  val metadataColName = Scd1BatchProcessor.cdcMetadataColName
  val auxMetadataRef = s"$auxTableRef.`$metadataColName`"

  // The auxiliary table holds nothing but keys and CDC metadata, so every other microbatch
  // column is dropped up front rather than being carried into the merge.
  val keyColumns: Seq[Column] = changeArgs.keys.map(key => F.col(key.quoted))
  val tombstoneSource = reconciledMicrobatchDf
    .select(keyColumns :+ F.col(metadataColName): _*)
    .as(sourceAlias)

  val sourceMetadata: Column = F.col(s"$sourceAlias.`$metadataColName`")
  val sourceDeleteSeq: Column = Scd1BatchProcessor.deleteSequenceOf(sourceMetadata)
  val sourceUpsertSeq: Column = Scd1BatchProcessor.upsertSequenceOf(sourceMetadata)
  val storedDeleteSeq: Column = Scd1BatchProcessor.deleteSequenceOf(F.col(auxMetadataRef))

  // Join condition: equality on every key column, AND-ed together.
  val sameKey: Column = changeArgs.keys
    .map { key =>
      val quotedKey = key.quoted
      F.col(s"$sourceAlias.$quotedKey") === F.col(s"$auxTableRef.$quotedKey")
    }
    .reduce((lhs, rhs) => lhs && rhs)

  // An incoming row counts as a delete when it carries a delete sequence and either has no
  // upsert sequence or the delete sequence is strictly greater than the upsert sequence.
  val isDeleteEvent: Column =
    sourceDeleteSeq.isNotNull && (sourceUpsertSeq.isNull || sourceDeleteSeq > sourceUpsertSeq)

  // Strict `>`: a delete tied with the stored tombstone changes nothing.
  val advancesTombstone: Column = isDeleteEvent && sourceDeleteSeq > storedDeleteSeq
  // `>=`: on a tie the upsert takes priority over the stored delete, so the tombstone goes.
  val supersedesTombstone: Column = !isDeleteEvent && sourceUpsertSeq >= storedDeleteSeq

  tombstoneSource
    .mergeInto(auxTableRef, sameKey)
    .whenMatched(advancesTombstone)
    .update(Map(auxMetadataRef -> sourceMetadata))
    // Dropping stale tombstones keeps the auxiliary table from growing without bound.
    .whenMatched(supersedesTombstone)
    .delete()
    // The reconciled microbatch holds at most one event per key, namely the latest known one;
    // if that event is a delete for an untracked key, it becomes a new tombstone.
    .whenNotMatched(isDeleteEvent)
    .insertAll()
    .merge()
}
/**
 * Applies a fully reconciled microbatch (deduplicated, tombstones applied, column selection
 * and CDC metadata column already projected) to the target table using SCD1 semantics.
 *
 * Expected shape of the microbatch:
 *  - each row has exactly one of {upsert sequence, delete sequence} set; the other is null;
 *  - each key appears at most once and carries the latest known event for that key across
 *    the microbatch and the auxiliary table.
 *
 * Expected shape of the target table:
 *  - it holds live rows only, i.e. the delete sequence is always null and the upsert sequence
 *    is always non-null.
 *
 * @param reconciledMicrobatchDf The reconciled microbatch dataframe.
 * @param targetTableIdentifier Identifier of the target table. A name rather than a
 *                              [[DataFrame]] is needed because `mergeInto(table, condition)`
 *                              addresses its target by name.
 */
def mergeMicrobatchOntoTarget(
    reconciledMicrobatchDf: DataFrame,
    targetTableIdentifier: TableIdentifier
): Unit = {
  val sourceAlias = "microbatch"
  val targetRef = targetTableIdentifier.quotedString
  val metadataColName = Scd1BatchProcessor.cdcMetadataColName

  // Alias the microbatch so its columns can be told apart from the target's in the merge.
  val source = reconciledMicrobatchDf.as(sourceAlias)

  val sourceMetadata = F.col(s"$sourceAlias.`$metadataColName`")
  val incomingDeleteSeq = Scd1BatchProcessor.deleteSequenceOf(sourceMetadata)
  val incomingUpsertSeq = Scd1BatchProcessor.upsertSequenceOf(sourceMetadata)
  val storedUpsertSeq =
    Scd1BatchProcessor.upsertSequenceOf(F.col(s"$targetRef.`$metadataColName`"))

  // Join condition: equality on every key column, AND-ed together.
  val sameKey: Column = changeArgs.keys
    .map { key =>
      val quotedKey = key.quoted
      F.col(s"$sourceAlias.$quotedKey") === F.col(s"$targetRef.$quotedKey")
    }
    .reduce((lhs, rhs) => lhs && rhs)

  // Tie-breaking is deliberately asymmetric so that upserts take priority over deletes when
  // sequences are equal: a delete must be strictly newer than the stored row to win, whereas
  // an upsert wins at an equal-or-newer sequence.
  val deleteWins = incomingDeleteSeq.isNotNull && incomingDeleteSeq > storedUpsertSeq
  val upsertWins = incomingUpsertSeq.isNotNull && incomingUpsertSeq >= storedUpsertSeq

  // A winning upsert overwrites every column of the stored row, CDC metadata included. Key
  // columns are left out because most merge implementations reject assignments to join
  // columns, even when the assignment would be a no-op.
  val namesMatch = source.sparkSession.sessionState.conf.resolver
  val keyNames = changeArgs.keys.map(_.name)
  def isKeyColumn(columnName: String): Boolean =
    keyNames.exists(keyName => namesMatch(keyName, columnName))

  val nonKeyAssignments: Map[String, Column] = source.columns.collect {
    case columnName if !isKeyColumn(columnName) =>
      val quotedName = QuotingUtils.quoteIdentifier(columnName)
      s"$targetRef.$quotedName" -> F.col(s"$sourceAlias.$quotedName")
  }.toMap

  source
    .mergeInto(targetRef, sameKey)
    .whenMatched(deleteWins)
    .delete()
    .whenMatched(upsertWins)
    .update(nonKeyAssignments)
    // Unknown key: only upserts are inserted. A delete for an absent key is a no-op for the
    // target merge; it is recorded as a tombstone in the auxiliary table instead.
    .whenNotMatched(incomingDeleteSeq.isNull)
    .insertAll()
    .merge()
}
/**
 * Tests for [[Scd1BatchProcessor]] methods that perform a `MERGE INTO` against a registered
 * v2 table. These tests require a v2 catalog that supports row-level operations
 * ([[InMemoryRowLevelOperationTableCatalog]]) and run actual writes through Catalyst's
 * row-level-operations machinery, so they are kept separate from the pure-DataFrame-transform
 * tests in [[Scd1BatchProcessorSuite]].
 *
 * The auxiliary table and the target table live in the same test catalog and namespace; the
 * catalog is registered before each test and unregistered afterwards.
 */
class Scd1BatchProcessorMergeSuite
    extends QueryTest with SharedSparkSession with BeforeAndAfter {

  /** Catalog and namespace shared by both the auxiliary table and the target table. */
  private val testCatalogName = "cat"
  private val testNamespace = "ns1"
  private val auxTableName = "aux_table"
  private val targetTableName = "target_table"

  /** v2 [[Identifier]]s used for direct catalog API calls (CREATE TABLE). */
  private val auxIdent = Identifier.of(Array(testNamespace), auxTableName)
  private val targetIdent = Identifier.of(Array(testNamespace), targetTableName)

  /** Three-part [[TableIdentifier]]s passed to the functions under test. */
  private val auxTableIdentifier = TableIdentifier(
    table = auxTableName,
    database = Some(testNamespace),
    catalog = Some(testCatalogName)
  )
  private val targetTableIdentifier = TableIdentifier(
    table = targetTableName,
    database = Some(testNamespace),
    catalog = Some(testCatalogName)
  )

  /** Struct type of the CDC metadata column: `(deleteSequence, upsertSequence)`. */
  private val cdcMetadataColSchemaType = new StructType()
    .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType)
    .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType)

  /**
   * Minimal valid shape for both the auxiliary table and microbatch inputs in these tests:
   * a single key column `id` plus the CDC metadata struct. The auxiliary table genuinely
   * has only this shape in production, and the merge function reduces its microbatch input
   * down to keys + `_cdc_metadata` regardless of incoming data columns -- so most tests can
   * use this single schema for both ends.
   */
  private val minimalSchema: StructType = new StructType()
    .add("id", IntegerType)
    .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)

  /** Minimal target-table shape: one key, one data column, and CDC metadata. */
  private val targetSchema: StructType = new StructType()
    .add("id", IntegerType)
    .add("value", StringType)
    .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)

  /**
   * A processor with a single key column `id`. `sequencing` is irrelevant for
   * merge functions in this suite: they operate entirely on the already-computed CDC metadata
   * column, never on the raw sequencing expression.
   */
  private val processor = Scd1BatchProcessor(
    changeArgs = ChangeArgs(
      keys = Seq(UnqualifiedColumnName("id")),
      sequencing = F.lit(0L),
      storedAsScdType = ScdType.Type1
    ),
    resolvedSequencingType = LongType
  )

  before {
    spark.conf.set(
      s"spark.sql.catalog.$testCatalogName",
      classOf[InMemoryRowLevelOperationTableCatalog].getName
    )
  }

  after {
    // Reset the catalog manager and unregister the test catalog so that tables created by one
    // test are not visible to the next.
    spark.sessionState.catalogManager.reset()
    spark.sessionState.conf.unsetConf(s"spark.sql.catalog.$testCatalogName")
  }

  /**
   * Build an auxiliary-table schema with the given key columns followed by the standard CDC
   * metadata struct. Used by tests that need a non-trivial key shape (composite or dotted).
   */
  private def customKeyAuxSchema(keyColumns: Seq[(String, DataType)]): StructType = {
    val withKeys = keyColumns.foldLeft(new StructType()) { case (schema, (name, dataType)) =>
      schema.add(name, dataType)
    }
    withKeys.add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
  }

  /**
   * Create a table in the test catalog with the given `schema` and append `seedRows` to it.
   * Nothing is written when `seedRows` is empty, which leaves the table empty.
   *
   * @param ident           v2 identifier used for the catalog `createTable` call.
   * @param tableIdentifier Three-part identifier used to address the table when seeding it.
   * @param schema          Schema of the table and of the seed rows.
   * @param seedRows        Rows to append after creation; may be empty.
   */
  private def createAndSeedTable(
      ident: Identifier,
      tableIdentifier: TableIdentifier,
      schema: StructType,
      seedRows: Seq[Row]): Unit = {
    val tableInfo = new TableInfo.Builder()
      .withSchema(schema)
      .build()
    spark.sessionState.catalogManager
      .catalog(testCatalogName)
      .asTableCatalog
      .createTable(ident, tableInfo)

    if (seedRows.nonEmpty) {
      val df = spark.createDataFrame(spark.sparkContext.parallelize(seedRows), schema)
      df.writeTo(tableIdentifier.quotedString).append()
    }
  }

  /**
   * Create an auxiliary table in the test catalog using `schema` and seed it with `seedRows`.
   * Pass no rows to create an empty table.
   */
  private def createAuxTableWithSchema(schema: StructType, seedRows: Row*): Unit =
    createAndSeedTable(auxIdent, auxTableIdentifier, schema, seedRows)

  /** Create an auxiliary table using [[minimalSchema]] (single `id` key). */
  private def createAuxTable(seedRows: Row*): Unit =
    createAuxTableWithSchema(minimalSchema, seedRows: _*)

  /**
   * Create a target table in the test catalog using `schema` and seed it with `seedRows`.
   * Pass no rows to create an empty table.
   */
  private def createTargetTableWithSchema(schema: StructType, seedRows: Row*): Unit =
    createAndSeedTable(targetIdent, targetTableIdentifier, schema, seedRows)

  /** Create a target table in the test catalog using [[targetSchema]]. */
  private def createTargetTable(seedRows: Row*): Unit =
    createTargetTableWithSchema(targetSchema, seedRows: _*)

  /** Read the current contents of the auxiliary table. */
  private def readAuxTable(): DataFrame = spark.read.table(auxTableIdentifier.quotedString)

  /** Read the current contents of the target table. */
  private def readTargetTable(): DataFrame = spark.read.table(targetTableIdentifier.quotedString)

  /** Build a microbatch [[DataFrame]] from explicit `rows` and an explicit `schema`. */
  private def microbatchOf(schema: StructType)(rows: Row*): DataFrame =
    spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

  /**
   * Build a row matching the [[Scd1BatchProcessor.cdcMetadataColName]] struct's two fields, in
   * the order produced by [[Scd1BatchProcessor.constructCdcMetadataCol]]:
   * `(deleteSequence, upsertSequence)`. Pass `None` for the side that doesn't apply; it is
   * stored as a SQL null.
   */
  private def cdcMetadataRow(deleteSeq: Option[Long], upsertSeq: Option[Long]): Row =
    Row(deleteSeq.getOrElse(null), upsertSeq.getOrElse(null))

  /**
   * `(name, dataType)` pairs of `schema`'s fields, used to compare two schemas for structural
   * equivalence while deliberately ignoring nullability and metadata.
   */
  private def columnNamesAndDataTypes(schema: StructType): Seq[(String, DataType)] =
    schema.fields.map(f => (f.name, f.dataType)).toSeq

  // =============== mergeMicrobatchOntoAuxiliaryTable tests ===============

  test("mergeMicrobatchOntoAuxiliaryTable replaces an existing tombstone with a newer " +
    "microbatch tombstone, dropping any microbatch-only data columns") {
    createAuxTable(Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None)))

    // The microbatch carries an extra `value` data column that has no place in the auxiliary
    // table. mergeMicrobatchOntoAuxiliaryTable must project it away before merging, both to
    // satisfy MergeIntoTable's schema requirements and to keep the auxiliary table free of
    // unrelated columns.
    val microbatchSchema = new StructType()
      .add("id", IntegerType)
      .add("value", StringType)
      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
    val microbatch = microbatchOf(microbatchSchema)(
      Row(1, "data-leak", cdcMetadataRow(deleteSeq = Some(20), upsertSeq = None))
    )

    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, auxTableIdentifier)

    val result = readAuxTable()
    // Tombstone advanced to delete=20, with exactly one row per key (no duplicate tombstones).
    checkAnswer(result, Row(1, Row(20L, null)))
    // Schema strictly matches minimalSchema; the `value` column was dropped, not smuggled in.
    assert(columnNamesAndDataTypes(result.schema) == columnNamesAndDataTypes(minimalSchema))
  }

  test("mergeMicrobatchOntoAuxiliaryTable deletes an existing tombstone when superseded by a " +
    "newer microbatch upsert") {
    createAuxTable(Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None)))

    val microbatch = microbatchOf(minimalSchema)(
      Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20)))
    )

    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, auxTableIdentifier)

    assert(readAuxTable().collect().isEmpty)
  }

  test("mergeMicrobatchOntoAuxiliaryTable inserts a new tombstone for a previously-untracked " +
    "key") {
    createAuxTable()

    val microbatch = microbatchOf(minimalSchema)(
      Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None))
    )

    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, auxTableIdentifier)

    checkAnswer(readAuxTable(), Row(1, Row(10L, null)))
  }

  test("mergeMicrobatchOntoAuxiliaryTable leaves rows for unrelated keys untouched") {
    createAuxTable(Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None)))

    // Microbatch event affects a different key entirely; the existing tombstone for id=1 must
    // not be touched even though the new tombstone's sequence is much larger.
    val microbatch = microbatchOf(minimalSchema)(
      Row(2, cdcMetadataRow(deleteSeq = Some(100), upsertSeq = None))
    )

    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, auxTableIdentifier)

    checkAnswer(readAuxTable(), Seq(
      Row(1, Row(10L, null)),
      Row(2, Row(100L, null))
    ))
  }

  test("mergeMicrobatchOntoAuxiliaryTable ignores microbatch deletes whose sequence is older " +
    "than the existing tombstone") {
    // This documents that mergeMicrobatchOntoAuxiliaryTable's contract is stronger than just
    // relying on applyTombstonesToMicrobatch having filtered out stale events upstream: even
    // an unfiltered stale incoming delete must not regress the high-water mark.
    createAuxTable(Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None)))

    val microbatch = microbatchOf(minimalSchema)(
      Row(1, cdcMetadataRow(deleteSeq = Some(5), upsertSeq = None))
    )

    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, auxTableIdentifier)

    checkAnswer(readAuxTable(), Row(1, Row(10L, null)))
  }

  test("mergeMicrobatchOntoAuxiliaryTable ignores microbatch upserts whose sequence is older " +
    "than the existing tombstone") {
    createAuxTable(Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None)))

    val microbatch = microbatchOf(minimalSchema)(
      Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5)))
    )

    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, auxTableIdentifier)

    checkAnswer(readAuxTable(), Row(1, Row(10L, null)))
  }

  test("mergeMicrobatchOntoAuxiliaryTable applies the tied-sequence asymmetry: equal deletes " +
    "are kept, equal upserts delete the tombstone") {
    // On a delete<->upsert sequencing tie, upsert events are given priority over deletes;
    // therefore an incoming upsert with the same sequence as a tombstone should delete the
    // tombstone. On a delete<->delete sequencing tie, the effect is a no-op. This is an
    // internal SCD1 tie-breaking convention, not a publicly documented contract.
    createAuxTable(
      Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None)),
      Row(2, cdcMetadataRow(deleteSeq = Some(20), upsertSeq = None))
    )

    val microbatch = microbatchOf(minimalSchema)(
      Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None)),
      Row(2, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20)))
    )

    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, auxTableIdentifier)

    // Row 1's tombstone remains the same, but row 2's tombstone should be treated as stale and
    // deleted.
    checkAnswer(readAuxTable(), Row(1, Row(10L, null)))
  }

  test("mergeMicrobatchOntoAuxiliaryTable upsert event for different key does not affect " +
    "tombstone") {
    createAuxTable(Row(2, cdcMetadataRow(deleteSeq = Some(5), upsertSeq = None)))

    val microbatch = microbatchOf(minimalSchema)(
      // Although the upsert seq is 10, it is for key=1; the tombstone for key=2 must be
      // unaffected.
      Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10)))
    )

    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, auxTableIdentifier)

    checkAnswer(readAuxTable(), Row(2, Row(5L, null)))
  }

  test("mergeMicrobatchOntoAuxiliaryTable is idempotent across a microbatch that exercises " +
    "every merge clause") {
    // The auxiliary table starts with three tombstones; the microbatch then exercises every
    // merge clause simultaneously:
    //   - id=1: aux tombstone superseded by a microbatch upsert
    //   - id=2: aux tombstone advanced by a newer microbatch delete
    //   - id=3: untouched by the microbatch
    //   - id=4: new tombstone for an untracked key
    createAuxTable(
      Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None)),
      Row(2, cdcMetadataRow(deleteSeq = Some(20), upsertSeq = None)),
      Row(3, cdcMetadataRow(deleteSeq = Some(30), upsertSeq = None))
    )

    val microbatch = microbatchOf(minimalSchema)(
      Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(15))),
      Row(2, cdcMetadataRow(deleteSeq = Some(25), upsertSeq = None)),
      Row(4, cdcMetadataRow(deleteSeq = Some(40), upsertSeq = None))
    )

    val expectedAfterMerge = Seq(
      Row(2, Row(25L, null)),
      Row(3, Row(30L, null)),
      Row(4, Row(40L, null))
    )

    // First merge applies all three clauses exactly once.
    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, auxTableIdentifier)
    checkAnswer(readAuxTable(), expectedAfterMerge)

    // Re-applying the same microbatch is a no-op:
    //   - id=1 is absent from aux; whenNotMatched is gated on delete events => skipped.
    //   - id=2 has tied delete (incoming==aux); strict `>` in the update clause fails.
    //   - id=4 has tied delete (incoming==aux); same reason.
    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, auxTableIdentifier)
    checkAnswer(readAuxTable(), expectedAfterMerge)
  }

  test("mergeMicrobatchOntoAuxiliaryTable correctly inserts tombstones for composite key") {
    // Composite key: (region, customer_id). The merge join condition is the AND of every key
    // column equality, so an aux row sharing only `region` with the microbatch must NOT be
    // touched, while the microbatch row must be inserted as a new tombstone.
    val compositeSchema = customKeyAuxSchema(Seq(
      "region" -> StringType,
      "customer_id" -> IntegerType
    ))
    createAuxTableWithSchema(
      compositeSchema,
      Row("US", 99, cdcMetadataRow(deleteSeq = Some(50), upsertSeq = None))
    )

    val compositeKeyProcessor = Scd1BatchProcessor(
      changeArgs = ChangeArgs(
        keys = Seq(UnqualifiedColumnName("region"), UnqualifiedColumnName("customer_id")),
        sequencing = F.lit(0L),
        storedAsScdType = ScdType.Type1
      ),
      resolvedSequencingType = LongType
    )

    val microbatch = microbatchOf(compositeSchema)(
      Row("US", 1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None))
    )

    compositeKeyProcessor.mergeMicrobatchOntoAuxiliaryTable(microbatch, auxTableIdentifier)

    checkAnswer(readAuxTable(), Seq(
      Row("US", 99, Row(50L, null)),
      Row("US", 1, Row(10L, null))
    ))
  }

  test("mergeMicrobatchOntoAuxiliaryTable correctly merges for backticked/dotted keys") {
    // Even though the column is a backticked identifier in user-facing DDL, Spark drops the
    // backticks during schema resolution so the field name is the literal `user.id`. The merge
    // path must propagate the user's quoted identifier through `k.quoted` so the join condition
    // and update target both resolve to the same physical column.
    val dottedKeySchema = customKeyAuxSchema(Seq("user.id" -> IntegerType))
    createAuxTableWithSchema(
      dottedKeySchema,
      Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None))
    )

    val dottedKeyProcessor = Scd1BatchProcessor(
      changeArgs = ChangeArgs(
        keys = Seq(UnqualifiedColumnName("`user.id`")),
        sequencing = F.lit(0L),
        storedAsScdType = ScdType.Type1
      ),
      resolvedSequencingType = LongType
    )

    // We expect the existing tombstone with del seq=10 to be advanced to 20 if the merge matches
    // dotted keys correctly.
    val microbatch = microbatchOf(dottedKeySchema)(
      Row(1, cdcMetadataRow(deleteSeq = Some(20), upsertSeq = None))
    )

    dottedKeyProcessor.mergeMicrobatchOntoAuxiliaryTable(microbatch, auxTableIdentifier)

    checkAnswer(readAuxTable(), Row(1, Row(20L, null)))
  }

  // =============== mergeMicrobatchOntoTarget tests ===============

  test("mergeMicrobatchOntoTarget updates an existing row with a newer upsert") {
    createTargetTable(Row(1, "old", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10))))

    val microbatch = microbatchOf(targetSchema)(
      Row(1, "new", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20)))
    )

    processor.mergeMicrobatchOntoTarget(microbatch, targetTableIdentifier)

    val result = readTargetTable()
    checkAnswer(result, Row(1, "new", Row(null, 20L)))
    assert(columnNamesAndDataTypes(result.schema) == columnNamesAndDataTypes(targetSchema))
  }

  test("mergeMicrobatchOntoTarget deletes an existing row with a newer delete") {
    createTargetTable(
      Row(1, "delete-me", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10))),
      Row(2, "keep-me", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20)))
    )

    val microbatch = microbatchOf(targetSchema)(
      Row(1, "unused", cdcMetadataRow(deleteSeq = Some(15), upsertSeq = None))
    )

    processor.mergeMicrobatchOntoTarget(microbatch, targetTableIdentifier)

    checkAnswer(readTargetTable(), Row(2, "keep-me", Row(null, 20L)))
  }

  test("mergeMicrobatchOntoTarget inserts new upserts but not new (tombstone) deletes") {
    createTargetTable(Row(1, "existing", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10))))

    val microbatch = microbatchOf(targetSchema)(
      Row(2, "insert-me", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20))),
      Row(3, "do-not-insert", cdcMetadataRow(deleteSeq = Some(30), upsertSeq = None))
    )

    processor.mergeMicrobatchOntoTarget(microbatch, targetTableIdentifier)

    checkAnswer(readTargetTable(), Seq(
      Row(1, "existing", Row(null, 10L)),
      Row(2, "insert-me", Row(null, 20L))
    ))
  }

  test("mergeMicrobatchOntoTarget ignores stale upserts and stale deletes") {
    createTargetTable(
      Row(1, "target-delete-tie", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10))),
      Row(2, "target-newer", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20)))
    )

    // id=1: a delete tied with the stored upsert loses (deletes need a strictly greater
    // sequence). id=2: an upsert older than the stored upsert loses.
    val microbatch = microbatchOf(targetSchema)(
      Row(1, "delete-tie", cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None)),
      Row(2, "older-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(15)))
    )

    processor.mergeMicrobatchOntoTarget(microbatch, targetTableIdentifier)

    checkAnswer(readTargetTable(), Seq(
      Row(1, "target-delete-tie", Row(null, 10L)),
      Row(2, "target-newer", Row(null, 20L))
    ))
  }

  test("mergeMicrobatchOntoTarget gives tied upserts priority over the target row") {
    createTargetTable(Row(1, "old", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10))))

    val microbatch = microbatchOf(targetSchema)(
      Row(1, "same-sequence-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10)))
    )

    processor.mergeMicrobatchOntoTarget(microbatch, targetTableIdentifier)

    checkAnswer(readTargetTable(), Row(1, "same-sequence-upsert", Row(null, 10L)))
  }

  test("mergeMicrobatchOntoTarget correctly matches escaped key column names") {
    // The raw key name contains a special character (a backtick) that has to be escaped when
    // the name is used in a column reference.
    val rawKeyName = "a`b"
    val schemaWithSpecialKeyCharacters = new StructType()
      // The schema stores the column name with quoting already consumed, so the raw
      // (unquoted) name is used here.
      .add(rawKeyName, IntegerType)
      .add("value", StringType)
      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)

    createTargetTableWithSchema(
      schemaWithSpecialKeyCharacters,
      Row(1, "old", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10)))
    )

    val processorForCustomKeySchema = processor.copy(
      changeArgs = processor.changeArgs.copy(
        keys = Seq(UnqualifiedColumnName(QuotingUtils.quoteIdentifier(rawKeyName)))
      )
    )
    val microbatch = microbatchOf(schemaWithSpecialKeyCharacters)(
      Row(1, "new", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20)))
    )

    processorForCustomKeySchema.mergeMicrobatchOntoTarget(
      microbatch,
      targetTableIdentifier
    )

    checkAnswer(readTargetTable(), Row(1, "new", Row(null, 20L)))
  }

  gridTest(
    "mergeMicrobatchOntoTarget key column comparison respects spark session case sensitivity"
  )(Seq(false, true)) { caseSensitive =>
    withSQLConf("spark.sql.caseSensitive" -> caseSensitive.toString) {
      createTargetTable(Row(1, "old", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10))))

      // The table column is `id`; the processor refers to it as `ID`.
      val processorWithUpperCaseKey = processor.copy(
        changeArgs = processor.changeArgs.copy(
          keys = Seq(UnqualifiedColumnName("ID"))
        )
      )

      val microbatch = microbatchOf(targetSchema)(
        Row(1, "new", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20)))
      )

      if (caseSensitive) {
        val ex = intercept[AnalysisException] {
          processorWithUpperCaseKey.mergeMicrobatchOntoTarget(
            microbatch,
            targetTableIdentifier
          )
        }
        // Intentionally not using checkError here, to avoid asserting on a brittle query context
        // and long message parameters list.
        assert(ex.errorClass.contains("UNRESOLVED_COLUMN.WITH_SUGGESTION"))
      } else {
        processorWithUpperCaseKey.mergeMicrobatchOntoTarget(microbatch, targetTableIdentifier)
        checkAnswer(readTargetTable(), Row(1, "new", Row(null, 20L)))
      }
    }
  }

  test("mergeMicrobatchOntoTarget is idempotent across a microbatch") {
    createTargetTable(
      Row(1, "delete-me", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10))),
      Row(2, "update-me", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20))),
      Row(3, "untouched", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(30)))
    )

    val microbatch = microbatchOf(targetSchema)(
      Row(1, "delete-event", cdcMetadataRow(deleteSeq = Some(15), upsertSeq = None)),
      Row(2, "updated", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(25))),
      Row(4, "inserted", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(40))),
      Row(5, "absent-delete", cdcMetadataRow(deleteSeq = Some(50), upsertSeq = None))
    )

    val expectedAfterMerge = Seq(
      Row(2, "updated", Row(null, 25L)),
      Row(3, "untouched", Row(null, 30L)),
      Row(4, "inserted", Row(null, 40L))
    )

    processor.mergeMicrobatchOntoTarget(microbatch, targetTableIdentifier)
    checkAnswer(readTargetTable(), expectedAfterMerge)

    // Re-applying the same microbatch must leave the target table unchanged.
    processor.mergeMicrobatchOntoTarget(microbatch, targetTableIdentifier)
    checkAnswer(readTargetTable(), expectedAfterMerge)
  }
}