From 87dce8c626b60dad32d38a9eaba3bb216b604c4c Mon Sep 17 00:00:00 2001
From: Apoorve Dave
Date: Wed, 24 Mar 2021 15:10:39 -0700
Subject: [PATCH 1/3] PEFilterIndexRule initial commit

---
 .../index/rules/PEFilterIndexRule.scala | 137 ++++++++++++++++++
 1 file changed, 137 insertions(+)
 create mode 100644 src/main/scala/com/microsoft/hyperspace/index/rules/PEFilterIndexRule.scala

diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/PEFilterIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/PEFilterIndexRule.scala
new file mode 100644
index 000000000..003c86679
--- /dev/null
+++ b/src/main/scala/com/microsoft/hyperspace/index/rules/PEFilterIndexRule.scala
@@ -0,0 +1,137 @@
+/*
+ * Copyright (2020) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.index.rules
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
+import org.apache.spark.sql.types.StructType
+
+import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
+import com.microsoft.hyperspace.actions.Constants
+import com.microsoft.hyperspace.index.{IndexConstants, IndexLogEntry}
+import com.microsoft.hyperspace.index.rankers.FilterIndexRanker
+import com.microsoft.hyperspace.telemetry.HyperspaceEventLogging
+import com.microsoft.hyperspace.util.ResolverUtils
+
+/**
+ * FilterIndex rule looks for opportunities in a logical plan to replace
+ * a relation with an available covering index according to columns in
+ * filter predicate.
+ */
+object PEFilterIndexRule
+    extends Rule[LogicalPlan]
+    with Logging
+    with HyperspaceEventLogging
+    with ActiveSparkSession {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan transformDown {
+      case ExtractFilterNode(originalPlan, filter, _, filterColumns) =>
+        try {
+          val candidateIndexes = findCoveringIndexes(filter, filterColumns)
+          FilterIndexRanker.rank(spark, filter, candidateIndexes) match {
+            case Some(index) =>
+              val transformedPlan =
+                transformPlan(
+                  spark,
+                  filter,
+                  index,
+                  originalPlan)
+              transformedPlan
+            case None => originalPlan
+          }
+        } catch {
+          case e: Exception =>
+            logWarning("Non fatal exception in running filter index rule: " + e.getMessage)
+            originalPlan
+        }
+    }
+  }
+
+  def transformPlan(
+      spark: SparkSession,
+      filter: Filter,
+      index: IndexLogEntry,
+      originalPlan: LogicalPlan): LogicalPlan = {
+    val filteredDf =
+      spark.read
+        .parquet(index.content.files.map(_.toString): _*)
+        .where(filter.condition.sql)
+        .select(IndexConstants.DATA_FILE_NAME_ID)
+    val fileIds = filteredDf.rdd.map(r => r(0)).collect.toSet
+    val fileList =
+      index.fileIdTracker.getFileToIdMap
+        .filterKeys(k => fileIds.contains(index.fileIdTracker.getFileToIdMap(k)))
+        .keys
+        .map(f => new Path(f._1))
+        .toSeq
+
+    // Make InMemoryFileIndex from file list.
+    val provider = Hyperspace.getContext(spark).sourceProviderManager
+    val returnVal = originalPlan transformDown {
+      case l @ LogicalRelation(
+            fs @ HadoopFsRelation(oldFileIndex: InMemoryFileIndex, _, _, _, _, _),
+            _,
+            _,
+            _) =>
+        val relation = provider.getRelation(l)
+        val options = relation.partitionBasePath
+          .map { basePath =>
+            // Set "basePath" so that partitioned columns are also included in the output schema.
+            Map("basePath" -> basePath)
+          }
+          .getOrElse(Map())
+        val schema = StructType((l.schema ++ oldFileIndex.partitionSchema).distinct)
+        val newFileIndex = new InMemoryFileIndex(spark, fileList, options, Some(schema))
+
+        l.copy(fs.copy(location = newFileIndex)(spark))
+    }
+
+    returnVal
+  }
+
+  private def findCoveringIndexes(
+      filter: Filter,
+      filterColumns: Seq[String]): Seq[IndexLogEntry] = {
+    RuleUtils.getRelation(spark, filter) match {
+      case Some(r) =>
+        val indexManager = Hyperspace
+          .getContext(spark)
+          .indexCollectionManager
+
+        val allIndexes = indexManager.getIndexes(Seq(Constants.States.ACTIVE))
+
+        val candidateIndexes = allIndexes.filter { index =>
+          indexCoversPlan(filterColumns, index.indexedColumns)
+        }
+
+        RuleUtils.getCandidateIndexes(spark, candidateIndexes, r)
+
+      case None => Nil
+    }
+  }
+
+  private def indexCoversPlan(
+      filterColumns: Seq[String],
+      indexedColumns: Seq[String]): Boolean = {
+    ResolverUtils.resolve(spark, indexedColumns.head, filterColumns).isDefined
+  }
+}

From ecc455bd846693ca21eeff7723a7d7c59b8ea774 Mon Sep 17 00:00:00 2001
From: Apoorve Dave
Date: Thu, 25 Mar 2021 11:13:34 -0700
Subject: [PATCH 2/3] add filtering of conditions to choose only compatible conditions with the index

---
 .../index/rules/PEFilterIndexRule.scala | 68 +++++++++++++------
 1 file changed, 46 insertions(+), 22 deletions(-)

diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/PEFilterIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/PEFilterIndexRule.scala
index 003c86679..5d47ce22d 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/rules/PEFilterIndexRule.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/rules/PEFilterIndexRule.scala
@@ -19,6 +19,7 @@ package com.microsoft.hyperspace.index.rules
 import org.apache.hadoop.fs.Path
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BinaryComparison, EqualTo, Expression, UnaryExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
@@ -48,14 +49,7 @@ object PEFilterIndexRule
         try {
           val candidateIndexes = findCoveringIndexes(filter, filterColumns)
           FilterIndexRanker.rank(spark, filter, candidateIndexes) match {
-            case Some(index) =>
-              val transformedPlan =
-                transformPlan(
-                  spark,
-                  filter,
-                  index,
-                  originalPlan)
-              transformedPlan
+            case Some(index) => transformPlan(filter, index, originalPlan)
             case None => originalPlan
           }
         } catch {
@@ -67,22 +61,11 @@ object PEFilterIndexRule
   }
 
   def transformPlan(
-      spark: SparkSession,
       filter: Filter,
       index: IndexLogEntry,
       originalPlan: LogicalPlan): LogicalPlan = {
-    val filteredDf =
-      spark.read
-        .parquet(index.content.files.map(_.toString): _*)
-        .where(filter.condition.sql)
-        .select(IndexConstants.DATA_FILE_NAME_ID)
-    val fileIds = filteredDf.rdd.map(r => r(0)).collect.toSet
-    val fileList =
-      index.fileIdTracker.getFileToIdMap
-        .filterKeys(k => fileIds.contains(index.fileIdTracker.getFileToIdMap(k)))
-        .keys
-        .map(f => new Path(f._1))
-        .toSeq
+    // Filter useable files from original data source.
+    val fileList = getFilteredFiles(filter, index)
 
     // Make InMemoryFileIndex from file list.
     val provider = Hyperspace.getContext(spark).sourceProviderManager
@@ -101,13 +84,54 @@ object PEFilterIndexRule
           .getOrElse(Map())
         val schema = StructType((l.schema ++ oldFileIndex.partitionSchema).distinct)
         val newFileIndex = new InMemoryFileIndex(spark, fileList, options, Some(schema))
+        val fsOptions = fs.options ++ Map(IndexConstants.INDEX_RELATION_IDENTIFIER)
 
-        l.copy(fs.copy(location = newFileIndex)(spark))
+        l.copy(fs.copy(location = newFileIndex, options = fsOptions)(spark))
     }
 
     returnVal
   }
 
+  def getFilteredFiles(filter: Filter, index: IndexLogEntry): Seq[Path] = {
+    val condition = getIndexCompatibleCondition(filter.condition, index)
+
+    val filteredDf =
+      spark.read
+        .parquet(index.content.files.map(_.toString): _*)
+        .where(condition.sql)
+        .select(IndexConstants.DATA_FILE_NAME_ID)
+    val fileIds = filteredDf.rdd.map(r => r(0)).collect.toSet
+
+    index.fileIdTracker.getFileToIdMap
+      .filterKeys(k => fileIds.contains(index.fileIdTracker.getFileToIdMap(k)))
+      .keys
+      .map(f => new Path(f._1))
+      .toSeq
+  }
+
+  private def extractConditions(condition: Expression): Seq[Expression] = condition match {
+    case e: BinaryComparison => Seq(e)
+    case e: UnaryExpression => Seq(e)
+    case And(left, right) => extractConditions(left) ++ extractConditions(right)
+    case _ => throw new IllegalStateException("Unsupported condition found")
+  }
+
+  /**
+   * Choose only those conditions which work with the index. For filter queries, these conditions
+   * should reference to only the head column of indexed columns.
+   *
+   * @param condition Complete filter condition expression.
+   * @param index IndexLogEntry for which we are extracting the compatible sub-conditions.
+   * @return Subset of sub-conditions which work with this index.
+   */
+  private def getIndexCompatibleCondition(
+      condition: Expression,
+      index: IndexLogEntry): Expression = {
+    extractConditions(condition)
+      .filter(_.references.forall(_.name.equals(index.indexedColumns.head)))
+      .reduce(And)
+  }
+
   private def findCoveringIndexes(
       filter: Filter,
       filterColumns: Seq[String]): Seq[IndexLogEntry] = {

From 2923e77408055566b590629c468110a4671aeab0 Mon Sep 17 00:00:00 2001
From: Apoorve Dave <66283785+apoorvedave1@users.noreply.github.com>
Date: Fri, 26 Mar 2021 17:39:42 -0700
Subject: [PATCH 3/3] add distinct on file ids

Co-authored-by: EJ Song <51077614+sezruby@users.noreply.github.com>
---
 .../com/microsoft/hyperspace/index/rules/PEFilterIndexRule.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/PEFilterIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/PEFilterIndexRule.scala
index 5d47ce22d..5186a5f39 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/rules/PEFilterIndexRule.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/rules/PEFilterIndexRule.scala
@@ -100,6 +100,7 @@ object PEFilterIndexRule
         .parquet(index.content.files.map(_.toString): _*)
         .where(condition.sql)
         .select(IndexConstants.DATA_FILE_NAME_ID)
+        .distinct
     val fileIds = filteredDf.rdd.map(r => r(0)).collect.toSet
 
     index.fileIdTracker.getFileToIdMap
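Illustrative sketch (not part of the patch series above): the rule does not answer the query from the index itself; it keeps the original relation and Filter and only shrinks the set of data files that get scanned. It queries the index content with the index-compatible part of the filter, collects the matching data-file ids, maps them back to paths via fileIdTracker, and rebuilds the relation's InMemoryFileIndex from just those files. The self-contained Scala sketch below mimics that lookup with plain Spark APIs; the in-memory DataFrame, the column names "key", "value" and "_data_file_id", and the sample rows are invented stand-ins for the Hyperspace index content, DATA_FILE_NAME_ID and fileIdTracker, not actual Hyperspace APIs.

import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{And, BinaryComparison, Expression, UnaryExpression}
import org.apache.spark.sql.functions.col

object PEFilterSketch {
  // Same shape as extractConditions in the patch: split a conjunction into
  // simple sub-conditions, rejecting anything more exotic.
  def extractConditions(condition: Expression): Seq[Expression] = condition match {
    case e: BinaryComparison => Seq(e)
    case e: UnaryExpression => Seq(e)
    case And(left, right) => extractConditions(left) ++ extractConditions(right)
    case other => throw new IllegalStateException(s"Unsupported condition: $other")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("pe-filter-sketch").getOrCreate()
    import spark.implicits._

    // Stand-in for the covering index: one row per (indexed key, id of the source
    // data file containing that key). In the rule this is the index's parquet
    // content plus the DATA_FILE_NAME_ID column; here it is an in-memory DataFrame.
    val indexDf = Seq(
      ("a", 0L), ("b", 0L), // data file 0 holds keys a and b
      ("c", 1L), ("d", 2L)  // data files 1 and 2 hold c and d
    ).toDF("key", "_data_file_id")

    // A filter where only the part on the indexed head column ("key") can use the index.
    val predicate = col("key") === "c" && col("value") > 10

    // Keep only sub-conditions whose references are all the indexed head column,
    // then AND them back together (mirrors getIndexCompatibleCondition).
    val indexCompatible = extractConditions(predicate.expr)
      .filter(_.references.forall(_.name == "key"))
      .reduce(And)

    // Look up which data files can contain matching rows (mirrors getFilteredFiles,
    // including the distinct added in the last patch).
    val prunedFileIds = indexDf
      .where(new Column(indexCompatible))
      .select("_data_file_id")
      .distinct()
      .as[Long]
      .collect()
      .toSet

    // The rule would map these ids back to paths and build a new InMemoryFileIndex;
    // the sketch just prints them.
    println(s"data files to scan: $prunedFileIds") // Set(1)

    spark.stop()
  }
}

As in getIndexCompatibleCondition, only the sub-conditions that reference the head indexed column are pushed into the index lookup; Spark still evaluates the complete filter over the pruned file list, so the rewrite stays correct even when the index covers only part of the predicate.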