Skip to content

Commit

Permalink
[dagster-dbt] Spec-ify dbt cloud integration
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Dec 30, 2024
1 parent 1feff71 commit ca1bbf9
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 242 deletions.
140 changes: 0 additions & 140 deletions python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
Sequence,
Set,
Tuple,
cast,
)

from dagster import (
Expand All @@ -26,31 +25,23 @@
AssetSelection,
AssetSpec,
AutoMaterializePolicy,
AutomationCondition,
DagsterInvalidDefinitionError,
DagsterInvariantViolationError,
DefaultScheduleStatus,
FreshnessPolicy,
In,
Nothing,
Out,
RunConfig,
ScheduleDefinition,
TableColumn,
TableSchema,
_check as check,
define_asset_job,
)
from dagster._core.definitions.decorators.decorator_assets_definition_builder import (
validate_and_assign_output_names_to_check_specs,
)
from dagster._core.definitions.metadata import TableMetadataSet
from dagster._core.definitions.metadata.source_code import (
CodeReferencesMetadataSet,
CodeReferencesMetadataValue,
LocalFileCodeReference,
)
from dagster._utils.merger import merge_dicts

from dagster_dbt.metadata_set import DbtMetadataSet
from dagster_dbt.utils import (
Expand Down Expand Up @@ -893,137 +884,6 @@ def _validate_asset_keys(
)


def get_asset_deps(
    dbt_nodes,
    deps,
    io_manager_key,
    manifest: Optional[Mapping[str, Any]],
    dagster_dbt_translator: "DagsterDbtTranslator",
) -> Tuple[
    Dict[AssetKey, Set[AssetKey]],
    Dict[AssetKey, Tuple[str, In]],
    Dict[AssetKey, Tuple[str, Out]],
    Dict[AssetKey, str],
    Dict[AssetKey, FreshnessPolicy],
    Dict[AssetKey, AutomationCondition],
    Dict[str, AssetCheckSpec],
    Dict[str, List[str]],
    Dict[str, Dict[str, Any]],
]:
    """Translate a dbt dependency mapping into the pieces needed to build Dagster assets.

    Args:
        dbt_nodes: Lookup from dbt unique id to that node's resource properties; indexed
            with every key of ``deps`` and with every parent unique id.
        deps: Mapping from each selected dbt unique id to an iterable of its parent
            unique ids.
        io_manager_key: Passed through unchanged to every ``Out`` that is created.
        manifest: Optional dbt manifest. When truthy it is wrapped into the output
            metadata and its ``"child_map"`` is consulted to discover tests for each node.
        dagster_dbt_translator: Translator used to derive asset keys, descriptions,
            metadata, group names, freshness policies, automation conditions and code
            versions from dbt resource properties.

    Returns:
        A 9-tuple of, in order: upstream asset keys per asset key, ``(input_name, In)``
        per non-selected parent asset key, ``(output_name, Out)`` per asset key, group
        names per asset key, freshness policies per asset key, automation conditions per
        asset key, check specs per output name, dbt fqns per output name, and a small
        metadata dict (``unique_id`` / ``resource_type``) per output name.
    """
    from dagster_dbt.dagster_dbt_translator import DbtManifestWrapper, validate_translator

    dagster_dbt_translator = validate_translator(dagster_dbt_translator)

    asset_deps: Dict[AssetKey, Set[AssetKey]] = {}
    asset_ins: Dict[AssetKey, Tuple[str, In]] = {}
    asset_outs: Dict[AssetKey, Tuple[str, Out]] = {}

    # NOTE: the per-key / per-output-name dicts below are candidates for being folded into a
    # single mapping from output name to whatever reference data has to be kept around.
    group_names_by_key: Dict[AssetKey, str] = {}
    freshness_policies_by_key: Dict[AssetKey, FreshnessPolicy] = {}
    automation_conditions_by_key: Dict[AssetKey, AutomationCondition] = {}
    check_specs_by_key: Dict[AssetCheckKey, AssetCheckSpec] = {}
    fqns_by_output_name: Dict[str, List[str]] = {}
    metadata_by_output_name: Dict[str, Dict[str, Any]] = {}

    for unique_id, parent_unique_ids in deps.items():
        node_props = dbt_nodes[unique_id]

        output_name = dagster_name_fn(node_props)
        fqns_by_output_name[output_name] = node_props["fqn"]
        metadata_by_output_name[output_name] = {
            "unique_id": node_props["unique_id"],
            "resource_type": node_props["resource_type"],
        }

        asset_key = dagster_dbt_translator.get_asset_key(node_props)

        # Every selected node gets an entry, even when it has no parents.
        upstream_keys: Set[AssetKey] = set()
        asset_deps[asset_key] = upstream_keys

        # Translator-provided metadata first; the manifest / translator handles override it.
        translator_metadata = dagster_dbt_translator.get_metadata(node_props)
        wrapped_manifest = DbtManifestWrapper(manifest=manifest) if manifest else None
        out_metadata = merge_dicts(
            translator_metadata,
            {
                DAGSTER_DBT_MANIFEST_METADATA_KEY: wrapped_manifest,
                DAGSTER_DBT_TRANSLATOR_METADATA_KEY: dagster_dbt_translator,
            },
        )
        out_description = dagster_dbt_translator.get_description(node_props)
        out_code_version = dagster_dbt_translator.get_code_version(node_props)
        asset_outs[asset_key] = (
            output_name,
            Out(
                io_manager_key=io_manager_key,
                description=out_description,
                metadata=out_metadata,
                is_required=False,
                dagster_type=Nothing,
                code_version=out_code_version,
            ),
        )

        # Optional per-asset attributes are only recorded when the translator supplies one.
        group_name = dagster_dbt_translator.get_group_name(node_props)
        if group_name is not None:
            group_names_by_key[asset_key] = group_name

        freshness_policy = dagster_dbt_translator.get_freshness_policy(node_props)
        if freshness_policy is not None:
            freshness_policies_by_key[asset_key] = freshness_policy

        automation_condition = dagster_dbt_translator.get_automation_condition(node_props)
        if automation_condition is not None:
            automation_conditions_by_key[asset_key] = automation_condition

        # Without a manifest there is no child map, hence no tests to turn into checks.
        test_unique_ids = (
            [
                child_id
                for child_id in manifest["child_map"][unique_id]
                if child_id.startswith("test")
            ]
            if manifest
            else []
        )
        for test_unique_id in test_unique_ids:
            check_spec = default_asset_check_fn(
                manifest,
                dbt_nodes,
                dagster_dbt_translator,
                asset_key,
                test_unique_id,
            )
            if check_spec:
                check_specs_by_key[check_spec.key] = check_spec

        for parent_unique_id in parent_unique_ids:
            parent_props = dbt_nodes[parent_unique_id]
            parent_asset_key = dagster_dbt_translator.get_asset_key(parent_props)
            upstream_keys.add(parent_asset_key)

            # Parents that are themselves selected are produced here, not consumed.
            if parent_unique_id in deps:
                continue

            # A parent outside the selection is modelled as a data-less input.
            asset_ins[parent_asset_key] = (dagster_name_fn(parent_props), In(Nothing))

    all_check_specs = list(check_specs_by_key.values())
    all_output_keys = list(asset_outs.keys())
    check_specs_by_output_name = cast(
        Dict[str, AssetCheckSpec],
        validate_and_assign_output_names_to_check_specs(all_check_specs, all_output_keys),
    )

    return (
        asset_deps,
        asset_ins,
        asset_outs,
        group_names_by_key,
        freshness_policies_by_key,
        automation_conditions_by_key,
        check_specs_by_output_name,
        fqns_by_output_name,
        metadata_by_output_name,
    )


def has_self_dependency(dbt_resource_props: Mapping[str, Any]) -> bool:
dagster_metadata = dbt_resource_props.get("meta", {}).get("dagster", {})
has_self_dependency = dagster_metadata.get("has_self_dependency", False)
Expand Down
Loading

0 comments on commit ca1bbf9

Please sign in to comment.