Skip to content

Commit

Permalink
[dagster-dbt][refactor] Clean up multi-asset creation
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Dec 23, 2024
1 parent 9ce5277 commit 35c9420
Show file tree
Hide file tree
Showing 9 changed files with 261 additions and 347 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def to_python_identifier(self, suffix: Optional[str] = None) -> str:
if suffix is not None:
path.append(suffix)

return "__".join(path).replace("-", "_")
return "__".join(path).replace("-", "_").replace(".", "_")

@staticmethod
def from_user_string(asset_key_string: str) -> "AssetKey":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from dagster_dbt.asset_utils import (
DAGSTER_DBT_EXCLUDE_METADATA_KEY,
DAGSTER_DBT_SELECT_METADATA_KEY,
build_dbt_multi_asset_args,
build_dbt_specs,
)
from dagster_dbt.dagster_dbt_translator import DagsterDbtTranslator, validate_translator
from dagster_dbt.dbt_manifest import DbtManifestParam, validate_manifest
Expand Down Expand Up @@ -302,14 +302,9 @@ def partitionshop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource
dagster_dbt_translator = validate_translator(dagster_dbt_translator or DagsterDbtTranslator())
manifest = validate_manifest(manifest)

(
deps,
outs,
internal_asset_deps,
check_specs,
) = build_dbt_multi_asset_args(
specs, check_specs = build_dbt_specs(
translator=dagster_dbt_translator,
manifest=manifest,
dagster_dbt_translator=dagster_dbt_translator,
select=select,
exclude=exclude or "",
io_manager_key=io_manager_key,
Expand Down Expand Up @@ -342,15 +337,13 @@ def partitionshop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource
backfill_policy = BackfillPolicy.single_run()

return multi_asset(
outs=outs,
name=name,
internal_asset_deps=internal_asset_deps,
deps=deps,
specs=specs,
check_specs=check_specs,
can_subset=True,
required_resource_keys=required_resource_keys,
partitions_def=partitions_def,
can_subset=True,
op_tags=resolved_op_tags,
check_specs=check_specs,
backfill_policy=backfill_policy,
retry_policy=retry_policy,
)
32 changes: 7 additions & 25 deletions python_modules/libraries/dagster-dbt/dagster_dbt/asset_specs.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from typing import Optional, Sequence

import dagster._check as check
from dagster import AssetDep, AssetKey, AssetSpec
from dagster import AssetSpec
from dagster._annotations import experimental
from dagster._core.storage.tags import KIND_PREFIX

from dagster_dbt.asset_utils import build_dbt_multi_asset_args
from dagster_dbt.asset_utils import build_dbt_specs
from dagster_dbt.dagster_dbt_translator import DagsterDbtTranslator, validate_translator
from dagster_dbt.dbt_manifest import DbtManifestParam, validate_manifest
from dagster_dbt.dbt_project import DbtProject
Expand Down Expand Up @@ -43,33 +41,17 @@ def build_dbt_asset_specs(
manifest = validate_manifest(manifest)
dagster_dbt_translator = validate_translator(dagster_dbt_translator or DagsterDbtTranslator())

(
_,
outs,
internal_asset_deps,
_,
) = build_dbt_multi_asset_args(
specs, _ = build_dbt_specs(
manifest=manifest,
dagster_dbt_translator=dagster_dbt_translator,
translator=dagster_dbt_translator,
select=select,
exclude=exclude or "",
io_manager_key=None,
project=project,
)

specs = [
asset_out.to_spec(
key=check.inst(asset_out.key, AssetKey),
deps=[AssetDep(asset=dep) for dep in internal_asset_deps.get(output_name, set())],
additional_tags={f"{KIND_PREFIX}dbt": ""},
partitions_def=None,
)
return [
# Allow specs to be represented as external assets by adhering to external asset invariants.
._replace(
skippable=False,
code_version=None,
)
for output_name, asset_out in outs.items()
spec.replace_attributes(skippable=False, code_version=None)
for spec in specs
]

return specs
Loading

0 comments on commit 35c9420

Please sign in to comment.