diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
index 68b6fd98a..dcc930432 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
@@ -47,6 +47,12 @@ object IndexConstants {
     "spark.hyperspace.index.hybridscan.maxAppendedRatio"
   val INDEX_HYBRID_SCAN_APPENDED_RATIO_THRESHOLD_DEFAULT = "0.3"
 
+  // If this config is enabled, Hybrid Scan is not applied when it would cause an
+  // unnecessary shuffle to merge appended data.
+  val INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED =
+    "spark.hyperspace.index.hybridscan.shuffleCheck.enabled"
+  val INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED_DEFAULT = "true"
+
   // Identifier injected to HadoopFsRelation as an option if an index is applied.
   // Currently, the identifier is added to options field of HadoopFsRelation.
   // In Spark 3.0, we could utilize TreeNodeTag to mark the identifier for each plan.
diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala
index 5d689e5bd..6cf7109ea 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala
@@ -24,13 +24,17 @@ import org.apache.spark.sql.catalyst.analysis.CleanupAliases
 import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, AttributeSet, EqualTo, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 
 import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
 import com.microsoft.hyperspace.actions.Constants
 import com.microsoft.hyperspace.index._
 import com.microsoft.hyperspace.index.rankers.JoinIndexRanker
 import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
+import com.microsoft.hyperspace.util.HyperspaceConf
 import com.microsoft.hyperspace.util.ResolverUtils._
 
 /**
@@ -68,18 +72,54 @@ object JoinIndexRule
               right = RuleUtils.transformPlanToUseIndex(spark, rIndex, r, useBucketSpec = true))
+
+            def getShuffleCnt(sparkPlan: SparkPlan): Long = {
+              sparkPlan.collect { case _: ShuffleExchangeExec => true }.length
+            }
+
+            val resultPlan =
+              if (!HyperspaceConf.hybridScanEnabled(spark) || !HyperspaceConf
+                    .hybridScanShuffleCheckEnabled(spark)) {
+                updatedPlan
+              } else {
+                val shuffleCntPair = spark.sessionState
+                  .executePlan(updatedPlan)
+                  .executedPlan
+                  .collect {
+                    case smj: SortMergeJoinExec =>
+                      (getShuffleCnt(smj.left), getShuffleCnt(smj.right))
+                  }
+                assert(shuffleCntPair.length <= 1)
+                shuffleCntPair.headOption.flatMap {
+                  // If a child has 2 shuffle nodes, the candidate index could not remove
+                  // the shuffle for the join on that side, and Hybrid Scan also introduces
+                  // an additional shuffle to merge appended files. We do not apply the
+                  // index to such a child in JoinIndexRule; the child is still eligible
+                  // for FilterIndexRule.
+                  case (leftCnt, _) if leftCnt == 2 => Some(updatedPlan.copy(left = l))
+                  case (_, rightCnt) if rightCnt == 2 => Some(updatedPlan.copy(right = r))
+                  case _ => None
+                }.getOrElse {
+                  updatedPlan
+                }
+              }
+
+            val appliedIndexes = resultPlan match {
+              case j: Join if j.left.equals(l) => Seq(rIndex)
+              case j: Join if j.right.equals(r) => Seq(lIndex)
+              case _ => Seq(lIndex, rIndex)
+            }
+
             logEvent(
               HyperspaceIndexUsageEvent(
                 AppInfo(
                   sparkContext.sparkUser,
                   sparkContext.applicationId,
                   sparkContext.appName),
-                Seq(lIndex, rIndex),
+                appliedIndexes,
                 join.toString,
-                updatedPlan.toString,
+                resultPlan.toString,
                 "Join index rule applied."))
-
-            updatedPlan
+            resultPlan
           }
           .getOrElse(join)
       } catch {
@@ -325,11 +365,7 @@ object JoinIndexRule
     compatibleIndexPairs.map(
       indexPairs =>
         JoinIndexRanker
-          .rank(
-            spark,
-            leftRel,
-            rightRel,
-            indexPairs)
+          .rank(spark, leftRel, rightRel, indexPairs)
           .head)
   }
diff --git a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
index 7fa6778bb..709aaa792 100644
--- a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
+++ b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
@@ -36,6 +36,14 @@ object HyperspaceConf {
     hybridScanDeletedRatioThreshold(spark) > 0.0
   }
 
+  def hybridScanShuffleCheckEnabled(spark: SparkSession): Boolean = {
+    spark.conf
+      .get(
+        IndexConstants.INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED,
+        IndexConstants.INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED_DEFAULT)
+      .toBoolean
+  }
+
   def optimizeFileSizeThreshold(spark: SparkSession): Long = {
     spark.conf
       .get(
diff --git a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala
index 313e3cc50..a8f891402 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala
@@ -20,9 +20,10 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.execution.SortExec
+import org.apache.spark.sql.execution.{SortExec, SparkPlan => SparkPlanNode}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 
 import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig, TestUtils}
 import com.microsoft.hyperspace.index.IndexConstants.{GLOBBING_PATTERN_KEY, REFRESH_MODE_INCREMENTAL, REFRESH_MODE_QUICK}
@@ -670,13 +671,95 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
       // Refreshed index as quick mode can be applied with Hybrid Scan config.
       withSQLConf(TestConfig.HybridScanEnabled: _*) {
-        spark.disableHyperspace()
-        val dfWithHyperspaceDisabled = query()
-        val basePlan = dfWithHyperspaceDisabled.queryExecution.optimizedPlan
-        spark.enableHyperspace()
-        val dfWithHyperspaceEnabled = query()
-        assert(!basePlan.equals(dfWithHyperspaceEnabled.queryExecution.optimizedPlan))
-        checkAnswer(dfWithHyperspaceDisabled, dfWithHyperspaceEnabled)
+          spark.disableHyperspace()
+          val dfWithHyperspaceDisabled = query()
+          val basePlan = dfWithHyperspaceDisabled.queryExecution.optimizedPlan
+          spark.enableHyperspace()
+          val dfWithHyperspaceEnabled = query()
+          assert(!basePlan.equals(dfWithHyperspaceEnabled.queryExecution.optimizedPlan))
+          checkAnswer(dfWithHyperspaceDisabled, dfWithHyperspaceEnabled)
+        }
+      }
+    }
+  }
+
+  test("Verify Hybrid Scan is not applied when shuffle is not removed.") {
+    withTempPathAsString { testPath =>
+      val indexConfig = IndexConfig("indexRight", Seq("c2"), Seq("c4"))
+      val indexConfig2 = IndexConfig("indexLeft", Seq("c2"), Seq("c3"))
+      import spark.implicits._
+      SampleData.testData
+        .toDF("c1", "c2", "c3", "c4", "c5")
+        .limit(10)
+        .write
+        .parquet(testPath)
+      val df = spark.read.load(testPath)
+
+      withSQLConf(IndexConstants.INDEX_NUM_BUCKETS -> "11") {
+        // Create an index with bucket num 11.
+        hyperspace.createIndex(df, indexConfig)
+      }
+      withSQLConf(IndexConstants.INDEX_NUM_BUCKETS -> "12") {
+        // Create an index with bucket num 12.
+        hyperspace.createIndex(df, indexConfig2)
+      }
+
+      // Append to original data.
+      SampleData.testData
+        .toDF("c1", "c2", "c3", "c4", "c5")
+        .limit(3)
+        .write
+        .mode("append")
+        .parquet(testPath)
+
+      {
+        // Create a join query.
+        val df2 = spark.read.parquet(testPath)
+
+        def query(): DataFrame = {
+          df2.select("c2", "c3").join(df2.select("c2", "c4"), "c2")
+        }
+
+        val inputFiles = df.inputFiles
+        val appendedFiles = df2.inputFiles.diff(inputFiles).map(new Path(_))
+
+        spark.enableHyperspace()
+        withSQLConf(TestConfig.HybridScanEnabled: _*) {
+          def getShuffleCnt(sparkPlan: SparkPlanNode): Long = {
+            sparkPlan.collect { case _: ShuffleExchangeExec => true }.length
+          }
+
+          withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED -> "false") {
+            val execPlan = query().queryExecution.executedPlan
+            val shuffleCnt = execPlan.collect {
+              case smj: SortMergeJoinExec =>
+                (getShuffleCnt(smj.left), getShuffleCnt(smj.right))
+            }.head
+            assert(shuffleCnt._1 === 1)
+            // The right child of the join has 2 shuffle nodes because of Hybrid Scan
+            // for appended files.
+            assert(shuffleCnt._2 === 2)
+
+            // Verify that the indexes are used and all index files are picked.
+            verifyIndexUsage(
+              query,
+              getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles ++
+                getIndexFilesPath(indexConfig2.indexName, Seq(0)) ++ appendedFiles)
+          }
+          withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_SHUFFLE_CHECK_ENABLED -> "true") {
+            val execPlan = query().queryExecution.executedPlan
+            val shuffleCnt = execPlan.collect {
+              case smj: SortMergeJoinExec =>
+                (getShuffleCnt(smj.left), getShuffleCnt(smj.right))
+            }.head
+            assert(shuffleCnt._1 === 1)
+            // One shuffle node of the right child is removed by the shuffle count check.
+            assert(shuffleCnt._2 === 1)
+
+            // For the right child, indexRight can still be applied by FilterIndexRule.
+            verifyIndexUsage(
+              query,
+              getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles ++
+                getIndexFilesPath(indexConfig2.indexName, Seq(0)) ++ appendedFiles)
+          }
+        }
+      }
+    }
+  }
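The core decision added to JoinIndexRule can be read as a small pure function over the per-child shuffle counts. The following sketch only paraphrases it for readability; it is not part of the patch, and the names decide, Fallback, RevertLeft, RevertRight and KeepBoth are illustrative:

    // Paraphrase of the new JoinIndexRule logic: given the number of
    // ShuffleExchangeExec nodes under each child of the SortMergeJoin (with both
    // candidate indexes applied), decide which side, if any, should fall back to
    // its original, un-indexed plan.
    sealed trait Fallback
    case object RevertLeft extends Fallback
    case object RevertRight extends Fallback
    case object KeepBoth extends Fallback

    def decide(leftShuffleCnt: Long, rightShuffleCnt: Long): Fallback =
      if (leftShuffleCnt == 2) RevertLeft        // join shuffle + Hybrid Scan shuffle
      else if (rightShuffleCnt == 2) RevertRight // same situation on the right side
      else KeepBoth                              // 0 or 1 shuffle: the index pays off

A count of 2 on one side means the bucketed index did not eliminate the join shuffle there, while Hybrid Scan added a second shuffle just to merge appended files, so the index brings no benefit to that child.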
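To exercise the new flag end to end, something like the following is expected to work. This is a minimal sketch, not part of the patch: it assumes a local SparkSession with Hyperspace and Hybrid Scan already enabled and indexes already created on the joined data, and the object name, data path and column names are hypothetical:

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

    object ShuffleCheckDemo extends App {
      val spark = SparkSession.builder().master("local[2]").appName("demo").getOrCreate()
      // Assumed: Hyperspace is enabled on this session and indexes comparable to
      // indexLeft/indexRight from the test above already exist for this data.

      // Rebuild the query after each config change so it is re-planned.
      def joinQuery(): DataFrame = {
        val df = spark.read.parquet("/tmp/data") // hypothetical path
        df.select("c2", "c3").join(df.select("c2", "c4"), "c2")
      }

      // Count ShuffleExchangeExec nodes in the executed plan, as JoinIndexRule does.
      def shuffleCount(df: DataFrame): Int =
        df.queryExecution.executedPlan.collect { case s: ShuffleExchangeExec => s }.length

      // With the check disabled, Hybrid Scan may leave two shuffles on one join child.
      spark.conf.set("spark.hyperspace.index.hybridscan.shuffleCheck.enabled", "false")
      val withoutCheck = shuffleCount(joinQuery())

      // With the check enabled (the default), JoinIndexRule reverts the index on a
      // child that would need two shuffles, so the plan never has more shuffles.
      spark.conf.set("spark.hyperspace.index.hybridscan.shuffleCheck.enabled", "true")
      assert(shuffleCount(joinQuery()) <= withoutCheck)
    }

Rebuilding the DataFrame after each conf change matters: a DataFrame caches its QueryExecution, so reusing one instance would not pick up the new config.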