Skip to content

Commit

Permalink
Merge pull request #62 from phpisciuneri/varsub-rowcount
Browse files Browse the repository at this point in the history
Adds variable substitution for minNumRows
  • Loading branch information
phpisciuneri authored May 13, 2021
2 parents 2839ed2 + 2d603d4 commit dd677f1
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 29 deletions.
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ A tool to validate data in HIVE tables.
Assemble fat jar: `sbt clean assembly`

```
spark-submit --master local data-validator-assembly-0.10.0.jar --help
spark-submit --master local data-validator-assembly-0.13.0.jar --help
data-validator v0.10.0
data-validator v0.13.0
Usage: data-validator [options]
--version
Expand All @@ -31,7 +31,7 @@ Usage: data-validator [options]
spark-submit \
--num-executors 10 \
--executor-cores 2 \
data-validator-assembly-0.10.0.jar \
data-validator-assembly-0.13.0.jar \
--config config.yaml \
--jsonReport report.json
```
Expand Down Expand Up @@ -302,7 +302,7 @@ The minimum number of rows a table must have to pass the validator.

| Arg | Type | Description |
|-----|------|-------------|
| `minNumRows` | Long | The minimum number of rows a table must have to pass. **Note:** Currently this cannot be a variable.
| `minNumRows` | Long | The minimum number of rows a table must have to pass.

See Example Config file below to see how the checks are configured.

Expand Down Expand Up @@ -463,7 +463,7 @@ Example oozie wf snippet:
<argument>${principal}</argument>
<argument>--files</argument>
<argument>config.yaml</argument>
<argument>data-validator-assembly-0.10.0.jar</argument>
<argument>data-validator-assembly-0.13.0.jar</argument>
<argument>--config</argument>
<argument>config.yaml</argument>
<argument>--exitErrorOnFail</argument>
Expand Down Expand Up @@ -511,7 +511,7 @@ Example oozie wf snippet:
<argument>${keytab}</argument>
<argument>--principal</argument>
<argument>${principal}</argument>
<argument>data-validator-assembly-0.10.0.jar</argument>
<argument>data-validator-assembly-0.13.0.jar</argument>
<argument>--config</argument>
<argument>config.yaml</argument>
<argument>--exitErrorOnFail</argument>
Expand Down Expand Up @@ -554,7 +554,7 @@ A tool is provided to generate a sample `.orc` file for use in local development
```sh
spark-submit \
--master "local[*]" \
data-validator-assembly-0.10.0.jar \
data-validator-assembly-0.13.0.jar \
--config local_validators.yaml \
--jsonReport report.json \
--htmlReport report.html
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name := "data-validator"
organization := "com.target"

scalaVersion := "2.11.10"
scalaVersion := "2.11.12"

val sparkVersion = "2.3.1"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,40 @@ abstract class ColumnBased(column: String, condTest: Expression) extends CheapCh
}
}

case class MinNumRows(minNumRows: Long) extends ColumnBased("", ValidatorBase.L0) {
case class MinNumRows(minNumRows: Json) extends ColumnBased("", ValidatorBase.L0) {
override def name: String = "MinNumRows"

override def substituteVariables(dict: VarSubstitution): ValidatorBase = this
override def substituteVariables(dict: VarSubstitution): ValidatorBase = {
val ret = MinNumRows(getVarSubJson(minNumRows, "minNumRows", dict))
getEvents.foreach(ret.addEvent)
ret
}

override def configCheck(df: DataFrame): Boolean = {
if (minNumRows <= 0) {
val msg = s"MinNumRows: $minNumRows <= 0"

def notNaturalNumber(): Unit = {
val msg = "minNumRows must be a natural number"
logger.error(msg)
addEvent(ValidatorError(msg))
failed = true
true
} else {
false
}

minNumRows.asNumber match {
case Some(jsonNumber) => jsonNumber.toLong match {
case Some(x) if x > 0 =>
case _ => notNaturalNumber()
}
case _ => notNaturalNumber()
}
failed
}

override def quickCheck(row: Row, count: Long, idx: Int): Boolean = {
failed = count < minNumRows
val pctError = if (failed) calculatePctError(minNumRows, count) else "0.00%"
// Convert to `JsonNumber` then to `Long`
// safe because already handled in `configCheck`
val minNumRowsLong = minNumRows.asNumber.get.toLong.get

failed = count < minNumRowsLong
val pctError = if (failed) calculatePctError(minNumRowsLong, count) else "0.00%"
addEvent(ValidatorCounter("rowCount", count))
val msg = s"MinNumRowsCheck Expected: $minNumRows Actual: $count Relative Error: $pctError"
val data = ListMap("expected" -> minNumRows.toString, "actual" -> count.toString, "relative_error" -> pctError)
Expand All @@ -64,7 +78,7 @@ case class MinNumRows(minNumRows: Long) extends ColumnBased("", ValidatorBase.L0

override def toJson: Json = Json.obj(
("type", Json.fromString("rowCount")),
("minNumRows", Json.fromLong(minNumRows)),
("minNumRows", minNumRows),
("failed", Json.fromBoolean(failed)),
("events", this.getEvents.asJson)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class ConfigParserSpec extends FunSpec with BeforeAndAfterAll {
"bar",
Some(List("one", "two")),
None,
List(MinNumRows(10294), NullCheck("mdse_item_i", None)) // scalastyle:ignore magic.number
List(MinNumRows(Json.fromInt(10294)), NullCheck("mdse_item_i", None)) // scalastyle:ignore magic.number
),
ValidatorOrcFile("LocalFile.orc", None, Some("foo < 10"), List(NullCheck("start_d", None))),
ValidatorParquetFile("LocFile.parquet", None, Some("bar < 10"), List(NullCheck("end_d", None)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ class ConfigVarSubSpec extends FunSpec with Matchers with TestingSparkSession {

describe("MinNumRows") {

it("MinNumRows doesn't support substitutions so it should be equal, no changes.") {
val sut = MinNumRows(100) // scalastyle:ignore
assert(sut.substituteVariables(dict) == sut)
it("should substitute variables properly") {
val sut = MinNumRows(Json.fromString("$one"))
assert(sut.substituteVariables(dict) == MinNumRows(Json.fromInt(1)))
}

}
Expand Down
19 changes: 13 additions & 6 deletions src/test/scala/com/target/data_validator/ValidatorBaseSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -171,19 +171,27 @@ class ValidatorBaseSpec extends FunSpec with Matchers with TestingSparkSession {

describe("ValidatorMinNumRows") {

val df = spark.createDataFrame(sc.parallelize(List(Row("Doug", 50), Row("Collin", 32))), schema) //scalastyle:ignore

it("configCheck() should fail for minNumRows as non-numeric") {
val dict = new VarSubstitution
val config = mkConfig(df, List(MinNumRows(Json.fromString("badinput"))))
assert(config.configCheck(spark, dict))
assert(config.failed)
assert(config.tables.head.failed)
}

it("configCheck() should fail for negative minNumRows") {
val dict = new VarSubstitution
val df = spark.createDataFrame(sc.parallelize(List(Row("Doug", 50), Row("Collin", 32))), schema) //scalastyle:ignore
val config = mkConfig(df, List(MinNumRows(-10))) //scalastyle:ignore
val config = mkConfig(df, List(MinNumRows(Json.fromLong(-10)))) // scalastyle:ignore magic.number
assert(config.configCheck(spark, dict))
assert(config.failed)
assert(config.tables.head.failed)
}

it("quickCheck() should fail when rowCount < minNumRows") {
val dict = new VarSubstitution
val df = spark.createDataFrame(sc.parallelize(List(Row("Doug", 50), Row("Collin", 32))), schema) //scalastyle:ignore
val minNumRowsCheck = MinNumRows(10) // scalastyle:ignore magic.number
val minNumRowsCheck = MinNumRows(Json.fromLong(10)) // scalastyle:ignore magic.number
val config = mkConfig(df, List(minNumRowsCheck))
assert(config.quickChecks(spark, dict))
assert(config.failed)
Expand All @@ -197,8 +205,7 @@ class ValidatorBaseSpec extends FunSpec with Matchers with TestingSparkSession {

it("quickCheck() should succeed when rowCount > minNumRows") {
val dict = new VarSubstitution
val df = spark.createDataFrame(sc.parallelize(List(Row("Doug", 50), Row("Collin", 32))), schema) //scalastyle:ignore
val minNumRowsCheck = MinNumRows(1)
val minNumRowsCheck = MinNumRows(Json.fromInt(1))
val config = mkConfig(df, List(minNumRowsCheck))
assert(!config.configCheck(spark, dict))
assert(!config.quickChecks(spark, dict))
Expand Down

0 comments on commit dd677f1

Please sign in to comment.