From d42d80a80c5bff32a022023a7148f3b9e346d883 Mon Sep 17 00:00:00 2001 From: Chong Gao Date: Tue, 22 Oct 2024 16:49:10 +0800 Subject: [PATCH] Fix --- .../spark/sql/rapids/aggregate/GpuHLL.scala | 53 ++++++++++++++----- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/GpuHLL.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/GpuHLL.scala index cd2da6789e0f..b303dd02d31c 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/GpuHLL.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/GpuHLL.scala @@ -16,6 +16,8 @@ package org.apache.spark.sql.rapids.aggregate +import scala.collection.immutable.Seq + import ai.rapids.cudf import ai.rapids.cudf.{DType, GroupByAggregation, ReductionAggregation} import com.nvidia.spark.rapids._ @@ -25,24 +27,29 @@ import com.nvidia.spark.rapids.jni.HLL import com.nvidia.spark.rapids.shims.ShimExpression import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} -import org.apache.spark.sql.catalyst.util.GenericArrayData +import org.apache.spark.sql.catalyst.util.{GenericArrayData, HyperLogLogPlusPlusHelper} import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch -case class CudfHLL(override val dataType: DataType) extends CudfAggregate { +case class CudfHLL(override val dataType: DataType, + numRegistersPerSketch: Int) extends CudfAggregate { override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar = - (input: cudf.ColumnVector) => input.reduce(ReductionAggregation.HLL(), DType.LIST) - override lazy val groupByAggregate: GroupByAggregation = GroupByAggregation.HLL(32 * 1024) + (input: cudf.ColumnVector) => input.reduce( + ReductionAggregation.HLL(numRegistersPerSketch), DType.STRUCT) + override lazy val groupByAggregate: GroupByAggregation = + GroupByAggregation.HLL(numRegistersPerSketch) override val name: String = "CudfHLL" } -case class CudfMergeHLL(override val dataType: DataType) +case class CudfMergeHLL(override val dataType: DataType, + numRegistersPerSketch: Int) extends CudfAggregate { override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar = (input: cudf.ColumnVector) => - input.reduce(ReductionAggregation.mergeHLL(), DType.LIST) + input.reduce(ReductionAggregation.mergeHLL(numRegistersPerSketch), DType.STRUCT) - override lazy val groupByAggregate: GroupByAggregation = GroupByAggregation.mergeHLL() + override lazy val groupByAggregate: GroupByAggregation = + GroupByAggregation.mergeHLL(numRegistersPerSketch) override val name: String = "CudfMergeHLL" } @@ -67,27 +74,45 @@ case class GpuHLLEvaluation(childExpr: Expression, precision: Int) } } -case class GpuHLL(childExpr: Expression, precision: Int) +case class GpuHLL(childExpr: Expression, relativeSD: Double) extends GpuAggregateFunction with Serializable { - // specify the HLL sketch type: list - private lazy val hllBufferType: DataType = ArrayType(ByteType, containsNull = false) + // Consistent with Spark + private lazy val numRegistersPerSketch: Int = + 1 << Math.ceil(2.0d * Math.log(1.106d / relativeSD) / Math.log(2.0d)).toInt + + // Consistent with Spark + private lazy val numLongs = numRegistersPerSketch / 10 + 1 + + // specify the HLL sketch type: struct + private lazy val hllBufferType: DataType = StructType.fromAttributes(aggBufferAttributes) private lazy val hllBufferAttribute: AttributeReference = AttributeReference("hllAttr", hllBufferType)() + // TODO: should be long array literal override lazy val initialValues: Seq[Expression] = Seq(GpuLiteral.create(new GenericArrayData(Array.ofDim[Byte](32 * 1024)), hllBufferType)) override lazy val inputProjection: Seq[Expression] = Seq(childExpr) - override lazy val updateAggregates: Seq[CudfAggregate] = Seq(CudfHLL(hllBufferType)) + override lazy val updateAggregates: Seq[CudfAggregate] = + Seq(CudfHLL(hllBufferType, numRegistersPerSketch)) - override lazy val mergeAggregates: Seq[CudfAggregate] = Seq(CudfMergeHLL(hllBufferType)) + override lazy val mergeAggregates: Seq[CudfAggregate] = + Seq(CudfMergeHLL(hllBufferType, numRegistersPerSketch)) - override lazy val evaluateExpression: Expression = GpuHLLEvaluation(hllBufferAttribute, precision) + override lazy val evaluateExpression: Expression = + GpuHLLEvaluation(hllBufferAttribute, numRegistersPerSketch) - override def aggBufferAttributes: Seq[AttributeReference] = hllBufferAttribute :: Nil + private val hllppHelper = new HyperLogLogPlusPlusHelper(relativeSD) + + /** Allocate enough words to store all registers. */ + override val aggBufferAttributes: Seq[AttributeReference] = { + Seq.tabulate(hllppHelper.numWords) { i => + AttributeReference(s"MS[$i]", LongType)() + } + } override def dataType: DataType = hllBufferType