diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala index 89ee96288bc3a..139b8b3131ff0 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala @@ -62,7 +62,6 @@ object CHRuleApi { injector.injectResolutionRule(spark => new RewriteToDateExpresstionRule(spark)) injector.injectResolutionRule(spark => new RewriteDateTimestampComparisonRule(spark)) injector.injectResolutionRule(spark => new CollapseGetJsonObjectExpressionRule(spark)) - injector.injectResolutionRule(spark => new RemoveUselessAttributesInDstinct(spark)) injector.injectOptimizerRule(spark => new CommonSubexpressionEliminateRule(spark)) injector.injectOptimizerRule(spark => new ExtendedColumnPruning(spark)) injector.injectOptimizerRule(spark => CHAggregateFunctionRewriteRule(spark)) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala index 48b0d73361030..eada5ae4381db 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala @@ -136,6 +136,18 @@ case class CHHashAggregateExecTransformer( } } + // CH does not support duplicate columns in a block. So there should not be duplicate attributes + // in child's output. + // There is an exception case: when a shuffle result is reused, the child's output may contain + // duplicate columns. It's mismatched with the real output of CH.
+ protected lazy val childOutput: Seq[Attribute] = { + val distinctChildOutput = child.output.distinct + if (distinctChildOutput.length != child.output.length) { + logWarning(s"Found duplicate columns in child's output: ${child.output}") + } + distinctChildOutput + } + override protected def doTransform(context: SubstraitContext): TransformContext = { val childCtx = child.asInstanceOf[TransformSupport].transform(context) val operatorId = context.nextOperatorId(this.nodeName) @@ -168,12 +180,12 @@ case class CHHashAggregateExecTransformer( if (modes.isEmpty || modes.forall(_ == Complete)) { // When there is no aggregate function or there is complete mode, it does not need // to handle outputs according to the AggregateMode - for (attr <- child.output) { + for (attr <- childOutput) { typeList.add(ConverterUtils.getTypeNode(attr.dataType, attr.nullable)) nameList.add(ConverterUtils.genColumnNameWithExprId(attr)) nameList.addAll(ConverterUtils.collectStructFieldNames(attr.dataType)) } - (child.output, output) + (childOutput, output) } else if (!modes.contains(Partial)) { // non-partial mode var resultAttrIndex = 0 @@ -193,13 +205,13 @@ case class CHHashAggregateExecTransformer( (aggregateResultAttributes, output) } else { // partial mode - for (attr <- child.output) { + for (attr <- childOutput) { typeList.add(ConverterUtils.getTypeNode(attr.dataType, attr.nullable)) nameList.add(ConverterUtils.genColumnNameWithExprId(attr)) nameList.addAll(ConverterUtils.collectStructFieldNames(attr.dataType)) } - (child.output, aggregateResultAttributes) + (childOutput, aggregateResultAttributes) } } @@ -238,7 +250,7 @@ case class CHHashAggregateExecTransformer( // Use 'child.output' as based Seq[Attribute], the originalInputAttributes // may be different for each backend. 
val exprNode = ExpressionConverter - .replaceWithExpressionTransformer(expr, child.output) + .replaceWithExpressionTransformer(expr, childOutput) .doTransform(args) groupingList.add(exprNode) }) @@ -258,7 +270,7 @@ case class CHHashAggregateExecTransformer( aggExpr => { if (aggExpr.filter.isDefined) { val exprNode = ExpressionConverter - .replaceWithExpressionTransformer(aggExpr.filter.get, child.output) + .replaceWithExpressionTransformer(aggExpr.filter.get, childOutput) .doTransform(args) aggFilterList.add(exprNode) } else { @@ -272,7 +284,7 @@ case class CHHashAggregateExecTransformer( aggregateFunc.children.toList.map( expr => { ExpressionConverter - .replaceWithExpressionTransformer(expr, child.output) + .replaceWithExpressionTransformer(expr, childOutput) .doTransform(args) }) case PartialMerge if distinct_modes.contains(Partial) => diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala index 9d1ea8f2d0ad8..65a01dea30730 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala @@ -590,40 +590,5 @@ class GlutenClickHouseTPCHSuite extends GlutenClickHouseTPCHAbstractSuite { ) sql("drop table test_8142") } - - test("GLUTEN-8432 count(distinct) contains grouping keys") { - compareResultsAgainstVanillaSpark( - s""" - |select n_regionkey, n_nationkey, count(distinct n_name, n_nationkey, n_comment) as x - |from ( - | select - | n_regionkey, - | n_nationkey, - | if(n_nationkey = 0, null, n_name) as n_name, - | if(n_nationkey = 0, null, n_comment) as n_comment - | from nation - |) - |group by n_regionkey, n_nationkey - |order by n_regionkey, n_nationkey - |""".stripMargin, - true, - { df => } - ) - compareResultsAgainstVanillaSpark( - s""" - |select n_regionkey, 
n_nationkey, count(distinct n_nationkey) as x - |from ( - | select - | n_regionkey, - | if (n_nationkey = 0, null, n_nationkey) as n_nationkey - | from nation - |) - |group by n_regionkey, n_nationkey - |order by n_regionkey, n_nationkey - |""".stripMargin, - true, - { df => } - ) - } } // scalastyle:off line.size.limit diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala index 98144e41eba7f..1c627140b6940 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala @@ -407,40 +407,5 @@ class GlutenClickHouseTPCHParquetAQESuite assert(result.length == 1) } } - - test("GLUTEN-8432 count(distinct) contains grouping keys") { - compareResultsAgainstVanillaSpark( - s""" - |select n_regionkey, n_nationkey, count(distinct n_name, n_nationkey, n_comment) as x - |from ( - | select - | n_regionkey, - | n_nationkey, - | if(n_nationkey = 0, null, n_name) as n_name, - | if(n_nationkey = 0, null, n_comment) as n_comment - | from nation - |) - |group by n_regionkey, n_nationkey - |order by n_regionkey, n_nationkey - |""".stripMargin, - true, - { df => } - ) - compareResultsAgainstVanillaSpark( - s""" - |select n_regionkey, n_nationkey, count(distinct n_nationkey) as x - |from ( - | select - | n_regionkey, - | if (n_nationkey = 0, null, n_nationkey) as n_nationkey - | from nation - |) - |group by n_regionkey, n_nationkey - |order by n_regionkey, n_nationkey - |""".stripMargin, - true, - { df => } - ) - } } // scalastyle:off line.size.limit