add data synchronization test to verification Suite. #526

Merged (3 commits) on Jan 9, 2024


VenkataKarthikP
Contributor

@VenkataKarthikP VenkataKarthikP commented Dec 26, 2023

Issue, if available: #501

Description of changes: Adds a data synchronization check to the verification suite. With this change, users can define an isDataSynchronized check.

Example usage:

val verificationResult = VerificationSuite()
  .onData(data)
  .addCheck(
    Check(CheckLevel.Error, "must have data in sync")
      .isDataSynchronized(dfToCompare, Map("id" -> "id"), _ > 0.7))
  .run()
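
To make the meaning of the ratio and the `_ > 0.7` assertion concrete, here is a minimal sketch that uses plain Scala collections in place of Spark DataFrames. It assumes the ratio is the fraction of primary rows whose key columns match a row in the comparison dataset; the library's exact definition may differ. `SyncRatioSketch`, `Row`, and `matchRatio` are hypothetical names, not part of deequ.

```scala
object SyncRatioSketch {
  // Hypothetical row model: column name -> value. Not a Spark or deequ type.
  type Row = Map[String, Any]

  // Assumed definition: matched primary rows / total primary rows.
  // keyColumns maps a primary column name to the comparison column name.
  def matchRatio(primary: Seq[Row], other: Seq[Row], keyColumns: Map[String, String]): Double = {
    val pairs = keyColumns.toSeq
    // Key tuples present in the comparison dataset, read through the mapped names.
    val otherKeys: Set[Seq[Option[Any]]] =
      other.map(row => pairs.map { case (_, otherCol) => row.get(otherCol) }).toSet
    // Count primary rows whose key tuple also appears in the comparison dataset.
    val matched = primary.count { row =>
      otherKeys.contains(pairs.map { case (primaryCol, _) => row.get(primaryCol) })
    }
    if (primary.isEmpty) 0.0 else matched.toDouble / primary.size
  }
}
```

With primary keys 1 to 4 and comparison keys 1, 2, 3 and 5, this sketch yields 0.75, which would satisfy the `_ > 0.7` assertion in the example above.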

cc: @mentekid @rdsharma26
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

Contributor

@rdsharma26 rdsharma26 left a comment

Thank you @VenkataKarthikP for this PR! I've left some comments.

Note: We use this check for the DatasetMatch rule in AWS Glue Data Quality. In a future PR, we can rename DataSynchronization to DatasetMatch for consistency purposes.

src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala (three outdated review threads, resolved)
* Data Synchronization Analyzer
*
* @param dfToCompare DataFrame to compare
* @param columnMappings columns mappings
Contributor

nit: We can add more documentation here. Are these the key column mappings? What about the other parameter, comparison column mappings? Will that be added in a future PR?

Contributor Author

Thanks, I will update the documentation.
I was planning to do this in a phased manner instead of one big PR. I will definitely do a follow-up PR to add that as well.

src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala (outdated review thread, resolved)
Comment on lines +21 to +22
case class ComparisonFailed(errorMessage: String, ratio: Double = 0) extends ComparisonResult
case class ComparisonSucceeded(ratio: Double = 0) extends ComparisonResult
Contributor

We should keep one set of states. Right now there are two: Comparison[Failed/Succeeded] and DataSynchronization[Failed/Succeeded]. Having too many states can confuse the end user.

Contributor Author

I was planning to consolidate these once ReferentialIntegrity is also integrated into the verification suite. I will handle that in the next PR.
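
As an illustration of what a single set of states could look like, here is a minimal sketch built around the two case classes quoted above. The `sealed` modifier, the `ComparisonStates` wrapper, and the `describe` helper are assumptions made for this example, not the PR's code.

```scala
object ComparisonStates {
  // The two case classes mirror the lines quoted in the review;
  // sealing the parent trait is an assumption of this sketch.
  sealed trait ComparisonResult
  case class ComparisonFailed(errorMessage: String, ratio: Double = 0) extends ComparisonResult
  case class ComparisonSucceeded(ratio: Double = 0) extends ComparisonResult

  // Hypothetical consumer: one pattern match covers every state, and the
  // compiler can warn about a missing case because the trait is sealed.
  def describe(result: ComparisonResult): String = result match {
    case ComparisonSucceeded(ratio)       => s"succeeded with ratio $ratio"
    case ComparisonFailed(message, ratio) => s"failed with ratio $ratio: $message"
  }
}
```

With one hierarchy, any caller that handles these two cases handles every outcome, which is the property the reviewer is asking for.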

.collect { case constraint: AnalysisBasedConstraint[_, _, _] => constraint.analyzer }
.collect {
case constraint: AnalysisBasedConstraint[_, _, _] => constraint.analyzer
case constraint: DataSynchronizationConstraint => constraint.analyzer
Contributor

Why can't the existing statement match DataSynchronizationConstraint?

Contributor Author

Updated. Yes, we don't need this.
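
One situation in which the extra case is redundant is when the new constraint is an instance or subtype of the type the existing case already matches. The thread does not say whether that is what happened here, so the sketch below only illustrates the general behaviour of `collect` with a type pattern. Every name in it is a hypothetical stand-in, not a deequ class.

```scala
object CollectSketch {
  trait Analyzer { def name: String }
  case class NamedAnalyzer(name: String) extends Analyzer

  trait Constraint
  // Stand-in for an existing constraint type that carries an analyzer.
  class AnalyzerBackedConstraint(val analyzer: Analyzer) extends Constraint
  // Hypothetical new constraint, defined as a subtype of the existing one.
  class SyncConstraint(a: Analyzer) extends AnalyzerBackedConstraint(a)
  // A constraint without an analyzer, to show what collect skips.
  case object PlainConstraint extends Constraint

  // A type pattern on the parent also matches the subtype,
  // so no dedicated case for SyncConstraint is needed.
  def analyzersOf(constraints: Seq[Constraint]): Seq[Analyzer] =
    constraints.collect { case c: AnalyzerBackedConstraint => c.analyzer }
}
```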


override def evaluate(metrics: Map[Analyzer[_, Metric[_]], Metric[_]]): ConstraintResult = {

val anz = Try(metrics.filter(i => i._1.isInstanceOf[DataSynchronizationAnalyzer]).head._2)
Contributor

Can we write it as the following, for better readability?

val (_, anz) = Try(metrics.filter { case(analyzer, _)  => analyzer.isInstanceOf[DataSynchronizationAnalyzer] }.head)

What happens if .head is called on empty list? Is that possible?

Contributor Author

Will refactor to improve readability, thanks for the suggestion.
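
On the `.head` question: calling `.head` on an empty collection throws `NoSuchElementException`, which the surrounding `Try` turns into a `Failure`. Note that a tuple pattern such as `val (_, anz) = Try(...)` would not be expected to type-check, because a `Try` is not a tuple, so the sketch below maps over the `Try` instead. It also shows `collectFirst` as an alternative that avoids the exception. The types here are simplified stand-ins (`SyncAnalyzer` plays the role of `DataSynchronizationAnalyzer`), not deequ's classes.

```scala
import scala.util.Try

object MetricLookup {
  trait Analyzer
  case object SyncAnalyzer extends Analyzer // stand-in for DataSynchronizationAnalyzer
  case object OtherAnalyzer extends Analyzer
  case class Metric(value: Double)

  // Readable version of the original approach: .head throws on an empty
  // result, and Try captures that exception as a Failure.
  def viaHead(metrics: Map[Analyzer, Metric]): Try[Metric] =
    Try(metrics.filter { case (analyzer, _) => analyzer == SyncAnalyzer }.head)
      .map { case (_, metric) => metric }

  // Alternative: collectFirst returns None when nothing matches,
  // so no exception is raised at all.
  def viaCollectFirst(metrics: Map[Analyzer, Metric]): Option[Metric] =
    metrics.collectFirst { case (SyncAnalyzer, metric) => metric }
}
```

Either form lets the caller handle the "no matching metric" case explicitly rather than relying on an uncaught exception.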

@VenkataKarthikP
Contributor Author

@rdsharma26 thanks for the review. I have updated the PR to address the review comments.

*
* @param dfToCompare The DataFrame to compare with the primary DataFrame that is setup
* during [[com.amazon.deequ.VerificationSuite.onData]] setup.
* @param columnMappings A map where each key-value pair represents a column in the primary DataFrame
Contributor

@rdsharma26 rdsharma26 Jan 9, 2024

Please rename to keyColumnMappings in next PR.

*
*/
case class DataSynchronizationAnalyzer(dfToCompare: DataFrame,
columnMappings: Map[String, String],
Contributor

@rdsharma26 rdsharma26 Jan 9, 2024

Please rename to keyColumnMappings in next PR.

Contributor

@rdsharma26 rdsharma26 left a comment

LGTM. Thanks again for the PR! Please address the following in next PR.

  • columnMappings should be renamed to keyColumnMappings
  • Add matchColumnMappings
  • [If possible] Change DataSynchronization to DatasetMatch

@rdsharma26 rdsharma26 merged commit f3222c1 into awslabs:master Jan 9, 2024
1 check passed
eycho-am pushed 2 commits that referenced this pull request on Feb 21, 2024; rdsharma26 pushed 3 on Apr 16, 2024 and 2 on Apr 17, 2024. Each carries the same message:
* add data synchronization test to verification suite

* review comments

* update test and doc strings
This pull request was closed.
2 participants