-
Notifications
You must be signed in to change notification settings - Fork 3.3k
DRAFT - feat(ingestion/transformers): add is_container config to tags transformers to enable container tagging #14290
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,7 @@ | |
| from datahub.ingestion.api.common import PipelineContext | ||
| from datahub.ingestion.transformer.dataset_transformer import DatasetTagsTransformer | ||
| from datahub.metadata.schema_classes import ( | ||
| BrowsePathsV2Class, | ||
| GlobalTagsClass, | ||
| MetadataChangeProposalClass, | ||
| TagAssociationClass, | ||
|
|
@@ -22,6 +23,7 @@ | |
|
|
||
| class AddDatasetTagsConfig(TransformerSemanticsConfigModel): | ||
| get_tags_to_add: Callable[[str], List[TagAssociationClass]] | ||
| is_container: bool = False | ||
|
|
||
| _resolve_tag_fn = pydantic_resolve_key("get_tags_to_add") | ||
|
|
||
|
|
@@ -73,6 +75,7 @@ def handle_end_of_stream( | |
|
|
||
| logger.debug("Generating tags") | ||
|
|
||
| # Generate tag entities | ||
| for tag_association in self.processed_tags.values(): | ||
| tag_urn = TagUrn.from_string(tag_association.tag) | ||
| mcps.append( | ||
|
|
@@ -82,11 +85,58 @@ def handle_end_of_stream( | |
| ) | ||
| ) | ||
|
|
||
| # Handle container tags if is_container is enabled | ||
| container_tag_mcps: List[MetadataChangeProposalWrapper] = [] | ||
| container_tag_mapping: Dict[str, List[TagAssociationClass]] = {} | ||
|
|
||
| logger.debug("Generating tags for containers") | ||
|
|
||
| if self.config.is_container: | ||
| for entity_urn, tags_to_add in ( | ||
| (urn, self.config.get_tags_to_add(urn)) for urn in self.entity_map | ||
| ): | ||
| if not tags_to_add: | ||
| continue | ||
|
|
||
| assert self.ctx.graph | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Dangerous use of assert - low severity Remediation: Raise an exception instead of using assert. |
||
| browse_paths = self.ctx.graph.get_aspect(entity_urn, BrowsePathsV2Class) | ||
| if not browse_paths: | ||
| continue | ||
|
|
||
| for path in browse_paths.path: | ||
| container_urn = path.urn | ||
|
|
||
| if not container_urn or not container_urn.startswith( | ||
| "urn:li:container:" | ||
| ): | ||
| continue | ||
|
|
||
| if container_urn not in container_tag_mapping: | ||
| container_tag_mapping[container_urn] = tags_to_add.copy() | ||
| else: | ||
| # Merge tags, avoiding duplicates | ||
| existing_tag_urns = { | ||
| tag.tag for tag in container_tag_mapping[container_urn] | ||
| } | ||
| for tag in tags_to_add: | ||
| if tag.tag not in existing_tag_urns: | ||
| container_tag_mapping[container_urn].append(tag) | ||
|
|
||
| for urn, tags in container_tag_mapping.items(): | ||
| container_tag_mcps.append( | ||
| MetadataChangeProposalWrapper( | ||
| entityUrn=urn, | ||
| aspect=GlobalTagsClass(tags=tags), | ||
| ) | ||
| ) | ||
|
|
||
| mcps.extend(container_tag_mcps) | ||
| return mcps | ||
|
|
||
|
|
||
| class SimpleDatasetTagConfig(TransformerSemanticsConfigModel): | ||
| tag_urns: List[str] | ||
| is_container: bool = False | ||
|
|
||
|
|
||
| class SimpleAddDatasetTags(AddDatasetTags): | ||
|
|
@@ -99,6 +149,7 @@ def __init__(self, config: SimpleDatasetTagConfig, ctx: PipelineContext): | |
| get_tags_to_add=lambda _: tags, | ||
| replace_existing=config.replace_existing, | ||
| semantics=config.semantics, | ||
| is_container=config.is_container, | ||
| ) | ||
| super().__init__(generic_config, ctx) | ||
|
|
||
|
|
@@ -110,6 +161,7 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "SimpleAddDatasetTag | |
|
|
||
| class PatternDatasetTagsConfig(TransformerSemanticsConfigModel): | ||
| tag_pattern: KeyValuePattern = KeyValuePattern.all() | ||
| is_container: bool = False | ||
|
|
||
|
|
||
| class PatternAddDatasetTags(AddDatasetTags): | ||
|
|
@@ -123,6 +175,7 @@ def __init__(self, config: PatternDatasetTagsConfig, ctx: PipelineContext): | |
| ], | ||
| replace_existing=config.replace_existing, | ||
| semantics=config.semantics, | ||
| is_container=config.is_container, | ||
| ) | ||
| super().__init__(generic_config, ctx) | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know we've used this is_container flag pattern in other places, but frankly I find it really confusing that the dataset tag transformer has a container-related option.
I'd like us to spend a little bit of time thinking about if there is a better way to do this that results in a more logically consistent interface / less confusing experience.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In particular, it's important to me that we continue to reuse as much code as possible around the logic of merging ingestion-produced tags with server-fetch tags. But I don't think stuffing all of that functionality in a dataset transformer is the right approach.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do agree with the comment and sentiment. My general feel is that the OOTB transformers, in being limited to datasets, are restrictive. There is functionality within the existing transformers that would make sense to be applied to other entity types (e.g. dashboards, containers etc.). Because of this it feels like the
is_containerfunctionality has generally been added to get around these imposed restrictions on the OOTB transformers. I do feel that this change does at least offer consistency with other transformers, hence raising it.It does feel like an area for us to revisit - what the future state of transformers should be, and how they should be used, and what entity types should be allowed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would you propose as next steps? Is there existing code logic that this should leverage or would you prefer for this PR to start on the creation of this common code?