From 63116a81c7e287b68abe5c13b6c9637ee86eb48c Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Fri, 20 Dec 2024 15:25:26 -0500 Subject: [PATCH] [components] Remove AutomationConditionModel in favor of raw python object --- .../core/component_rendering.py | 14 +++- .../dagster_components/core/dsl_schema.py | 38 ++++++---- .../lib/pipes_subprocess_script_collection.py | 40 +++-------- .../components/scripts/component.yaml | 8 +-- .../registry_tests/__init__.py | 0 .../test_registry.py | 0 ...test_pipes_subprocess_script_collection.py | 10 +-- .../unit_tests/test_spec_processing.py | 72 +++++++++++++++---- 8 files changed, 111 insertions(+), 71 deletions(-) create mode 100644 python_modules/libraries/dagster-components/dagster_components_tests/registry_tests/__init__.py rename python_modules/libraries/dagster-components/dagster_components_tests/{unit_tests => registry_tests}/test_registry.py (100%) diff --git a/python_modules/libraries/dagster-components/dagster_components/core/component_rendering.py b/python_modules/libraries/dagster-components/dagster_components/core/component_rendering.py index 6742debd01315..fd907e4e9340a 100644 --- a/python_modules/libraries/dagster-components/dagster_components/core/component_rendering.py +++ b/python_modules/libraries/dagster-components/dagster_components/core/component_rendering.py @@ -4,6 +4,9 @@ from typing import AbstractSet, Any, Callable, Mapping, Optional, Sequence, Type, TypeVar, Union import dagster._check as check +from dagster._core.definitions.declarative_automation.automation_condition import ( + AutomationCondition, +) from dagster._record import record from jinja2.nativetypes import NativeTemplate from pydantic import BaseModel, Field @@ -17,6 +20,13 @@ CONTEXT_KEY = "required_rendering_scope" +def automation_condition_scope() -> Mapping[str, Any]: + return { + "eager": AutomationCondition.eager, + "on_cron": AutomationCondition.on_cron, + } + + def RenderingScope(field: Optional[FieldInfo] = None, *, required_scope: AbstractSet[str]) -> Any: """Defines a Pydantic Field that requires a specific scope to be available before rendering. @@ -50,7 +60,9 @@ class TemplatedValueResolver: @staticmethod def default() -> "TemplatedValueResolver": - return TemplatedValueResolver(context={"env": _env}) + return TemplatedValueResolver( + context={"env": _env, "automation_condition": automation_condition_scope()} + ) def with_context(self, **additional_context) -> "TemplatedValueResolver": return TemplatedValueResolver(context={**self.context, **additional_context}) diff --git a/python_modules/libraries/dagster-components/dagster_components/core/dsl_schema.py b/python_modules/libraries/dagster-components/dagster_components/core/dsl_schema.py index d2a2f1e05c26f..d6386900e0fd4 100644 --- a/python_modules/libraries/dagster-components/dagster_components/core/dsl_schema.py +++ b/python_modules/libraries/dagster-components/dagster_components/core/dsl_schema.py @@ -20,21 +20,34 @@ class OpSpecBaseModel(BaseModel): tags: Optional[Dict[str, str]] = None -class AutomationConditionModel(BaseModel): - type: str - params: Mapping[str, Any] = {} +class AssetAttributesModel(BaseModel): + key: Optional[str] = None + deps: Sequence[str] = [] + description: Optional[str] = None + metadata: Union[str, Mapping[str, Any]] = {} + group_name: Optional[str] = None + skippable: bool = False + code_version: Optional[str] = None + owners: Sequence[str] = [] + tags: Union[str, Mapping[str, str]] = {} + automation_condition: Optional[Union[str, AutomationCondition]] = RenderingScope( + Field(None), required_scope={"automation_condition"} + ) + + class Config: + # required for AutomationCondition + arbitrary_types_allowed = True - def to_automation_condition(self) -> AutomationCondition: - return getattr(AutomationCondition, self.type)(**self.params) + def get_resolved_attributes(self, value_resolver: TemplatedValueResolver) -> Mapping[str, Any]: + return value_resolver.resolve(self.model_dump(exclude_unset=True)) class AssetSpecProcessor(ABC, BaseModel): target: str = "*" - description: Optional[str] = None - metadata: Optional[Mapping[str, Any]] = None - group_name: Optional[str] = None - tags: Optional[Mapping[str, str]] = None - automation_condition: Optional[AutomationConditionModel] = None + attributes: AssetAttributesModel + + class Config: + arbitrary_types_allowed = True def _apply_to_spec(self, spec: AssetSpec, attributes: Mapping[str, Any]) -> AssetSpec: ... @@ -48,10 +61,9 @@ def apply_to_spec( return spec # add the original spec to the context and resolve values - attributes = value_resolver.with_context(asset=spec).resolve( - self.model_dump(exclude={"target", "operation"}, exclude_unset=True) + return self._apply_to_spec( + spec, self.attributes.get_resolved_attributes(value_resolver.with_context(asset=spec)) ) - return self._apply_to_spec(spec, attributes) def apply(self, defs: Definitions, value_resolver: TemplatedValueResolver) -> Definitions: target_selection = AssetSelection.from_string(self.target, include_sources=True) diff --git a/python_modules/libraries/dagster-components/dagster_components/lib/pipes_subprocess_script_collection.py b/python_modules/libraries/dagster-components/dagster_components/lib/pipes_subprocess_script_collection.py index 75a8a83730505..428d4f02bf726 100644 --- a/python_modules/libraries/dagster-components/dagster_components/lib/pipes_subprocess_script_collection.py +++ b/python_modules/libraries/dagster-components/dagster_components/lib/pipes_subprocess_script_collection.py @@ -1,51 +1,24 @@ import shutil from pathlib import Path -from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence +from typing import TYPE_CHECKING, Mapping, Sequence -from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.decorators.asset_decorator import multi_asset from dagster._core.execution.context.asset_execution_context import AssetExecutionContext from dagster._core.pipes.subprocess import PipesSubprocessClient -from dagster._utils.warnings import suppress_dagster_warnings from pydantic import BaseModel from dagster_components.core.component import Component, ComponentLoadContext, component -from dagster_components.core.dsl_schema import AutomationConditionModel +from dagster_components.core.dsl_schema import AssetAttributesModel if TYPE_CHECKING: from dagster._core.definitions.definitions_class import Definitions -class AssetSpecModel(BaseModel): - key: str - deps: Sequence[str] = [] - description: Optional[str] = None - metadata: Mapping[str, Any] = {} - group_name: Optional[str] = None - skippable: bool = False - code_version: Optional[str] = None - owners: Sequence[str] = [] - tags: Mapping[str, str] = {} - automation_condition: Optional[AutomationConditionModel] = None - - @suppress_dagster_warnings - def to_asset_spec(self) -> AssetSpec: - return AssetSpec( - **{ - **self.__dict__, - "key": AssetKey.from_user_string(self.key), - "automation_condition": self.automation_condition.to_automation_condition() - if self.automation_condition - else None, - }, - ) - - class PipesSubprocessScriptParams(BaseModel): path: str - assets: Sequence[AssetSpecModel] + assets: Sequence[AssetAttributesModel] class PipesSubprocessScriptCollectionParams(BaseModel): @@ -78,11 +51,14 @@ def load(cls, context: ComponentLoadContext) -> "PipesSubprocessScriptCollection script_path = context.path / script.path if not script_path.exists(): raise FileNotFoundError(f"Script {script_path} does not exist") - path_specs[script_path] = [spec.to_asset_spec() for spec in script.assets] + path_specs[script_path] = [ + AssetSpec(**asset.get_resolved_attributes(context.templated_value_resolver)) + for asset in script.assets + ] return cls(dirpath=context.path, path_specs=path_specs) - def build_defs(self, load_context: "ComponentLoadContext") -> "Definitions": + def build_defs(self, context: "ComponentLoadContext") -> "Definitions": from dagster._core.definitions.definitions_class import Definitions return Definitions( diff --git a/python_modules/libraries/dagster-components/dagster_components_tests/code_locations/python_script_location/components/scripts/component.yaml b/python_modules/libraries/dagster-components/dagster_components_tests/code_locations/python_script_location/components/scripts/component.yaml index e65c0273bb93b..33737c820e152 100644 --- a/python_modules/libraries/dagster-components/dagster_components_tests/code_locations/python_script_location/components/scripts/component.yaml +++ b/python_modules/libraries/dagster-components/dagster_components_tests/code_locations/python_script_location/components/scripts/component.yaml @@ -5,13 +5,9 @@ params: - path: script_one.py assets: - key: a - automation_condition: - type: eager + automation_condition: "{{ automation_condition.eager() }}" - key: b - automation_condition: - type: on_cron - params: - cron_schedule: "@daily" + automation_condition: "{{ automation_condition.on_cron('@daily') }}" deps: [up1, up2] - path: script_two.py assets: diff --git a/python_modules/libraries/dagster-components/dagster_components_tests/registry_tests/__init__.py b/python_modules/libraries/dagster-components/dagster_components_tests/registry_tests/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/python_modules/libraries/dagster-components/dagster_components_tests/unit_tests/test_registry.py b/python_modules/libraries/dagster-components/dagster_components_tests/registry_tests/test_registry.py similarity index 100% rename from python_modules/libraries/dagster-components/dagster_components_tests/unit_tests/test_registry.py rename to python_modules/libraries/dagster-components/dagster_components_tests/registry_tests/test_registry.py diff --git a/python_modules/libraries/dagster-components/dagster_components_tests/unit_tests/test_pipes_subprocess_script_collection.py b/python_modules/libraries/dagster-components/dagster_components_tests/unit_tests/test_pipes_subprocess_script_collection.py index 69409ae590a51..577b53729b3d0 100644 --- a/python_modules/libraries/dagster-components/dagster_components_tests/unit_tests/test_pipes_subprocess_script_collection.py +++ b/python_modules/libraries/dagster-components/dagster_components_tests/unit_tests/test_pipes_subprocess_script_collection.py @@ -34,13 +34,13 @@ def test_python_params() -> None: { "path": "script_one.py", "assets": [ - {"key": "a", "automation_condition": {"type": "eager"}}, + { + "key": "a", + "automation_condition": "{{ automation_condition.eager() }}", + }, { "key": "b", - "automation_condition": { - "type": "on_cron", - "params": {"cron_schedule": "@daily"}, - }, + "automation_condition": "{{ automation_condition.on_cron('@daily') }}", "deps": ["up1", "up2"], }, ], diff --git a/python_modules/libraries/dagster-components/dagster_components_tests/unit_tests/test_spec_processing.py b/python_modules/libraries/dagster-components/dagster_components_tests/unit_tests/test_spec_processing.py index 80198bad19755..3fa7273775a39 100644 --- a/python_modules/libraries/dagster-components/dagster_components_tests/unit_tests/test_spec_processing.py +++ b/python_modules/libraries/dagster-components/dagster_components_tests/unit_tests/test_spec_processing.py @@ -1,7 +1,8 @@ import pytest -from dagster import AssetKey, AssetSpec, Definitions +from dagster import AssetKey, AssetSpec, AutomationCondition, Definitions from dagster_components.core.dsl_schema import ( AssetAttributes, + AssetAttributesModel, MergeAttributes, ReplaceAttributes, TemplatedValueResolver, @@ -23,7 +24,11 @@ class M(BaseModel): def test_replace_attributes() -> None: - op = ReplaceAttributes(operation="replace", target="group:g2", tags={"newtag": "newval"}) + op = ReplaceAttributes( + operation="replace", + target="group:g2", + attributes=AssetAttributesModel(tags={"newtag": "newval"}), + ) newdefs = op.apply(defs, TemplatedValueResolver.default()) asset_graph = newdefs.get_asset_graph() @@ -33,7 +38,11 @@ def test_replace_attributes() -> None: def test_merge_attributes() -> None: - op = MergeAttributes(operation="merge", target="group:g2", tags={"newtag": "newval"}) + op = MergeAttributes( + operation="merge", + target="group:g2", + attributes=AssetAttributesModel(tags={"newtag": "newval"}), + ) newdefs = op.apply(defs, TemplatedValueResolver.default()) asset_graph = newdefs.get_asset_graph() @@ -44,33 +53,68 @@ def test_merge_attributes() -> None: def test_render_attributes() -> None: op = ReplaceAttributes( - operation="replace", target="group:g2", tags={"a": "{{ foo }}", "b": "prefix_{{ foo }}"} + operation="replace", + target="group:g2", + attributes=AssetAttributesModel( + tags={"a": "{{ foo }}", "b": "prefix_{{ foo }}"}, + metadata="{{ metadata }}", + automation_condition="{{ custom_cron('@daily') }}", + ), ) - newdefs = op.apply(defs, TemplatedValueResolver.default().with_context(foo="theval")) + def _custom_cron(s): + return AutomationCondition.cron_tick_passed(s) & ~AutomationCondition.in_progress() + + metadata = {"a": 1, "b": "str", "d": 1.23} + newdefs = op.apply( + defs, + TemplatedValueResolver.default().with_context( + foo="theval", metadata=metadata, custom_cron=_custom_cron + ), + ) asset_graph = newdefs.get_asset_graph() assert asset_graph.get(AssetKey("a")).tags == {} - assert asset_graph.get(AssetKey("b")).tags == {"a": "theval", "b": "prefix_theval"} - assert asset_graph.get(AssetKey("c")).tags == {"a": "theval", "b": "prefix_theval"} + assert asset_graph.get(AssetKey("a")).metadata == {} + assert asset_graph.get(AssetKey("a")).automation_condition is None + + for k in ["b", "c"]: + node = asset_graph.get(AssetKey(k)) + assert node.tags == {"a": "theval", "b": "prefix_theval"} + assert node.metadata == metadata + assert node.automation_condition == _custom_cron("@daily") @pytest.mark.parametrize( "python,expected", [ # default to merge and a * target - ({"tags": {"a": "b"}}, MergeAttributes(target="*", tags={"a": "b"})), ( - {"operation": "replace", "tags": {"a": "b"}}, - ReplaceAttributes(operation="replace", target="*", tags={"a": "b"}), + {"attributes": {"tags": {"a": "b"}}}, + MergeAttributes(target="*", attributes=AssetAttributesModel(tags={"a": "b"})), + ), + ( + {"operation": "replace", "attributes": {"tags": {"a": "b"}}}, + ReplaceAttributes( + operation="replace", + target="*", + attributes=AssetAttributesModel(tags={"a": "b"}), + ), ), # explicit target ( - {"tags": {"a": "b"}, "target": "group:g2"}, - MergeAttributes(target="group:g2", tags={"a": "b"}), + {"attributes": {"tags": {"a": "b"}}, "target": "group:g2"}, + MergeAttributes( + target="group:g2", + attributes=AssetAttributesModel(tags={"a": "b"}), + ), ), ( - {"operation": "replace", "tags": {"a": "b"}, "target": "group:g2"}, - ReplaceAttributes(operation="replace", target="group:g2", tags={"a": "b"}), + {"operation": "replace", "attributes": {"tags": {"a": "b"}}, "target": "group:g2"}, + ReplaceAttributes( + operation="replace", + target="group:g2", + attributes=AssetAttributesModel(tags={"a": "b"}), + ), ), ], )