From 4d23d66d17511af5df9eb6e74f31abfa1cb2de3c Mon Sep 17 00:00:00 2001 From: gaoyangxiaozhu Date: Thu, 27 Jun 2024 16:59:44 +0800 Subject: [PATCH 1/7] rename transformhit to fallbackhit --- .../clickhouse/CHSparkPlanExecApi.scala | 8 ++-- .../FallbackBroadcaseHashJoinRules.scala | 16 +++---- .../velox/VeloxSparkPlanExecApi.scala | 8 ++-- .../execution/ScanTransformerFactory.scala | 6 +-- .../gluten/extension/StrategyOverrides.scala | 2 +- .../EnsureLocalSortRequirements.scala | 4 +- .../columnar/ExpandFallbackPolicy.scala | 8 ++-- ...mHintRule.scala => FallbackHintRule.scala} | 44 +++++++++---------- .../columnar/OffloadSingleNode.scala | 22 +++++----- ...RemoveNativeWriteFilesSortAndProject.scala | 2 +- .../enumerated/EnumeratedApplier.scala | 2 +- .../columnar/heuristic/HeuristicApplier.scala | 4 +- .../RewriteSparkPlanRulesManager.scala | 20 ++++----- .../columnar/validator/Validators.scala | 6 +-- .../execution/GlutenFallbackReporter.scala | 6 +-- .../GlutenFormatWriterInjectsBase.scala | 4 +- .../execution/FallbackStrategiesSuite.scala | 6 +-- .../execution/FallbackStrategiesSuite.scala | 6 +-- .../execution/FallbackStrategiesSuite.scala | 6 +-- 19 files changed, 90 insertions(+), 90 deletions(-) rename gluten-core/src/main/scala/org/apache/gluten/extension/columnar/{TransformHintRule.scala => FallbackHintRule.scala} (95%) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index 1c83e326eed4..99714a0aa168 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -22,7 +22,7 @@ import org.apache.gluten.exception.GlutenNotSupportException import org.apache.gluten.execution._ import org.apache.gluten.expression._ import 
org.apache.gluten.extension.{CountDistinctWithoutExpand, FallbackBroadcastHashJoin, FallbackBroadcastHashJoinPrepQueryStage, RewriteToDateExpresstionRule} -import org.apache.gluten.extension.columnar.AddTransformHintRule +import org.apache.gluten.extension.columnar.AddFallbackHintRule import org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides import org.apache.gluten.extension.columnar.transition.Convention import org.apache.gluten.sql.shims.SparkShimLoader @@ -147,7 +147,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { child match { case scan: FileSourceScanExec if (checkMergeTreeFileFormat(scan.relation)) => - // For the validation phase of the AddTransformHintRule + // For the validation phase of the AddFallbackHintRule CHFilterExecTransformer(condition, child) case scan: FileSourceScanExecTransformerBase if (checkMergeTreeFileFormat(scan.relation)) => // For the transform phase, the FileSourceScanExec is already transformed @@ -227,7 +227,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { // FIXME: The operation happens inside ReplaceSingleNode(). // Caller may not know it adds project on top of the shuffle. val project = TransformPreOverrides().apply( - AddTransformHintRule().apply( + AddFallbackHintRule().apply( ProjectExec(plan.child.output ++ projectExpressions, plan.child))) var newExprs = Seq[Expression]() for (i <- exprs.indices) { @@ -252,7 +252,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { // FIXME: The operation happens inside ReplaceSingleNode(). // Caller may not know it adds project on top of the shuffle. 
val project = TransformPreOverrides().apply( - AddTransformHintRule().apply( + AddFallbackHintRule().apply( ProjectExec(plan.child.output ++ projectExpressions, plan.child))) var newOrderings = Seq[SortOrder]() for (i <- orderings.indices) { diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcaseHashJoinRules.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcaseHashJoinRules.scala index 873ecb8342a6..1b3ce0188972 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcaseHashJoinRules.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcaseHashJoinRules.scala @@ -19,7 +19,7 @@ package org.apache.gluten.extension import org.apache.gluten.GlutenConfig import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.extension.columnar._ -import org.apache.gluten.extension.columnar.TransformHints.EncodeTransformableTagImplicits +import org.apache.gluten.extension.columnar.FallbackHints.EncodeTransformableTagImplicits import org.apache.gluten.utils.PhysicalPlanSelector import org.apache.spark.sql.SparkSession @@ -61,7 +61,7 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extend "columnar broadcast exchange is disabled or " + "columnar broadcast join is disabled") } else { - if (TransformHints.isNotTransformable(bhj)) { + if (FallbackHints.isNotTransformable(bhj)) { ValidationResult.notOk("broadcast join is already tagged as not transformable") } else { val bhjTransformer = BackendsApiManager.getSparkPlanExecApiInstance @@ -83,8 +83,8 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extend } } } - TransformHints.tagNotTransformable(bhj, isTransformable) - TransformHints.tagNotTransformable(exchange, isTransformable) + FallbackHints.tagNotTransformable(bhj, isTransformable) + FallbackHints.tagNotTransformable(exchange, isTransformable) case _ => // Skip. 
This might be the case that the exchange was already // executed in earlier stage @@ -116,7 +116,7 @@ case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPl // Currently their doBroadcast() methods just propagate child's broadcast // payloads which is not right in speaking of columnar. if (!enableColumnarBroadcastJoin) { - TransformHints.tagNotTransformable( + FallbackHints.tagNotTransformable( bhj, "columnar BroadcastJoin is not enabled in BroadcastHashJoinExec") } else { @@ -149,7 +149,7 @@ case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPl case Some(exchange @ BroadcastExchangeExec(mode, child)) => isBhjTransformable.tagOnFallback(bhj) if (!isBhjTransformable.isValid) { - TransformHints.tagNotTransformable(exchange, isBhjTransformable) + FallbackHints.tagNotTransformable(exchange, isBhjTransformable) } case None => // we are in AQE, find the hidden exchange @@ -182,7 +182,7 @@ case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPl // to conform to the underlying exchange's type, columnar or vanilla exchange match { case BroadcastExchangeExec(mode, child) => - TransformHints.tagNotTransformable( + FallbackHints.tagNotTransformable( bhj, "it's a materialized broadcast exchange or reused broadcast exchange") case ColumnarBroadcastExchangeExec(mode, child) => @@ -199,7 +199,7 @@ case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPl } } catch { case e: UnsupportedOperationException => - TransformHints.tagNotTransformable( + FallbackHints.tagNotTransformable( p, s"${e.getMessage}, original Spark plan is " + s"${p.getClass}(${p.children.toList.map(_.getClass)})") diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index 1f868c4c2044..202bc1b2a5b5 100644 --- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -25,7 +25,7 @@ import org.apache.gluten.expression._ import org.apache.gluten.expression.ExpressionNames.{TRANSFORM_KEYS, TRANSFORM_VALUES} import org.apache.gluten.expression.aggregate.{HLLAdapter, VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet} import org.apache.gluten.extension._ -import org.apache.gluten.extension.columnar.TransformHints +import org.apache.gluten.extension.columnar.FallbackHints import org.apache.gluten.extension.columnar.transition.Convention import org.apache.gluten.extension.columnar.transition.ConventionFunc.BatchOverride import org.apache.gluten.sql.shims.SparkShimLoader @@ -372,7 +372,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { val newChild = maybeAddAppendBatchesExec(projectTransformer) ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output.drop(1)) } else { - TransformHints.tagNotTransformable(shuffle, validationResult) + FallbackHints.tagNotTransformable(shuffle, validationResult) shuffle.withNewChildren(child :: Nil) } case RoundRobinPartitioning(num) if SQLConf.get.sortBeforeRepartition && num > 1 => @@ -398,7 +398,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { projectTransformer } else { val project = ProjectExec(projectList, child) - TransformHints.tagNotTransformable(project, projectBeforeSortValidationResult) + FallbackHints.tagNotTransformable(project, projectBeforeSortValidationResult) project } val sortOrder = SortOrder(projectBeforeSort.output.head, Ascending) @@ -411,7 +411,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { val newChild = maybeAddAppendBatchesExec(dropSortColumnTransformer) ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output) } else { - TransformHints.tagNotTransformable(shuffle, validationResult) + FallbackHints.tagNotTransformable(shuffle, 
validationResult) shuffle.withNewChildren(child :: Nil) } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala index fcb9e983e76b..acc8b1ca9cc1 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala @@ -17,7 +17,7 @@ package org.apache.gluten.execution import org.apache.gluten.exception.GlutenNotSupportException -import org.apache.gluten.extension.columnar.TransformHints +import org.apache.gluten.extension.columnar.FallbackHints import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.sql.catalyst.expressions.Expression @@ -99,7 +99,7 @@ object ScanTransformerFactory { transformer } else { val newSource = batchScan.copy(runtimeFilters = transformer.runtimeFilters) - TransformHints.tagNotTransformable(newSource, validationResult.reason.get) + FallbackHints.tagNotTransformable(newSource, validationResult.reason.get) newSource } } else { @@ -109,7 +109,7 @@ object ScanTransformerFactory { if (validation) { throw new GlutenNotSupportException(s"Unsupported scan ${batchScan.scan}") } - TransformHints.tagNotTransformable(batchScan, "The scan in BatchScanExec is not supported.") + FallbackHints.tagNotTransformable(batchScan, "The scan in BatchScanExec is not supported.") batchScan } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/StrategyOverrides.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/StrategyOverrides.scala index f2f786259393..909a9f230174 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/StrategyOverrides.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/StrategyOverrides.scala @@ -18,8 +18,8 @@ package org.apache.gluten.extension import org.apache.gluten.{GlutenConfig, GlutenSparkExtensionsInjector} import 
org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.extension.columnar.FallbackHints.TAG import org.apache.gluten.extension.columnar.TRANSFORM_UNSUPPORTED -import org.apache.gluten.extension.columnar.TransformHints.TAG import org.apache.gluten.utils.LogicalPlanSelector import org.apache.spark.sql.{SparkSession, SparkSessionExtensions, Strategy} diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala index 0f5fc21aff87..87d26560208c 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala @@ -37,7 +37,7 @@ object EnsureLocalSortRequirements extends Rule[SparkPlan] { requiredOrdering: Seq[SortOrder]): SparkPlan = { val newChild = SortExec(requiredOrdering, global = false, child = originalChild) if (!GlutenConfig.getConf.enableColumnarSort) { - TransformHints.tagNotTransformable(newChild, "columnar Sort is not enabled in SortExec") + FallbackHints.tagNotTransformable(newChild, "columnar Sort is not enabled in SortExec") newChild } else { val newChildWithTransformer = @@ -50,7 +50,7 @@ object EnsureLocalSortRequirements extends Rule[SparkPlan] { if (validationResult.isValid) { newChildWithTransformer } else { - TransformHints.tagNotTransformable(newChild, validationResult) + FallbackHints.tagNotTransformable(newChild, validationResult) newChild } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala index 4ee153173c5c..a1176b087e89 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala +++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala @@ -239,11 +239,11 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, originalPlan: SparkP // Propagate fallback reason to vanilla SparkPlan glutenPlan.foreach { case _: GlutenPlan => - case p: SparkPlan if TransformHints.isNotTransformable(p) && p.logicalLink.isDefined => + case p: SparkPlan if FallbackHints.isNotTransformable(p) && p.logicalLink.isDefined => originalPlan .find(_.logicalLink.exists(_.fastEquals(p.logicalLink.get))) - .filterNot(TransformHints.isNotTransformable) - .foreach(origin => TransformHints.tag(origin, TransformHints.getHint(p))) + .filterNot(FallbackHints.isNotTransformable) + .foreach(origin => FallbackHints.tag(origin, FallbackHints.getHint(p))) case _ => } @@ -278,7 +278,7 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, originalPlan: SparkP ) { plan } else { - TransformHints.tagAllNotTransformable( + FallbackHints.tagAllNotTransformable( vanillaSparkPlan, TRANSFORM_UNSUPPORTED(fallbackInfo.reason, appendReasonIfExists = false)) FallbackNode(vanillaSparkPlan) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackHintRule.scala similarity index 95% rename from gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala rename to gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackHintRule.scala index d32cf2d22eb4..876ae3c0b286 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackHintRule.scala @@ -21,7 +21,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.exception.GlutenNotSupportException import org.apache.gluten.execution._ import org.apache.gluten.extension.{GlutenPlan, ValidationResult} 
-import org.apache.gluten.extension.columnar.TransformHints.EncodeTransformableTagImplicits +import org.apache.gluten.extension.columnar.FallbackHints.EncodeTransformableTagImplicits import org.apache.gluten.extension.columnar.validator.{Validator, Validators} import org.apache.gluten.sql.shims.SparkShimLoader @@ -47,19 +47,19 @@ import org.apache.spark.sql.types.StringType import org.apache.commons.lang3.exception.ExceptionUtils -sealed trait TransformHint { +sealed trait FallbackHint { val stacktrace: Option[String] = - if (TransformHints.DEBUG) { + if (FallbackHints.DEBUG) { Some(ExceptionUtils.getStackTrace(new Throwable())) } else None } case class TRANSFORM_UNSUPPORTED(reason: Option[String], appendReasonIfExists: Boolean = true) - extends TransformHint + extends FallbackHint -object TransformHints { - val TAG: TreeNodeTag[TransformHint] = - TreeNodeTag[TransformHint]("org.apache.gluten.transformhint") +object FallbackHints { + val TAG: TreeNodeTag[FallbackHint] = + TreeNodeTag[FallbackHint]("org.apache.gluten.fallbackhint") val DEBUG = false @@ -92,7 +92,7 @@ object TransformHints { } } - def tag(plan: SparkPlan, hint: TransformHint): Unit = { + def tag(plan: SparkPlan, hint: FallbackHint): Unit = { val mergedHint = getHintOption(plan) .map { case originalHint @ TRANSFORM_UNSUPPORTED(Some(originalReason), originAppend) => @@ -142,12 +142,12 @@ object TransformHints { } } - def getHint(plan: SparkPlan): TransformHint = { + def getHint(plan: SparkPlan): FallbackHint = { getHintOption(plan).getOrElse( throw new IllegalStateException("Transform hint tag not set in plan: " + plan.toString())) } - def getHintOption(plan: SparkPlan): Option[TransformHint] = { + def getHintOption(plan: SparkPlan): Option[FallbackHint] = { plan.getTagValue(TAG) } @@ -192,7 +192,7 @@ object TransformHints { case class FallbackOnANSIMode(session: SparkSession) extends Rule[SparkPlan] { override def apply(plan: SparkPlan): SparkPlan = { if (GlutenConfig.getConf.enableAnsiMode) { - 
plan.foreach(TransformHints.tagNotTransformable(_, "does not support ansi mode")) + plan.foreach(FallbackHints.tagNotTransformable(_, "does not support ansi mode")) } plan } @@ -215,7 +215,7 @@ case class FallbackMultiCodegens(session: SparkSession) extends Rule[SparkPlan] } def tagNotTransformable(plan: SparkPlan): SparkPlan = { - TransformHints.tagNotTransformable(plan, "fallback multi codegens") + FallbackHints.tagNotTransformable(plan, "fallback multi codegens") plan } @@ -304,11 +304,11 @@ case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] { if (p.children.exists(_.output.isEmpty)) { // Some backends are not eligible to offload plan with zero-column input. // If any child have empty output, mark the plan and that child as UNSUPPORTED. - TransformHints.tagNotTransformable(p, "at least one of its children has empty output") + FallbackHints.tagNotTransformable(p, "at least one of its children has empty output") p.children.foreach { child => if (child.output.isEmpty && !child.isInstanceOf[WriteFilesExec]) { - TransformHints.tagNotTransformable( + FallbackHints.tagNotTransformable( child, "at least one of its children has empty output") } @@ -323,8 +323,8 @@ case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] { // The doValidate function will be called to check if the conversion is supported. // If false is returned or any unsupported exception is thrown, a row guard will // be added on the top of that plan to prevent actual conversion. 
-case class AddTransformHintRule() extends Rule[SparkPlan] { - import AddTransformHintRule._ +case class AddFallbackHintRule() extends Rule[SparkPlan] { + import AddFallbackHintRule._ private val glutenConf: GlutenConfig = GlutenConfig.getConf private val validator = Validators .builder() @@ -352,7 +352,7 @@ case class AddTransformHintRule() extends Rule[SparkPlan] { val outcome = validator.validate(plan) outcome match { case Validator.Failed(reason) => - TransformHints.tagNotTransformable(plan, reason) + FallbackHints.tagNotTransformable(plan, reason) return case Validator.Passed => } @@ -421,7 +421,7 @@ case class AddTransformHintRule() extends Rule[SparkPlan] { plan.leftKeys, plan.rightKeys, plan.joinType, - TransformHints.getShuffleHashJoinBuildSide(plan), + FallbackHints.getShuffleHashJoinBuildSide(plan), plan.condition, plan.left, plan.right, @@ -544,7 +544,7 @@ case class AddTransformHintRule() extends Rule[SparkPlan] { } } catch { case e @ (_: GlutenNotSupportException | _: UnsupportedOperationException) => - TransformHints.tagNotTransformable( + FallbackHints.tagNotTransformable( plan, s"${e.getMessage}, original Spark plan is " + s"${plan.getClass}(${plan.children.toList.map(_.getClass)})") @@ -555,7 +555,7 @@ case class AddTransformHintRule() extends Rule[SparkPlan] { } } -object AddTransformHintRule { +object AddFallbackHintRule { implicit private class ValidatorBuilderImplicits(builder: Validators.Builder) { /** @@ -593,9 +593,9 @@ object AddTransformHintRule { } } -case class RemoveTransformHintRule() extends Rule[SparkPlan] { +case class RemoveFallbackHintRule() extends Rule[SparkPlan] { override def apply(plan: SparkPlan): SparkPlan = { - plan.foreach(TransformHints.untag) + plan.foreach(FallbackHints.untag) plan } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala index 8cd2a5fb67bd..ecc4cdf73b9d 100644 --- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala @@ -50,7 +50,7 @@ sealed trait OffloadSingleNode extends Logging { // Aggregation transformation. case class OffloadAggregate() extends OffloadSingleNode with LogLevelUtil { override def offload(plan: SparkPlan): SparkPlan = plan match { - case plan if TransformHints.isNotTransformable(plan) => + case plan if FallbackHints.isNotTransformable(plan) => plan case agg: HashAggregateExec => genHashAggregateExec(agg) @@ -66,7 +66,7 @@ case class OffloadAggregate() extends OffloadSingleNode with LogLevelUtil { * the actually used plan for execution. */ private def genHashAggregateExec(plan: HashAggregateExec): SparkPlan = { - if (TransformHints.isNotTransformable(plan)) { + if (FallbackHints.isNotTransformable(plan)) { return plan } @@ -86,7 +86,7 @@ case class OffloadAggregate() extends OffloadSingleNode with LogLevelUtil { HashAggregateExecBaseTransformer.from(plan)() case _ => // If the child is not transformable, do not transform the agg. - TransformHints.tagNotTransformable(plan, "child output schema is empty") + FallbackHints.tagNotTransformable(plan, "child output schema is empty") plan } } else { @@ -99,7 +99,7 @@ case class OffloadAggregate() extends OffloadSingleNode with LogLevelUtil { // Exchange transformation. 
case class OffloadExchange() extends OffloadSingleNode with LogLevelUtil { override def offload(plan: SparkPlan): SparkPlan = plan match { - case p if TransformHints.isNotTransformable(p) => + case p if FallbackHints.isNotTransformable(p) => p case s: ShuffleExchangeExec if (s.child.supportsColumnar || GlutenConfig.getConf.enablePreferColumnar) && @@ -118,7 +118,7 @@ case class OffloadExchange() extends OffloadSingleNode with LogLevelUtil { case class OffloadJoin() extends OffloadSingleNode with LogLevelUtil { override def offload(plan: SparkPlan): SparkPlan = { - if (TransformHints.isNotTransformable(plan)) { + if (FallbackHints.isNotTransformable(plan)) { logDebug(s"Columnar Processing for ${plan.getClass} is under row guard.") return plan } @@ -132,7 +132,7 @@ case class OffloadJoin() extends OffloadSingleNode with LogLevelUtil { plan.leftKeys, plan.rightKeys, plan.joinType, - TransformHints.getShuffleHashJoinBuildSide(plan), + FallbackHints.getShuffleHashJoinBuildSide(plan), plan.condition, left, right, @@ -204,7 +204,7 @@ case class OffloadFilter() extends OffloadSingleNode with LogLevelUtil { * the actually used plan for execution. */ private def genFilterExec(filter: FilterExec): SparkPlan = { - if (TransformHints.isNotTransformable(filter)) { + if (FallbackHints.isNotTransformable(filter)) { return filter } @@ -213,7 +213,7 @@ case class OffloadFilter() extends OffloadSingleNode with LogLevelUtil { // Push down the left conditions in Filter into FileSourceScan. 
val newChild: SparkPlan = filter.child match { case scan @ (_: FileSourceScanExec | _: BatchScanExec) => - if (TransformHints.isTransformable(scan)) { + if (FallbackHints.isTransformable(scan)) { val newScan = FilterHandler.pushFilterToScan(filter.condition, scan) newScan match { @@ -251,7 +251,7 @@ object OffloadOthers { def doReplace(p: SparkPlan): SparkPlan = { val plan = p - if (TransformHints.isNotTransformable(plan)) { + if (FallbackHints.isNotTransformable(plan)) { return plan } plan match { @@ -406,7 +406,7 @@ object OffloadOthers { transformer } else { logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.") - TransformHints.tagNotTransformable(plan, validationResult.reason.get) + FallbackHints.tagNotTransformable(plan, validationResult.reason.get) plan } case plan: BatchScanExec => @@ -421,7 +421,7 @@ object OffloadOthers { return hiveTableScanExecTransformer } logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.") - TransformHints.tagNotTransformable(plan, validateResult.reason.get) + FallbackHints.tagNotTransformable(plan, validateResult.reason.get) plan case other => throw new GlutenNotSupportException(s"${other.getClass.toString} is not supported.") diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala index ce94626d999d..fc03ad1eccd9 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala @@ -76,7 +76,7 @@ object NativeWriteFilesWithSkippingSortAndProject extends Logging { } else { // If we can not transform the project, then we fallback to origin plan which means // we also retain the sort operator. 
- TransformHints.tagNotTransformable(p, validationResult) + FallbackHints.tagNotTransformable(p, validationResult) None } case _ => None diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala index d5260f66adba..f5fa078603cf 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala @@ -128,7 +128,7 @@ class EnumeratedApplier(session: SparkSession) // when columnar table cache is enabled. (s: SparkSession) => RemoveGlutenTableCacheColumnarToRow(s), (s: SparkSession) => GlutenFallbackReporter(GlutenConfig.getConf, s), - (_: SparkSession) => RemoveTransformHintRule() + (_: SparkSession) => RemoveFallbackHintRule() ) } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala index d925bc231cd9..0aad29bd544d 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala @@ -103,7 +103,7 @@ class HeuristicApplier(session: SparkSession) (_: SparkSession) => FallbackEmptySchemaRelation(), (spark: SparkSession) => MergeTwoPhasesHashBaseAggregate(spark), (_: SparkSession) => RewriteSparkPlanRulesManager(), - (_: SparkSession) => AddTransformHintRule() + (_: SparkSession) => AddFallbackHintRule() ) ::: List((_: SparkSession) => TransformPreOverrides()) ::: List( @@ -150,7 +150,7 @@ class HeuristicApplier(session: SparkSession) // when columnar table cache is enabled. 
(s: SparkSession) => RemoveGlutenTableCacheColumnarToRow(s), (s: SparkSession) => GlutenFallbackReporter(GlutenConfig.getConf, s), - (_: SparkSession) => RemoveTransformHintRule() + (_: SparkSession) => RemoveFallbackHintRule() ) } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala index ac663314bead..9a47abd53618 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala @@ -16,7 +16,7 @@ */ package org.apache.gluten.extension.columnar.rewrite -import org.apache.gluten.extension.columnar.{AddTransformHintRule, TransformHint, TransformHints} +import org.apache.gluten.extension.columnar.{AddFallbackHintRule, FallbackHint, FallbackHints} import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.rdd.RDD @@ -49,7 +49,7 @@ class RewriteSparkPlanRulesManager private (rewriteRules: Seq[RewriteSingleNode] extends Rule[SparkPlan] { private def mayNeedRewrite(plan: SparkPlan): Boolean = { - TransformHints.isTransformable(plan) && { + FallbackHints.isTransformable(plan) && { plan match { case _: SortExec => true case _: TakeOrderedAndProjectExec => true @@ -67,15 +67,15 @@ class RewriteSparkPlanRulesManager private (rewriteRules: Seq[RewriteSingleNode] } } - private def getTransformHintBack( + private def getFallbackHintBack( origin: SparkPlan, - rewrittenPlan: SparkPlan): Option[TransformHint] = { + rewrittenPlan: SparkPlan): Option[FallbackHint] = { // The rewritten plan may contain more nodes than origin, here use the node name to get it back val target = rewrittenPlan.collect { case p if p.nodeName == origin.nodeName => p } assert(target.size == 1) - TransformHints.getHintOption(target.head) + 
FallbackHints.getHintOption(target.head) } private def applyRewriteRules(origin: SparkPlan): (SparkPlan, Option[String]) = { @@ -94,7 +94,7 @@ class RewriteSparkPlanRulesManager private (rewriteRules: Seq[RewriteSingleNode] } override def apply(plan: SparkPlan): SparkPlan = { - val addHint = AddTransformHintRule() + val addHint = AddFallbackHintRule() plan.transformUp { case origin if mayNeedRewrite(origin) => // Add a wall to avoid transforming unnecessary nodes. @@ -105,18 +105,18 @@ class RewriteSparkPlanRulesManager private (rewriteRules: Seq[RewriteSingleNode] // Note, it is not expected, but it happens in CH backend when pulling out // aggregate. // TODO: Fix the exception and remove this branch - TransformHints.tagNotTransformable(origin, error.get) + FallbackHints.tagNotTransformable(origin, error.get) origin } else if (withWall.fastEquals(rewrittenPlan)) { // Return origin if the rewrite rules do nothing. - // We do not add tag and leave it to the outside `AddTransformHintRule`. + // We do not add tag and leave it to the outside `AddFallbackHintRule`. origin } else { addHint.apply(rewrittenPlan) - val hint = getTransformHintBack(origin, rewrittenPlan) + val hint = getFallbackHintBack(origin, rewrittenPlan) if (hint.isDefined) { // If the rewritten plan is still not transformable, return the original plan. 
- TransformHints.tag(origin, hint.get) + FallbackHints.tag(origin, hint.get) origin } else { rewrittenPlan.transformUp { diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala index 56b63ef8457a..d98b11ed884b 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala @@ -19,7 +19,7 @@ package org.apache.gluten.extension.columnar.validator import org.apache.gluten.GlutenConfig import org.apache.gluten.backendsapi.{BackendsApiManager, BackendSettingsApi} import org.apache.gluten.expression.ExpressionUtils -import org.apache.gluten.extension.columnar.{TRANSFORM_UNSUPPORTED, TransformHints} +import org.apache.gluten.extension.columnar.{FallbackHints, TRANSFORM_UNSUPPORTED} import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.sql.execution._ @@ -108,8 +108,8 @@ object Validators { private object FallbackByHint extends Validator { override def validate(plan: SparkPlan): Validator.OutCome = { - if (TransformHints.isNotTransformable(plan)) { - val hint = TransformHints.getHint(plan).asInstanceOf[TRANSFORM_UNSUPPORTED] + if (FallbackHints.isNotTransformable(plan)) { + val hint = FallbackHints.getHint(plan).asInstanceOf[TRANSFORM_UNSUPPORTED] return fail(hint.reason.getOrElse("Reason not recorded")) } pass() diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala index 00b0248aee77..2fb0832d0a20 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution 
import org.apache.gluten.GlutenConfig import org.apache.gluten.events.GlutenPlanFallbackEvent import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.extension.columnar.{TRANSFORM_UNSUPPORTED, TransformHints} +import org.apache.gluten.extension.columnar.{FallbackHints, TRANSFORM_UNSUPPORTED} import org.apache.gluten.utils.LogLevelUtil import org.apache.spark.sql.SparkSession @@ -57,8 +57,8 @@ case class GlutenFallbackReporter(glutenConfig: GlutenConfig, spark: SparkSessio val validationLogLevel = glutenConfig.validationLogLevel plan.foreachUp { case _: GlutenPlan => // ignore - case p: SparkPlan if TransformHints.isNotTransformable(p) => - TransformHints.getHint(p) match { + case p: SparkPlan if FallbackHints.isNotTransformable(p) => + FallbackHints.getHint(p) match { case TRANSFORM_UNSUPPORTED(Some(reason), append) => logFallbackReason(validationLogLevel, p.nodeName, reason) // With in next round stage in AQE, the physical plan would be a new instance that diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala index fbdbeadba886..08a264524540 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.gluten.execution.{ProjectExecTransformer, SortExecTransformer, TransformSupport, WholeStageTransformer} import org.apache.gluten.execution.datasource.GlutenFormatWriterInjects -import org.apache.gluten.extension.columnar.AddTransformHintRule +import org.apache.gluten.extension.columnar.AddFallbackHintRule import org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides import 
org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager @@ -47,7 +47,7 @@ trait GlutenFormatWriterInjectsBase extends GlutenFormatWriterInjects { val rules = List( RewriteSparkPlanRulesManager(), - AddTransformHintRule(), + AddFallbackHintRule(), TransformPreOverrides() ) val transformed = rules.foldLeft(plan) { case (latestPlan, rule) => rule.apply(latestPlan) } diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala index fff883d49e86..ca88ee236b36 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.BasicScanExecTransformer import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, TRANSFORM_UNSUPPORTED, TransformHints} +import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, TRANSFORM_UNSUPPORTED, FallbackHints} import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier import org.apache.gluten.extension.columnar.transition.InsertTransitions import org.apache.gluten.utils.QueryPlanSelector @@ -124,10 +124,10 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait { testGluten("Tag not transformable more than once") { val originalPlan = UnaryOp1(LeafOp(supportsColumnar = true)) - TransformHints.tag(originalPlan, TRANSFORM_UNSUPPORTED(Some("fake reason"))) + FallbackHints.tag(originalPlan, TRANSFORM_UNSUPPORTED(Some("fake reason"))) val rule = FallbackEmptySchemaRelation() val newPlan = rule.apply(originalPlan) - val reason = TransformHints.getHint(newPlan).asInstanceOf[TRANSFORM_UNSUPPORTED].reason + val 
reason = FallbackHints.getHint(newPlan).asInstanceOf[TRANSFORM_UNSUPPORTED].reason assert(reason.isDefined) if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) { assert( diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala index 7976288dd4ef..1eaad898612d 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.BasicScanExecTransformer import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, TRANSFORM_UNSUPPORTED, TransformHints} +import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, TRANSFORM_UNSUPPORTED, FallbackHints} import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier import org.apache.gluten.extension.columnar.transition.InsertTransitions import org.apache.gluten.utils.QueryPlanSelector @@ -125,10 +125,10 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait { testGluten("Tag not transformable more than once") { val originalPlan = UnaryOp1(LeafOp(supportsColumnar = true)) - TransformHints.tag(originalPlan, TRANSFORM_UNSUPPORTED(Some("fake reason"))) + FallbackHints.tag(originalPlan, TRANSFORM_UNSUPPORTED(Some("fake reason"))) val rule = FallbackEmptySchemaRelation() val newPlan = rule.apply(originalPlan) - val reason = TransformHints.getHint(newPlan).asInstanceOf[TRANSFORM_UNSUPPORTED].reason + val reason = FallbackHints.getHint(newPlan).asInstanceOf[TRANSFORM_UNSUPPORTED].reason assert(reason.isDefined) if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) { assert( diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala index 7976288dd4ef..e1c0a6863a2c 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.BasicScanExecTransformer import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, TRANSFORM_UNSUPPORTED, TransformHints} +import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, FallbackHints, TRANSFORM_UNSUPPORTED} import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier import org.apache.gluten.extension.columnar.transition.InsertTransitions import org.apache.gluten.utils.QueryPlanSelector @@ -125,10 +125,10 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait { testGluten("Tag not transformable more than once") { val originalPlan = UnaryOp1(LeafOp(supportsColumnar = true)) - TransformHints.tag(originalPlan, TRANSFORM_UNSUPPORTED(Some("fake reason"))) + FallbackHints.tag(originalPlan, TRANSFORM_UNSUPPORTED(Some("fake reason"))) val rule = FallbackEmptySchemaRelation() val newPlan = rule.apply(originalPlan) - val reason = TransformHints.getHint(newPlan).asInstanceOf[TRANSFORM_UNSUPPORTED].reason + val reason = FallbackHints.getHint(newPlan).asInstanceOf[TRANSFORM_UNSUPPORTED].reason assert(reason.isDefined) if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) { assert( From 7f1f1b2b11337f48e83513d617c837c28f0524a2 Mon Sep 17 00:00:00 2001 From: gaoyangxiaozhu Date: Thu, 27 Jun 2024 20:30:48 +0800 Subject: [PATCH 2/7] fix format --- 
.../apache/spark/sql/execution/FallbackStrategiesSuite.scala | 2 +- .../apache/spark/sql/execution/FallbackStrategiesSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala index ca88ee236b36..8ad717efdcc7 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.BasicScanExecTransformer import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, TRANSFORM_UNSUPPORTED, FallbackHints} +import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, FallbackHints, TRANSFORM_UNSUPPORTED} import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier import org.apache.gluten.extension.columnar.transition.InsertTransitions import org.apache.gluten.utils.QueryPlanSelector diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala index 1eaad898612d..e1c0a6863a2c 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.BasicScanExecTransformer import org.apache.gluten.extension.GlutenPlan -import 
org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, TRANSFORM_UNSUPPORTED, FallbackHints} +import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, FallbackHints, TRANSFORM_UNSUPPORTED} import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier import org.apache.gluten.extension.columnar.transition.InsertTransitions import org.apache.gluten.utils.QueryPlanSelector From 464af4cf99061e57d486f163e72421290467aca5 Mon Sep 17 00:00:00 2001 From: gaoyangxiaozhu Date: Mon, 1 Jul 2024 14:55:33 +0800 Subject: [PATCH 3/7] triiger CI From 20f57ab02e8e3146064832b87d93a5e80a16935b Mon Sep 17 00:00:00 2001 From: gaoyangxiaozhu Date: Mon, 1 Jul 2024 17:43:09 +0800 Subject: [PATCH 4/7] fix build --- .../gluten/extension/columnar/OffloadSingleNode.scala | 10 +++++----- .../rewrite/RewriteSparkPlanRulesManager.scala | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala index fe8e501c5ccc..b720cd106f14 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala @@ -291,11 +291,11 @@ case class OffloadProject() extends OffloadSingleNode with LogLevelUtil { f } } - val addHint = AddTransformHintRule() + val addHint = AddFallbackHintRule() val newProjectList = projectExec.projectList.filterNot(containsInputFileRelatedExpr) val newProjectExec = ProjectExec(newProjectList, projectExec.child) addHint.apply(newProjectExec) - if (TransformHints.isNotTransformable(newProjectExec)) { + if (FallbackHints.isNotTransformable(newProjectExec)) { // Project is still not transformable after remove `input_file_name` expressions. 
projectExec } else { @@ -305,7 +305,7 @@ case class OffloadProject() extends OffloadSingleNode with LogLevelUtil { // /sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala#L506 val leafScans = findScanNodes(projectExec) assert(leafScans.size <= 1) - if (leafScans.isEmpty || TransformHints.isNotTransformable(leafScans(0))) { + if (leafScans.isEmpty || FallbackHints.isNotTransformable(leafScans(0))) { // It means // 1. projectExec has `input_file_name` but no scan child. // 2. It has scan child node but the scan node fallback. @@ -326,12 +326,12 @@ case class OffloadProject() extends OffloadSingleNode with LogLevelUtil { private def genProjectExec(projectExec: ProjectExec): SparkPlan = { if ( - TransformHints.isNotTransformable(projectExec) && + FallbackHints.isNotTransformable(projectExec) && BackendsApiManager.getSettings.supportNativeInputFileRelatedExpr() && projectExec.projectList.exists(containsInputFileRelatedExpr) ) { tryOffloadProjectExecWithInputFileRelatedExprs(projectExec) - } else if (TransformHints.isNotTransformable(projectExec)) { + } else if (FallbackHints.isNotTransformable(projectExec)) { projectExec } else { logDebug(s"Columnar Processing for ${projectExec.getClass} is currently supported.") diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala index 9c30c2c426e9..24d7d890c4cf 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala @@ -67,7 +67,7 @@ class RewriteSparkPlanRulesManager private (rewriteRules: Seq[RewriteSingleNode] } } - private def getFallbackHintBack(rewrittenPlan: SparkPlan): Option[TransformHint] = { + private def getFallbackHintBack(rewrittenPlan: 
SparkPlan): Option[FallbackHint] = { // The rewritten plan may contain more nodes than origin, for now it should only be // `ProjectExec`. val target = rewrittenPlan.collect { From 27ebc3dd314ccd359486405f6c1e1be8b4ced4f0 Mon Sep 17 00:00:00 2001 From: gaoyangxiaozhu Date: Mon, 1 Jul 2024 19:49:45 +0800 Subject: [PATCH 5/7] rename fallbackhits to fallbackTags --- .../clickhouse/CHSparkPlanExecApi.scala | 8 +- .../FallbackBroadcaseHashJoinRules.scala | 16 +-- .../velox/VeloxSparkPlanExecApi.scala | 8 +- .../execution/FunctionsValidateTest.scala | 2 + .../execution/ScanTransformerFactory.scala | 6 +- .../EnsureLocalSortRequirements.scala | 4 +- .../columnar/ExpandFallbackPolicy.scala | 8 +- .../extension/columnar/FallbackHintRule.scala | 103 ++++++++---------- .../columnar/OffloadSingleNode.scala | 30 ++--- ...RemoveNativeWriteFilesSortAndProject.scala | 2 +- .../enumerated/EnumeratedApplier.scala | 2 +- .../columnar/heuristic/HeuristicApplier.scala | 4 +- .../RewriteSparkPlanRulesManager.scala | 18 +-- .../columnar/validator/Validators.scala | 6 +- .../execution/GlutenFallbackReporter.scala | 6 +- .../GlutenFormatWriterInjectsBase.scala | 4 +- .../WholeStageTransformerSuite.scala | 4 +- .../execution/FallbackStrategiesSuite.scala | 6 +- .../execution/FallbackStrategiesSuite.scala | 6 +- .../execution/FallbackStrategiesSuite.scala | 6 +- 20 files changed, 122 insertions(+), 127 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index fb3560c6f84f..a4a4a20333e7 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -22,7 +22,7 @@ import org.apache.gluten.exception.GlutenNotSupportException import org.apache.gluten.execution._ 
import org.apache.gluten.expression._ import org.apache.gluten.extension.{CountDistinctWithoutExpand, FallbackBroadcastHashJoin, FallbackBroadcastHashJoinPrepQueryStage, RewriteToDateExpresstionRule} -import org.apache.gluten.extension.columnar.AddFallbackHintRule +import org.apache.gluten.extension.columnar.AddFallbackTagRule import org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides import org.apache.gluten.extension.columnar.transition.Convention import org.apache.gluten.sql.shims.SparkShimLoader @@ -146,7 +146,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { child match { case scan: FileSourceScanExec if (checkMergeTreeFileFormat(scan.relation)) => - // For the validation phase of the AddFallbackHintRule + // For the validation phase of the AddFallbackTagRule CHFilterExecTransformer(condition, child) case scan: FileSourceScanExecTransformerBase if (checkMergeTreeFileFormat(scan.relation)) => // For the transform phase, the FileSourceScanExec is already transformed @@ -226,7 +226,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { // FIXME: The operation happens inside ReplaceSingleNode(). // Caller may not know it adds project on top of the shuffle. val project = TransformPreOverrides().apply( - AddFallbackHintRule().apply( + AddFallbackTagRule().apply( ProjectExec(plan.child.output ++ projectExpressions, plan.child))) var newExprs = Seq[Expression]() for (i <- exprs.indices) { @@ -251,7 +251,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { // FIXME: The operation happens inside ReplaceSingleNode(). // Caller may not know it adds project on top of the shuffle. 
val project = TransformPreOverrides().apply( - AddFallbackHintRule().apply( + AddFallbackTagRule().apply( ProjectExec(plan.child.output ++ projectExpressions, plan.child))) var newOrderings = Seq[SortOrder]() for (i <- orderings.indices) { diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcaseHashJoinRules.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcaseHashJoinRules.scala index 1b3ce0188972..59c2d6494bdb 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcaseHashJoinRules.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcaseHashJoinRules.scala @@ -19,7 +19,7 @@ package org.apache.gluten.extension import org.apache.gluten.GlutenConfig import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.extension.columnar._ -import org.apache.gluten.extension.columnar.FallbackHints.EncodeTransformableTagImplicits +import org.apache.gluten.extension.columnar.FallbackTags.EncodeFallbackTagImplicits import org.apache.gluten.utils.PhysicalPlanSelector import org.apache.spark.sql.SparkSession @@ -61,7 +61,7 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extend "columnar broadcast exchange is disabled or " + "columnar broadcast join is disabled") } else { - if (FallbackHints.isNotTransformable(bhj)) { + if (FallbackTags.nonEmpty(bhj)) { ValidationResult.notOk("broadcast join is already tagged as not transformable") } else { val bhjTransformer = BackendsApiManager.getSparkPlanExecApiInstance @@ -83,8 +83,8 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extend } } } - FallbackHints.tagNotTransformable(bhj, isTransformable) - FallbackHints.tagNotTransformable(exchange, isTransformable) + FallbackTags.add(bhj, isTransformable) + FallbackTags.add(exchange, isTransformable) case _ => // Skip. 
This might be the case that the exchange was already // executed in earlier stage @@ -116,7 +116,7 @@ case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPl // Currently their doBroadcast() methods just propagate child's broadcast // payloads which is not right in speaking of columnar. if (!enableColumnarBroadcastJoin) { - FallbackHints.tagNotTransformable( + FallbackTags.add( bhj, "columnar BroadcastJoin is not enabled in BroadcastHashJoinExec") } else { @@ -149,7 +149,7 @@ case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPl case Some(exchange @ BroadcastExchangeExec(mode, child)) => isBhjTransformable.tagOnFallback(bhj) if (!isBhjTransformable.isValid) { - FallbackHints.tagNotTransformable(exchange, isBhjTransformable) + FallbackTags.add(exchange, isBhjTransformable) } case None => // we are in AQE, find the hidden exchange @@ -182,7 +182,7 @@ case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPl // to conform to the underlying exchange's type, columnar or vanilla exchange match { case BroadcastExchangeExec(mode, child) => - FallbackHints.tagNotTransformable( + FallbackTags.add( bhj, "it's a materialized broadcast exchange or reused broadcast exchange") case ColumnarBroadcastExchangeExec(mode, child) => @@ -199,7 +199,7 @@ case class FallbackBroadcastHashJoin(session: SparkSession) extends Rule[SparkPl } } catch { case e: UnsupportedOperationException => - FallbackHints.tagNotTransformable( + FallbackTags.add( p, s"${e.getMessage}, original Spark plan is " + s"${p.getClass}(${p.children.toList.map(_.getClass)})") diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index 49f4a737098f..e13ebd971ef5 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -25,7 +25,7 @@ import org.apache.gluten.expression._ import org.apache.gluten.expression.ExpressionNames.{TRANSFORM_KEYS, TRANSFORM_VALUES} import org.apache.gluten.expression.aggregate.{HLLAdapter, VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet} import org.apache.gluten.extension._ -import org.apache.gluten.extension.columnar.FallbackHints +import org.apache.gluten.extension.columnar.FallbackTags import org.apache.gluten.extension.columnar.transition.Convention import org.apache.gluten.extension.columnar.transition.ConventionFunc.BatchOverride import org.apache.gluten.sql.shims.SparkShimLoader @@ -371,7 +371,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { val newChild = maybeAddAppendBatchesExec(projectTransformer) ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output.drop(1)) } else { - FallbackHints.tagNotTransformable(shuffle, validationResult) + FallbackTags.add(shuffle, validationResult) shuffle.withNewChildren(child :: Nil) } case RoundRobinPartitioning(num) if SQLConf.get.sortBeforeRepartition && num > 1 => @@ -397,7 +397,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { projectTransformer } else { val project = ProjectExec(projectList, child) - FallbackHints.tagNotTransformable(project, projectBeforeSortValidationResult) + FallbackTags.add(project, projectBeforeSortValidationResult) project } val sortOrder = SortOrder(projectBeforeSort.output.head, Ascending) @@ -410,7 +410,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { val newChild = maybeAddAppendBatchesExec(dropSortColumnTransformer) ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output) } else { - FallbackHints.tagNotTransformable(shuffle, validationResult) + FallbackTags.add(shuffle, validationResult) shuffle.withNewChildren(child :: Nil) } } diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/FunctionsValidateTest.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/FunctionsValidateTest.scala index 12f66278f70a..0cb067ede12d 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/FunctionsValidateTest.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/FunctionsValidateTest.scala @@ -39,6 +39,8 @@ class FunctionsValidateTest extends WholeStageTransformerSuite { .set("spark.unsafe.exceptionOnMemoryLeak", "true") .set("spark.sql.autoBroadcastJoinThreshold", "-1") .set("spark.sql.sources.useV1SourceList", "avro") + .set("spark.gluten.sql.columnar.backend.velox.glogSeverityLevel", "0") + .set("spark.gluten.sql.columnar.backend.velox.glogVerboseLevel", "1") .set( "spark.sql.optimizer.excludedRules", ConstantFolding.ruleName + "," + diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala index acc8b1ca9cc1..44a823834f92 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala @@ -17,7 +17,7 @@ package org.apache.gluten.execution import org.apache.gluten.exception.GlutenNotSupportException -import org.apache.gluten.extension.columnar.FallbackHints +import org.apache.gluten.extension.columnar.FallbackTags import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.sql.catalyst.expressions.Expression @@ -99,7 +99,7 @@ object ScanTransformerFactory { transformer } else { val newSource = batchScan.copy(runtimeFilters = transformer.runtimeFilters) - FallbackHints.tagNotTransformable(newSource, validationResult.reason.get) + FallbackTags.add(newSource, validationResult.reason.get) newSource } } else { @@ -109,7 +109,7 @@ object ScanTransformerFactory { if (validation) { throw new 
GlutenNotSupportException(s"Unsupported scan ${batchScan.scan}") } - FallbackHints.tagNotTransformable(batchScan, "The scan in BatchScanExec is not supported.") + FallbackTags.add(batchScan, "The scan in BatchScanExec is not supported.") batchScan } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala index 87d26560208c..afc29a51e19a 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala @@ -37,7 +37,7 @@ object EnsureLocalSortRequirements extends Rule[SparkPlan] { requiredOrdering: Seq[SortOrder]): SparkPlan = { val newChild = SortExec(requiredOrdering, global = false, child = originalChild) if (!GlutenConfig.getConf.enableColumnarSort) { - FallbackHints.tagNotTransformable(newChild, "columnar Sort is not enabled in SortExec") + FallbackTags.add(newChild, "columnar Sort is not enabled in SortExec") newChild } else { val newChildWithTransformer = @@ -50,7 +50,7 @@ object EnsureLocalSortRequirements extends Rule[SparkPlan] { if (validationResult.isValid) { newChildWithTransformer } else { - FallbackHints.tagNotTransformable(newChild, validationResult) + FallbackTags.add(newChild, validationResult) newChild } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala index a1176b087e89..e334fcfbce88 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala @@ -239,11 +239,11 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, originalPlan: SparkP // Propagate 
fallback reason to vanilla SparkPlan glutenPlan.foreach { case _: GlutenPlan => - case p: SparkPlan if FallbackHints.isNotTransformable(p) && p.logicalLink.isDefined => + case p: SparkPlan if FallbackTags.nonEmpty(p) && p.logicalLink.isDefined => originalPlan .find(_.logicalLink.exists(_.fastEquals(p.logicalLink.get))) - .filterNot(FallbackHints.isNotTransformable) - .foreach(origin => FallbackHints.tag(origin, FallbackHints.getHint(p))) + .filterNot(FallbackTags.nonEmpty) + .foreach(origin => FallbackTags.tag(origin, FallbackTags.getTag(p))) case _ => } @@ -278,7 +278,7 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, originalPlan: SparkP ) { plan } else { - FallbackHints.tagAllNotTransformable( + FallbackTags.addRecursively( vanillaSparkPlan, TRANSFORM_UNSUPPORTED(fallbackInfo.reason, appendReasonIfExists = false)) FallbackNode(vanillaSparkPlan) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackHintRule.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackHintRule.scala index 404dbf6940bd..d34cb0df4e7e 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackHintRule.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackHintRule.scala @@ -21,7 +21,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.exception.GlutenNotSupportException import org.apache.gluten.execution._ import org.apache.gluten.extension.{GlutenPlan, ValidationResult} -import org.apache.gluten.extension.columnar.FallbackHints.EncodeTransformableTagImplicits +import org.apache.gluten.extension.columnar.FallbackTags.EncodeFallbackTagImplicits import org.apache.gluten.extension.columnar.validator.{Validator, Validators} import org.apache.gluten.sql.shims.SparkShimLoader @@ -45,19 +45,19 @@ import org.apache.spark.sql.types.StringType import org.apache.commons.lang3.exception.ExceptionUtils -sealed trait FallbackHint { +sealed trait 
FallbackTag { val stacktrace: Option[String] = - if (FallbackHints.DEBUG) { + if (FallbackTags.DEBUG) { Some(ExceptionUtils.getStackTrace(new Throwable())) } else None } case class TRANSFORM_UNSUPPORTED(reason: Option[String], appendReasonIfExists: Boolean = true) - extends FallbackHint + extends FallbackTag -object FallbackHints { - val TAG: TreeNodeTag[FallbackHint] = - TreeNodeTag[FallbackHint]("org.apache.gluten.fallbackhint") +object FallbackTags { + val TAG: TreeNodeTag[FallbackTag] = + TreeNodeTag[FallbackTag]("org.apache.gluten.FallbackTag") val DEBUG = false @@ -69,8 +69,8 @@ object FallbackHints { * validation rule. So user should not consider the plan "transformable" unless all validation * rules are passed. */ - def isNotTransformable(plan: SparkPlan): Boolean = { - getHintOption(plan) match { + def nonEmpty(plan: SparkPlan): Boolean = { + getTagOption(plan) match { case Some(TRANSFORM_UNSUPPORTED(_, _)) => true case _ => false } @@ -82,10 +82,10 @@ object FallbackHints { * within Gluten transformers. If false, the plan node will be guaranteed fallback to Vanilla plan * node while being implemented. 
*/ - def maybeTransformable(plan: SparkPlan): Boolean = !isNotTransformable(plan) + def maybeOffloadable(plan: SparkPlan): Boolean = !nonEmpty(plan) - def tag(plan: SparkPlan, hint: FallbackHint): Unit = { - val mergedHint = getHintOption(plan) + def tag(plan: SparkPlan, hint: FallbackTag): Unit = { + val mergedHint = getTagOption(plan) .map { case originalHint @ TRANSFORM_UNSUPPORTED(Some(originalReason), originAppend) => hint match { @@ -117,33 +117,33 @@ object FallbackHints { plan.unsetTagValue(TAG) } - def tagNotTransformable(plan: SparkPlan, validationResult: ValidationResult): Unit = { + def add(plan: SparkPlan, validationResult: ValidationResult): Unit = { if (!validationResult.isValid) { tag(plan, TRANSFORM_UNSUPPORTED(validationResult.reason)) } } - def tagNotTransformable(plan: SparkPlan, reason: String): Unit = { + def add(plan: SparkPlan, reason: String): Unit = { tag(plan, TRANSFORM_UNSUPPORTED(Some(reason))) } - def tagAllNotTransformable(plan: SparkPlan, hint: TRANSFORM_UNSUPPORTED): Unit = { + def addRecursively(plan: SparkPlan, hint: TRANSFORM_UNSUPPORTED): Unit = { plan.foreach { case _: GlutenPlan => // ignore case other => tag(other, hint) } } - def getHint(plan: SparkPlan): FallbackHint = { - getHintOption(plan).getOrElse( + def getTag(plan: SparkPlan): FallbackTag = { + getTagOption(plan).getOrElse( throw new IllegalStateException("Transform hint tag not set in plan: " + plan.toString())) } - def getHintOption(plan: SparkPlan): Option[FallbackHint] = { + def getTagOption(plan: SparkPlan): Option[FallbackTag] = { plan.getTagValue(TAG) } - implicit class EncodeTransformableTagImplicits(validationResult: ValidationResult) { + implicit class EncodeFallbackTagImplicits(validationResult: ValidationResult) { def tagOnFallback(plan: SparkPlan): Unit = { if (validationResult.isValid) { return @@ -157,7 +157,7 @@ object FallbackHints { case class FallbackOnANSIMode(session: SparkSession) extends Rule[SparkPlan] { override def apply(plan: SparkPlan): 
SparkPlan = { if (GlutenConfig.getConf.enableAnsiMode) { - plan.foreach(FallbackHints.tagNotTransformable(_, "does not support ansi mode")) + plan.foreach(FallbackTags.add(_, "does not support ansi mode")) } plan } @@ -179,11 +179,11 @@ case class FallbackMultiCodegens(session: SparkSession) extends Rule[SparkPlan] case plan: SortMergeJoinExec if GlutenConfig.getConf.forceShuffledHashJoin => if ((count + 1) >= optimizeLevel) return true plan.children.exists(existsMultiCodegens(_, count + 1)) - case other => false + case _ => false } - def tagNotTransformable(plan: SparkPlan): SparkPlan = { - FallbackHints.tagNotTransformable(plan, "fallback multi codegens") + def addFallbackTag(plan: SparkPlan): SparkPlan = { + FallbackTags.add(plan, "fallback multi codegens") plan } @@ -200,35 +200,35 @@ case class FallbackMultiCodegens(session: SparkSession) extends Rule[SparkPlan] } } - def tagNotTransformableRecursive(plan: SparkPlan): SparkPlan = { + def addFallbackTagRecursive(plan: SparkPlan): SparkPlan = { plan match { case p: ShuffleExchangeExec => - tagNotTransformable(p.withNewChildren(p.children.map(tagNotTransformableForMultiCodegens))) + addFallbackTag(p.withNewChildren(p.children.map(tagOnFallbackForMultiCodegens))) case p: BroadcastExchangeExec => - tagNotTransformable(p.withNewChildren(p.children.map(tagNotTransformableForMultiCodegens))) + addFallbackTag(p.withNewChildren(p.children.map(tagOnFallbackForMultiCodegens))) case p: ShuffledHashJoinExec => - tagNotTransformable(p.withNewChildren(p.children.map(tagNotTransformableRecursive))) + addFallbackTag(p.withNewChildren(p.children.map(addFallbackTagRecursive))) case p if !supportCodegen(p) => - p.withNewChildren(p.children.map(tagNotTransformableForMultiCodegens)) + p.withNewChildren(p.children.map(tagOnFallbackForMultiCodegens)) case p if isAQEShuffleReadExec(p) => - p.withNewChildren(p.children.map(tagNotTransformableForMultiCodegens)) + p.withNewChildren(p.children.map(tagOnFallbackForMultiCodegens)) case p: 
QueryStageExec => p - case p => tagNotTransformable(p.withNewChildren(p.children.map(tagNotTransformableRecursive))) + case p => addFallbackTag(p.withNewChildren(p.children.map(addFallbackTagRecursive))) } } - def tagNotTransformableForMultiCodegens(plan: SparkPlan): SparkPlan = { + def tagOnFallbackForMultiCodegens(plan: SparkPlan): SparkPlan = { plan match { case plan if existsMultiCodegens(plan) => - tagNotTransformableRecursive(plan) + addFallbackTagRecursive(plan) case other => - other.withNewChildren(other.children.map(tagNotTransformableForMultiCodegens)) + other.withNewChildren(other.children.map(tagOnFallbackForMultiCodegens)) } } override def apply(plan: SparkPlan): SparkPlan = { if (physicalJoinOptimize) { - tagNotTransformableForMultiCodegens(plan) + tagOnFallbackForMultiCodegens(plan) } else plan } } @@ -272,13 +272,11 @@ case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] { if (p.children.exists(_.output.isEmpty)) { // Some backends are not eligible to offload plan with zero-column input. // If any child have empty output, mark the plan and that child as UNSUPPORTED. - FallbackHints.tagNotTransformable(p, "at least one of its children has empty output") + FallbackTags.add(p, "at least one of its children has empty output") p.children.foreach { child => if (child.output.isEmpty && !child.isInstanceOf[WriteFilesExec]) { - FallbackHints.tagNotTransformable( - child, - "at least one of its children has empty output") + FallbackTags.add(child, "at least one of its children has empty output") } } } @@ -291,8 +289,8 @@ case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] { // The doValidate function will be called to check if the conversion is supported. // If false is returned or any unsupported exception is thrown, a row guard will // be added on the top of that plan to prevent actual conversion. 
-case class AddFallbackHintRule() extends Rule[SparkPlan] { - import AddFallbackHintRule._ +case class AddFallbackTagRule() extends Rule[SparkPlan] { + import AddFallbackTagRule._ private val glutenConf: GlutenConfig = GlutenConfig.getConf private val validator = Validators .builder() @@ -305,22 +303,15 @@ case class AddFallbackHintRule() extends Rule[SparkPlan] { .build() def apply(plan: SparkPlan): SparkPlan = { - addTransformableTags(plan) - } - - /** Inserts a transformable tag on top of those that are not supported. */ - private def addTransformableTags(plan: SparkPlan): SparkPlan = { - // Walk the tree with post-order - val out = plan.mapChildren(addTransformableTags) - addTransformableTag(out) - out + plan.foreachUp { case p => addFallbackTag(p) } + plan } - private def addTransformableTag(plan: SparkPlan): Unit = { + private def addFallbackTag(plan: SparkPlan): Unit = { val outcome = validator.validate(plan) outcome match { case Validator.Failed(reason) => - FallbackHints.tagNotTransformable(plan, reason) + FallbackTags.add(plan, reason) return case Validator.Passed => } @@ -508,11 +499,11 @@ case class AddFallbackHintRule() extends Rule[SparkPlan] { ) transformer.doValidate().tagOnFallback(plan) case _ => - // Currently we assume a plan to be transformable by default. + // Currently we assume a plan to be offload-able by default. 
} } catch { case e @ (_: GlutenNotSupportException | _: UnsupportedOperationException) => - FallbackHints.tagNotTransformable( + FallbackTags.add( plan, s"${e.getMessage}, original Spark plan is " + s"${plan.getClass}(${plan.children.toList.map(_.getClass)})") @@ -523,7 +514,7 @@ case class AddFallbackHintRule() extends Rule[SparkPlan] { } } -object AddFallbackHintRule { +object AddFallbackTagRule { implicit private class ValidatorBuilderImplicits(builder: Validators.Builder) { /** @@ -561,9 +552,9 @@ object AddFallbackHintRule { } } -case class RemoveFallbackHintRule() extends Rule[SparkPlan] { +case class RemoveFallbackTagRule() extends Rule[SparkPlan] { override def apply(plan: SparkPlan): SparkPlan = { - plan.foreach(FallbackHints.untag) + plan.foreach(FallbackTags.untag) plan } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala index b720cd106f14..7a4222b5cb38 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala @@ -56,7 +56,7 @@ sealed trait OffloadSingleNode extends Logging { // Aggregation transformation. case class OffloadAggregate() extends OffloadSingleNode with LogLevelUtil { override def offload(plan: SparkPlan): SparkPlan = plan match { - case plan if FallbackHints.isNotTransformable(plan) => + case plan if FallbackTags.nonEmpty(plan) => plan case agg: HashAggregateExec => genHashAggregateExec(agg) @@ -72,7 +72,7 @@ case class OffloadAggregate() extends OffloadSingleNode with LogLevelUtil { * the actually used plan for execution. 
*/ private def genHashAggregateExec(plan: HashAggregateExec): SparkPlan = { - if (FallbackHints.isNotTransformable(plan)) { + if (FallbackTags.nonEmpty(plan)) { return plan } @@ -92,7 +92,7 @@ case class OffloadAggregate() extends OffloadSingleNode with LogLevelUtil { HashAggregateExecBaseTransformer.from(plan)() case _ => // If the child is not transformable, do not transform the agg. - FallbackHints.tagNotTransformable(plan, "child output schema is empty") + FallbackTags.add(plan, "child output schema is empty") plan } } else { @@ -105,7 +105,7 @@ case class OffloadAggregate() extends OffloadSingleNode with LogLevelUtil { // Exchange transformation. case class OffloadExchange() extends OffloadSingleNode with LogLevelUtil { override def offload(plan: SparkPlan): SparkPlan = plan match { - case p if FallbackHints.isNotTransformable(p) => + case p if FallbackTags.nonEmpty(p) => p case s: ShuffleExchangeExec if (s.child.supportsColumnar || GlutenConfig.getConf.enablePreferColumnar) && @@ -124,7 +124,7 @@ case class OffloadExchange() extends OffloadSingleNode with LogLevelUtil { case class OffloadJoin() extends OffloadSingleNode with LogLevelUtil { override def offload(plan: SparkPlan): SparkPlan = { - if (FallbackHints.isNotTransformable(plan)) { + if (FallbackTags.nonEmpty(plan)) { logDebug(s"Columnar Processing for ${plan.getClass} is under row guard.") return plan } @@ -291,11 +291,11 @@ case class OffloadProject() extends OffloadSingleNode with LogLevelUtil { f } } - val addHint = AddFallbackHintRule() + val addHint = AddFallbackTagRule() val newProjectList = projectExec.projectList.filterNot(containsInputFileRelatedExpr) val newProjectExec = ProjectExec(newProjectList, projectExec.child) addHint.apply(newProjectExec) - if (FallbackHints.isNotTransformable(newProjectExec)) { + if (FallbackTags.nonEmpty(newProjectExec)) { // Project is still not transformable after remove `input_file_name` expressions. 
projectExec } else { @@ -305,7 +305,7 @@ case class OffloadProject() extends OffloadSingleNode with LogLevelUtil { // /sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala#L506 val leafScans = findScanNodes(projectExec) assert(leafScans.size <= 1) - if (leafScans.isEmpty || FallbackHints.isNotTransformable(leafScans(0))) { + if (leafScans.isEmpty || FallbackTags.nonEmpty(leafScans(0))) { // It means // 1. projectExec has `input_file_name` but no scan child. // 2. It has scan child node but the scan node fallback. @@ -326,12 +326,12 @@ case class OffloadProject() extends OffloadSingleNode with LogLevelUtil { private def genProjectExec(projectExec: ProjectExec): SparkPlan = { if ( - FallbackHints.isNotTransformable(projectExec) && + FallbackTags.nonEmpty(projectExec) && BackendsApiManager.getSettings.supportNativeInputFileRelatedExpr() && projectExec.projectList.exists(containsInputFileRelatedExpr) ) { tryOffloadProjectExecWithInputFileRelatedExprs(projectExec) - } else if (FallbackHints.isNotTransformable(projectExec)) { + } else if (FallbackTags.nonEmpty(projectExec)) { projectExec } else { logDebug(s"Columnar Processing for ${projectExec.getClass} is currently supported.") @@ -366,7 +366,7 @@ case class OffloadFilter() extends OffloadSingleNode with LogLevelUtil { * the actually used plan for execution. */ private def genFilterExec(filter: FilterExec): SparkPlan = { - if (FallbackHints.isNotTransformable(filter)) { + if (FallbackTags.nonEmpty(filter)) { return filter } @@ -375,7 +375,7 @@ case class OffloadFilter() extends OffloadSingleNode with LogLevelUtil { // Push down the left conditions in Filter into FileSourceScan. 
val newChild: SparkPlan = filter.child match { case scan @ (_: FileSourceScanExec | _: BatchScanExec) => - if (FallbackHints.maybeTransformable(scan)) { + if (FallbackTags.maybeOffloadable(scan)) { val newScan = FilterHandler.pushFilterToScan(filter.condition, scan) newScan match { @@ -410,7 +410,7 @@ object OffloadOthers { def doReplace(p: SparkPlan): SparkPlan = { val plan = p - if (FallbackHints.isNotTransformable(plan)) { + if (FallbackTags.nonEmpty(plan)) { return plan } plan match { @@ -561,7 +561,7 @@ object OffloadOthers { transformer } else { logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.") - FallbackHints.tagNotTransformable(plan, validationResult.reason.get) + FallbackTags.add(plan, validationResult.reason.get) plan } case plan: BatchScanExec => @@ -576,7 +576,7 @@ object OffloadOthers { return hiveTableScanExecTransformer } logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.") - FallbackHints.tagNotTransformable(plan, validateResult.reason.get) + FallbackTags.add(plan, validateResult.reason.get) plan case other => throw new GlutenNotSupportException(s"${other.getClass.toString} is not supported.") diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala index fc03ad1eccd9..d32de32ebb32 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala @@ -76,7 +76,7 @@ object NativeWriteFilesWithSkippingSortAndProject extends Logging { } else { // If we can not transform the project, then we fallback to origin plan which means // we also retain the sort operator. 
- FallbackHints.tagNotTransformable(p, validationResult) + FallbackTags.add(p, validationResult) None } case _ => None diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala index f5fa078603cf..d1aed8c33824 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala @@ -128,7 +128,7 @@ class EnumeratedApplier(session: SparkSession) // when columnar table cache is enabled. (s: SparkSession) => RemoveGlutenTableCacheColumnarToRow(s), (s: SparkSession) => GlutenFallbackReporter(GlutenConfig.getConf, s), - (_: SparkSession) => RemoveFallbackHintRule() + (_: SparkSession) => RemoveFallbackTagRule() ) } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala index 0aad29bd544d..c0ff2a9e3447 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala @@ -103,7 +103,7 @@ class HeuristicApplier(session: SparkSession) (_: SparkSession) => FallbackEmptySchemaRelation(), (spark: SparkSession) => MergeTwoPhasesHashBaseAggregate(spark), (_: SparkSession) => RewriteSparkPlanRulesManager(), - (_: SparkSession) => AddFallbackHintRule() + (_: SparkSession) => AddFallbackTagRule() ) ::: List((_: SparkSession) => TransformPreOverrides()) ::: List( @@ -150,7 +150,7 @@ class HeuristicApplier(session: SparkSession) // when columnar table cache is enabled. 
(s: SparkSession) => RemoveGlutenTableCacheColumnarToRow(s), (s: SparkSession) => GlutenFallbackReporter(GlutenConfig.getConf, s), - (_: SparkSession) => RemoveFallbackHintRule() + (_: SparkSession) => RemoveFallbackTagRule() ) } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala index 24d7d890c4cf..2abd4d7d4807 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala @@ -16,7 +16,7 @@ */ package org.apache.gluten.extension.columnar.rewrite -import org.apache.gluten.extension.columnar.{AddFallbackHintRule, FallbackHint, FallbackHints} +import org.apache.gluten.extension.columnar.{AddFallbackTagRule, FallbackTag, FallbackTags} import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.rdd.RDD @@ -49,7 +49,7 @@ class RewriteSparkPlanRulesManager private (rewriteRules: Seq[RewriteSingleNode] extends Rule[SparkPlan] { private def mayNeedRewrite(plan: SparkPlan): Boolean = { - FallbackHints.maybeTransformable(plan) && { + FallbackTags.maybeOffloadable(plan) && { plan match { case _: SortExec => true case _: TakeOrderedAndProjectExec => true @@ -67,14 +67,14 @@ class RewriteSparkPlanRulesManager private (rewriteRules: Seq[RewriteSingleNode] } } - private def getFallbackHintBack(rewrittenPlan: SparkPlan): Option[FallbackHint] = { + private def getFallbackTagBack(rewrittenPlan: SparkPlan): Option[FallbackTag] = { // The rewritten plan may contain more nodes than origin, for now it should only be // `ProjectExec`. 
val target = rewrittenPlan.collect { case p if !p.isInstanceOf[ProjectExec] && !p.isInstanceOf[RewrittenNodeWall] => p } assert(target.size == 1) - FallbackHints.getHintOption(target.head) + FallbackTags.getTagOption(target.head) } private def applyRewriteRules(origin: SparkPlan): (SparkPlan, Option[String]) = { @@ -93,7 +93,7 @@ class RewriteSparkPlanRulesManager private (rewriteRules: Seq[RewriteSingleNode] } override def apply(plan: SparkPlan): SparkPlan = { - val addHint = AddFallbackHintRule() + val addHint = AddFallbackTagRule() plan.transformUp { case origin if mayNeedRewrite(origin) => // Add a wall to avoid transforming unnecessary nodes. @@ -104,18 +104,18 @@ class RewriteSparkPlanRulesManager private (rewriteRules: Seq[RewriteSingleNode] // Note, it is not expected, but it happens in CH backend when pulling out // aggregate. // TODO: Fix the exception and remove this branch - FallbackHints.tagNotTransformable(origin, error.get) + FallbackTags.add(origin, error.get) origin } else if (withWall.fastEquals(rewrittenPlan)) { // Return origin if the rewrite rules do nothing. - // We do not add tag and leave it to the outside `AddFallbackHintRule`. + // We do not add tag and leave it to the outside `AddFallbackTagRule`. origin } else { addHint.apply(rewrittenPlan) - val hint = getFallbackHintBack(rewrittenPlan) + val hint = getFallbackTagBack(rewrittenPlan) if (hint.isDefined) { // If the rewritten plan is still not transformable, return the original plan. 
- FallbackHints.tag(origin, hint.get) + FallbackTags.tag(origin, hint.get) origin } else { rewrittenPlan.transformUp { diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala index d98b11ed884b..fef487cb830f 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala @@ -19,7 +19,7 @@ package org.apache.gluten.extension.columnar.validator import org.apache.gluten.GlutenConfig import org.apache.gluten.backendsapi.{BackendsApiManager, BackendSettingsApi} import org.apache.gluten.expression.ExpressionUtils -import org.apache.gluten.extension.columnar.{FallbackHints, TRANSFORM_UNSUPPORTED} +import org.apache.gluten.extension.columnar.{FallbackTags, TRANSFORM_UNSUPPORTED} import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.sql.execution._ @@ -108,8 +108,8 @@ object Validators { private object FallbackByHint extends Validator { override def validate(plan: SparkPlan): Validator.OutCome = { - if (FallbackHints.isNotTransformable(plan)) { - val hint = FallbackHints.getHint(plan).asInstanceOf[TRANSFORM_UNSUPPORTED] + if (FallbackTags.nonEmpty(plan)) { + val hint = FallbackTags.getTag(plan).asInstanceOf[TRANSFORM_UNSUPPORTED] return fail(hint.reason.getOrElse("Reason not recorded")) } pass() diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala index b8988a99057f..d41dce882602 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import 
org.apache.gluten.GlutenConfig import org.apache.gluten.events.GlutenPlanFallbackEvent import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.extension.columnar.{FallbackHints, TRANSFORM_UNSUPPORTED} +import org.apache.gluten.extension.columnar.{FallbackTags, TRANSFORM_UNSUPPORTED} import org.apache.gluten.utils.LogLevelUtil import org.apache.spark.sql.SparkSession @@ -57,8 +57,8 @@ case class GlutenFallbackReporter(glutenConfig: GlutenConfig, spark: SparkSessio val validationLogLevel = glutenConfig.validationLogLevel plan.foreachUp { case _: GlutenPlan => // ignore - case p: SparkPlan if FallbackHints.isNotTransformable(p) => - FallbackHints.getHint(p) match { + case p: SparkPlan if FallbackTags.nonEmpty(p) => + FallbackTags.getTag(p) match { case TRANSFORM_UNSUPPORTED(Some(reason), append) => logFallbackReason(validationLogLevel, p.nodeName, reason) // With in next round stage in AQE, the physical plan would be a new instance that diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala index 08a264524540..450b88163afc 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.gluten.execution.{ProjectExecTransformer, SortExecTransformer, TransformSupport, WholeStageTransformer} import org.apache.gluten.execution.datasource.GlutenFormatWriterInjects -import org.apache.gluten.extension.columnar.AddFallbackHintRule +import org.apache.gluten.extension.columnar.AddFallbackTagRule import org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides import 
org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager @@ -47,7 +47,7 @@ trait GlutenFormatWriterInjectsBase extends GlutenFormatWriterInjects { val rules = List( RewriteSparkPlanRulesManager(), - AddFallbackHintRule(), + AddFallbackTagRule(), TransformPreOverrides() ) val transformed = rules.foldLeft(plan) { case (latestPlan, rule) => rule.apply(latestPlan) } diff --git a/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala b/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala index 95391a2c42f5..9b5665a5ba1d 100644 --- a/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala +++ b/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala @@ -44,7 +44,7 @@ abstract class WholeStageTransformerSuite protected val resourcePath: String protected val fileFormat: String - protected val logLevel: String = "WARN" + protected val logLevel: String = "info" protected val TPCHTables: Seq[Table] = Seq( Table("part", partitionColumns = "p_brand" :: Nil), @@ -305,6 +305,8 @@ abstract class WholeStageTransformerSuite df.cache() } try { + logWarning("gyytest") + logWarning("gyytest + " + df.collect()(0).toString()) if (compareResult) { checkAnswer(df, expected) } else { diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala index 8ad717efdcc7..b9c9d8a270bf 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.BasicScanExecTransformer import org.apache.gluten.extension.GlutenPlan -import 
org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, FallbackHints, TRANSFORM_UNSUPPORTED} +import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, FallbackTags, TRANSFORM_UNSUPPORTED} import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier import org.apache.gluten.extension.columnar.transition.InsertTransitions import org.apache.gluten.utils.QueryPlanSelector @@ -124,10 +124,10 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait { testGluten("Tag not transformable more than once") { val originalPlan = UnaryOp1(LeafOp(supportsColumnar = true)) - FallbackHints.tag(originalPlan, TRANSFORM_UNSUPPORTED(Some("fake reason"))) + FallbackTags.tag(originalPlan, TRANSFORM_UNSUPPORTED(Some("fake reason"))) val rule = FallbackEmptySchemaRelation() val newPlan = rule.apply(originalPlan) - val reason = FallbackHints.getHint(newPlan).asInstanceOf[TRANSFORM_UNSUPPORTED].reason + val reason = FallbackTags.getTag(newPlan).asInstanceOf[TRANSFORM_UNSUPPORTED].reason assert(reason.isDefined) if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) { assert( diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala index e1c0a6863a2c..8ce0af8df051 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.BasicScanExecTransformer import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, FallbackHints, TRANSFORM_UNSUPPORTED} +import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, FallbackTags, 
TRANSFORM_UNSUPPORTED} import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier import org.apache.gluten.extension.columnar.transition.InsertTransitions import org.apache.gluten.utils.QueryPlanSelector @@ -125,10 +125,10 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait { testGluten("Tag not transformable more than once") { val originalPlan = UnaryOp1(LeafOp(supportsColumnar = true)) - FallbackHints.tag(originalPlan, TRANSFORM_UNSUPPORTED(Some("fake reason"))) + FallbackTags.tag(originalPlan, TRANSFORM_UNSUPPORTED(Some("fake reason"))) val rule = FallbackEmptySchemaRelation() val newPlan = rule.apply(originalPlan) - val reason = FallbackHints.getHint(newPlan).asInstanceOf[TRANSFORM_UNSUPPORTED].reason + val reason = FallbackTags.getTag(newPlan).asInstanceOf[TRANSFORM_UNSUPPORTED].reason assert(reason.isDefined) if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) { assert( diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala index e1c0a6863a2c..8ce0af8df051 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.BasicScanExecTransformer import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, FallbackHints, TRANSFORM_UNSUPPORTED} +import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, FallbackTags, TRANSFORM_UNSUPPORTED} import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier import org.apache.gluten.extension.columnar.transition.InsertTransitions import 
org.apache.gluten.utils.QueryPlanSelector @@ -125,10 +125,10 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait { testGluten("Tag not transformable more than once") { val originalPlan = UnaryOp1(LeafOp(supportsColumnar = true)) - FallbackHints.tag(originalPlan, TRANSFORM_UNSUPPORTED(Some("fake reason"))) + FallbackTags.tag(originalPlan, TRANSFORM_UNSUPPORTED(Some("fake reason"))) val rule = FallbackEmptySchemaRelation() val newPlan = rule.apply(originalPlan) - val reason = FallbackHints.getHint(newPlan).asInstanceOf[TRANSFORM_UNSUPPORTED].reason + val reason = FallbackTags.getTag(newPlan).asInstanceOf[TRANSFORM_UNSUPPORTED].reason assert(reason.isDefined) if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) { assert( From 35c9c8f248c3a5a12ef298236d60a979bdc9222a Mon Sep 17 00:00:00 2001 From: gaoyangxiaozhu Date: Tue, 2 Jul 2024 10:39:00 +0800 Subject: [PATCH 6/7] revert unneeded debug change --- .../org/apache/gluten/execution/FunctionsValidateTest.scala | 2 -- .../apache/gluten/execution/WholeStageTransformerSuite.scala | 4 +--- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/FunctionsValidateTest.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/FunctionsValidateTest.scala index 0cb067ede12d..12f66278f70a 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/FunctionsValidateTest.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/FunctionsValidateTest.scala @@ -39,8 +39,6 @@ class FunctionsValidateTest extends WholeStageTransformerSuite { .set("spark.unsafe.exceptionOnMemoryLeak", "true") .set("spark.sql.autoBroadcastJoinThreshold", "-1") .set("spark.sql.sources.useV1SourceList", "avro") - .set("spark.gluten.sql.columnar.backend.velox.glogSeverityLevel", "0") - .set("spark.gluten.sql.columnar.backend.velox.glogVerboseLevel", "1") .set( "spark.sql.optimizer.excludedRules", ConstantFolding.ruleName + "," + diff --git 
a/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala b/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala index 9b5665a5ba1d..95391a2c42f5 100644 --- a/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala +++ b/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala @@ -44,7 +44,7 @@ abstract class WholeStageTransformerSuite protected val resourcePath: String protected val fileFormat: String - protected val logLevel: String = "info" + protected val logLevel: String = "WARN" protected val TPCHTables: Seq[Table] = Seq( Table("part", partitionColumns = "p_brand" :: Nil), @@ -305,8 +305,6 @@ abstract class WholeStageTransformerSuite df.cache() } try { - logWarning("gyytest") - logWarning("gyytest + " + df.collect()(0).toString()) if (compareResult) { checkAnswer(df, expected) } else { From d7644dab32ea6994deed7fe7ed685bde1636dfc6 Mon Sep 17 00:00:00 2001 From: gaoyangxiaozhu Date: Tue, 2 Jul 2024 14:23:44 +0800 Subject: [PATCH 7/7] rename fallbackHintRule to fallbackTagRule --- .../columnar/{FallbackHintRule.scala => FallbackTagRule.scala} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename gluten-core/src/main/scala/org/apache/gluten/extension/columnar/{FallbackHintRule.scala => FallbackTagRule.scala} (100%) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackHintRule.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTagRule.scala similarity index 100% rename from gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackHintRule.scala rename to gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTagRule.scala