This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Commit

Filter index file paths if bucket pruning is applicable
sezruby committed Feb 4, 2021
1 parent 9ddf44b commit 63dfc53
Showing 3 changed files with 236 additions and 16 deletions.
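In short: when a query's filters touch the bucket column of the index, the plan-rewriting code in RuleUtils no longer hands Spark the full list of index data files. It asks the new BucketSelector for the buckets that can actually match the filters and keeps only the index files whose bucket id, encoded in the file name, is in that set (the new indexFilesSelectedBucketsOnly helper). The sketch below is not part of this commit; it only illustrates that filtering pattern, assuming Spark is on the classpath, with made-up file names, bucket count, and selected buckets.

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.datasources.BucketingUtils
import org.apache.spark.util.collection.BitSet

object BucketPruningFileFilterSketch {
  def main(args: Array[String]): Unit = {
    val numBuckets = 4

    // Made-up index data files; the "_0000N" token before the extension is the
    // bucket id that Spark embeds in bucketed file names.
    val indexFiles = Seq(
      new Path("/indexes/myIndex/v__=0/f0_00000.parquet"),
      new Path("/indexes/myIndex/v__=0/f1_00001.parquet"),
      new Path("/indexes/myIndex/v__=0/f2_00002.parquet"),
      new Path("/indexes/myIndex/v__=0/f3_00003.parquet"))

    // Suppose BucketSelector decided that only buckets 1 and 3 can match the filters.
    val selectedBuckets = new BitSet(numBuckets)
    selectedBuckets.set(1)
    selectedBuckets.set(3)

    // Keep only the files whose bucket id, parsed from the file name, is selected.
    val prunedFiles = indexFiles.filter { f =>
      BucketingUtils.getBucketId(f.getName).exists(selectedBuckets.get)
    }
    prunedFiles.foreach(println) // prints the paths ending in _00001 and _00003
  }
}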
@@ -32,7 +32,7 @@ import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.IndexLogEntryTags.{HYBRIDSCAN_RELATED_CONFIGS, IS_HYBRIDSCAN_CANDIDATE}
import com.microsoft.hyperspace.index.plans.logical.{BucketUnion, IndexHadoopFsRelation}
import com.microsoft.hyperspace.util.HyperspaceConf
import com.microsoft.hyperspace.util.{HyperspaceConf, LogicalPlanUtils}

object RuleUtils {

@@ -273,8 +273,9 @@ object RuleUtils {
// Project(A,B) -> Filter(C = 10) -> Index Scan (A,B,C)
plan transformDown {
case baseRelation @ LogicalRelation(_: HadoopFsRelation, baseOutput, _, _) =>
val indexFiles = LogicalPlanUtils.indexFilesSelectedBucketsOnly(plan, index)
val location = index.withCachedTag(IndexLogEntryTags.INMEMORYFILEINDEX_INDEX_ONLY) {
new InMemoryFileIndex(spark, index.content.files, Map(), None)
new InMemoryFileIndex(spark, indexFiles, Map(), None)
}

val relation = new IndexHadoopFsRelation(
@@ -353,6 +354,8 @@ object RuleUtils {
}
}

val indexFiles = LogicalPlanUtils.indexFilesSelectedBucketsOnly(plan, index)

val filesToRead = {
if (useBucketSpec || !index.hasParquetAsSourceFormat || filesDeleted.nonEmpty ||
location.partitionSchema.nonEmpty) {
@@ -367,12 +370,12 @@
// If the source relation is partitioned, we cannot read the appended files with the
// index data, as the schema of the partitioned files is not equivalent to that of the index data.
unhandledAppendedFiles = filesAppended
index.content.files
indexFiles
} else {
// If BucketSpec of index data isn't used (e.g., in the case of FilterIndex currently)
// and the source format is parquet, we could read the appended files along
// with the index data.
index.content.files ++ filesAppended
indexFiles ++ filesAppended
}
}

@@ -386,7 +389,7 @@

def fileIndex: InMemoryFileIndex =
new InMemoryFileIndex(spark, filesToRead, Map(), None)
val newLocation = if (filesToRead.length == index.content.files.size) {
val newLocation = if (filesToRead.length == indexFiles.size) {
index.withCachedTag(IndexLogEntryTags.INMEMORYFILEINDEX_INDEX_ONLY)(fileIndex)
} else {
index.withCachedTag(plan, IndexLogEntryTags.INMEMORYFILEINDEX_HYBRID_SCAN)(fileIndex)
139 changes: 138 additions & 1 deletion src/main/scala/com/microsoft/hyperspace/util/LogicalPlanUtils.scala
@@ -16,8 +16,17 @@

package com.microsoft.hyperspace.util

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EmptyRow, Expression, Literal, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.{BucketingUtils, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.types.{DoubleType, FloatType}
import org.apache.spark.util.collection.BitSet

import com.microsoft.hyperspace.index.IndexLogEntry

/**
* Utility functions for logical plan.
@@ -35,4 +44,132 @@ object LogicalPlanUtils {
case _ => false
}
}

/**
* BucketSelector returns the buckets selected by the query's filters when bucket pruning is
* applicable for the given query plan, and None otherwise. The logic is extracted from
* [[FileSourceStrategy]] in Spark.
*/
object BucketSelector {
// should prune buckets iff num buckets is greater than 1 and there is only one bucket column
private def shouldPruneBuckets(spec: BucketSpec): Boolean = {
spec.bucketColumnNames.length == 1 && spec.numBuckets > 1
}

private def getExpressionBuckets(
expr: Expression,
bucketColumnName: String,
numBuckets: Int): BitSet = {

def getBucketNumber(attr: Attribute, v: Any): Int = {
BucketingUtils.getBucketIdFromValue(attr, numBuckets, v)
}

def getBucketSetFromIterable(attr: Attribute, iter: Iterable[Any]): BitSet = {
val matchedBuckets = new BitSet(numBuckets)
iter
.map(v => getBucketNumber(attr, v))
.foreach(bucketNum => matchedBuckets.set(bucketNum))
matchedBuckets
}

def getBucketSetFromValue(attr: Attribute, v: Any): BitSet = {
val matchedBuckets = new BitSet(numBuckets)
matchedBuckets.set(getBucketNumber(attr, v))
matchedBuckets
}

expr match {
case expressions.Equality(a: Attribute, Literal(v, _)) if a.name == bucketColumnName =>
getBucketSetFromValue(a, v)
case expressions.In(a: Attribute, list)
if list.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName =>
getBucketSetFromIterable(a, list.map(e => e.eval(EmptyRow)))
case expressions.InSet(a: Attribute, hset) if a.name == bucketColumnName =>
getBucketSetFromIterable(a, hset)
case expressions.IsNull(a: Attribute) if a.name == bucketColumnName =>
getBucketSetFromValue(a, null)
case expressions.IsNaN(a: Attribute)
if a.name == bucketColumnName && a.dataType == FloatType =>
getBucketSetFromValue(a, Float.NaN)
case expressions.IsNaN(a: Attribute)
if a.name == bucketColumnName && a.dataType == DoubleType =>
getBucketSetFromValue(a, Double.NaN)
case expressions.And(left, right) =>
getExpressionBuckets(left, bucketColumnName, numBuckets) &
getExpressionBuckets(right, bucketColumnName, numBuckets)
case expressions.Or(left, right) =>
getExpressionBuckets(left, bucketColumnName, numBuckets) |
getExpressionBuckets(right, bucketColumnName, numBuckets)
case _ =>
val matchedBuckets = new BitSet(numBuckets)
matchedBuckets.setUntil(numBuckets)
matchedBuckets
}
}

private def genBucketSet(
normalizedFilters: Seq[Expression],
bucketSpec: BucketSpec): Option[BitSet] = {
if (normalizedFilters.isEmpty) {
return None
}

val bucketColumnName = bucketSpec.bucketColumnNames.head
val numBuckets = bucketSpec.numBuckets

val normalizedFiltersAndExpr = normalizedFilters
.reduce(expressions.And)
val matchedBuckets =
getExpressionBuckets(normalizedFiltersAndExpr, bucketColumnName, numBuckets)

val numBucketsSelected = matchedBuckets.cardinality()

// None means all the buckets need to be scanned
if (numBucketsSelected == numBuckets) {
None
} else {
Some(matchedBuckets)
}
}

def apply(plan: LogicalPlan, bucketSpec: BucketSpec): Option[BitSet] = plan match {
case PhysicalOperation(
projects,
filters,
l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) =>
// The attribute names in the predicates may differ from the ones in the schema when the
// analysis is case-insensitive, so normalize them to the schema's names; after that we no
// longer need to worry about case sensitivity.
val normalizedFilters = filters.map { e =>
e transform {
case a: AttributeReference =>
a.withName(l.output.find(_.semanticEquals(a)).get.name)
}
}
// Subquery expressions are filtered out because they cannot be used to prune buckets or
// be pushed down as data filters, yet they would still be executed.
val normalizedFiltersWithoutSubqueries =
normalizedFilters.filterNot(SubqueryExpression.hasSubquery)

if (shouldPruneBuckets(bucketSpec)) {
genBucketSet(normalizedFiltersWithoutSubqueries, bucketSpec)
} else {
None
}
case _ => None
}
}

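/**
 * Returns the index data files to read, keeping only the files of the buckets selected by
 * [[BucketSelector]] when bucket pruning is applicable; otherwise returns all index files.
 */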
private[hyperspace] def indexFilesSelectedBucketsOnly(
plan: LogicalPlan,
index: IndexLogEntry): Seq[Path] = {
BucketSelector(plan, index.bucketSpec) match {
case Some(selectedBuckets) =>
index.content.files.filter(f =>
selectedBuckets.get(BucketingUtils.getBucketId(f.getName).get))
case None =>
index.content.files
}
}
}
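For context on the numbers asserted in the new test further down: BucketSelector hashes each literal of an equality or IN predicate with the same hash partitioning Spark uses when writing bucketed files, so each literal maps deterministically to a single bucket, and an IN list with three distinct values can select at most three buckets. The snippet below is only an illustration, not code from this commit; it assumes Spark on the classpath and uses a made-up column name and bucket count to reproduce that mapping the way getBucketSetFromValue does.

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EmptyRow, Literal}
import org.apache.spark.sql.execution.datasources.BucketingUtils
import org.apache.spark.sql.types.StringType

object BucketIdSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical bucket column and bucket count.
    val c3 = AttributeReference("c3", StringType)()
    val numBuckets = 200

    // Evaluate the literal to its Catalyst value and hash it into a bucket id,
    // mirroring getBucketSetFromValue / getBucketNumber above.
    def bucketOf(value: String): Int =
      BucketingUtils.getBucketIdFromValue(c3, numBuckets, Literal(value).eval(EmptyRow))

    Seq("facebook", "donde", "miperro").foreach { v =>
      println(s"$v -> bucket ${bucketOf(v)}")
    }
  }
}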
@@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.SortExec
import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.execution.datasources.{BucketingUtils, FileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig, TestUtils}
@@ -30,6 +30,7 @@ import com.microsoft.hyperspace.index.IndexLogEntryTags._
import com.microsoft.hyperspace.index.execution.BucketUnionStrategy
import com.microsoft.hyperspace.index.plans.logical.IndexHadoopFsRelation
import com.microsoft.hyperspace.index.rules.{FilterIndexRule, JoinIndexRule}
import com.microsoft.hyperspace.util.LogicalPlanUtils.BucketSelector
import com.microsoft.hyperspace.util.PathUtils

class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
@@ -112,7 +113,11 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {

def query(): DataFrame = df.filter("c3 == 'facebook'").select("c3", "c1")

verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName))
verifyIndexUsage(
query,
getIndexFilesPathWithBucketSelector(
query().queryExecution.optimizedPlan,
indexConfig.indexName))
}
}
}
@@ -128,7 +133,11 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
def query(): DataFrame = df.filter("C3 == 'facebook'").select("C3", "c1")

// Verify if case-insensitive index works with case-insensitive query.
verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName))
verifyIndexUsage(
query,
getIndexFilesPathWithBucketSelector(
query().queryExecution.optimizedPlan,
indexConfig.indexName))
}

test("E2E test for case sensitive filter query where changing conf changes behavior.") {
@@ -145,7 +154,11 @@
}

withSQLConf("spark.sql.caseSensitive" -> "false") {
verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName))
verifyIndexUsage(
query,
getIndexFilesPathWithBucketSelector(
query().queryExecution.optimizedPlan,
indexConfig.indexName))
}
}

@@ -165,9 +178,12 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
def query(): DataFrame = spark.sql("SELECT * from t where c4 = 1")

// Verify no Project node is present in the query plan, as a result of using SELECT *
assert(query().queryExecution.optimizedPlan.collect { case p: Project => p }.isEmpty)
val queryPlan = query().queryExecution.optimizedPlan
assert(queryPlan.collect { case p: Project => p }.isEmpty)

verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName))
verifyIndexUsage(
query,
getIndexFilesPathWithBucketSelector(queryPlan, indexConfig.indexName))
}
}
}
@@ -388,10 +404,11 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {

spark.enableHyperspace()
val dfWithHyperspaceEnabled = query(df)
val planWithHyperspaceEnabled = dfWithHyperspaceEnabled.queryExecution.optimizedPlan

verifyQueryPlanHasExpectedRootPaths(
dfWithHyperspaceEnabled.queryExecution.optimizedPlan,
getIndexFilesPath(indexConfig.indexName))
planWithHyperspaceEnabled,
getIndexFilesPathWithBucketSelector(planWithHyperspaceEnabled, indexConfig.indexName))

assert(schemaWithHyperspaceDisabled.equals(dfWithHyperspaceEnabled.schema))
assert(sortedRowsWithHyperspaceDisabled.sameElements(getSortedRows(dfWithHyperspaceEnabled)))
@@ -503,7 +520,11 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
def query(): DataFrame =
spark.read.parquet(testPath).filter("c3 == 'facebook'").select("c3", "c1")

verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName))
verifyIndexUsage(
query,
getIndexFilesPathWithBucketSelector(
query().queryExecution.optimizedPlan,
indexConfig.indexName))

// Delete some source data file.
TestUtils.deleteFiles(testPath, "*parquet", 1)
@@ -518,7 +539,12 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
hyperspace.refreshIndex(indexConfig.indexName, REFRESH_MODE_INCREMENTAL)

// Verify index usage on latest version of index (v=1) after refresh.
verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName, Seq(1)))
verifyIndexUsage(
query,
getIndexFilesPathWithBucketSelector(
query().queryExecution.optimizedPlan,
indexConfig.indexName,
Seq(1)))
}
}
}
@@ -951,6 +977,47 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
}
}

test("Verify excluding index data file path using bucket pruning.") {
withTempPathAsString { testPath =>
// Setup. Create data.
val indexConfig = IndexConfig("index", Seq("c3"), Seq("c4"))
import spark.implicits._
SampleData.testData
.toDF("c1", "c2", "c3", "c4", "c5")
.limit(10)
.write
.json(testPath)
val df = spark.read.json(testPath)

// Create index.
hyperspace.createIndex(df, indexConfig)
spark.enableHyperspace()

def query(): DataFrame =
df.filter(df("c3") isin (Seq("facebook", "donde", "miperro"): _*)).select("c4", "c3")

withIndex("index") {
val index = TestUtils.latestIndexLogEntry(systemPath, indexConfig.indexName)
val plan = query().queryExecution.optimizedPlan
val buckets = BucketSelector(plan, index.bucketSpec)
assert(buckets.isDefined)

val locs = getFsLocation(plan)
assert(locs.size == 1)
assert(buckets.get.cardinality() == 3)

val indexFiles = locs.head.inputFiles
assert(indexFiles.length == buckets.get.cardinality())
assert(indexFiles.length < index.content.files.length)

val indexFilesBucketIdSet = indexFiles.map(BucketingUtils.getBucketId(_).get).toSet
assert(indexFilesBucketIdSet.forall(buckets.get.get(_)))
assert((0 until index.bucketSpec.numBuckets).forall(n =>
buckets.get.get(n) == indexFilesBucketIdSet.contains(n)))
}
}
}

/**
* Verify that the query plan has the expected rootPaths.
*
@@ -980,6 +1047,19 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
}.flatten
}

private def getIndexFilesPathWithBucketSelector(
plan: LogicalPlan,
indexName: String,
versions: Seq[Int] = Seq(0)): Seq[Path] = {
val paths = getIndexFilesPath(indexName, versions)
BucketSelector(plan, TestUtils.latestIndexLogEntry(systemPath, indexName).bucketSpec) match {
case Some(buckets) =>
paths.filter(f => buckets.get(BucketingUtils.getBucketId(f.getName).get))
case None =>
paths
}
}

private def getIndexFilesPath(indexName: String, versions: Seq[Int] = Seq(0)): Seq[Path] = {
versions.flatMap { v =>
Content
