Skip to content

Conversation

karuppayya
Copy link
Contributor

@karuppayya karuppayya commented Oct 8, 2025

What changes were proposed in this pull request?

Adding ability to use ALPHA family for Theta Sketch

Why are the changes needed?

Theta sketch aggregate currently supports only quick select.
Consumers like Iceberg will benefit from the sketch aggregate if has the ability to use ALPHA family
Iceberg specification to use ALPHA sketches
Custom implementation of theta sketch aggregates in Iceberg that can be replaced with Spark Theta aggregates

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests

Was this patch authored or co-authored using generative AI tooling?

No

@karuppayya
Copy link
Contributor Author

@cboumalh @mkaravel @dtenedor @cloud-fan Can you help review?
cc: @aokolnychyi @huaxingao

@cboumalh
Copy link
Contributor

Hi @karuppayya. Thanks for the effort here. Is the goal to deprecate the aggregation implementation you linked above and call the one from here directly in the Iceberg code?

@karuppayya
Copy link
Contributor Author

karuppayya commented Oct 10, 2025

Yes, that's right. The goal is to deprecate the custom aggregation implementation(which was done since Spark didnt have the ability then) and have Iceberg call Spark's Theta aggregate/estimate functions directly.
I used Iceberg as a concrete example which could use this functionality when it becomes available, but this change will benefit consumers/users in general.

@cboumalh
Copy link
Contributor

cboumalh commented Oct 10, 2025

I see that makes sense. Just want to point out if the only need for iceberg is approximating count, Spark's HLL can achieve it at similar speeds (though not as fast) with a much smaller memory footprint. How would this work for your case?

@karuppayya
Copy link
Contributor Author

The selection of Alpha family sketch comes from the Iceberg Specification for NDV stats.
Changing this would break the interoperability guarantee that Iceberg provides across engines like Spark, Trino, and Flink.
I took Iceberg as an example, but having the flexibility in choosing the skecth family will benefit users in general.

@cboumalh
Copy link
Contributor

cboumalh commented Oct 10, 2025

Sounds good thank you, and yes for superior update speeds, the Alpha family is beneficial to users. I will review on my end, but don't have any write permissions. Thanks again for the info!

},
"THETA_INVALID_FAMILY" : {
"message" : [
"Invalid call to <function>; the `family` parameter must be one of: <validFamilies>. Got: <value>."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit, but can we change Got: <value> to , got: <value> or , but got: <value> to stay consistent with the rest of the file?

* @param prettyName The display name of the function/expression for error messages
* @return The corresponding DataSketches Family enum value
*/
def parseFamily(familyName: String, prettyName: String): Family = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final val MAX_LG_NOM_LONGS = 26
final val DEFAULT_LG_NOM_LONGS = 12

// Family constants for ThetaSketch
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar to above with the nominal values, can we add a small explanation to what the difference is here between both families

* @param child
* child expression against which unique counting will occur
* @param right
* @param lgNomEntriesExpr
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can consider making the parameter renaming update in this whole file to be consistent and here: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/thetasketchesExpressions.scala


// Constructors
private lazy val family: Family =
ThetaSketchUtils.parseFamily(familyExpr.eval().asInstanceOf[UTF8String].toString, prettyName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, we can break this up similar to lgNomEntries for readability

override val mutableAggBufferOffset: Int,
override val inputAggBufferOffset: Int)
extends TypedImperativeAggregate[ThetaSketchState]
with BinaryLike[Expression]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's use TernaryLike here instead of removing this trait altogether

override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ThetaSketchAgg =
copy(inputAggBufferOffset = newInputAggBufferOffset)

override protected def withNewChildrenInternal(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's update this to work with TernaryLike's withNewChildrenInternal

* @since 4.1.0
*/
def theta_sketch_agg(e: Column, family: String): Column =
theta_sketch_agg(e, 12, family)
Copy link
Contributor

@cboumalh cboumalh Oct 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hardcoded 12 matches the Catalyst default, so behaviorally it’s fine. Still, duplicating an internal default in the public layer is a bit awkward .We could consider making this explicit (functions above preferred) or defining a local constant for clarity.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants