[Data] Add approximate quantile to aggregator #57598
Changes from all commits
024f199
f983338
93a531b
153a44c
891dccb
b0e9923
a3978b4
47d8d7b
47a89f8
@@ -27,3 +27,5 @@ compute aggregations.
Unique
MissingValuePercentage
ZeroPercentage
ApproximateQuantile
@@ -1189,3 +1189,104 @@ def finalize(self, accumulator: List[int]) -> Optional[float]:
        if accumulator[1] == 0:
            return None
        return (accumulator[0] / accumulator[1]) * 100.0


@PublicAPI(stability="alpha")
class ApproximateQuantile(AggregateFnV2):
    def _require_datasketches(self):
        try:
            from datasketches import kll_floats_sketch  # type: ignore[import]
        except ImportError as exc:
            raise ImportError(
                "ApproximateQuantile requires the `datasketches` package. "
                "Install it with `pip install datasketches`."
            ) from exc
        return kll_floats_sketch

    def __init__(
        self,
        on: str,
        quantiles: List[float],
        k: int = 800,
        alias_name: Optional[str] = None,
    ):
        """
        Computes the approximate quantiles of a column using a datasketches kll_floats_sketch.
        https://datasketches.apache.org/docs/KLL/KLLSketch.html

        The accuracy of the KLL quantile sketch is a function of the configured k, which also
        affects the overall size of the sketch.
        The KLL sketch has absolute rank error. For example, a specified rank accuracy of 1% at the
        median (rank = 0.50) means that the true quantile (if you could extract it from the set)
        should be between getQuantile(0.49) and getQuantile(0.51). The same 1% error applied at a
        rank of 0.95 means that the true quantile should be between getQuantile(0.94) and
        getQuantile(0.96). In other words, the error is a fixed +/- epsilon for the entire range
        of ranks.

        Typical single-sided rank error by k (use for getQuantile/getRank):
            - k=100 → ~2.61%
            - k=200 → ~1.33%
            - k=400 → ~0.68%
            - k=800 → ~0.35%

        See https://datasketches.apache.org/docs/KLL/KLLAccuracyAndSize.html for details on
        accuracy and size.

        Null values in the target column are ignored when constructing the sketch.

        Example:

            .. testcode::

                import ray
                from ray.data.aggregate import ApproximateQuantile

                # Create a dataset with some values
                ds = ray.data.from_items(
                    [{"value": 20.0}, {"value": 40.0}, {"value": 60.0},
                     {"value": 80.0}, {"value": 100.0}]
                )

                result = ds.aggregate(ApproximateQuantile(on="value", quantiles=[0.1, 0.5, 0.9]))
                # Result: {'approx_quantile(value)': [20.0, 60.0, 100.0]}

        Args:
            on: The name of the column to calculate the quantiles on. Must be a numeric column.
            quantiles: The list of quantiles to compute. Each value must be between 0 and 1
                inclusive. For example, quantiles=[0.5] computes the median.
            k: Controls the accuracy and memory footprint of the sketch; higher k yields lower
                error but uses more memory. Defaults to 800. See
                https://datasketches.apache.org/docs/KLL/KLLAccuracyAndSize.html for details on
                accuracy and size.
            alias_name: Optional name for the resulting column. If not provided, defaults to
                "approx_quantile({column_name})".
        """
        self._require_datasketches()
        self._quantiles = quantiles
        self._k = k
        super().__init__(
            alias_name if alias_name else f"approx_quantile({str(on)})",
            on=on,
            ignore_nulls=True,
            zero_factory=lambda: self.zero(k).serialize(),
        )
Bug: Quantile Initialization Error and Inconsistent Parameter Usage
The

    def zero(self, k: int):
        sketch_cls = self._require_datasketches()
        return sketch_cls(k=k)

    def aggregate_block(self, block: Block) -> bytes:
        block_acc = BlockAccessor.for_block(block)
        table = block_acc.to_arrow()
        column = table.column(self.get_target_column())
        sketch = self.zero(self._k)
        for value in column:
            # we ignore nulls here
            if value.as_py() is not None:
do we need an
This is because we will get this error when the value is None.
def test_approximate_quantile_ignores_nulls(self, ray_start_regular_shared_2_cpus):
data = [
{"id": 1, "value": 5.0},
{"id": 2, "value": None},
{"id": 3, "value": 15.0},
{"id": 4, "value": None},
{"id": 5, "value": 25.0},
]
ds = ray.data.from_items(data)
result = ds.aggregate(ApproximateQuantile(on="value", quantiles=[0.5]))
assert result["approx_quantile(value)"] == [15.0]
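The reason for the guard discussed in this thread can be reproduced without Ray or pyarrow. The following is a minimal sketch, assuming the column's Python values arrive as a plain list; `column_values` is a hypothetical stand-in for what `value.as_py()` yields per row. `float(None)` raises `TypeError`, so nulls have to be skipped before the sketch is updated.

```python
# Hypothetical stand-in for the Python values that value.as_py() would yield.
column_values = [5.0, None, 15.0, None, 25.0]

# Without a guard, converting a null fails before the sketch is ever updated.
try:
    float(None)
    raised = False
except TypeError:
    raised = True

# With the guard, only non-null values are converted and kept.
accepted = [float(v) for v in column_values if v is not None]
print(raised, accepted)
```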
                sketch.update(float(value.as_py()))
        return sketch.serialize()

    def combine(self, current_accumulator: bytes, new: bytes) -> bytes:
        combined = self.zero(self._k)
        sketch_cls = self._require_datasketches()
        combined.merge(sketch_cls.deserialize(current_accumulator))
        combined.merge(sketch_cls.deserialize(new))
        return combined.serialize()

    def finalize(self, accumulator: bytes) -> List[float]:
        sketch_cls = self._require_datasketches()
        return sketch_cls.deserialize(accumulator).get_quantiles(self._quantiles)
@@ -23,3 +23,4 @@ pyiceberg[sql-sqlite]==0.9.0
clickhouse-connect
pybase64
hudi==0.4.0
datasketches
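Since `datasketches` is added as a test requirement while the aggregator imports it lazily, the optional-dependency pattern may be worth seeing in isolation. This is a generic sketch, not code from the PR: `require_optional` and the module names passed to it are hypothetical, and only the standard library's `importlib.import_module` is assumed.

```python
import importlib


def require_optional(module_name: str, pip_name: str):
    """Import an optional dependency or raise ImportError with an install hint."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"This feature requires the `{module_name}` package. "
            f"Install it with `pip install {pip_name}`."
        ) from exc


# A stdlib module is always importable, so this call succeeds.
json_module = require_optional("json", "json")

# A module that does not exist triggers the install hint.
try:
    require_optional("module_that_does_not_exist_for_demo", "some-package")
    message = ""
except ImportError as exc:
    message = str(exc)
print(json_module.__name__, message)
```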
instead of `k`, let's use `capacity_per_level`
`capacity_per_level` does not feel accurate to me. I think we don't need to hide the detail of `k`, since users will need to read the `datasketches` docs anyway. I added a link to the `k` parameter description to guide users to the docs for more info.
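To make the trade-off behind `k` concrete, the typical error figures quoted in the docstring can be turned into a small lookup. This is a rough sketch only: `smallest_listed_k` is a hypothetical helper, and the table holds just the four values listed in this PR's docstring, not a formula from the `datasketches` library.

```python
from typing import Optional

# Typical single-sided rank error (percent) per k, copied from the docstring in this PR.
TYPICAL_RANK_ERROR_PCT = {100: 2.61, 200: 1.33, 400: 0.68, 800: 0.35}


def smallest_listed_k(target_error_pct: float) -> Optional[int]:
    """Return the smallest listed k whose typical error is within the target, else None."""
    for k in sorted(TYPICAL_RANK_ERROR_PCT):
        if TYPICAL_RANK_ERROR_PCT[k] <= target_error_pct:
            return k
    return None


print(smallest_listed_k(1.0))  # 400
print(smallest_listed_k(0.1))  # None: no listed k is that accurate
```

A target outside the listed range returns `None` rather than extrapolating, since the docstring gives no values beyond k=800.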