Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Commit

Permalink
Data Skipping Index Part 5: ValueListSketch
Browse files Browse the repository at this point in the history
  • Loading branch information
Chungmin Lee committed Sep 13, 2021
1 parent 99304dc commit e6a4931
Show file tree
Hide file tree
Showing 10 changed files with 984 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.dataskipping.expressions

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression, Predicate}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util.{ArrayData, TypeUtils}

/**
 * Predicate that tests whether a sorted array (left) holds a given value (right).
 *
 * Evaluates to null when the value (right) is null; otherwise evaluates to
 * true or false.
 *
 * Preconditions (unchecked):
 * - The array must not be null.
 * - Elements in the array must be in ascending order.
 * - The array must not contain null elements.
 * - The array must not contain duplicate elements.
 */
private[dataskipping] case class SortedArrayContains(left: Expression, right: Expression)
    extends BinaryExpression
    with Predicate {

  override def prettyName: String = "sorted_array_contains"

  // A null value (right) yields a null result, so the predicate is nullable.
  override def nullable: Boolean = true

  /**
   * Interpreted evaluation.
   *
   * The value (right) is evaluated first; the array (left) is only evaluated
   * when the value is non-null.
   */
  override def eval(input: InternalRow): Any = {
    val needle = right.eval(input)
    if (needle == null) {
      null
    } else {
      val sorted = left.eval(input).asInstanceOf[ArrayData]
      val elemType = right.dataType
      val size = sorted.numElements()
      // Compare against the first and last elements before searching, so that
      // values outside the array's range are rejected without a binary search.
      val inRange = size > 0 &&
        ordering.lteq(sorted.get(0, elemType), needle) &&
        ordering.lteq(needle, sorted.get(size - 1, elemType))
      if (inRange) {
        val (found, _) =
          SortedArrayUtils.binarySearch(sorted, elemType, ordering, 0, size, needle)
        found
      } else {
        false
      }
    }
  }

  /**
   * Generated-code counterpart of [[eval]]: same null handling, same range
   * check on the first/last elements, then a binary search.
   */
  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    val arrayGen = left.genCode(ctx)
    val valueGen = right.genCode(ctx)
    val elemType = right.dataType
    val size = ctx.freshName("n")
    val search = SortedArrayUtils.binarySearchCodeGen(ctx, elemType)
    val sorted = arrayGen.value
    val needle = valueGen.value
    val firstElem = CodeGenerator.getValue(sorted, elemType, "0")
    val lastElem = CodeGenerator.getValue(sorted, elemType, s"$size - 1")
    // "first > value" and "value > last"; both are negated in the template below.
    val belowFirst = ctx.genGreater(elemType, firstElem, needle)
    val aboveLast = ctx.genGreater(elemType, needle, lastElem)
    // The template lines are kept flush-left so the emitted Java text is unchanged.
    val resultCode =
      s"""
|if (!(${valueGen.isNull})) {
| ${arrayGen.code}
| ${ev.isNull} = false;
| int $size = $sorted.numElements();
| if ($size > 0 &&
| !($belowFirst) &&
| !($aboveLast)) {
| ${ev.value} = $search($sorted, 0, $size, $needle).found();
| }
|}
""".stripMargin
    ev.copy(code = code"""
${valueGen.code}
boolean ${ev.isNull} = true;
boolean ${ev.value} = false;
$resultCode""")
  }

  // Ordering over the element type, used only by the interpreted path.
  @transient private lazy val ordering: Ordering[Any] =
    TypeUtils.getInterpretedOrdering(right.dataType)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.dataskipping.expressions

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, Predicate, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, FalseLiteral}
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util.{ArrayData, TypeUtils}
import org.apache.spark.sql.types.DataType

/**
* Returns true if the sorted array (child) contains any of the values.
*
* NOTE(review): "left" and "right" below presumably refer to the array
* produced by `child` and to `values` respectively - confirm.
*
* If either array is empty, false is returned.
*
* Preconditions (unchecked):
* - Both arrays must not be null.
* - Elements in the arrays must be in ascending order.
* - The left array should not contain duplicate elements.
* - The arrays must not contain null elements.
*
* If the element type can be represented as a primitive type in Scala,
* then the right array must be an array of the primitive type.
*
* @param child expression evaluated to the array that is searched
* @param values array of candidate values; cast to `Array[_]` when evaluated
* @param elementType data type used to read elements and to compare them
*/
private[dataskipping] case class SortedArrayContainsAny(
child: Expression,
values: Any,
elementType: DataType)
extends UnaryExpression
with Predicate {

override def prettyName: String = "sorted_array_contains_any"

// The result is always true or false; doGenCode also emits isNull = FalseLiteral.
override def nullable: Boolean = false

// Interpreted evaluation: walks both sorted arrays forward, using binary search
// on the child array to look up each candidate value.
override def eval(input: InternalRow): Boolean = {
val arr1 = child.eval(input).asInstanceOf[ArrayData]
val arr2 = values.asInstanceOf[Array[_]]
val dt = elementType
val n = arr1.numElements()
val m = arr2.length
// Only scan when both arrays are non-empty and their value ranges overlap:
// first(arr1) <= last(arr2) and first(arr2) <= last(arr1).
if (n > 0 && m > 0 &&
ordering.lteq(arr1.get(0, dt), arr2(m - 1)) &&
ordering.lteq(arr2(0), arr1.get(n - 1, dt))) {
// i is the current search start in arr1; j is the next candidate in arr2.
var i = 0
var j = 0
do {
val v = arr1.get(i, dt)
// Skip candidates smaller than arr1(i).
while (j < m && ordering.lt(arr2(j), v)) j += 1
// No candidates left.
if (j == m) return false
val u = arr2(j)
j += 1
// Search arr1 from index i up to n for u; k is the index returned by the search.
// NOTE(review): k is presumably the insertion point when u is not found - confirm
// against SortedArrayUtils.binarySearch.
val (found, k) = SortedArrayUtils.binarySearch(arr1, dt, ordering, i, n, u)
if (found) return true
// The returned index is n, so there is nothing left in arr1 to search.
if (k == n) return false
// Continue the next search from the returned index.
i = k
} while (j < m)
}
false
}

// Generated-code counterpart of eval. The emitted Java follows the same steps;
// where eval returns false early, the generated loop uses `break` and leaves
// the result variable at its initial value of false.
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val childGen = child.genCode(ctx)
val arr1 = childGen.value
val arr2 = ctx.freshName("values")
val dt = elementType
val javaType = CodeGenerator.javaType(dt)
// Declared Java type of the values array: Object[] when `values` is an
// Array[Any], otherwise an array of the element's Java type.
val arrayType = if (values.isInstanceOf[Array[Any]]) "java.lang.Object[]" else s"$javaType[]"
// Register `values` as a reference object so the generated code can access it.
val valuesRef = ctx.addReferenceObj("values", values, arrayType)
val n = ctx.freshName("n")
val m = ctx.freshName("m")
val i = ctx.freshName("i")
val j = ctx.freshName("j")
val v = ctx.freshName("v")
val u = ctx.freshName("u")
val result = ctx.freshName("result")
// Name of the result type used in the generated code, taken from the
// BinarySearchResult object's class name with the trailing "$" removed.
val binarySearchResultType =
SortedArrayUtils.BinarySearchResult.getClass.getCanonicalName.stripSuffix("$")
val binarySearch = SortedArrayUtils.binarySearchCodeGen(ctx, dt)
import CodeGenerator.getValue
val resultCode =
s"""
|int $n = $arr1.numElements();
|int $m = $arr2.length;
|if ($n > 0 && $m > 0 &&
| !(${ctx.genGreater(dt, getValue(arr1, dt, "0"), s"(($javaType) $arr2[$m - 1])")}) &&
| !(${ctx.genGreater(dt, s"(($javaType)$arr2[0])", getValue(arr1, dt, s"$n - 1"))})) {
| int $i = 0;
| int $j = 0;
| do {
| $javaType $v = ${getValue(arr1, dt, i)};
| while ($j < $m && ${ctx.genGreater(dt, v, s"(($javaType) $arr2[$j])")}) $j += 1;
| if ($j == $m) break;
| $javaType $u = ($javaType) $arr2[$j];
| $j += 1;
| $binarySearchResultType $result = $binarySearch($arr1, $i, $n, $u);
| if ($result.found()) {
| ${ev.value} = true;
| break;
| }
| if ($result.index() == $n) break;
| $i = $result.index();
| } while ($j < $m);
|}
""".stripMargin
// Evaluate the child, bind the values array to a local, default the result to
// false, then run the scan above.
ev.copy(
code = code"""
${childGen.code}
$arrayType $arr2 = $valuesRef;
boolean ${ev.value} = false;
$resultCode""",
isNull = FalseLiteral)
}

// Ordering over the element type, used only by the interpreted path (eval).
@transient private lazy val ordering: Ordering[Any] =
TypeUtils.getInterpretedOrdering(elementType)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.dataskipping.sketches

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.CollectSet
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.types.{ArrayType, DataType}

import com.microsoft.hyperspace.index.dataskipping.expressions._
import com.microsoft.hyperspace.index.dataskipping.util.ArrayUtils

/**
 * Sketch that keeps the distinct values of a given expression.
 *
 * Strictly speaking this is not a sketch, because every distinct value is
 * stored. It is intended for expressions where the number of distinct values
 * is expected to be small and each file tends to hold only a subset of them.
 */
case class ValueListSketch(
    override val expr: String,
    override val dataType: Option[DataType] = None)
    extends SingleExprSketch[ValueListSketch](expr, dataType) {

  override def name: String = "ValueList"

  override def withNewExpression(newExpr: (String, Option[DataType])): ValueListSketch = {
    val (exprString, exprType) = newExpr
    copy(expr = exprString, dataType = exprType)
  }

  /** A single aggregate: the collected set of values, passed through ArraySort. */
  override def aggregateFunctions: Seq[Expression] = {
    val distinctValues = CollectSet(parsedExpr).toAggregateExpression()
    Seq(new ArraySort(distinctValues))
  }

  /**
   * Translates a predicate on the indexed expression into a predicate on the
   * stored value list, or returns None when the predicate is null or matches
   * none of the supported shapes.
   */
  override def convertPredicate(
      predicate: Expression,
      resolvedExprs: Seq[Expression],
      sketchValues: Seq[Expression],
      nameMap: Map[ExprId, String],
      valueExtractor: ExpressionExtractor): Option[Expression] = {
    val valueList = sketchValues.head
    val resolvedExpr = resolvedExprs.head
    val elementType = resolvedExpr.dataType

    // The value list is sorted, so its first and last elements are the
    // smallest and largest stored values.
    val smallest = ElementAt(valueList, Literal(1))
    val largest = ElementAt(valueList, Literal(-1))

    // TODO: Consider shared sketches
    // HasNullSketch as described in MinMaxSketch.convertPredicate
    // can be useful for ValueListSketch too, as it can be used
    // to optimize Not(EqualTo) as well as IsNull.

    val target = NormalizedExprExtractor(resolvedExpr, nameMap)
    val TargetIsTrue = IsTrueExtractor(target)
    val TargetIsFalse = IsFalseExtractor(target)
    val TargetIsNotNull = IsNotNullExtractor(target)
    val TargetEqualTo = EqualToExtractor(target, valueExtractor)
    val TargetEqualNullSafe = EqualNullSafeExtractor(target, valueExtractor)
    val TargetLessThan = LessThanExtractor(target, valueExtractor)
    val TargetLessThanOrEqualTo = LessThanOrEqualExtractor(target, valueExtractor)
    val TargetGreaterThan = GreaterThanExtractor(target, valueExtractor)
    val TargetGreaterThanOrEqualTo = GreaterThanOrEqualExtractor(target, valueExtractor)
    val TargetIn = InExtractor(target, valueExtractor)
    val TargetInSet = InSetExtractor(target)

    def hasNoValues(arr: Expression) = EqualTo(Size(arr), Literal(0))

    // Case order matters: the first matching shape wins.
    val toSketchPredicate: PartialFunction[Expression, Expression] = {
      case TargetIsTrue(_) =>
        ArrayContains(valueList, Literal(true))
      case TargetIsFalse(_) =>
        ArrayContains(valueList, Literal(false))
      case TargetIsNotNull(_) =>
        Not(hasNoValues(valueList))
      case TargetEqualTo(_, v) =>
        SortedArrayContains(valueList, v)
      case TargetEqualNullSafe(_, v) =>
        Or(IsNull(v), SortedArrayContains(valueList, v))
      case Not(TargetEqualTo(_, v)) =>
        // Some stored value differs from v if there is more than one stored
        // value, or if the first stored value is not v.
        val moreThanOneValue = GreaterThan(Size(valueList), Literal(1))
        val firstDiffers = Not(EqualTo(smallest, v))
        And(IsNotNull(v), Or(moreThanOneValue, firstDiffers))
      case TargetLessThan(_, v) =>
        LessThan(smallest, v)
      case TargetLessThanOrEqualTo(_, v) =>
        LessThanOrEqual(smallest, v)
      case TargetGreaterThan(_, v) =>
        GreaterThan(largest, v)
      case TargetGreaterThanOrEqualTo(_, v) =>
        GreaterThanOrEqual(largest, v)
      case TargetIn(_, vs) =>
        vs.map(v => SortedArrayContains(valueList, v)).reduceLeft(Or)
      case TargetInSet(_, vs) =>
        // Drop nulls and sort the candidates before handing them over.
        val elementOrdering = TypeUtils.getInterpretedOrdering(elementType)
        val sortedCandidates = vs.filter(_ != null).toArray.sorted(elementOrdering)
        SortedArrayContainsAny(
          valueList,
          ArrayUtils.toArray(sortedCandidates, elementType),
          elementType)
      // TODO: StartsWith, Like with constant prefix
    }

    Option(predicate).collect(toSketchPredicate)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.microsoft.hyperspace.index.dataskipping

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.functions.{input_file_name, max, min}
import org.apache.spark.sql.functions.{array_sort, collect_set, input_file_name, max, min}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
import org.apache.spark.util.sketch.BloomFilter

Expand Down Expand Up @@ -86,6 +86,19 @@ class DataSkippingIndexConfigTest extends DataSkippingSuite with BloomFilterTest
checkAnswer(indexData, withFileId(expectedSketchValues))
}

test("createIndex works correctly with a ValueListSketch.") {
  // Column A is id / 10 cast to int for id in [0, 100), i.e. the values 0 through 9.
  val rawData = spark.range(100).selectExpr("cast(id / 10 as int) as A").toDF
  val sourceData = createSourceData(rawData)
  val config = DataSkippingIndexConfig("MyIndex", ValueListSketch("A"))
  val (index, indexData) = config.createIndex(ctx, sourceData, Map())

  // The sketch stored in the index carries the resolved data type of A.
  assert(index.sketches === Seq(ValueListSketch("A", Some(IntegerType))))

  // Expected index data: per source file, the sorted set of distinct values of A.
  val perFileValues = sourceData
    .groupBy(input_file_name().as(fileNameCol))
    .agg(array_sort(collect_set("A")))
  checkAnswer(indexData, withFileId(perFileValues))

  assert(indexData.columns === Seq(IndexConstants.DATA_FILE_NAME_ID, "ValueList_A__0"))
}

test("createIndex works correctly with a BloomFilterSketch.") {
val sourceData = createSourceData(spark.range(100).toDF("A"))
val indexConfig = DataSkippingIndexConfig("MyIndex", BloomFilterSketch("A", 0.001, 20))
Expand Down
Loading

0 comments on commit e6a4931

Please sign in to comment.