Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

String Length Check #14

Merged
merged 7 commits into from
Jun 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,17 @@ Takes 2 - 4 parameters, described below. If the value in the column doesn't fall

**Note:** To specify another column in the table, you must prefix the column name with a **`** (backtick).

#### `stringLengthCheck`

Takes 2 or 3 parameters, described in the table below. If the length of the string in the column doesn't fall within the range specified by (`minValue`, `maxValue`), both inclusive, the check will fail.
At least one of `minValue` or `maxValue` must be specified. The data type of `column` must be String.

| Arg | Type | Description |
|-----|------|-------------|
| `column` | String | Table column to be checked. The DataType of the column must be a String
| `minValue` | Integer | Lower bound of the length of the string, inclusive.
| `maxValue` | Integer | Upper bound of the length of the string, inclusive.

#### `rowCount`

The minimum number of rows a table must have to pass the validator.
Expand Down Expand Up @@ -324,6 +335,12 @@ tables:
# nullCheck - checks if the column is null, counts number of rows with null for this column.
- type: nullCheck
column: occupation

# stringLengthCheck - checks if the length of the string in the column falls within the specified range, counts number of rows in which the length of the string is outside the specified range.
- type: stringLengthCheck
column: occupation
minValue: 1
maxValue: 5
```

## Working with OOZIE Workflows
Expand Down Expand Up @@ -470,4 +487,4 @@ tables:

- type: nullCheck
column: nullCol
```
```
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ object JsonDecoders extends LazyLogging {
case "columnMaxCheck" => c.as[ColumnMaxCheck]
case "rangeCheck" => RangeCheck.fromJson(c)
case "uniqueCheck" => UniqueCheck.fromJson(c)
case "stringLengthCheck" => StringLengthCheck.fromJson(c)
case x => logger.error(s"Unknown Check `$x` in config!")
throw new RuntimeException(s"Unknown Check in config `$x`")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package com.target.data_validator.validator

import com.target.data_validator.{JsonEncoders, ValidatorError, VarSubstitution}
import com.target.data_validator.JsonUtils.debugJson
import com.target.data_validator.validator.ValidatorBase._
import com.typesafe.scalalogging.LazyLogging
import io.circe.{DecodingFailure, HCursor, Json}
import io.circe.syntax._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

case class StringLengthCheck(
column: String,
minValue: Option[Json],
maxValue: Option[Json]
) extends RowBased {

override def substituteVariables(dict: VarSubstitution): ValidatorBase = {

val ret = StringLengthCheck(
getVarSub(column, "column", dict),
minValue.map(getVarSubJson(_, "minValue", dict)),
maxValue.map(getVarSubJson(_, "maxValue", dict))
)
getEvents.foreach(ret.addEvent)
ret
}

private def cmpExpr(colExpr: Expression,
value: Option[Json],
cmp: (Expression, Expression) => Expression
): Option[Expression] = {
value.map { v => cmp(colExpr, createLiteralOrUnresolvedAttribute(IntegerType, v)) }
}

override def colTest(schema: StructType, dict: VarSubstitution): Expression = {

val colExp = Length(UnresolvedAttribute(column))

val minValueExpression = cmpExpr(colExp, minValue, LessThan)
val maxValueExpression = cmpExpr(colExp, maxValue, GreaterThan)

val ret = (minValueExpression, maxValueExpression) match {
case (Some(x), None) => x
case (None, Some(y)) => y
case (Some(x), Some(y)) => Or(x, y)
case _ => throw new RuntimeException("Must define min or max value.")
}
logger.debug(s"Expr: $ret")
ret
}

private def checkMinLessThanOrEqualToMax(values: List[Json]): Unit = {

if (values.forall(_.isNumber)) {
values.flatMap(_.asNumber) match {
case mv :: xv :: Nil if mv.toDouble > xv.toDouble =>
addEvent(ValidatorError(s"min: ${minValue.get} must be less than or equal to max: ${maxValue.get}"))
case _ =>
}
} else if (values.forall(_.isString)) {
values.flatMap(_.asString) match {
case mv :: xv :: Nil if mv == xv =>
addEvent(ValidatorError(s"Min[String]: $mv must be less than max[String]: $xv"))
case _ =>
}
} else {
// Not Strings or Numbers
addEvent(ValidatorError(s"Unsupported type in ${values.map(debugJson).mkString(", ")}"))
}
}

override def configCheck(df: DataFrame): Boolean = {

// Verify if at least one of min or max is specified.
val values = (minValue::maxValue::Nil).flatten
if (values.isEmpty) {
addEvent(ValidatorError("Must define minValue or maxValue or both."))
}

// Verify that min is less than max
checkMinLessThanOrEqualToMax(values)

// Verify that the data type of the specified column is a String.
val colType = findColumnInDataFrame(df, column)
if (colType.isDefined) {
val dataType = colType.get.dataType
if (!(dataType.isInstanceOf[StringType])) {
addEvent(ValidatorError(s"Data type of column '$column' must be String, but was found to be $dataType"))
}
}

failed
}

override def toJson: Json = {
import JsonEncoders.eventEncoder
val fields = Seq(
("type", Json.fromString("stringLengthCheck")),
("column", Json.fromString(column))
) ++
minValue.map(mv => ("minValue", mv)) ++
maxValue.map(mv => ("maxValue", mv)) ++
Seq(
("events", getEvents.asJson)
)
Json.obj(fields: _*)
}
}

object StringLengthCheck extends LazyLogging {
def fromJson(c: HCursor): Either[DecodingFailure, ValidatorBase] = {
val column = c.downField("column").as[String].right.get
val minValueJ = c.downField("minValue").as[Json].right.toOption
val maxValueJ = c.downField("maxValue").as[Json].right.toOption

logger.debug(s"column: $column")
logger.debug(s"minValue: $minValueJ type: ${minValueJ.getClass.getCanonicalName}")
logger.debug(s"maxValue: $maxValueJ type: ${maxValueJ.getClass.getCanonicalName}")

c.focus.foreach {f => logger.info(s"StringLengthCheckJson: ${f.spaces2}")}
scala.util.Right(StringLengthCheck(column, minValueJ, maxValueJ))
}
}
Loading