Skip to content

Commit

Permalink
[GLUTEN-8580][CORE][Part-2] Don't validate project generated by PushD…
Browse files Browse the repository at this point in the history
…ownInputFileExpression (apache#8585)
  • Loading branch information
zml1206 authored and baibaichen committed Feb 1, 2025
1 parent e9c6731 commit ad28655
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ import org.apache.spark.sql.execution.SparkPlan
// Add fallback tags when validator returns negative outcome.
case class AddFallbackTags(validator: Validator) extends Rule[SparkPlan] {
def apply(plan: SparkPlan): SparkPlan = {
plan.foreachUp { case p => addFallbackTag(p) }
plan.foreachUp {
case p if FallbackTags.maybeOffloadable(p) => addFallbackTag(p)
case _ =>
}
plan
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ object PushDownInputFileExpression {
}
}

def addFallbackTag(plan: SparkPlan): SparkPlan = {
FallbackTags.add(plan, "fallback input file expression")
plan
}

object PreOffload extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
case ProjectExec(projectList, child) if projectList.exists(containsInputFileRelatedExpr) =>
Expand Down Expand Up @@ -82,11 +87,11 @@ object PushDownInputFileExpression {
replacedExprs: mutable.Map[String, Alias]): SparkPlan =
plan match {
case p: LeafExecNode =>
ProjectExec(p.output ++ replacedExprs.values, p)
addFallbackTag(ProjectExec(p.output ++ replacedExprs.values, p))
// Output of SerializeFromObjectExec's child and output of DeserializeToObjectExec must be
// a single-field row.
case p @ (_: SerializeFromObjectExec | _: DeserializeToObjectExec) =>
ProjectExec(p.output ++ replacedExprs.values, p)
addFallbackTag(ProjectExec(p.output ++ replacedExprs.values, p))
case p: ProjectExec =>
p.copy(
projectList = p.projectList ++ replacedExprs.values.toSeq.map(_.toAttribute),
Expand Down Expand Up @@ -115,11 +120,13 @@ object PushDownInputFileExpression {
if projectList.exists(containsInputFileRelatedExpr) =>
child.copy(output = p.output.asInstanceOf[Seq[AttributeReference]])
case p1 @ ProjectExec(_, p2: ProjectExec) if canCollapseProject(p2) =>
p2.copy(projectList =
CollapseProjectShim.buildCleanedProjectList(p1.projectList, p2.projectList))
addFallbackTag(
p2.copy(projectList =
CollapseProjectShim.buildCleanedProjectList(p1.projectList, p2.projectList)))
case p1 @ ProjectExecTransformer(_, p2: ProjectExec) if canCollapseProject(p1, p2) =>
p2.copy(projectList =
CollapseProjectShim.buildCleanedProjectList(p1.projectList, p2.projectList))
addFallbackTag(
p2.copy(projectList =
CollapseProjectShim.buildCleanedProjectList(p1.projectList, p2.projectList)))
}

private def canCollapseProject(project: ProjectExec): Boolean = {
Expand Down

0 comments on commit ad28655

Please sign in to comment.