diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 4d9227dc9..6e0badba7 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -466,6 +466,11 @@ case class IndexLogEntry( sourceFileInfoSet.foldLeft(0L)(_ + _.size) } + @JsonIgnore + lazy val indexFilesSizeInBytes: Long = { + content.fileInfos.foldLeft(0L)(_ + _.size) + } + def sourceUpdate: Option[Update] = { relations.head.data.properties.update } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRanker.scala b/src/main/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRanker.scala index 2943a69a6..4c850218c 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRanker.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRanker.scala @@ -54,7 +54,11 @@ object FilterIndexRanker { } else { // TODO: Add ranking algorithm to sort candidates. // See https://github.com/microsoft/hyperspace/issues/52 - Some(candidates.head) + + // Pick the index with minimum size. If indexes with same size are found, pick the + // one with lexicographically smaller name. This is required for deterministic selection + // of indexes. 
+ Some(candidates.minBy(index => (index.indexFilesSizeInBytes, index.name))) } } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala index b09f2505e..0b8bac965 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.types.{IntegerType, StringType} -import com.microsoft.hyperspace.index.{FileInfo, IndexConstants} +import com.microsoft.hyperspace.index.{FileInfo, IndexConstants, IndexLogEntryTags} import com.microsoft.hyperspace.index.rules.HyperspaceRuleSuite import com.microsoft.hyperspace.util.FileUtils @@ -45,16 +45,36 @@ class FilterIndexRankerTest extends HyperspaceRuleSuite { val t2c1 = AttributeReference("t2c1", IntegerType)() val t2c2 = AttributeReference("t2c2", StringType)() - test("rank() should return the head of the list by default.") { - val ind1 = createIndexLogEntry("ind1", Seq(t1c1), Seq(t1c2), tempPlan, writeLog = false) - setCommonSourceSizeInBytesTag(ind1, tempPlan, Nil) - val ind2 = createIndexLogEntry("ind2", Seq(t1c1), Seq(t1c2), tempPlan, writeLog = false) - setCommonSourceSizeInBytesTag(ind2, tempPlan, Nil) - val ind3 = createIndexLogEntry("ind3", Seq(t2c1), Seq(t2c2), tempPlan, writeLog = false) - setCommonSourceSizeInBytesTag(ind3, tempPlan, Nil) + test("rank() should return the index with smallest size by default.") { + // Index with 2 files of total size 20 + val ind1 = createIndexLogEntry( + "ind1", + Seq(t1c1), + Seq(t1c2), + tempPlan, + writeLog = false, + filenames = Seq("f1.parquet", "f2.parquet")) + + // Index with only 1 file of size 10 + val ind2 = createIndexLogEntry( + "ind2", + Seq(t1c1), + 
Seq(t1c2), + tempPlan, + writeLog = false, + filenames = Seq("f1.parquet")) + + // Index with only 3 files of total size 30 + val ind3 = createIndexLogEntry( + "ind3", + Seq(t2c1), + Seq(t2c2), + tempPlan, + writeLog = false, + filenames = Seq("f1.parquet", "f2.parquet", "f3.parquet")) val indexes = Seq(ind1, ind2, ind3) - assert(FilterIndexRanker.rank(spark, tempPlan, indexes).get.equals(ind1)) + assert(FilterIndexRanker.rank(spark, tempPlan, indexes).get.equals(ind2)) } test( diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleSuite.scala index 95c777a18..fc2cc1c87 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleSuite.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleSuite.scala @@ -16,6 +16,7 @@ package com.microsoft.hyperspace.index.rules +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -27,9 +28,10 @@ import com.microsoft.hyperspace.HyperspaceException import com.microsoft.hyperspace.actions.Constants import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.Hdfs.Properties +import com.microsoft.hyperspace.util.PathUtils trait HyperspaceRuleSuite extends HyperspaceSuite { - private val filenames = Seq("f1.parquet", "f2.parquet") + private val defaultFileNames = Seq("f1.parquet", "f2.parquet") def createIndexLogEntry( name: String, indexCols: Seq[AttributeReference], @@ -37,7 +39,8 @@ trait HyperspaceRuleSuite extends HyperspaceSuite { plan: LogicalPlan, numBuckets: Int = 10, inputFiles: Seq[FileInfo] = Seq(), - writeLog: Boolean = true): IndexLogEntry = { + writeLog: Boolean = true, + filenames: Seq[String] = defaultFileNames): IndexLogEntry = { val signClass = new 
RuleTestHelper.TestSignatureProvider().getClass.getName LogicalPlanSignatureProvider.create(signClass).signature(plan) match { @@ -54,7 +57,7 @@ trait HyperspaceRuleSuite extends HyperspaceSuite { null, LogicalPlanFingerprint(LogicalPlanFingerprint.Properties(Seq(Signature(signClass, s))))) - val indexFiles = getIndexDataFilesPaths(name).map { path => + val indexFiles = getIndexDataFilesPaths(name, filenames).map { path => new FileStatus(10, false, 1, 10, 10, path) } @@ -82,11 +85,13 @@ trait HyperspaceRuleSuite extends HyperspaceSuite { } } - def getIndexDataFilesPaths(indexName: String): Seq[Path] = + def getIndexDataFilesPaths( + indexName: String, + filenames: Seq[String] = defaultFileNames): Seq[Path] = filenames.map { f => new Path( new Path( - new Path(systemPath, indexName), + getIndexRootPath(indexName), s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=0"), f) } @@ -99,7 +104,7 @@ trait HyperspaceRuleSuite extends HyperspaceSuite { spark) def getIndexRootPath(indexName: String): Path = - new Path(systemPath, indexName) + PathUtils.makeAbsolute(new Path(systemPath, indexName), new Configuration) def setCommonSourceSizeInBytesTag( index: IndexLogEntry,