Data Skipping Index Part 5: ValueListSketch
Chungmin Lee committed Aug 2, 2021
1 parent f8202d9 commit 42a661e
Showing 10 changed files with 950 additions and 4 deletions.
src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketch/ValueListSketch.scala
@@ -0,0 +1,96 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.dataskipping.sketch

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.CollectSet
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.types.{ArrayType, DataType}

import com.microsoft.hyperspace.index.dataskipping.util._
import com.microsoft.hyperspace.index.dataskipping.util.ArrayUtils.toArray

/**
* Sketch based on distinct values for a given expression.
*
* This is not really a sketch, as it stores all distinct values for a given
* expression. It can be useful when the number of distinct values is expected to
* be small and each file tends to store only a subset of the values.
*/
case class ValueListSketch(
override val expr: String,
override val dataType: Option[DataType] = None)
extends SingleExprSketch[ValueListSketch](expr, dataType) {
override def name: String = "ValueList"

override def withNewExpression(newExpr: (String, Option[DataType])): ValueListSketch = {
copy(expr = newExpr._1, dataType = newExpr._2)
}

override def aggregateFunctions: Seq[Expression] =
new ArraySort(CollectSet(parsedExpr).toAggregateExpression()) :: Nil

override def convertPredicate(
predicate: Expression,
sketchValues: Seq[Expression],
nameMap: Map[ExprId, String],
resolvedExprs: Seq[Expression]): Option[Expression] = {
val valueList = sketchValues(0)
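    // The value list is sorted in ascending order, so its first and last
    // elements (element_at with indices 1 and -1) are the min and max.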
val min = ElementAt(valueList, Literal(1))
val max = ElementAt(valueList, Literal(-1))
    // TODO: Consider shared sketches.
    // HasNullSketch, as described in MinMaxSketch.convertPredicate,
    // can be useful for ValueListSketch too, as it can be used
    // to optimize Not(EqualTo) as well as IsNull.
val resolvedExpr = resolvedExprs.head
val dataType = resolvedExpr.dataType
val ordering = TypeUtils.getInterpretedOrdering(dataType)
val exprMatcher = NormalizedExprMatcher(resolvedExpr, nameMap)
val ExprIsTrue = IsTrueExtractor(exprMatcher)
val ExprIsFalse = IsFalseExtractor(exprMatcher)
val ExprIsNotNull = IsNotNullExtractor(exprMatcher)
val ExprEqualTo = EqualToExtractor(exprMatcher)
val ExprLessThan = LessThanExtractor(exprMatcher)
val ExprLessThanOrEqual = LessThanOrEqualToExtractor(exprMatcher)
val ExprGreaterThan = GreaterThanExtractor(exprMatcher)
val ExprGreaterThanOrEqual = GreaterThanOrEqualToExtractor(exprMatcher)
val ExprIn = InExtractor(exprMatcher)
val ExprInSet = InSetExtractor(exprMatcher)
def Empty(arr: Expression) = EqualTo(Size(arr), Literal(0))
Option(predicate).collect {
case ExprIsTrue() => ArrayContains(valueList, Literal(true))
case ExprIsFalse() => ArrayContains(valueList, Literal(false))
case ExprIsNotNull() => Not(Empty(valueList))
case ExprEqualTo(v) => SortedArrayContains(valueList, v)
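      // A file can be skipped for Not(EqualTo(v)) only when its value list
      // is exactly [v]; keep it if the list has more than one distinct value
      // or a single value different from v.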
case Not(ExprEqualTo(v)) =>
Or(
GreaterThan(Size(valueList), Literal(1)),
Not(EqualTo(ElementAt(valueList, Literal(1)), v)))
case ExprLessThan(v) => LessThan(min, v)
case ExprLessThanOrEqual(v) => LessThanOrEqual(min, v)
case ExprGreaterThan(v) => GreaterThan(max, v)
case ExprGreaterThanOrEqual(v) => GreaterThanOrEqual(max, v)
case ExprIn(vs) =>
SortedArrayContainsAny(valueList, toArray(vs.map(_.eval()).sorted(ordering), dataType))
case ExprInSet(vs) =>
SortedArrayContainsAny(
valueList,
toArray(vs.filter(_ != null).toArray.sorted(ordering), dataType))
// TODO: StartsWith, Like with constant prefix
}
}
}
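
For illustration (not part of the commit): because the aggregated value list is sorted, convertPredicate above rewrites predicates on the indexed column into cheap operations on the list column (named ValueList_A__0 in the test below). A hedged summary of the mappings, writing vl for that column:

    A = 5        =>  sorted_array_contains(vl, 5)
    A < 10       =>  element_at(vl, 1) < 10            (min of the sorted list)
    A >= 10      =>  element_at(vl, -1) >= 10          (max of the sorted list)
    A IN (3, 7)  =>  sorted_array_contains_any(vl, [3, 7])
    NOT (A = 5)  =>  size(vl) > 1 OR element_at(vl, 1) != 5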
src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/SortedArrayContains.scala
@@ -0,0 +1,85 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.dataskipping.util

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression, Predicate}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, FalseLiteral}
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util.{ArrayData, TypeUtils}
import org.apache.spark.sql.types.BooleanType

/**
* Returns true if the sorted array (left) contains the value (right).
*
* The array must not be null.
* Elements in the array must be in ascending order.
* The array must not contain null elements.
* The array must not contain duplicate elements.
* The value must not be null.
*/
case class SortedArrayContains(left: Expression, right: Expression)
extends BinaryExpression
with Predicate {

override def prettyName: String = "sorted_array_contains"

@transient private lazy val ordering: Ordering[Any] =
TypeUtils.getInterpretedOrdering(right.dataType)

override def nullable: Boolean = false

override def eval(input: InternalRow): Boolean = {
val arr = left.eval(input).asInstanceOf[ArrayData]
val value = right.eval(input)
val dt = right.dataType
val n = arr.numElements()
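    // Quick reject: a value outside the [first, last] range of the sorted
    // array cannot be present; otherwise fall through to binary search.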
if (n > 0 &&
ordering.lteq(arr.get(0, dt), value) &&
ordering.lteq(value, arr.get(n - 1, dt))) {
val (found, _) = SortedArrayUtils.binarySearch(arr, dt, ordering, 0, n, value)
if (found) return true
}
false
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val leftGen = left.genCode(ctx)
val arr = leftGen.value
val rightGen = right.genCode(ctx)
val value = rightGen.value
val dt = right.dataType
val n = ctx.freshName("n")
val binarySearch = SortedArrayUtils.binarySearchCodeGen(ctx, dt)
val resultCode =
s"""
|int $n = $arr.numElements();
|if ($n > 0 &&
| !(${ctx.genGreater(dt, CodeGenerator.getValue(arr, dt, "0"), value)}) &&
| !(${ctx.genGreater(dt, value, CodeGenerator.getValue(arr, dt, s"$n - 1"))})) {
| ${ev.value} = $binarySearch($arr, 0, $n, $value).found();
|}
""".stripMargin
ev.copy(
code = code"""
${leftGen.code}
${rightGen.code}
boolean ${ev.value} = false;
$resultCode""",
isNull = FalseLiteral)
}
}
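
For illustration (not part of the commit), a minimal standalone sketch of the search that eval and doGenCode above implement, specialized to a sorted Array[Int]; the function name is assumed:

// Hedged sketch: bounds check followed by binary search, mirroring eval above.
def sortedArrayContains(arr: Array[Int], value: Int): Boolean = {
  val n = arr.length
  if (n == 0 || value < arr(0) || value > arr(n - 1)) return false
  var lo = 0
  var hi = n - 1
  while (lo <= hi) {
    val mid = lo + (hi - lo) / 2 // midpoint computed without Int overflow
    if (arr(mid) == value) return true
    else if (arr(mid) < value) lo = mid + 1
    else hi = mid - 1
  }
  false
}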
src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/SortedArrayContainsAny.scala
@@ -0,0 +1,141 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.dataskipping.util

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, Predicate, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, FalseLiteral}
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util.{ArrayData, TypeUtils}
import org.apache.spark.sql.types.{ArrayType, BooleanType}

/**
* Returns true if the sorted array (child) contains any of the values.
*
* If either array is empty, false is returned.
*
* Both arrays must not be null.
* Elements in the arrays must be in ascending order.
* The child array must not contain duplicate elements.
* The arrays must not contain null elements.
*
* If the element type can be represented as a primitive type in Scala,
* then the values array must be an array of the primitive type.
*/
case class SortedArrayContainsAny(child: Expression, values: Any)
extends UnaryExpression
with Predicate {

override def prettyName: String = "sorted_array_contains_any"

@transient private lazy val ordering: Ordering[Any] =
TypeUtils.getInterpretedOrdering(child.dataType.asInstanceOf[ArrayType].elementType)

override def nullable: Boolean = false

override def eval(input: InternalRow): Boolean = {
val arr1 = child.eval(input).asInstanceOf[ArrayData]
val arr2 = values.asInstanceOf[Array[_]]
val dt = child.dataType.asInstanceOf[ArrayType].elementType
val n = arr1.numElements()
val m = arr2.length
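    // The arrays can intersect only if their value ranges overlap:
    // arr1's first element <= arr2's last, and arr2's first <= arr1's last.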
if (n > 0 && m > 0 &&
ordering.lteq(arr1.get(0, dt), arr2(m - 1)) &&
ordering.lteq(arr2(0), arr1.get(n - 1, dt))) {
var i = 0
var j = 0
do {
val v = arr1.get(i, dt)
while (j < m && ordering.lt(arr2(j), v)) j += 1
if (j == m) return false
val u = arr2(j)
j += 1
val (found, k) = SortedArrayUtils.binarySearch(arr1, dt, ordering, i, n, u)
if (found) return true
if (k == n) return false
i = k
} while (j < m)
}
false
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val childGen = child.genCode(ctx)
val arr1 = childGen.value
val arr2 = ctx.freshName("values")
val dt = child.dataType.asInstanceOf[ArrayType].elementType
val javaType = CodeGenerator.javaType(dt)
val arrayType = if (values.isInstanceOf[Array[Any]]) "java.lang.Object[]" else s"$javaType[]"
val valuesRef = ctx.addReferenceObj("values", values, arrayType)
val n = ctx.freshName("n")
val m = ctx.freshName("m")
val i = ctx.freshName("i")
val j = ctx.freshName("j")
val v = ctx.freshName("v")
val u = ctx.freshName("u")
val result = ctx.freshName("result")
val binarySearchResultType =
SortedArrayUtils.BinarySearchResult.getClass.getCanonicalName.stripSuffix("$")
val binarySearch = SortedArrayUtils.binarySearchCodeGen(ctx, dt)
import CodeGenerator.getValue
val resultCode =
s"""
|int $n = $arr1.numElements();
|int $m = $arr2.length;
|if ($n > 0 && $m > 0 &&
| !(${ctx.genGreater(dt, getValue(arr1, dt, "0"), s"($javaType) $arr2[$m - 1]")}) &&
| !(${ctx.genGreater(dt, s"($javaType) $arr2[0]", getValue(arr1, dt, s"$n - 1"))})) {
| int $i = 0;
| int $j = 0;
| do {
| $javaType $v = ${getValue(arr1, dt, i)};
| while ($j < $m && ${ctx.genGreater(dt, v, s"($javaType) $arr2[$j]")}) $j += 1;
| if ($j == $m) break;
| $javaType $u = ($javaType) $arr2[$j];
| $j += 1;
| $binarySearchResultType $result = $binarySearch($arr1, $i, $n, $u);
| if ($result.found()) {
| ${ev.value} = true;
| break;
| }
| if ($result.index() == $n) break;
| $i = $result.index();
| } while ($j < $m);
|}
""".stripMargin
ev.copy(
code = code"""
${childGen.code}
$arrayType $arr2 = $valuesRef;
boolean ${ev.value} = false;
$resultCode""",
isNull = FalseLiteral)
}

override def equals(that: Any): Boolean = {
that match {
case SortedArrayContainsAny(thatChild, thatValues) =>
child == thatChild &&
values.asInstanceOf[Array[_]].sameElements(thatValues.asInstanceOf[Array[_]])
case _ => false
}
}

override def hashCode: Int = {
(child, values.asInstanceOf[Array[_]].toSeq).hashCode
}
}
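
For illustration (not part of the commit): eval above walks the values array while binary-searching the remaining suffix of the value list. A minimal linear-merge equivalent over two sorted, duplicate-free Int arrays, with assumed names, is:

// Hedged sketch: two-pointer intersection test; the commit's eval additionally
// uses binary search to skip ahead in the (typically longer) sorted list.
def sortedArraysIntersect(a: Array[Int], b: Array[Int]): Boolean = {
  var i = 0
  var j = 0
  while (i < a.length && j < b.length) {
    if (a(i) == b(j)) return true
    else if (a(i) < b(j)) i += 1
    else j += 1
  }
  false
}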
src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexConfigTest.scala
@@ -17,8 +17,8 @@
package com.microsoft.hyperspace.index.dataskipping

import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.functions.{input_file_name, max, min}
-import org.apache.spark.sql.types.{LongType, StringType}
+import org.apache.spark.sql.functions.{array_sort, collect_set, input_file_name, max, min}
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
import org.apache.spark.util.sketch.BloomFilter

import com.microsoft.hyperspace.HyperspaceException
@@ -86,6 +86,19 @@ class DataSkippingIndexConfigTest extends DataSkippingSuite with BloomFilterTest
checkAnswer(indexData, withFileId(expectedSketchValues))
}

test("createIndex works correctly with a ValueListSketch.") {
val sourceData =
createSourceData(spark.range(100).selectExpr("cast(id / 10 as int) as A").toDF)
val indexConfig = DataSkippingIndexConfig("MyIndex", ValueListSketch("A"))
val (index, indexData) = indexConfig.createIndex(ctx, sourceData, Map())
assert(index.sketches === Seq(ValueListSketch("A", Some(IntegerType))))
val expectedSketchValues = sourceData
.groupBy(input_file_name().as(fileNameCol))
.agg(array_sort(collect_set("A")))
checkAnswer(indexData, withFileId(expectedSketchValues))
assert(indexData.columns === Seq(IndexConstants.DATA_FILE_NAME_ID, "ValueList_A__0"))
}

test("createIndex works correctly with a BloomFilterSketch.") {
val sourceData = createSourceData(spark.range(100).toDF("A"))
val indexConfig = DataSkippingIndexConfig("MyIndex", BloomFilterSketch("A", 0.001, 20))
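
For illustration (not part of the commit), a hedged end-to-end usage sketch. It assumes a SparkSession named spark, a placeholder data path, and that Hyperspace.createIndex accepts a DataSkippingIndexConfig:

// Hedged usage sketch; the path, index name, and column are placeholders,
// and acceptance of DataSkippingIndexConfig by createIndex is an assumption.
import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.dataskipping.DataSkippingIndexConfig
import com.microsoft.hyperspace.index.dataskipping.sketch.ValueListSketch

val hs = new Hyperspace(spark)
val df = spark.read.parquet("/path/to/source") // placeholder path
hs.createIndex(df, DataSkippingIndexConfig("myIndex", ValueListSketch("A")))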