diff --git a/integration_tests/src/main/python/arithmetic_ops_test.py b/integration_tests/src/main/python/arithmetic_ops_test.py
index d7fd941b97be..36fae86ea9de 100644
--- a/integration_tests/src/main/python/arithmetic_ops_test.py
+++ b/integration_tests/src/main/python/arithmetic_ops_test.py
@@ -98,6 +98,13 @@ def _get_overflow_df(spark, data, data_type, expr):
         StructType([StructField('a', data_type)])
     ).selectExpr(expr)
 
+def test_hll():
+    assert_gpu_and_cpu_are_equal_sql(
+        lambda spark : spark.read.parquet("/home/chongg/a"),
+        "tab",
+        "select c1, APPROX_COUNT_DISTINCT(c1) from tab group by c1"
+    )
+
 @pytest.mark.parametrize('data_gen', _arith_data_gens, ids=idfn)
 @disable_ansi_mode
 def test_addition(data_gen):
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 97440388c9f8..2ecb748b3c3e 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -3918,6 +3918,16 @@ object GpuOverrides extends Logging {
           GpuDynamicPruningExpression(child)
         }
       }),
+    expr[HyperLogLogPlusPlus](
+      "Aggregation approximate count distinct",
+      ExprChecks.reductionAndGroupByAgg(TypeSig.LONG, TypeSig.LONG,
+        Seq(ParamCheck("input", TypeSig.all, TypeSig.all))),
+      (a, conf, p, r) => new UnaryExprMeta[HyperLogLogPlusPlus](a, conf, p, r) {
+        override def convertToGpu(child: Expression): GpuExpression = {
+          GpuHLL(child, 0)
+        }
+      }
+    ),
     SparkShimImpl.ansiCastRule
   ).collect { case r if r != null => (r.getClassFor.asSubclass(classOf[Expression]), r)}.toMap
 
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/GpuHLL.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/GpuHLL.scala
new file mode 100644
index 000000000000..cd2da6789e0f
--- /dev/null
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/GpuHLL.scala
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.rapids.aggregate
+
+import ai.rapids.cudf
+import ai.rapids.cudf.{DType, GroupByAggregation, ReductionAggregation}
+import com.nvidia.spark.rapids._
+import com.nvidia.spark.rapids.Arm.withResourceIfAllowed
+import com.nvidia.spark.rapids.RapidsPluginImplicits.ReallyAGpuExpression
+import com.nvidia.spark.rapids.jni.HLL
+import com.nvidia.spark.rapids.shims.ShimExpression
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.util.GenericArrayData
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+case class CudfHLL(override val dataType: DataType) extends CudfAggregate {
+  override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
+    (input: cudf.ColumnVector) => input.reduce(ReductionAggregation.HLL(), DType.LIST)
+  override lazy val groupByAggregate: GroupByAggregation = GroupByAggregation.HLL(32 * 1024)
+  override val name: String = "CudfHLL"
+}
+
+case class CudfMergeHLL(override val dataType: DataType)
+    extends CudfAggregate {
+  override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
+    (input: cudf.ColumnVector) =>
+      input.reduce(ReductionAggregation.mergeHLL(), DType.LIST)
+
+  override lazy val groupByAggregate: GroupByAggregation = GroupByAggregation.mergeHLL()
+  override val name: String = "CudfMergeHLL"
+}
+
+/**
+ * Perform the final evaluation step to compute approximate count distinct from sketches.
+ */
+case class GpuHLLEvaluation(childExpr: Expression, precision: Int)
+    extends GpuExpression with ShimExpression {
+  override def dataType: DataType = LongType
+
+  override def prettyName: String = "HLL_evaluation"
+
+  override def nullable: Boolean = false
+
+  override def children: Seq[Expression] = Seq(childExpr)
+
+  override def columnarEval(batch: ColumnarBatch): GpuColumnVector = {
+    withResourceIfAllowed(childExpr.columnarEval(batch)) { sketches =>
+      val distinctValues = HLL.estimateDistinctValueFromSketches(sketches.getBase, precision)
+      GpuColumnVector.from(distinctValues, LongType)
+    }
+  }
+}
+
+case class GpuHLL(childExpr: Expression, precision: Int)
+    extends GpuAggregateFunction with Serializable {
+
+  // specify the HLL sketch type: list<byte>
+  private lazy val hllBufferType: DataType = ArrayType(ByteType, containsNull = false)
+
+  private lazy val hllBufferAttribute: AttributeReference =
+    AttributeReference("hllAttr", hllBufferType)()
+
+  override lazy val initialValues: Seq[Expression] =
+    Seq(GpuLiteral.create(new GenericArrayData(Array.ofDim[Byte](32 * 1024)), hllBufferType))
+
+  override lazy val inputProjection: Seq[Expression] = Seq(childExpr)
+
+  override lazy val updateAggregates: Seq[CudfAggregate] = Seq(CudfHLL(hllBufferType))
+
+  override lazy val mergeAggregates: Seq[CudfAggregate] = Seq(CudfMergeHLL(hllBufferType))
+
+  override lazy val evaluateExpression: Expression = GpuHLLEvaluation(hllBufferAttribute, precision)
+
+  override def aggBufferAttributes: Seq[AttributeReference] = hllBufferAttribute :: Nil
+
+  override def dataType: DataType = hllBufferType
+
+  override def prettyName: String = "approx_count_distinct"
+
+  override def nullable: Boolean = true
+
+  override def children: Seq[Expression] = Seq(childExpr)
+}
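
Note on the hard-coded precision: the GpuOverrides rule above converts the expression as GpuHLL(child, 0), while Spark's HyperLogLogPlusPlus carries a relativeSD parameter. Below is a minimal sketch, not part of this diff, of how a precision could be derived from relativeSD using the same formula as Spark's HyperLogLogPlusPlusHelper; the HllPrecision object and fromRelativeSD name are hypothetical.

object HllPrecision {
  // Spark maps relativeSD to the register-index width p via
  // p = ceil(2 * log(1.106 / relativeSD) / log(2)); the default relativeSD of 0.05 gives p = 9.
  def fromRelativeSD(relativeSD: Double): Int =
    Math.ceil(2.0d * Math.log(1.106d / relativeSD) / Math.log(2.0d)).toInt
}

With such a helper, the override could pass GpuHLL(child, HllPrecision.fromRelativeSD(a.relativeSD)) instead of 0, assuming the cudf HLL kernels accept a precision derived this way.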