diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py
index 2986a88f..885e52da 100644
--- a/nemo_curator/modules/fuzzy_dedup.py
+++ b/nemo_curator/modules/fuzzy_dedup.py
@@ -57,10 +57,6 @@
     filter_text_rows_by_bucket_batch,
     merge_left_to_shuffled_right,
 )
-from nemo_curator.utils.fuzzy_dedup_utils.output_map_utils import (
-    build_partition,
-    get_agg_text_bytes_df,
-)
 from nemo_curator.utils.fuzzy_dedup_utils.shuffle_utils import write_partitioned_file
 
@@ -639,9 +635,7 @@ def __call__(self, dataset: DocumentDataset):
             "map_buckets",
         ):
             ddf_mapped_buckets_w_anchors = (
-                self.map_buckets.map_buckets_with_anchors(
-                    documents_df=dataset.df, buckets_df=buckets_df.df
-                )
+                self.map_buckets.map_buckets_with_anchors(buckets_df=buckets_df.df)
             )
             ddf_mapped_buckets_w_anchors.to_parquet(
                 mapped_buckets_w_anchors_path, write_index=False, overwrite=True
@@ -883,120 +877,6 @@ def __init__(
         else:
             self._logger = logger
 
-    @staticmethod
-    def _get_output_part_ids_with_approx_equal_sum(
-        bucket_text_bytes_df: cudf.DataFrame,
-        max_text_bytes_per_part: int,
-        buckets_column: str,
-        bytes_column: str,
-        output_partition_column: str,
-    ) -> cudf.DataFrame:
-        """
-        Create a output_series that maps the ser.index into `nparts`
-        so that the total sum of bucket_val_counts_df
-        for each output id are all most equal and
-        less than max_text_bytes_per_part
-        This is used downstream for creating equal output_ids
-        """
-        sizes = bucket_text_bytes_df[bytes_column].values
-        bucket_output_ar = build_partition(
-            sizes=sizes.get(), max_size=max_text_bytes_per_part
-        )
-        df = cudf.DataFrame()
-        df[buckets_column] = bucket_text_bytes_df[buckets_column]
-        df[output_partition_column] = bucket_output_ar
-        return df
-
-    def _get_output_map_from_text_bytes_per_bucket(
-        self,
-        ddf_bk_text_bytes,
-        bytes_column,
-        output_partition_column="_output_partition_id",
-    ):
-        # String bytes limit for cuDF
-        # https://github.com/rapidsai/cudf/issues/13733
-        max_text_bytes_per_part = int(np.iinfo(np.int32).max * 3)
-
-        self._logger.info(f"max_text_bytes_per_part = {max_text_bytes_per_part}")
-        # Increasing in an attempt to prevent hitting
-        # ulimits
-        output_map_df_meta = cudf.DataFrame(
-            {self.bucket_field: [0], output_partition_column: [1]}
-        )
-        output_map_df_meta = output_map_df_meta.astype(
-            {self.bucket_field: np.uint64, output_partition_column: np.int32}
-        )
-
-        output_map_df = ddf_bk_text_bytes.map_partitions(
-            _MapBuckets._get_output_part_ids_with_approx_equal_sum,
-            max_text_bytes_per_part=max_text_bytes_per_part,
-            buckets_column=self.bucket_field,
-            bytes_column=bytes_column,
-            output_partition_column=output_partition_column,
-            meta=output_map_df_meta,
-        )
-        output_map_df = output_map_df.persist()
-        self._logger.info(
-            f"Step 1 of output_map_df of len: {len(output_map_df)} computed"
-        )
-        lower_bounds = (
-            output_map_df[output_partition_column]
-            .map_partitions(lambda s: (s.max() + 1))
-            .compute()
-        )
-        lower_bounds = np.cumsum(lower_bounds)
-
-        def update_id(df, lower_bound):
-            df[output_partition_column] += lower_bound
-            return df
-
-        updated_parts = [
-            output_map_df.get_partition(i).map_partitions(
-                update_id, lower_bounds[i - 1]
-            )
-            for i in range(1, len(lower_bounds))
-        ]
-        updated_parts.append(output_map_df.get_partition(0))
-        output_map_df = dask_cudf.concat(updated_parts)
-        output_map_df = output_map_df.persist()
-        self._logger.info(
-            f"All steps of output_map_df of len: {len(output_map_df)} computed"
-        )
-        return output_map_df
-
-    def _get_output_map_based_on_str_bytes(
-        self, buckets_df, documents_df, bytes_column="_text_bytes"
-    ):
-        """
-        Add output_partition_id to buckets_ddf
-        """
-        documents_df = documents_df.copy()
-        documents_df[bytes_column] = documents_df[self.text_field].map_partitions(
-            lambda s: s.str.byte_count()
-        )
-        n_partitions = buckets_df.npartitions
-        documents_df = documents_df.drop(columns=[self.text_field]).repartition(
-            npartitions=n_partitions
-        )
-        buckets_df = buckets_df.merge(documents_df).repartition(
-            npartitions=n_partitions
-        )
-        del documents_df
-        ddf_bk_text_bytes, agg_df_len = get_agg_text_bytes_df(
-            df=buckets_df,
-            agg_column=self.bucket_field,
-            bytes_column=bytes_column,
-            n_partitions=n_partitions,
-            shuffle=True,
-        )
-        self._logger.info(f"Agg_df computed of length = {agg_df_len}")
-        del buckets_df
-        output_map_df = self._get_output_map_from_text_bytes_per_bucket(
-            ddf_bk_text_bytes=ddf_bk_text_bytes,
-            bytes_column=bytes_column,
-        )
-        return output_map_df
-
     def _random_select_anchor(self, buckets_df, n=2):
         """
         Randomly select `n` anchors from each bucket.
@@ -1041,36 +921,18 @@ def _add_anchor_docs(self, buckets_df, num_anchors):
 
     def map_buckets_with_anchors(
         self,
-        documents_df: dask_cudf.DataFrame,
         buckets_df: dask_cudf.DataFrame,
         shuffle_type: Union[str, bool, None] = "tasks",
     ) -> dask_cudf.DataFrame:
-        """
-        Get anchor docs with bucket info
-        Args:
-            input_data_paths: list of paths to input data
-            input_bucket_path: path to input buckets
-            text_ddf_blocksize: blocksize for text ddf
-            num_files: number of files to read
-            num_workers: number of workers
-            shuffle_type: type of shuffle to use
-        Returns:
-            ddf_anchor_docs_with_bk
-        """
-        output_map_df = self._get_output_map_based_on_str_bytes(
-            buckets_df=buckets_df, documents_df=documents_df
-        )
         ddf_anchor_docs_with_bk = buckets_df.map_partitions(
             self._add_anchor_docs, num_anchors=self.num_anchors
         )
-        self._logger.info("output_map_df is based on string bytes")
-        ddf_anchor_docs_with_bk = ddf_anchor_docs_with_bk.merge(
-            output_map_df, on=self.bucket_field
-        )
+        # Bucket is no longer needed
        ddf_anchor_docs_with_bk = ddf_anchor_docs_with_bk.drop(
             columns=[self.bucket_field]
         )
+        # Below removes any duplicates lying around after dropping buckets
         ddf_anchor_docs_with_bk = ddf_anchor_docs_with_bk.map_partitions(
             M.drop_duplicates,
@@ -1079,6 +941,7 @@ def map_buckets_with_anchors(
             transform_divisions=False,
             align_dataframes=False,
         )
+
         ddf_anchor_docs_with_bk = ddf_anchor_docs_with_bk.shuffle(
             self.id_fields,
             ignore_index=True,
@@ -1090,7 +953,7 @@ def map_buckets_with_anchors(
             transform_divisions=False,
             align_dataframes=False,
         )
-        del output_map_df
+
         return ddf_anchor_docs_with_bk
 
 
diff --git a/nemo_curator/utils/fuzzy_dedup_utils/output_map_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/output_map_utils.py
deleted file mode 100644
index dc1f5795..00000000
--- a/nemo_curator/utils/fuzzy_dedup_utils/output_map_utils.py
+++ /dev/null
@@ -1,89 +0,0 @@
-# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from __future__ import annotations
-
-from typing import Tuple
-
-import numba
-import numpy as np
-
-from nemo_curator._compat import DASK_SHUFFLE_METHOD_ARG, query_planning_enabled
-
-
-def get_agg_text_bytes_df(
-    df: dask_cudf.DataFrame,
-    agg_column: str,
-    bytes_column: str,
-    n_partitions: int,
-    shuffle: bool = False,
-) -> Tuple[dask_cudf.DataFrame, int]:
-    """
-    Groupby bucket and calculate total bytes for a bucket.
-    """
-    if query_planning_enabled():
-        # `shuffle_method: bool` doesn't really make sense
-        # when query-planning is enabled, because dask-expr
-        # will ALWAYS use a shuffle-based reduction when
-        # `split_out>1`
-        shuffle_arg = {}
-    else:
-        shuffle_arg = {
-            "shuffle_method" if DASK_SHUFFLE_METHOD_ARG else "shuffle": shuffle
-        }
-    agg_df = (
-        df[[agg_column, bytes_column]]
-        .groupby([agg_column])
-        .agg({bytes_column: "sum"}, split_out=n_partitions, **shuffle_arg)
-    )
-    agg_df = agg_df.reset_index(drop=False)
-    # Doing a per partition sort
-    # seems to cause issues with
-    # jaccard shuffle (Overflow errors)
-    # which are caught and then
-    # retried with text_bytes_aware_merge
-    agg_df = agg_df.persist()
-    agg_df = agg_df.sort_values(by=[bytes_column], ascending=False, ignore_index=True)
-    agg_df = agg_df.persist()
-    # Added length to force computation
-    # after persist
-    agg_df_len = len(agg_df)
-
-    return agg_df, agg_df_len
-
-
-# next-fit-descending bin packing
-# https://en.wikipedia.org/wiki/Next-fit-decreasing_bin_packing
-@numba.jit(nopython=True)
-def build_partition(sizes: np.ndarray, max_size: int) -> np.ndarray:
-    """
-    Given an array of items and a max bin size this method
-    attempts to return a grouping of items such that no group exceeds
-    the max bin size using the Next-fit-decreasing bin packing approach.
-    """
-    i: int = 0
-    count: int = 0
-    current: int = 0
-    size: int = 0
-    partition = np.empty(sizes.shape, dtype=np.int32)
-    for i in range(len(sizes)):
-        size = sizes[i]
-        if current + size < max_size:
-            partition[i] = count
-            current += size
-        else:
-            count += 1
-            current = size
-            partition[i] = count
-    return partition
diff --git a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py
index 937518ec..8af4732f 100644
--- a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py
+++ b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py
@@ -20,7 +20,6 @@
 from packaging.version import Version
 
 from nemo_curator._compat import query_planning_enabled
-from nemo_curator.utils.fuzzy_dedup_utils.output_map_utils import build_partition
 
 dask_cuda_version = Version(dask_cuda.__version__)
 USE_EXCOMMS = (
@@ -95,30 +94,3 @@ def rearange_by_column_direct(
         npartitions=npartitions,
         ignore_index=ignore_index,
     )
-
-
-def get_shuffle_part_ids_df(
-    agg_df,
-    partition_on,
-    output_col,
-    size_col,
-    num_workers=0,
-):
-    sizes = agg_df[size_col].values
-    max_text_bytes_per_part = int(np.iinfo(np.int32).max * 3)
-
-    # Adjust max_text_bytes_per_part if the number of output
-    # partitions is small compared to the number of workers.
-    # Sometimes we just have very few output partitions to
-    # deal with, and just need a larger batch
-    npartitions_min = max(1, int(num_workers * 0.8))
-    while True:
-        output_ar = build_partition(sizes.get(), max_text_bytes_per_part)
-        if output_ar.max() > npartitions_min or max_text_bytes_per_part < 2**24:
-            break
-        max_text_bytes_per_part = int(max_text_bytes_per_part // 2.0)
-
-    df = cudf.DataFrame()
-    df[partition_on] = agg_df[partition_on]
-    df[output_col] = output_ar
-    return df
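
Reviewer note: a minimal usage sketch of the slimmed-down entry point after this patch. `map_buckets_with_anchors` no longer takes `documents_df`, since the string-bytes output map is gone and only the LSH buckets DataFrame is consumed (as at the call site changed above). The `_MapBuckets` constructor arguments, column names, and paths below are illustrative assumptions inferred from fields referenced in the diff, not something this patch defines.

# Hypothetical sketch; constructor arguments, column names, and paths are assumptions.
import dask_cudf

from nemo_curator.modules.fuzzy_dedup import _MapBuckets

# LSH buckets previously written by the pipeline (path is illustrative).
buckets_df = dask_cudf.read_parquet("/outputs/_buckets.parquet")

map_buckets = _MapBuckets(
    id_fields=["dataset_id", "doc_id"],  # assumed id columns
    bucket_field="_bucket_id",           # assumed bucket column name
    num_anchors=2,
)

# documents_df has been removed from the signature; only buckets_df is passed now.
ddf_anchor_docs_with_bk = map_buckets.map_buckets_with_anchors(
    buckets_df=buckets_df,
    shuffle_type="tasks",
)
ddf_anchor_docs_with_bk.to_parquet(
    "/outputs/anchor_docs_with_bk.parquet", write_index=False
)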