From 363802fa6e25bffbe2b2f09eddde08417f18cfda Mon Sep 17 00:00:00 2001 From: Tuan Pham Date: Fri, 17 May 2024 08:52:56 +1000 Subject: [PATCH 1/6] Add DateTimeMetric, Analyzer and Example --- .../com/amazon/deequ/analyzers/Analyzer.scala | 46 +++++- .../analyzers/DateTimeDistribution.scala | 143 ++++++++++++++++++ .../deequ/analyzers/MaximumDateTime.scala | 54 +++++++ .../deequ/analyzers/MinimumDateTime.scala | 54 +++++++ .../catalyst/DateTimeAggregation.scala | 49 ++++++ .../analyzers/catalyst/DeequFunctions.scala | 13 +- .../scala/com/amazon/deequ/checks/Check.scala | 91 +++++++++++ .../amazon/deequ/constraints/Constraint.scala | 37 +++++ .../examples/DateTimeMetricExample.scala | 51 +++++++ .../amazon/deequ/examples/ExampleUtils.scala | 5 + .../com/amazon/deequ/examples/entities.scala | 8 + .../com/amazon/deequ/metrics/Metric.scala | 13 ++ 12 files changed, 562 insertions(+), 2 deletions(-) create mode 100644 src/main/scala/com/amazon/deequ/analyzers/DateTimeDistribution.scala create mode 100644 src/main/scala/com/amazon/deequ/analyzers/MaximumDateTime.scala create mode 100644 src/main/scala/com/amazon/deequ/analyzers/MinimumDateTime.scala create mode 100644 src/main/scala/com/amazon/deequ/analyzers/catalyst/DateTimeAggregation.scala create mode 100644 src/main/scala/com/amazon/deequ/examples/DateTimeMetricExample.scala diff --git a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala index 9367f31e1..c0cffd80e 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala @@ -20,7 +20,7 @@ import com.amazon.deequ.analyzers.Analyzers._ import com.amazon.deequ.analyzers.FilteredRowOutcome.FilteredRowOutcome import com.amazon.deequ.analyzers.NullBehavior.NullBehavior import com.amazon.deequ.analyzers.runners._ -import com.amazon.deequ.metrics.DoubleMetric +import com.amazon.deequ.metrics.{DateTimeMetric, DoubleMetric} import com.amazon.deequ.metrics.Entity import com.amazon.deequ.metrics.FullColumn import com.amazon.deequ.metrics.Metric @@ -32,6 +32,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ +import java.time.Instant import scala.language.existentials import scala.util.Failure import scala.util.Success @@ -62,6 +63,10 @@ trait DoubleValuedState[S <: DoubleValuedState[S]] extends State[S] { def metricValue(): Double } +trait DateTimeValuedState[S <: DateTimeValuedState[S]] extends State[S] { + def metricValue(): Instant +} + /** Common trait for all analyzers which generates metrics from states computed on data frames */ trait Analyzer[S <: State[_], +M <: Metric[_]] extends Serializable { @@ -244,6 +249,29 @@ abstract class StandardScanShareableAnalyzer[S <: DoubleValuedState[_]]( } } +/** A scan-shareable analyzer that produces a DateTimeMetric */ +abstract class TimestampScanShareableAnalyzer[S <: DateTimeValuedState[_]]( + name: String, + instance: String, + entity: Entity.Value = Entity.Column) + extends ScanShareableAnalyzer[S, DateTimeMetric] { + + override def computeMetricFrom(state: Option[S]): DateTimeMetric = state match { + case Some(theState) => + DateTimeMetric(entity, name, instance, Success(theState.metricValue())) + case _ => + DateTimeMetric(entity, name, instance, Failure( + MetricCalculationException.wrapIfNecessary(emptyStateException(this)))) + } + + override private[deequ] def toFailureMetric(exception: Exception) = DateTimeMetric(entity, name, instance, Failure( + MetricCalculationException.wrapIfNecessary(exception))) + + override def preconditions: Seq[StructType => Unit] = additionalPreconditions() ++ super.preconditions + + protected def additionalPreconditions(): Seq[StructType => Unit] = Seq.empty +} + /** A state for computing ratio-based metrics, * contains #rows that match a predicate and overall #rows */ case class NumMatchesAndCount(numMatches: Long, count: Long, override val fullColumn: Option[Column] = None) @@ -329,6 +357,8 @@ object Preconditions { private[this] val nestedDataTypes = Set(StructType, MapType, ArrayType) + private[this] val dateTypes = Set(TimestampType, DateType) + private[this] val caseSensitive = { SparkSession.builder().getOrCreate() .sqlContext.getConf("spark.sql.caseSensitive").equalsIgnoreCase("true") @@ -405,6 +435,20 @@ object Preconditions { } } + /** Specified column has string type */ + def isDateType(column: String): StructType => Unit = { schema => + val columnDataType = structField(column, schema).dataType + val hasDateType = columnDataType match { + case DateType | TimestampType => true + case _ => false + } + + if (hasDateType) { + throw new WrongColumnTypeException(s"Expected type of column $column to be one of" + + s"${dateTypes.mkString(",")}, but found $columnDataType instead!") + } + } + /** Specified column has a numeric type */ def isNumeric(column: String): StructType => Unit = { schema => val columnDataType = structField(column, schema).dataType diff --git a/src/main/scala/com/amazon/deequ/analyzers/DateTimeDistribution.scala b/src/main/scala/com/amazon/deequ/analyzers/DateTimeDistribution.scala new file mode 100644 index 000000000..80768f352 --- /dev/null +++ b/src/main/scala/com/amazon/deequ/analyzers/DateTimeDistribution.scala @@ -0,0 +1,143 @@ +/** + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License + * is located at + * + * http://aws.amazon.com/apache2.0/ + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.deequ.analyzers + +import java.time.Instant +import com.amazon.deequ.analyzers.Analyzers._ +import com.amazon.deequ.analyzers.Preconditions.{hasColumn, isDateType} +import com.amazon.deequ.analyzers.runners.MetricCalculationException +import com.amazon.deequ.metrics.{Distribution, DistributionValue, HistogramMetric} +import org.apache.spark.sql.DeequFunctions.dateTimeDistribution +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{Column, Row} + +import scala.util.{Failure, Success} + +object DistributionInterval extends Enumeration { + val QUARTER_HOUR, HOURLY, DAILY, WEEKLY, MONTHLY = Value +} + +case class DateTimeDistributionState(distribution: Map[(Instant, Instant), Long]) + extends State[DateTimeDistributionState] { + + override def sum(other: DateTimeDistributionState): DateTimeDistributionState = { + + DateTimeDistributionState(distribution ++ other.distribution.map { + case (k, v) => k -> (v + distribution.getOrElse(k, 0L)) + }) + } +} + +object DateTimeDistributionState { + + def computeStateFromResult( + result: Map[Long, Long], + frequency: Long + ): Map[(Instant, Instant), Long] = { + result.map({ + case (x, y) => (new Instant(x), new Instant(x + frequency - 1L)) -> y + }) + } + + def toDistribution(histogram: DateTimeDistributionState): Distribution = { + val totalCount = histogram.distribution.foldLeft(0L)(_ + _._2) + Distribution( + histogram.distribution.map { + case (x, y) => + ("(" + x._1.toString + " to " + x._2.toString + ")") -> DistributionValue(y, y.toDouble / totalCount) + }, + totalCount + ) + } +} + +/** + * + * @param column : column on which distribution analysis is to be performed + * @param interval : interval of the distribution; + * @param where : optional filter condition + */ +case class DateTimeDistribution( + column: String, + interval: Long, + where: Option[String] = None) + extends ScanShareableAnalyzer[DateTimeDistributionState, HistogramMetric] + with FilterableAnalyzer { + + /** Defines the aggregations to compute on the data */ + override private[deequ] def aggregationFunctions(): Seq[Column] = { + dateTimeDistribution(conditionalSelection(column, where), interval) :: Nil + } + + /** Computes the state from the result of the aggregation functions */ + override private[deequ] def fromAggregationResult( + result: Row, + offset: Int + ): Option[DateTimeDistributionState] = { + ifNoNullsIn(result, offset) { _ => + DateTimeDistributionState( + DateTimeDistributionState.computeStateFromResult( + Map.empty[Long, Long] ++ result.getMap(0), + interval + ) + ) + } + } + + override def preconditions: Seq[StructType => Unit] = { + hasColumn(column) +: isDateType(column) +: super.preconditions + } + + override def filterCondition: Option[String] = where + + /** + * Compute the metric from the state (sufficient statistics) + * + * @param state wrapper holding a state of type S (required due to typing issues...) + * @return + */ + override def computeMetricFrom(state: Option[DateTimeDistributionState]): HistogramMetric = { + state match { + case Some(histogram) => + HistogramMetric(column, Success(DateTimeDistributionState.toDistribution(histogram))) + case _ => + toFailureMetric(emptyStateException(this)) + } + } + + override private[deequ] def toFailureMetric(failure: Exception): HistogramMetric = { + HistogramMetric(column, Failure(MetricCalculationException.wrapIfNecessary(failure))) + } +} + +object DateTimeDistribution { + def apply(column: String, + interval: DistributionInterval.Value, + where: Option[String] = None): DateTimeDistribution = + new DateTimeDistribution(column, interval = getDateTimeAggIntervalValue(interval), where) + + def getDateTimeAggIntervalValue(interval: DistributionInterval.Value): Long = { + interval match { + case DistributionInterval.QUARTER_HOUR => 900000L // 15 Minutes + case DistributionInterval.HOURLY => 3600000L // 60 Minutes + case DistributionInterval.DAILY => 86400000L // 24 Hours + case DistributionInterval.WEEKLY => 604800000L + case _ => 604800000L // 7 * 24 Hours + } + } + +} diff --git a/src/main/scala/com/amazon/deequ/analyzers/MaximumDateTime.scala b/src/main/scala/com/amazon/deequ/analyzers/MaximumDateTime.scala new file mode 100644 index 000000000..3dc2139fe --- /dev/null +++ b/src/main/scala/com/amazon/deequ/analyzers/MaximumDateTime.scala @@ -0,0 +1,54 @@ +/** + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License + * is located at + * + * http://aws.amazon.com/apache2.0/ + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.deequ.analyzers + +import com.amazon.deequ.analyzers.Preconditions.{hasColumn, isDateType} +import org.apache.spark.sql.{Column, Row} +import org.apache.spark.sql.functions.max +import org.apache.spark.sql.types.{TimestampType, StructType} +import Analyzers._ +import java.time.Instant + +case class MaxDateTimeState(maxValue: Instant) extends DateTimeValuedState[MaxDateTimeState] { + + override def sum(other: MaxDateTimeState): MaxDateTimeState = { + MaxDateTimeState(if (maxValue.compareTo(other.maxValue) > 0) maxValue else other.maxValue) + } + + override def metricValue(): Instant = maxValue +} + +case class MaximumDateTime(column: String, where: Option[String] = None) + extends TimestampScanShareableAnalyzer[MaxDateTimeState]("Maximum Date Time", column) + with FilterableAnalyzer { + + override def aggregationFunctions(): Seq[Column] = { + max(conditionalSelection(column, where)).cast(TimestampType) :: Nil + } + + override def fromAggregationResult(result: Row, offset: Int): Option[MaxDateTimeState] = { + ifNoNullsIn(result, offset) { _ => + MaxDateTimeState(result.getInstant(offset)) + } + } + + override protected def additionalPreconditions(): Seq[StructType => Unit] = { + hasColumn(column) :: isDateType(column) :: Nil + } + + override def filterCondition: Option[String] = where +} diff --git a/src/main/scala/com/amazon/deequ/analyzers/MinimumDateTime.scala b/src/main/scala/com/amazon/deequ/analyzers/MinimumDateTime.scala new file mode 100644 index 000000000..999a25f21 --- /dev/null +++ b/src/main/scala/com/amazon/deequ/analyzers/MinimumDateTime.scala @@ -0,0 +1,54 @@ +/** + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License + * is located at + * + * http://aws.amazon.com/apache2.0/ + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.deequ.analyzers + +import com.amazon.deequ.analyzers.Preconditions.{hasColumn, isDateType} +import org.apache.spark.sql.{Column, Row} +import org.apache.spark.sql.functions.min +import org.apache.spark.sql.types.{TimestampType, StructType} +import Analyzers._ +import java.time.Instant + +case class MinDateTimeState(minValue: Instant) extends DateTimeValuedState[MinDateTimeState] { + + override def sum(other: MinDateTimeState): MinDateTimeState = { + MinDateTimeState(if (minValue.compareTo(other.minValue) < 0) minValue else other.minValue) + } + + override def metricValue(): Instant = { + minValue + } +} + +case class MinimumDateTime(column: String, where: Option[String] = None) + extends TimestampScanShareableAnalyzer[MinDateTimeState]("Minimum Date Time", column) + with FilterableAnalyzer { + + override def aggregationFunctions(): Seq[Column] = { + min(conditionalSelection(column, where)).cast(TimestampType) :: Nil + } + + override def fromAggregationResult(result: Row, offset: Int): Option[MinDateTimeState] = { + ifNoNullsIn(result, offset) { _ => MinDateTimeState(result.getInstant(offset)) } + } + + override protected def additionalPreconditions(): Seq[StructType => Unit] = { + hasColumn(column) :: isDateType(column) :: Nil + } + + override def filterCondition: Option[String] = where +} diff --git a/src/main/scala/com/amazon/deequ/analyzers/catalyst/DateTimeAggregation.scala b/src/main/scala/com/amazon/deequ/analyzers/catalyst/DateTimeAggregation.scala new file mode 100644 index 000000000..bf30b6ddd --- /dev/null +++ b/src/main/scala/com/amazon/deequ/analyzers/catalyst/DateTimeAggregation.scala @@ -0,0 +1,49 @@ +/** + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License + * is located at + * + * http://aws.amazon.com/apache2.0/ + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.expressions.Aggregator +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder + +import java.time.Instant + + +private[sql] class DateTimeAggregation( + frequency: Long +) extends Aggregator[Instant, Map[Long, Long], Map[Long, Long]] { + override def zero: Map[Long, Long] = Map.empty[Long, Long] + + override def reduce(agg: Map[Long, Long], input: Instant): Map[Long, Long] = { + val dateTime = input.toEpochMilli + val batchTime = dateTime - (dateTime % frequency) + agg + (batchTime -> (agg.getOrElse(batchTime, 0L) + 1L)) + } + + override def merge(b1: Map[Long, Long], b2: Map[Long, Long]): Map[Long, Long] = { + b1 ++ b2.map { + case (k, v) => k -> (v + b1.getOrElse(k, 0L)) + } + } + + override def finish(reduction: Map[Long, Long]): Map[Long, Long] = reduction + + // Define encoder for buffer + def bufferEncoder: Encoder[Map[Long, Long]] = ExpressionEncoder() + + // Define encoder for output + def outputEncoder: Encoder[Map[Long, Long]] = ExpressionEncoder() +} \ No newline at end of file diff --git a/src/main/scala/com/amazon/deequ/analyzers/catalyst/DeequFunctions.scala b/src/main/scala/com/amazon/deequ/analyzers/catalyst/DeequFunctions.scala index dd973b301..e0da1ef81 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/catalyst/DeequFunctions.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/catalyst/DeequFunctions.scala @@ -16,7 +16,6 @@ package org.apache.spark.sql - import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, StatefulApproxQuantile, StatefulHyperloglogPlus} import org.apache.spark.sql.catalyst.expressions.Literal @@ -87,6 +86,18 @@ object DeequFunctions { val statefulKLL = new StatefulKLLSketch(sketchSize, shrinkingFactor) statefulKLL(column) } + + /** + * return DateTime distribution aggregation function + * + * @param column : column on which aggregation to be performed + * @param interval : interval of date time aggregation + * @return Column: aggregation function Column + * */ + def dateTimeDistribution(column: Column, interval: Long): Column = { + val dateTimeAgg = new DateTimeAggregation(interval) + dateTimeAgg.toColumn(column) + } } diff --git a/src/main/scala/com/amazon/deequ/checks/Check.scala b/src/main/scala/com/amazon/deequ/checks/Check.scala index 1e1048921..0c5138395 100644 --- a/src/main/scala/com/amazon/deequ/checks/Check.scala +++ b/src/main/scala/com/amazon/deequ/checks/Check.scala @@ -40,6 +40,7 @@ import com.amazon.deequ.repository.MetricsRepository import org.apache.spark.sql.DataFrame import org.apache.spark.sql.expressions.UserDefinedFunction +import java.time.Instant import scala.util.matching.Regex object CheckLevel extends Enumeration { @@ -1236,6 +1237,96 @@ case class Check( columns = List(column), analyzerOptions = analyzerOptions) } + def hasMinTimestamp( + column: String, + assertion: Instant => Boolean, + hint: Option[String] = None) + : CheckWithLastConstraintFilterable = addFilterableConstraint { filter => + minTimestampConstraint(column, assertion, filter, hint) + } + + def hasMaxTimestamp( + column: String, + assertion: Instant => Boolean, + hint: Option[String] = None) + : CheckWithLastConstraintFilterable = addFilterableConstraint { filter => + maxTimestampConstraint(column, assertion, filter, hint) + } + + /** + * Asserts that, in each row, the value of column (DateType or TimestampType) + * is less than the given datetime (Timestamp) + * + * @param column Column to run the assertion on + * @param datetime value of Timestamp to run assert + * @param assertion Function that receives a Timestamp input parameter and returns a boolean + * @param hint A hint to provide additional context why a constraint could have failed + * @return + */ + def isDateTimeLessThan( + column: String, + datetime: Instant, + assertion: Double => Boolean = Check.IsOne, + hint: Option[String] = None) + : CheckWithLastConstraintFilterable = satisfies(s"$column < to_timestamp('${datetime.toString}')", + s"$column is less than '${datetime.toString}'", assertion, + hint = hint) + + /** + * + * Asserts that, in each row, the value of column (DateType or TimestampType) + * is greater than the given datetime (Timestamp) + * + * @param column Column to run the assertion on + * @param datetime value of Timestamp to run assert + * @param assertion Function that receives a Timestamp input parameter and returns a boolean + * @param hint A hint to provide additional context why a constraint could have failed + * @return + */ + def isDateTimeGreaterThan( + column: String, + datetime: Instant, + assertion: Double => Boolean = Check.IsOne, + hint: Option[String] = None) + : CheckWithLastConstraintFilterable = satisfies(s"$column > to_timestamp('${datetime.toString}')", + s"$column is greater than '${datetime.toString}'", assertion, + hint = hint) + + /** + * + * Asserts that, in each row, the value of column (DateType or TimestampType) contains a past date + * + * @param column Column to run the assertion on + * @param assertion Function that receives a Timestamp input parameter and returns a boolean + * @param hint A hint to provide additional context why a constraint could have failed + * @return + */ + def hasPastDates( + column: String, + assertion: Double => Boolean = Check.IsOne, + hint: Option[String] = None) + : CheckWithLastConstraintFilterable = satisfies(s"$column < now()", + s"$column has all past dates", assertion, + hint = hint) + + /** + * + * Asserts that, in each row, the value of column (DateType or TimestampType) + * contains a future date + * + * @param column Column to run the assertion on + * @param assertion Function that receives a Timestamp input parameter and returns a boolean + * @param hint A hint to provide additional context why a constraint could have failed + * @return + */ + def hasFutureDates( + column: String, + assertion: Double => Boolean = Check.IsOne, + hint: Option[String] = None) + : CheckWithLastConstraintFilterable = satisfies(s"$column > now()", + s"$column has all future dates", assertion, + hint = hint) + /** * Evaluate this check on computed metrics * @param context result of the metrics computation diff --git a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala index e289b3859..eda0a19de 100644 --- a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala +++ b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala @@ -22,6 +22,7 @@ import com.amazon.deequ.metrics.Distribution import com.amazon.deequ.metrics.Metric import org.apache.spark.sql.expressions.UserDefinedFunction +import java.time.Instant import scala.util.Failure import scala.util.Success import scala.util.matching.Regex @@ -614,6 +615,42 @@ object Constraint { sparkAssertion) } + def minTimestampConstraint( + column: String, + assertion: Instant => Boolean, + where: Option[String] = None, + hint: Option[String] = None) + : Constraint = { + + val minimum = MinimumDateTime(column, where) + + val constraint = AnalysisBasedConstraint[MinDateTimeState, Instant, Instant]( + minimum, + assertion, + hint = hint + ) + + new NamedConstraint(constraint, s"MinimumTimestampConstraint($minimum)") + } + + def maxTimestampConstraint( + column: String, + assertion: Instant => Boolean, + where: Option[String] = None, + hint: Option[String] = None) + : Constraint = { + + val maximum = MaximumDateTime(column, where) + + val constraint = AnalysisBasedConstraint[MaxDateTimeState, Instant, Instant]( + maximum, + assertion, + hint = hint + ) + + new NamedConstraint(constraint, s"MaximumTimestampConstraint($maximum)") + } + /** * Runs minimum analysis on the given column and executes the assertion * diff --git a/src/main/scala/com/amazon/deequ/examples/DateTimeMetricExample.scala b/src/main/scala/com/amazon/deequ/examples/DateTimeMetricExample.scala new file mode 100644 index 000000000..558a1a751 --- /dev/null +++ b/src/main/scala/com/amazon/deequ/examples/DateTimeMetricExample.scala @@ -0,0 +1,51 @@ +/** + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License + * is located at + * + * http://aws.amazon.com/apache2.0/ + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.deequ.examples + +import java.time.Instant +import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext} +import com.amazon.deequ.analyzers.{DateTimeDistribution, DistributionInterval, MaximumDateTime, MinimumDateTime} +import com.amazon.deequ.examples.ExampleUtils.{customerAsDataframe, withSpark} +import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame + +private[examples] object DateTimeMetricExample extends App { + withSpark { session => + + val data = customerAsDataframe(session, + Customer(1, "john doe", Instant.parse("2021-11-11T07:15:00Z")), + Customer(2, "marry jane", Instant.parse("2022-01-11T07:45:00Z")), + Customer(3, "Thomas Yu", Instant.parse("2023-02-11T08:15:00Z")), + Customer(4, "Steve Powell", Instant.parse("2019-04-11T12:15:00Z")), + Customer(5, "Andrej Kar", Instant.parse("2020-08-11T12:30:00Z")), + ) + + val analysisResult: AnalyzerContext = { AnalysisRunner + .onData(data) + .addAnalyzer(DateTimeDistribution("dateOfBirth", DistributionInterval.HOURLY)) + .addAnalyzer(MinimumDateTime("dateOfBirth")) + .addAnalyzer(MaximumDateTime("dateOfBirth")) + .run() + } + + successMetricsAsDataFrame(session, analysisResult).show(false) + + analysisResult.metricMap.foreach( x => + println(s"column '${x._2.instance}' has ${x._2.name} : ${x._2.value.get}") + ) + + } +} diff --git a/src/main/scala/com/amazon/deequ/examples/ExampleUtils.scala b/src/main/scala/com/amazon/deequ/examples/ExampleUtils.scala index 699711a5d..6e8f2ef03 100644 --- a/src/main/scala/com/amazon/deequ/examples/ExampleUtils.scala +++ b/src/main/scala/com/amazon/deequ/examples/ExampleUtils.scala @@ -45,4 +45,9 @@ private[deequ] object ExampleUtils { val rdd = session.sparkContext.parallelize(manufacturers) session.createDataFrame(rdd) } + + def customerAsDataframe(session: SparkSession, customer: Customer*): DataFrame = { + val rdd = session.sparkContext.parallelize(customer) + session.createDataFrame(rdd) + } } diff --git a/src/main/scala/com/amazon/deequ/examples/entities.scala b/src/main/scala/com/amazon/deequ/examples/entities.scala index f2750ecfe..bb08d3f04 100644 --- a/src/main/scala/com/amazon/deequ/examples/entities.scala +++ b/src/main/scala/com/amazon/deequ/examples/entities.scala @@ -16,6 +16,8 @@ package com.amazon.deequ.examples +import java.time.Instant + private[deequ] case class Item( id: Long, productName: String, @@ -29,3 +31,9 @@ private[deequ] case class Manufacturer( manufacturerName: String, countryCode: String ) + +private[deequ] case class Customer( + id: Long, + name: String, + dateOfBirth: Instant +) diff --git a/src/main/scala/com/amazon/deequ/metrics/Metric.scala b/src/main/scala/com/amazon/deequ/metrics/Metric.scala index 30225e246..4dcb3562f 100644 --- a/src/main/scala/com/amazon/deequ/metrics/Metric.scala +++ b/src/main/scala/com/amazon/deequ/metrics/Metric.scala @@ -18,6 +18,7 @@ package com.amazon.deequ.metrics import org.apache.spark.sql.Column +import java.time.Instant import scala.util.{Failure, Success, Try} object Entity extends Enumeration { @@ -89,3 +90,15 @@ case class KeyedDoubleMetric( } } } + +case class DateTimeMetric( + entity: Entity.Value, + name: String, + instance: String, + value: Try[Instant] +) extends Metric[Instant] { + override def flatten(): Seq[DoubleMetric] = value match { + case Success(v) => Seq(DoubleMetric(entity, "Timestamp milliseconds", instance, Success(v.toEpochMilli.toDouble))) + case Failure(e) => Seq(DoubleMetric(entity, "Timestamp milliseconds", instance, Failure(e))) + } +} From a0338072da1f8ae64e0a2cac1f6eb1cee6cf3795 Mon Sep 17 00:00:00 2001 From: Tuan Pham Date: Wed, 11 Sep 2024 11:12:01 +1000 Subject: [PATCH 2/6] Add java8API enabled for sample --- .../scala/com/amazon/deequ/analyzers/Analyzer.scala | 6 +++--- .../amazon/deequ/analyzers/DateTimeDistribution.scala | 10 +++++++--- .../com/amazon/deequ/analyzers/MaximumDateTime.scala | 2 +- .../deequ/analyzers/catalyst/DateTimeAggregation.scala | 2 +- .../amazon/deequ/examples/DateTimeMetricExample.scala | 6 +++--- .../scala/com/amazon/deequ/examples/ExampleUtils.scala | 1 + 6 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala index c0cffd80e..a9ad82ec5 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala @@ -443,9 +443,9 @@ object Preconditions { case _ => false } - if (hasDateType) { - throw new WrongColumnTypeException(s"Expected type of column $column to be one of" + - s"${dateTypes.mkString(",")}, but found $columnDataType instead!") + if (!hasDateType) { + throw new WrongColumnTypeException(s"Expected type of column $column to be one of " + + s"${dateTypes.mkString(", ")}, but found $columnDataType instead!") } } diff --git a/src/main/scala/com/amazon/deequ/analyzers/DateTimeDistribution.scala b/src/main/scala/com/amazon/deequ/analyzers/DateTimeDistribution.scala index 80768f352..05d0b8907 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/DateTimeDistribution.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/DateTimeDistribution.scala @@ -1,4 +1,4 @@ -/** +/** * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). You may not @@ -49,7 +49,7 @@ object DateTimeDistributionState { frequency: Long ): Map[(Instant, Instant), Long] = { result.map({ - case (x, y) => (new Instant(x), new Instant(x + frequency - 1L)) -> y + case (x, y) => (Instant.ofEpochMilli(x), Instant.ofEpochMilli(x + frequency - 1L)) -> y }) } @@ -127,9 +127,13 @@ case class DateTimeDistribution( object DateTimeDistribution { def apply(column: String, interval: DistributionInterval.Value, - where: Option[String] = None): DateTimeDistribution = + where: Option[String]): DateTimeDistribution = new DateTimeDistribution(column, interval = getDateTimeAggIntervalValue(interval), where) + def apply(column: String, + interval: DistributionInterval.Value): DateTimeDistribution = + new DateTimeDistribution(column, interval = getDateTimeAggIntervalValue(interval), None) + def getDateTimeAggIntervalValue(interval: DistributionInterval.Value): Long = { interval match { case DistributionInterval.QUARTER_HOUR => 900000L // 15 Minutes diff --git a/src/main/scala/com/amazon/deequ/analyzers/MaximumDateTime.scala b/src/main/scala/com/amazon/deequ/analyzers/MaximumDateTime.scala index 3dc2139fe..b74b87ec1 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/MaximumDateTime.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/MaximumDateTime.scala @@ -1,4 +1,4 @@ -/** +/** * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). You may not diff --git a/src/main/scala/com/amazon/deequ/analyzers/catalyst/DateTimeAggregation.scala b/src/main/scala/com/amazon/deequ/analyzers/catalyst/DateTimeAggregation.scala index bf30b6ddd..2c9ac79b9 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/catalyst/DateTimeAggregation.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/catalyst/DateTimeAggregation.scala @@ -1,4 +1,4 @@ -/** +/** * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). You may not diff --git a/src/main/scala/com/amazon/deequ/examples/DateTimeMetricExample.scala b/src/main/scala/com/amazon/deequ/examples/DateTimeMetricExample.scala index 558a1a751..22d6e560a 100644 --- a/src/main/scala/com/amazon/deequ/examples/DateTimeMetricExample.scala +++ b/src/main/scala/com/amazon/deequ/examples/DateTimeMetricExample.scala @@ -1,4 +1,4 @@ -/** +/** * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). You may not @@ -36,8 +36,8 @@ private[examples] object DateTimeMetricExample extends App { val analysisResult: AnalyzerContext = { AnalysisRunner .onData(data) .addAnalyzer(DateTimeDistribution("dateOfBirth", DistributionInterval.HOURLY)) - .addAnalyzer(MinimumDateTime("dateOfBirth")) - .addAnalyzer(MaximumDateTime("dateOfBirth")) +// .addAnalyzer(MinimumDateTime("dateOfBirth")) +// .addAnalyzer(MaximumDateTime("dateOfBirth")) .run() } diff --git a/src/main/scala/com/amazon/deequ/examples/ExampleUtils.scala b/src/main/scala/com/amazon/deequ/examples/ExampleUtils.scala index 6e8f2ef03..5e56654f5 100644 --- a/src/main/scala/com/amazon/deequ/examples/ExampleUtils.scala +++ b/src/main/scala/com/amazon/deequ/examples/ExampleUtils.scala @@ -25,6 +25,7 @@ private[deequ] object ExampleUtils { .master("local") .appName("test") .config("spark.ui.enabled", "false") + .config("spark.sql.datetime.java8API.enabled", "true") .getOrCreate() session.sparkContext.setCheckpointDir(System.getProperty("java.io.tmpdir")) From 1878160a87c7e7bfd9154c11662dbc9336caeead Mon Sep 17 00:00:00 2001 From: Tuan Pham Date: Wed, 11 Sep 2024 12:32:32 +1000 Subject: [PATCH 3/6] Fix dateTime Aggregator error --- .../com/amazon/deequ/analyzers/catalyst/DeequFunctions.scala | 5 +++-- .../com/amazon/deequ/examples/DateTimeMetricExample.scala | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/scala/com/amazon/deequ/analyzers/catalyst/DeequFunctions.scala b/src/main/scala/com/amazon/deequ/analyzers/catalyst/DeequFunctions.scala index e0da1ef81..3963bdb3d 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/catalyst/DeequFunctions.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/catalyst/DeequFunctions.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, StatefulApproxQuantile, StatefulHyperloglogPlus} import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.functions.udaf /* Custom aggregation functions used internally by deequ */ object DeequFunctions { @@ -95,8 +96,8 @@ object DeequFunctions { * @return Column: aggregation function Column * */ def dateTimeDistribution(column: Column, interval: Long): Column = { - val dateTimeAgg = new DateTimeAggregation(interval) - dateTimeAgg.toColumn(column) + val dateTimeDistribution = udaf(new DateTimeAggregation(interval)) + dateTimeDistribution(column) } } diff --git a/src/main/scala/com/amazon/deequ/examples/DateTimeMetricExample.scala b/src/main/scala/com/amazon/deequ/examples/DateTimeMetricExample.scala index 22d6e560a..14b6bfe03 100644 --- a/src/main/scala/com/amazon/deequ/examples/DateTimeMetricExample.scala +++ b/src/main/scala/com/amazon/deequ/examples/DateTimeMetricExample.scala @@ -31,13 +31,14 @@ private[examples] object DateTimeMetricExample extends App { Customer(3, "Thomas Yu", Instant.parse("2023-02-11T08:15:00Z")), Customer(4, "Steve Powell", Instant.parse("2019-04-11T12:15:00Z")), Customer(5, "Andrej Kar", Instant.parse("2020-08-11T12:30:00Z")), + Customer(6, "Ji Sung", Instant.parse("2020-08-11T12:30:00Z")), ) val analysisResult: AnalyzerContext = { AnalysisRunner .onData(data) .addAnalyzer(DateTimeDistribution("dateOfBirth", DistributionInterval.HOURLY)) -// .addAnalyzer(MinimumDateTime("dateOfBirth")) -// .addAnalyzer(MaximumDateTime("dateOfBirth")) + .addAnalyzer(MinimumDateTime("dateOfBirth")) + .addAnalyzer(MaximumDateTime("dateOfBirth")) .run() } From 74d4e16032651706413758ca0aec2d537520850b Mon Sep 17 00:00:00 2001 From: Tuan Pham Date: Sat, 14 Sep 2024 15:56:03 +1000 Subject: [PATCH 4/6] Add Tests for dateTime analyzers --- .../analyzers/DateTimeDistribution.scala | 5 +- .../com/amazon/deequ/SparkContextSpec.scala | 14 +++ .../deequ/analyzers/AnalyzerTests.scala | 87 +++++++++++++++++++ .../amazon/deequ/utils/FixtureSupport.scala | 13 +++ 4 files changed, 116 insertions(+), 3 deletions(-) diff --git a/src/main/scala/com/amazon/deequ/analyzers/DateTimeDistribution.scala b/src/main/scala/com/amazon/deequ/analyzers/DateTimeDistribution.scala index 05d0b8907..a44fa0b30 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/DateTimeDistribution.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/DateTimeDistribution.scala @@ -57,10 +57,9 @@ object DateTimeDistributionState { val totalCount = histogram.distribution.foldLeft(0L)(_ + _._2) Distribution( histogram.distribution.map { - case (x, y) => - ("(" + x._1.toString + " to " + x._2.toString + ")") -> DistributionValue(y, y.toDouble / totalCount) + case (x, y) => (s"(${x._1} to ${x._2})") -> DistributionValue(y, y.toDouble / totalCount) }, - totalCount + histogram.distribution.keys.size ) } } diff --git a/src/test/scala/com/amazon/deequ/SparkContextSpec.scala b/src/test/scala/com/amazon/deequ/SparkContextSpec.scala index 81b1fd190..46153d5f7 100644 --- a/src/test/scala/com/amazon/deequ/SparkContextSpec.scala +++ b/src/test/scala/com/amazon/deequ/SparkContextSpec.scala @@ -51,6 +51,20 @@ trait SparkContextSpec { } } + /** + * @param testFun thunk to run with SparkSession as an argument + */ + def withSparkSessionJava8APIEnabled(testFun: SparkSession => Any): Unit = { + val session = setupSparkSession() + session.conf.set("spark.sql.datetime.java8API.enabled", "true") + try { + testFun(session) + } finally { + /* empty cache of RDD size, as the referred ids are only valid within a session */ + tearDownSparkSession(session) + } + } + def withSparkSessionIcebergCatalog(testFun: SparkSession => Any): Unit = { val session = setupSparkSession(Some(tmpWareHouseDir.toAbsolutePath.toString)) session.conf.set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") diff --git a/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala b/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala index be5bdc5a6..55e0f1024 100644 --- a/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala +++ b/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala @@ -33,6 +33,7 @@ import org.scalatest.wordspec.AnyWordSpec import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec +import java.time.{Instant, LocalDate} import scala.util.Failure import scala.util.Success @@ -495,6 +496,92 @@ class AnalyzerTests extends AnyWordSpec with Matchers with SparkContextSpec with } } + "DateTimeDistribution analyzer" should { + + def distributionFrom( + nonZeroValues: (Instant, Instant, DistributionValue)*) + : Distribution = { + + val distributionValues = nonZeroValues + .map { case (from, to, distValue) => s"($from to $to)" -> distValue }.toMap + + Distribution(distributionValues, numberOfBins = distributionValues.keys.size) + } + + "fail for non DateType or TimestampType column" in withSparkSessionJava8APIEnabled { sparkSession => + val df = getDfFull(sparkSession) + assert(DateTimeDistribution("item", DistributionInterval.HOURLY).calculate(df).value.isFailure) + } + + "success for DateType column" in withSparkSessionJava8APIEnabled { sparkSession => + val df = getDfWithLocalDateAndInstant(sparkSession) + assert(DateTimeDistribution("signupDate", DistributionInterval.MONTHLY).calculate(df).value.isSuccess) + } + + "success for Timestamp column" in withSparkSessionJava8APIEnabled { sparkSession => + val df = getDfWithLocalDateAndInstant(sparkSession) + assert(DateTimeDistribution("dateOfBirth", DistributionInterval.HOURLY).calculate(df).value.isSuccess) + } + + "success get datetimeDistribution with Daily Interval" in withSparkSessionJava8APIEnabled { sparkSession => + val df = getDfWithLocalDateAndInstant(sparkSession) + val actualDistribution = DateTimeDistribution("dateOfBirth", DistributionInterval.DAILY).calculate(df).value + actualDistribution shouldBe Success( + distributionFrom( + (Instant.parse("2021-11-11T00:00:00Z"), Instant.parse("2021-11-11T23:59:59.999Z"), DistributionValue(3, 0.6)), + (Instant.parse("2019-04-11T00:00:00Z"), Instant.parse("2019-04-11T23:59:59.999Z"), DistributionValue(2, 0.4)) + ) + ) + } + + "success get datetimeDistribution with Long Interval" in withSparkSessionJava8APIEnabled { sparkSession => + val df = getDfWithLocalDateAndInstant(sparkSession) + val actualDistribution = DateTimeDistribution("dateOfBirth", 86400000L).calculate(df).value + actualDistribution shouldBe Success( + distributionFrom( + (Instant.parse("2021-11-11T00:00:00Z"), Instant.parse("2021-11-11T23:59:59.999Z"), DistributionValue(3, 0.6)), + (Instant.parse("2019-04-11T00:00:00Z"), Instant.parse("2019-04-11T23:59:59.999Z"), DistributionValue(2, 0.4)) + ) + ) + } + } + + "MinimumDateTime analyzer" should { + + "fail for non DateType or TimestampType column" in withSparkSessionJava8APIEnabled { sparkSession => + val df = getDfFull(sparkSession) + assert(MinimumDateTime("item").calculate(df).value.isFailure) + } + + "success for DateType column" in withSparkSessionJava8APIEnabled { sparkSession => + val df = getDfWithLocalDateAndInstant(sparkSession) + assert(MinimumDateTime("signupDate").calculate(df).value.isSuccess) + } + + "success for Timestamp column" in withSparkSessionJava8APIEnabled { sparkSession => + val df = getDfWithLocalDateAndInstant(sparkSession) + MinimumDateTime("dateOfBirth").calculate(df).value shouldBe Success(Instant.parse("2019-04-11T12:15:00Z")) + } + } + + "MaximumDateTime analyzer" should { + + "fail for non DateType or TimestampType column" in withSparkSessionJava8APIEnabled { sparkSession => + val df = getDfFull(sparkSession) + assert(MaximumDateTime("item").calculate(df).value.isFailure) + } + + "success for DateType column" in withSparkSessionJava8APIEnabled { sparkSession => + val df = getDfWithLocalDateAndInstant(sparkSession) + assert(MaximumDateTime("signupDate").calculate(df).value.isSuccess) + } + + "success for Timestamp column" in withSparkSessionJava8APIEnabled { sparkSession => + val df = getDfWithLocalDateAndInstant(sparkSession) + MaximumDateTime("dateOfBirth").calculate(df).value shouldBe Success(Instant.parse("2021-11-11T09:15:00Z")) + } + } + "Basic statistics" should { "compute mean correctly for numeric data" in withSparkSession { sparkSession => val df = getDfWithNumericValues(sparkSession) diff --git a/src/test/scala/com/amazon/deequ/utils/FixtureSupport.scala b/src/test/scala/com/amazon/deequ/utils/FixtureSupport.scala index 3a0866d2f..b3632311c 100644 --- a/src/test/scala/com/amazon/deequ/utils/FixtureSupport.scala +++ b/src/test/scala/com/amazon/deequ/utils/FixtureSupport.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.DataFrame import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession +import java.time.{Instant, LocalDate} import scala.util.Random @@ -146,6 +147,18 @@ trait FixtureSupport { ).toDF("item", "att1", "att2") } + def getDfWithLocalDateAndInstant(sparkSession: SparkSession): DataFrame = { + import sparkSession.implicits._ + + Seq( + (1, "john doe", Instant.parse("2021-11-11T07:15:00Z"), LocalDate.of(2017, 10, 14)), + (2, "marry jane", Instant.parse("2021-11-11T08:15:00Z"), LocalDate.of(2017, 10, 14)), + (3, "Thomas Yu", Instant.parse("2021-11-11T09:15:00Z"), LocalDate.of(2017, 10, 14)), + (4, "Steve Powell", Instant.parse("2019-04-11T12:15:00Z"), LocalDate.of(2017, 11, 14)), + (5, "Andrej Kar", Instant.parse("2019-04-11T13:15:00Z"), LocalDate.of(2017, 11, 14)), + ).toDF("id", "name", "dateOfBirth", "signupDate") + } + def getDfCompleteAndInCompleteColumns(sparkSession: SparkSession): DataFrame = { import sparkSession.implicits._ From f1087f053b32270472913dc709f2528e57811719 Mon Sep 17 00:00:00 2001 From: Tuan Pham Date: Wed, 9 Oct 2024 07:57:19 +1100 Subject: [PATCH 5/6] Revert pom changes --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 18098249e..0652f5008 100644 --- a/pom.xml +++ b/pom.xml @@ -233,7 +233,7 @@ 1.0.0 false - false + true true false ${project.basedir}/src/main/scala From ba68ad898793eb1d2775ef3a65195edd775c8df4 Mon Sep 17 00:00:00 2001 From: Tuan Pham Date: Fri, 11 Oct 2024 23:35:06 +1100 Subject: [PATCH 6/6] DateTimeDistribution with verificationSuite Test --- .../amazon/deequ/analyzers/AnalyzerTests.scala | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala b/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala index 55e0f1024..341d3ecaf 100644 --- a/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala +++ b/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala @@ -22,6 +22,8 @@ import com.amazon.deequ.metrics.Distribution import com.amazon.deequ.metrics.DistributionValue import com.amazon.deequ.metrics.DoubleMetric import com.amazon.deequ.metrics.Entity +import com.amazon.deequ.repository.ResultKey +import com.amazon.deequ.repository.memory.InMemoryMetricsRepository import com.amazon.deequ.utils.AssertionUtils.TryUtils import com.amazon.deequ.utils.FixtureSupport import org.apache.spark.sql.Row @@ -534,6 +536,22 @@ class AnalyzerTests extends AnyWordSpec with Matchers with SparkContextSpec with ) } + "datetimeDistribution analyzer with VerificationSuite" in withSparkSessionJava8APIEnabled { sparkSession => + val df = getDfWithLocalDateAndInstant(sparkSession) + val repository = new InMemoryMetricsRepository + val resultKey = ResultKey(0, Map.empty) + VerificationSuite().onData(df).useRepository(repository) + .addRequiredAnalyzer(DateTimeDistribution("dateOfBirth", DistributionInterval.DAILY)) + .saveOrAppendResult(resultKey).run() + val metric = repository.loadByKey(resultKey).get.allMetrics.head + metric.value shouldBe Success( + distributionFrom( + (Instant.parse("2021-11-11T00:00:00Z"), Instant.parse("2021-11-11T23:59:59.999Z"), DistributionValue(3, 0.6)), + (Instant.parse("2019-04-11T00:00:00Z"), Instant.parse("2019-04-11T23:59:59.999Z"), DistributionValue(2, 0.4)) + ) + ) + } + "success get datetimeDistribution with Long Interval" in withSparkSessionJava8APIEnabled { sparkSession => val df = getDfWithLocalDateAndInstant(sparkSession) val actualDistribution = DateTimeDistribution("dateOfBirth", 86400000L).calculate(df).value