Skip to content

Commit

Permalink
Verify that non key columns exist in each dataset (#517)
Browse files Browse the repository at this point in the history
* Verify that non key columns exist in each dataset

- The non existence of non key columns was resulting in a Spark SQL exception, instead of a graceful "ComparisonFailed" return value.
- In a future PR, will consolidate all the column validation logic into one single place.

* Fix failing build due to formatting issues.
  • Loading branch information
rdsharma26 committed Apr 16, 2024
1 parent 5dc3638 commit da0f298
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ object DataSynchronization extends ComparisonBase {
if (columnErrors.isEmpty) {
// Get all the non-key columns from DS1 and verify that they are present in DS2
val colsDS1 = ds1.columns.filterNot(x => colKeyMap.keys.toSeq.contains(x)).sorted
val nonKeyColsMatch = colsDS1.forall { col => Try { ds2(col) }.isSuccess }
val nonKeyColsMatch = colsDS1.forall(columnExists(ds2, _))

if (!nonKeyColsMatch) {
ComparisonFailed("Non key columns in the given data frames do not match.")
Expand Down Expand Up @@ -131,12 +131,23 @@ object DataSynchronization extends ComparisonBase {
colKeyMap: Map[String, String],
compCols: Map[String, String],
assertion: Double => Boolean): ComparisonResult = {
val columnErrors = areKeyColumnsValid(ds1, ds2, colKeyMap)
if (columnErrors.isEmpty) {
val mergedMaps = colKeyMap ++ compCols
finalAssertion(ds1, ds2, mergedMaps, assertion)
val keyColumnErrors = areKeyColumnsValid(ds1, ds2, colKeyMap)
if (keyColumnErrors.isEmpty) {
val nonKeyColumns1NotInDataset = compCols.keys.filterNot(columnExists(ds1, _))
val nonKeyColumns2NotInDataset = compCols.values.filterNot(columnExists(ds2, _))

if (nonKeyColumns1NotInDataset.nonEmpty) {
ComparisonFailed(s"The following columns were not found in the first dataset: " +
s"${nonKeyColumns1NotInDataset.mkString(", ")}")
} else if (nonKeyColumns2NotInDataset.nonEmpty) {
ComparisonFailed(s"The following columns were not found in the second dataset: " +
s"${nonKeyColumns2NotInDataset.mkString(", ")}")
} else {
val mergedMaps = colKeyMap ++ compCols
finalAssertion(ds1, ds2, mergedMaps, assertion)
}
} else {
ComparisonFailed(columnErrors.get)
ComparisonFailed(keyColumnErrors.get)
}
}

Expand All @@ -150,12 +161,27 @@ object DataSynchronization extends ComparisonBase {
val compColsEither: Either[ComparisonFailed, Map[String, String]] = if (optionalCompCols.isDefined) {
optionalCompCols.get match {
case compCols if compCols.isEmpty => Left(ComparisonFailed("Empty column comparison map provided."))
case compCols => Right(compCols)
case compCols =>
val ds1CompColsNotInDataset = compCols.keys.filterNot(columnExists(ds1, _))
val ds2CompColsNotInDataset = compCols.values.filterNot(columnExists(ds2, _))
if (ds1CompColsNotInDataset.nonEmpty) {
Left(
ComparisonFailed(s"The following columns were not found in the first dataset: " +
s"${ds1CompColsNotInDataset.mkString(", ")}")
)
} else if (ds2CompColsNotInDataset.nonEmpty) {
Left(
ComparisonFailed(s"The following columns were not found in the second dataset: " +
s"${ds2CompColsNotInDataset.mkString(", ")}")
)
} else {
Right(compCols)
}
}
} else {
// Get all the non-key columns from DS1 and verify that they are present in DS2
val ds1NonKeyCols = ds1.columns.filterNot(x => colKeyMap.keys.toSeq.contains(x)).sorted
val nonKeyColsMatch = ds1NonKeyCols.forall { col => Try { ds2(col) }.isSuccess }
val nonKeyColsMatch = ds1NonKeyCols.forall(columnExists(ds2, _))

if (!nonKeyColsMatch) {
Left(ComparisonFailed("Non key columns in the given data frames do not match."))
Expand All @@ -181,30 +207,40 @@ object DataSynchronization extends ComparisonBase {
private def areKeyColumnsValid(ds1: DataFrame,
ds2: DataFrame,
colKeyMap: Map[String, String]): Option[String] = {
// We verify that the key columns provided form a valid primary/composite key.
// To achieve this, we group the dataframes and compare their count with the original count.
// If the key columns provided are valid, then the two counts should match.
val ds1Cols = colKeyMap.keys.toSeq
val ds2Cols = colKeyMap.values.toSeq
val ds1Unique = ds1.groupBy(ds1Cols.map(col): _*).count()
val ds2Unique = ds2.groupBy(ds2Cols.map(col): _*).count()

val ds1Count = ds1.count()
val ds2Count = ds2.count()
val ds1UniqueCount = ds1Unique.count()
val ds2UniqueCount = ds2Unique.count()
val ds1ColsNotInDataset = ds1Cols.filterNot(columnExists(ds1, _))
val ds2ColsNotInDataset = ds2Cols.filterNot(columnExists(ds2, _))

if (ds1UniqueCount == ds1Count && ds2UniqueCount == ds2Count) {
None
if (ds1ColsNotInDataset.nonEmpty) {
Some(s"The following key columns were not found in the first dataset: ${ds1ColsNotInDataset.mkString(", ")}")
} else if (ds2ColsNotInDataset.nonEmpty) {
Some(s"The following key columns were not found in the second dataset: ${ds2ColsNotInDataset.mkString(", ")}")
} else {
val combo1 = ds1Cols.mkString(", ")
val combo2 = ds2Cols.mkString(", ")
Some(s"The selected columns are not comparable due to duplicates present in the dataset." +
s"Comparison keys must be unique, but " +
s"in Dataframe 1, there are $ds1UniqueCount unique records and $ds1Count rows," +
s" and " +
s"in Dataframe 2, there are $ds2UniqueCount unique records and $ds2Count rows, " +
s"based on the combination of keys {$combo1} in Dataframe 1 and {$combo2} in Dataframe 2")
// We verify that the key columns provided form a valid primary/composite key.
// To achieve this, we group the dataframes and compare their count with the original count.
// If the key columns provided are valid, then the two counts should match.
val ds1Unique = ds1.groupBy(ds1Cols.map(col): _*).count()
val ds2Unique = ds2.groupBy(ds2Cols.map(col): _*).count()

val ds1Count = ds1.count()
val ds2Count = ds2.count()
val ds1UniqueCount = ds1Unique.count()
val ds2UniqueCount = ds2Unique.count()

if (ds1UniqueCount == ds1Count && ds2UniqueCount == ds2Count) {
None
} else {
val combo1 = ds1Cols.mkString(", ")
val combo2 = ds2Cols.mkString(", ")
Some(s"The selected columns are not comparable due to duplicates present in the dataset." +
s"Comparison keys must be unique, but " +
s"in Dataframe 1, there are $ds1UniqueCount unique records and $ds1Count rows," +
s" and " +
s"in Dataframe 2, there are $ds2UniqueCount unique records and $ds2Count rows, " +
s"based on the combination of keys {$combo1} in Dataframe 1 and {$combo2} in Dataframe 2")
}
}
}

Expand Down Expand Up @@ -291,4 +327,6 @@ object DataSynchronization extends ComparisonBase {
.drop(ds2HashColName)
.drop(ds2KeyColsUpdatedNamesMap.values.toSeq: _*)
}

private def columnExists(df: DataFrame, col: String) = Try { df(col) }.isSuccess
}
Original file line number Diff line number Diff line change
Expand Up @@ -686,5 +686,122 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec {
val expected = Map(1 -> true, 2 -> true, 3 -> true, 4 -> true, 5 -> false, 6 -> false)
assert(expected == rowLevelResults)
}

"fails as expected when key columns do not exist" in withSparkSession { spark =>
val idColumnName = "id"
val ds1 = primaryDataset(spark, idColumnName)
val ds2 = referenceDataset(spark, idColumnName)
val assertion: Double => Boolean = _ >= 0.6 // 4 out of 6 rows match

val nonExistCol1 = "foo"
val nonExistCol2 = "bar"

// Key columns not in either dataset (Overall)
val colKeyMap1 = Map(nonExistCol1 -> nonExistCol2)
val overallResult1 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap1, assertion)

assert(overallResult1.isInstanceOf[ComparisonFailed])
val failedOverallResult1 = overallResult1.asInstanceOf[ComparisonFailed]
assert(failedOverallResult1.errorMessage.contains("key columns were not found in the first dataset"))
assert(failedOverallResult1.errorMessage.contains(nonExistCol1))

// Key columns not in either dataset (Row level)
val rowLevelResult1 = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap1)
assert(rowLevelResult1.isLeft)
val failedRowLevelResult1 = rowLevelResult1.left.get
assert(failedRowLevelResult1.errorMessage.contains("key columns were not found in the first dataset"))
assert(failedRowLevelResult1.errorMessage.contains(nonExistCol1))

// Key column not in first dataset
val colKeyMap2 = Map(nonExistCol1 -> idColumnName)
val overallResult2 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap2, assertion)

assert(overallResult2.isInstanceOf[ComparisonFailed])
val failedOverallResult2 = overallResult2.asInstanceOf[ComparisonFailed]
assert(failedOverallResult2.errorMessage.contains("key columns were not found in the first dataset"))
assert(failedOverallResult2.errorMessage.contains(nonExistCol1))

// Key column not in first dataset (Row level)
val rowLevelResult2 = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap2)
assert(rowLevelResult2.isLeft)
val failedRowLevelResult2 = rowLevelResult2.left.get
assert(failedRowLevelResult2.errorMessage.contains("key columns were not found in the first dataset"))
assert(failedRowLevelResult2.errorMessage.contains(nonExistCol1))

// Key column not in second dataset
val colKeyMap3 = Map(idColumnName -> nonExistCol2)
val overallResult3 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap3, assertion)

assert(overallResult3.isInstanceOf[ComparisonFailed])
val failedOverallResult3 = overallResult3.asInstanceOf[ComparisonFailed]
assert(failedOverallResult3.errorMessage.contains("key columns were not found in the second dataset"))
assert(failedOverallResult3.errorMessage.contains(nonExistCol2))

// Key column not in second dataset (Row level)
val rowLevelResult3 = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap3)
assert(rowLevelResult3.isLeft)
val failedRowLevelResult3 = rowLevelResult3.left.get
assert(failedRowLevelResult3.errorMessage.contains("key columns were not found in the second dataset"))
assert(failedRowLevelResult3.errorMessage.contains(nonExistCol2))
}

"fails as expected when non-key columns do not exist" in withSparkSession { spark =>
val idColumnName = "id"
val ds1 = primaryDataset(spark, idColumnName)
val ds2 = referenceDataset(spark, idColumnName)
val assertion: Double => Boolean = _ >= 0.6 // 4 out of 6 rows match
val colKeyMap = Map(idColumnName -> idColumnName)

val nonExistCol1 = "foo"
val nonExistCol2 = "bar"

// Non-key columns not in either dataset (Overall)
val compColsMap1 = Map(nonExistCol1 -> nonExistCol2)
val overallResult1 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compColsMap1, assertion)

assert(overallResult1.isInstanceOf[ComparisonFailed])
val failedOverallResult1 = overallResult1.asInstanceOf[ComparisonFailed]
assert(failedOverallResult1.errorMessage.contains(
s"The following columns were not found in the first dataset: $nonExistCol1"))

// Non-key columns not in either dataset (Row level)
val rowLevelResult1 = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap, Some(compColsMap1))
assert(rowLevelResult1.isLeft)
val failedRowLevelResult1 = rowLevelResult1.left.get
assert(failedRowLevelResult1.errorMessage.contains(
s"The following columns were not found in the first dataset: $nonExistCol1"))

// Non-key column not in first dataset
val compColsMap2 = Map(nonExistCol1 -> "State")
val overallResult2 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compColsMap2, assertion)

assert(overallResult2.isInstanceOf[ComparisonFailed])
val failedOverallResult2 = overallResult2.asInstanceOf[ComparisonFailed]
assert(failedOverallResult2.errorMessage.contains(
s"The following columns were not found in the first dataset: $nonExistCol1"))

// Non-key columns not in first dataset (Row level)
val rowLevelResult2 = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap, Some(compColsMap2))
assert(rowLevelResult2.isLeft)
val failedRowLevelResult2 = rowLevelResult2.left.get
assert(failedRowLevelResult2.errorMessage.contains(
s"The following columns were not found in the first dataset: $nonExistCol1"))

// Non-key column not in second dataset
val compColsMap3 = Map("state" -> nonExistCol2)
val overallResult3 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compColsMap3, assertion)

assert(overallResult3.isInstanceOf[ComparisonFailed])
val failedOverallResult3 = overallResult3.asInstanceOf[ComparisonFailed]
assert(failedOverallResult3.errorMessage.contains(
s"The following columns were not found in the second dataset: $nonExistCol2"))

// Non-key column not in second dataset (Row level)
val rowLevelResult3 = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap, Some(compColsMap3))
assert(rowLevelResult3.isLeft)
val failedRowLevelResult3 = rowLevelResult3.left.get
assert(failedOverallResult3.errorMessage.contains(
s"The following columns were not found in the second dataset: $nonExistCol2"))
}
}
}

0 comments on commit da0f298

Please sign in to comment.