Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.pipelines.autocdc
import org.apache.spark.SparkException
import org.apache.spark.sql.{functions => F, AnalysisException}
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.util.QuotingUtils
import org.apache.spark.sql.classic.DataFrame
import org.apache.spark.sql.types.{DataType, StructField, StructType}
Expand Down Expand Up @@ -212,6 +213,151 @@ case class Scd1BatchProcessor(
)
}

/**
* Merge the reconciled (deduplicated per key) microbatch onto the auxiliary table,
* advancing or deleting existing tombstones and inserting new tombstones for previously
* untracked keys.
*
* After the merge, the auxiliary table has the same schema as before, but with the latest
* tombstone data per key.
*
* @param reconciledMicrobatchDf The deduplicated microbatch.
* @param auxiliaryTableIdentifier The identifier (not a [[DataFrame]]) of the auxiliary
* table, as required by the `mergeInto(table, condition)`
* API.
*/
def mergeMicrobatchOntoAuxiliaryTable(
reconciledMicrobatchDf: DataFrame,
auxiliaryTableIdentifier: TableIdentifier
): Unit = {
val auxIdentQuoted = auxiliaryTableIdentifier.quotedString
val meta = Scd1BatchProcessor.cdcMetadataColName

// Project the reconciled microbatch down to just keys + `_cdc_metadata`; data columns are
// irrelevant for the auxiliary table and should not be persisted.
val reducedMicrobatch = reconciledMicrobatchDf
.select(changeArgs.keys.map(k => F.col(k.quoted)) :+ F.col(meta): _*)
.as("reducedMicrobatch")

val microbatchCdcMetadata: Column = F.col(s"reducedMicrobatch.`$meta`")
val incomingDelete: Column = Scd1BatchProcessor.deleteSequenceOf(microbatchCdcMetadata)
val incomingUpsert: Column = Scd1BatchProcessor.upsertSequenceOf(microbatchCdcMetadata)

val auxCdcMetadata: Column = F.col(s"$auxIdentQuoted.`$meta`")
val auxDelete: Column = Scd1BatchProcessor.deleteSequenceOf(auxCdcMetadata)

val doKeysMatch = changeArgs.keys
.map(k => F.col(s"reducedMicrobatch.${k.quoted}") === F.col(s"$auxIdentQuoted.${k.quoted}"))
.reduce(_ && _)

val incomingRowRepresentsDeleteEvent =
incomingDelete.isNotNull && (incomingUpsert.isNull || incomingDelete > incomingUpsert)

reducedMicrobatch
.mergeInto(auxIdentQuoted, doKeysMatch)
// Incoming delete is newer than the stored one: advance the high-water mark.
.whenMatched(
incomingRowRepresentsDeleteEvent && incomingDelete > auxDelete
)
.update(Map(s"$auxIdentQuoted.`$meta`" -> microbatchCdcMetadata))
// Incoming upsert is newer than the stored delete: the key was re-inserted after the
// delete, so the aux tombstone is stale - remove it to prevent unbounded growth.
.whenMatched(
!incomingRowRepresentsDeleteEvent && incomingUpsert >= auxDelete
)
.delete()
// New delete for a key not yet tracked, add it to auxiliary table. Note that in the
// reconciled microbatch, there is at most one event for key, which represents the latest
// known event for the key. If the latest known event is a delete, it must be a tombstone.
.whenNotMatched(incomingRowRepresentsDeleteEvent)
.insertAll()
.merge()
}

/**
* Merge the reconciled (deduplicated, tombstone applied, and column selection + metadata
* column projected) microbatch onto the target table, as per SCD1 semantics.
*
* Microbatch invariants:
* - Exactly one of {upsert, delete} version is non-null, the other is null.
* - There is at most one event per key, representing the latest known event for the key
* across the microbatch and auxiliary table.
*
* Target table invariants:
* - Target table only contains live rows; delete sequence is always null, upsert sequence
* is always non-null.
*
* @param reconciledMicrobatchDf The reconciled microbatch dataframe.
* @param targetTableIdentifier The identifier (not a [[DataFrame]]) of the target
* table, as required by the `mergeInto(table, condition)`
* API.
*/
def mergeMicrobatchOntoTarget(
reconciledMicrobatchDf: DataFrame,
targetTableIdentifier: TableIdentifier
): Unit = {
val meta = Scd1BatchProcessor.cdcMetadataColName

val destinationTableStr = targetTableIdentifier.quotedString
// (Re-)alias the reconciled microbatch DF for easy reference for the remainder of the merge.
val microbatchDf = reconciledMicrobatchDf.as("microbatch")

val microbatchCdcMetadataCol = F.col(s"microbatch.`$meta`")
val destinationCdcMetadataCol =
F.col(s"$destinationTableStr.`$meta`")

val microbatchDeleteVersionField =
Scd1BatchProcessor.deleteSequenceOf(microbatchCdcMetadataCol)
val microbatchUpsertVersionField =
Scd1BatchProcessor.upsertSequenceOf(microbatchCdcMetadataCol)
val destinationUpsertVersionField =
Scd1BatchProcessor.upsertSequenceOf(destinationCdcMetadataCol)

val keysMatch = changeArgs.keys
.map(k =>
F.col(s"microbatch.${k.quoted}") === F.col(s"$destinationTableStr.${k.quoted}")
)
.reduce(_ && _)

// Upsert beats existing row if incoming upsert sequence is geq to the upsert sequence on
// the target.
val incomingWinsUpsert = microbatchUpsertVersionField.isNotNull &&
microbatchUpsertVersionField >= destinationUpsertVersionField

// Delete beats existing row if delete sequencing is strictly greater than the upsert
// sequence on the target. This is an arbitrary but deliberate choice to maintain that
// upserts get priority over deletes on duplicate sequencing.
val incomingWinsDelete = microbatchDeleteVersionField.isNotNull &&
microbatchDeleteVersionField > destinationUpsertVersionField

// When the incoming upsert wins against an existing record, the entire row (all columns)
// will be overwritten, including the CDC metadata column. We only exclude keys because
// most merge implementations require that join columns are not being mutated, even if
// the mutation is a no-op.
val resolver = microbatchDf.sparkSession.sessionState.conf.resolver
val keyNames = changeArgs.keys.map(_.name)
val columnsToUpdateWhenIncomingWinsUpsert: Map[String, Column] =
microbatchDf.columns
.filterNot(c => keyNames.exists(resolver(_, c)))
.map { c =>
val quotedCol = QuotingUtils.quoteIdentifier(c)
s"$destinationTableStr.$quotedCol" -> F.col(s"microbatch.$quotedCol")
}
.toMap

microbatchDf
.mergeInto(destinationTableStr, keysMatch)
.whenMatched(incomingWinsDelete)
.delete()
.whenMatched(incomingWinsUpsert)
.update(columnsToUpdateWhenIncomingWinsUpsert)
// New key: only insert upserts; deletes for absent keys are no-ops for the target table
// merge, and instead would have been inserted as tombstones into the auxiliary table.
.whenNotMatched(microbatchDeleteVersionField.isNull)
.insertAll()
.merge()
}

private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit = {
val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
val resolver = microbatchSqlConf.resolver
Expand Down
Loading