Skip to content

Commit

Permalink
[dagster-dbt][refactor] Clean up multi-asset creation
Browse files: browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Dec 19, 2024
1 parent 69492b9 commit 1247848
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 48 deletions.
68 changes: 20 additions & 48 deletions python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
AssetCheckSpec,
AssetDep,
AssetKey,
AssetOut,
AssetsDefinition,
AssetSelection,
AutoMaterializePolicy,
Expand All @@ -41,6 +40,7 @@
_check as check,
define_asset_job,
)
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.decorators.decorator_assets_definition_builder import (
validate_and_assign_output_names_to_check_specs,
)
Expand All @@ -50,10 +50,8 @@
CodeReferencesMetadataValue,
LocalFileCodeReference,
)
from dagster._core.definitions.tags import build_kind_tag
from dagster._utils.merger import merge_dicts

from dagster_dbt.asset_specs import AssetSpec
from dagster_dbt.metadata_set import DbtMetadataSet
from dagster_dbt.utils import (
ASSET_RESOURCE_TYPES,
Expand Down Expand Up @@ -741,8 +739,7 @@ def build_dbt_multi_asset_args(
io_manager_key: Optional[str],
project: Optional["DbtProject"],
) -> Tuple[
Sequence[AssetDep],
Dict[str, AssetOut],
Sequence[AssetSpec],
Dict[str, Set[AssetKey]],
Sequence[AssetCheckSpec],
]:
Expand All @@ -760,9 +757,6 @@ def build_dbt_multi_asset_args(
asset_resource_types=ASSET_RESOURCE_TYPES,
)

deps: Dict[AssetKey, AssetDep] = {}
outs: Dict[str, AssetOut] = {}
internal_asset_deps: Dict[str, Set[AssetKey]] = {}
check_specs_by_key: Dict[AssetCheckKey, AssetCheckSpec] = {}

dbt_unique_id_and_resource_types_by_asset_key: Dict[AssetKey, Tuple[Set[str], Set[str]]] = {}
Expand All @@ -771,6 +765,7 @@ def build_dbt_multi_asset_args(
for dbt_group_resource_props in manifest["groups"].values()
}

partition_mapped_deps = []
dbt_adapter_type = manifest.get("metadata", {}).get("adapter_type")

for unique_id, parent_unique_ids in dbt_unique_id_deps.items():
Expand All @@ -782,63 +777,41 @@ def build_dbt_multi_asset_args(
)

output_name = dagster_name_fn(dbt_resource_props)
asset_key = dagster_dbt_translator.get_asset_key(dbt_resource_props)

spec = dagster_dbt_translator.get_spec(dbt_resource_props)
asset_key = spec.key

unique_ids_for_asset_key, resource_types_for_asset_key = (
dbt_unique_id_and_resource_types_by_asset_key.setdefault(asset_key, (set(), set()))
)
unique_ids_for_asset_key.add(unique_id)
resource_types_for_asset_key.add(dbt_resource_props["resource_type"])

metadata = {
**dagster_dbt_translator.get_metadata(dbt_resource_props),
DAGSTER_DBT_MANIFEST_METADATA_KEY: DbtManifestWrapper(manifest=manifest),
DAGSTER_DBT_TRANSLATOR_METADATA_KEY: dagster_dbt_translator,
}
# add pointers to the dbt manifest and translator
spec = spec.merge_attributes(
metadata={
DAGSTER_DBT_MANIFEST_METADATA_KEY: DbtManifestWrapper(manifest=manifest),
DAGSTER_DBT_TRANSLATOR_METADATA_KEY: dagster_dbt_translator,
}
)
if dagster_dbt_translator.settings.enable_code_references:
if not project:
raise DagsterInvalidDefinitionError(
"enable_code_references requires a DbtProject to be supplied"
" to the @dbt_assets decorator."
)

metadata = _attach_sql_model_code_reference(
existing_metadata=metadata,
dbt_resource_props=dbt_resource_props,
project=project,
spec.replace_attributes(
metadata=_attach_sql_model_code_reference(
existing_metadata=spec.metadata,
dbt_resource_props=dbt_resource_props,
project=project,
)
)

spec = AssetSpec(
key=asset_key,
description=dagster_dbt_translator.get_description(dbt_resource_props),
metadata=metadata,
owners=dagster_dbt_translator.get_owners(
{
**dbt_resource_props,
**({"group": dbt_group_resource_props} if dbt_group_resource_props else {}),
}
),
tags={
**build_kind_tag("dbt"),
**(build_kind_tag(dbt_adapter_type) if dbt_adapter_type else {}),
**dagster_dbt_translator.get_tags(dbt_resource_props),
},
group_name=dagster_dbt_translator.get_group_name(dbt_resource_props),
code_version=dagster_dbt_translator.get_code_version(dbt_resource_props),
freshness_policy=dagster_dbt_translator.get_freshness_policy(dbt_resource_props),
automation_condition=dagster_dbt_translator.get_automation_condition(
dbt_resource_props
),
)
if io_manager_key:
spec = spec.with_io_manager_key(io_manager_key)

outs[output_name] = AssetOut.from_spec(
spec=spec,
dagster_type=Nothing,
is_required=False,
)

test_unique_ids = [
child_unique_id
for child_unique_id in manifest["child_map"][unique_id]
Expand All @@ -856,7 +829,7 @@ def build_dbt_multi_asset_args(
check_specs_by_key[check_spec.key] = check_spec

# Translate parent unique ids to dependencies
output_internal_deps = internal_asset_deps.setdefault(output_name, set())
deps = []
for parent_unique_id in parent_unique_ids:
dbt_parent_resource_props = dbt_resource_props_by_dbt_unique_id[parent_unique_id]
parent_asset_key = dagster_dbt_translator.get_asset_key(dbt_parent_resource_props)
Expand All @@ -874,7 +847,6 @@ def build_dbt_multi_asset_args(
parent_resource_types_for_asset_key.add(dbt_parent_resource_props["resource_type"])

# Add this parent as an internal dependency
output_internal_deps.add(parent_asset_key)

# Mark this parent as an input if it has no dependencies
if parent_unique_id not in dbt_unique_id_deps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
_check as check,
)
from dagster._annotations import experimental, public
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._utils.tags import is_valid_tag_key

from dagster_dbt.asset_utils import (
Expand Down Expand Up @@ -520,6 +521,20 @@ def get_automation_condition(self, dbt_resource_props: Mapping[str, Any]) -> Opt
auto_materialize_policy.to_automation_condition() if auto_materialize_policy else None
)

def get_spec(self, dbt_resource_props: Mapping[str, Any]) -> AssetSpec:
    """Assemble the ``AssetSpec`` describing a single dbt resource.

    Every attribute is obtained by calling the correspondingly named
    ``get_*`` method on ``self`` with the same resource properties.

    Args:
        dbt_resource_props: Properties of the dbt resource, forwarded
            unchanged to each ``get_*`` method.

    Returns:
        The ``AssetSpec`` built from those values, with ``skippable=True``.
    """
    props = dbt_resource_props

    # Resolve each attribute up front, in the same order the keyword
    # arguments are passed below.
    key = self.get_asset_key(props)
    description = self.get_description(props)
    metadata = self.get_metadata(props)
    group_name = self.get_group_name(props)
    code_version = self.get_code_version(props)
    automation_condition = self.get_automation_condition(props)
    owners = self.get_owners(props)
    tags = self.get_tags(props)

    return AssetSpec(
        key=key,
        # NOTE(review): this passes the literal Ellipsis object as ``deps``;
        # it looks like a placeholder for the real dependency list --
        # confirm before relying on this method.
        deps=...,
        description=description,
        metadata=metadata,
        skippable=True,
        group_name=group_name,
        code_version=code_version,
        automation_condition=automation_condition,
        owners=owners,
        tags=tags,
    )


@dataclass
class DbtManifestWrapper:
Expand Down

0 comments on commit 1247848

Please sign in to comment.