Skip to content

Commit dee20d5

Browse files
committed
AssetSpec.partitions_def
1 parent ba81226 commit dee20d5

File tree

12 files changed

+300
-143
lines changed

12 files changed

+300
-143
lines changed

python_modules/dagster/dagster/_core/definitions/asset_graph.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -89,11 +89,11 @@ def owners(self) -> Sequence[str]:
8989

9090
@property
9191
def is_partitioned(self) -> bool:
92-
return self.assets_def.partitions_def is not None
92+
return self.partitions_def is not None
9393

9494
@property
9595
def partitions_def(self) -> Optional[PartitionsDefinition]:
96-
return self.assets_def.partitions_def
96+
return self.assets_def.specs_by_key[self.key].partitions_def
9797

9898
@property
9999
def partition_mappings(self) -> Mapping[AssetKey, PartitionMapping]:

python_modules/dagster/dagster/_core/definitions/asset_out.py

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@
1717
from dagster._core.definitions.freshness_policy import FreshnessPolicy
1818
from dagster._core.definitions.input import NoValueSentinel
1919
from dagster._core.definitions.output import Out
20+
from dagster._core.definitions.partition import PartitionsDefinition
2021
from dagster._core.definitions.utils import (
2122
DEFAULT_IO_MANAGER_KEY,
2223
resolve_automation_condition,
@@ -148,7 +149,12 @@ def to_out(self) -> Out:
148149
code_version=self.code_version,
149150
)
150151

151-
def to_spec(self, key: AssetKey, deps: Sequence[AssetDep]) -> AssetSpec:
152+
def to_spec(
153+
self,
154+
key: AssetKey,
155+
deps: Sequence[AssetDep],
156+
partitions_def: Optional[PartitionsDefinition],
157+
) -> AssetSpec:
152158
with disable_dagster_warnings():
153159
return AssetSpec.dagster_internal_init(
154160
key=key,
@@ -163,7 +169,7 @@ def to_spec(self, key: AssetKey, deps: Sequence[AssetDep]) -> AssetSpec:
163169
tags=self.tags,
164170
deps=deps,
165171
auto_materialize_policy=None,
166-
partitions_def=None,
172+
partitions_def=partitions_def,
167173
)
168174

169175
@property

python_modules/dagster/dagster/_core/definitions/assets.py

Lines changed: 62 additions & 107 deletions
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,7 @@
3636
AssetSpec,
3737
)
3838
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
39-
from dagster._core.definitions.backfill_policy import BackfillPolicy, BackfillPolicyType
39+
from dagster._core.definitions.backfill_policy import BackfillPolicy
4040
from dagster._core.definitions.declarative_automation.automation_condition import (
4141
AutomationCondition,
4242
)
@@ -103,9 +103,9 @@ class AssetsDefinition(ResourceAddable, IHasInternalInit):
103103
"descriptions_by_key",
104104
"asset_deps",
105105
"owners_by_key",
106+
"partitions_def",
106107
}
107108

108-
_partitions_def: Optional[PartitionsDefinition]
109109
# partition mappings are also tracked inside the AssetSpecs, but this enables faster access by
110110
# upstream asset key
111111
_partition_mappings: Mapping[AssetKey, PartitionMapping]
@@ -227,24 +227,10 @@ def __init__(
227227
execution_type=execution_type or AssetExecutionType.MATERIALIZATION,
228228
)
229229

230-
self._partitions_def = _resolve_partitions_def(specs, partitions_def)
231-
232230
self._resource_defs = wrap_resources_for_execution(
233231
check.opt_mapping_param(resource_defs, "resource_defs")
234232
)
235233

236-
if self._partitions_def is None:
237-
# check if backfill policy is BackfillPolicyType.SINGLE_RUN if asset is not partitioned
238-
check.param_invariant(
239-
(
240-
backfill_policy.policy_type is BackfillPolicyType.SINGLE_RUN
241-
if backfill_policy
242-
else True
243-
),
244-
"backfill_policy",
245-
"Non partitioned asset can only have single run backfill policy",
246-
)
247-
248234
if specs is not None:
249235
check.invariant(group_names_by_key is None)
250236
check.invariant(metadata_by_key is None)
@@ -256,6 +242,7 @@ def __init__(
256242
check.invariant(owners_by_key is None)
257243
check.invariant(partition_mappings is None)
258244
check.invariant(asset_deps is None)
245+
check.invariant(partitions_def is None)
259246
resolved_specs = specs
260247

261248
else:
@@ -295,6 +282,7 @@ def __init__(
295282
metadata_by_key=metadata_by_key,
296283
descriptions_by_key=descriptions_by_key,
297284
code_versions_by_key=None,
285+
partitions_def=partitions_def,
298286
)
299287

300288
normalized_specs: List[AssetSpec] = []
@@ -331,11 +319,11 @@ def __init__(
331319
check.invariant(
332320
not (
333321
spec.freshness_policy
334-
and self._partitions_def is not None
335-
and not isinstance(self._partitions_def, TimeWindowPartitionsDefinition)
322+
and spec.partitions_def is not None
323+
and not isinstance(spec.partitions_def, TimeWindowPartitionsDefinition)
336324
),
337325
"FreshnessPolicies are currently unsupported for assets with partitions of type"
338-
f" {type(self._partitions_def)}.",
326+
f" {spec.partitions_def}.",
339327
)
340328

341329
normalized_specs.append(
@@ -345,10 +333,19 @@ def __init__(
345333
metadata=metadata,
346334
description=description,
347335
skippable=skippable,
348-
partitions_def=self._partitions_def,
349336
)
350337
)
351338

339+
unique_partitions_defs = {
340+
spec.partitions_def for spec in normalized_specs if spec.partitions_def is not None
341+
}
342+
if len(unique_partitions_defs) > 1 and not can_subset:
343+
raise DagsterInvalidDefinitionError(
344+
"If different AssetSpecs have different partitions_defs, can_subset must be True"
345+
)
346+
347+
_validate_self_deps(normalized_specs)
348+
352349
self._specs_by_key = {spec.key: spec for spec in normalized_specs}
353350

354351
self._partition_mappings = get_partition_mappings_from_deps(
@@ -361,27 +358,11 @@ def __init__(
361358
spec.key: spec for spec in self._check_specs_by_output_name.values()
362359
}
363360

364-
if self._computation:
365-
_validate_self_deps(
366-
input_keys=[
367-
key
368-
# filter out the special inputs which are used for cases when a multi-asset is
369-
# subsetted, as these are not the same as self-dependencies and are never loaded
370-
# in the same step that their corresponding output is produced
371-
for input_name, key in self._computation.keys_by_input_name.items()
372-
if not input_name.startswith(ASSET_SUBSET_INPUT_PREFIX)
373-
],
374-
output_keys=self._computation.selected_asset_keys,
375-
partition_mappings=self._partition_mappings,
376-
partitions_def=self._partitions_def,
377-
)
378-
379361
def dagster_internal_init(
380362
*,
381363
keys_by_input_name: Mapping[str, AssetKey],
382364
keys_by_output_name: Mapping[str, AssetKey],
383365
node_def: NodeDefinition,
384-
partitions_def: Optional[PartitionsDefinition],
385366
selected_asset_keys: Optional[AbstractSet[AssetKey]],
386367
can_subset: bool,
387368
resource_defs: Optional[Mapping[str, object]],
@@ -398,7 +379,6 @@ def dagster_internal_init(
398379
keys_by_input_name=keys_by_input_name,
399380
keys_by_output_name=keys_by_output_name,
400381
node_def=node_def,
401-
partitions_def=partitions_def,
402382
selected_asset_keys=selected_asset_keys,
403383
can_subset=can_subset,
404384
resource_defs=resource_defs,
@@ -769,17 +749,13 @@ def _output_dict_to_asset_dict(
769749
metadata_by_key=_output_dict_to_asset_dict(metadata_by_output_name),
770750
descriptions_by_key=_output_dict_to_asset_dict(descriptions_by_output_name),
771751
code_versions_by_key=_output_dict_to_asset_dict(code_versions_by_output_name),
752+
partitions_def=partitions_def,
772753
)
773754

774755
return AssetsDefinition.dagster_internal_init(
775756
keys_by_input_name=keys_by_input_name,
776757
keys_by_output_name=keys_by_output_name_with_prefix,
777758
node_def=node_def,
778-
partitions_def=check.opt_inst_param(
779-
partitions_def,
780-
"partitions_def",
781-
PartitionsDefinition,
782-
),
783759
resource_defs=resource_defs,
784760
backfill_policy=check.opt_inst_param(
785761
backfill_policy, "backfill_policy", BackfillPolicy
@@ -1038,10 +1014,20 @@ def backfill_policy(self) -> Optional[BackfillPolicy]:
10381014
return self._computation.backfill_policy if self._computation else None
10391015

10401016
@public
1041-
@property
1017+
@cached_property
10421018
def partitions_def(self) -> Optional[PartitionsDefinition]:
10431019
"""Optional[PartitionsDefinition]: The PartitionsDefinition for this AssetsDefinition (if any)."""
1044-
return self._partitions_def
1020+
partitions_defs = {
1021+
spec.partitions_def for spec in self.specs if spec.partitions_def is not None
1022+
}
1023+
if len(partitions_defs) == 1:
1024+
return next(iter(partitions_defs))
1025+
elif len(partitions_defs) == 0:
1026+
return None
1027+
else:
1028+
check.failed(
1029+
"Different assets within this AssetsDefinition have different PartitionsDefinitions"
1030+
)
10451031

10461032
@property
10471033
def metadata_by_key(self) -> Mapping[AssetKey, ArbitraryMetadataMapping]:
@@ -1132,12 +1118,17 @@ def get_partition_mapping_for_dep(self, dep_key: AssetKey) -> Optional[Partition
11321118
return self._partition_mappings.get(dep_key)
11331119

11341120
def infer_partition_mapping(
1135-
self, upstream_asset_key: AssetKey, upstream_partitions_def: Optional[PartitionsDefinition]
1121+
self,
1122+
asset_key: AssetKey,
1123+
upstream_asset_key: AssetKey,
1124+
upstream_partitions_def: Optional[PartitionsDefinition],
11361125
) -> PartitionMapping:
11371126
with disable_dagster_warnings():
11381127
partition_mapping = self._partition_mappings.get(upstream_asset_key)
11391128
return infer_partition_mapping(
1140-
partition_mapping, self._partitions_def, upstream_partitions_def
1129+
partition_mapping,
1130+
self.specs_by_key[asset_key].partitions_def,
1131+
upstream_partitions_def,
11411132
)
11421133

11431134
def get_output_name_for_asset_key(self, key: AssetKey) -> str:
@@ -1396,7 +1387,7 @@ def _output_to_source_asset(self, output_name: str) -> SourceAsset:
13961387
io_manager_key=output_def.io_manager_key,
13971388
description=spec.description,
13981389
resource_defs=self.resource_defs,
1399-
partitions_def=self.partitions_def,
1390+
partitions_def=spec.partitions_def,
14001391
group_name=spec.group_name,
14011392
tags=spec.tags,
14021393
)
@@ -1495,7 +1486,6 @@ def get_attributes_dict(self) -> Dict[str, Any]:
14951486
keys_by_input_name=self.node_keys_by_input_name,
14961487
keys_by_output_name=self.node_keys_by_output_name,
14971488
node_def=self._computation.node_def if self._computation else None,
1498-
partitions_def=self._partitions_def,
14991489
selected_asset_keys=self.keys,
15001490
can_subset=self.can_subset,
15011491
resource_defs=self._resource_defs,
@@ -1691,6 +1681,7 @@ def _asset_specs_from_attr_key_params(
16911681
code_versions_by_key: Optional[Mapping[AssetKey, str]],
16921682
descriptions_by_key: Optional[Mapping[AssetKey, str]],
16931683
owners_by_key: Optional[Mapping[AssetKey, Sequence[str]]],
1684+
partitions_def: Optional[PartitionsDefinition],
16941685
) -> Sequence[AssetSpec]:
16951686
validated_group_names_by_key = check.opt_mapping_param(
16961687
group_names_by_key, "group_names_by_key", key_type=AssetKey, value_type=str
@@ -1763,40 +1754,36 @@ def _asset_specs_from_attr_key_params(
17631754
# NodeDefinition
17641755
skippable=False,
17651756
auto_materialize_policy=None,
1766-
partitions_def=None,
1757+
partitions_def=check.opt_inst_param(
1758+
partitions_def, "partitions_def", PartitionsDefinition
1759+
),
17671760
)
17681761
)
17691762

17701763
return result
17711764

17721765

1773-
def _validate_self_deps(
1774-
input_keys: Iterable[AssetKey],
1775-
output_keys: Iterable[AssetKey],
1776-
partition_mappings: Mapping[AssetKey, PartitionMapping],
1777-
partitions_def: Optional[PartitionsDefinition],
1778-
) -> None:
1779-
output_keys_set = set(output_keys)
1780-
for input_key in input_keys:
1781-
if input_key in output_keys_set:
1782-
if input_key in partition_mappings:
1783-
partition_mapping = partition_mappings[input_key]
1784-
time_window_partition_mapping = get_self_dep_time_window_partition_mapping(
1785-
partition_mapping, partitions_def
1786-
)
1787-
if (
1788-
time_window_partition_mapping is not None
1789-
and (time_window_partition_mapping.start_offset or 0) < 0
1790-
and (time_window_partition_mapping.end_offset or 0) < 0
1791-
):
1792-
continue
1766+
def _validate_self_deps(specs: Iterable[AssetSpec]) -> None:
1767+
for spec in specs:
1768+
for dep in spec.deps:
1769+
if dep.asset_key == spec.key:
1770+
if dep.partition_mapping:
1771+
time_window_partition_mapping = get_self_dep_time_window_partition_mapping(
1772+
dep.partition_mapping, spec.partitions_def
1773+
)
1774+
if (
1775+
time_window_partition_mapping is not None
1776+
and (time_window_partition_mapping.start_offset or 0) < 0
1777+
and (time_window_partition_mapping.end_offset or 0) < 0
1778+
):
1779+
continue
17931780

1794-
raise DagsterInvalidDefinitionError(
1795-
f'Asset "{input_key.to_user_string()}" depends on itself. Assets can only depend'
1796-
" on themselves if they are:\n(a) time-partitioned and each partition depends on"
1797-
" earlier partitions\n(b) multipartitioned, with one time dimension that depends"
1798-
" on earlier time partitions"
1799-
)
1781+
raise DagsterInvalidDefinitionError(
1782+
f'Asset "{spec.key.to_user_string()}" depends on itself. Assets can only depend'
1783+
" on themselves if they are:\n(a) time-partitioned and each partition depends on"
1784+
" earlier partitions\n(b) multipartitioned, with one time dimension that depends"
1785+
" on earlier time partitions"
1786+
)
18001787

18011788

18021789
def get_self_dep_time_window_partition_mapping(
@@ -1824,38 +1811,6 @@ def get_self_dep_time_window_partition_mapping(
18241811
return None
18251812

18261813

1827-
def _resolve_partitions_def(
1828-
specs: Optional[Sequence[AssetSpec]], partitions_def: Optional[PartitionsDefinition]
1829-
) -> Optional[PartitionsDefinition]:
1830-
if specs:
1831-
asset_keys_by_partitions_def = defaultdict(set)
1832-
for spec in specs:
1833-
asset_keys_by_partitions_def[spec.partitions_def].add(spec.key)
1834-
if len(asset_keys_by_partitions_def) > 1:
1835-
partition_1_asset_keys, partition_2_asset_keys, *_ = (
1836-
asset_keys_by_partitions_def.values()
1837-
)
1838-
check.failed(
1839-
f"All AssetSpecs must have the same partitions_def, but "
1840-
f"{next(iter(partition_1_asset_keys)).to_user_string()} and "
1841-
f"{next(iter(partition_2_asset_keys)).to_user_string()} have different "
1842-
"partitions_defs."
1843-
)
1844-
common_partitions_def = next(iter(asset_keys_by_partitions_def.keys()))
1845-
if (
1846-
common_partitions_def is not None
1847-
and partitions_def is not None
1848-
and common_partitions_def != partitions_def
1849-
):
1850-
check.failed(
1851-
f"AssetSpec for {next(iter(specs)).key.to_user_string()} has partitions_def which is different "
1852-
"than the partitions_def provided to AssetsDefinition.",
1853-
)
1854-
return partitions_def or common_partitions_def
1855-
else:
1856-
return partitions_def
1857-
1858-
18591814
def get_partition_mappings_from_deps(
18601815
partition_mappings: Dict[AssetKey, PartitionMapping], deps: Iterable[AssetDep], asset_name: str
18611816
) -> Mapping[AssetKey, PartitionMapping]:

0 commit comments

Comments
 (0)