Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed Jan 16, 2025
1 parent d83f15a commit 9b4c0af
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,6 @@ case class CHHashAggregateExecTransformer(
// There is an exception case, when a shuffle result is reused, the child's output may contain
// duplicate columns. It's mismatched with the the real output of CH.
protected lazy val childOutput: Seq[Attribute] = {
val distinctChildOutput = child.output.distinct
if (distinctChildOutput.length != child.output.length) {
logWarning(s"Found duplicate columns in child's output: ${child.output}\n$child")
}
// distinctChildOutput
child.output
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.CHColumnarToRowExec
import org.apache.spark.sql.execution.adaptive._
import org.apache.spark.sql.execution.exchange._

/*
* CH doesn't support will for duplicate columns in the a block.
Expand Down Expand Up @@ -51,8 +53,34 @@ case class RemoveDuplicatedColumns(session: SparkSession) extends Rule[SparkPlan
}
case hashAgg: CHHashAggregateExecTransformer =>
val newChildren = hashAgg.children.map(visitPlan)
val newHashAgg = uniqueHashAggregateColumns(hashAgg)
newHashAgg.withNewChildren(newChildren)
var newHashAgg = uniqueHashAggregateColumns(hashAgg)
newHashAgg =
newHashAgg.withNewChildren(newChildren).asInstanceOf[CHHashAggregateExecTransformer]
newHashAgg.child match {
case aqeShuffleRead @ AQEShuffleReadExec(
child @ ShuffleQueryStageExec(
id,
reusedShuffle @ ReusedExchangeExec(output, shuffle: ColumnarShuffleExchangeExec),
canonicalized),
partitionSpecs) =>
if (output.length != shuffle.output.length) {
// reused exchange may remain duplicate columns in the output, even its child has
// removed the duplicate columns. In design, reused exchange's output could be
// different from its child, so we cannot use the child's output as the output of the
// reused exchange directly.
// TODO: we cannot build a UT for this case.
val uniqueOutput = uniqueExpressions(output.map(_.asInstanceOf[NamedExpression]))
.map(_.asInstanceOf[Attribute])
val newReusedShuffle = ReusedExchangeExec(uniqueOutput, shuffle)
val newChild = AQEShuffleReadExec(
ShuffleQueryStageExec(id, newReusedShuffle, canonicalized),
partitionSpecs)
newHashAgg.copy(child = newChild)
} else {
newHashAgg
}
case _ => newHashAgg
}
case _ =>
plan.withNewChildren(plan.children.map(visitPlan))
}
Expand Down

0 comments on commit 9b4c0af

Please sign in to comment.