Fuzzy dedup #699
base: dev
Conversation
Signed-off-by: Constantin M Adam <[email protected]>
Signed-off-by: nelson <[email protected]>
You need to update the branch from dev and resolve any conflicts first before tackling some of these points.
Some more words here to provide a gentle introduction would be nice. In addition, you need to describe all of the configuration keys. See doc_chunk for a template.
I am still working on the documentation.
transforms/universal/fdedup/python/src/cluster_analysis_local_python.py
    jaccard_similarity_threshold_key, jaccard_similarity_threshold_default
)
self.sort_output = config.get(sort_output_key, sort_output_default)
self.data_access = config.get("data_access")
If data_access is not provided, what happens? Either throw an exception, or use DataAccessLocal() as the default?
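A minimal sketch of the two options, assuming config is the parameter dict passed to the transform constructor (DataAccessLocal is data-prep-kit's local filesystem accessor):

from data_processing.data_access import DataAccessLocal

# Option 1: fall back to local filesystem access when none is configured
data_access = config.get("data_access") or DataAccessLocal()

# Option 2 (alternative): fail fast with a clear error instead of a later AttributeError
data_access = config.get("data_access")
if data_access is None:
    raise ValueError("data_access is required in the transform configuration")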
    folder_name = f"{folder_name}/"
return folder_name

def consolidate_band_segment_files(self, files: dict[str, bytes]) -> tuple[pl.DataFrame, dict[str, Any]]:
I think you should hide all these internal methods by prefixing them with _ or __.
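A hypothetical sketch of the rename (the class name is illustrative, not from the PR):

from typing import Any

import polars as pl

class ClusterAnalysisTransform:
    # A leading underscore marks the method as internal to the transform.
    def _consolidate_band_segment_files(
        self, files: dict[str, bytes]
    ) -> tuple[pl.DataFrame, dict[str, Any]]:
        ...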
f"--{sort_output_cli_param}", | ||
type=bool, | ||
default=sort_output_default, | ||
help="Sort", |
Sort all output, or only within a file? And sort by what?
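A sketch of clearer help text; the sorting scope in the help string is an assumption, and str2bool stands in for a boolean converter (plain type=bool treats any non-empty string, even "False", as True):

import argparse

sort_output_cli_param = "sort_output"  # from the PR's configuration module
sort_output_default = False

def str2bool(value: str) -> bool:
    # data-prep-kit ships a similar helper for boolean CLI flags
    return str(value).strip().lower() in ("yes", "true", "t", "1")

parser = argparse.ArgumentParser()
parser.add_argument(
    f"--{sort_output_cli_param}",
    type=str2bool,
    default=sort_output_default,
    help="If true, sort the documents of each output file by document id",
)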
Signed-off-by: Constantin M Adam <[email protected]>
parser.add_argument(
    "--document_id_column", type=str, required=False, help="name of the column that stores document text"
)
parser.add_argument("--seed", type=int, required=False, help="name of the column that stores document text")
document_id and seed seem to have the wrong help text.
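A sketch with corrected help strings (the wording is assumed from the flag names themselves):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "--document_id_column",
    type=str,
    required=False,
    help="name of the column that stores the unique document id",
)
parser.add_argument(
    "--seed",
    type=int,
    required=False,
    help="seed for the random number generator used to build the minhashes",
)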
super().__init__(params=params)
self.logger = get_logger(__name__)

def get_folders(self, data_access: DataAccess) -> list[str]:
This get_folders method should be defined as an @abstractmethod in a new super-class for this framework. I wouldn't suggest this for one class, but you seem to be using the pattern in your other transforms.
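A hypothetical sketch of the suggested super-class (the class name and docstring are illustrative):

from abc import ABC, abstractmethod

from data_processing.data_access import DataAccess

class FolderTransformRuntime(ABC):
    @abstractmethod
    def get_folders(self, data_access: DataAccess) -> list[str]:
        """Return the list of input folders this transform should process."""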
Please refer to the discussion in PR #691 as to why we implemented it like this.
super().__init__(
    name=short_name,
    transform_class=SignatureCalculationTransform,
    remove_from_metadata=[sigcalc_data_factory_key],
I don't believe this is the right key. It needs to be the key corresponding to the command line parameter for the S3 credentials. I'm not sure, but I believe in this case it is scdata_data_s3_cred. See https://github.com/IBM/data-prep-kit/blob/dev/data-processing-lib/doc/ray-launcher-options.md
You can check in the metadata.json when using S3 to see what shows up there.
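A sketch of the suggested fix; the credentials key name is unconfirmed (the reviewer's guess is scdata_data_s3_cred), so treat it as a placeholder. short_name and SignatureCalculationTransform come from the PR's module:

from data_processing.transform import TransformConfiguration

class SignatureCalculationTransformConfiguration(TransformConfiguration):
    def __init__(self):
        super().__init__(
            name=short_name,
            transform_class=SignatureCalculationTransform,
            # keep the S3 credentials out of metadata.json
            remove_from_metadata=["scdata_data_s3_cred"],
        )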
if __name__ == "__main__":
    launcher = PythonTransformLauncher(SignatureCalculationTransformConfiguration())
    logger.info("Launching noop transform")
The log message still says noop. Maybe just remove this line.
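A sketch with the leftover "noop" message replaced (the message wording is assumed; SignatureCalculationTransformConfiguration comes from the PR):

from data_processing.runtime.pure_python import PythonTransformLauncher
from data_processing.utils import get_logger

logger = get_logger(__name__)

if __name__ == "__main__":
    launcher = PythonTransformLauncher(SignatureCalculationTransformConfiguration())
    logger.info("Launching signature calculation transform")
    launcher.launch()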
# create parameters
input_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "test-data", "expected/cluster_analysis"))
output_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "test-data", "expected"))
You should never write directly to test-data. Write somewhere else (output is the convention), then manually copy if needed.
Fixed in commit fa5959b
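A sketch of the convention the comment describes: read expected data from test-data, but write results to a scratch output folder and copy them back manually if needed:

import os

input_folder = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..", "test-data", "expected", "cluster_analysis")
)
output_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "output"))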
self.docs_to_remove_df = self.docs_to_remove_df.rename({"docs_to_remove": self.document_id_column})

def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]:
    self.logger.info(f"Transforming table with {table.num_rows} rows from file {file_name}")
maybe make this debug so the kfp log doesn't get overwhelmed.
This is the only log message I output in this transform, and it is invoked once per file.
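For reference, a sketch of the suggested change, should kfp log volume ever become a concern (the class name is illustrative; imports added for completeness):

from typing import Any

import pyarrow as pa

class DataCleaningTransform:
    def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]:
        # debug level keeps the per-file message out of the default kfp log
        self.logger.debug(f"Transforming table with {table.num_rows} rows from file {file_name}")
        ...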
super().__init__(
    name=short_name,
    transform_class=transform_class,
    remove_from_metadata=[dataclean_data_factory_key],
same comment as elsewhere about the correct key to pass here
self.logger = get_logger(__name__)

def get_transform_config(
This function looks like a duplicate of that in DataCleaningPythonRuntime. Can you make a shared method somehow, either as a global or in a shared superclass?
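A hypothetical sketch of the suggested refactoring (names are illustrative; data-prep-kit's actual runtime base classes may differ):

from data_processing.utils import get_logger

class FdedupRuntimeBase:
    """Shared base so the data-cleaning and cluster-analysis runtimes
    do not each duplicate get_transform_config."""

    def __init__(self, params: dict):
        self.params = params
        self.logger = get_logger(__name__)

    def get_transform_config(self, *args, **kwargs):
        # the common implementation would live here
        ...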
Please refer to the discussion in PR #691 as to why we implemented it like this.
super().__init__(params=params)
self.logger = get_logger(__name__)

def get_transform_config(
Again, this function looks like a duplicate of that in DataCleaningPythonRuntime. Can you make a shared method somehow, either as a global or in a shared superclass?
Please refer to the discussion in PR #691 as to why it is implemented like this.
Signed-off-by: Constantin M Adam <[email protected]>
ComponentUtils.add_settings_to_component(execute_data_cleaning_job, ONE_WEEK_SEC)
# FIXME: see https://github.com/kubeflow/pipelines/issues/10914
if os.getenv("KFPv2", "0") != "1":
    ComponentUtils.set_s3_env_vars_to_component(execute_data_cleaning_job, data_s3_access_secret)
As discussed, in KFP v2 the secret name is hard coded (in kfp_v2_workflow_support/src/workflow_support/compile_utils/component.py) as a workaround for kubeflow/pipelines#10914. Thus, this pipeline is not expected to run on kfp V2. Should it be added to the blacklist, given that we currently can't restrict it to run only for v1 in the CI/CD tests? @roytman what do you think? Thanks
## Summary

The basic implementation of the fuzzy dedup is based on [MinHash](https://en.wikipedia.org/wiki/MinHash). Also see
Forgive me if this is a duplicate comment as I thought I had submitted once already, but...
- A more gentle introduction to what the transform does instead of only providing the links.
- The set of configuration keys should be documented. See doc_chunk for a nice example.
- This file needs to be linked from a ../README.md, which now only points to ray and python.
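To make the requested gentle introduction concrete, a toy MinHash illustration (this is not the PR's implementation; the hashing scheme and shingle length are arbitrary choices):

import hashlib

def minhash_signature(text: str, num_hashes: int = 8, shingle_len: int = 5) -> list[int]:
    # For each seeded hash function, keep the minimum hash over all character shingles.
    shingles = {text[i : i + shingle_len] for i in range(max(1, len(text) - shingle_len + 1))}
    return [
        min(
            int.from_bytes(hashlib.sha1(f"{seed}:{s}".encode()).digest()[:8], "big")
            for s in shingles
        )
        for seed in range(num_hashes)
    ]

# Documents sharing many shingles agree on most signature positions,
# which approximates their Jaccard similarity.
a = minhash_signature("the quick brown fox jumps over the lazy dog")
b = minhash_signature("the quick brown fox jumped over the lazy dog")
print(sum(x == y for x, y in zip(a, b)) / len(a))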
transforms/universal/fdedup/ray/src/cluster_analysis_local_ray.py
)

if __name__ == "__main__":
Do you really need these sub-transform main()s? They are not exposed in the Dockerfile (in the home dir) so are not even directly callable (in the standard way). Do we ever need to run this sub-transform manually outside of fdedup orchestrator/transform? If so, then it should be promoted to the home directory in the Dockerfile, otherwise maybe delete main()?
The main() in all these transforms is needed by the kfp pipeline here:
https://github.com/IBM/data-prep-kit/blob/4941d5bab37a0bdc1e5873ce8e7288483703751f/transforms/universal/fdedup/kfp_ray/fdedup_wf.py#L30C1-L35C1
If I remove these, the pipeline will stop working.
if __name__ == "__main__":
    # launcher = NOOPRayLauncher()
NOOP, although per my earlier comment, do we need the main() for the sub-transforms orchestrated by fdedup?
Fixed in commit fa5959b
Yes, we do need the main, see my comment above
By convention, this file should be named cluster_analysis_s3_spark.py
By convention, this should be named data_cleaning_s3_spark.py
By convention, this should be named fuzzy_dedup_s3_spark.py
By convention, this should be named signature_calc_s3_spark.py
	$(PIP) install --upgrade pip; \
	$(PIP) install -r requirements.txt; \
fi
set-versions:
I guess you don't need this if you've renamed the file. If you go back to Makefile, you will also need to add clean test build publish.
Signed-off-by: Constantin M Adam <[email protected]>
RUN pip install --no-cache-dir -e .

# copy source data
COPY src/ src/
@cmadam This is already done on line 19 above! Also, line 24 is doing a pip install of the transform, which installs the same code.
Why are these changes needed?
Provide a fuzzy dedup implementation for Python, Spark and Ray.
Related issue number (if any).
#152
#79