Skip to content

Commit e5c69f3

Browse files
sryzaOwenKephart
authored andcommitted
AssetSpec.partitions_def
1 parent 37f6bef commit e5c69f3

File tree

14 files changed

+340
-195
lines changed

14 files changed

+340
-195
lines changed

python_modules/dagster/dagster/_core/definitions/asset_graph.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,11 @@ def owners(self) -> Sequence[str]:
9090

9191
@property
9292
def is_partitioned(self) -> bool:
93-
return self.assets_def.partitions_def is not None
93+
return self.partitions_def is not None
9494

9595
@property
9696
def partitions_def(self) -> Optional[PartitionsDefinition]:
97-
return self.assets_def.partitions_def
97+
return self.assets_def.specs_by_key[self.key].partitions_def
9898

9999
@property
100100
def partition_mappings(self) -> Mapping[AssetKey, PartitionMapping]:

python_modules/dagster/dagster/_core/definitions/asset_out.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from dagster._core.definitions.freshness_policy import FreshnessPolicy
2323
from dagster._core.definitions.input import NoValueSentinel
2424
from dagster._core.definitions.output import Out
25+
from dagster._core.definitions.partition import PartitionsDefinition
2526
from dagster._core.definitions.utils import resolve_automation_condition
2627
from dagster._core.errors import DagsterInvalidDefinitionError
2728
from dagster._core.types.dagster_type import DagsterType
@@ -217,12 +218,17 @@ def to_out(self) -> Out:
217218
)
218219

219220
def to_spec(
220-
self, key: AssetKey, deps: Sequence[AssetDep], additional_tags: Mapping[str, str] = {}
221+
self,
222+
key: AssetKey,
223+
deps: Sequence[AssetDep],
224+
additional_tags: Mapping[str, str] = {},
225+
partitions_def: Optional[PartitionsDefinition] = ...,
221226
) -> AssetSpec:
222227
return self._spec.replace_attributes(
223228
key=key,
224229
tags={**additional_tags, **self.tags} if self.tags else additional_tags,
225230
deps=[*self._spec.deps, *deps],
231+
partitions_def=partitions_def,
226232
)
227233

228234
@public

python_modules/dagster/dagster/_core/definitions/assets.py

Lines changed: 62 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
AssetSpec,
3737
)
3838
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
39-
from dagster._core.definitions.backfill_policy import BackfillPolicy, BackfillPolicyType
39+
from dagster._core.definitions.backfill_policy import BackfillPolicy
4040
from dagster._core.definitions.declarative_automation.automation_condition import (
4141
AutomationCondition,
4242
)
@@ -107,9 +107,9 @@ class AssetsDefinition(ResourceAddable, IHasInternalInit):
107107
"descriptions_by_key",
108108
"asset_deps",
109109
"owners_by_key",
110+
"partitions_def",
110111
}
111112

112-
_partitions_def: Optional[PartitionsDefinition]
113113
# partition mappings are also tracked inside the AssetSpecs, but this enables faster access by
114114
# upstream asset key
115115
_partition_mappings: Mapping[AssetKey, PartitionMapping]
@@ -229,24 +229,10 @@ def __init__(
229229
execution_type=execution_type or AssetExecutionType.MATERIALIZATION,
230230
)
231231

232-
self._partitions_def = _resolve_partitions_def(specs, partitions_def)
233-
234232
self._resource_defs = wrap_resources_for_execution(
235233
check.opt_mapping_param(resource_defs, "resource_defs")
236234
)
237235

238-
if self._partitions_def is None:
239-
# check if backfill policy is BackfillPolicyType.SINGLE_RUN if asset is not partitioned
240-
check.param_invariant(
241-
(
242-
backfill_policy.policy_type is BackfillPolicyType.SINGLE_RUN
243-
if backfill_policy
244-
else True
245-
),
246-
"backfill_policy",
247-
"Non partitioned asset can only have single run backfill policy",
248-
)
249-
250236
if specs is not None:
251237
check.invariant(group_names_by_key is None)
252238
check.invariant(metadata_by_key is None)
@@ -258,6 +244,7 @@ def __init__(
258244
check.invariant(owners_by_key is None)
259245
check.invariant(partition_mappings is None)
260246
check.invariant(asset_deps is None)
247+
check.invariant(partitions_def is None)
261248
resolved_specs = specs
262249

263250
else:
@@ -297,6 +284,7 @@ def __init__(
297284
metadata_by_key=metadata_by_key,
298285
descriptions_by_key=descriptions_by_key,
299286
code_versions_by_key=None,
287+
partitions_def=partitions_def,
300288
)
301289

302290
normalized_specs: List[AssetSpec] = []
@@ -333,11 +321,11 @@ def __init__(
333321
check.invariant(
334322
not (
335323
spec.freshness_policy
336-
and self._partitions_def is not None
337-
and not isinstance(self._partitions_def, TimeWindowPartitionsDefinition)
324+
and spec.partitions_def is not None
325+
and not isinstance(spec.partitions_def, TimeWindowPartitionsDefinition)
338326
),
339327
"FreshnessPolicies are currently unsupported for assets with partitions of type"
340-
f" {type(self._partitions_def)}.",
328+
f" {spec.partitions_def}.",
341329
)
342330

343331
normalized_specs.append(
@@ -347,10 +335,19 @@ def __init__(
347335
metadata=metadata,
348336
description=description,
349337
skippable=skippable,
350-
partitions_def=self._partitions_def,
351338
)
352339
)
353340

341+
unique_partitions_defs = {
342+
spec.partitions_def for spec in normalized_specs if spec.partitions_def is not None
343+
}
344+
if len(unique_partitions_defs) > 1 and not can_subset:
345+
raise DagsterInvalidDefinitionError(
346+
"If different AssetSpecs have different partitions_defs, can_subset must be True"
347+
)
348+
349+
_validate_self_deps(normalized_specs)
350+
354351
self._specs_by_key = {spec.key: spec for spec in normalized_specs}
355352

356353
self._partition_mappings = get_partition_mappings_from_deps(
@@ -363,27 +360,11 @@ def __init__(
363360
spec.key: spec for spec in self._check_specs_by_output_name.values()
364361
}
365362

366-
if self._computation:
367-
_validate_self_deps(
368-
input_keys=[
369-
key
370-
# filter out the special inputs which are used for cases when a multi-asset is
371-
# subsetted, as these are not the same as self-dependencies and are never loaded
372-
# in the same step that their corresponding output is produced
373-
for input_name, key in self._computation.keys_by_input_name.items()
374-
if not input_name.startswith(ASSET_SUBSET_INPUT_PREFIX)
375-
],
376-
output_keys=self._computation.selected_asset_keys,
377-
partition_mappings=self._partition_mappings,
378-
partitions_def=self._partitions_def,
379-
)
380-
381363
def dagster_internal_init(
382364
*,
383365
keys_by_input_name: Mapping[str, AssetKey],
384366
keys_by_output_name: Mapping[str, AssetKey],
385367
node_def: NodeDefinition,
386-
partitions_def: Optional[PartitionsDefinition],
387368
selected_asset_keys: Optional[AbstractSet[AssetKey]],
388369
can_subset: bool,
389370
resource_defs: Optional[Mapping[str, object]],
@@ -400,7 +381,6 @@ def dagster_internal_init(
400381
keys_by_input_name=keys_by_input_name,
401382
keys_by_output_name=keys_by_output_name,
402383
node_def=node_def,
403-
partitions_def=partitions_def,
404384
selected_asset_keys=selected_asset_keys,
405385
can_subset=can_subset,
406386
resource_defs=resource_defs,
@@ -771,17 +751,13 @@ def _output_dict_to_asset_dict(
771751
metadata_by_key=_output_dict_to_asset_dict(metadata_by_output_name),
772752
descriptions_by_key=_output_dict_to_asset_dict(descriptions_by_output_name),
773753
code_versions_by_key=_output_dict_to_asset_dict(code_versions_by_output_name),
754+
partitions_def=partitions_def,
774755
)
775756

776757
return AssetsDefinition.dagster_internal_init(
777758
keys_by_input_name=keys_by_input_name,
778759
keys_by_output_name=keys_by_output_name_with_prefix,
779760
node_def=node_def,
780-
partitions_def=check.opt_inst_param(
781-
partitions_def,
782-
"partitions_def",
783-
PartitionsDefinition,
784-
),
785761
resource_defs=resource_defs,
786762
backfill_policy=check.opt_inst_param(
787763
backfill_policy, "backfill_policy", BackfillPolicy
@@ -1044,10 +1020,20 @@ def backfill_policy(self) -> Optional[BackfillPolicy]:
10441020
return self._computation.backfill_policy if self._computation else None
10451021

10461022
@public
1047-
@property
1023+
@cached_property
10481024
def partitions_def(self) -> Optional[PartitionsDefinition]:
10491025
"""Optional[PartitionsDefinition]: The PartitionsDefinition for this AssetsDefinition (if any)."""
1050-
return self._partitions_def
1026+
partitions_defs = {
1027+
spec.partitions_def for spec in self.specs if spec.partitions_def is not None
1028+
}
1029+
if len(partitions_defs) == 1:
1030+
return next(iter(partitions_defs))
1031+
elif len(partitions_defs) == 0:
1032+
return None
1033+
else:
1034+
check.failed(
1035+
"Different assets within this AssetsDefinition have different PartitionsDefinitions"
1036+
)
10511037

10521038
@property
10531039
def metadata_by_key(self) -> Mapping[AssetKey, ArbitraryMetadataMapping]:
@@ -1138,12 +1124,17 @@ def get_partition_mapping_for_dep(self, dep_key: AssetKey) -> Optional[Partition
11381124
return self._partition_mappings.get(dep_key)
11391125

11401126
def infer_partition_mapping(
1141-
self, upstream_asset_key: AssetKey, upstream_partitions_def: Optional[PartitionsDefinition]
1127+
self,
1128+
asset_key: AssetKey,
1129+
upstream_asset_key: AssetKey,
1130+
upstream_partitions_def: Optional[PartitionsDefinition],
11421131
) -> PartitionMapping:
11431132
with disable_dagster_warnings():
11441133
partition_mapping = self._partition_mappings.get(upstream_asset_key)
11451134
return infer_partition_mapping(
1146-
partition_mapping, self._partitions_def, upstream_partitions_def
1135+
partition_mapping,
1136+
self.specs_by_key[asset_key].partitions_def,
1137+
upstream_partitions_def,
11471138
)
11481139

11491140
def has_output_for_asset_key(self, key: AssetKey) -> bool:
@@ -1398,7 +1389,7 @@ def _output_to_source_asset(self, output_name: str) -> SourceAsset:
13981389
io_manager_key=output_def.io_manager_key,
13991390
description=spec.description,
14001391
resource_defs=self.resource_defs,
1401-
partitions_def=self.partitions_def,
1392+
partitions_def=spec.partitions_def,
14021393
group_name=spec.group_name,
14031394
tags=spec.tags,
14041395
io_manager_def=None,
@@ -1504,7 +1495,6 @@ def get_attributes_dict(self) -> Dict[str, Any]:
15041495
keys_by_input_name=self.node_keys_by_input_name,
15051496
keys_by_output_name=self.node_keys_by_output_name,
15061497
node_def=self._computation.node_def if self._computation else None,
1507-
partitions_def=self._partitions_def,
15081498
selected_asset_keys=self.keys,
15091499
can_subset=self.can_subset,
15101500
resource_defs=self._resource_defs,
@@ -1700,6 +1690,7 @@ def _asset_specs_from_attr_key_params(
17001690
code_versions_by_key: Optional[Mapping[AssetKey, str]],
17011691
descriptions_by_key: Optional[Mapping[AssetKey, str]],
17021692
owners_by_key: Optional[Mapping[AssetKey, Sequence[str]]],
1693+
partitions_def: Optional[PartitionsDefinition],
17031694
) -> Sequence[AssetSpec]:
17041695
validated_group_names_by_key = check.opt_mapping_param(
17051696
group_names_by_key, "group_names_by_key", key_type=AssetKey, value_type=str
@@ -1772,41 +1763,37 @@ def _asset_specs_from_attr_key_params(
17721763
# NodeDefinition
17731764
skippable=False,
17741765
auto_materialize_policy=None,
1775-
partitions_def=None,
17761766
kinds=None,
1767+
partitions_def=check.opt_inst_param(
1768+
partitions_def, "partitions_def", PartitionsDefinition
1769+
),
17771770
)
17781771
)
17791772

17801773
return result
17811774

17821775

1783-
def _validate_self_deps(
1784-
input_keys: Iterable[AssetKey],
1785-
output_keys: Iterable[AssetKey],
1786-
partition_mappings: Mapping[AssetKey, PartitionMapping],
1787-
partitions_def: Optional[PartitionsDefinition],
1788-
) -> None:
1789-
output_keys_set = set(output_keys)
1790-
for input_key in input_keys:
1791-
if input_key in output_keys_set:
1792-
if input_key in partition_mappings:
1793-
partition_mapping = partition_mappings[input_key]
1794-
time_window_partition_mapping = get_self_dep_time_window_partition_mapping(
1795-
partition_mapping, partitions_def
1796-
)
1797-
if (
1798-
time_window_partition_mapping is not None
1799-
and (time_window_partition_mapping.start_offset or 0) < 0
1800-
and (time_window_partition_mapping.end_offset or 0) < 0
1801-
):
1802-
continue
1776+
def _validate_self_deps(specs: Iterable[AssetSpec]) -> None:
1777+
for spec in specs:
1778+
for dep in spec.deps:
1779+
if dep.asset_key == spec.key:
1780+
if dep.partition_mapping:
1781+
time_window_partition_mapping = get_self_dep_time_window_partition_mapping(
1782+
dep.partition_mapping, spec.partitions_def
1783+
)
1784+
if (
1785+
time_window_partition_mapping is not None
1786+
and (time_window_partition_mapping.start_offset or 0) < 0
1787+
and (time_window_partition_mapping.end_offset or 0) < 0
1788+
):
1789+
continue
18031790

1804-
raise DagsterInvalidDefinitionError(
1805-
f'Asset "{input_key.to_user_string()}" depends on itself. Assets can only depend'
1806-
" on themselves if they are:\n(a) time-partitioned and each partition depends on"
1807-
" earlier partitions\n(b) multipartitioned, with one time dimension that depends"
1808-
" on earlier time partitions"
1809-
)
1791+
raise DagsterInvalidDefinitionError(
1792+
f'Asset "{spec.key.to_user_string()}" depends on itself. Assets can only depend'
1793+
" on themselves if they are:\n(a) time-partitioned and each partition depends on"
1794+
" earlier partitions\n(b) multipartitioned, with one time dimension that depends"
1795+
" on earlier time partitions"
1796+
)
18101797

18111798

18121799
def get_self_dep_time_window_partition_mapping(
@@ -1834,38 +1821,6 @@ def get_self_dep_time_window_partition_mapping(
18341821
return None
18351822

18361823

1837-
def _resolve_partitions_def(
1838-
specs: Optional[Sequence[AssetSpec]], partitions_def: Optional[PartitionsDefinition]
1839-
) -> Optional[PartitionsDefinition]:
1840-
if specs:
1841-
asset_keys_by_partitions_def = defaultdict(set)
1842-
for spec in specs:
1843-
asset_keys_by_partitions_def[spec.partitions_def].add(spec.key)
1844-
if len(asset_keys_by_partitions_def) > 1:
1845-
partition_1_asset_keys, partition_2_asset_keys, *_ = (
1846-
asset_keys_by_partitions_def.values()
1847-
)
1848-
check.failed(
1849-
f"All AssetSpecs must have the same partitions_def, but "
1850-
f"{next(iter(partition_1_asset_keys)).to_user_string()} and "
1851-
f"{next(iter(partition_2_asset_keys)).to_user_string()} have different "
1852-
"partitions_defs."
1853-
)
1854-
common_partitions_def = next(iter(asset_keys_by_partitions_def.keys()))
1855-
if (
1856-
common_partitions_def is not None
1857-
and partitions_def is not None
1858-
and common_partitions_def != partitions_def
1859-
):
1860-
check.failed(
1861-
f"AssetSpec for {next(iter(specs)).key.to_user_string()} has partitions_def which is different "
1862-
"than the partitions_def provided to AssetsDefinition.",
1863-
)
1864-
return partitions_def or common_partitions_def
1865-
else:
1866-
return partitions_def
1867-
1868-
18691824
def get_partition_mappings_from_deps(
18701825
partition_mappings: Dict[AssetKey, PartitionMapping], deps: Iterable[AssetDep], asset_name: str
18711826
) -> Mapping[AssetKey, PartitionMapping]:

0 commit comments

Comments
 (0)