diff --git a/nvtabular/ops/categorify.py b/nvtabular/ops/categorify.py
index 4ebc878621..9c246b12db 100644
--- a/nvtabular/ops/categorify.py
+++ b/nvtabular/ops/categorify.py
@@ -39,6 +39,7 @@
 from dask.highlevelgraph import HighLevelGraph
 from dask.utils import parse_bytes
 from fsspec.core import get_fs_token_paths
+from packaging.version import Version
 
 from merlin.core import dispatch
 from merlin.core.dispatch import DataFrameType, annotate, is_cpu_object, nullable_series
@@ -53,6 +54,7 @@
 PAD_OFFSET = 0
 NULL_OFFSET = 1
 OOV_OFFSET = 2
+PYARROW_GE_14 = Version(pa.__version__) >= Version("14.0")
 
 
 class Categorify(StatOperator):
@@ -907,7 +909,11 @@ def _general_concat(
 ):
     # Concatenate DataFrame or pa.Table objects
     if isinstance(frames[0], pa.Table):
-        df = pa.concat_tables(frames, promote=True)
+        if PYARROW_GE_14:
+            df = pa.concat_tables(frames, promote_options="default")
+        else:
+            df = pa.concat_tables(frames, promote=True)
+
     if (
         cardinality_memory_limit
         and col_selector is not None
diff --git a/tests/unit/ops/test_lambda.py b/tests/unit/ops/test_lambda.py
index d50f5d2379..e864fe3b6a 100644
--- a/tests/unit/ops/test_lambda.py
+++ b/tests/unit/ops/test_lambda.py
@@ -227,7 +227,7 @@ def test_lambdaop_dtype_multi_op_propagation(cpu):
         {
             "a": np.arange(size),
             "b": np.random.choice(["apple", "banana", "orange"], size),
-            "c": np.random.choice([0, 1], size).astype(np.float16),
+            "c": np.random.choice([0, 1], size),
         }
     )
     ddf0 = dd.from_pandas(df0, npartitions=4)
diff --git a/tests/unit/test_dask_nvt.py b/tests/unit/test_dask_nvt.py
index 10b9ed9323..82f0d3c987 100644
--- a/tests/unit/test_dask_nvt.py
+++ b/tests/unit/test_dask_nvt.py
@@ -142,6 +142,8 @@ def test_dask_workflow_api_dlrm(
 
 
 @pytest.mark.parametrize("part_mem_fraction", [0.01])
 def test_dask_groupby_stats(client, tmpdir, datasets, part_mem_fraction):
+    from nvtabular.ops.join_groupby import AGG_DTYPES
+
     set_dask_client(client=client)
     engine = "parquet"
@@ -175,10 +177,14 @@ def test_dask_groupby_stats(client, tmpdir, datasets, part_mem_fraction):
     gb_e = expect.groupby("name-cat").aggregate({"name-cat": "count", "x": ["sum", "min", "std"]})
     gb_e.columns = ["count", "sum", "min", "std"]
     df_check = got.merge(gb_e, left_on="name-cat", right_index=True, how="left")
-    assert_eq(df_check["name-cat_count"], df_check["count"], check_names=False)
+    assert_eq(
+        df_check["name-cat_count"], df_check["count"].astype(AGG_DTYPES["count"]), check_names=False
+    )
     assert_eq(df_check["name-cat_x_sum"], df_check["sum"], check_names=False)
     assert_eq(df_check["name-cat_x_min"], df_check["min"], check_names=False)
-    assert_eq(df_check["name-cat_x_std"], df_check["std"].astype("float32"), check_names=False)
+    assert_eq(
+        df_check["name-cat_x_std"], df_check["std"].astype(AGG_DTYPES["std"]), check_names=False
+    )
 
 
 @pytest.mark.parametrize("part_mem_fraction", [0.01])
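
Reviewer note: a minimal standalone sketch of the version-gated concat pattern the categorify.py hunk introduces; the toy tables and the print are illustrative only, not part of the PR. pyarrow 14.0 deprecated the boolean promote keyword of pa.concat_tables in favor of promote_options, with promote_options="default" as the documented replacement for promote=True, so the code branches on the installed version.

import pyarrow as pa
from packaging.version import Version

PYARROW_GE_14 = Version(pa.__version__) >= Version("14.0")

# Two tables with different but unifiable schemas; schema promotion
# fills the missing column "b" with nulls when concatenating.
t1 = pa.table({"a": [1, 2]})
t2 = pa.table({"a": [3], "b": ["x"]})

if PYARROW_GE_14:
    # pyarrow >= 14: `promote` is deprecated; `promote_options="default"`
    # is the documented replacement for `promote=True`.
    combined = pa.concat_tables([t1, t2], promote_options="default")
else:
    # Older pyarrow: the original keyword still works.
    combined = pa.concat_tables([t1, t2], promote=True)

print(combined.schema)  # a: int64, b: string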