Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions api/src/main/scala/ai/chronon/api/Extensions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -907,20 +907,28 @@ object Extensions {
join
}

/**
  * Compute variants of semantic_hash with different flags. A flag is stored on Hive metadata and used to
  * indicate which version of semantic_hash logic to use.
  *
  * @param excludeTopic exclude streaming topic for any potential streaming topic migrations
  * @param excludeBackfillStartDateInJoinPart exclude the backfill start date in the join part. The backfill start
  *                                           date is intended to trigger separate group by backfill jobs, while
  *                                           semantic hash is only used in join for versioning check; excluding it
  *                                           gives more flexibility to group by backfill jobs
  * @return the `baseSemanticHash` of the join, computed after the requested fields have been stripped
  */
def semanticHash(excludeTopic: Boolean, excludeBackfillStartDateInJoinPart: Boolean = true): Map[String, String] = {
  // Copy only when a field is going to be stripped, so the stripping below is applied to the copy
  // rather than to `join`.
  // WARN: deepCopy doesn't guarantee same semantic_hash will be produced due to reordering of map keys
  // but the behavior is deterministic
  val needsCopy = excludeTopic || excludeBackfillStartDateInJoinPart
  val joinCopy = if (needsCopy) join.deepCopy() else join

  if (excludeTopic) {
    cleanTopic(joinCopy)
  }

  if (excludeBackfillStartDateInJoinPart) {
    // NOTE(review): the getter may return null when no join parts are set -- skip the loop in that case
    // instead of dereferencing null here. Confirm that baseSemanticHash tolerates a missing list as well.
    Option(joinCopy.getJoinParts).foreach { parts =>
      parts.toScala.foreach(_.groupBy.unsetBackfillStartDate())
    }
  }

  joinCopy.baseSemanticHash
}

/*
Expand Down