Deterministic index selection for filter rule with no hybrid scan #428

Merged · 4 commits · Apr 22, 2021
Changes from 2 commits

IndexLogEntry.scala
@@ -466,6 +466,11 @@ case class IndexLogEntry(
     sourceFileInfoSet.foldLeft(0L)(_ + _.size)
   }
 
+  @JsonIgnore
+  lazy val indexFilesSizeInBytes: Long = {
+    content.fileInfos.foldLeft(0L)(_ + _.size)
+  }
+
   def sourceUpdate: Option[Update] = {
     relations.head.data.properties.update
   }
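
For context, the new indexFilesSizeInBytes mirrors the fold over sourceFileInfoSet just above it: start from 0L and add each file's size. A minimal sketch of the same pattern, using a stand-in case class rather than Hyperspace's real FileInfo:

// Stand-in for Hyperspace's FileInfo; only a size field is needed here.
case class FileInfoSketch(name: String, size: Long)

val fileInfos = Seq(FileInfoSketch("f1.parquet", 10L), FileInfoSketch("f2.parquet", 10L))

// Same shape as indexFilesSizeInBytes: a left fold summing sizes into a Long.
val totalSizeInBytes: Long = fileInfos.foldLeft(0L)(_ + _.size) // 20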

FilterIndexRanker.scala
@@ -54,7 +54,11 @@ object FilterIndexRanker {
     } else {
       // TODO: Add ranking algorithm to sort candidates.
       // See https://github.com/microsoft/hyperspace/issues/52
-      Some(candidates.head)
+
+      // Pick the index with the minimum size. If indexes with the same size are
+      // found, pick the one with the lexicographically smaller name. This is
+      // required for deterministic selection of indexes.
+      Some(candidates.minBy(index => (index.indexFilesSizeInBytes, index.name)))
     }
   }
 }
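
Why the tuple makes rank() deterministic: Scala's implicit Ordering for (Long, String) compares element-wise, so candidates are compared by size first and by name only on a tie. minBy therefore returns the same index no matter how the candidate list is ordered. A self-contained sketch, with Idx standing in for the real IndexLogEntry:

case class Idx(name: String, indexFilesSizeInBytes: Long)

val candidates = Seq(Idx("ind3", 30L), Idx("ind2", 10L), Idx("ind1", 10L))

// Size decides first; the lexicographically smaller name breaks ties,
// so the winner is stable across any permutation of `candidates`.
val chosen = candidates.minBy(i => (i.indexFilesSizeInBytes, i.name))
// chosen == Idx("ind1", 10L)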

HyperspaceSuite.scala
@@ -18,12 +18,13 @@ package com.microsoft.hyperspace.index
 
 import java.io.File
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.util.hyperspace.Utils
 
 import com.microsoft.hyperspace.{Hyperspace, SparkInvolvedSuite}
-import com.microsoft.hyperspace.util.FileUtils
+import com.microsoft.hyperspace.util.{FileUtils, PathUtils}
 
 trait HyperspaceSuite extends SparkFunSuite with SparkInvolvedSuite {
   // This is the system path that PathResolver uses to get the root of the indexes.

FilterIndexRankerTest.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.types.{IntegerType, StringType}
 
-import com.microsoft.hyperspace.index.{FileInfo, IndexConstants}
+import com.microsoft.hyperspace.index.{FileInfo, IndexConstants, IndexLogEntryTags}
 import com.microsoft.hyperspace.index.rules.HyperspaceRuleSuite
 import com.microsoft.hyperspace.util.FileUtils

@@ -45,16 +45,36 @@ class FilterIndexRankerTest extends HyperspaceRuleSuite {
   val t2c1 = AttributeReference("t2c1", IntegerType)()
   val t2c2 = AttributeReference("t2c2", StringType)()
 
-  test("rank() should return the head of the list by default.") {
-    val ind1 = createIndexLogEntry("ind1", Seq(t1c1), Seq(t1c2), tempPlan, writeLog = false)
-    setCommonSourceSizeInBytesTag(ind1, tempPlan, Nil)
-    val ind2 = createIndexLogEntry("ind2", Seq(t1c1), Seq(t1c2), tempPlan, writeLog = false)
-    setCommonSourceSizeInBytesTag(ind2, tempPlan, Nil)
-    val ind3 = createIndexLogEntry("ind3", Seq(t2c1), Seq(t2c2), tempPlan, writeLog = false)
-    setCommonSourceSizeInBytesTag(ind3, tempPlan, Nil)
+  test("rank() should return the index with the smallest size by default.") {
+    // Index with 2 files of total size 20.
+    val ind1 = createIndexLogEntry(
+      "ind1",
+      Seq(t1c1),
+      Seq(t1c2),
+      tempPlan,
+      writeLog = false,
+      filenames = Seq("f1.parquet", "f2.parquet"))
+
+    // Index with only 1 file of size 10.
+    val ind2 = createIndexLogEntry(
+      "ind2",
+      Seq(t1c1),
+      Seq(t1c2),
+      tempPlan,
+      writeLog = false,
+      filenames = Seq("f1.parquet"))
+
+    // Index with 3 files of total size 30.
+    val ind3 = createIndexLogEntry(
+      "ind3",
+      Seq(t2c1),
+      Seq(t2c2),
+      tempPlan,
+      writeLog = false,
+      filenames = Seq("f1.parquet", "f2.parquet", "f3.parquet"))
+
     val indexes = Seq(ind1, ind2, ind3)
-    assert(FilterIndexRanker.rank(spark, tempPlan, indexes).get.equals(ind1))
+    assert(FilterIndexRanker.rank(spark, tempPlan, indexes).get.equals(ind2))
   }
 
   test(

HyperspaceRuleSuite.scala
@@ -16,6 +16,7 @@
 
 package com.microsoft.hyperspace.index.rules
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

@@ -27,17 +28,18 @@ import com.microsoft.hyperspace.HyperspaceException
 import com.microsoft.hyperspace.actions.Constants
 import com.microsoft.hyperspace.index._
 import com.microsoft.hyperspace.index.Hdfs.Properties
+import com.microsoft.hyperspace.util.PathUtils
 
 trait HyperspaceRuleSuite extends HyperspaceSuite {
-  private val filenames = Seq("f1.parquet", "f2.parquet")
   def createIndexLogEntry(
       name: String,
       indexCols: Seq[AttributeReference],
       includedCols: Seq[AttributeReference],
       plan: LogicalPlan,
       numBuckets: Int = 10,
       inputFiles: Seq[FileInfo] = Seq(),
-      writeLog: Boolean = true): IndexLogEntry = {
+      writeLog: Boolean = true,
+      filenames: Seq[String] = Seq("f1.parquet", "f2.parquet")): IndexLogEntry = {
     val signClass = new RuleTestHelper.TestSignatureProvider().getClass.getName
 
     LogicalPlanSignatureProvider.create(signClass).signature(plan) match {

@@ -54,7 +56,7 @@
           null,
           LogicalPlanFingerprint(LogicalPlanFingerprint.Properties(Seq(Signature(signClass, s)))))
 
-        val indexFiles = getIndexDataFilesPaths(name).map { path =>
+        val indexFiles = getIndexDataFilesPaths(name, filenames).map { path =>
           new FileStatus(10, false, 1, 10, 10, path)
         }
 

@@ -82,11 +84,13 @@
     }
   }
 
-  def getIndexDataFilesPaths(indexName: String): Seq[Path] =
+  def getIndexDataFilesPaths(
+      indexName: String,
+      filenames: Seq[String] = Seq("f1.parquet", "f2.parquet")): Seq[Path] =
     filenames.map { f =>
       new Path(
         new Path(
-          new Path(systemPath, indexName),
+          getIndexRootPath(indexName),
           s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=0"),
         f)
     }
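
For reference, getIndexDataFilesPaths now nests three Paths: the index root, a version directory, and the file name. A tiny sketch of the resulting layout with plain Hadoop Paths (the literal "v__" prefix is an assumption about IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX, and the root below is a made-up example):

import org.apache.hadoop.fs.Path

val root = new Path("/tmp/indexes/ind1")          // as returned by getIndexRootPath("ind1")
val versionDir = new Path(root, "v__=0")          // the s"${...PREFIX}=0" directory; prefix assumed "v__"
val dataFile = new Path(versionDir, "f1.parquet")
// dataFile.toString == "/tmp/indexes/ind1/v__=0/f1.parquet"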

@@ -99,7 +103,7 @@
       spark)
 
   def getIndexRootPath(indexName: String): Path =
-    new Path(systemPath, indexName)
+    PathUtils.makeAbsolute(new Path(systemPath, indexName), new Configuration)
 
   def setCommonSourceSizeInBytesTag(
       index: IndexLogEntry,
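
The getIndexRootPath change replaces a possibly relative new Path(systemPath, indexName) with an absolute path, so index paths compare equal regardless of the working directory. A rough equivalent in plain Hadoop API terms (an assumption about what PathUtils.makeAbsolute does, not its actual implementation):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Resolve a relative path against the working directory of its FileSystem,
// leaving already-absolute paths untouched.
def makeAbsoluteSketch(path: Path, conf: Configuration): Path = {
  val fs = path.getFileSystem(conf)
  if (path.isAbsolute) path else new Path(fs.getWorkingDirectory, path)
}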