metadata-ingestion/docs/transformer/dataset_transformer.md (60 additions, 19 deletions)

@@ -348,15 +348,18 @@ a tag called `USA-ops-team` and `Canada-marketing` will be added to them respect

### Config Details

| Field              | Required | Type         | Default     | Description                                                                                                     |
| ------------------ | -------- | ------------ | ----------- | --------------------------------------------------------------------------------------------------------------- |
| `tag_urns`         | ✅       | list[string] |             | List of globalTags urns.                                                                                        |
| `replace_existing` |          | boolean      | `false`     | Whether to remove globalTags from the entity sent by the ingestion source.                                      |
| `semantics`        |          | enum         | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS.                                                |
| `is_container`     |          | bool         | `false`     | Whether to also apply the tags to the dataset's container. If `true`, tags are attached to both the dataset and its container. |

Let's suppose we'd like to add a set of dataset tags. To do so, we can use the `simple_add_dataset_tags` transformer that's included in the ingestion framework.

If the `is_container` field is set to `true`, the transformer will not only attach the tags to the matching datasets but will also look up the containers associated with those datasets and attach the same tags to them, so both the datasets and their containers end up with the specified tags.

The config, which we'd append to our ingestion recipe YAML, would look like this:

```yaml
transformers:
```

@@ -399,20 +402,34 @@ transformers:

```yaml
        - "urn:li:tag:NeedsDocumentation"
        - "urn:li:tag:Legacy"
```
- Add tags to dataset and its containers
  ```yaml
  transformers:
    - type: "simple_add_dataset_tags"
      config:
        is_container: true
        semantics: PATCH # or OVERWRITE, depending on the desired behavior
        tag_urns:
          - "urn:li:tag:NeedsDocumentation"
          - "urn:li:tag:Legacy"
  ```

## Pattern Add Dataset globalTags

### Config Details

| Field              | Required | Type                   | Default     | Description                                                                                                     |
| ------------------ | -------- | ---------------------- | ----------- | --------------------------------------------------------------------------------------------------------------- |
| `tag_pattern`      | ✅       | map[regex, list[urn]]  |             | Map of entity urn regex patterns to the lists of tag urns to apply to matching entities.                        |
| `replace_existing` |          | boolean                | `false`     | Whether to remove globalTags from the entity sent by the ingestion source.                                      |
| `semantics`        |          | enum                   | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS.                                                |
| `is_container`     |          | bool                   | `false`     | Whether to also apply the tags to the dataset's container. If `true`, tags are attached to both the dataset and its container. |

Let's suppose we'd like to append a series of tags to specific datasets. To do so, we can use the `pattern_add_dataset_tags` module that's included in the ingestion framework. This will match the regex pattern against the dataset `urn` and assign the respective tag urns given in the array.

If the `is_container` field is set to `true`, the transformer will not only attach the tags to the matching datasets but will also look up the containers associated with those datasets and attach the same tags to them, so both the datasets and their containers end up with the specified tags.

The config, which we'd append to our ingestion recipe YAML, would look like this:

```yaml
transformers:
```

@@ -462,19 +479,34 @@ transformers:

```yaml
            ["urn:li:tag:NeedsDocumentation", "urn:li:tag:Legacy"]
          ".*example2.*": ["urn:li:tag:NeedsDocumentation"]
```
- Add tags to dataset and its containers
  ```yaml
  transformers:
    - type: "pattern_add_dataset_tags"
      config:
        is_container: true
        semantics: PATCH
        tag_pattern:
          rules:
            ".*example1.*": ["urn:li:tag:Private"]
            ".*example2.*": ["urn:li:tag:Public"]
  ```

## Add Dataset globalTags

### Config Details

| Field              | Required | Type                                       | Default     | Description                                                                                                     |
| ------------------ | -------- | ------------------------------------------ | ----------- | --------------------------------------------------------------------------------------------------------------- |
| `get_tags_to_add`  | ✅       | callable[[str], list[TagAssociationClass]] |             | A function that takes an entity urn as input and returns a list of TagAssociationClass objects.                 |
| `replace_existing` |          | boolean                                    | `false`     | Whether to remove globalTags from the entity sent by the ingestion source.                                      |
| `semantics`        |          | enum                                       | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS.                                                |
| `is_container`     |          | bool                                       | `false`     | Whether to also apply the tags to the dataset's container. If `true`, tags are attached to both the dataset and its container. |

If you'd like to add more complex logic for assigning tags, you can use the more generic `add_dataset_tags` transformer, which calls a user-provided function to determine the tags for each dataset (a sketch of such a function follows the examples below).

If the `is_container` field is set to `true`, the transformer will not only attach the tags to the matching datasets but will also look up the containers associated with those datasets and attach the same tags to them, so both the datasets and their containers end up with the specified tags.

```yaml
transformers:
  - type: "add_dataset_tags"
```

@@ -536,6 +568,15 @@ Finally, you can install and use your custom transformer as [shown here](#instal

```yaml
      semantics: PATCH
      get_tags_to_add: "<your_module>.<your_function>"
```
- Add tags to dataset and its containers
  ```yaml
  transformers:
    - type: "add_dataset_tags"
      config:
        is_container: true
        semantics: PATCH # or OVERWRITE, depending on the desired behavior
        get_tags_to_add: "<your_module>.<your_function>"
  ```
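
For reference, here is a minimal sketch of what such a user-provided function might look like. The function name `custom_tags` and its tagging rule are illustrative stand-ins for `<your_module>.<your_function>`:

```python
from typing import List

from datahub.metadata.schema_classes import TagAssociationClass


def custom_tags(entity_urn: str) -> List[TagAssociationClass]:
    """Return the tags to attach to a given entity urn (illustrative rule)."""
    # Hypothetical policy: datasets under "example1" are Private, others Public.
    if "example1" in entity_urn:
        return [TagAssociationClass(tag="urn:li:tag:Private")]
    return [TagAssociationClass(tag="urn:li:tag:Public")]
```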

## Set Dataset browsePath

---

@@ -11,6 +11,7 @@
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_transformer import DatasetTagsTransformer
from datahub.metadata.schema_classes import (
    BrowsePathsV2Class,
    GlobalTagsClass,
    MetadataChangeProposalClass,
    TagAssociationClass,
@@ -22,6 +23,7 @@

class AddDatasetTagsConfig(TransformerSemanticsConfigModel):
    get_tags_to_add: Callable[[str], List[TagAssociationClass]]
    is_container: bool = False
**Contributor:**

I know we've used this `is_container` flag pattern in other places, but frankly I find it really confusing that the dataset tag transformer has a container-related option.

I'd like us to spend a little bit of time thinking about whether there is a better way to do this that results in a more logically consistent interface and a less confusing experience.

**Contributor:**

In particular, it's important to me that we continue to reuse as much code as possible around the logic of merging ingestion-produced tags with server-fetched tags. But I don't think stuffing all of that functionality in a dataset transformer is the right approach.

**Collaborator (Author):**

> I know we've used this `is_container` flag pattern in other places, but frankly I find it really confusing that the dataset tag transformer has a container-related option.
>
> I'd like us to spend a little bit of time thinking about whether there is a better way to do this that results in a more logically consistent interface and a less confusing experience.

I do agree with the comment and sentiment. My general feeling is that the OOTB transformers, in being limited to datasets, are restrictive. There is functionality within the existing transformers that would make sense to apply to other entity types (e.g. dashboards, containers, etc.). Because of this, it feels like the `is_container` functionality has generally been added to work around these imposed restrictions on the OOTB transformers. I do feel that this change at least offers consistency with other transformers, hence raising it.

It does feel like an area for us to revisit: what the future state of transformers should be, how they should be used, and what entity types should be allowed.

**Collaborator (Author):**

> In particular, it's important to me that we continue to reuse as much code as possible around the logic of merging ingestion-produced tags with server-fetched tags. But I don't think stuffing all of that functionality in a dataset transformer is the right approach.

What would you propose as next steps? Is there existing code logic that this should leverage, or would you prefer this PR to start on the creation of this common code?
    _resolve_tag_fn = pydantic_resolve_key("get_tags_to_add")

@@ -73,6 +75,7 @@ def handle_end_of_stream(

        logger.debug("Generating tags")

        # Generate tag entities
        for tag_association in self.processed_tags.values():
            tag_urn = TagUrn.from_string(tag_association.tag)
            mcps.append(
@@ -82,11 +85,58 @@
                )
            )

        # Handle container tags if is_container is enabled
        container_tag_mcps: List[MetadataChangeProposalWrapper] = []
        container_tag_mapping: Dict[str, List[TagAssociationClass]] = {}

        if self.config.is_container:
            logger.debug("Generating tags for containers")

            for entity_urn, tags_to_add in (
                (urn, self.config.get_tags_to_add(urn)) for urn in self.entity_map
            ):
                if not tags_to_add:
                    continue

                assert self.ctx.graph
**aikido-pr-checks (bot)** commented on Jul 31, 2025:

**Dangerous use of assert (low severity).** When running Python in production in optimized mode, `assert` calls are not executed. Optimized mode is enabled via the `PYTHONOPTIMIZE` environment variable (or the `-O` flag) and is usually ON in production, so any safety check done using `assert` will not be executed.

Remediation: raise an exception instead of using `assert`.

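A minimal sketch of that remediation, assuming a plain `ValueError` is acceptable to the pipeline's error handling (the exception type and message are illustrative):

```python
# Explicit runtime check instead of assert, so the guard still runs when
# Python is started with -O / PYTHONOPTIMIZE.
if self.ctx.graph is None:
    raise ValueError(
        "is_container=True requires a DataHub graph connection "
        "to resolve container urns"
    )
```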
                # Resolve the dataset's containers from its browse path and
                # collect the tags to apply to each container.
                browse_paths = self.ctx.graph.get_aspect(entity_urn, BrowsePathsV2Class)
                if not browse_paths:
                    continue

                for path in browse_paths.path:
                    container_urn = path.urn

                    if not container_urn or not container_urn.startswith(
                        "urn:li:container:"
                    ):
                        continue

                    if container_urn not in container_tag_mapping:
                        container_tag_mapping[container_urn] = tags_to_add.copy()
                    else:
                        # Merge tags, avoiding duplicates
                        existing_tag_urns = {
                            tag.tag for tag in container_tag_mapping[container_urn]
                        }
                        for tag in tags_to_add:
                            if tag.tag not in existing_tag_urns:
                                container_tag_mapping[container_urn].append(tag)

            for urn, tags in container_tag_mapping.items():
                container_tag_mcps.append(
                    MetadataChangeProposalWrapper(
                        entityUrn=urn,
                        aspect=GlobalTagsClass(tags=tags),
                    )
                )

        mcps.extend(container_tag_mcps)
        return mcps


class SimpleDatasetTagConfig(TransformerSemanticsConfigModel):
    tag_urns: List[str]
    is_container: bool = False


class SimpleAddDatasetTags(AddDatasetTags):
@@ -99,6 +149,7 @@ def __init__(self, config: SimpleDatasetTagConfig, ctx: PipelineContext):
            get_tags_to_add=lambda _: tags,
            replace_existing=config.replace_existing,
            semantics=config.semantics,
            is_container=config.is_container,
        )
        super().__init__(generic_config, ctx)

@@ -110,6 +161,7 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "SimpleAddDatasetTag

class PatternDatasetTagsConfig(TransformerSemanticsConfigModel):
    tag_pattern: KeyValuePattern = KeyValuePattern.all()
    is_container: bool = False


class PatternAddDatasetTags(AddDatasetTags):
@@ -123,6 +175,7 @@ def __init__(self, config: PatternDatasetTagsConfig, ctx: PipelineContext):
            ],
            replace_existing=config.replace_existing,
            semantics=config.semantics,
            is_container=config.is_container,
        )
        super().__init__(generic_config, ctx)
