Skip to content

Commit

Permalink
partitions_def off of dagster_internal_init
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Sep 6, 2024
1 parent 3ea5758 commit c44fe49
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 43 deletions.
10 changes: 8 additions & 2 deletions python_modules/dagster/dagster/_core/definitions/asset_out.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.input import NoValueSentinel
from dagster._core.definitions.output import Out
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.definitions.utils import (
DEFAULT_IO_MANAGER_KEY,
resolve_automation_condition,
Expand Down Expand Up @@ -148,7 +149,12 @@ def to_out(self) -> Out:
code_version=self.code_version,
)

def to_spec(self, key: AssetKey, deps: Sequence[AssetDep]) -> AssetSpec:
def to_spec(
self,
key: AssetKey,
deps: Sequence[AssetDep],
partitions_def: Optional[PartitionsDefinition],
) -> AssetSpec:
with disable_dagster_warnings():
return AssetSpec.dagster_internal_init(
key=key,
Expand All @@ -163,7 +169,7 @@ def to_spec(self, key: AssetKey, deps: Sequence[AssetDep]) -> AssetSpec:
tags=self.tags,
deps=deps,
auto_materialize_policy=None,
partitions_def=None,
partitions_def=partitions_def,
)

@property
Expand Down
34 changes: 7 additions & 27 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class AssetsDefinition(ResourceAddable, IHasInternalInit):
"descriptions_by_key",
"asset_deps",
"owners_by_key",
"partitions_def",
}

# partition mappings are also tracked inside the AssetSpecs, but this enables faster access by
Expand Down Expand Up @@ -241,7 +242,7 @@ def __init__(
check.invariant(owners_by_key is None)
check.invariant(partition_mappings is None)
check.invariant(asset_deps is None)
_validate_partitions_defs(specs, partitions_def)
validate_spec_partitions_defs(specs, partitions_def)
resolved_specs = specs

else:
Expand Down Expand Up @@ -363,7 +364,6 @@ def dagster_internal_init(
keys_by_input_name: Mapping[str, AssetKey],
keys_by_output_name: Mapping[str, AssetKey],
node_def: NodeDefinition,
partitions_def: Optional[PartitionsDefinition],
selected_asset_keys: Optional[AbstractSet[AssetKey]],
can_subset: bool,
resource_defs: Optional[Mapping[str, object]],
Expand All @@ -380,7 +380,6 @@ def dagster_internal_init(
keys_by_input_name=keys_by_input_name,
keys_by_output_name=keys_by_output_name,
node_def=node_def,
partitions_def=partitions_def,
selected_asset_keys=selected_asset_keys,
can_subset=can_subset,
resource_defs=resource_defs,
Expand Down Expand Up @@ -1492,7 +1491,6 @@ def get_attributes_dict(self) -> Dict[str, Any]:
keys_by_input_name=self.node_keys_by_input_name,
keys_by_output_name=self.node_keys_by_output_name,
node_def=self._computation.node_def if self._computation else None,
partitions_def=None,
selected_asset_keys=self.keys,
can_subset=self.can_subset,
resource_defs=self._resource_defs,
Expand Down Expand Up @@ -1760,7 +1758,6 @@ def _asset_specs_from_attr_key_params(
# NodeDefinition
skippable=False,
auto_materialize_policy=None,
# Value here is irrelevant, because it will be replaced by value from partitions_def
partitions_def=None,
)
)
Expand Down Expand Up @@ -1816,35 +1813,18 @@ def get_self_dep_time_window_partition_mapping(
return None


def _validate_partitions_defs(
def validate_spec_partitions_defs(
specs: Sequence[AssetSpec], partitions_def: Optional[PartitionsDefinition]
) -> Optional[PartitionsDefinition]:
any_spec_has_partitions_def = False
any_spec_has_no_partitions_def = False
if partitions_def is not None:
for spec in specs:
if spec.partitions_def is not None and spec.partitions_def != partitions_def:
if spec.partitions_def != partitions_def:
check.failed(
f"AssetSpec for {next(iter(specs)).key.to_user_string()} has partitions_def "
"which is different than the partitions_def provided to AssetsDefinition.",
f"AssetSpec for {spec.key.to_user_string()} has partitions_def "
f"(type={type(spec.partitions_def)}) which is different than the "
f"partitions_def provided to AssetsDefinition (type={type(partitions_def)}).",
)

any_spec_has_partitions_def = (
any_spec_has_partitions_def or spec.partitions_def is not None
)
any_spec_has_no_partitions_def = (
any_spec_has_no_partitions_def or spec.partitions_def is None
)

if (
partitions_def is not None
and any_spec_has_partitions_def
and any_spec_has_no_partitions_def
):
check.failed(
"If partitions_def is provided, then either all specs must have that PartitionsDefinition or none."
)


def get_partition_mappings_from_deps(
partition_mappings: Dict[AssetKey, PartitionMapping], deps: Iterable[AssetDep], asset_name: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
ASSET_SUBSET_INPUT_PREFIX,
AssetsDefinition,
get_partition_mappings_from_deps,
validate_spec_partitions_defs,
)
from dagster._core.definitions.backfill_policy import BackfillPolicy
from dagster._core.definitions.decorators.op_decorator import _Op
Expand Down Expand Up @@ -563,7 +564,6 @@ def create_assets_definition(self) -> AssetsDefinition:
keys_by_input_name=self.asset_keys_by_input_names,
keys_by_output_name=self.asset_keys_by_output_name,
node_def=self.create_op_definition(),
partitions_def=self.args.partitions_def,
can_subset=self.args.can_subset,
resource_defs=self.args.assets_def_resource_defs,
backfill_policy=self.args.backfill_policy,
Expand All @@ -577,18 +577,31 @@ def create_assets_definition(self) -> AssetsDefinition:

@cached_property
def specs(self) -> Sequence[AssetSpec]:
specs = self.args.specs if self.args.specs else self._synthesize_specs()

if not self.group_name:
return specs
if self.args.specs:
specs = self.args.specs
validate_spec_partitions_defs(specs, self.args.partitions_def)
else:
specs = self._synthesize_specs()

check.invariant(
all((spec.group_name is None or spec.group_name == self.group_name) for spec in specs),
not self.group_name
or all(
(spec.group_name is None or spec.group_name == self.group_name) for spec in specs
),
"Cannot set group_name parameter on multi_asset if one or more of the"
" AssetSpecs/AssetOuts supplied to this multi_asset have a group_name defined.",
)

return [spec._replace(group_name=self.group_name) for spec in specs]
if not self.group_name and not self.args.partitions_def:
return specs

return [
spec._replace(
group_name=self.group_name,
partitions_def=spec.partitions_def or self.args.partitions_def,
)
for spec in specs
]

def _synthesize_specs(self) -> Sequence[AssetSpec]:
resolved_specs = []
Expand All @@ -613,7 +626,9 @@ def _synthesize_specs(self) -> Sequence[AssetSpec]:
else:
deps = input_deps

resolved_specs.append(asset_out.to_spec(key, deps=deps))
resolved_specs.append(
asset_out.to_spec(key, deps=deps, partitions_def=self.args.partitions_def)
)

specs = resolved_specs
return specs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,13 @@ def create_external_asset_from_source_asset(source_asset: SourceAsset) -> Assets
freshness_policy=source_asset.freshness_policy,
deps=[],
owners=[],
partitions_def=source_asset.partitions_def,
)

return AssetsDefinition(
specs=[spec],
keys_by_output_name=keys_by_output_name,
node_def=node_def,
partitions_def=source_asset.partitions_def,
# We don't pass the `io_manager_def` because it will already be present in
# `resource_defs` (it is added during `SourceAsset` initialization).
resource_defs=source_asset.resource_defs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,11 @@ def asset_partitions_subset_for_input(
upstream_asset_partitions_def = asset_layer.get(upstream_asset_key).partitions_def

if upstream_asset_partitions_def is not None:
partitions_def = assets_def.partitions_def if assets_def else None
partitions_def = (
assets_def.specs_by_key[next(iter(assets_def.keys))].partitions_def
if assets_def
else None
)
partitions_subset = (
partitions_def.empty_subset().with_partition_key_range(
partitions_def,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,7 @@ def assets2(): ...

with pytest.raises(
CheckError,
match="AssetSpec for asset1 has partitions_def which is different than the partitions_def provided to AssetsDefinition.",
match="which is different than the partitions_def provided to AssetsDefinition",
):

@multi_asset(
Expand All @@ -890,8 +890,8 @@ def assets3(): ...
def assets4(): ...

with pytest.raises(
DagsterInvalidDefinitionError,
match="If different AssetSpecs have different partitions_defs, can_subset must be True",
CheckError,
match="which is different than the partitions_def provided to AssetsDefinition",
):

@multi_asset(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,24 @@
import mock
import pytest
from dagster import (
AssetDep,
AssetIn,
AssetKey,
AssetOut,
AssetsDefinition,
AssetSpec,
DagsterInstance,
DagsterRunStatus,
DailyPartitionsDefinition,
Definitions,
HourlyPartitionsDefinition,
LastPartitionMapping,
MaterializeResult,
Nothing,
PartitionKeyRange,
PartitionsDefinition,
RunRequest,
StaticPartitionMapping,
StaticPartitionsDefinition,
TimeWindowPartitionMapping,
WeeklyPartitionsDefinition,
Expand Down Expand Up @@ -537,7 +541,9 @@ def get_asset_graph(
) as get_builtin_partition_mapping_types:
get_builtin_partition_mapping_types.return_value = tuple(
assets_def.infer_partition_mapping(
next(iter(assets_def.keys)), dep_key, assets_defs_by_key[dep_key].partitions_def
next(iter(assets_def.keys)),
dep_key,
assets_defs_by_key[dep_key].specs_by_key[dep_key].partitions_def,
).__class__
for assets in assets_by_repo_name.values()
for assets_def in assets
Expand Down Expand Up @@ -1660,6 +1666,56 @@ def my_multi_asset():
assert AssetKeyPartitionKey(AssetKey("c"), "1") in backfill_data.requested_subset


def test_multi_asset_internal_deps_different_partitions_asset_backfill() -> None:
    """Backfill a subsettable multi-asset whose assets use different partitions_defs.

    ``asset1`` is partitioned on "a"/"b" and ``asset2`` on "1"; ``asset2`` depends on
    ``asset1`` through a StaticPartitionMapping. The test checks that the first backfill
    iteration requests both ``asset1`` partitions but not ``asset2``, and that the
    second iteration then requests ``asset2``'s partition.
    """
    upstream_partitions_def = StaticPartitionsDefinition(["a", "b"])
    downstream_partitions_def = StaticPartitionsDefinition(["1"])
    upstream_to_downstream = StaticPartitionMapping({"a": {"1"}, "b": {"1"}})

    @multi_asset(
        specs=[
            AssetSpec("asset1", partitions_def=upstream_partitions_def, skippable=True),
            AssetSpec(
                "asset2",
                partitions_def=downstream_partitions_def,
                deps=[AssetDep("asset1", partition_mapping=upstream_to_downstream)],
                skippable=True,
            ),
        ],
        can_subset=True,
    )
    def my_multi_asset(context):
        # Emit one result per asset key selected for this execution.
        for selected_key in context.selected_asset_keys:
            yield MaterializeResult(asset_key=selected_key)

    instance = DagsterInstance.ephemeral()
    assets_by_repo = {"repo": [my_multi_asset]}
    graph = get_asset_graph(assets_by_repo)
    now = create_datetime(2024, 1, 9, 0, 0, 0)

    # Target every asset partition in the graph with the backfill.
    full_subset = AssetGraphSubset.all(
        graph, dynamic_partitions_store=MagicMock(), current_time=now
    )
    initial_backfill = AssetBackfillData.from_asset_graph_subset(
        asset_graph_subset=full_subset,
        backfill_start_timestamp=now.timestamp(),
        dynamic_partitions_store=MagicMock(),
    )

    # Iteration 1: only the upstream asset's partitions are requested.
    backfill_after_first = _single_backfill_iteration(
        "fake_id", initial_backfill, graph, instance, assets_by_repo
    )
    requested_after_first = backfill_after_first.requested_subset
    for upstream_partition_key in ("a", "b"):
        assert (
            AssetKeyPartitionKey(AssetKey("asset1"), upstream_partition_key)
            in requested_after_first
        )
    assert AssetKeyPartitionKey(AssetKey("asset2"), "1") not in requested_after_first

    # Iteration 2: the downstream asset's partition is now requested.
    backfill_after_second = _single_backfill_iteration(
        "fake_id", backfill_after_first, graph, instance, assets_by_repo
    )
    requested_after_second = backfill_after_second.requested_subset
    assert AssetKeyPartitionKey(AssetKey("asset2"), "1") in requested_after_second


def test_multi_asset_internal_and_external_deps_asset_backfill() -> None:
pd = StaticPartitionsDefinition(["1", "2", "3"])

Expand Down

0 comments on commit c44fe49

Please sign in to comment.