[SPARK-53848] Add ability to support Alpha family in Theta Aggregates #52551
base: master
@cboumalh @mkaravel @dtenedor @cloud-fan Can you help review?
Hi @karuppayya. Thanks for the effort here. Is the goal to deprecate the aggregation implementation you linked above and call the one from here directly in the Iceberg code?
Yes, that's right. The goal is to deprecate the custom aggregation implementation (which was written because Spark did not have this ability at the time) and have Iceberg call Spark's Theta aggregate/estimate functions directly.
I see, that makes sense. Just want to point out that if Iceberg's only need is an approximate count, Spark's HLL can achieve it at similar speeds (though not as fast) with a much smaller memory footprint. How would this work for your case?
The selection of the Alpha family sketch comes from the Iceberg Specification for NDV stats.
Sounds good, thank you. And yes, for superior update speeds, the Alpha family is beneficial to users. I will review on my end, but I don't have any write permissions. Thanks again for the info!
},
"THETA_INVALID_FAMILY" : {
  "message" : [
    "Invalid call to <function>; the `family` parameter must be one of: <validFamilies>. Got: <value>."
Super nit, but can we change `Got: <value>` to `, got: <value>` or `, but got: <value>` to stay consistent with the rest of the file?
 * @param prettyName The display name of the function/expression for error messages
 * @return The corresponding DataSketches Family enum value
 */
def parseFamily(familyName: String, prettyName: String): Family = {
Let's add a test for this new function here: https://github.com/cboumalh/spark/blob/master/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ThetaSketchUtilsSuite.scala
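As a rough illustration of what such a test could exercise, here is a minimal, self-contained sketch of a family parser. It is not the PR's implementation: `SketchFamily`, `FamilyParsing`, the list of valid names, and the case-insensitive matching are all assumptions made so the example runs without DataSketches or Spark on the classpath. The error text only mimics the `THETA_INVALID_FAMILY` template quoted in the diff.

```scala
import java.util.Locale

// Hypothetical stand-in for the DataSketches Family enum (assumption),
// so this sketch runs without the library on the classpath.
sealed trait SketchFamily
case object QuickSelect extends SketchFamily
case object Alpha extends SketchFamily

object FamilyParsing {
  // Assumed set of accepted names; the PR's actual constants may differ.
  val validFamilies: Seq[String] = Seq("QUICKSELECT", "ALPHA")

  // Case-insensitive parse (an assumption). Unknown names fail with a
  // message shaped like the THETA_INVALID_FAMILY template.
  def parseFamily(familyName: String, prettyName: String): SketchFamily =
    familyName.trim.toUpperCase(Locale.ROOT) match {
      case "QUICKSELECT" => QuickSelect
      case "ALPHA" => Alpha
      case _ =>
        throw new IllegalArgumentException(
          s"Invalid call to $prettyName; the `family` parameter must be one of: " +
            s"${validFamilies.mkString(", ")}. Got: $familyName.")
    }
}
```

A suite for the real function would likely cover the same three cases: each valid name, a differently cased name if that is supported, and an invalid name that raises the expected error.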
final val MAX_LG_NOM_LONGS = 26
final val DEFAULT_LG_NOM_LONGS = 12

// Family constants for ThetaSketch
Similar to the nominal values above, can we add a short explanation of the difference between the two families?
 * @param child
 *   child expression against which unique counting will occur
 * @param right
 * @param lgNomEntriesExpr
We could apply this parameter renaming consistently across this whole file, and also here: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/thetasketchesExpressions.scala
// Constructors
private lazy val family: Family =
  ThetaSketchUtils.parseFamily(familyExpr.eval().asInstanceOf[UTF8String].toString, prettyName)
Nit: we can break this up, similar to `lgNomEntries`, for readability.
    override val mutableAggBufferOffset: Int,
    override val inputAggBufferOffset: Int)
  extends TypedImperativeAggregate[ThetaSketchState]
  with BinaryLike[Expression]
Let's use `TernaryLike` here instead of removing this trait altogether.
override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ThetaSketchAgg =
  copy(inputAggBufferOffset = newInputAggBufferOffset)

override protected def withNewChildrenInternal(
Let's update this to work with `TernaryLike`'s `withNewChildrenInternal`.
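To make the three-child shape concrete, here is a toy sketch of the pattern. It does not use Spark's classes: `Expr`, `Lit`, `ThreeChildren`, and `ToyThetaSketchAgg` are hypothetical stand-ins, and only the field names `child`, `lgNomEntriesExpr`, and `familyExpr` are taken from the diff. The idea is that the expression exposes `first`, `second`, and `third`, and rebuilds itself from exactly three replacement children.

```scala
// Toy stand-ins for Catalyst's Expression tree (assumptions, not Spark classes).
trait Expr {
  def children: Seq[Expr]
}

final case class Lit(value: Any) extends Expr {
  override def children: Seq[Expr] = Nil
}

// Mirrors the shape of a three-child trait: children are derived from
// first/second/third, and replacement takes exactly three new children.
trait ThreeChildren extends Expr {
  def first: Expr
  def second: Expr
  def third: Expr

  final override def children: Seq[Expr] = Seq(first, second, third)

  protected def withNewChildrenInternal(
      newFirst: Expr, newSecond: Expr, newThird: Expr): Expr

  final def withNewChildren(newChildren: Seq[Expr]): Expr = {
    require(newChildren.length == 3, "expected exactly three children")
    withNewChildrenInternal(newChildren(0), newChildren(1), newChildren(2))
  }
}

// Hypothetical aggregate with the three inputs discussed in the review.
final case class ToyThetaSketchAgg(child: Expr, lgNomEntriesExpr: Expr, familyExpr: Expr)
    extends ThreeChildren {
  override def first: Expr = child
  override def second: Expr = lgNomEntriesExpr
  override def third: Expr = familyExpr

  // Rebuild the node from the replacement children via copy.
  override protected def withNewChildrenInternal(
      newFirst: Expr, newSecond: Expr, newThird: Expr): ToyThetaSketchAgg =
    copy(child = newFirst, lgNomEntriesExpr = newSecond, familyExpr = newThird)
}
```

In the real expression the same override would take `Expression` arguments and keep the aggregate buffer offsets unchanged through `copy`.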
 * @since 4.1.0
 */
def theta_sketch_agg(e: Column, family: String): Column =
  theta_sketch_agg(e, 12, family)
The hardcoded 12 matches the Catalyst default, so behaviorally it's fine. Still, duplicating an internal default in the public layer is a bit awkward. We could consider making this explicit (functions above preferred) or defining a local constant for clarity.
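One way to read the "local constant" option is sketched below. This is illustrative only: `Col` and `ToyFunctions` are hypothetical stand-ins for Spark's `Column` and the public functions object, and `DefaultLgNomEntries` is an invented name. The value 12 is the `DEFAULT_LG_NOM_LONGS` shown in the diff.

```scala
// Hypothetical stand-in for Spark's Column, for illustration only.
final case class Col(expr: String)

object ToyFunctions {
  // A single named default rather than a bare literal; 12 matches the
  // DEFAULT_LG_NOM_LONGS value quoted in the diff.
  private val DefaultLgNomEntries = 12

  // Full overload: every parameter is explicit.
  def theta_sketch_agg(e: Col, lgNomEntries: Int, family: String): Col =
    Col(s"theta_sketch_agg(${e.expr}, $lgNomEntries, '$family')")

  // Convenience overload: delegates with the named default.
  def theta_sketch_agg(e: Col, family: String): Col =
    theta_sketch_agg(e, DefaultLgNomEntries, family)
}
```

Referencing the Catalyst constant directly, if it is reachable from the public layer, would avoid keeping two copies of the default in sync.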
What changes were proposed in this pull request?
Adding ability to use ALPHA family for Theta Sketch
Why are the changes needed?
The Theta sketch aggregate currently supports only the QuickSelect family.
Consumers like Iceberg would benefit if the sketch aggregate could also use the ALPHA family:
Iceberg specification to use ALPHA sketches
Custom implementation of theta sketch aggregates in Iceberg that can be replaced with Spark Theta aggregates
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit tests
Was this patch authored or co-authored using generative AI tooling?
No