Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove max_text_bytes_per_part #385

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 5 additions & 142 deletions nemo_curator/modules/fuzzy_dedup.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@
filter_text_rows_by_bucket_batch,
merge_left_to_shuffled_right,
)
from nemo_curator.utils.fuzzy_dedup_utils.output_map_utils import (
build_partition,
get_agg_text_bytes_df,
)
from nemo_curator.utils.fuzzy_dedup_utils.shuffle_utils import write_partitioned_file


Expand Down Expand Up @@ -639,9 +635,7 @@ def __call__(self, dataset: DocumentDataset):
"map_buckets",
):
ddf_mapped_buckets_w_anchors = (
self.map_buckets.map_buckets_with_anchors(
documents_df=dataset.df, buckets_df=buckets_df.df
)
self.map_buckets.map_buckets_with_anchors(buckets_df=buckets_df.df)
)
ddf_mapped_buckets_w_anchors.to_parquet(
mapped_buckets_w_anchors_path, write_index=False, overwrite=True
Expand Down Expand Up @@ -883,120 +877,6 @@ def __init__(
else:
self._logger = logger

@staticmethod
def _get_output_part_ids_with_approx_equal_sum(
bucket_text_bytes_df: cudf.DataFrame,
max_text_bytes_per_part: int,
buckets_column: str,
bytes_column: str,
output_partition_column: str,
) -> cudf.DataFrame:
"""
Create a output_series that maps the ser.index into `nparts`
so that the total sum of bucket_val_counts_df
for each output id are all most equal and
less than max_text_bytes_per_part
This is used downstream for creating equal output_ids
"""
sizes = bucket_text_bytes_df[bytes_column].values
bucket_output_ar = build_partition(
sizes=sizes.get(), max_size=max_text_bytes_per_part
)
df = cudf.DataFrame()
df[buckets_column] = bucket_text_bytes_df[buckets_column]
df[output_partition_column] = bucket_output_ar
return df

def _get_output_map_from_text_bytes_per_bucket(
self,
ddf_bk_text_bytes,
bytes_column,
output_partition_column="_output_partition_id",
):
# String bytes limit for cuDF
# https://github.com/rapidsai/cudf/issues/13733
max_text_bytes_per_part = int(np.iinfo(np.int32).max * 3)

self._logger.info(f"max_text_bytes_per_part = {max_text_bytes_per_part}")
# Increasing in an attempt to prevent hitting
# ulimits
output_map_df_meta = cudf.DataFrame(
{self.bucket_field: [0], output_partition_column: [1]}
)
output_map_df_meta = output_map_df_meta.astype(
{self.bucket_field: np.uint64, output_partition_column: np.int32}
)

output_map_df = ddf_bk_text_bytes.map_partitions(
_MapBuckets._get_output_part_ids_with_approx_equal_sum,
max_text_bytes_per_part=max_text_bytes_per_part,
buckets_column=self.bucket_field,
bytes_column=bytes_column,
output_partition_column=output_partition_column,
meta=output_map_df_meta,
)
output_map_df = output_map_df.persist()
self._logger.info(
f"Step 1 of output_map_df of len: {len(output_map_df)} computed"
)
lower_bounds = (
output_map_df[output_partition_column]
.map_partitions(lambda s: (s.max() + 1))
.compute()
)
lower_bounds = np.cumsum(lower_bounds)

def update_id(df, lower_bound):
df[output_partition_column] += lower_bound
return df

updated_parts = [
output_map_df.get_partition(i).map_partitions(
update_id, lower_bounds[i - 1]
)
for i in range(1, len(lower_bounds))
]
updated_parts.append(output_map_df.get_partition(0))
output_map_df = dask_cudf.concat(updated_parts)
output_map_df = output_map_df.persist()
self._logger.info(
f"All steps of output_map_df of len: {len(output_map_df)} computed"
)
return output_map_df

def _get_output_map_based_on_str_bytes(
self, buckets_df, documents_df, bytes_column="_text_bytes"
):
"""
Add output_partition_id to buckets_ddf
"""
documents_df = documents_df.copy()
documents_df[bytes_column] = documents_df[self.text_field].map_partitions(
lambda s: s.str.byte_count()
)
n_partitions = buckets_df.npartitions
documents_df = documents_df.drop(columns=[self.text_field]).repartition(
npartitions=n_partitions
)
buckets_df = buckets_df.merge(documents_df).repartition(
npartitions=n_partitions
)
del documents_df
ddf_bk_text_bytes, agg_df_len = get_agg_text_bytes_df(
df=buckets_df,
agg_column=self.bucket_field,
bytes_column=bytes_column,
n_partitions=n_partitions,
shuffle=True,
)
self._logger.info(f"Agg_df computed of length = {agg_df_len}")
del buckets_df
output_map_df = self._get_output_map_from_text_bytes_per_bucket(
ddf_bk_text_bytes=ddf_bk_text_bytes,
bytes_column=bytes_column,
)
return output_map_df

def _random_select_anchor(self, buckets_df, n=2):
"""
Randomly select `n` anchors from each bucket.
Expand Down Expand Up @@ -1041,36 +921,18 @@ def _add_anchor_docs(self, buckets_df, num_anchors):

def map_buckets_with_anchors(
self,
documents_df: dask_cudf.DataFrame,
buckets_df: dask_cudf.DataFrame,
shuffle_type: Union[str, bool, None] = "tasks",
) -> dask_cudf.DataFrame:
"""
Get anchor docs with bucket info
Args:
input_data_paths: list of paths to input data
input_bucket_path: path to input buckets
text_ddf_blocksize: blocksize for text ddf
num_files: number of files to read
num_workers: number of workers
shuffle_type: type of shuffle to use
Returns:
ddf_anchor_docs_with_bk
"""
output_map_df = self._get_output_map_based_on_str_bytes(
buckets_df=buckets_df, documents_df=documents_df
)
ddf_anchor_docs_with_bk = buckets_df.map_partitions(
self._add_anchor_docs, num_anchors=self.num_anchors
)
self._logger.info("output_map_df is based on string bytes")
ddf_anchor_docs_with_bk = ddf_anchor_docs_with_bk.merge(
output_map_df, on=self.bucket_field
)

# Bucket is no longer needed
ddf_anchor_docs_with_bk = ddf_anchor_docs_with_bk.drop(
columns=[self.bucket_field]
)

# Below removes any duplicates lying around after dropping buckets
ddf_anchor_docs_with_bk = ddf_anchor_docs_with_bk.map_partitions(
M.drop_duplicates,
Expand All @@ -1079,6 +941,7 @@ def map_buckets_with_anchors(
transform_divisions=False,
align_dataframes=False,
)

ddf_anchor_docs_with_bk = ddf_anchor_docs_with_bk.shuffle(
self.id_fields,
ignore_index=True,
Expand All @@ -1090,7 +953,7 @@ def map_buckets_with_anchors(
transform_divisions=False,
align_dataframes=False,
)
del output_map_df

return ddf_anchor_docs_with_bk


Expand Down
89 changes: 0 additions & 89 deletions nemo_curator/utils/fuzzy_dedup_utils/output_map_utils.py

This file was deleted.

28 changes: 0 additions & 28 deletions nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from packaging.version import Version

from nemo_curator._compat import query_planning_enabled
from nemo_curator.utils.fuzzy_dedup_utils.output_map_utils import build_partition

dask_cuda_version = Version(dask_cuda.__version__)
USE_EXCOMMS = (
Expand Down Expand Up @@ -95,30 +94,3 @@ def rearange_by_column_direct(
npartitions=npartitions,
ignore_index=ignore_index,
)


def get_shuffle_part_ids_df(
agg_df,
partition_on,
output_col,
size_col,
num_workers=0,
):
sizes = agg_df[size_col].values
max_text_bytes_per_part = int(np.iinfo(np.int32).max * 3)

# Adjust max_text_bytes_per_part if the number of output
# partitions is small compared to the number of workers.
# Sometimes we just have very few output partitions to
# deal with, and just need a larger batch
npartitions_min = max(1, int(num_workers * 0.8))
while True:
output_ar = build_partition(sizes.get(), max_text_bytes_per_part)
if output_ar.max() > npartitions_min or max_text_bytes_per_part < 2**24:
break
max_text_bytes_per_part = int(max_text_bytes_per_part // 2.0)

df = cudf.DataFrame()
df[partition_on] = agg_df[partition_on]
df[output_col] = output_ar
return df
Loading