
Fuzzy dedup #699

Open
wants to merge 86 commits into dev

Conversation

Kibnelson (Collaborator):

Why are these changes needed?

Provide a fuzzy dedup implementation for Python, Spark, and Ray.

Related issue number (if any).

#152
#79

blublinsky and others added 30 commits October 10, 2024 19:05
@touma-I (Collaborator) left a comment:

You need to update the branch from dev and resolve any conflicts first before tackling some of these points.

transforms/universal/fdedup/utils/Makefile (outdated comment, resolved)
Member:

Some more words here to provide a gentle introduction would be nice. In addition, you need to describe all of the configuration keys. See doc_chunk for a template.

@cmadam (Collaborator), Nov 14, 2024:

I am still working on the documentation.

transforms/universal/fdedup/python/pyproject.toml (two outdated comments, resolved)
jaccard_similarity_threshold_key, jaccard_similarity_threshold_default
)
self.sort_output = config.get(sort_output_key, sort_output_default)
self.data_access = config.get("data_access")
Member:

If data_access is not provided, what happens? Either throw an exception, or use DataAccessLocal() as the default.
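
A minimal sketch of the two options the reviewer mentions, assuming data-prep-kit's DataAccessLocal can be imported from data_processing.data_access (the import path is not shown in this excerpt):

from data_processing.data_access import DataAccessLocal  # assumed import path

self.data_access = config.get("data_access")
if self.data_access is None:
    # Option 1: fail fast with a clear error
    # raise ValueError("data_access must be provided in the transform configuration")
    # Option 2: fall back to local file-system access
    self.data_access = DataAccessLocal()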

folder_name = f"{folder_name}/"
return folder_name

def consolidate_band_segment_files(self, files: dict[str, bytes]) -> tuple[pl.DataFrame, dict[str, Any]]:
Member:

I think you should hide all these internal methods by prefixing them with _ or __.
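
For example (a sketch of the suggestion, not the PR's current code):

# a leading underscore marks the method as internal to the transform
def _consolidate_band_segment_files(self, files: dict[str, bytes]) -> tuple[pl.DataFrame, dict[str, Any]]:
    ...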

f"--{sort_output_cli_param}",
type=bool,
default=sort_output_default,
help="Sort",
Member:

Does this sort all output or only within a file, and sort by what?
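
A possible rewording, assuming the flag controls sorting of rows within each output file (the author would need to state the actual scope and sort column):

parser.add_argument(
    f"--{sort_output_cli_param}",
    type=bool,
    default=sort_output_default,
    help="if true, sort the rows of each output file by <sort column>",  # hypothetical wording
)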

parser.add_argument(
"--document_id_column", type=str, required=False, help="name of the column that stores document text"
)
parser.add_argument("--seed", type=int, required=False, help="name of the column that stores document text")
@daw3rd (Member), Nov 13, 2024:

document_id and seed seem to have the wrong help text.
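
A sketch of corrected help strings; the exact wording is an assumption:

parser.add_argument(
    "--document_id_column", type=str, required=False,
    help="name of the column that stores the document id",
)
parser.add_argument(
    "--seed", type=int, required=False,
    help="seed for the random number generator",
)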

super().__init__(params=params)
self.logger = get_logger(__name__)

def get_folders(self, data_access: DataAccess) -> list[str]:
Member:

This get_folders method should be defined as an @abstractmethod in a new super-class for this framework. I wouldn't suggest this for one class, but you seem to be using the pattern in your other transforms.
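
A minimal sketch of such a super-class; the class name is illustrative and not taken from the PR, and the DataAccess import path is assumed:

from abc import ABC, abstractmethod

from data_processing.data_access import DataAccess  # assumed import path


class FolderBasedTransformRuntime(ABC):  # hypothetical shared base class
    @abstractmethod
    def get_folders(self, data_access: DataAccess) -> list[str]:
        """Return the list of input folders to be processed by this runtime."""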

Collaborator:

Please refer to the discussion in PR #691 as to why we implemented it like this.

super().__init__(
name=short_name,
transform_class=SignatureCalculationTransform,
remove_from_metadata=[sigcalc_data_factory_key],
Member:

I don't believe this is the right key. It needs to be the key corresponding to the command line parameter for the S3 credentials. I'm not sure, but I believe in this case it is scdata_data_s3_cred. See https://github.com/IBM/data-prep-kit/blob/dev/data-processing-lib/doc/ray-launcher-options.md

Member:

You can check in the metadata.json when using S3 to see what shows up there.
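
A hedged sketch of the change being discussed; scdata_data_s3_cred is the reviewer's guess at the key name and should be verified against the CLI parameters and metadata.json:

super().__init__(
    name=short_name,
    transform_class=SignatureCalculationTransform,
    # remove the S3 credentials parameter from metadata.json, not the data factory key
    remove_from_metadata=["scdata_data_s3_cred"],  # unverified key name
)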


if __name__ == "__main__":
launcher = PythonTransformLauncher(SignatureCalculationTransformConfiguration())
logger.info("Launching noop transform")
Member:

The message still says "noop". Maybe just remove this line.
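
If the line is kept, a corrected message could look like this (wording is an assumption):

logger.info("Launching signature calculation transform")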


# create parameters
input_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "test-data", "expected/cluster_analysis"))
output_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "test-data", "expected"))
Member:

You should never write directly to test-data. Write somewhere else (output is the convention), then manually copy the files into test-data if needed.
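
A sketch of the convention the reviewer describes, writing to a scratch output folder next to the test script (the path is illustrative):

# read expected inputs from test-data, but write results to a scratch output folder
input_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "test-data", "expected", "cluster_analysis"))
output_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "output"))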

Collaborator:

Fixed in commit fa5959b

self.docs_to_remove_df = self.docs_to_remove_df.rename({"docs_to_remove": self.document_id_column})

def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]:
self.logger.info(f"Transforming table with {table.num_rows} rows from file {file_name}")
Member:

Maybe make this debug so the KFP log doesn't get overwhelmed.
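
i.e., something like (a one-line sketch of the suggestion):

self.logger.debug(f"Transforming table with {table.num_rows} rows from file {file_name}")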

Collaborator:

This is the only log message I output in this transform, and it is invoked once per file.

super().__init__(
name=short_name,
transform_class=transform_class,
remove_from_metadata=[dataclean_data_factory_key],
Member:

Same comment as elsewhere about the correct key to pass here.


self.logger = get_logger(__name__)

def get_transform_config(
Member:

This function looks like a duplicate of that in DataCleaningPythonRuntime. Can you make a shared method somehow, either as a global or in a shared superclass?
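
One way to share it, sketched under the assumption that the runtimes can inherit from a small mixin; the names below are illustrative and not from the PR:

class FdedupRuntimeMixin:  # hypothetical shared helper
    def get_transform_config(self, data_access_factory, statistics, files):
        # the body currently duplicated in DataCleaningPythonRuntime would live here once
        ...

# each concrete runtime would then mix it in rather than re-implementing the method, e.g.:
# class DataCleaningPythonRuntime(FdedupRuntimeMixin, DefaultPythonTransformRuntime): ...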

Collaborator:

Please refer to the discussion in PR #691 as to why we implemented it like this.

super().__init__(params=params)
self.logger = get_logger(__name__)

def get_transform_config(
Member:

Again, this function looks like a duplicate of that in DataCleaningPythonRuntime. Can you make a shared method somehow, either as a global or in a shared superclass?

Collaborator:

Please refer to the discussion in PR #691 as to why it is implemented like this.

ComponentUtils.add_settings_to_component(execute_data_cleaning_job, ONE_WEEK_SEC)
# FIXME: see https://github.com/kubeflow/pipelines/issues/10914
if os.getenv("KFPv2", "0") != "1":
ComponentUtils.set_s3_env_vars_to_component(execute_data_cleaning_job, data_s3_access_secret)
@revit13 (Collaborator), Nov 14, 2024:

As discussed, in KFP v2 the secret name is hard coded (in kfp_v2_workflow_support/src/workflow_support/compile_utils/component.py) as a workaround for kubeflow/pipelines#10914. Thus, this pipeline is not expected to run on kfp V2. Should it be added to the blacklist, given that we currently can't restrict it to run only for v1 in the CI/CD tests? @roytman what do you think? Thanks


## Summary

The basic implementation of the fuzzy dedup is based on [MinHash](https://en.wikipedia.org/wiki/MinHash). Also see
Member:

Forgive me if this is a duplicate comment as I thought I had submitted once already, but...

  1. A more gentle introduction to what the transform does instead of only providing the links.
  2. The set of configuration keys should be documented (a sketch follows after this list). See doc_chunk for a nice example.
  3. This file needs to be linked from a ../README.md, which now only points to ray and python.
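
For illustration, a sketch of what such a configuration table might look like in the README, limited to keys visible in this PR's diff; the descriptions are assumptions for the author to correct:

| Key | Description |
|-----|-------------|
| document_id_column | name of the column that stores the document id |
| seed | seed for the random number generator |
| jaccard_similarity_threshold | Jaccard similarity above which two documents are considered duplicates |
| sort_output | whether the output is sorted (scope and sort key to be documented) |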

)


if __name__ == "__main__":
Member:

Do you really need these sub-transform main()s? They are not exposed in the Dockerfile (in the home dir) so are not even directly callable (in the standard way). Do we ever need to run this sub-transform manually outside of fdedup orchestrator/transform? If so, then it should be promoted to the home directory in the Dockerfile, otherwise maybe delete main()?

Collaborator:

The main() in all these transforms is needed by the kfp pipeline here:
https://github.com/IBM/data-prep-kit/blob/4941d5bab37a0bdc1e5873ce8e7288483703751f/transforms/universal/fdedup/kfp_ray/fdedup_wf.py#L30C1-L35C1
If I remove these, the pipeline will stop working.

transforms/universal/fdedup/python/Dockerfile (outdated comment, resolved)


if __name__ == "__main__":
# launcher = NOOPRayLauncher()
Member:

NOOP, although per the earlier comment, do we need the main() for the sub-transforms orchestrated by fdedup?

Collaborator:

Fixed in commit fa5959b
Yes, we do need the main(); see my comment above.

Member:

By convention, this file should be named cluster_analysis_s3_spark.py

Collaborator:

Fixed in commits f3c5be0 and 4941d5b

Member:

By convention, this should be named data_cleaning_s3_spark.py

Collaborator:

Fixed in commits f3c5be0 and 4941d5b

Member:

By convention, this should be named fuzzy_dedup_s3_spark.py

Collaborator:

Fixed in commits f3c5be0 and 4941d5b

Member:

By convention, this should be named signature_calc_s3_spark.py

Collaborator:

Fixed in commits f3c5be0 and 4941d5b

$(PIP) install --upgrade pip; \
$(PIP) install -r requirements.txt; \
fi
set-versions:
Member:

I guess you don't need this if you've renamed the file. If you go back to Makefile, you will also need to add:

clean test build publish

@touma-I self-requested a review November 15, 2024 19:01
RUN pip install --no-cache-dir -e .

# copy source data
COPY src/ src/
@touma-I (Collaborator), Nov 15, 2024:

@cmadam This is already done on line 19 above! Also, line 24 is doing a pip install of the transform and installing the same code.
