add data synchronization test to verification Suite. #526

Merged
merged 3 commits into from
Jan 9, 2024
10 changes: 5 additions & 5 deletions src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
@@ -1,5 +1,5 @@
/**
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
@@ -19,17 +19,17 @@ package com.amazon.deequ.analyzers
import com.amazon.deequ.analyzers.Analyzers._
import com.amazon.deequ.analyzers.NullBehavior.NullBehavior
import com.amazon.deequ.analyzers.runners._
import com.amazon.deequ.metrics.FullColumn
import com.amazon.deequ.utilities.ColumnUtil.removeEscapeColumn
import com.amazon.deequ.metrics.DoubleMetric
import com.amazon.deequ.metrics.Entity
import com.amazon.deequ.metrics.FullColumn
import com.amazon.deequ.metrics.Metric
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import com.amazon.deequ.utilities.ColumnUtil.removeEscapeColumn
import org.apache.spark.sql.Column
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

import scala.language.existentials
import scala.util.Failure
@@ -0,0 +1,94 @@
/**
* Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
* is located at
*
* http://aws.amazon.com/apache2.0/
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.deequ.analyzers

import com.amazon.deequ.analyzers.Analyzers.metricFromFailure
import com.amazon.deequ.comparison.DataSynchronization
import com.amazon.deequ.comparison.DataSynchronizationFailed
import com.amazon.deequ.comparison.DataSynchronizationSucceeded
import com.amazon.deequ.metrics.DoubleMetric
import com.amazon.deequ.metrics.Entity
import org.apache.spark.sql.DataFrame

import scala.util.Failure
import scala.util.Try


/**
* An Analyzer for Deequ that performs a data synchronization check between two DataFrames.
* It evaluates the degree of synchronization based on specified column mappings and an assertion function.
*
* The analyzer computes a ratio of synchronized data points to the total data points, represented as a DoubleMetric.
* Refer to [[com.amazon.deequ.comparison.DataSynchronization.columnMatch]] for DataSynchronization implementation
*
* @param dfToCompare The DataFrame to compare with the primary DataFrame that is setup
* during [[com.amazon.deequ.VerificationSuite.onData]] setup.
* @param columnMappings A map where each key-value pair represents a column in the primary DataFrame
[Review comment, @rdsharma26 (Contributor), Jan 9, 2024] Please rename to keyColumnMappings in next PR.

* and its corresponding column in dfToCompare.
* @param assertion A function that takes a Double (the match ratio) and returns a Boolean.
* It defines the condition for successful synchronization.
*
* Usage:
* This analyzer is used by Deequ's VerificationSuite when an `isDataSynchronized` check is defined. It can also be
* used manually.
*
* Example:
* val analyzer = DataSynchronizationAnalyzer(dfToCompare, Map("col1" -> "col2"), _ > 0.8)
* val verificationResult = VerificationSuite().onData(df).addRequiredAnalyzer(analyzer).run()
*
* // or attach it through a Check, since `isDataSynchronized` is defined on Check
* val check = Check(CheckLevel.Error, "data sync").isDataSynchronized(dfToCompare, Map("col1" -> "col2"), _ > 0.8)
* val verificationResult = VerificationSuite().onData(df).addCheck(check).run()
*
*
* The computeStateFrom method calculates the synchronization state by comparing the specified columns of the two
* DataFrames.
* The computeMetricFrom method then converts this state into a DoubleMetric representing the synchronization ratio.
*
*/
case class DataSynchronizationAnalyzer(dfToCompare: DataFrame,
columnMappings: Map[String, String],
[Review comment, @rdsharma26 (Contributor), Jan 9, 2024] Please rename to keyColumnMappings in next PR.

assertion: Double => Boolean)
extends Analyzer[DataSynchronizationState, DoubleMetric] {

override def computeStateFrom(data: DataFrame): Option[DataSynchronizationState] = {

val result = DataSynchronization.columnMatch(data, dfToCompare, columnMappings, assertion)

result match {
case succeeded: DataSynchronizationSucceeded =>
Some(DataSynchronizationState(succeeded.passedCount, succeeded.totalCount))
case failed: DataSynchronizationFailed =>
Some(DataSynchronizationState(failed.passedCount.getOrElse(0), failed.totalCount.getOrElse(0)))
case _ => None
}
}

override def computeMetricFrom(state: Option[DataSynchronizationState]): DoubleMetric = {

val metric = state match {
case Some(s) => Try(s.synchronizedDataCount.toDouble / s.totalDataCount.toDouble)
case _ => Failure(new IllegalStateException("No state available for DataSynchronizationAnalyzer"))
}

DoubleMetric(Entity.Dataset, "DataSynchronization", "", metric, None)
}

override private[deequ] def toFailureMetric(failure: Exception) =
metricFromFailure(failure, "DataSynchronization", "", Entity.Dataset)
}
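
The metric step of the analyzer above can be tried without Spark or Deequ on the classpath. The following is a minimal sketch, not the Deequ code: `SyncState`, `MetricSketch` and `ratioFrom` are hypothetical names that mirror `DataSynchronizationState` and the body of `computeMetricFrom`.

```scala
import scala.util.{Failure, Try}

// Hypothetical stand-in for DataSynchronizationState (no Deequ dependency).
final case class SyncState(synchronizedDataCount: Long, totalDataCount: Long)

object MetricSketch {
  // Mirrors the match in computeMetricFrom: a present state yields a ratio,
  // a missing state yields a Failure carrying an IllegalStateException.
  def ratioFrom(state: Option[SyncState]): Try[Double] = state match {
    case Some(s) => Try(s.synchronizedDataCount.toDouble / s.totalDataCount.toDouble)
    case None    => Failure(new IllegalStateException("No state available"))
  }

  def main(args: Array[String]): Unit = {
    println(ratioFrom(Some(SyncState(8L, 10L)))) // Success(0.8)
    println(ratioFrom(Some(SyncState(0L, 0L))))  // Success(NaN)
    println(ratioFrom(None).isFailure)           // true
  }
}
```

Floating-point division by zero does not throw, so a state with a zero total ends up as `Success(NaN)` rather than a `Failure`. An assertion such as `_ > 0.8` evaluates to `false` on `NaN`.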

@@ -0,0 +1,48 @@
/**
* Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
* is located at
*
* http://aws.amazon.com/apache2.0/
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.deequ.analyzers

/**
* Represents the state of data synchronization between two DataFrames in Deequ.
* This state keeps track of the synchronized record count and the total record count.
* It is used to calculate a ratio of synchronization, which is a measure of how well the data
* in the two DataFrames are synchronized.
*
* @param synchronizedDataCount The count of records that are considered synchronized between the two DataFrames.
* @param totalDataCount The total count of records checked.
*
* The `sum` method allows for aggregation of this state with another, combining the counts from both states.
* This is useful in distributed computations where states from different partitions need to be aggregated.
*
* The `metricValue` method computes the synchronization ratio. It is the ratio of `synchronizedDataCount`
* to `totalDataCount`.
* If `totalDataCount` is zero, which means no data points were examined, the method returns `Double.NaN`
* to indicate the undefined state.
*
*/
case class DataSynchronizationState(synchronizedDataCount: Long, totalDataCount: Long)
extends DoubleValuedState[DataSynchronizationState] {
override def sum(other: DataSynchronizationState): DataSynchronizationState = {
DataSynchronizationState(synchronizedDataCount + other.synchronizedDataCount, totalDataCount + other.totalDataCount)
}

override def metricValue(): Double = {
if (totalDataCount == 0L) Double.NaN else synchronizedDataCount.toDouble / totalDataCount.toDouble
}
}

object DataSynchronizationState
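
To illustrate the `sum` and `metricValue` behaviour described in the Scaladoc, here is a small sketch. `SyncCounts` and `StateSketch` are hypothetical names: a copy of the state without the `DoubleValuedState` parent so that it compiles without Deequ.

```scala
// Hypothetical copy of the state, without the DoubleValuedState parent.
final case class SyncCounts(synchronizedDataCount: Long, totalDataCount: Long) {
  // Combine two partial states by adding their counts.
  def sum(other: SyncCounts): SyncCounts =
    SyncCounts(synchronizedDataCount + other.synchronizedDataCount,
      totalDataCount + other.totalDataCount)

  // Ratio of synchronized to total records; NaN when nothing was examined.
  def metricValue(): Double =
    if (totalDataCount == 0L) Double.NaN
    else synchronizedDataCount.toDouble / totalDataCount.toDouble
}

object StateSketch {
  def main(args: Array[String]): Unit = {
    // Two partial states, for example from two batches of the same dataset.
    val merged = SyncCounts(3L, 4L).sum(SyncCounts(1L, 6L))
    println(merged)                                  // SyncCounts(4,10)
    println(merged.metricValue())                    // 0.4
    println(SyncCounts(0L, 0L).metricValue().isNaN)  // true
  }
}
```

Because `sum` adds counts, the merged ratio (4 of 10) is weighted by batch size. It is not the average of the two per-batch ratios.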
83 changes: 75 additions & 8 deletions src/main/scala/com/amazon/deequ/checks/Check.scala
@@ -1,5 +1,5 @@
/**
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
@@ -16,17 +16,29 @@

package com.amazon.deequ.checks

import com.amazon.deequ.analyzers.AnalyzerOptions
import com.amazon.deequ.analyzers.{Analyzer, Histogram, KLLParameters, Patterns, State}
import com.amazon.deequ.anomalydetection.{AnomalyDetectionStrategy, AnomalyDetector, DataPoint}
import com.amazon.deequ.analyzers.runners.AnalyzerContext
import com.amazon.deequ.analyzers.Analyzer
import com.amazon.deequ.analyzers.AnalyzerOptions
import com.amazon.deequ.analyzers.DataSynchronizationAnalyzer
import com.amazon.deequ.analyzers.DataSynchronizationState
import com.amazon.deequ.analyzers.Histogram
import com.amazon.deequ.analyzers.KLLParameters
import com.amazon.deequ.analyzers.Patterns
import com.amazon.deequ.analyzers.State
import com.amazon.deequ.anomalydetection.HistoryUtils
import com.amazon.deequ.anomalydetection.AnomalyDetectionStrategy
import com.amazon.deequ.anomalydetection.AnomalyDetector
import com.amazon.deequ.anomalydetection.DataPoint
import com.amazon.deequ.checks.ColumnCondition.isAnyNotNull
import com.amazon.deequ.checks.ColumnCondition.isEachNotNull
import com.amazon.deequ.constraints.Constraint._
import com.amazon.deequ.constraints._
import com.amazon.deequ.metrics.{BucketDistribution, Distribution, Metric}
import com.amazon.deequ.metrics.BucketDistribution
import com.amazon.deequ.metrics.Distribution
import com.amazon.deequ.metrics.Metric
import com.amazon.deequ.repository.MetricsRepository
import org.apache.spark.sql.DataFrame
VenkataKarthikP marked this conversation as resolved.
import org.apache.spark.sql.expressions.UserDefinedFunction
import com.amazon.deequ.anomalydetection.HistoryUtils
import com.amazon.deequ.checks.ColumnCondition.{isAnyNotNull, isEachNotNull}

import scala.util.matching.Regex

@@ -338,6 +350,59 @@ case class Check(
uniqueValueRatioConstraint(columns, assertion, filter, hint) }
}

/**
* Performs a data synchronization check between the base DataFrame supplied to
* [[com.amazon.deequ.VerificationSuite.onData]] and the other DataFrame supplied to this check using Deequ's
* [[com.amazon.deequ.comparison.DataSynchronization.columnMatch]] framework.
* This method compares specified columns of both DataFrames and assesses synchronization based on a custom assertion.
*
* Utilizes [[com.amazon.deequ.analyzers.DataSynchronizationAnalyzer]] for comparing the data
* and Constraint [[com.amazon.deequ.constraints.DataSynchronizationConstraint]].
*
* Usage:
* To use this method, create a VerificationSuite and invoke this method as part of adding checks:
* {{{
* val baseDataFrame: DataFrame = ...
* val otherDataFrame: DataFrame = ...
* val columnMappings: Map[String, String] = Map("baseCol1" -> "otherCol1", "baseCol2" -> "otherCol2")
* val assertionFunction: Double => Boolean = _ > 0.7
*
* val check = new Check(CheckLevel.Error, "Data Synchronization Check")
* .isDataSynchronized(otherDataFrame, columnMappings, assertionFunction)
*
* val verificationResult = VerificationSuite()
* .onData(baseDataFrame)
* .addCheck(check)
* .run()
* }}}
*
* This will add a data synchronization check to the VerificationSuite, comparing the specified columns of
* baseDataFrame and otherDataFrame based on the provided assertion function.
*
*
* @param otherDf The DataFrame to be compared with the current one. Analyzed in conjunction with the
* current DataFrame to assess data synchronization.
* @param columnMappings A map defining the column correlations between the current DataFrame and otherDf.
* Keys represent column names in the current DataFrame,
* and values are corresponding column names in otherDf.
* @param assertion A function that takes a Double (result of the comparison) and returns a Boolean.
* Defines the condition under which the data in both DataFrames is considered synchronized.
* For example (_ > 0.7) denoting metric value > 0.7 or 70% of records.
* @param hint Optional. Additional context or information about the synchronization check.
* Helpful for understanding the intent or specifics of the check. Default is None.
* @return A [[com.amazon.deequ.checks.Check]] object with the data synchronization
* constraint added. This object can be used in Deequ's verification suite to
* assert data quality constraints.
*
*/
def isDataSynchronized(otherDf: DataFrame, columnMappings: Map[String, String], assertion: Double => Boolean,
hint: Option[String] = None): Check = {
val dataSyncAnalyzer = DataSynchronizationAnalyzer(otherDf, columnMappings, assertion)
val constraint = AnalysisBasedConstraint[DataSynchronizationState, Double, Double](dataSyncAnalyzer, assertion,
hint = hint)
addConstraint(constraint)
}

/**
* Creates a constraint that asserts on the number of distinct values a column has.
*
@@ -1092,7 +1157,9 @@ case class Check(
case nc: ConstraintDecorator => nc.inner
case c: Constraint => c
}
.collect { case constraint: AnalysisBasedConstraint[_, _, _] => constraint.analyzer }
.collect {
case constraint: AnalysisBasedConstraint[_, _, _] => constraint.analyzer
}
.map { _.asInstanceOf[Analyzer[_, Metric[_]]] }
.toSet
}
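
The `columnMappings` and `assertion` parameters of `isDataSynchronized` can be pictured with a plain-collections sketch. It rests on an assumption that is not confirmed by this diff: a base row counts as synchronized when some row in the other dataset has equal values for every mapped column pair. The real `DataSynchronization.columnMatch` is not shown here and may differ. `MatchRatioSketch`, `Row` and `matchRatio` are hypothetical names.

```scala
object MatchRatioSketch {
  // A row is modelled as column name -> value; this stands in for a Spark Row.
  type Row = Map[String, Any]

  // Assumed semantics (not taken from the diff): share of base rows whose mapped
  // column values also appear together in some row of `other`.
  def matchRatio(base: Seq[Row], other: Seq[Row], columnMappings: Map[String, String]): Double = {
    val mappings = columnMappings.toSeq // fix one iteration order for both sides
    val otherKeys: Set[Seq[Any]] =
      other.map(row => mappings.map { case (_, otherCol) => row(otherCol) }).toSet
    val matched = base.count { row =>
      otherKeys.contains(mappings.map { case (baseCol, _) => row(baseCol) })
    }
    if (base.isEmpty) Double.NaN else matched.toDouble / base.size
  }

  def main(args: Array[String]): Unit = {
    val base = Seq[Row](
      Map("id" -> 1, "name" -> "a"), Map("id" -> 2, "name" -> "b"),
      Map("id" -> 3, "name" -> "c"), Map("id" -> 4, "name" -> "d"))
    val other = Seq[Row](
      Map("key" -> 1, "label" -> "a"), Map("key" -> 2, "label" -> "b"),
      Map("key" -> 3, "label" -> "x"), Map("key" -> 4, "label" -> "d"))

    val assertion: Double => Boolean = _ > 0.7
    val ratio = matchRatio(base, other, Map("id" -> "key", "name" -> "label"))
    println(ratio)            // 0.75
    println(assertion(ratio)) // true
  }
}
```

Keys of the mapping name columns in the base data and values name columns in the other data, as in the `@param columnMappings` description above.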
@@ -1,5 +1,5 @@
/**
* Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
@@ -17,5 +17,10 @@
package com.amazon.deequ.comparison

sealed trait ComparisonResult
case class ComparisonFailed(errorMessage: String) extends ComparisonResult
case class ComparisonSucceeded() extends ComparisonResult

case class ComparisonFailed(errorMessage: String, ratio: Double = 0) extends ComparisonResult
case class ComparisonSucceeded(ratio: Double = 0) extends ComparisonResult
Review comment on lines +21 to +22 (Contributor): We should keep 1 set of states. Right now, there are 2 sets of states. Comparison[Failed/Succeeded] vs DataSynchronization[Failed/Succeeded]. Having too many states can result in confusion for the end user.

Reply (Contributor Author): I was planning to consolidate once ReferentialIntegrity is also integrated into verification suite. I will handle that in next PR.

case class DataSynchronizationFailed(errorMessage: String, passedCount: Option[Long] = None,
totalCount: Option[Long] = None) extends ComparisonResult
case class DataSynchronizationSucceeded(passedCount: Long, totalCount: Long) extends ComparisonResult
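
Since all four result types extend the same sealed trait, a caller has to handle both families in one match. The sketch below redeclares the case classes locally so that it compiles on its own. `ResultSketch` and `describe` are hypothetical and not part of the PR.

```scala
// Local redeclaration of the result types from the diff above.
sealed trait ComparisonResult
final case class ComparisonFailed(errorMessage: String, ratio: Double = 0) extends ComparisonResult
final case class ComparisonSucceeded(ratio: Double = 0) extends ComparisonResult
final case class DataSynchronizationFailed(errorMessage: String, passedCount: Option[Long] = None,
                                           totalCount: Option[Long] = None) extends ComparisonResult
final case class DataSynchronizationSucceeded(passedCount: Long, totalCount: Long) extends ComparisonResult

object ResultSketch {
  // Hypothetical helper: the compiler checks that every subtype of the sealed trait is covered.
  def describe(result: ComparisonResult): String = result match {
    case DataSynchronizationSucceeded(passed, total) =>
      s"synchronized: $passed of $total rows"
    case DataSynchronizationFailed(msg, passed, total) =>
      s"not synchronized: $msg (passed=${passed.getOrElse(0L)}, total=${total.getOrElse(0L)})"
    case ComparisonSucceeded(ratio) =>
      s"comparison succeeded with ratio $ratio"
    case ComparisonFailed(msg, ratio) =>
      s"comparison failed with ratio $ratio: $msg"
  }

  def main(args: Array[String]): Unit = {
    println(describe(DataSynchronizationSucceeded(9L, 10L)))
    println(describe(DataSynchronizationFailed("ratio below threshold")))
  }
}
```

The failed variant carries optional counts, which is why the analyzer earlier in this PR falls back to `0` with `getOrElse` when they are absent.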