add data synchronization test to verification suite
VenkataKarthikP committed Dec 26, 2023
1 parent 1fc09e1 commit 6461856
Showing 9 changed files with 343 additions and 62 deletions.
64 changes: 54 additions & 10 deletions src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
@@ -19,21 +19,15 @@ package com.amazon.deequ.analyzers
import com.amazon.deequ.analyzers.Analyzers._
import com.amazon.deequ.analyzers.NullBehavior.NullBehavior
import com.amazon.deequ.analyzers.runners._
import com.amazon.deequ.metrics.FullColumn
import com.amazon.deequ.comparison.{DataSynchronization, DataSynchronizationFailed, DataSynchronizationSucceeded}
import com.amazon.deequ.metrics.{DoubleMetric, Entity, FullColumn, Metric}
import com.amazon.deequ.utilities.ColumnUtil.removeEscapeColumn
import com.amazon.deequ.metrics.DoubleMetric
import com.amazon.deequ.metrics.Entity
import com.amazon.deequ.metrics.Metric
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Column
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

import scala.language.existentials
import scala.util.Failure
import scala.util.Success
import scala.util.{Failure, Success, Try}

/**
* A state (sufficient statistic) computed from data, from which we can compute a metric.
@@ -299,6 +293,56 @@ abstract class GroupingAnalyzer[S <: State[_], +M <: Metric[_]] extends Analyzer
}
}

/**
* Data Synchronization Analyzer
*
* @param dfToCompare the DataFrame to compare the base DataFrame against
* @param columnMappings mapping from column names in the base DataFrame to the corresponding column names in dfToCompare
* @param assertion assertion on the fraction of rows that are in sync
*/
case class DataSynchronizationAnalyzer(dfToCompare: DataFrame,
columnMappings: Map[String, String],
assertion: Double => Boolean)
extends Analyzer[DataSynchronizationState, DoubleMetric] {

override def computeStateFrom(data: DataFrame): Option[DataSynchronizationState] = {

val result = DataSynchronization.columnMatch(data, dfToCompare, columnMappings, assertion)

result match {
case succeeded: DataSynchronizationSucceeded =>
Some(DataSynchronizationState(succeeded.passedCount.getOrElse(0), succeeded.totalCount.getOrElse(0)))
case failed: DataSynchronizationFailed =>
Some(DataSynchronizationState(failed.passedCount.getOrElse(0), failed.totalCount.getOrElse(0)))
case _ => None
}
}

override def computeMetricFrom(state: Option[DataSynchronizationState]): DoubleMetric = {

state match {
case Some(s) => DoubleMetric(
Entity.Dataset,
"DataSynchronization",
"",
Try(s.synchronizedDataCount.toDouble / s.dataCount.toDouble),
None
)
case None => DoubleMetric(
Entity.Dataset,
"DataSynchronization",
"",
Try(0.0),
None
)
}
}

override private[deequ] def toFailureMetric(failure: Exception) =
metricFromFailure(failure, "DataSynchronization", "", Entity.Dataset)
}
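
A minimal sketch of exercising the comparison API behind this analyzer directly, assuming a local SparkSession named spark; the DataFrames, column mapping, and threshold below are illustrative, not taken from this commit:

import com.amazon.deequ.comparison.{DataSynchronization, DataSynchronizationFailed, DataSynchronizationSucceeded}

val ds1 = spark.createDataFrame(Seq((1, "a"), (2, "b"), (3, "c"))).toDF("id", "name")
val ds2 = spark.createDataFrame(Seq((1, "a"), (2, "b"), (4, "d"))).toDF("id", "name")

// Compare on the id column; succeed if more than 60% of ds1's rows find a match in ds2.
DataSynchronization.columnMatch(ds1, ds2, Map("id" -> "id"), (ratio: Double) => ratio > 0.6) match {
  case DataSynchronizationSucceeded(passedCount, totalCount) =>
    println(s"in sync: $passedCount of $totalCount rows matched")
  case DataSynchronizationFailed(error, passedCount, totalCount) =>
    println(s"out of sync: $error ($passedCount of $totalCount rows matched)")
  case _ => ()
}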


/** Helper method to check conditions on the schema of the data */
object Preconditions {

36 changes: 36 additions & 0 deletions src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationState.scala
@@ -0,0 +1,36 @@
/**
* Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
* is located at
*
* http://aws.amazon.com/apache2.0/
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.deequ.analyzers

/**
* Stores the state of a data synchronization computation.
*
* @param synchronizedDataCount count of rows that are in sync
* @param dataCount total count of rows, used to calculate the ratio
*/
case class DataSynchronizationState(synchronizedDataCount: Long, dataCount: Long)
extends DoubleValuedState[DataSynchronizationState] {
override def sum(other: DataSynchronizationState): DataSynchronizationState = {
DataSynchronizationState(synchronizedDataCount + other.synchronizedDataCount, dataCount + other.dataCount)
}

override def metricValue(): Double = {
if (dataCount == 0L) Double.NaN else synchronizedDataCount.toDouble / dataCount.toDouble
}
}

object DataSynchronizationState
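
The state is additive, so partial states computed on splits of the data can be merged before the metric is derived; a quick illustration with hypothetical counts:

val a = DataSynchronizationState(synchronizedDataCount = 8, dataCount = 10)
val b = DataSynchronizationState(synchronizedDataCount = 2, dataCount = 10)
val merged = a.sum(b) // DataSynchronizationState(10, 20)
merged.metricValue()  // (8 + 2) / (10 + 10) = 0.5; Double.NaN if dataCount were 0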
43 changes: 39 additions & 4 deletions src/main/scala/com/amazon/deequ/checks/Check.scala
@@ -1,5 +1,5 @@
/**
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
@@ -16,8 +16,7 @@

package com.amazon.deequ.checks

import com.amazon.deequ.analyzers.AnalyzerOptions
import com.amazon.deequ.analyzers.{Analyzer, Histogram, KLLParameters, Patterns, State}
import com.amazon.deequ.analyzers.{Analyzer, AnalyzerOptions, DataSynchronizationState, DataSynchronizationAnalyzer, Histogram, KLLParameters, Patterns, State}
import com.amazon.deequ.anomalydetection.{AnomalyDetectionStrategy, AnomalyDetector, DataPoint}
import com.amazon.deequ.analyzers.runners.AnalyzerContext
import com.amazon.deequ.constraints.Constraint._
@@ -27,6 +26,7 @@ import com.amazon.deequ.repository.MetricsRepository
import org.apache.spark.sql.expressions.UserDefinedFunction
import com.amazon.deequ.anomalydetection.HistoryUtils
import com.amazon.deequ.checks.ColumnCondition.{isAnyNotNull, isEachNotNull}
import org.apache.spark.sql.DataFrame

import scala.util.matching.Regex

@@ -338,6 +338,38 @@ case class Check(
uniqueValueRatioConstraint(columns, assertion, filter, hint) }
}

/**
* Performs a data synchronization check between the base DataFrame supplied to
* [[com.amazon.deequ.VerificationSuite.onData]] and another DataFrame supplied to this check, using Deequ's
* [[com.amazon.deequ.comparison.DataSynchronization.columnMatch]] framework.
* This method compares the specified columns of both DataFrames and assesses synchronization based on a custom assertion.
*
* Utilizes [[com.amazon.deequ.analyzers.DataSynchronizationAnalyzer]] for comparing the data
* and Constraint [[com.amazon.deequ.constraints.DataSynchronizationConstraint]].
*
* @param otherDf The DataFrame to be compared with the current one. Analyzed in conjunction with the
* current DataFrame to assess data synchronization.
* @param columnMappings A map defining the column correlations between the current DataFrame and otherDf.
* Keys represent column names in the current DataFrame,
* and values are corresponding column names in otherDf.
* @param assertion A function that takes a Double (result of the comparison) and returns a Boolean.
* Defines the condition under which the data in both DataFrames is considered synchronized.
* For example, (_ > 0.7) requires the synchronization ratio to exceed 0.7, i.e. more than 70% of records match.
* @param hint Optional. Additional context or information about the synchronization check.
* Helpful for understanding the intent or specifics of the check. Default is None.
* @return A [[com.amazon.deequ.checks.Check]] object representing the outcome
* of the synchronization check. This object can be used in Deequ's verification suite to
* assert data quality constraints.
*/
def isDataSynchronized(otherDf: DataFrame, columnMappings: Map[String, String], assertion: Double => Boolean,
hint: Option[String] = None): Check = {

val dataSyncAnalyzer = DataSynchronizationAnalyzer(otherDf, columnMappings, assertion)
val constraint = DataSynchronizationConstraint(dataSyncAnalyzer, hint)
addConstraint(constraint)

}
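
A sketch of wiring this check into a suite, assuming two DataFrames df and otherDf that share an id column; the names and threshold are illustrative:

val check = Check(CheckLevel.Error, "data synchronization")
  .isDataSynchronized(otherDf, Map("id" -> "id"), _ > 0.9, Some("at least 90% of ids should match"))

val verificationResult = VerificationSuite()
  .onData(df)
  .addCheck(check)
  .run()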

/**
* Creates a constraint that asserts on the number of distinct values a column has.
*
@@ -1092,7 +1124,10 @@ case class Check(
case nc: ConstraintDecorator => nc.inner
case c: Constraint => c
}
.collect { case constraint: AnalysisBasedConstraint[_, _, _] => constraint.analyzer }
.collect {
case constraint: AnalysisBasedConstraint[_, _, _] => constraint.analyzer
case constraint: DataSynchronizationConstraint => constraint.analyzer
}
.map { _.asInstanceOf[Analyzer[_, Metric[_]]] }
.toSet
}
src/main/scala/com/amazon/deequ/comparison/ComparisonResult.scala
@@ -1,5 +1,5 @@
/**
* Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
@@ -17,5 +17,11 @@
package com.amazon.deequ.comparison

sealed trait ComparisonResult
case class ComparisonFailed(errorMessage: String) extends ComparisonResult
case class ComparisonSucceeded() extends ComparisonResult

case class ComparisonFailed(errorMessage: String, ratio: Double = 0) extends ComparisonResult
case class ComparisonSucceeded(ratio: Double = 0) extends ComparisonResult

case class DataSynchronizationFailed(errorMessage: String, passedCount: Option[Long] = None,
totalCount: Option[Long] = None) extends ComparisonResult
case class DataSynchronizationSucceeded(passedCount: Option[Long] = None, totalCount: Option[Long] = None)
extends ComparisonResult
src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala
@@ -1,5 +1,5 @@
/**
* Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
@@ -107,7 +107,7 @@ object DataSynchronization extends ComparisonBase {
finalAssertion(ds1, ds2, mergedMaps, assertion)
}
} else {
ComparisonFailed(columnErrors.get)
DataSynchronizationFailed(columnErrors.get)
}
}

@@ -147,7 +147,7 @@ object DataSynchronization extends ComparisonBase {
finalAssertion(ds1, ds2, mergedMaps, assertion)
}
} else {
ComparisonFailed(keyColumnErrors.get)
DataSynchronizationFailed(keyColumnErrors.get)
}
}

@@ -260,12 +260,15 @@ object DataSynchronization extends ComparisonBase {
.reduce((e1, e2) => e1 && e2)

val joined = ds1.join(ds2, joinExpression, "inner")
val ratio = joined.count().toDouble / ds1Count
val passedCount = joined.count()
val totalCount = ds1Count
val ratio = passedCount.toDouble / totalCount.toDouble

if (assertion(ratio)) {
ComparisonSucceeded()
DataSynchronizationSucceeded(Some(passedCount), Some(totalCount))
} else {
ComparisonFailed(s"Value: $ratio does not meet the constraint requirement.")
DataSynchronizationFailed(s"Data Synchronization Comparison Metric Value: $ratio does not meet the constraint" +
s"requirement.", Some(passedCount), Some(totalCount))
}
}
}
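
A worked illustration of the ratio logic above, using hypothetical counts:

val passedCount = 95L // e.g. joined.count()
val totalCount = 100L // e.g. ds1Count
val ratio = passedCount.toDouble / totalCount.toDouble // 0.95
// An assertion such as (_ >= 0.9) yields DataSynchronizationSucceeded here,
// while (_ > 0.99) yields DataSynchronizationFailed.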
29 changes: 28 additions & 1 deletion src/main/scala/com/amazon/deequ/constraints/Constraint.scala
@@ -17,9 +17,10 @@
package com.amazon.deequ.constraints

import com.amazon.deequ.analyzers._
import com.amazon.deequ.metrics.{BucketDistribution, Distribution, Metric}
import com.amazon.deequ.metrics.{BucketDistribution, Distribution, DoubleMetric, Metric}
import org.apache.spark.sql.expressions.UserDefinedFunction

import scala.util.{Success, Try}
import scala.util.matching.Regex

object ConstraintStatus extends Enumeration {
@@ -897,3 +898,29 @@ object Constraint {
}

}

/**
* Data Synchronization Constraint
*
* @param analyzer the DataSynchronizationAnalyzer that produces the synchronization metric
* @param hint optional hint providing context for the constraint
*/
case class DataSynchronizationConstraint(analyzer: DataSynchronizationAnalyzer, hint: Option[String])
extends Constraint {

override def evaluate(metrics: Map[Analyzer[_, Metric[_]], Metric[_]]): ConstraintResult = {

// Select the metric produced by the DataSynchronizationAnalyzer; this is a Failure if none is present.
val anz = Try(metrics.filter(i => i._1.isInstanceOf[DataSynchronizationAnalyzer]).head._2)
anz match {
case Success(m: DoubleMetric) =>
val result = m.value match {
case Success(value) => analyzer.assertion(value)
case _ => false
}
val status = if (result) ConstraintStatus.Success else ConstraintStatus.Failure
ConstraintResult(this, status, hint, Some(m))

case _ =>
ConstraintResult(this, ConstraintStatus.Failure, hint, anz.toOption)
}
}
}
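
A minimal sketch of evaluating the constraint directly against a metrics map, assuming an otherDf DataFrame and a previously computed metric; the 0.95 value is hypothetical:

val analyzer = DataSynchronizationAnalyzer(otherDf, Map("id" -> "id"), _ > 0.9)
val constraint = DataSynchronizationConstraint(analyzer, Some("ids should match"))

val metric = DoubleMetric(Entity.Dataset, "DataSynchronization", "", Success(0.95), None)
val result = constraint.evaluate(Map(analyzer.asInstanceOf[Analyzer[_, Metric[_]]] -> metric))
// result.status is ConstraintStatus.Success, since the assertion 0.95 > 0.9 holds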
11 changes: 8 additions & 3 deletions src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
@@ -19,9 +19,7 @@ package com.amazon.deequ
import com.amazon.deequ.analyzers._
import com.amazon.deequ.analyzers.runners.AnalyzerContext
import com.amazon.deequ.anomalydetection.AbsoluteChangeStrategy
import com.amazon.deequ.checks.Check
import com.amazon.deequ.checks.CheckLevel
import com.amazon.deequ.checks.CheckStatus
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
import com.amazon.deequ.constraints.{Constraint, ConstraintResult}
import com.amazon.deequ.io.DfsUtils
import com.amazon.deequ.metrics.{DoubleMetric, Entity, Metric}
@@ -806,6 +804,9 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
val complianceCheckThatShouldFailCompleteness = Check(CheckLevel.Error, "shouldErrorStringType")
.hasCompleteness("fake", x => x > 0)

val checkHasDataInSyncTest = Check(CheckLevel.Error, "shouldSucceedForAge")
.isDataSynchronized(df, Map("age" -> "age"), _ > 0.99, Some("shouldpass"))

val verificationResult = VerificationSuite()
.onData(df)
.addCheck(checkThatShouldSucceed)
@@ -815,6 +816,7 @@
.addCheck(checkThatShouldFail)
.addCheck(complianceCheckThatShouldFail)
.addCheck(complianceCheckThatShouldFailCompleteness)
.addCheck(checkHasDataInSyncTest)
.run()

val checkSuccessResult = verificationResult.checkResults(checkThatShouldSucceed)
@@ -846,6 +848,9 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
checkFailedCompletenessResult.constraintResults.map(_.message) shouldBe
List(Some("Input data does not include column fake!"))
assert(checkFailedCompletenessResult.status == CheckStatus.Error)

val checkDataSyncResult = verificationResult.checkResults(checkHasDataInSyncTest)
checkDataSyncResult.status shouldBe CheckStatus.Success
}
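
A hedged sketch of the complementary failing case, not part of this commit: comparing df with itself yields a synchronization ratio of 1.0, so a strictly-greater-than-1.0 assertion cannot hold and the check should surface as CheckStatus.Error.

val checkThatShouldFailSync = Check(CheckLevel.Error, "shouldFailForImpossibleThreshold")
  .isDataSynchronized(df, Map("age" -> "age"), _ > 1.0, Some("cannot pass"))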

"Well-defined checks should produce correct result even if another check throws an exception" in withSparkSession {
