Skip to content

Commit

Permalink
Merge pull request #35 from target/gh34-sumofnumericcolumncheck
Browse files Browse the repository at this point in the history
Adds a ColumnSumCheck
  • Loading branch information
phpisciuneri committed Apr 6, 2020
2 parents 3bb80d8 + 29908eb commit 092de5d
Show file tree
Hide file tree
Showing 6 changed files with 456 additions and 12 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,17 @@ This is a costly check and requires an additional pass through the table.
|-----|------|-------------|
| `columns` | Array[String] | Each set of values in these columns must be unique.

#### `columnSumCheck`

This check sums a column in all rows. If the sum applied to the `column` doesn't fall within the range specified by (`minValue`, `maxValue`) the check will fail.

| Arg | Type | Description |
|-------------|-------------|------------------------------------------------------------------------|
| `column` | String | The column to be checked. |
| `minValue` | NumericType | The lower bound of the sum. Type depends on the type of the `column`. |
| `maxValue` | NumericType | The upper bound of the sum. Type depends on the type of the `column`. |
| `inclusive` | Boolean | Include `minValue` and `maxValue` as part of the range. |

## Example Config

```yaml
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package com.target.data_validator.validator

import com.target.data_validator.{ValidatorCheckEvent, ValidatorError, VarSubstitution}
import io.circe._
import io.circe.generic.semiauto._
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.aggregate.Sum
import org.apache.spark.sql.types._

case class ColumnSumCheck(
column: String,
minValue: Option[Json] = None,
maxValue: Option[Json] = None,
inclusive: Option[Json] = None
) extends ColumnBased(column, Sum(UnresolvedAttribute(column)).toAggregateExpression()) {

private val minOrMax: Either[String, Unit] = if (minValue.isEmpty && maxValue.isEmpty) {
Left("'minValue' or 'maxValue' or both must be defined")
} else {
Right()
}

private val lowerBound: Either[String, Double] = minValue match {
case Some(json) =>
if (json.isNumber) { Right(json.asNumber.get.toDouble) }
else { Left(s"'minValue' defined but type is not a Number, is: ${json.name}") }
case None => Right(Double.MinValue)
}

private val upperBound: Either[String, Double] = maxValue match {
case Some(json) =>
if (json.isNumber) { Right(json.asNumber.get.toDouble) }
else { Left(s"'maxValue' defined but type is not a Number, is: ${json.name}") }
case None => Right(Double.MaxValue)
}

private val minLessThanMax: Either[String, Unit] = (lowerBound, upperBound) match {
case (Right(lower), Right(upper)) if lower >= upper =>
Left(s"'minValue': $lower must be less than 'maxValue': $upper")
case _ => Right()
}

private val inclusiveBounds: Either[String, Boolean] = inclusive match {
case Some(json) =>
if (json.isBoolean) { Right(json.asBoolean.get) }
else { Left(s"'inclusive' defined but type is not Boolean, is: ${json.name}") }
case None => Right(false)
}

override def name: String = "columnSumCheck"

override def quickCheck(r: Row, count: Long, idx: Int): Boolean = {

def evaluate(sum: Double): Boolean = {
if (inclusiveBounds.right.get) { sum > upperBound.right.get || sum < lowerBound.right.get}
else { sum >= upperBound.right.get || sum <= lowerBound.right.get}
}

failed = r.schema(idx).dataType match {
case ShortType => evaluate(r.getShort(idx))
case IntegerType => evaluate(r.getInt(idx))
case LongType => evaluate(r.getLong(idx))
case FloatType => evaluate(r.getFloat(idx))
case DoubleType => evaluate(r.getDouble(idx))
case ut => throw new Exception(s"Unsupported type for $name found in schema: $ut")
}

val bounds = minValue.getOrElse("") :: maxValue.getOrElse("") :: Nil
val prettyBounds = if (inclusiveBounds.right.get) {
r.get(idx) + " in " + bounds.mkString("[", " , ", "]")
} else {
r.get(idx) + " in " + bounds.mkString("(", " , ", ")")
}
val errorValue = if (failed) 1 else 0
addEvent(ValidatorCheckEvent(failed, s"$name on '$column': $prettyBounds", count, errorValue))
failed
}

override def substituteVariables(dict: VarSubstitution): ValidatorBase = {
val ret = copy(
column = getVarSub(column, "column", dict),
minValue = minValue.map(getVarSubJson(_, "minValue", dict)),
maxValue = maxValue.map(getVarSubJson(_, "maxValue", dict)),
inclusive = inclusive.map(getVarSubJson(_, "inclusive", dict))
)
this.getEvents.foreach(ret.addEvent)
ret
}

override def configCheck(df: DataFrame): Boolean = {
logger.debug(s"Full check config: ${this.toString}")
Seq(
minOrMax,
lowerBound,
upperBound,
minLessThanMax,
inclusiveBounds
).foreach {
case Left(msg) =>
logger.error(msg)
addEvent(ValidatorError(msg))
case _ =>
}

findColumnInDataFrame(df, column) match {
case Some(ft) if ft.dataType.isInstanceOf[NumericType] =>
case Some(ft) =>
val msg = s"Column: $column found, but not of numericType type: ${ft.dataType}"
logger.error(msg)
addEvent(ValidatorError(msg))
case None =>
val msg = s"Column: $column not found in schema"
logger.error(msg)
addEvent(ValidatorError(msg))
}
failed
}

override def toJson: Json = {
val additionalFieldsForReport = Json.fromFields(Set(
"type" -> Json.fromString("columnSumCheck"),
"failed" -> Json.fromBoolean(failed)
))

val base = ColumnSumCheck.encoder(this)
base.deepMerge(additionalFieldsForReport)
}
}

object ColumnSumCheck {
val encoder: Encoder[ColumnSumCheck] = deriveEncoder[ColumnSumCheck]
val decoder: Decoder[ColumnSumCheck] = deriveDecoder[ColumnSumCheck]
def fromJson(c: HCursor): Either[DecodingFailure, ValidatorBase] = decoder.apply(c)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,39 @@ package com.target.data_validator.validator

import cats.syntax.either._
import com.typesafe.scalalogging.LazyLogging
import io.circe.{Decoder, HCursor}
import io.circe.{Decoder, DecodingFailure, HCursor}
import io.circe.generic.auto._

object JsonDecoders extends LazyLogging {

implicit val decodeChecks: Decoder[ValidatorBase] = new Decoder[ValidatorBase] {
final def apply(c: HCursor): Decoder.Result[ValidatorBase] = c.downField("type").as[String].flatMap {
case "rowCount" => c.as[MinNumRows]
case "nullCheck" => NullCheck.fromJson(c)
case "negativeCheck" => NegativeCheck.fromJson(c)
case "columnMaxCheck" => c.as[ColumnMaxCheck]
case "rangeCheck" => RangeCheck.fromJson(c)
case "uniqueCheck" => UniqueCheck.fromJson(c)
case "stringLengthCheck" => StringLengthCheck.fromJson(c)
case "stringRegexCheck" => StringRegexCheck.fromJson(c)
case x => logger.error(s"Unknown Check `$x` in config!")
throw new RuntimeException(s"Unknown Check in config `$x`")
// FIXME: specifying this Function here instead of Decoder[ValidatorBase] is a smell that these checks
// ought to have proper decoder objects instead of a method.
// I.e., we're not using the Circe Decoder API as intended.
private lazy val decoders = Map[String, HCursor => Either[DecodingFailure, ValidatorBase]](
"rowCount" -> { _.as[MinNumRows] },
"nullCheck" -> NullCheck.fromJson,
"negativeCheck" -> NegativeCheck.fromJson,
"columnMaxCheck" -> { _.as[ColumnMaxCheck] },
"rangeCheck" -> RangeCheck.fromJson,
"uniqueCheck" -> UniqueCheck.fromJson,
"stringLengthCheck" -> StringLengthCheck.fromJson,
"stringRegexCheck" -> StringRegexCheck.fromJson,
"columnSumCheck" -> ColumnSumCheck.fromJson
)

final def apply(c: HCursor): Decoder.Result[ValidatorBase] = c.downField("type").as[String].flatMap(getDecoder(c))

private def getDecoder(cursor: HCursor)(checkType: String) = {
decoders
.get(checkType)
.map(_(cursor))
match {
case Some(x) => x
case None =>
logger.error(s"Unknown Check `$checkType` in config! Choose one of: ${decoders.keys.mkString(", ")}.")
throw new RuntimeException(s"Unknown Check in config `$checkType`")
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,14 @@ object ValidatorBase extends LazyLogging {
trait CheapCheck extends ValidatorBase {
def select(schema: StructType, dict: VarSubstitution): Expression

/**
* Run a check on a particular column on a row
*
* @param r the row under inspection
* @param count the number of rows total
* @param idx the index of the column under inspection
* @return true if the check fails, false if is passes
*/
def quickCheck(r: Row, count: Long, idx: Int): Boolean
}

Expand Down
2 changes: 2 additions & 0 deletions src/test/scala/com/target/data_validator/TestHelpers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ object TestHelpers {
}

def guessType(v: Any): DataType = v.getClass.getCanonicalName match {
case "java.lang.Short" => ShortType
case "java.lang.String" => StringType
case "java.lang.Integer" => IntegerType
case "java.lang.Double" => DoubleType
case "java.lang.Boolean" => BooleanType
case "java.lang.Long" => LongType
case _ => throw new IllegalArgumentException(s"Unknown type '${v.getClass.getCanonicalName}'")
}

Expand Down
Loading

0 comments on commit 092de5d

Please sign in to comment.