From 472f7843d9847151026f278d989bd635a14866cf Mon Sep 17 00:00:00 2001
From: lgbo-ustc
Date: Thu, 16 Jan 2025 16:22:38 +0800
Subject: [PATCH] update

---
 .../CHHashAggregateExecTransformer.scala      |  5 ---
 .../extension/RemoveDuplicatedColumns.scala   | 33 +++++++++++++++++--
 2 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala
index 40381a60567a5..a3f97492927f4 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala
@@ -141,11 +141,6 @@ case class CHHashAggregateExecTransformer(
 
   // There is an exception case: when a shuffle result is reused, the child's output may contain
   // duplicate columns. It's mismatched with the real output of CH.
   protected lazy val childOutput: Seq[Attribute] = {
-    val distinctChildOutput = child.output.distinct
-    if (distinctChildOutput.length != child.output.length) {
-      logWarning(s"Found duplicate columns in child's output: ${child.output}\n$child")
-    }
-    // distinctChildOutput
     child.output
   }
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RemoveDuplicatedColumns.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RemoveDuplicatedColumns.scala
index 7f378b5a41a0b..a259a1cb0e9b3 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RemoveDuplicatedColumns.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RemoveDuplicatedColumns.scala
@@ -24,6 +24,8 @@
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.CHColumnarToRowExec
+import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.exchange._
 /*
  * CH doesn't work well with duplicate columns in a block.
@@ -51,8 +53,35 @@ case class RemoveDuplicatedColumns(session: SparkSession) extends Rule[SparkPlan]
       }
       case hashAgg: CHHashAggregateExecTransformer =>
         val newChildren = hashAgg.children.map(visitPlan)
-        val newHashAgg = uniqueHashAggregateColumns(hashAgg)
-        newHashAgg.withNewChildren(newChildren)
+        var newHashAgg = uniqueHashAggregateColumns(hashAgg)
+        newHashAgg =
+          newHashAgg.withNewChildren(newChildren).asInstanceOf[CHHashAggregateExecTransformer]
+        newHashAgg.child match {
+          case aqeShuffleRead @ AQEShuffleReadExec(
+                child @ ShuffleQueryStageExec(
+                  id,
+                  reusedShuffle @ ReusedExchangeExec(output, shuffle: ColumnarShuffleExchangeExec),
+                  canonicalized),
+                partitionSpecs) =>
+            if (output.length != shuffle.output.length) {
+              // A reused exchange may retain duplicate columns in its output, even if its child
+              // has removed the duplicate columns. By design, a reused exchange's output can
+              // differ from its child's, so we cannot use the child's output as the output of
+              // the reused exchange directly.
+              // TODO: we cannot build a UT for this case.
+              val uniqueOutput = uniqueExpressions(output.map(_.asInstanceOf[NamedExpression]))
+                .map(_.asInstanceOf[Attribute])
+              val newReusedShuffle = ReusedExchangeExec(uniqueOutput, shuffle)
+              val newChild = AQEShuffleReadExec(
+                ShuffleQueryStageExec(id, newReusedShuffle, canonicalized),
+                partitionSpecs)
+              // newHashAgg.copy(child = newChild)
+              newHashAgg
+            } else {
+              newHashAgg
+            }
+          case _ => newHashAgg
+        }
       case _ => plan.withNewChildren(plan.children.map(visitPlan))
     }
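
For context: uniqueExpressions is an existing helper in this rule and is not shown
in the diff; it presumably deduplicates named expressions by their ExprId while
preserving order, which is what the reused-exchange branch above relies on. A
minimal sketch of that idea, with dedupByExprId as a purely illustrative name, not
the actual helper:

    import scala.collection.mutable

    import org.apache.spark.sql.catalyst.expressions.{ExprId, NamedExpression}

    // Keep only the first occurrence of each ExprId, preserving the original
    // order, so repeated attributes from a reused exchange collapse to a
    // single column.
    def dedupByExprId(exprs: Seq[NamedExpression]): Seq[NamedExpression] = {
      val seen = mutable.HashSet.empty[ExprId]
      exprs.filter(e => seen.add(e.exprId))
    }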