Global cache_dir variable for exact, fuzzy, and semantic deduplication #384

Open

wants to merge 14 commits into base: main
1 change: 0 additions & 1 deletion config/fuzzy_dedup_config.yaml
@@ -1,4 +1,3 @@
cache_dir: "./fuzzy_dedup_cache"
# Optional Params below with default values
# profile_dir: null
# id_field: "id"
3 changes: 0 additions & 3 deletions config/sem_dedup_config.yaml
@@ -1,14 +1,11 @@
# Configuration file for semantic dedup
cache_dir: "semdedup_cache"
num_files: 16

# Embeddings configuration
embeddings_save_loc: "embeddings"
embedding_model_name_or_path: "sentence-transformers/all-MiniLM-L6-v2"
embedding_batch_size: 128

# Clustering configuration
clustering_save_loc: "clustering_results"
n_clusters: 1000
seed: 1234
max_iter: 100
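
With cache_dir gone from the YAML, the cache location is now supplied in code before the config is used. A minimal sketch of the new flow, assuming BaseConfig keeps its from_yaml helper (the config path is illustrative):

    from nemo_curator.cache import initialize_cache_directory
    from nemo_curator.modules.config import SemDedupConfig

    # Set the global cache first; SemDedupConfig.__post_init__ raises a
    # ValueError if no cache directory has been initialized (see config.py below).
    initialize_cache_directory("./semdedup_cache")

    # The YAML no longer carries cache_dir; all other keys load as before.
    config = SemDedupConfig.from_yaml("config/sem_dedup_config.yaml")
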
9 changes: 5 additions & 4 deletions docs/user-guide/gpudeduplication.rst
@@ -184,7 +184,6 @@ Python API
from nemo_curator import FuzzyDuplicatesConfig

config = FuzzyDuplicatesConfig(
cache_dir="/path/to/dedup_outputs", # must be cleared between runs
id_field="my_id",
text_field="text",
seed=42,
@@ -200,7 +199,6 @@ Python API

.. code-block:: yaml

cache_dir: /path/to/dedup_outputs
id_field: my_id
text_field: text
seed: 42
@@ -223,8 +221,12 @@ Python API
.. code-block:: python

from nemo_curator import FuzzyDuplicates
from nemo_curator.cache import initialize_cache_directory
from nemo_curator.datasets import DocumentDataset

# Initialize cache directory where intermediate results are stored
initialize_cache_directory("/path/to/dedup_outputs")

# Initialize the deduplication object
FuzzyDups = FuzzyDuplicates(config=config, logger="./")

@@ -251,7 +253,7 @@ Python API
- The default values of ``num_buckets`` and ``hashes_per_bucket`` are set to find documents with an approximately Jaccard similarity of 0.8 or above.
- Higher ``buckets_per_shuffle`` values can lead to better performance but might lead to out of memory errors.
- Setting the ``false_positive_check`` flag to ``False`` is ideal for optimal performance.
- When setting the ``false_positive_check`` flag to ``True`` ensure ``cache_dir`` between runs is emptied to avoid data from previous runs interfering with the current run's results.
- When setting the ``false_positive_check`` flag to ``True``, ensure the cache directory is emptied between runs to avoid data from previous runs interfering with the current run's results.

""""""""""""
CLI Utility
@@ -392,7 +394,6 @@ steps (all scripts are included in the `nemo_curator/scripts/fuzzy_deduplication

# same as `python connected_components.py`
gpu_connected_component \
--jaccard-pairs-path /path/to/dedup_output/jaccard_similarity_results.parquet `#Or /path/to/dedup_output/_edges.parquet` \
--output-dir /path/to/dedup_output \
--cache-dir /path/to/cc_cache \
--jaccard-threshold 0.8 \
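
The note above about emptying the cache when ``false_positive_check`` is ``True`` amounts to a small piece of run hygiene. A hedged sketch of one way to do it (the path is illustrative, and the shutil usage is the editor's suggestion, not part of the PR):

    import os
    import shutil

    from nemo_curator.cache import initialize_cache_directory

    cache_path = "/path/to/dedup_outputs"

    # Remove stale intermediates from a previous run; leftover minhash and
    # bucket files can contaminate the false-positive check.
    if os.path.isdir(cache_path):
        shutil.rmtree(cache_path)

    # initialize_cache_directory recreates the directory and sets the global.
    initialize_cache_directory(cache_path)
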
17 changes: 9 additions & 8 deletions docs/user-guide/semdedup.rst
@@ -38,16 +38,13 @@ Semantic deduplication in NeMo Curator can be configured using a YAML file. Here
.. code-block:: yaml

# Configuration file for semantic dedup
cache_dir: "semdedup_cache"
num_files: -1

# Embeddings configuration
embeddings_save_loc: "embeddings"
embedding_model_name_or_path: "sentence-transformers/all-MiniLM-L6-v2"
embedding_batch_size: 128

# Clustering configuration
clustering_save_loc: "clustering_results"
n_clusters: 1000
seed: 1234
max_iter: 100
@@ -154,6 +151,15 @@ You can use the ``add_id`` module from NeMo Curator if needed:
id_dataset.to_json("output_file_path", write_to_filename=True)


You also need to initialize the global cache directory where the outputs are written:

.. code-block:: python

from nemo_curator.cache import initialize_cache_directory

initialize_cache_directory("cache_dir")
Comment on lines +154 to +160 (Collaborator Author):

Could also make more sense to call this something else, like deduplication_outputs.



To perform semantic deduplication, you can either use individual components or the SemDedup class with a configuration file.

Use Individual Components
@@ -169,7 +175,6 @@ Use Individual Components
embedding_creator = EmbeddingCreator(
embedding_model_name_or_path="path/to/pretrained/model",
embedding_batch_size=128,
embedding_output_dir="path/to/output/embeddings",
input_column="text",
logger="path/to/log/dir",
)
@@ -187,7 +192,6 @@ Use Individual Components
id_column="doc_id",
max_iter=100,
n_clusters=50000,
clustering_output_dir="path/to/output/clusters",
logger="path/to/log/dir"
)
clustered_dataset = clustering_model(embeddings_dataset)
@@ -201,12 +205,9 @@ Use Individual Components
# Step 3: Semantic Deduplication
semantic_dedup = SemanticClusterLevelDedup(
n_clusters=50000,
emb_by_clust_dir="path/to/embeddings/by/cluster",
sorted_clusters_dir="path/to/sorted/clusters",
id_column="doc_id",
id_column_type="str",
which_to_keep="hard",
output_dir="path/to/output/deduped",
logger="path/to/log/dir"
)
semantic_dedup.compute_semantic_match_dfs()
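
With the output-directory arguments removed from ``EmbeddingCreator``, ``ClusteringModel``, and ``SemanticClusterLevelDedup``, the components presumably write under the global cache instead. A sketch of how a caller might locate those outputs afterwards, assuming the components reuse the former defaults ("embeddings" and "clustering_results") as fixed subdirectory names:

    import os

    from nemo_curator.cache import get_cache_directory

    cache = get_cache_directory()

    # Assumed layout; the exact subdirectory names are an implementation
    # detail of this PR rather than a documented contract.
    embeddings_dir = os.path.join(cache, "embeddings")
    clustering_dir = os.path.join(cache, "clustering_results")
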
4 changes: 1 addition & 3 deletions examples/exact_deduplication.py
@@ -17,8 +17,7 @@

from nemo_curator.datasets import DocumentDataset
from nemo_curator.modules import ExactDuplicates
from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.distributed_utils import get_client, write_to_disk
from nemo_curator.utils.script_utils import ArgumentHelper


@@ -46,7 +45,6 @@ def main(args):
logger=log_dir,
id_field=dataset_id_field,
text_field=dataset_text_field,
# cache_dir=output_dir # Optionally write the output to disk
)

duplicates = exact_dup(dataset=input_dataset)
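
With the ``cache_dir`` argument gone, persisting duplicate IDs from ``ExactDuplicates`` goes through the global cache. A minimal sketch of the updated flow (the dataset path is illustrative; per the docstring further down, duplicate IDs are written to the cache when one is set):

    from nemo_curator.cache import initialize_cache_directory
    from nemo_curator.datasets import DocumentDataset
    from nemo_curator.modules import ExactDuplicates

    # ExactDuplicates now reads the cache location from the global state
    # and writes the computed duplicate IDs there.
    initialize_cache_directory("./exact_dedup_cache")

    dataset = DocumentDataset.read_json("/path/to/dataset")
    exact_dup = ExactDuplicates(id_field="id", text_field="text")
    duplicates = exact_dup(dataset=dataset)
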
4 changes: 2 additions & 2 deletions examples/fuzzy_deduplication.py
@@ -18,6 +18,7 @@
import dask

from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig
from nemo_curator.cache import initialize_cache_directory
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import get_client, write_to_disk
from nemo_curator.utils.script_utils import ArgumentHelper
@@ -31,7 +32,7 @@ def main(args):

dataset_dir = "/path/to/dataset"
log_dir = "./"
cache_dir = "./fuzzy_cache" # must be cleared between runs
initialize_cache_directory("./fuzzy_cache") # must be cleared between runs
output_dir = "./output"
dataset_id_field = "id"
dataset_text_field = "text"
@@ -65,7 +66,6 @@
)

fuzzy_dedup_config = FuzzyDuplicatesConfig(
cache_dir=cache_dir,
id_field=dataset_id_field,
text_field=dataset_text_field,
seed=42,
10 changes: 4 additions & 6 deletions examples/semdedup_example.py
@@ -16,15 +16,13 @@
import os
import time

from nemo_curator.cache import get_cache_directory, initialize_cache_directory
from nemo_curator.datasets import DocumentDataset
from nemo_curator.log import create_logger
from nemo_curator.modules.config import SemDedupConfig
from nemo_curator.modules.semantic_dedup import SemDedup
from nemo_curator.utils.distributed_utils import get_client, read_data
from nemo_curator.utils.file_utils import (
expand_outdir_and_mkdir,
get_all_files_paths_under,
)
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.script_utils import ArgumentHelper


@@ -41,11 +39,11 @@ def main(args):
silence_hf_warnings()
client.run(silence_hf_warnings)

expand_outdir_and_mkdir(semdedup_config.cache_dir)
initialize_cache_directory(args.cache_dir)
logger = create_logger(
rank=0,
name="logger-end-to_end-semdup",
log_file=os.path.join(semdedup_config.cache_dir, "compute_embeddings.log"),
log_file=os.path.join(get_cache_directory(), "compute_embeddings.log"),
log_level=logging.INFO,
stdout=True,
)
34 changes: 34 additions & 0 deletions nemo_curator/cache.py
@@ -0,0 +1,34 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from nemo_curator.utils.file_utils import expand_outdir_and_mkdir

# Global variable to store the cache directory
_global_cache_dir = None
Collaborator (@Maghoumi, Nov 25, 2024):

Instead of a global variable, we should use the singleton design pattern here, where there is a Cache class that creates the static singleton instance (if it doesn't exist), or returns an existing one (if it was already created).

Subsequently, the init and get methods can be nicely wrapped inside that class.

Collaborator:

@sarahyurick please let me know what you think and whether my comment makes sense to you

Collaborator Author:

Thank you! Yes, that makes sense to me. Sorry I haven't gotten back to this earlier, but I expect to have more changes ready next week.



def initialize_cache_directory(cache_dir: str):
"""
Initialize and set the global cache directory.
"""
global _global_cache_dir
cache_dir = expand_outdir_and_mkdir(cache_dir)
_global_cache_dir = cache_dir


def get_cache_directory() -> str:
"""
Retrieve the global cache directory.
"""
return _global_cache_dir
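
For reference, the singleton design suggested in the review thread above might look roughly like the following. This is a sketch of the proposed direction, not code from this PR:

    from typing import Optional

    from nemo_curator.utils.file_utils import expand_outdir_and_mkdir


    class Cache:
        """Singleton wrapper around the global cache directory."""

        _instance: Optional["Cache"] = None

        def __new__(cls, cache_dir: Optional[str] = None):
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._cache_dir = None
            # Constructing with a path (re)initializes the shared directory.
            if cache_dir is not None:
                cls._instance._cache_dir = expand_outdir_and_mkdir(cache_dir)
            return cls._instance

        @classmethod
        def get_cache_directory(cls) -> Optional[str]:
            return cls._instance._cache_dir if cls._instance else None

    # Usage would mirror the module-level functions:
    # Cache("/path/to/cache")        # initialize
    # Cache.get_cache_directory()    # retrieve
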
16 changes: 4 additions & 12 deletions nemo_curator/modules/config.py
@@ -18,6 +18,8 @@

import yaml

from nemo_curator.cache import get_cache_directory


@dataclass
class BaseConfig:
@@ -46,8 +48,6 @@ class FuzzyDuplicatesConfig(BaseConfig):
text_field: Column in the Dataset denoting document content.
profile_dir: str, Default None
If specified directory to write dask profile
cache_dir: str, Default None
Location to store deduplication intermediates such as minhashes/buckets etc.
false_positive_check: bool,
Whether to run a check to look for false positives within buckets.
Note: This is a computationally expensive step.
@@ -60,7 +60,6 @@
"""

# General config
cache_dir: str
profile_dir: Optional[str] = None
id_field: str = "id"
text_field: str = "text"
@@ -83,7 +82,7 @@

def __post_init__(self):
self.num_hashes = self.num_buckets * self.hashes_per_bucket
if self.cache_dir is None:
if get_cache_directory() is None:
raise ValueError(
"Finding fuzzy duplicates requires a cache directory accessible via all workers to store intermediates"
)
@@ -116,14 +115,10 @@ class SemDedupConfig(BaseConfig):
Configuration for Semantic Deduplication.

Attributes:
cache_dir (str): Directory to store cache.
profile_dir (Optional[str]): If specified directory to write dask profile. Default is None.
cache_dir (str): Directory to store cache.
num_files (int): Number of files. Default is -1, meaning all files.
embeddings_save_loc (str): Location to save embeddings.
embedding_model_name_or_path (str): Model name or path for embeddings.
embedding_batch_size (int): Initial batch size for processing embeddings.
clustering_save_loc (str): Location to save clustering results.
n_clusters (int): Number of clusters.
seed (int): Seed for clustering.
max_iter (int): Maximum iterations for clustering.
@@ -135,17 +130,14 @@
eps_to_extract (float): Epsilon value to extract deduplicated data.
"""

cache_dir: str
profile_dir: Optional[str] = None
num_files: int = -1

# Embeddings
embeddings_save_loc: str = "embeddings"
embedding_model_name_or_path: str = "sentence-transformers/all-MiniLM-L6-v2"
embedding_batch_size: int = 128

# Clustering config
clustering_save_loc: str = "clustering_results"
n_clusters: int = 1000
seed: int = 1234
max_iter: int = 100
@@ -161,7 +153,7 @@
eps_to_extract: float = 0.01

def __post_init__(self):
if self.cache_dir is None:
if get_cache_directory() is None:
raise ValueError(
"Finding sem-dedup requires a cache directory accessible via all workers to store intermediates"
)
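
Both ``__post_init__`` checks make config construction fail fast when no cache has been set. An illustrative snippet, assuming the remaining ``FuzzyDuplicatesConfig`` fields keep their defaults:

    from nemo_curator import FuzzyDuplicatesConfig
    from nemo_curator.cache import initialize_cache_directory

    # Without an initialized cache, construction raises immediately.
    try:
        FuzzyDuplicatesConfig(id_field="id", text_field="text")
    except ValueError as err:
        print(err)  # cache directory accessible via all workers is required

    # After initialization, the same call succeeds.
    initialize_cache_directory("./fuzzy_cache")
    config = FuzzyDuplicatesConfig(id_field="id", text_field="text")
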
9 changes: 4 additions & 5 deletions nemo_curator/modules/exact_dedup.py
@@ -18,7 +18,6 @@
import time
import warnings
from contextlib import nullcontext
from datetime import datetime
from hashlib import md5
from typing import Optional, Union

@@ -27,6 +26,7 @@
from dask import dataframe as dd

from nemo_curator._compat import DASK_P2P_ERROR
from nemo_curator.cache import get_cache_directory
from nemo_curator.datasets import DocumentDataset
from nemo_curator.log import create_logger
from nemo_curator.utils.distributed_utils import performance_report_if_with_ts_suffix
@@ -45,7 +45,6 @@ def __init__(
text_field: str = "text",
hash_method: str = "md5",
profile_dir: Optional[str] = None,
cache_dir: Optional[str] = None,
):
"""
Parameters
@@ -56,10 +55,10 @@
hash_method: The hashing algorithm used for identifying exact duplicates. Currently supports {"md5"}
profile_dir: str, Default None
If specified directory to write dask profile
cache_dir: str, Default None
If specified, will compute & write duplicate id's to cache directory.
"""

cache_dir = get_cache_directory()

if hash_method not in self.SUPPORTED_HASHES:
raise ValueError(
f"{hash_method} not in supported hash_methods. Choose a hash_method from {self.SUPPORTED_HASHES}"
@@ -69,7 +68,7 @@
self.text_field = text_field
if cache_dir is None and profile_dir is not None:
warnings.warn(
"cache_dir for intermediate outputs is required to generate profiles"
"cache_dir for intermediate outputs is required to generate profiles. Please use initialize_cache_directory for this."
)
self.cache_dir = cache_dir
self.profile_dir = profile_dir