diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/SortedArrayContains.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/SortedArrayContains.scala new file mode 100644 index 000000000..02877e7e6 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/SortedArrayContains.scala @@ -0,0 +1,91 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.expressions + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression, Predicate} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode} +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.util.{ArrayData, TypeUtils} + +/** + * Returns true if the sorted array (left) contains the value (right). + * + * If the value (right) is null, null is returned. + * + * Preconditions (unchecked): + * - The array must not be null. + * - Elements in the array must be in ascending order. + * - The array must not contain null elements. + * - The array must not contain duplicate elements. + */ +private[dataskipping] case class SortedArrayContains(left: Expression, right: Expression) + extends BinaryExpression + with Predicate { + + override def prettyName: String = "sorted_array_contains" + + override def nullable: Boolean = true + + override def eval(input: InternalRow): Any = { + val value = right.eval(input) + if (value != null) { + val arr = left.eval(input).asInstanceOf[ArrayData] + val dt = right.dataType + val n = arr.numElements() + if (n > 0 && + ordering.lteq(arr.get(0, dt), value) && + ordering.lteq(value, arr.get(n - 1, dt))) { + val (found, _) = SortedArrayUtils.binarySearch(arr, dt, ordering, 0, n, value) + if (found) return true + } + return false + } + null + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val leftGen = left.genCode(ctx) + val arr = leftGen.value + val rightGen = right.genCode(ctx) + val value = rightGen.value + val dt = right.dataType + val n = ctx.freshName("n") + val binarySearch = SortedArrayUtils.binarySearchCodeGen(ctx, dt) + val resultCode = + s""" + |if (!(${rightGen.isNull})) { + | ${leftGen.code} + | ${ev.isNull} = false; + | int $n = $arr.numElements(); + | if ($n > 0 && + | !(${ctx.genGreater(dt, CodeGenerator.getValue(arr, dt, "0"), value)}) && + | !(${ctx.genGreater(dt, value, CodeGenerator.getValue(arr, dt, s"$n - 1"))})) { + | ${ev.value} = $binarySearch($arr, 0, $n, $value).found(); + | } + |} + """.stripMargin + ev.copy(code = code""" + ${rightGen.code} + boolean ${ev.isNull} = true; + boolean ${ev.value} = false; + $resultCode""") + } + + @transient private lazy val ordering: Ordering[Any] = + TypeUtils.getInterpretedOrdering(right.dataType) +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/SortedArrayContainsAny.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/SortedArrayContainsAny.scala new file mode 100644 index 000000000..ab3a14cb4 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/expressions/SortedArrayContainsAny.scala @@ -0,0 +1,132 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.expressions + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Expression, Predicate, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, FalseLiteral} +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.util.{ArrayData, TypeUtils} +import org.apache.spark.sql.types.DataType + +/** + * Returns true if the sorted array (child) contains any of the values. + * + * If either array is empty, false is returned. + * + * Preconditions (unchecked): + * - Both arrays must not be null. + * - Elements in the arrays must be in ascending order. + * - The left array should not contain duplicate elements. + * - The arrays must not contain null elements. + * + * If the element type can be represented as a primitive type in Scala, + * then the right array must be an array of the primitive type. + */ +private[dataskipping] case class SortedArrayContainsAny( + child: Expression, + values: Any, + elementType: DataType) + extends UnaryExpression + with Predicate { + + override def prettyName: String = "sorted_array_contains_any" + + override def nullable: Boolean = false + + override def eval(input: InternalRow): Boolean = { + val arr1 = child.eval(input).asInstanceOf[ArrayData] + val arr2 = values.asInstanceOf[Array[_]] + val dt = elementType + val n = arr1.numElements() + val m = arr2.length + if (n > 0 && m > 0 && + ordering.lteq(arr1.get(0, dt), arr2(m - 1)) && + ordering.lteq(arr2(0), arr1.get(n - 1, dt))) { + var i = 0 + var j = 0 + do { + val v = arr1.get(i, dt) + while (j < m && ordering.lt(arr2(j), v)) j += 1 + if (j == m) return false + val u = arr2(j) + j += 1 + val (found, k) = SortedArrayUtils.binarySearch(arr1, dt, ordering, i, n, u) + if (found) return true + if (k == n) return false + i = k + } while (j < m) + } + false + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val childGen = child.genCode(ctx) + val arr1 = childGen.value + val arr2 = ctx.freshName("values") + val dt = elementType + val javaType = CodeGenerator.javaType(dt) + val arrayType = if (values.isInstanceOf[Array[Any]]) "java.lang.Object[]" else s"$javaType[]" + val valuesRef = ctx.addReferenceObj("values", values, arrayType) + val n = ctx.freshName("n") + val m = ctx.freshName("m") + val i = ctx.freshName("i") + val j = ctx.freshName("j") + val v = ctx.freshName("v") + val u = ctx.freshName("u") + val result = ctx.freshName("result") + val binarySearchResultType = + SortedArrayUtils.BinarySearchResult.getClass.getCanonicalName.stripSuffix("$") + val binarySearch = SortedArrayUtils.binarySearchCodeGen(ctx, dt) + import CodeGenerator.getValue + val resultCode = + s""" + |int $n = $arr1.numElements(); + |int $m = $arr2.length; + |if ($n > 0 && $m > 0 && + | !(${ctx.genGreater(dt, getValue(arr1, dt, "0"), s"(($javaType) $arr2[$m - 1])")}) && + | !(${ctx.genGreater(dt, s"(($javaType)$arr2[0])", getValue(arr1, dt, s"$n - 1"))})) { + | int $i = 0; + | int $j = 0; + | do { + | $javaType $v = ${getValue(arr1, dt, i)}; + | while ($j < $m && ${ctx.genGreater(dt, v, s"(($javaType) $arr2[$j])")}) $j += 1; + | if ($j == $m) break; + | $javaType $u = ($javaType) $arr2[$j]; + | $j += 1; + | $binarySearchResultType $result = $binarySearch($arr1, $i, $n, $u); + | if ($result.found()) { + | ${ev.value} = true; + | break; + | } + | if ($result.index() == $n) break; + | $i = $result.index(); + | } while ($j < $m); + |} + """.stripMargin + ev.copy( + code = code""" + ${childGen.code} + $arrayType $arr2 = $valuesRef; + boolean ${ev.value} = false; + $resultCode""", + isNull = FalseLiteral) + } + + @transient private lazy val ordering: Ordering[Any] = + TypeUtils.getInterpretedOrdering(elementType) +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketches/ValueListSketch.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketches/ValueListSketch.scala new file mode 100644 index 000000000..fa06f8663 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketches/ValueListSketch.scala @@ -0,0 +1,103 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.sketches + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.CollectSet +import org.apache.spark.sql.catalyst.util.TypeUtils +import org.apache.spark.sql.types.{ArrayType, DataType} + +import com.microsoft.hyperspace.index.dataskipping.expressions._ +import com.microsoft.hyperspace.index.dataskipping.util.ArrayUtils + +/** + * Sketch based on distinct values for a given expression. + * + * This is not really a sketch, as it stores all distinct values for a given + * expression. It can be useful when the number of distinct values is expected to + * be small and each file tends to store only a subset of the values. + */ +case class ValueListSketch( + override val expr: String, + override val dataType: Option[DataType] = None) + extends SingleExprSketch[ValueListSketch](expr, dataType) { + override def name: String = "ValueList" + + override def withNewExpression(newExpr: (String, Option[DataType])): ValueListSketch = { + copy(expr = newExpr._1, dataType = newExpr._2) + } + + override def aggregateFunctions: Seq[Expression] = + new ArraySort(CollectSet(parsedExpr).toAggregateExpression()) :: Nil + + override def convertPredicate( + predicate: Expression, + resolvedExprs: Seq[Expression], + sketchValues: Seq[Expression], + nameMap: Map[ExprId, String], + valueExtractor: ExpressionExtractor): Option[Expression] = { + val valueList = sketchValues.head + val min = ElementAt(valueList, Literal(1)) + val max = ElementAt(valueList, Literal(-1)) + // TODO: Consider shared sketches + // HasNullSketch as described in MinMaxSketch.convertPredicate + // can be useful for ValueListSketch too, as it can be used to + // to optimize Not(EqualTo) as well as IsNull. + val resolvedExpr = resolvedExprs.head + val dataType = resolvedExpr.dataType + val exprExtractor = NormalizedExprExtractor(resolvedExpr, nameMap) + val ExprIsTrue = IsTrueExtractor(exprExtractor) + val ExprIsFalse = IsFalseExtractor(exprExtractor) + val ExprIsNotNull = IsNotNullExtractor(exprExtractor) + val ExprEqualTo = EqualToExtractor(exprExtractor, valueExtractor) + val ExprEqualNullSafe = EqualNullSafeExtractor(exprExtractor, valueExtractor) + val ExprLessThan = LessThanExtractor(exprExtractor, valueExtractor) + val ExprLessThanOrEqualTo = LessThanOrEqualExtractor(exprExtractor, valueExtractor) + val ExprGreaterThan = GreaterThanExtractor(exprExtractor, valueExtractor) + val ExprGreaterThanOrEqualTo = GreaterThanOrEqualExtractor(exprExtractor, valueExtractor) + val ExprIn = InExtractor(exprExtractor, valueExtractor) + val ExprInSet = InSetExtractor(exprExtractor) + def Empty(arr: Expression) = EqualTo(Size(arr), Literal(0)) + Option(predicate).collect { + case ExprIsTrue(_) => ArrayContains(valueList, Literal(true)) + case ExprIsFalse(_) => ArrayContains(valueList, Literal(false)) + case ExprIsNotNull(_) => Not(Empty(valueList)) + case ExprEqualTo(_, v) => SortedArrayContains(valueList, v) + case ExprEqualNullSafe(_, v) => Or(IsNull(v), SortedArrayContains(valueList, v)) + case Not(ExprEqualTo(_, v)) => + And( + IsNotNull(v), + Or( + GreaterThan(Size(valueList), Literal(1)), + Not(EqualTo(ElementAt(valueList, Literal(1)), v)))) + case ExprLessThan(_, v) => LessThan(min, v) + case ExprLessThanOrEqualTo(_, v) => LessThanOrEqual(min, v) + case ExprGreaterThan(_, v) => GreaterThan(max, v) + case ExprGreaterThanOrEqualTo(_, v) => GreaterThanOrEqual(max, v) + case ExprIn(_, vs) => + vs.map(v => SortedArrayContains(valueList, v)).reduceLeft(Or) + case ExprInSet(_, vs) => + SortedArrayContainsAny( + valueList, + ArrayUtils.toArray( + vs.filter(_ != null).toArray.sorted(TypeUtils.getInterpretedOrdering(dataType)), + dataType), + dataType) + // TODO: StartsWith, Like with constant prefix + } + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexConfigTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexConfigTest.scala index e10dd7a6b..7bf8bbabc 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexConfigTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexConfigTest.scala @@ -17,7 +17,7 @@ package com.microsoft.hyperspace.index.dataskipping import org.apache.hadoop.fs.Path -import org.apache.spark.sql.functions.{input_file_name, max, min} +import org.apache.spark.sql.functions.{array_sort, collect_set, input_file_name, max, min} import org.apache.spark.sql.types.{IntegerType, LongType, StringType} import org.apache.spark.util.sketch.BloomFilter @@ -86,6 +86,19 @@ class DataSkippingIndexConfigTest extends DataSkippingSuite with BloomFilterTest checkAnswer(indexData, withFileId(expectedSketchValues)) } + test("createIndex works correctly with a ValueListSketch.") { + val sourceData = + createSourceData(spark.range(100).selectExpr("cast(id / 10 as int) as A").toDF) + val indexConfig = DataSkippingIndexConfig("MyIndex", ValueListSketch("A")) + val (index, indexData) = indexConfig.createIndex(ctx, sourceData, Map()) + assert(index.sketches === Seq(ValueListSketch("A", Some(IntegerType)))) + val expectedSketchValues = sourceData + .groupBy(input_file_name().as(fileNameCol)) + .agg(array_sort(collect_set("A"))) + checkAnswer(indexData, withFileId(expectedSketchValues)) + assert(indexData.columns === Seq(IndexConstants.DATA_FILE_NAME_ID, "ValueList_A__0")) + } + test("createIndex works correctly with a BloomFilterSketch.") { val sourceData = createSourceData(spark.range(100).toDF("A")) val indexConfig = DataSkippingIndexConfig("MyIndex", BloomFilterSketch("A", 0.001, 20)) diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexIntegrationTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexIntegrationTest.scala index 263425fb6..95246d5c1 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexIntegrationTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexIntegrationTest.scala @@ -234,6 +234,125 @@ class DataSkippingIndexIntegrationTest extends DataSkippingSuite with IcebergTes assert(ex.getCause().getMessage().contains("BloomFilter does not support DoubleType")) } + test("ValueList index is applied for a filter query (EqualTo).") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(spark.range(100).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", ValueListSketch("A"))) + def query: DataFrame = df.filter("A = 1") + checkIndexApplied(query, 1) + } + } + } + + test("ValueList index is applied for a filter query (Not(EqualTo)).") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(spark.range(10).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", ValueListSketch("A"))) + def query: DataFrame = df.filter("A != 1") + checkIndexApplied(query, 9) + } + } + } + + test( + "ValueList index is applied for a filter query (EqualTo) " + + "where some source data files has only null values.") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(Seq[Integer](1, 2, 3, null, 5, null, 7, 8, 9, null).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", ValueListSketch("A"))) + def query: DataFrame = df.filter("A = 1") + checkIndexApplied(query, 1) + } + } + } + + test("ValueList index is applied for a filter query (multiple EqualTo's).") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(spark.range(100).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", ValueListSketch("A"))) + def query: DataFrame = df.filter("A = 1 or A = 12 or A = 20") + checkIndexApplied(query, 3) + } + } + } + + test("ValueList index is applied for a filter query (In).") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(spark.range(100).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", ValueListSketch("A"))) + def query: DataFrame = df.filter("A in (20, 30, 10, 20)") + checkIndexApplied(query, 3) + } + } + } + + test("ValueList index is applied for a filter query (In) - string type.") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(Seq.range(0, 100).map(n => s"foo$n").toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", ValueListSketch("A"))) + def query: DataFrame = df.filter("A in ('foo31', 'foo12', 'foo1')") + checkIndexApplied(query, 3) + } + } + } + + test("ValueList index is applied for a filter query with UDF returning boolean.") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(spark.range(100).toDF("A")) + spark.udf.register("F", (a: Int) => a < 15) + hs.createIndex(df, DataSkippingIndexConfig("myind", ValueListSketch("F(A)"))) + def query: DataFrame = df.filter("F(A)") + checkIndexApplied(query, 2) + } + } + } + + test( + "ValueList index is applied for a filter query with UDF " + + "taking two arguments and returning boolean.") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData(spark.range(100).selectExpr("id as A", "id * 2 as B")) + spark.udf.register("F", (a: Int, b: Int) => a < 15 || b > 190) + hs.createIndex(df, DataSkippingIndexConfig("myind", ValueListSketch("F(A, B)"))) + def query: DataFrame = df.filter("F(A, B)") + checkIndexApplied(query, 3) + } + } + } + + test( + "ValueList index is applied for a filter query with UDF " + + "taking binary and returning boolean.") { + withAndWithoutCodegen { + withIndex("myind") { + val df = createSourceData( + Seq( + Array[Byte](0, 0, 0, 0), + Array[Byte](0, 1, 0, 1), + Array[Byte](1, 2, 3, 4), + Array[Byte](5, 6, 7, 8), + Array[Byte](32, 32, 32, 32), + Array[Byte](64, 64, 64, 64), + Array[Byte](1, 1, 1, 1), + Array[Byte](-128, -128, -128, -128), + Array[Byte](127, 127, 127, 127), + Array[Byte](-1, 1, 0, 0)).toDF("A")) + spark.udf.register("F", (a: Array[Byte]) => a.sum == 0) + hs.createIndex(df, DataSkippingIndexConfig("myind", ValueListSketch("F(A)"))) + def query: DataFrame = df.filter("F(A)") + checkIndexApplied(query, 4) + } + } + } + test( "DataSkippingIndex works correctly for CSV where the same source data files can be " + "interpreted differently.") { diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexTest.scala index 414e3c786..db96141f0 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexTest.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.types.{IntegerType, StructType} import com.microsoft.hyperspace.HyperspaceException import com.microsoft.hyperspace.index.{Content, FileInfo, Index, IndexConstants} -import com.microsoft.hyperspace.index.dataskipping.sketches.MinMaxSketch +import com.microsoft.hyperspace.index.dataskipping.sketches.{MinMaxSketch, ValueListSketch} import com.microsoft.hyperspace.util.JsonUtils class DataSkippingIndexTest extends DataSkippingSuite { @@ -49,11 +49,27 @@ class DataSkippingIndexTest extends DataSkippingSuite { assert(index.indexedColumns === Seq("A", "B")) } + test("indexedColumns returns indexed columns of sketches (mixed sketch types).") { + val index = DataSkippingIndex(Seq(MinMaxSketch("A"), ValueListSketch("B")), emptyStructType) + assert(index.indexedColumns === Seq("A", "B")) + } + + test("indexedColumns returns indexed columns without duplicates.") { + val index = + DataSkippingIndex(Seq(MinMaxSketch("A"), ValueListSketch("A")), emptyStructType) + assert(index.indexedColumns === Seq("A")) + } + test("referencedColumns returns indexed columns of sketches.") { val index = DataSkippingIndex(Seq(MinMaxSketch("A"), MinMaxSketch("B")), emptyStructType) assert(index.referencedColumns === Seq("A", "B")) } + test("referencedColumns returns indexed columns of sketches (mixed sketch types).") { + val index = DataSkippingIndex(Seq(MinMaxSketch("A"), ValueListSketch("B")), emptyStructType) + assert(index.referencedColumns === Seq("A", "B")) + } + test( "withNewProperties returns a new index which copies the original index except the " + "properties.") { @@ -68,11 +84,22 @@ class DataSkippingIndexTest extends DataSkippingSuite { assert(index.statistics() === Map("sketches" -> "MinMax(A), MinMax(B)")) } + test("statistics returns a string-formatted list of sketches (mixed sketch types).") { + val index = DataSkippingIndex(Seq(MinMaxSketch("A"), ValueListSketch("B")), emptyStructType) + assert(index.statistics() === Map("sketches" -> "MinMax(A), ValueList(B)")) + } + test("canHandleDeletedFiles returns true.") { val index = DataSkippingIndex(Seq(MinMaxSketch("A")), emptyStructType) assert(index.canHandleDeletedFiles === true) } + test("Two indexes are equal if they have the same set of sketches.") { + val index1 = DataSkippingIndex(Seq(MinMaxSketch("A"), ValueListSketch("B")), emptyStructType) + val index2 = DataSkippingIndex(Seq(ValueListSketch("B"), MinMaxSketch("A")), emptyStructType) + assert(index1 === index2) + } + test("write writes the index data in a Parquet format.") { val sourceData = createSourceData(spark.range(100).toDF("A")) val indexConfig = DataSkippingIndexConfig("myIndex", MinMaxSketch("A")) @@ -253,11 +280,25 @@ class DataSkippingIndexTest extends DataSkippingSuite { assert(ds1.hashCode === ds2.hashCode) } + test("Indexes are equal if they have the same sketches and data types (mixed sketch types).") { + val ds1 = DataSkippingIndex(Seq(MinMaxSketch("A"), ValueListSketch("B")), emptyStructType) + val ds2 = DataSkippingIndex(Seq(ValueListSketch("B"), MinMaxSketch("A")), emptyStructType) + assert(ds1 === ds2) + assert(ds1.hashCode === ds2.hashCode) + } + test("Indexes are not equal to objects which are not indexes.") { val ds = DataSkippingIndex(Seq(MinMaxSketch("A")), emptyStructType) assert(ds !== "ds") } + test("Indexes are not equal if they don't have the same sketches.") { + val ds1 = DataSkippingIndex(Seq(MinMaxSketch("A")), emptyStructType) + val ds2 = DataSkippingIndex(Seq(ValueListSketch("A")), emptyStructType) + assert(ds1 !== ds2) + assert(ds1.hashCode !== ds2.hashCode) + } + test("Index can be serialized.") { val ds = DataSkippingIndex( Seq(MinMaxSketch("A", Some(IntegerType))), diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/SortedArrayContainsAnyTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/SortedArrayContainsAnyTest.scala new file mode 100644 index 000000000..c76c55a34 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/SortedArrayContainsAnyTest.scala @@ -0,0 +1,86 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.expressions + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.types._ + +import com.microsoft.hyperspace.index.HyperspaceSuite +import com.microsoft.hyperspace.index.dataskipping.ArrayTestUtils +import com.microsoft.hyperspace.index.dataskipping.util.ArrayUtils.toArray + +class SortedArrayContainsAnyTest + extends HyperspaceSuite + with ArrayTestUtils + with ExpressionEvalHelper { + def test(arr1: Expression, arr2: Expression, expected: Boolean): Unit = { + val elementType = arr2.dataType.asInstanceOf[ArrayType].elementType + checkEvaluation( + SortedArrayContainsAny( + arr1, + toArray( + arr2.asInstanceOf[Literal].value.asInstanceOf[ArrayData].toObjectArray(elementType), + elementType), + elementType), + expected) + } + + test("SortedArrayContainsAny returns true if two arrays intersect.") { + val array1 = createArray(Seq.range(0, 100000).map(_ * 2), IntegerType) + val array2 = createArray(Seq(0), IntegerType) + val array3 = createArray(Seq(2), IntegerType) + val array4 = createArray(Seq(199998), IntegerType) + val array5 = createArray(Seq(2, 4, 5), IntegerType) + val array6 = createArray(Seq(1, 3, 199998), IntegerType) + val array7 = createArray(Seq(-1, 100000), IntegerType) + val array8 = createArray(Seq(100000, 200001), IntegerType) + test(array1, array2, true) + test(array1, array3, true) + test(array1, array4, true) + test(array1, array5, true) + test(array1, array6, true) + test(array1, array7, true) + test(array1, array8, true) + test(array3, array5, true) + test(array4, array6, true) + test(array7, array8, true) + } + + test("SortedArrayContainsAny returns false if two arrays don't intersect.") { + val array1 = createArray(Seq.range(0, 100000).map(_ * 2), IntegerType) + val array2 = createArray(Seq(), IntegerType) + val array3 = createArray(Seq(-1), IntegerType) + val array4 = createArray(Seq(1), IntegerType) + val array5 = createArray(Seq(200001), IntegerType) + val array6 = createArray(Seq(1, 3, 199999), IntegerType) + val array7 = createArray(Seq(-1, 100001), IntegerType) + val array8 = createArray(Seq(49999, 100001), IntegerType) + val array9 = createArray(Seq(-3, 1, 1), IntegerType) + test(array1, array2, false) + test(array1, array3, false) + test(array1, array4, false) + test(array1, array5, false) + test(array1, array6, false) + test(array1, array7, false) + test(array1, array9, false) + test(array2, array3, false) + test(array3, array4, false) + test(array5, array6, false) + test(array6, array7, false) + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/SortedArrayContainsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/SortedArrayContainsTest.scala new file mode 100644 index 000000000..bbd8546a3 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/expressions/SortedArrayContainsTest.scala @@ -0,0 +1,85 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.expressions + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ + +import com.microsoft.hyperspace.index.HyperspaceSuite +import com.microsoft.hyperspace.index.dataskipping.ArrayTestUtils + +class SortedArrayContainsTest + extends HyperspaceSuite + with ArrayTestUtils + with ExpressionEvalHelper { + test("SortedArrayContains works correctly for an empty array.") { + val array = createArray(Nil, IntegerType) + checkEvaluation(SortedArrayContains(array, Literal(0, IntegerType)), false) + } + + test("SortedArrayContains works correctly for a array of size 1.") { + val array = createArray(Seq(1), IntegerType) + checkEvaluation(SortedArrayContains(array, Literal(0, IntegerType)), false) + checkEvaluation(SortedArrayContains(array, Literal(1, IntegerType)), true) + checkEvaluation(SortedArrayContains(array, Literal(2, IntegerType)), false) + } + + test("SortedArrayContains works correctly for a array of size 2.") { + val array = createArray(Seq(1, 3), IntegerType) + checkEvaluation(SortedArrayContains(array, Literal(0, IntegerType)), false) + checkEvaluation(SortedArrayContains(array, Literal(1, IntegerType)), true) + checkEvaluation(SortedArrayContains(array, Literal(2, IntegerType)), false) + checkEvaluation(SortedArrayContains(array, Literal(3, IntegerType)), true) + checkEvaluation(SortedArrayContains(array, Literal(4, IntegerType)), false) + } + + test("SortedArrayContains works correctly for an int array.") { + val values = Seq.range(0, 50).map(_ * 2) + val array = createArray(values, IntegerType) + values.foreach(v => + checkEvaluation(SortedArrayContains(array, Literal(v, IntegerType)), true)) + checkEvaluation(SortedArrayContains(array, Literal(-10, IntegerType)), false) + checkEvaluation(SortedArrayContains(array, Literal(1, IntegerType)), false) + checkEvaluation(SortedArrayContains(array, Literal(49, IntegerType)), false) + checkEvaluation(SortedArrayContains(array, Literal(1000, IntegerType)), false) + } + + test("SortedArrayContains works correctly for a long array.") { + val values = Seq.range(0L, 50L).map(_ * 2) + val array = createArray(values, LongType) + values.foreach(v => checkEvaluation(SortedArrayContains(array, Literal(v, LongType)), true)) + checkEvaluation(SortedArrayContains(array, Literal(-10L, LongType)), false) + checkEvaluation(SortedArrayContains(array, Literal(1L, LongType)), false) + checkEvaluation(SortedArrayContains(array, Literal(49L, LongType)), false) + checkEvaluation(SortedArrayContains(array, Literal(1000L, LongType)), false) + } + + test("SortedArrayContains works correctly for a string array.") { + val values = Seq("hello", "world", "foo", "bar", "footrix").sorted + val array = createArray(values, StringType) + values.foreach(v => + checkEvaluation(SortedArrayContains(array, Literal.create(v, StringType)), true)) + checkEvaluation(SortedArrayContains(array, Literal.create("abc", StringType)), false) + checkEvaluation(SortedArrayContains(array, Literal.create("fooo", StringType)), false) + checkEvaluation(SortedArrayContains(array, Literal.create("zoo", StringType)), false) + } + + test("SortedArrayContains returns null if the value is null.") { + val array = createArray(Seq(1), IntegerType) + checkEvaluation(SortedArrayContains(array, Literal(null, IntegerType)), null) + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rules/ApplyDataSkippingIndexTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rules/ApplyDataSkippingIndexTest.scala index 9175c766c..7b9d19c47 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rules/ApplyDataSkippingIndexTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rules/ApplyDataSkippingIndexTest.scala @@ -106,6 +106,9 @@ class ApplyDataSkippingIndexTest extends DataSkippingSuite { 17, null, 19, 20).toDF("A")), "source [A:Int] with nulls") + def dataIS: SourceData = + SourceData(() => createSourceData(spark.range(10).toDF("A")), "source [A:Int] small") + def dataIIP: SourceData = SourceData( () => @@ -217,6 +220,7 @@ class ApplyDataSkippingIndexTest extends DataSkippingSuite { Param(dataI, "A is not null", MinMaxSketch("A"), 10), Param(dataI, "!(A is not null)", MinMaxSketch("A"), 10), Param(dataI, "A <=> 10", MinMaxSketch("A"), 1), + Param(dataI, "A <=> 10", ValueListSketch("A"), 1), Param(dataI, "10 <=> A", MinMaxSketch("A"), 1), Param(dataI, "A <=> null", MinMaxSketch("A"), 10), Param(dataI, "A <25", MinMaxSketch("A"), 3), @@ -237,12 +241,24 @@ class ApplyDataSkippingIndexTest extends DataSkippingSuite { Param(dataI, "!(A < 20)", MinMaxSketch("A"), 8), Param(dataI, "not (A not in (1, 2, 3))", MinMaxSketch("A"), 1), Param(dataS, "A < 'foo'", MinMaxSketch("A"), 1), + Param(dataS, "A in ('foo1', 'foo9')", ValueListSketch("A"), 2), Param(dataS, "A in ('foo1', 'foo5', 'foo9')", BloomFilterSketch("A", 0.01, 10), 3), Param( dataS, "A in ('foo1','goo1','hoo1','i1','j','k','l','m','n','o','p')", BloomFilterSketch("A", 0.01, 10), 1), + Param(dataI, "A = 10", ValueListSketch("A"), 1), + Param(dataI, "10 = A", ValueListSketch("a"), 1), + Param(dataIS, "A != 5", ValueListSketch("A"), 9), + Param(dataIS, "5 != A", ValueListSketch("A"), 9), + Param(dataIN, "a!=9", ValueListSketch("a"), 6), + Param(dataIN, "9 != A", ValueListSketch("A"), 6), + Param(dataI, "A != 5", ValueListSketch("A"), 10), + Param(dataI, "A < 34", ValueListSketch("A"), 4), + Param(dataI, "34 > A", ValueListSketch("A"), 4), + Param(dataIN, "A < 9", ValueListSketch("a"), 2), + Param(dataIN, "9 > A", ValueListSketch("A"), 2), Param(dataI, "A = 10", BloomFilterSketch("A", 0.01, 10), 1), Param(dataI, "A <=> 20", BloomFilterSketch("A", 0.01, 10), 1), Param(dataI, "A <=> null", BloomFilterSketch("A", 0.01, 10), 10), @@ -261,6 +277,7 @@ class ApplyDataSkippingIndexTest extends DataSkippingSuite { Param(dataI, "a = 10", MinMaxSketch("A"), 1), Param(dataI, "A = 10", MinMaxSketch("a"), 1), Param(dataI, "A in (1, 2, 3, null, 10)", MinMaxSketch("A"), 2), + Param(dataI, "A in (2, 3, 10, 99)", ValueListSketch("a"), 3), Param(dataI, "A in (10,9,8,7,6,5,4,3,2,1,50,49,48,47,46,45)", MinMaxSketch("A"), 4), Param(dataS, "A in ('foo1', 'foo5', 'foo9')", MinMaxSketch("A"), 3), Param( @@ -276,6 +293,21 @@ class ApplyDataSkippingIndexTest extends DataSkippingSuite { "A in (x'00',x'01',x'02',x'03',x'04',x'05',x'06',x'07',x'08',x'09',x'0a',x'20202020')", MinMaxSketch("A"), 1), + Param(dataI, "A in (10,9,8,7,6,5,4,3,2,1,50,49,48,47,46,45)", ValueListSketch("A"), 4), + Param(dataS, "A in ('foo1', 'foo5', 'foo9')", ValueListSketch("A"), 3), + Param( + dataS, + "A in ('foo1','a','b','c','d','e','f','g','h','i','j','k')", + ValueListSketch("A"), + 1), + Param(dataD, "A in (1,2,3,15,16,17)", ValueListSketch("A"), 2), + Param(dataD, "A in (1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16)", ValueListSketch("A"), 2), + Param(dataB, "A in (x'00000000', x'0001', x'0002', x'05060708')", ValueListSketch("A"), 2), + Param( + dataB, + "A in (x'00',x'01',x'02',x'03',x'04',x'05',x'06',x'07',x'08',x'09',x'0a',x'20202020')", + ValueListSketch("A"), + 1), Param(dataI, "A BETWEEN 27 AND 51", MinMaxSketch("A"), 4), Param(dataI, "IF(A=1,2,3)=2", MinMaxSketch("A"), 10), Param(dataII, "A = 10 OR B = 50", Seq(MinMaxSketch("A"), MinMaxSketch("B")), 2), @@ -325,6 +357,12 @@ class ApplyDataSkippingIndexTest extends DataSkippingSuite { MinMaxSketch("is_less_than_23(A)"), 8, () => spark.udf.register("is_less_than_23", (a: Int) => a < 23)), + Param( + dataI, + "!is_less_than_23(A)", + ValueListSketch("is_less_than_23(A)"), + 8, + () => spark.udf.register("is_less_than_23", (a: Int) => a < 23)), Param( dataII, "A < 50 and F(A,B) < 20", @@ -342,7 +380,13 @@ class ApplyDataSkippingIndexTest extends DataSkippingSuite { "IF(A IS NULL,NULL,F(A))=2", MinMaxSketch("A"), 10, - () => spark.udf.register("F", (a: Int) => a * 2))).foreach { + () => spark.udf.register("F", (a: Int) => a * 2)), + Param( + dataB, + "F(A)", + ValueListSketch("f(A)"), + 4, + () => spark.udf.register("F", (a: Array[Byte]) => a.sum == 0))).foreach { case Param(sourceData, filter, sketches, numExpectedFiles, setup) => test( s"applyIndex works as expected for ${sourceData.description}: " + diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketches/ValueListSketchTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketches/ValueListSketchTest.scala new file mode 100644 index 000000000..484fa0af1 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketches/ValueListSketchTest.scala @@ -0,0 +1,267 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.sketches + +import org.apache.spark.sql.{Column, QueryTest} +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +import com.microsoft.hyperspace.index.HyperspaceSuite +import com.microsoft.hyperspace.index.dataskipping.expressions._ + +class ValueListSketchTest extends QueryTest with HyperspaceSuite { + import spark.implicits._ + + val valueExtractor = AttrValueExtractor(Map.empty) + + test("indexedColumns returns the indexed column.") { + val sketch = ValueListSketch("A") + assert(sketch.indexedColumns === Seq("A")) + } + + test("referencedColumns returns the indexed column.") { + val sketch = ValueListSketch("A") + assert(sketch.referencedColumns === Seq("A")) + } + + test("aggregateFunctions returns an aggregation function that collects all unique values.") { + val sketch = ValueListSketch("A") + val aggrs = sketch.aggregateFunctions.map(new Column(_)) + val data = Seq(1, -1, 10, 2, 4, 2, 0, 10).toDF("A") + checkAnswer(data.select(aggrs: _*), Seq(Array(-1, 0, 1, 2, 4, 10)).toDF) + } + + test("toString returns a reasonable string.") { + val sketch = ValueListSketch("A") + assert(sketch.toString === "ValueList(A)") + } + + test("Two sketches are equal if their columns are equal.") { + assert(ValueListSketch("A") === ValueListSketch("A")) + assert(ValueListSketch("A") !== ValueListSketch("a")) + assert(ValueListSketch("b") !== ValueListSketch("B")) + assert(ValueListSketch("B") === ValueListSketch("B")) + } + + test("hashCode is reasonably implemented.") { + assert(ValueListSketch("A").hashCode === ValueListSketch("A").hashCode) + assert(ValueListSketch("A").hashCode !== ValueListSketch("a").hashCode) + } + + test("covertPredicate converts EqualTo(