Skip to content

Commit

Permalink
[components] Remove AutomationConditionModel in favor of raw python o…
Browse files Browse the repository at this point in the history
…bject
  • Loading branch information
OwenKephart committed Dec 23, 2024
1 parent 7eb1d13 commit 5fa066b
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
from typing import AbstractSet, Any, Callable, Mapping, Optional, Sequence, Type, TypeVar, Union

import dagster._check as check
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
)
from dagster._record import record
from jinja2.nativetypes import NativeTemplate
from pydantic import BaseModel, Field
Expand All @@ -17,6 +20,13 @@
CONTEXT_KEY = "required_rendering_scope"


def automation_condition_scope() -> Mapping[str, Any]:
return {
"eager": AutomationCondition.eager,
"on_cron": AutomationCondition.on_cron,
}


def RenderingScope(field: Optional[FieldInfo] = None, *, required_scope: AbstractSet[str]) -> Any:
"""Defines a Pydantic Field that requires a specific scope to be available before rendering.
Expand Down Expand Up @@ -50,7 +60,9 @@ class TemplatedValueResolver:

@staticmethod
def default() -> "TemplatedValueResolver":
return TemplatedValueResolver(context={"env": _env})
return TemplatedValueResolver(
context={"env": _env, "automation_condition": automation_condition_scope()}
)

def with_context(self, **additional_context) -> "TemplatedValueResolver":
return TemplatedValueResolver(context={**self.context, **additional_context})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,34 @@ class OpSpecBaseModel(BaseModel):
tags: Optional[Dict[str, str]] = None


class AutomationConditionModel(BaseModel):
type: str
params: Mapping[str, Any] = {}
class AssetAttributesModel(BaseModel):
key: Optional[str] = None
deps: Sequence[str] = []
description: Optional[str] = None
metadata: Union[str, Mapping[str, Any]] = {}
group_name: Optional[str] = None
skippable: bool = False
code_version: Optional[str] = None
owners: Sequence[str] = []
tags: Union[str, Mapping[str, str]] = {}
automation_condition: Optional[Union[str, AutomationCondition]] = RenderingScope(
Field(None), required_scope={"automation_condition"}
)

class Config:
# required for AutomationCondition
arbitrary_types_allowed = True

def to_automation_condition(self) -> AutomationCondition:
return getattr(AutomationCondition, self.type)(**self.params)
def get_resolved_attributes(self, value_resolver: TemplatedValueResolver) -> Mapping[str, Any]:
return value_resolver.resolve(self.model_dump(exclude_unset=True))


class AssetSpecProcessor(ABC, BaseModel):
target: str = "*"
description: Optional[str] = None
metadata: Optional[Mapping[str, Any]] = None
group_name: Optional[str] = None
tags: Optional[Mapping[str, str]] = None
automation_condition: Optional[AutomationConditionModel] = None
attributes: AssetAttributesModel

class Config:
arbitrary_types_allowed = True

def _apply_to_spec(self, spec: AssetSpec, attributes: Mapping[str, Any]) -> AssetSpec: ...

Expand All @@ -48,10 +61,9 @@ def apply_to_spec(
return spec

# add the original spec to the context and resolve values
attributes = value_resolver.with_context(asset=spec).resolve(
self.model_dump(exclude={"target", "operation"}, exclude_unset=True)
return self._apply_to_spec(
spec, self.attributes.get_resolved_attributes(value_resolver.with_context(asset=spec))
)
return self._apply_to_spec(spec, attributes)

def apply(self, defs: Definitions, value_resolver: TemplatedValueResolver) -> Definitions:
target_selection = AssetSelection.from_string(self.target, include_sources=True)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,51 +1,24 @@
import shutil
from pathlib import Path
from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence
from typing import TYPE_CHECKING, Mapping, Sequence

from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.decorators.asset_decorator import multi_asset
from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
from dagster._core.pipes.subprocess import PipesSubprocessClient
from dagster._utils.warnings import suppress_dagster_warnings
from pydantic import BaseModel

from dagster_components.core.component import Component, ComponentLoadContext, component
from dagster_components.core.dsl_schema import AutomationConditionModel
from dagster_components.core.dsl_schema import AssetAttributesModel

if TYPE_CHECKING:
from dagster._core.definitions.definitions_class import Definitions


class AssetSpecModel(BaseModel):
key: str
deps: Sequence[str] = []
description: Optional[str] = None
metadata: Mapping[str, Any] = {}
group_name: Optional[str] = None
skippable: bool = False
code_version: Optional[str] = None
owners: Sequence[str] = []
tags: Mapping[str, str] = {}
automation_condition: Optional[AutomationConditionModel] = None

@suppress_dagster_warnings
def to_asset_spec(self) -> AssetSpec:
return AssetSpec(
**{
**self.__dict__,
"key": AssetKey.from_user_string(self.key),
"automation_condition": self.automation_condition.to_automation_condition()
if self.automation_condition
else None,
},
)


class PipesSubprocessScriptParams(BaseModel):
path: str
assets: Sequence[AssetSpecModel]
assets: Sequence[AssetAttributesModel]


class PipesSubprocessScriptCollectionParams(BaseModel):
Expand Down Expand Up @@ -78,11 +51,14 @@ def load(cls, context: ComponentLoadContext) -> "PipesSubprocessScriptCollection
script_path = context.path / script.path
if not script_path.exists():
raise FileNotFoundError(f"Script {script_path} does not exist")
path_specs[script_path] = [spec.to_asset_spec() for spec in script.assets]
path_specs[script_path] = [
AssetSpec(**asset.get_resolved_attributes(context.templated_value_resolver))
for asset in script.assets
]

return cls(dirpath=context.path, path_specs=path_specs)

def build_defs(self, load_context: "ComponentLoadContext") -> "Definitions":
def build_defs(self, context: "ComponentLoadContext") -> "Definitions":
from dagster._core.definitions.definitions_class import Definitions

return Definitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,9 @@ params:
- path: script_one.py
assets:
- key: a
automation_condition:
type: eager
automation_condition: "{{ automation_condition.eager() }}"
- key: b
automation_condition:
type: on_cron
params:
cron_schedule: "@daily"
automation_condition: "{{ automation_condition.on_cron('@daily') }}"
deps: [up1, up2]
- path: script_two.py
assets:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ params:
project_dir: jaffle_shop

asset_attributes:
- tags:
foo: bar
metadata:
something: 1
automation_condition:
type: on_cron
params:
cron_schedule: "@daily"
- tags:
another: one
- attributes:
tags:
foo: bar
metadata:
something: 1
automation_condition: "{{ automation_condition.on_cron('@daily') }}"
- attributes:
tags:
another: one
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ def test_python_params() -> None:
{
"path": "script_one.py",
"assets": [
{"key": "a", "automation_condition": {"type": "eager"}},
{
"key": "a",
"automation_condition": "{{ automation_condition.eager() }}",
},
{
"key": "b",
"automation_condition": {
"type": "on_cron",
"params": {"cron_schedule": "@daily"},
},
"automation_condition": "{{ automation_condition.on_cron('@daily') }}",
"deps": ["up1", "up2"],
},
],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import pytest
from dagster import AssetKey, AssetSpec, Definitions
from dagster import AssetKey, AssetSpec, AutomationCondition, Definitions
from dagster_components.core.dsl_schema import (
AssetAttributes,
AssetAttributesModel,
MergeAttributes,
ReplaceAttributes,
TemplatedValueResolver,
Expand All @@ -23,7 +24,11 @@ class M(BaseModel):


def test_replace_attributes() -> None:
op = ReplaceAttributes(operation="replace", target="group:g2", tags={"newtag": "newval"})
op = ReplaceAttributes(
operation="replace",
target="group:g2",
attributes=AssetAttributesModel(tags={"newtag": "newval"}),
)

newdefs = op.apply(defs, TemplatedValueResolver.default())
asset_graph = newdefs.get_asset_graph()
Expand All @@ -33,7 +38,11 @@ def test_replace_attributes() -> None:


def test_merge_attributes() -> None:
op = MergeAttributes(operation="merge", target="group:g2", tags={"newtag": "newval"})
op = MergeAttributes(
operation="merge",
target="group:g2",
attributes=AssetAttributesModel(tags={"newtag": "newval"}),
)

newdefs = op.apply(defs, TemplatedValueResolver.default())
asset_graph = newdefs.get_asset_graph()
Expand All @@ -43,7 +52,9 @@ def test_merge_attributes() -> None:


def test_render_attributes_asset_context() -> None:
op = MergeAttributes(tags={"group_name_tag": "group__{{ asset.group_name }}"})
op = MergeAttributes(
attributes=AssetAttributesModel(tags={"group_name_tag": "group__{{ asset.group_name }}"})
)

newdefs = op.apply(defs, TemplatedValueResolver.default().with_context(foo="theval"))
asset_graph = newdefs.get_asset_graph()
Expand All @@ -54,33 +65,68 @@ def test_render_attributes_asset_context() -> None:

def test_render_attributes_custom_context() -> None:
op = ReplaceAttributes(
operation="replace", target="group:g2", tags={"a": "{{ foo }}", "b": "prefix_{{ foo }}"}
operation="replace",
target="group:g2",
attributes=AssetAttributesModel(
tags={"a": "{{ foo }}", "b": "prefix_{{ foo }}"},
metadata="{{ metadata }}",
automation_condition="{{ custom_cron('@daily') }}",
),
)

newdefs = op.apply(defs, TemplatedValueResolver.default().with_context(foo="theval"))
def _custom_cron(s):
return AutomationCondition.cron_tick_passed(s) & ~AutomationCondition.in_progress()

metadata = {"a": 1, "b": "str", "d": 1.23}
newdefs = op.apply(
defs,
TemplatedValueResolver.default().with_context(
foo="theval", metadata=metadata, custom_cron=_custom_cron
),
)
asset_graph = newdefs.get_asset_graph()
assert asset_graph.get(AssetKey("a")).tags == {}
assert asset_graph.get(AssetKey("b")).tags == {"a": "theval", "b": "prefix_theval"}
assert asset_graph.get(AssetKey("c")).tags == {"a": "theval", "b": "prefix_theval"}
assert asset_graph.get(AssetKey("a")).metadata == {}
assert asset_graph.get(AssetKey("a")).automation_condition is None

for k in ["b", "c"]:
node = asset_graph.get(AssetKey(k))
assert node.tags == {"a": "theval", "b": "prefix_theval"}
assert node.metadata == metadata
assert node.automation_condition == _custom_cron("@daily")


@pytest.mark.parametrize(
"python,expected",
[
# default to merge and a * target
({"tags": {"a": "b"}}, MergeAttributes(target="*", tags={"a": "b"})),
(
{"operation": "replace", "tags": {"a": "b"}},
ReplaceAttributes(operation="replace", target="*", tags={"a": "b"}),
{"attributes": {"tags": {"a": "b"}}},
MergeAttributes(target="*", attributes=AssetAttributesModel(tags={"a": "b"})),
),
(
{"operation": "replace", "attributes": {"tags": {"a": "b"}}},
ReplaceAttributes(
operation="replace",
target="*",
attributes=AssetAttributesModel(tags={"a": "b"}),
),
),
# explicit target
(
{"tags": {"a": "b"}, "target": "group:g2"},
MergeAttributes(target="group:g2", tags={"a": "b"}),
{"attributes": {"tags": {"a": "b"}}, "target": "group:g2"},
MergeAttributes(
target="group:g2",
attributes=AssetAttributesModel(tags={"a": "b"}),
),
),
(
{"operation": "replace", "tags": {"a": "b"}, "target": "group:g2"},
ReplaceAttributes(operation="replace", target="group:g2", tags={"a": "b"}),
{"operation": "replace", "attributes": {"tags": {"a": "b"}}, "target": "group:g2"},
ReplaceAttributes(
operation="replace",
target="group:g2",
attributes=AssetAttributesModel(tags={"a": "b"}),
),
),
],
)
Expand Down

0 comments on commit 5fa066b

Please sign in to comment.