Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verify that non key columns exist in each dataset #517

Merged
merged 2 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ object DataSynchronization extends ComparisonBase {
if (columnErrors.isEmpty) {
// Get all the non-key columns from DS1 and verify that they are present in DS2
val colsDS1 = ds1.columns.filterNot(x => colKeyMap.keys.toSeq.contains(x)).sorted
val nonKeyColsMatch = colsDS1.forall { col => Try { ds2(col) }.isSuccess }
val nonKeyColsMatch = colsDS1.forall(columnExists(ds2, _))

if (!nonKeyColsMatch) {
ComparisonFailed("Non key columns in the given data frames do not match.")
Expand Down Expand Up @@ -131,12 +131,23 @@ object DataSynchronization extends ComparisonBase {
colKeyMap: Map[String, String],
compCols: Map[String, String],
assertion: Double => Boolean): ComparisonResult = {
val columnErrors = areKeyColumnsValid(ds1, ds2, colKeyMap)
if (columnErrors.isEmpty) {
val mergedMaps = colKeyMap ++ compCols
finalAssertion(ds1, ds2, mergedMaps, assertion)
val keyColumnErrors = areKeyColumnsValid(ds1, ds2, colKeyMap)
if (keyColumnErrors.isEmpty) {
val nonKeyColumns1NotInDataset = compCols.keys.filterNot(columnExists(ds1, _))
val nonKeyColumns2NotInDataset = compCols.values.filterNot(columnExists(ds2, _))

if (nonKeyColumns1NotInDataset.nonEmpty) {
ComparisonFailed(s"The following columns were not found in the first dataset: " +
s"${nonKeyColumns1NotInDataset.mkString(", ")}")
} else if (nonKeyColumns2NotInDataset.nonEmpty) {
ComparisonFailed(s"The following columns were not found in the second dataset: " +
s"${nonKeyColumns2NotInDataset.mkString(", ")}")
} else {
val mergedMaps = colKeyMap ++ compCols
finalAssertion(ds1, ds2, mergedMaps, assertion)
}
} else {
ComparisonFailed(columnErrors.get)
ComparisonFailed(keyColumnErrors.get)
}
}

Expand All @@ -150,12 +161,27 @@ object DataSynchronization extends ComparisonBase {
val compColsEither: Either[ComparisonFailed, Map[String, String]] = if (optionalCompCols.isDefined) {
optionalCompCols.get match {
case compCols if compCols.isEmpty => Left(ComparisonFailed("Empty column comparison map provided."))
case compCols => Right(compCols)
case compCols =>
val ds1CompColsNotInDataset = compCols.keys.filterNot(columnExists(ds1, _))
val ds2CompColsNotInDataset = compCols.values.filterNot(columnExists(ds2, _))
if (ds1CompColsNotInDataset.nonEmpty) {
Left(
ComparisonFailed(s"The following columns were not found in the first dataset: " +
s"${ds1CompColsNotInDataset.mkString(", ")}")
)
} else if (ds2CompColsNotInDataset.nonEmpty) {
Left(
ComparisonFailed(s"The following columns were not found in the second dataset: " +
s"${ds2CompColsNotInDataset.mkString(", ")}")
)
} else {
Right(compCols)
}
}
} else {
// Get all the non-key columns from DS1 and verify that they are present in DS2
val ds1NonKeyCols = ds1.columns.filterNot(x => colKeyMap.keys.toSeq.contains(x)).sorted
val nonKeyColsMatch = ds1NonKeyCols.forall { col => Try { ds2(col) }.isSuccess }
val nonKeyColsMatch = ds1NonKeyCols.forall(columnExists(ds2, _))

if (!nonKeyColsMatch) {
Left(ComparisonFailed("Non key columns in the given data frames do not match."))
Expand All @@ -181,30 +207,40 @@ object DataSynchronization extends ComparisonBase {
private def areKeyColumnsValid(ds1: DataFrame,
ds2: DataFrame,
colKeyMap: Map[String, String]): Option[String] = {
// We verify that the key columns provided form a valid primary/composite key.
// To achieve this, we group the dataframes and compare their count with the original count.
// If the key columns provided are valid, then the two counts should match.
val ds1Cols = colKeyMap.keys.toSeq
val ds2Cols = colKeyMap.values.toSeq
val ds1Unique = ds1.groupBy(ds1Cols.map(col): _*).count()
val ds2Unique = ds2.groupBy(ds2Cols.map(col): _*).count()

val ds1Count = ds1.count()
val ds2Count = ds2.count()
val ds1UniqueCount = ds1Unique.count()
val ds2UniqueCount = ds2Unique.count()
val ds1ColsNotInDataset = ds1Cols.filterNot(columnExists(ds1, _))
val ds2ColsNotInDataset = ds2Cols.filterNot(columnExists(ds2, _))

if (ds1UniqueCount == ds1Count && ds2UniqueCount == ds2Count) {
None
if (ds1ColsNotInDataset.nonEmpty) {
Some(s"The following key columns were not found in the first dataset: ${ds1ColsNotInDataset.mkString(", ")}")
} else if (ds2ColsNotInDataset.nonEmpty) {
Some(s"The following key columns were not found in the second dataset: ${ds2ColsNotInDataset.mkString(", ")}")
} else {
val combo1 = ds1Cols.mkString(", ")
val combo2 = ds2Cols.mkString(", ")
Some(s"The selected columns are not comparable due to duplicates present in the dataset." +
s"Comparison keys must be unique, but " +
s"in Dataframe 1, there are $ds1UniqueCount unique records and $ds1Count rows," +
s" and " +
s"in Dataframe 2, there are $ds2UniqueCount unique records and $ds2Count rows, " +
s"based on the combination of keys {$combo1} in Dataframe 1 and {$combo2} in Dataframe 2")
// We verify that the key columns provided form a valid primary/composite key.
// To achieve this, we group the dataframes and compare their count with the original count.
// If the key columns provided are valid, then the two counts should match.
val ds1Unique = ds1.groupBy(ds1Cols.map(col): _*).count()
val ds2Unique = ds2.groupBy(ds2Cols.map(col): _*).count()

val ds1Count = ds1.count()
val ds2Count = ds2.count()
val ds1UniqueCount = ds1Unique.count()
val ds2UniqueCount = ds2Unique.count()

if (ds1UniqueCount == ds1Count && ds2UniqueCount == ds2Count) {
None
} else {
val combo1 = ds1Cols.mkString(", ")
val combo2 = ds2Cols.mkString(", ")
Some(s"The selected columns are not comparable due to duplicates present in the dataset." +
s"Comparison keys must be unique, but " +
s"in Dataframe 1, there are $ds1UniqueCount unique records and $ds1Count rows," +
s" and " +
s"in Dataframe 2, there are $ds2UniqueCount unique records and $ds2Count rows, " +
s"based on the combination of keys {$combo1} in Dataframe 1 and {$combo2} in Dataframe 2")
}
}
}

Expand Down Expand Up @@ -291,4 +327,6 @@ object DataSynchronization extends ComparisonBase {
.drop(ds2HashColName)
.drop(ds2KeyColsUpdatedNamesMap.values.toSeq: _*)
}

private def columnExists(df: DataFrame, col: String) = Try { df(col) }.isSuccess
}
Original file line number Diff line number Diff line change
Expand Up @@ -686,5 +686,122 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec {
val expected = Map(1 -> true, 2 -> true, 3 -> true, 4 -> true, 5 -> false, 6 -> false)
assert(expected == rowLevelResults)
}

"fails as expected when key columns do not exist" in withSparkSession { spark =>
val idColumnName = "id"
val ds1 = primaryDataset(spark, idColumnName)
val ds2 = referenceDataset(spark, idColumnName)
val assertion: Double => Boolean = _ >= 0.6 // 4 out of 6 rows match

val nonExistCol1 = "foo"
val nonExistCol2 = "bar"

// Key columns not in either dataset (Overall)
val colKeyMap1 = Map(nonExistCol1 -> nonExistCol2)
val overallResult1 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap1, assertion)

assert(overallResult1.isInstanceOf[ComparisonFailed])
val failedOverallResult1 = overallResult1.asInstanceOf[ComparisonFailed]
assert(failedOverallResult1.errorMessage.contains("key columns were not found in the first dataset"))
assert(failedOverallResult1.errorMessage.contains(nonExistCol1))

// Key columns not in either dataset (Row level)
val rowLevelResult1 = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap1)
assert(rowLevelResult1.isLeft)
val failedRowLevelResult1 = rowLevelResult1.left.get
assert(failedRowLevelResult1.errorMessage.contains("key columns were not found in the first dataset"))
assert(failedRowLevelResult1.errorMessage.contains(nonExistCol1))

// Key column not in first dataset
val colKeyMap2 = Map(nonExistCol1 -> idColumnName)
val overallResult2 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap2, assertion)

assert(overallResult2.isInstanceOf[ComparisonFailed])
val failedOverallResult2 = overallResult2.asInstanceOf[ComparisonFailed]
assert(failedOverallResult2.errorMessage.contains("key columns were not found in the first dataset"))
assert(failedOverallResult2.errorMessage.contains(nonExistCol1))

// Key column not in first dataset (Row level)
val rowLevelResult2 = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap2)
assert(rowLevelResult2.isLeft)
val failedRowLevelResult2 = rowLevelResult2.left.get
assert(failedRowLevelResult2.errorMessage.contains("key columns were not found in the first dataset"))
assert(failedRowLevelResult2.errorMessage.contains(nonExistCol1))

// Key column not in second dataset
val colKeyMap3 = Map(idColumnName -> nonExistCol2)
val overallResult3 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap3, assertion)

assert(overallResult3.isInstanceOf[ComparisonFailed])
val failedOverallResult3 = overallResult3.asInstanceOf[ComparisonFailed]
assert(failedOverallResult3.errorMessage.contains("key columns were not found in the second dataset"))
assert(failedOverallResult3.errorMessage.contains(nonExistCol2))

// Key column not in second dataset (Row level)
val rowLevelResult3 = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap3)
assert(rowLevelResult3.isLeft)
val failedRowLevelResult3 = rowLevelResult3.left.get
assert(failedRowLevelResult3.errorMessage.contains("key columns were not found in the second dataset"))
assert(failedRowLevelResult3.errorMessage.contains(nonExistCol2))
}

"fails as expected when non-key columns do not exist" in withSparkSession { spark =>
val idColumnName = "id"
val ds1 = primaryDataset(spark, idColumnName)
val ds2 = referenceDataset(spark, idColumnName)
val assertion: Double => Boolean = _ >= 0.6 // 4 out of 6 rows match
val colKeyMap = Map(idColumnName -> idColumnName)

val nonExistCol1 = "foo"
val nonExistCol2 = "bar"

// Non-key columns not in either dataset (Overall)
val compColsMap1 = Map(nonExistCol1 -> nonExistCol2)
val overallResult1 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compColsMap1, assertion)

assert(overallResult1.isInstanceOf[ComparisonFailed])
val failedOverallResult1 = overallResult1.asInstanceOf[ComparisonFailed]
assert(failedOverallResult1.errorMessage.contains(
s"The following columns were not found in the first dataset: $nonExistCol1"))

// Non-key columns not in either dataset (Row level)
val rowLevelResult1 = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap, Some(compColsMap1))
assert(rowLevelResult1.isLeft)
val failedRowLevelResult1 = rowLevelResult1.left.get
assert(failedRowLevelResult1.errorMessage.contains(
s"The following columns were not found in the first dataset: $nonExistCol1"))

// Non-key column not in first dataset
val compColsMap2 = Map(nonExistCol1 -> "State")
val overallResult2 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compColsMap2, assertion)

assert(overallResult2.isInstanceOf[ComparisonFailed])
val failedOverallResult2 = overallResult2.asInstanceOf[ComparisonFailed]
assert(failedOverallResult2.errorMessage.contains(
s"The following columns were not found in the first dataset: $nonExistCol1"))

// Non-key columns not in first dataset (Row level)
val rowLevelResult2 = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap, Some(compColsMap2))
assert(rowLevelResult2.isLeft)
val failedRowLevelResult2 = rowLevelResult2.left.get
assert(failedRowLevelResult2.errorMessage.contains(
s"The following columns were not found in the first dataset: $nonExistCol1"))

// Non-key column not in second dataset
val compColsMap3 = Map("state" -> nonExistCol2)
val overallResult3 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compColsMap3, assertion)

assert(overallResult3.isInstanceOf[ComparisonFailed])
val failedOverallResult3 = overallResult3.asInstanceOf[ComparisonFailed]
assert(failedOverallResult3.errorMessage.contains(
s"The following columns were not found in the second dataset: $nonExistCol2"))

// Non-key column not in second dataset (Row level)
val rowLevelResult3 = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap, Some(compColsMap3))
assert(rowLevelResult3.isLeft)
val failedRowLevelResult3 = rowLevelResult3.left.get
assert(failedOverallResult3.errorMessage.contains(
s"The following columns were not found in the second dataset: $nonExistCol2"))
}
}
}