From 024f199a6c299e3c68910e1b1d018a26957b4f7c Mon Sep 17 00:00:00 2001 From: "You-Cheng Lin (Owen)" Date: Thu, 9 Oct 2025 23:55:12 +0000 Subject: [PATCH 1/9] use test deps for datasketches Signed-off-by: You-Cheng Lin (Owen) --- python/ray/data/aggregate.py | 88 +++++++++++++++ python/ray/data/stats.py | 3 + python/ray/data/tests/test_custom_agg.py | 100 +++++++++++++++++- python/ray/data/tests/test_dataset_stats.py | 40 +++---- .../ml/data-test-requirements.txt | 1 + 5 files changed, 213 insertions(+), 19 deletions(-) diff --git a/python/ray/data/aggregate.py b/python/ray/data/aggregate.py index 45415d53773f..c24ad87c1ff0 100644 --- a/python/ray/data/aggregate.py +++ b/python/ray/data/aggregate.py @@ -4,6 +4,7 @@ import numpy as np import pyarrow.compute as pc +from datasketches import kll_floats_sketch from ray.data._internal.util import is_null from ray.data.block import ( @@ -1189,3 +1190,90 @@ def finalize(self, accumulator: List[int]) -> Optional[float]: if accumulator[1] == 0: return None return (accumulator[0] / accumulator[1]) * 100.0 + + +class ApproximateQuantile(AggregateFnV2): + def __init__( + self, + on: str, + quantiles: List[float], + k: int = 800, + alias_name: Optional[str] = None, + ): + """ + Computes the approximate quantiles of a column by using a datasketches kll_floats_sketch. + https://datasketches.apache.org/docs/KLL/KLLOverview.html + + The accuracy of the KLL quantile sketch is a function of the configured K, which also affects + the overall size of the sketch. + The KLL Sketch has absolute error. For example, a specified rank accuracy of 1% at the + median (rank = 0.50) means that the true quantile (if you could extract it from the set) + should be between getQuantile(0.49) and getQuantile(0.51). This same 1% error applied at a + rank of 0.95 means that the true quantile should be between getQuantile(0.94) and getQuantile(0.96). + In other words, the error is a fixed +/- epsilon for the entire range of ranks. + + Typical single-sided rank error by k (use for getQuantile/getRank): + - k=100 → ~2.61% + - k=200 → ~1.33% + - k=400 → ~0.68% + - k=800 → ~0.35% + + See https://datasketches.apache.org/docs/KLL/KLLAccuracyAndSize.html for details on accuracy and size. + + Null values in the target column are ignored when constructing the sketch. + + Example: + + .. testcode:: + + import ray + from ray.data.aggregate import ApproximateQuantile + + # Create a dataset with some values + ds = ray.data.from_items( + [{"value": 20.0}, {"value": 40.0}, {"value": 60.0}, + {"value": 80.0}, {"value": 100.0}] + ) + + result = ds.aggregate(ApproximateQuantile(on="value", quantiles=[0.1, 0.5, 0.9])) + # Result: {'approx_quantile(value)': [20.0, 60.0, 100.0]} + + + Args: + on: The name of the column to calculate the quantile on. Must be a numeric column. + quantiles: The list of quantiles to compute. Must be between 0 and 1 inclusive. For example, quantiles=[0.5] computes the median. Null entries in the source column are skipped. + k: Controls the accuracy and memory footprint of the sketch; higher k yields lower error but uses more memory. Defaults to 800. + alias_name: Optional name for the resulting column. If not provided, defaults to "approx_quantile({column_name})". 
+ """ + self._quantiles = quantiles + self._k = k + super().__init__( + alias_name if alias_name else f"approx_quantile({str(on)})", + on=on, + ignore_nulls=True, + zero_factory=lambda: ApproximateQuantile.zero(k).serialize(), + ) + + @staticmethod + def zero(k: int): + return kll_floats_sketch(k=k) + + def aggregate_block(self, block: Block) -> bytes: + block_acc = BlockAccessor.for_block(block) + table = block_acc.to_arrow() + column = table.column(self.get_target_column()) + sketch = ApproximateQuantile.zero(self._k) + for value in column: + # we ignore nulls here + if value.as_py() is not None: + sketch.update(float(value.as_py())) + return sketch.serialize() + + def combine(self, current_accumulator: bytes, new: bytes) -> bytes: + combined = ApproximateQuantile.zero(self._k) + combined.merge(kll_floats_sketch.deserialize(current_accumulator)) + combined.merge(kll_floats_sketch.deserialize(new)) + return combined.serialize() + + def finalize(self, accumulator: bytes) -> List[float]: + return kll_floats_sketch.deserialize(accumulator).get_quantiles(self._quantiles) diff --git a/python/ray/data/stats.py b/python/ray/data/stats.py index ce832674a72f..3e9f14f2b33d 100644 --- a/python/ray/data/stats.py +++ b/python/ray/data/stats.py @@ -6,6 +6,7 @@ from ray.data.aggregate import ( AggregateFnV2, + ApproximateQuantile, Count, Max, Mean, @@ -31,6 +32,7 @@ def numerical_aggregators(column: str) -> List[AggregateFnV2]: - min - max - std + - approximate_quantile - missing_value_percentage - zero_percentage @@ -46,6 +48,7 @@ def numerical_aggregators(column: str) -> List[AggregateFnV2]: Min(on=column, ignore_nulls=True), Max(on=column, ignore_nulls=True), Std(on=column, ignore_nulls=True, ddof=0), + ApproximateQuantile(on=column, quantiles=[0.5]), MissingValuePercentage(on=column), ZeroPercentage(on=column, ignore_nulls=True), ] diff --git a/python/ray/data/tests/test_custom_agg.py b/python/ray/data/tests/test_custom_agg.py index bbb427566988..9afe3df0a072 100644 --- a/python/ray/data/tests/test_custom_agg.py +++ b/python/ray/data/tests/test_custom_agg.py @@ -2,7 +2,11 @@ import pytest import ray -from ray.data.aggregate import MissingValuePercentage, ZeroPercentage +from ray.data.aggregate import ( + ApproximateQuantile, + MissingValuePercentage, + ZeroPercentage, +) from ray.data.tests.conftest import * # noqa from ray.tests.conftest import * # noqa @@ -276,6 +280,100 @@ def test_zero_percentage_negative_values(self, ray_start_regular_shared_2_cpus): assert result["zero_pct(value)"] == expected +class TestApproximateQuantile: + """Test cases for ApproximateQuantile aggregation.""" + + def test_approximate_quantile_basic(self, ray_start_regular_shared_2_cpus): + """Test basic approximate quantile calculation.""" + data = [ + { + "id": 1, + "value": 10, + }, + {"id": 2, "value": 0}, + {"id": 3, "value": 30}, + {"id": 4, "value": 0}, + {"id": 5, "value": 50}, + ] + ds = ray.data.from_items(data) + + result = ds.aggregate( + ApproximateQuantile(on="value", quantiles=[0.1, 0.5, 0.9]) + ) + expected = [0.0, 10.0, 50.0] + assert result["approx_quantile(value)"] == expected + + def test_approximate_quantile_ignores_nulls(self, ray_start_regular_shared_2_cpus): + data = [ + {"id": 1, "value": 5.0}, + {"id": 2, "value": None}, + {"id": 3, "value": 15.0}, + {"id": 4, "value": None}, + {"id": 5, "value": 25.0}, + ] + ds = ray.data.from_items(data) + + result = ds.aggregate(ApproximateQuantile(on="value", quantiles=[0.5])) + assert result["approx_quantile(value)"] == [15.0] + + def 
test_approximate_quantile_custom_alias(self, ray_start_regular_shared_2_cpus): + data = [ + {"id": 1, "value": 1.0}, + {"id": 2, "value": 3.0}, + {"id": 3, "value": 5.0}, + {"id": 4, "value": 7.0}, + {"id": 5, "value": 9.0}, + ] + ds = ray.data.from_items(data) + + quantiles = [0.0, 1.0] + result = ds.aggregate( + ApproximateQuantile( + on="value", quantiles=quantiles, alias_name="value_range" + ) + ) + + assert result["value_range"] == [1.0, 9.0] + assert len(result["value_range"]) == len(quantiles) + + def test_approximate_quantile_groupby(self, ray_start_regular_shared_2_cpus): + data = [ + {"group": "A", "value": 1.0}, + {"group": "A", "value": 2.0}, + {"group": "A", "value": 3.0}, + {"group": "B", "value": 10.0}, + {"group": "B", "value": 20.0}, + {"group": "B", "value": 30.0}, + ] + ds = ray.data.from_items(data) + + result = ( + ds.groupby("group") + .aggregate(ApproximateQuantile(on="value", quantiles=[0.5])) + .take_all() + ) + + result_by_group = { + row["group"]: row["approx_quantile(value)"] for row in result + } + + assert result_by_group["A"] == [2.0] + assert result_by_group["B"] == [20.0] + + def test_approximate_quantile_respects_k(self, ray_start_regular_shared_2_cpus): + data = [{"id": i, "value": float(i)} for i in range(1000)] + ds = ray.data.from_items(data) + + result = ds.aggregate( + ApproximateQuantile(on="value", quantiles=[0.25, 0.5, 0.75], k=200) + ) + + expected = np.quantile(np.arange(1000, dtype=float), [0.25, 0.5, 0.75]) + actual = result["approx_quantile(value)"] + for actual_val, expected_val in zip(actual, expected): + assert abs(actual_val - expected_val) <= 1.0 + + if __name__ == "__main__": import sys diff --git a/python/ray/data/tests/test_dataset_stats.py b/python/ray/data/tests/test_dataset_stats.py index d83516c18d90..b96a68602548 100644 --- a/python/ray/data/tests/test_dataset_stats.py +++ b/python/ray/data/tests/test_dataset_stats.py @@ -4,6 +4,7 @@ import ray from ray.data.aggregate import ( + ApproximateQuantile, Count, Max, Mean, @@ -51,8 +52,9 @@ def test_numerical_columns_detection(self): assert len(feature_aggs.vector_columns) == 0 # Check that we have the right number of aggregators - # 3 numerical columns * 7 aggregators each + 1 string column * 2 aggregators = 23 total - assert len(feature_aggs.aggregators) == 23 + # 3 numerical columns * 8 aggregators each + 1 string column * 2 aggregators = 23 total + print(feature_aggs.aggregators) + assert len(feature_aggs.aggregators) == 26 def test_categorical_columns_detection(self): """Test that string columns are correctly identified as categorical.""" @@ -74,8 +76,8 @@ def test_categorical_columns_detection(self): assert "value" in feature_aggs.numerical_columns assert "category" not in feature_aggs.numerical_columns - # Check aggregator count: 1 numerical * 7 + 2 categorical * 2 = 11 - assert len(feature_aggs.aggregators) == 11 + # Check aggregator count: 1 numerical * 8 + 2 categorical * 2 = 11 + assert len(feature_aggs.aggregators) == 12 def test_vector_columns_detection(self): """Test that list columns are correctly identified as vector columns.""" @@ -97,8 +99,8 @@ def test_vector_columns_detection(self): assert "scalar" in feature_aggs.numerical_columns assert "text" in feature_aggs.str_columns - # Check aggregator count: 1 numerical * 7 + 1 categorical * 2 + 1 vector * 2 = 11 - assert len(feature_aggs.aggregators) == 11 + # Check aggregator count: 1 numerical * 8 + 1 categorical * 2 + 1 vector * 2 = 12 + assert len(feature_aggs.aggregators) == 12 def test_mixed_column_types(self): 
"""Test dataset with all column types mixed together.""" @@ -130,8 +132,8 @@ def test_mixed_column_types(self): # bool_val should be treated as numerical (integer-like) assert "bool_val" in feature_aggs.numerical_columns - # Check aggregator count: 3 numerical * 7 + 1 categorical * 2 + 1 vector * 2 = 25 - assert len(feature_aggs.aggregators) == 25 + # Check aggregator count: 3 numerical * 8 + 1 categorical * 2 + 1 vector * 2 = 25 + assert len(feature_aggs.aggregators) == 28 def test_column_filtering(self): """Test that only specified columns are included when columns parameter is provided.""" @@ -151,8 +153,8 @@ def test_column_filtering(self): assert "col3" in feature_aggs.vector_columns assert "col4" not in feature_aggs.numerical_columns - # Check aggregator count: 1 numerical * 7 + 1 vector * 2 = 9 - assert len(feature_aggs.aggregators) == 9 + # Check aggregator count: 1 numerical * 8 + 1 vector * 2 = 9 + assert len(feature_aggs.aggregators) == 10 def test_empty_dataset_schema(self): """Test behavior with empty dataset that has no schema.""" @@ -199,8 +201,8 @@ def test_unsupported_column_types(self): assert "unsupported_binary" not in feature_aggs.str_columns assert "unsupported_binary" not in feature_aggs.vector_columns - # Check aggregator count: 1 numerical * 7 + 1 categorical * 2 = 9 - assert len(feature_aggs.aggregators) == 9 + # Check aggregator count: 1 numerical * 8 + 1 categorical * 2 = 10 + assert len(feature_aggs.aggregators) == 10 def test_aggregator_types_verification(self): """Test that the correct aggregator types are generated for each column type.""" @@ -215,9 +217,9 @@ def test_aggregator_types_verification(self): # Check that we have the right types of aggregators agg_names = [agg.name for agg in feature_aggs.aggregators] - # Numerical aggregators should include all 7 types + # Numerical aggregators should include all 8 types num_agg_names = [name for name in agg_names if "num" in name] - assert len(num_agg_names) == 7 + assert len(num_agg_names) == 8 assert any("count" in name.lower() for name in num_agg_names) assert any("mean" in name.lower() for name in num_agg_names) assert any("min" in name.lower() for name in num_agg_names) @@ -225,6 +227,7 @@ def test_aggregator_types_verification(self): assert any("std" in name.lower() for name in num_agg_names) assert any("missing" in name.lower() for name in num_agg_names) assert any("zero" in name.lower() for name in num_agg_names) + assert any("approx_quantile" in name.lower() for name in num_agg_names) # Categorical aggregators should include count and missing percentage cat_agg_names = [name for name in agg_names if "cat" in name] @@ -246,7 +249,7 @@ def test_aggregator_instances_verification(self): # Find aggregators for the numerical column num_aggs = [agg for agg in feature_aggs.aggregators if "num" in agg.name] - assert len(num_aggs) == 7 + assert len(num_aggs) == 8 # Check that we have the right aggregator types agg_types = [type(agg) for agg in num_aggs] @@ -257,6 +260,7 @@ def test_aggregator_instances_verification(self): assert Std in agg_types assert MissingValuePercentage in agg_types assert ZeroPercentage in agg_types + assert ApproximateQuantile in agg_types # Find aggregators for the categorical column cat_aggs = [agg for agg in feature_aggs.aggregators if "cat" in agg.name] @@ -352,8 +356,8 @@ def test_large_dataset_performance(self): assert "category" in feature_aggs.str_columns assert "vector" in feature_aggs.vector_columns - # Check aggregator count: 2 numerical * 7 + 1 categorical * 2 + 1 vector * 2 = 
18 - assert len(feature_aggs.aggregators) == 18 + # Check aggregator count: 2 numerical * 8 + 1 categorical * 2 + 1 vector * 2 = 18 + assert len(feature_aggs.aggregators) == 20 class TestIndividualAggregatorFunctions: @@ -363,7 +367,7 @@ def test_numerical_aggregators(self): """Test numerical_aggregators function.""" aggs = numerical_aggregators("test_column") - assert len(aggs) == 7 + assert len(aggs) == 8 assert all(hasattr(agg, "get_target_column") for agg in aggs) assert all(agg.get_target_column() == "test_column" for agg in aggs) diff --git a/python/requirements/ml/data-test-requirements.txt b/python/requirements/ml/data-test-requirements.txt index bb7634360b21..14d2e9c2dfe9 100644 --- a/python/requirements/ml/data-test-requirements.txt +++ b/python/requirements/ml/data-test-requirements.txt @@ -23,3 +23,4 @@ pyiceberg[sql-sqlite]==0.9.0 clickhouse-connect pybase64 hudi==0.4.0 +datasketches From f983338b23090aa7fa851da64373db02f719f61e Mon Sep 17 00:00:00 2001 From: "You-Cheng Lin (Owen)" Date: Fri, 10 Oct 2025 00:39:26 +0000 Subject: [PATCH 2/9] add import error Signed-off-by: You-Cheng Lin (Owen) --- python/ray/data/aggregate.py | 32 ++++++++++++++++-------- python/ray/data/tests/test_custom_agg.py | 13 ---------- 2 files changed, 22 insertions(+), 23 deletions(-) diff --git a/python/ray/data/aggregate.py b/python/ray/data/aggregate.py index c24ad87c1ff0..a468e86fab94 100644 --- a/python/ray/data/aggregate.py +++ b/python/ray/data/aggregate.py @@ -4,7 +4,6 @@ import numpy as np import pyarrow.compute as pc -from datasketches import kll_floats_sketch from ray.data._internal.util import is_null from ray.data.block import ( @@ -1193,6 +1192,16 @@ def finalize(self, accumulator: List[int]) -> Optional[float]: class ApproximateQuantile(AggregateFnV2): + def _require_datasketches(self): + try: + from datasketches import kll_floats_sketch # type: ignore[import] + except ImportError as exc: + raise ImportError( + "ApproximateQuantile requires the `datasketches` package. " + "Install it with `pip install datasketches`." + ) from exc + return kll_floats_sketch + def __init__( self, on: str, @@ -1245,24 +1254,25 @@ def __init__( k: Controls the accuracy and memory footprint of the sketch; higher k yields lower error but uses more memory. Defaults to 800. alias_name: Optional name for the resulting column. If not provided, defaults to "approx_quantile({column_name})". 
""" + self._require_datasketches() self._quantiles = quantiles self._k = k super().__init__( alias_name if alias_name else f"approx_quantile({str(on)})", on=on, ignore_nulls=True, - zero_factory=lambda: ApproximateQuantile.zero(k).serialize(), + zero_factory=lambda: self.zero(k).serialize(), ) - @staticmethod - def zero(k: int): - return kll_floats_sketch(k=k) + def zero(self, k: int): + sketch_cls = self._require_datasketches() + return sketch_cls(k=k) def aggregate_block(self, block: Block) -> bytes: block_acc = BlockAccessor.for_block(block) table = block_acc.to_arrow() column = table.column(self.get_target_column()) - sketch = ApproximateQuantile.zero(self._k) + sketch = self.zero(self._k) for value in column: # we ignore nulls here if value.as_py() is not None: @@ -1270,10 +1280,12 @@ def aggregate_block(self, block: Block) -> bytes: return sketch.serialize() def combine(self, current_accumulator: bytes, new: bytes) -> bytes: - combined = ApproximateQuantile.zero(self._k) - combined.merge(kll_floats_sketch.deserialize(current_accumulator)) - combined.merge(kll_floats_sketch.deserialize(new)) + combined = self.zero(self._k) + sketch_cls = self._require_datasketches() + combined.merge(sketch_cls.deserialize(current_accumulator)) + combined.merge(sketch_cls.deserialize(new)) return combined.serialize() def finalize(self, accumulator: bytes) -> List[float]: - return kll_floats_sketch.deserialize(accumulator).get_quantiles(self._quantiles) + sketch_cls = self._require_datasketches() + return sketch_cls.deserialize(accumulator).get_quantiles(self._quantiles) diff --git a/python/ray/data/tests/test_custom_agg.py b/python/ray/data/tests/test_custom_agg.py index 9afe3df0a072..db9fcf603d57 100644 --- a/python/ray/data/tests/test_custom_agg.py +++ b/python/ray/data/tests/test_custom_agg.py @@ -360,19 +360,6 @@ def test_approximate_quantile_groupby(self, ray_start_regular_shared_2_cpus): assert result_by_group["A"] == [2.0] assert result_by_group["B"] == [20.0] - def test_approximate_quantile_respects_k(self, ray_start_regular_shared_2_cpus): - data = [{"id": i, "value": float(i)} for i in range(1000)] - ds = ray.data.from_items(data) - - result = ds.aggregate( - ApproximateQuantile(on="value", quantiles=[0.25, 0.5, 0.75], k=200) - ) - - expected = np.quantile(np.arange(1000, dtype=float), [0.25, 0.5, 0.75]) - actual = result["approx_quantile(value)"] - for actual_val, expected_val in zip(actual, expected): - assert abs(actual_val - expected_val) <= 1.0 - if __name__ == "__main__": import sys From 93a531bfc8780cd6c670172af49c64f8b5385143 Mon Sep 17 00:00:00 2001 From: "You-Cheng Lin (Owen)" Date: Fri, 10 Oct 2025 01:14:02 +0000 Subject: [PATCH 3/9] update requirements_compiled.txt Signed-off-by: You-Cheng Lin (Owen) --- python/requirements_compiled.txt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/requirements_compiled.txt b/python/requirements_compiled.txt index f09035819bdc..42e5bcf044dc 100644 --- a/python/requirements_compiled.txt +++ b/python/requirements_compiled.txt @@ -421,6 +421,8 @@ datasets==3.6.0 # -r python/requirements/ml/data-test-requirements.txt # -r python/requirements/ml/train-requirements.txt # evaluate +datasketches==5.2.0 + # via -r python/requirements/ml/data-test-requirements.txt debugpy==1.8.0 # via ipykernel decorator==5.1.1 @@ -1247,6 +1249,7 @@ numpy==1.26.4 # cupy-cuda12x # dask # datasets + # datasketches # decord # deepspeed # dm-control From 153a44c1c6edadd1d31210c24f33ae17efc55106 Mon Sep 17 00:00:00 2001 From: "You-Cheng Lin (Owen)" Date: Fri, 10 
Oct 2025 03:56:03 +0000 Subject: [PATCH 4/9] add stability annotation Signed-off-by: You-Cheng Lin (Owen) --- python/ray/data/aggregate.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/data/aggregate.py b/python/ray/data/aggregate.py index a468e86fab94..46e40ab6f8b1 100644 --- a/python/ray/data/aggregate.py +++ b/python/ray/data/aggregate.py @@ -1191,6 +1191,7 @@ def finalize(self, accumulator: List[int]) -> Optional[float]: return (accumulator[0] / accumulator[1]) * 100.0 +@PublicAPI(stability="alpha") class ApproximateQuantile(AggregateFnV2): def _require_datasketches(self): try: From 891dccb5f0b0e4205d040dc9d99eb6f9d4365e3d Mon Sep 17 00:00:00 2001 From: "You-Cheng Lin (Owen)" Date: Fri, 10 Oct 2025 05:45:27 +0000 Subject: [PATCH 5/9] add to rst Signed-off-by: You-Cheng Lin (Owen) --- doc/source/data/api/aggregate.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/doc/source/data/api/aggregate.rst b/doc/source/data/api/aggregate.rst index 19cce4d7e08c..5f7d6f3dae7a 100644 --- a/doc/source/data/api/aggregate.rst +++ b/doc/source/data/api/aggregate.rst @@ -27,3 +27,5 @@ compute aggregations. Unique MissingValuePercentage ZeroPercentage + ApproximateQuantile + \ No newline at end of file From b0e9923d4551463c2bdd7fc862cc64b303e8df1a Mon Sep 17 00:00:00 2001 From: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com> Date: Fri, 10 Oct 2025 17:26:44 +0800 Subject: [PATCH 6/9] Update incorrect comments Signed-off-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com> --- python/ray/data/tests/test_dataset_stats.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/ray/data/tests/test_dataset_stats.py b/python/ray/data/tests/test_dataset_stats.py index b96a68602548..baf54f365901 100644 --- a/python/ray/data/tests/test_dataset_stats.py +++ b/python/ray/data/tests/test_dataset_stats.py @@ -52,7 +52,7 @@ def test_numerical_columns_detection(self): assert len(feature_aggs.vector_columns) == 0 # Check that we have the right number of aggregators - # 3 numerical columns * 8 aggregators each + 1 string column * 2 aggregators = 23 total + # 3 numerical columns * 8 aggregators each + 1 string column * 2 aggregators = 26 total print(feature_aggs.aggregators) assert len(feature_aggs.aggregators) == 26 @@ -76,7 +76,7 @@ def test_categorical_columns_detection(self): assert "value" in feature_aggs.numerical_columns assert "category" not in feature_aggs.numerical_columns - # Check aggregator count: 1 numerical * 8 + 2 categorical * 2 = 11 + # Check aggregator count: 1 numerical * 8 + 2 categorical * 2 = 12 assert len(feature_aggs.aggregators) == 12 def test_vector_columns_detection(self): @@ -132,7 +132,7 @@ def test_mixed_column_types(self): # bool_val should be treated as numerical (integer-like) assert "bool_val" in feature_aggs.numerical_columns - # Check aggregator count: 3 numerical * 8 + 1 categorical * 2 + 1 vector * 2 = 25 + # Check aggregator count: 3 numerical * 8 + 1 categorical * 2 + 1 vector * 2 = 28 assert len(feature_aggs.aggregators) == 28 def test_column_filtering(self): @@ -153,7 +153,7 @@ def test_column_filtering(self): assert "col3" in feature_aggs.vector_columns assert "col4" not in feature_aggs.numerical_columns - # Check aggregator count: 1 numerical * 8 + 1 vector * 2 = 9 + # Check aggregator count: 1 numerical * 8 + 1 vector * 2 = 10 assert len(feature_aggs.aggregators) == 10 def test_empty_dataset_schema(self): @@ -356,7 +356,7 @@ def test_large_dataset_performance(self): assert "category" in 
feature_aggs.str_columns assert "vector" in feature_aggs.vector_columns - # Check aggregator count: 2 numerical * 8 + 1 categorical * 2 + 1 vector * 2 = 18 + # Check aggregator count: 2 numerical * 8 + 1 categorical * 2 + 1 vector * 2 = 20 assert len(feature_aggs.aggregators) == 20 From a3978b48baa28ee5fd23d30ff94c908e720efe98 Mon Sep 17 00:00:00 2001 From: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com> Date: Fri, 10 Oct 2025 17:27:17 +0800 Subject: [PATCH 7/9] Remove print Signed-off-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com> --- python/ray/data/tests/test_dataset_stats.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/ray/data/tests/test_dataset_stats.py b/python/ray/data/tests/test_dataset_stats.py index baf54f365901..708e5d4facd5 100644 --- a/python/ray/data/tests/test_dataset_stats.py +++ b/python/ray/data/tests/test_dataset_stats.py @@ -53,7 +53,6 @@ def test_numerical_columns_detection(self): # Check that we have the right number of aggregators # 3 numerical columns * 8 aggregators each + 1 string column * 2 aggregators = 26 total - print(feature_aggs.aggregators) assert len(feature_aggs.aggregators) == 26 def test_categorical_columns_detection(self): From 47d8d7b51036b1c0c644f933668efd515f329754 Mon Sep 17 00:00:00 2001 From: "You-Cheng Lin (Owen)" Date: Sat, 11 Oct 2025 01:12:54 +0000 Subject: [PATCH 8/9] update KLL link Signed-off-by: You-Cheng Lin (Owen) --- python/ray/data/aggregate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/aggregate.py b/python/ray/data/aggregate.py index 46e40ab6f8b1..44b362d12952 100644 --- a/python/ray/data/aggregate.py +++ b/python/ray/data/aggregate.py @@ -1212,7 +1212,7 @@ def __init__( ): """ Computes the approximate quantiles of a column by using a datasketches kll_floats_sketch. - https://datasketches.apache.org/docs/KLL/KLLOverview.html + https://datasketches.apache.org/docs/KLL/KLLSketch.html The accuracy of the KLL quantile sketch is a function of the configured K, which also affects the overall size of the sketch. From 47a89f886c991b205cc7c39f29fe6589df62b6f0 Mon Sep 17 00:00:00 2001 From: "You-Cheng Lin (Owen)" Date: Sat, 11 Oct 2025 01:56:34 +0000 Subject: [PATCH 9/9] add link to k params Signed-off-by: You-Cheng Lin (Owen) --- python/ray/data/aggregate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/aggregate.py b/python/ray/data/aggregate.py index 44b362d12952..cfec9fc4acb1 100644 --- a/python/ray/data/aggregate.py +++ b/python/ray/data/aggregate.py @@ -1252,7 +1252,7 @@ def __init__( Args: on: The name of the column to calculate the quantile on. Must be a numeric column. quantiles: The list of quantiles to compute. Must be between 0 and 1 inclusive. For example, quantiles=[0.5] computes the median. Null entries in the source column are skipped. - k: Controls the accuracy and memory footprint of the sketch; higher k yields lower error but uses more memory. Defaults to 800. + k: Controls the accuracy and memory footprint of the sketch; higher k yields lower error but uses more memory. Defaults to 800. See https://datasketches.apache.org/docs/KLL/KLLAccuracyAndSize.html for details on accuracy and size. alias_name: Optional name for the resulting column. If not provided, defaults to "approx_quantile({column_name})". """ self._require_datasketches()
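
For reference, the combine/finalize path introduced in PATCH 1/9 and reworked in PATCH 2/9 relies on the KLL merge API from the `datasketches` package. Below is a minimal standalone sketch of that pattern, assuming `datasketches` is installed (it is added to data-test-requirements.txt in PATCH 1/9); it is an illustrative sketch of the serialize/merge/deserialize flow, not part of the patches themselves:

    from datasketches import kll_floats_sketch

    # Two partial sketches, as if built by aggregate_block() over two blocks.
    left = kll_floats_sketch(k=800)
    right = kll_floats_sketch(k=800)
    for v in (20.0, 40.0):
        left.update(v)
    for v in (60.0, 80.0, 100.0):
        right.update(v)

    # Accumulators travel between tasks as serialized bytes, so combine()
    # deserializes both sides and merges them into a fresh sketch.
    combined = kll_floats_sketch(k=800)
    combined.merge(kll_floats_sketch.deserialize(left.serialize()))
    combined.merge(kll_floats_sketch.deserialize(right.serialize()))

    # finalize() then reads the requested quantiles off the merged sketch.
    # With these five values this matches the docstring example: roughly [20.0, 60.0, 100.0].
    print(combined.get_quantiles([0.1, 0.5, 0.9]))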