[Min/Max] Apply filtered row behavior at the row level evaluation #543
@@ -41,13 +41,10 @@ import org.scalamock.scalatest.MockFactory
 import org.scalatest.Matchers
 import org.scalatest.WordSpec

 class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
   with FixtureSupport with MockFactory {

   "Verification Suite" should {

     "return the correct verification status regardless of the order of checks" in
       withSparkSession { sparkSession =>
@@ -374,11 +371,11 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec

       // filtered rows 1, 2, 3 (where item > 3)
       val minRowLevel = resultData.select(expectedColumn4).collect().map(r => r.getAs[Any](0))
-      assert(Seq(true, true, true, true, true, true).sameElements(minRowLevel))
+      assert(Seq(null, null, null, true, true, true).sameElements(minRowLevel))
Review comment: These tests were written with the intention that, without specifying analyzer options, the default behavior for filtered rows would be `true`.

Reply: Reverted the change.
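For context, a caller who wants the pre-existing all-`true` behavior can pin it explicitly rather than relying on the default (a minimal sketch using only the API surface visible in this diff; the column name and assertion are illustrative):

    val minCheck = new Check(CheckLevel.Error, "min-check")
      .hasMin("item", _ >= 1,
        analyzerOptions = Some(AnalyzerOptions(filteredRow = FilteredRowOutcome.TRUE)))
      .where("item > 3")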

       // filtered rows 4, 5, 6 (where item < 4)
       val maxRowLevel = resultData.select(expectedColumn5).collect().map(r => r.getAs[Any](0))
-      assert(Seq(true, true, true, true, true, true).sameElements(maxRowLevel))
+      assert(Seq(true, true, true, null, null, null).sameElements(maxRowLevel))

       // filtered rows 4, 5, 6 (where item < 4)
       val rowLevel6 = resultData.select(expectedColumn6).collect().map(r => r.getAs[Any](0))
@@ -1609,6 +1606,121 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
     }
   }

+  "Verification Suite with == based Min/Max checks and filtered row behavior" should {
+    val col1 = "att1"
+    val col2 = "att2"
+    val col3 = "att3"
+
+    val check1Description = "equality-check-1"
+    val check2Description = "equality-check-2"
+    val check3Description = "equality-check-3"
+
+    val check1WhereClause = "att1 > 3"
+    val check2WhereClause = "att2 > 4"
+    val check3WhereClause = "att3 = 0"
+
+    def mkEqualityCheck1(analyzerOptions: AnalyzerOptions): Check = new Check(CheckLevel.Error, check1Description)
+      .hasMin(col1, _ == 4, analyzerOptions = Some(analyzerOptions)).where(check1WhereClause)
+      .hasMax(col1, _ == 4, analyzerOptions = Some(analyzerOptions)).where(check1WhereClause)
+
+    def mkEqualityCheck2(analyzerOptions: AnalyzerOptions): Check = new Check(CheckLevel.Error, check2Description)
+      .hasMin(col2, _ == 7, analyzerOptions = Some(analyzerOptions)).where(check2WhereClause)
+      .hasMax(col2, _ == 7, analyzerOptions = Some(analyzerOptions)).where(check2WhereClause)
+
+    def mkEqualityCheck3(analyzerOptions: AnalyzerOptions): Check = new Check(CheckLevel.Error, check3Description)
+      .hasMin(col3, _ == 0, analyzerOptions = Some(analyzerOptions)).where(check3WhereClause)
+      .hasMax(col3, _ == 0, analyzerOptions = Some(analyzerOptions)).where(check3WhereClause)
+
+    def getRowLevelResults(df: DataFrame): Seq[java.lang.Boolean] =
+      df.collect().map { r => r.getAs[java.lang.Boolean](0) }.toSeq
+
+    def assertCheckResults(verificationResult: VerificationResult): Unit = {
+      val passResult = verificationResult.checkResults
+      val equalityCheck1Result = passResult.values.find(_.check.description == check1Description)
+      val equalityCheck2Result = passResult.values.find(_.check.description == check2Description)
+      val equalityCheck3Result = passResult.values.find(_.check.description == check3Description)
+
+      assert(equalityCheck1Result.isDefined && equalityCheck1Result.get.status == CheckStatus.Error)
+      assert(equalityCheck2Result.isDefined && equalityCheck2Result.get.status == CheckStatus.Error)
+      assert(equalityCheck3Result.isDefined && equalityCheck3Result.get.status == CheckStatus.Success)
+    }
+
+    def assertRowLevelResults(rowLevelResults: DataFrame,
+                              analyzerOptions: AnalyzerOptions): Unit = {
+      val equalityCheck1Results = getRowLevelResults(rowLevelResults.select(check1Description))
+      val equalityCheck2Results = getRowLevelResults(rowLevelResults.select(check2Description))
+      val equalityCheck3Results = getRowLevelResults(rowLevelResults.select(check3Description))
+
+      val filteredOutcome: java.lang.Boolean = analyzerOptions.filteredRow match {
+        case FilteredRowOutcome.TRUE => true
+        case FilteredRowOutcome.NULL => null
+      }
+      assert(equalityCheck1Results == Seq(filteredOutcome, filteredOutcome, filteredOutcome, true, false, false))
+      assert(equalityCheck2Results == Seq(filteredOutcome, filteredOutcome, filteredOutcome, false, false, true))
+      assert(equalityCheck3Results == Seq(true, true, true, filteredOutcome, filteredOutcome, filteredOutcome))
+    }
+
+    def assertMetrics(metricsDF: DataFrame): Unit = {
+      val metricsMap: Map[String, Double] = metricsDF.collect().map { r =>
+        val colName = r.getAs[String]("instance")
+        val metricName = r.getAs[String]("name")
+        val metricValue = r.getAs[Double]("value")
+        s"$colName|$metricName" -> metricValue
+      }.toMap
+
+      assert(metricsMap(s"$col1|Minimum (where: $check1WhereClause)") == 4.0)
+      assert(metricsMap(s"$col1|Maximum (where: $check1WhereClause)") == 6.0)
+      assert(metricsMap(s"$col2|Minimum (where: $check2WhereClause)") == 5.0)
+      assert(metricsMap(s"$col2|Maximum (where: $check2WhereClause)") == 7.0)
+      assert(metricsMap(s"$col3|Minimum (where: $check3WhereClause)") == 0.0)
+      assert(metricsMap(s"$col3|Maximum (where: $check3WhereClause)") == 0.0)
+    }
+
+    "mark filtered rows to null" in withSparkSession {
+      sparkSession =>
+        val df = getDfWithNumericValues(sparkSession)
+        val analyzerOptions = AnalyzerOptions(filteredRow = FilteredRowOutcome.NULL)
+
+        val equalityCheck1 = mkEqualityCheck1(analyzerOptions)
+        val equalityCheck2 = mkEqualityCheck2(analyzerOptions)
+        val equalityCheck3 = mkEqualityCheck3(analyzerOptions)
+
+        val verificationResult = VerificationSuite()
+          .onData(df)
+          .addChecks(Seq(equalityCheck1, equalityCheck2, equalityCheck3))
+          .run()
+
+        val rowLevelResultsDF = VerificationResult.rowLevelResultsAsDataFrame(sparkSession, verificationResult, df)
+        val metricsDF = VerificationResult.successMetricsAsDataFrame(sparkSession, verificationResult)
+
+        assertCheckResults(verificationResult)
+        assertRowLevelResults(rowLevelResultsDF, analyzerOptions)
+        assertMetrics(metricsDF)
+    }
+
+    "mark filtered rows to true" in withSparkSession {
+      sparkSession =>
+        val df = getDfWithNumericValues(sparkSession)
+        val analyzerOptions = AnalyzerOptions(filteredRow = FilteredRowOutcome.TRUE)
+
+        val equalityCheck1 = mkEqualityCheck1(analyzerOptions)
+        val equalityCheck2 = mkEqualityCheck2(analyzerOptions)
+        val equalityCheck3 = mkEqualityCheck3(analyzerOptions)
+
+        val verificationResult = VerificationSuite()
+          .onData(df)
+          .addChecks(Seq(equalityCheck1, equalityCheck2, equalityCheck3))
+          .run()
+
+        val rowLevelResultsDF = VerificationResult.rowLevelResultsAsDataFrame(sparkSession, verificationResult, df)
+        val metricsDF = VerificationResult.successMetricsAsDataFrame(sparkSession, verificationResult)
+
+        assertCheckResults(verificationResult)
+        assertRowLevelResults(rowLevelResultsDF, analyzerOptions)
+        assertMetrics(metricsDF)
+    }
+  }
+
   /** Run anomaly detection using a repository with some previous analysis results for testing */
   private[this] def evaluateWithRepositoryWithHistory(test: MetricsRepository => Unit): Unit = {
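To make the expected row-level sequences concrete: assuming the fixture's att1 column runs from 1 to 6 (my reading of the expected values, not stated in the diff), the check1 where clause `att1 > 3` leaves rows 4-6 in scope, and the expected sequences indicate that each in-scope row applies the assertion to its own value. A tiny standalone sketch of that logic:

    // check1: where clause "att1 > 3", assertion _ == 4 (assumed fixture values)
    val att1 = Seq(1, 2, 3, 4, 5, 6)
    val inScope = att1.map(_ > 3)              // false, false, false, true, true, true
    val rowLevel = att1.zip(inScope).map {
      case (_, false) => "filteredOutcome"     // true or null, per AnalyzerOptions
      case (v, true)  => (v == 4).toString     // 4 -> true, 5 -> false, 6 -> false
    }
    // rowLevel mirrors Seq(filteredOutcome, filteredOutcome, filteredOutcome, true, false, false)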
Review comment: We had discussed that the default `analyzerOptions` behavior should be `FilteredRowOutcome.TRUE`. Should we modify 933 to be `true`? (By default, filtered rows are `true` instead of `null`.)
Review comment: I was testing out another scenario that may be problematic, given a dataframe and where clause under which rows 1, 2, 3 should be skipped -> `true`, while row 5 should be `null` because `val2` is a null value there. However, with the above method we convert all nulls to true/null, which doesn't distinguish between null values due to being filtered and null values due to null column values.
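The dataframe from that comment was not captured here; a hypothetical one matching the description (the column names and the `item > 3` filter are my assumptions, and `spark.implicits._` is assumed in scope) would be:

    // Rows 1-3 are excluded by a where clause such as "item > 3";
    // row 5 is in scope but carries a genuine null in val2.
    val df = Seq(
      (1, Some(1)),  // filtered out -> outcome true (or null, per options)
      (2, Some(2)),  // filtered out
      (3, Some(3)),  // filtered out
      (4, Some(4)),  // in scope, evaluated normally
      (5, None),     // in scope, but val2 is null -> should surface as null
      (6, Some(6))   // in scope, evaluated normally
    ).toDF("item", "val2")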
Reply: Thanks @eycho-am for the valuable feedback. The latest PR revision contains a new structure for the column that helps maintain the "source" of a row, i.e. whether it is in scope or filtered out. That will help in evaluating the correct outcome for each row.
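The revised column structure itself lives in the PR; as a rough sketch of the idea only (names and logic are illustrative, not the actual deequ implementation), each row can be tagged with the reason for its outcome before the boolean is derived:

    import org.apache.spark.sql.functions.{col, expr, lit, not, when}

    // Illustrative only: record the "source" of each row-level outcome so that
    // filtered rows and genuine nulls stay distinguishable until the end.
    val tagged = df.withColumn("row_source",
      when(not(expr("item > 3")), lit("FilteredOut"))  // excluded by the where clause
        .when(col("val2").isNull, lit("NullValue"))    // in scope, but the value is null
        .otherwise(lit("InScope")))                    // fully evaluated row

    // Deriving the boolean outcome afterwards:
    //   FilteredOut -> true or null, per FilteredRowOutcome
    //   NullValue   -> null
    //   InScope     -> result of the actual constraint evaluation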