Skip to content

Commit be8c8bb

Browse files
committed
[dagster-dbt] Spec-ify dbt cloud integration
1 parent 35c9420 commit be8c8bb

File tree

4 files changed

+73
-242
lines changed

4 files changed

+73
-242
lines changed

python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py

Lines changed: 0 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
Sequence,
1515
Set,
1616
Tuple,
17-
cast,
1817
)
1918

2019
from dagster import (
@@ -26,31 +25,23 @@
2625
AssetSelection,
2726
AssetSpec,
2827
AutoMaterializePolicy,
29-
AutomationCondition,
3028
DagsterInvalidDefinitionError,
3129
DagsterInvariantViolationError,
3230
DefaultScheduleStatus,
3331
FreshnessPolicy,
34-
In,
35-
Nothing,
36-
Out,
3732
RunConfig,
3833
ScheduleDefinition,
3934
TableColumn,
4035
TableSchema,
4136
_check as check,
4237
define_asset_job,
4338
)
44-
from dagster._core.definitions.decorators.decorator_assets_definition_builder import (
45-
validate_and_assign_output_names_to_check_specs,
46-
)
4739
from dagster._core.definitions.metadata import TableMetadataSet
4840
from dagster._core.definitions.metadata.source_code import (
4941
CodeReferencesMetadataSet,
5042
CodeReferencesMetadataValue,
5143
LocalFileCodeReference,
5244
)
53-
from dagster._utils.merger import merge_dicts
5445

5546
from dagster_dbt.metadata_set import DbtMetadataSet
5647
from dagster_dbt.utils import (
@@ -893,137 +884,6 @@ def _validate_asset_keys(
893884
)
894885

895886

896-
def get_asset_deps(
897-
dbt_nodes,
898-
deps,
899-
io_manager_key,
900-
manifest: Optional[Mapping[str, Any]],
901-
dagster_dbt_translator: "DagsterDbtTranslator",
902-
) -> Tuple[
903-
Dict[AssetKey, Set[AssetKey]],
904-
Dict[AssetKey, Tuple[str, In]],
905-
Dict[AssetKey, Tuple[str, Out]],
906-
Dict[AssetKey, str],
907-
Dict[AssetKey, FreshnessPolicy],
908-
Dict[AssetKey, AutomationCondition],
909-
Dict[str, AssetCheckSpec],
910-
Dict[str, List[str]],
911-
Dict[str, Dict[str, Any]],
912-
]:
913-
from dagster_dbt.dagster_dbt_translator import DbtManifestWrapper, validate_translator
914-
915-
dagster_dbt_translator = validate_translator(dagster_dbt_translator)
916-
917-
asset_deps: Dict[AssetKey, Set[AssetKey]] = {}
918-
asset_ins: Dict[AssetKey, Tuple[str, In]] = {}
919-
asset_outs: Dict[AssetKey, Tuple[str, Out]] = {}
920-
921-
# These dicts could be refactored as a single dict, mapping from output name to arbitrary
922-
# metadata that we need to store for reference.
923-
group_names_by_key: Dict[AssetKey, str] = {}
924-
freshness_policies_by_key: Dict[AssetKey, FreshnessPolicy] = {}
925-
automation_conditions_by_key: Dict[AssetKey, AutomationCondition] = {}
926-
check_specs_by_key: Dict[AssetCheckKey, AssetCheckSpec] = {}
927-
fqns_by_output_name: Dict[str, List[str]] = {}
928-
metadata_by_output_name: Dict[str, Dict[str, Any]] = {}
929-
930-
for unique_id, parent_unique_ids in deps.items():
931-
dbt_resource_props = dbt_nodes[unique_id]
932-
933-
output_name = dagster_name_fn(dbt_resource_props)
934-
fqns_by_output_name[output_name] = dbt_resource_props["fqn"]
935-
936-
metadata_by_output_name[output_name] = {
937-
key: dbt_resource_props[key] for key in ["unique_id", "resource_type"]
938-
}
939-
940-
asset_key = dagster_dbt_translator.get_asset_key(dbt_resource_props)
941-
942-
asset_deps[asset_key] = set()
943-
944-
metadata = merge_dicts(
945-
dagster_dbt_translator.get_metadata(dbt_resource_props),
946-
{
947-
DAGSTER_DBT_MANIFEST_METADATA_KEY: DbtManifestWrapper(manifest=manifest)
948-
if manifest
949-
else None,
950-
DAGSTER_DBT_TRANSLATOR_METADATA_KEY: dagster_dbt_translator,
951-
},
952-
)
953-
asset_outs[asset_key] = (
954-
output_name,
955-
Out(
956-
io_manager_key=io_manager_key,
957-
description=dagster_dbt_translator.get_description(dbt_resource_props),
958-
metadata=metadata,
959-
is_required=False,
960-
dagster_type=Nothing,
961-
code_version=dagster_dbt_translator.get_code_version(dbt_resource_props),
962-
),
963-
)
964-
965-
group_name = dagster_dbt_translator.get_group_name(dbt_resource_props)
966-
if group_name is not None:
967-
group_names_by_key[asset_key] = group_name
968-
969-
freshness_policy = dagster_dbt_translator.get_freshness_policy(dbt_resource_props)
970-
if freshness_policy is not None:
971-
freshness_policies_by_key[asset_key] = freshness_policy
972-
973-
automation_condition = dagster_dbt_translator.get_automation_condition(dbt_resource_props)
974-
if automation_condition is not None:
975-
automation_conditions_by_key[asset_key] = automation_condition
976-
977-
test_unique_ids = []
978-
if manifest:
979-
test_unique_ids = [
980-
child_unique_id
981-
for child_unique_id in manifest["child_map"][unique_id]
982-
if child_unique_id.startswith("test")
983-
]
984-
985-
for test_unique_id in test_unique_ids:
986-
check_spec = default_asset_check_fn(
987-
manifest,
988-
dbt_nodes,
989-
dagster_dbt_translator,
990-
asset_key,
991-
test_unique_id,
992-
)
993-
if check_spec:
994-
check_specs_by_key[check_spec.key] = check_spec
995-
996-
for parent_unique_id in parent_unique_ids:
997-
parent_node_info = dbt_nodes[parent_unique_id]
998-
parent_asset_key = dagster_dbt_translator.get_asset_key(parent_node_info)
999-
1000-
asset_deps[asset_key].add(parent_asset_key)
1001-
1002-
# if this parent is not one of the selected nodes, it's an input
1003-
if parent_unique_id not in deps:
1004-
input_name = dagster_name_fn(parent_node_info)
1005-
asset_ins[parent_asset_key] = (input_name, In(Nothing))
1006-
1007-
check_specs_by_output_name = cast(
1008-
Dict[str, AssetCheckSpec],
1009-
validate_and_assign_output_names_to_check_specs(
1010-
list(check_specs_by_key.values()), list(asset_outs.keys())
1011-
),
1012-
)
1013-
1014-
return (
1015-
asset_deps,
1016-
asset_ins,
1017-
asset_outs,
1018-
group_names_by_key,
1019-
freshness_policies_by_key,
1020-
automation_conditions_by_key,
1021-
check_specs_by_output_name,
1022-
fqns_by_output_name,
1023-
metadata_by_output_name,
1024-
)
1025-
1026-
1027887
def has_self_dependency(dbt_resource_props: Mapping[str, Any]) -> bool:
1028888
dagster_metadata = dbt_resource_props.get("meta", {}).get("dagster", {})
1029889
has_self_dependency = dagster_metadata.get("has_self_dependency", False)

0 commit comments

Comments
 (0)