Merge pull request #29 from phpisciuneri/polish
Polish
phpisciuneri authored Nov 4, 2019
2 parents 0379057 + 028f979 commit 4d47a5f
Showing 4 changed files with 53 additions and 53 deletions.
File: StringLengthCheck.scala

@@ -12,28 +12,28 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

case class StringLengthCheck(
column: String,
minLength: Option[Json],
maxLength: Option[Json],
threshold: Option[String]
) extends RowBased {

override def substituteVariables(dict: VarSubstitution): ValidatorBase = {

val ret = StringLengthCheck(
getVarSub(column, "column", dict),
minLength.map(getVarSubJson(_, "minLength", dict)),
maxLength.map(getVarSubJson(_, "maxLength", dict)),
threshold.map(getVarSub(_, "threshold", dict))
)
getEvents.foreach(ret.addEvent)
ret
}

private def cmpExpr(colExpr: Expression,
value: Option[Json],
cmp: (Expression, Expression) => Expression
): Option[Expression] = {
value.map { v => cmp(colExpr, createLiteralOrUnresolvedAttribute(IntegerType, v)) }
}
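
Note: the colTest method that consumes cmpExpr lies outside this hunk. A rough, self-contained sketch of how such a helper can compose min/max bounds into a single failure predicate follows; the object name, the Int-typed value, and the concrete bounds are invented for illustration, and the real helper takes Option[Json]:

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

object CmpExprSketch extends App {
  // Simplified stand-in for the helper above: the real one takes Option[Json]
  // and may resolve the value to an unresolved attribute instead of a literal.
  def cmpExpr(
    colExpr: Expression,
    value: Option[Int],
    cmp: (Expression, Expression) => Expression
  ): Option[Expression] =
    value.map(v => cmp(colExpr, Literal.create(v, IntegerType)))

  val lengthExpr = Length(UnresolvedAttribute("item"))

  // A row fails when the string is shorter than min or longer than max.
  val tooShort = cmpExpr(lengthExpr, Some(2), LessThan)
  val tooLong = cmpExpr(lengthExpr, Some(5), GreaterThan)
  val failExpr = (tooShort ++ tooLong).reduce(Or(_, _))

  println(failExpr.sql) // roughly: ((length(item) < 2) OR (length(item) > 5))
}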

@@ -57,20 +57,20 @@ case class StringLengthCheck(
private def checkMinLessThanOrEqualToMax(values: List[Json]): Unit = {

if (values.forall(_.isNumber)) {
values.flatMap(_.asNumber) match {
case mv :: xv :: Nil if mv.toDouble > xv.toDouble =>
addEvent(ValidatorError(s"min: ${minLength.get} must be less than or equal to max: ${maxLength.get}"))
case _ =>
}
} else if (values.forall(_.isString)) {
values.flatMap(_.asString) match {
case mv :: xv :: Nil if mv == xv =>
addEvent(ValidatorError(s"Min[String]: $mv must be less than max[String]: $xv"))
case _ =>
}
} else {
// Not Strings or Numbers
addEvent(ValidatorError(s"Unsupported type in ${values.map(debugJson).mkString(", ")}"))
}
}
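
For illustration, the numeric branch above reduces to the following standalone predicate; minExceedsMax is a hypothetical helper, not part of this PR:

import io.circe.Json

object MinMaxSketch extends App {
  // True only when both values are numeric and the min/max pair is inverted;
  // non-numeric pairs fall through, since the other branches handle them.
  def minExceedsMax(min: Json, max: Json): Boolean =
    List(min, max).flatMap(_.asNumber) match {
      case mv :: xv :: Nil => mv.toDouble > xv.toDouble
      case _ => false
    }

  assert(minExceedsMax(Json.fromInt(5), Json.fromInt(3)))   // inverted range: an error event would be added
  assert(!minExceedsMax(Json.fromInt(1), Json.fromInt(10))) // valid range: no event
}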

@@ -89,7 +89,7 @@ case class StringLengthCheck(
val colType = findColumnInDataFrame(df, column)
if (colType.isDefined) {
val dataType = colType.get.dataType
- if (!(dataType.isInstanceOf[StringType])) {
+ if (!dataType.isInstanceOf[StringType]) {
addEvent(ValidatorError(s"Data type of column '$column' must be String, but was found to be $dataType"))
}
}
@@ -123,8 +123,6 @@ object StringLengthCheck extends LazyLogging {
logger.debug(s"minLength: $minLengthJ type: ${minLengthJ.getClass.getCanonicalName}")
logger.debug(s"maxLength: $maxLengthJ type: ${maxLengthJ.getClass.getCanonicalName}")
logger.debug(s"threshold: $threshold type: ${threshold.getClass.getCanonicalName}")
-
- c.focus.foreach {f => logger.info(s"StringLengthCheckJson: ${f.spaces2}")}
scala.util.Right(StringLengthCheck(column, minLengthJ, maxLengthJ, threshold))
}
}

File: StringRegexCheck.scala

@@ -1,7 +1,6 @@
package com.target.data_validator.validator

import com.target.data_validator.{JsonEncoders, ValidatorError, VarSubstitution}
- import com.target.data_validator.JsonUtils.debugJson
import com.target.data_validator.validator.ValidatorBase._
import com.typesafe.scalalogging.LazyLogging
import io.circe.{DecodingFailure, HCursor, Json}
@@ -12,10 +11,10 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{StringType, StructType}

case class StringRegexCheck(
column: String,
regex: Option[Json],
threshold: Option[String]
) extends RowBased {

override def substituteVariables(dict: VarSubstitution): ValidatorBase = {

@@ -35,13 +34,13 @@ case class StringRegexCheck(
val regexExpression = regex.map { r => RLike(colExp, createLiteralOrUnresolvedAttribute(StringType, r)) }

val ret = regexExpression match {
/*
RLike returns false if the column value is null.
To avoid counting null values as validation failures (like other validations),
an explicit non null check on the column value is required.
*/
case Some(x) => And(Not(x), IsNotNull(colExp))
case _ => throw new RuntimeException("Must define a regex.")
}
logger.debug(s"Expr: $ret")
ret
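
The guarded expression built above can also be constructed by hand and inspected, mirroring the spec change further down; the column name and pattern here are arbitrary examples:

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.StringType

object RegexExprSketch extends App {
  val colExp = UnresolvedAttribute("item")
  val failExpr = And(
    Not(RLike(colExp, Literal.create("I%", StringType))),
    IsNotNull(colExp)
  )
  // Prints something like: ((NOT item RLIKE 'I%') AND (item IS NOT NULL))
  println(failExpr.sql)
}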
@@ -59,7 +58,7 @@ case class StringRegexCheck(
val colType = findColumnInDataFrame(df, column)
if (colType.isDefined) {
val dataType = colType.get.dataType
- if (!(dataType.isInstanceOf[StringType])) {
+ if (!dataType.isInstanceOf[StringType]) {
addEvent(ValidatorError(s"Data type of column '$column' must be String, but was found to be $dataType"))
}
}
@@ -90,8 +89,6 @@ object StringRegexCheck extends LazyLogging {
logger.debug(s"column: $column")
logger.debug(s"regex: $regex type: ${regex.getClass.getCanonicalName}")
logger.debug(s"threshold: $threshold type: ${threshold.getClass.getCanonicalName}")
-
- c.focus.foreach {f => logger.info(s"StringRegexCheckJson: ${f.spaces2}")}
scala.util.Right(StringRegexCheck(column, regex, threshold))
}
}

File: StringLengthCheckSpec.scala

@@ -364,7 +364,7 @@ class StringLengthCheckSpec extends FunSpec with Matchers with TestingSparkSession
assert(config.quickChecks(spark, dict))
assert(sut.failed)
assert(sut.getEvents contains
- ValidatorCheckEvent(failure = true, "StringLengthCheck on column 'item'", 4, 2))
+ ValidatorCheckEvent(failure = true, "StringLengthCheck on column 'item'", 4, 2)) // scalastyle:ignore

assert(sut.getEvents contains
ValidatorQuickCheckError(("item", "Item1") :: Nil, "Item1",
@@ -428,4 +428,4 @@ class StringLengthCheckSpec extends FunSpec with Matchers with TestingSparkSession
}
}
}
}

File: StringRegexCheckSpec.scala

@@ -3,7 +3,7 @@ package com.target.data_validator.validator
import com.target.TestingSparkSession
import com.target.data_validator._
import io.circe.Json
- import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
@@ -22,8 +22,8 @@ class StringRegexCheckSpec extends FunSpec with Matchers with TestingSparkSession
Row("Item1", 2.99),
Row("Item23", 5.35),
Row("I", 1.00),
- Row(null, 1.00),
- Row(null, 2.00)
+ Row(null, 1.00), // scalastyle:ignore
+ Row(null, 2.00) // scalastyle:ignore
)

describe("StringRegexCheck") {
@@ -40,7 +40,7 @@ class StringRegexCheckSpec extends FunSpec with Matchers with TestingSparkSession

it("error if column is not found in df") {
val df = mkDataFrame(spark, defData, schema)
- val sut = StringRegexCheck( "bad_column_name", Some(Json.fromString("I%")), None)
+ val sut = StringRegexCheck("bad_column_name", Some(Json.fromString("I%")), None)
assert(sut.configCheck(df))
assert(sut.getEvents contains ValidatorError("Column: 'bad_column_name' not found in schema."))
assert(sut.failed)
@@ -50,7 +50,10 @@ class StringRegexCheckSpec extends FunSpec with Matchers with TestingSparkSession
val df = mkDataFrame(spark, defData, schema)
val sut = StringRegexCheck("baseprice", Some(Json.fromString("I%")), None)
assert(sut.configCheck(df))
- assert(sut.getEvents contains ValidatorError("Data type of column 'baseprice' must be String, but was found to be DoubleType"))
+ assert(
+ sut.getEvents contains
+ ValidatorError("Data type of column 'baseprice' must be String, but was found to be DoubleType")
+ )
assert(sut.failed)
}
}
@@ -71,7 +74,7 @@ class StringRegexCheckSpec extends FunSpec with Matchers with TestingSparkSession
}

it("substitute with threshold") {
- val dict = mkParams(List(("column", "item"), ("regex", "I%"), ("threshold", Json.fromInt(100))))
+ val dict = mkParams(List(("column", "item"), ("regex", "I%"), ("threshold", Json.fromInt(100)))) // scalastyle:ignore
val sut = StringRegexCheck("$column", Some(Json.fromString("${regex}")), Some("${threshold}"))
assert(sut.substituteVariables(dict) == StringRegexCheck("item", Some(Json.fromString("I%")), Some("100")))
assert(!sut.failed)
@@ -82,8 +85,10 @@ class StringRegexCheckSpec extends FunSpec with Matchers with TestingSparkSession

it("regex pattern ab%") {
val sut = StringRegexCheck("item", Some(Json.fromString("ab%")), None)
- assert(sut.colTest(schema, mkParams()).sql ==
- And(Not(RLike(UnresolvedAttribute("item"), Literal.create("ab%", StringType))), IsNotNull(UnresolvedAttribute("item"))).sql)
+ assert(sut.colTest(schema, mkParams()).sql == And(
+ Not(RLike(UnresolvedAttribute("item"), Literal.create("ab%", StringType))),
+ IsNotNull(UnresolvedAttribute("item"))).sql
+ )
}
}
