Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions spark/src/main/scala/ai/chronon/spark/JoinBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,25 @@ abstract class JoinBase(joinConf: api.Join,
logger.info(s"printing results for joinPart: ${joinConf.metaData.name}::${joinPart.groupBy.metaData.name}")
rightDfWithDerivations.prettyPrint()
}

if (tableUtils.joinPartUniquenessCheck) {
  // Opt-in sanity check: every row of the join part output must be uniquely identified by
  // its group-by key columns, the time column (only when the output carries one), and the
  // partition column. A mismatch between row count and distinct-key count aborts the job.
  // NOTE(review): this runs two separate Spark actions over rightDfWithDerivations, so the
  // frame may be computed twice unless it is already cached upstream - confirm.
  val temporalKey =
    if (rightDfWithDerivations.columns.contains(Constants.TimeColumn)) Seq(Constants.TimeColumn)
    else Seq.empty
  val keyColumns = (joinPart.groupBy.keyColumns.toScala ++ temporalKey) :+ tableUtils.partitionColumn

  val totalCount = rightDfWithDerivations.count()
  val keyProjection = rightDfWithDerivations.select(keyColumns.map(col): _*)
  val distinctCount = keyProjection.distinct().count()

  val hasDuplicateKeys = totalCount != distinctCount
  if (hasDuplicateKeys) {
    val renderedKeys = keyColumns.mkString(", ")
    throw new IllegalStateException(
      s"Uniqueness check failed for join part ${joinPart.groupBy.metaData.name}: " +
        s"total rows = $totalCount, distinct keys = $distinctCount. " +
        s"Key columns: $renderedKeys"
    )
  }
}

Some(rightDfWithDerivations)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,10 @@ case class TableUtils(sparkSession: SparkSession) {
// Flag read from the Spark conf key "spark.chronon.avroSchemaValidation"; defaults to true
// when the key is unset. A value that is not a valid boolean string makes toBoolean throw.
val chrononAvroSchemaValidation: Boolean = {
  val configuredValue = sparkSession.conf.get("spark.chronon.avroSchemaValidation", "true")
  configuredValue.toBoolean
}

// Opt-in switch for the joinPart uniqueness check over the group-by key columns and the
// partition column. Read from the Spark conf key "spark.chronon.joinPartUniquenessCheck";
// disabled (false) when the key is unset. A non-boolean string makes toBoolean throw.
val joinPartUniquenessCheck: Boolean = {
  val configuredValue = sparkSession.conf.get("spark.chronon.joinPartUniquenessCheck", "false")
  configuredValue.toBoolean
}

private lazy val tableFormatProvider: FormatProvider = {
sparkSession.conf.getOption("spark.chronon.table.format_provider") match {
case Some(clazzName) =>
Expand Down