From e7a0161748f3f0e5d86d84dea0ea3ee126dec6b2 Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Wed, 21 Apr 2021 15:50:20 -0700 Subject: [PATCH 1/3] Deterministic index selection for filter rule with no hybrid scan --- .../hyperspace/index/IndexLogEntryTags.scala | 3 +++ .../index/rankers/FilterIndexRanker.scala | 7 ++++++- .../hyperspace/index/rules/RuleUtils.scala | 5 +++++ .../index/rankers/FilterIndexRankerTest.scala | 17 +++++++++++------ 4 files changed, 25 insertions(+), 7 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala index 4decd6b83..2038f8481 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala @@ -27,6 +27,9 @@ object IndexLogEntryTags { val COMMON_SOURCE_SIZE_IN_BYTES: IndexLogEntryTag[Long] = IndexLogEntryTag[Long]("commonSourceSizeInBytes") + // INDEX_SIZE_IN_BYTES stores size of index source files. + val INDEX_SIZE_IN_BYTES: IndexLogEntryTag[Long] = IndexLogEntryTag[Long]("indexSizeInBytes") + // SIGNATURE_MATCHED indicates if the plan has the same signature value with the index or not. val SIGNATURE_MATCHED: IndexLogEntryTag[Boolean] = IndexLogEntryTag[Boolean]("signatureMatched") diff --git a/src/main/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRanker.scala b/src/main/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRanker.scala index 2943a69a6..0ec22571a 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRanker.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRanker.scala @@ -54,7 +54,12 @@ object FilterIndexRanker { } else { // TODO: Add ranking algorithm to sort candidates. // See https://github.com/microsoft/hyperspace/issues/52 - Some(candidates.head) + + // Pick the index with minimum size. 
If indexes with same size are found, pick the + // one with lexicographically smaller name. This is required for deterministic selection + // of indexes. + Some(candidates.minBy(index => + (index.getTagValue(plan, IndexLogEntryTags.INDEX_SIZE_IN_BYTES), index.name))) } } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index 98f5625f3..b3b6c022d 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -113,6 +113,11 @@ object RuleUtils { IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES, commonBytes) + entry.setTagValue( + relation.plan, + IndexLogEntryTags.INDEX_SIZE_IN_BYTES, + entry.content.fileInfos.foldLeft(0L)(_ + _.size)) + // If there is no change in source dataset, the index will be applied by // transformPlanToUseIndexOnlyScan. entry.setTagValue( diff --git a/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala index b09f2505e..1c296b5fa 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.types.{IntegerType, StringType} -import com.microsoft.hyperspace.index.{FileInfo, IndexConstants} +import com.microsoft.hyperspace.index.{FileInfo, IndexConstants, IndexLogEntryTags} import com.microsoft.hyperspace.index.rules.HyperspaceRuleSuite import com.microsoft.hyperspace.util.FileUtils @@ -45,16 +45,21 @@ class FilterIndexRankerTest extends HyperspaceRuleSuite { val t2c1 = AttributeReference("t2c1", IntegerType)() val t2c2 = AttributeReference("t2c2", 
StringType)() - test("rank() should return the head of the list by default.") { + test("rank() should return the index with smallest size by default.") { + // Index with only 1 file of size 10 val ind1 = createIndexLogEntry("ind1", Seq(t1c1), Seq(t1c2), tempPlan, writeLog = false) - setCommonSourceSizeInBytesTag(ind1, tempPlan, Nil) + ind1.setTagValue(tempPlan, IndexLogEntryTags.INDEX_SIZE_IN_BYTES, 20L) + + // Index with only 2 files of total size 20 val ind2 = createIndexLogEntry("ind2", Seq(t1c1), Seq(t1c2), tempPlan, writeLog = false) - setCommonSourceSizeInBytesTag(ind2, tempPlan, Nil) + ind2.setTagValue(tempPlan, IndexLogEntryTags.INDEX_SIZE_IN_BYTES, 10L) + + // Index with only 3 files of total size 30 val ind3 = createIndexLogEntry("ind3", Seq(t2c1), Seq(t2c2), tempPlan, writeLog = false) - setCommonSourceSizeInBytesTag(ind3, tempPlan, Nil) + ind3.setTagValue(tempPlan, IndexLogEntryTags.INDEX_SIZE_IN_BYTES, 30L) val indexes = Seq(ind1, ind2, ind3) - assert(FilterIndexRanker.rank(spark, tempPlan, indexes).get.equals(ind1)) + assert(FilterIndexRanker.rank(spark, tempPlan, indexes).get.equals(ind2)) } test( From 37c3ccb5a4d5aa534e0ac4df332c7dabf2ea9c50 Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Wed, 21 Apr 2021 17:53:49 -0700 Subject: [PATCH 2/3] review comments --- .../hyperspace/index/IndexLogEntry.scala | 5 ++++ .../hyperspace/index/IndexLogEntryTags.scala | 3 --- .../index/rankers/FilterIndexRanker.scala | 3 +-- .../hyperspace/index/rules/RuleUtils.scala | 5 ---- .../hyperspace/index/HyperspaceSuite.scala | 3 ++- .../index/rankers/FilterIndexRankerTest.scala | 27 ++++++++++++++----- .../index/rules/HyperspaceRuleSuite.scala | 16 ++++++----- 7 files changed, 39 insertions(+), 23 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 4d9227dc9..6e0badba7 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ 
b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -466,6 +466,11 @@ case class IndexLogEntry( sourceFileInfoSet.foldLeft(0L)(_ + _.size) } + @JsonIgnore + lazy val indexFilesSizeInBytes: Long = { + content.fileInfos.foldLeft(0L)(_ + _.size) + } + def sourceUpdate: Option[Update] = { relations.head.data.properties.update } diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala index 2038f8481..4decd6b83 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala @@ -27,9 +27,6 @@ object IndexLogEntryTags { val COMMON_SOURCE_SIZE_IN_BYTES: IndexLogEntryTag[Long] = IndexLogEntryTag[Long]("commonSourceSizeInBytes") - // INDEX_SIZE_IN_BYTES stores size of index source files. - val INDEX_SIZE_IN_BYTES: IndexLogEntryTag[Long] = IndexLogEntryTag[Long]("indexSizeInBytes") - // SIGNATURE_MATCHED indicates if the plan has the same signature value with the index or not. val SIGNATURE_MATCHED: IndexLogEntryTag[Boolean] = IndexLogEntryTag[Boolean]("signatureMatched") diff --git a/src/main/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRanker.scala b/src/main/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRanker.scala index 0ec22571a..4c850218c 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRanker.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRanker.scala @@ -58,8 +58,7 @@ object FilterIndexRanker { // Pick the index with minimum size. If indexes with same size are found, pick the // one with lexicographically smaller name. This is required for deterministic selection // of indexes. 
- Some(candidates.minBy(index => - (index.getTagValue(plan, IndexLogEntryTags.INDEX_SIZE_IN_BYTES), index.name))) + Some(candidates.minBy(index => (index.indexFilesSizeInBytes, index.name))) } } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index b3b6c022d..98f5625f3 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -113,11 +113,6 @@ object RuleUtils { IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES, commonBytes) - entry.setTagValue( - relation.plan, - IndexLogEntryTags.INDEX_SIZE_IN_BYTES, - entry.content.fileInfos.foldLeft(0L)(_ + _.size)) - // If there is no change in source dataset, the index will be applied by // transformPlanToUseIndexOnlyScan. entry.setTagValue( diff --git a/src/test/scala/com/microsoft/hyperspace/index/HyperspaceSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/HyperspaceSuite.scala index 85178f806..f7f9892a2 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/HyperspaceSuite.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/HyperspaceSuite.scala @@ -18,12 +18,13 @@ package com.microsoft.hyperspace.index import java.io.File +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.SparkFunSuite import org.apache.spark.util.hyperspace.Utils import com.microsoft.hyperspace.{Hyperspace, SparkInvolvedSuite} -import com.microsoft.hyperspace.util.FileUtils +import com.microsoft.hyperspace.util.{FileUtils, PathUtils} trait HyperspaceSuite extends SparkFunSuite with SparkInvolvedSuite { // This is the system path that PathResolver uses to get the root of the indexes. 
diff --git a/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala index 1c296b5fa..0c5dfd562 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala @@ -47,16 +47,31 @@ class FilterIndexRankerTest extends HyperspaceRuleSuite { test("rank() should return the index with smallest size by default.") { // Index with only 1 file of size 10 - val ind1 = createIndexLogEntry("ind1", Seq(t1c1), Seq(t1c2), tempPlan, writeLog = false) - ind1.setTagValue(tempPlan, IndexLogEntryTags.INDEX_SIZE_IN_BYTES, 20L) + val ind1 = createIndexLogEntry( + "ind1", + Seq(t1c1), + Seq(t1c2), + tempPlan, + writeLog = false, + filenames = Seq("f1.parquet", "f2.parquet")) // Index with only 2 files of total size 20 - val ind2 = createIndexLogEntry("ind2", Seq(t1c1), Seq(t1c2), tempPlan, writeLog = false) - ind2.setTagValue(tempPlan, IndexLogEntryTags.INDEX_SIZE_IN_BYTES, 10L) + val ind2 = createIndexLogEntry( + "ind2", + Seq(t1c1), + Seq(t1c2), + tempPlan, + writeLog = false, + filenames = Seq("f1.parquet")) // Index with only 3 files of total size 30 - val ind3 = createIndexLogEntry("ind3", Seq(t2c1), Seq(t2c2), tempPlan, writeLog = false) - ind3.setTagValue(tempPlan, IndexLogEntryTags.INDEX_SIZE_IN_BYTES, 30L) + val ind3 = createIndexLogEntry( + "ind3", + Seq(t2c1), + Seq(t2c2), + tempPlan, + writeLog = false, + filenames = Seq("f1.parquet", "f2.parquet", "f3.parquet")) val indexes = Seq(ind1, ind2, ind3) assert(FilterIndexRanker.rank(spark, tempPlan, indexes).get.equals(ind2)) diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleSuite.scala index 95c777a18..1d9b0ecfe 100644 --- 
a/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleSuite.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleSuite.scala @@ -16,6 +16,7 @@ package com.microsoft.hyperspace.index.rules +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -27,9 +28,9 @@ import com.microsoft.hyperspace.HyperspaceException import com.microsoft.hyperspace.actions.Constants import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.Hdfs.Properties +import com.microsoft.hyperspace.util.PathUtils trait HyperspaceRuleSuite extends HyperspaceSuite { - private val filenames = Seq("f1.parquet", "f2.parquet") def createIndexLogEntry( name: String, indexCols: Seq[AttributeReference], @@ -37,7 +38,8 @@ trait HyperspaceRuleSuite extends HyperspaceSuite { plan: LogicalPlan, numBuckets: Int = 10, inputFiles: Seq[FileInfo] = Seq(), - writeLog: Boolean = true): IndexLogEntry = { + writeLog: Boolean = true, + filenames: Seq[String] = Seq("f1.parquet", "f2.parquet")): IndexLogEntry = { val signClass = new RuleTestHelper.TestSignatureProvider().getClass.getName LogicalPlanSignatureProvider.create(signClass).signature(plan) match { @@ -54,7 +56,7 @@ trait HyperspaceRuleSuite extends HyperspaceSuite { null, LogicalPlanFingerprint(LogicalPlanFingerprint.Properties(Seq(Signature(signClass, s))))) - val indexFiles = getIndexDataFilesPaths(name).map { path => + val indexFiles = getIndexDataFilesPaths(name, filenames).map { path => new FileStatus(10, false, 1, 10, 10, path) } @@ -82,11 +84,13 @@ trait HyperspaceRuleSuite extends HyperspaceSuite { } } - def getIndexDataFilesPaths(indexName: String): Seq[Path] = + def getIndexDataFilesPaths( + indexName: String, + filenames: Seq[String] = Seq("f1.parquet", "f2.parquet")): Seq[Path] = filenames.map { f => new Path( new 
Path( - new Path(systemPath, indexName), + getIndexRootPath(indexName), s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=0"), f) } @@ -99,7 +103,7 @@ trait HyperspaceRuleSuite extends HyperspaceSuite { spark) def getIndexRootPath(indexName: String): Path = - new Path(systemPath, indexName) + PathUtils.makeAbsolute(new Path(systemPath, indexName), new Configuration) def setCommonSourceSizeInBytesTag( index: IndexLogEntry, From c4aa4d635ad44da67929a74a6a4fe132e3d8c84f Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Thu, 22 Apr 2021 10:14:00 -0700 Subject: [PATCH 3/3] review comments --- .../com/microsoft/hyperspace/index/HyperspaceSuite.scala | 3 +-- .../hyperspace/index/rankers/FilterIndexRankerTest.scala | 4 ++-- .../hyperspace/index/rules/HyperspaceRuleSuite.scala | 5 +++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/HyperspaceSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/HyperspaceSuite.scala index f7f9892a2..85178f806 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/HyperspaceSuite.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/HyperspaceSuite.scala @@ -18,13 +18,12 @@ package com.microsoft.hyperspace.index import java.io.File -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.SparkFunSuite import org.apache.spark.util.hyperspace.Utils import com.microsoft.hyperspace.{Hyperspace, SparkInvolvedSuite} -import com.microsoft.hyperspace.util.{FileUtils, PathUtils} +import com.microsoft.hyperspace.util.FileUtils trait HyperspaceSuite extends SparkFunSuite with SparkInvolvedSuite { // This is the system path that PathResolver uses to get the root of the indexes. 
diff --git a/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala index 0c5dfd562..0b8bac965 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala @@ -46,7 +46,7 @@ class FilterIndexRankerTest extends HyperspaceRuleSuite { val t2c2 = AttributeReference("t2c2", StringType)() test("rank() should return the index with smallest size by default.") { - // Index with only 1 file of size 10 + // Index with 2 files of total size 20 val ind1 = createIndexLogEntry( "ind1", Seq(t1c1), @@ -55,7 +55,7 @@ class FilterIndexRankerTest extends HyperspaceRuleSuite { writeLog = false, filenames = Seq("f1.parquet", "f2.parquet")) - // Index with only 2 files of total size 20 + // Index with only 1 file of size 10 val ind2 = createIndexLogEntry( "ind2", Seq(t1c1), diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleSuite.scala index 1d9b0ecfe..fc2cc1c87 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleSuite.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleSuite.scala @@ -31,6 +31,7 @@ import com.microsoft.hyperspace.index.Hdfs.Properties import com.microsoft.hyperspace.util.PathUtils trait HyperspaceRuleSuite extends HyperspaceSuite { + private val defaultFileNames = Seq("f1.parquet", "f2.parquet") def createIndexLogEntry( name: String, indexCols: Seq[AttributeReference], @@ -39,7 +40,7 @@ trait HyperspaceRuleSuite extends HyperspaceSuite { numBuckets: Int = 10, inputFiles: Seq[FileInfo] = Seq(), writeLog: Boolean = true, - filenames: Seq[String] = Seq("f1.parquet", "f2.parquet")): IndexLogEntry = { + filenames: Seq[String] = defaultFileNames): IndexLogEntry = { val signClass = new
RuleTestHelper.TestSignatureProvider().getClass.getName LogicalPlanSignatureProvider.create(signClass).signature(plan) match { @@ -86,7 +87,7 @@ trait HyperspaceRuleSuite extends HyperspaceSuite { def getIndexDataFilesPaths( indexName: String, - filenames: Seq[String] = Seq("f1.parquet", "f2.parquet")): Seq[Path] = + filenames: Seq[String] = defaultFileNames): Seq[Path] = filenames.map { f => new Path( new Path(