Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[components] Remove AutomationConditionModel in favor of raw python object #26649

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
from typing import AbstractSet, Any, Callable, Mapping, Optional, Sequence, Type, TypeVar, Union

import dagster._check as check
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
)
from dagster._record import record
from jinja2.nativetypes import NativeTemplate
from pydantic import BaseModel, Field
Expand All @@ -17,6 +20,13 @@
CONTEXT_KEY = "required_rendering_scope"


def automation_condition_scope() -> Mapping[str, Any]:
return {
"eager": AutomationCondition.eager,
"on_cron": AutomationCondition.on_cron,
}


def RenderingScope(field: Optional[FieldInfo] = None, *, required_scope: AbstractSet[str]) -> Any:
"""Defines a Pydantic Field that requires a specific scope to be available before rendering.

Expand Down Expand Up @@ -50,7 +60,9 @@ class TemplatedValueResolver:

@staticmethod
def default() -> "TemplatedValueResolver":
return TemplatedValueResolver(context={"env": _env})
return TemplatedValueResolver(
context={"env": _env, "automation_condition": automation_condition_scope()}
)

def with_context(self, **additional_context) -> "TemplatedValueResolver":
return TemplatedValueResolver(context={**self.context, **additional_context})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,34 @@ class OpSpecBaseModel(BaseModel):
tags: Optional[Dict[str, str]] = None


class AutomationConditionModel(BaseModel):
type: str
params: Mapping[str, Any] = {}
class AssetAttributesModel(BaseModel):
key: Optional[str] = None
deps: Sequence[str] = []
description: Optional[str] = None
metadata: Union[str, Mapping[str, Any]] = {}
group_name: Optional[str] = None
skippable: bool = False
code_version: Optional[str] = None
owners: Sequence[str] = []
tags: Union[str, Mapping[str, str]] = {}
automation_condition: Optional[Union[str, AutomationCondition]] = RenderingScope(
Field(None), required_scope={"automation_condition"}
)
Comment on lines +33 to +35
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, this field should actually just be of type Optional[str].

I want to do a followup here where we do something like:

automation_condition: Optional[str] = RenderedField(Optional[AutomationCondition], required_scope={"automation_condition"})

where we can have separate type checking for the rendered value

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would be a great followup.

Leave comment to this effect explaining the next steps.


class Config:
# required for AutomationCondition
arbitrary_types_allowed = True

def to_automation_condition(self) -> AutomationCondition:
return getattr(AutomationCondition, self.type)(**self.params)
def get_resolved_attributes(self, value_resolver: TemplatedValueResolver) -> Mapping[str, Any]:
return value_resolver.render_obj(self.model_dump(exclude_unset=True))


class AssetSpecProcessor(ABC, BaseModel):
target: str = "*"
description: Optional[str] = None
metadata: Optional[Mapping[str, Any]] = None
group_name: Optional[str] = None
tags: Optional[Mapping[str, str]] = None
automation_condition: Optional[AutomationConditionModel] = None
attributes: AssetAttributesModel

class Config:
arbitrary_types_allowed = True

def _apply_to_spec(self, spec: AssetSpec, attributes: Mapping[str, Any]) -> AssetSpec: ...

Expand All @@ -48,10 +61,9 @@ def apply_to_spec(
return spec

# add the original spec to the context and resolve values
attributes = value_resolver.with_context(asset=spec).render_obj(
self.model_dump(exclude={"target", "operation"}, exclude_unset=True)
return self._apply_to_spec(
spec, self.attributes.get_resolved_attributes(value_resolver.with_context(asset=spec))
)
return self._apply_to_spec(spec, attributes)

def apply(self, defs: Definitions, value_resolver: TemplatedValueResolver) -> Definitions:
target_selection = AssetSelection.from_string(self.target, include_sources=True)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,51 +1,24 @@
import shutil
from pathlib import Path
from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence
from typing import TYPE_CHECKING, Mapping, Sequence

from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.decorators.asset_decorator import multi_asset
from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
from dagster._core.pipes.subprocess import PipesSubprocessClient
from dagster._utils.warnings import suppress_dagster_warnings
from pydantic import BaseModel

from dagster_components.core.component import Component, ComponentLoadContext, component_type
from dagster_components.core.dsl_schema import AutomationConditionModel
from dagster_components.core.dsl_schema import AssetAttributesModel

if TYPE_CHECKING:
from dagster._core.definitions.definitions_class import Definitions


class AssetSpecModel(BaseModel):
key: str
deps: Sequence[str] = []
description: Optional[str] = None
metadata: Mapping[str, Any] = {}
group_name: Optional[str] = None
skippable: bool = False
code_version: Optional[str] = None
owners: Sequence[str] = []
tags: Mapping[str, str] = {}
automation_condition: Optional[AutomationConditionModel] = None

@suppress_dagster_warnings
def to_asset_spec(self) -> AssetSpec:
return AssetSpec(
**{
**self.__dict__,
"key": AssetKey.from_user_string(self.key),
"automation_condition": self.automation_condition.to_automation_condition()
if self.automation_condition
else None,
},
)


class PipesSubprocessScriptParams(BaseModel):
path: str
assets: Sequence[AssetSpecModel]
assets: Sequence[AssetAttributesModel]


class PipesSubprocessScriptCollectionParams(BaseModel):
Expand Down Expand Up @@ -78,11 +51,14 @@ def load(cls, context: ComponentLoadContext) -> "PipesSubprocessScriptCollection
script_path = context.path / script.path
if not script_path.exists():
raise FileNotFoundError(f"Script {script_path} does not exist")
path_specs[script_path] = [spec.to_asset_spec() for spec in script.assets]
path_specs[script_path] = [
AssetSpec(**asset.get_resolved_attributes(context.templated_value_resolver))
for asset in script.assets
]

return cls(dirpath=context.path, path_specs=path_specs)

def build_defs(self, load_context: "ComponentLoadContext") -> "Definitions":
def build_defs(self, context: "ComponentLoadContext") -> "Definitions":
from dagster._core.definitions.definitions_class import Definitions

return Definitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ params:
project_dir: jaffle_shop

asset_attributes:
- tags:
foo: bar
metadata:
something: 1
automation_condition:
type: on_cron
params:
cron_schedule: "@daily"
- tags:
another: one
- attributes:
tags:
foo: bar
metadata:
something: 1
automation_condition: "{{ automation_condition.on_cron('@daily') }}"
- attributes:
tags:
another: one
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,9 @@ params:
- path: script_one.py
assets:
- key: a
automation_condition:
type: eager
automation_condition: "{{ automation_condition.eager() }}"
- key: b
automation_condition:
type: on_cron
params:
cron_schedule: "@daily"
automation_condition: "{{ automation_condition.on_cron('@daily') }}"
deps: [up1, up2]
- path: script_two.py
assets:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ def test_python_params() -> None:
{
"path": "script_one.py",
"assets": [
{"key": "a", "automation_condition": {"type": "eager"}},
{
"key": "a",
"automation_condition": "{{ automation_condition.eager() }}",
},
{
"key": "b",
"automation_condition": {
"type": "on_cron",
"params": {"cron_schedule": "@daily"},
},
"automation_condition": "{{ automation_condition.on_cron('@daily') }}",
"deps": ["up1", "up2"],
},
],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import pytest
from dagster import AssetKey, AssetSpec, Definitions
from dagster import AssetKey, AssetSpec, AutomationCondition, Definitions
from dagster_components.core.dsl_schema import (
AssetAttributes,
AssetAttributesModel,
MergeAttributes,
ReplaceAttributes,
TemplatedValueResolver,
Expand All @@ -23,7 +24,11 @@ class M(BaseModel):


def test_replace_attributes() -> None:
op = ReplaceAttributes(operation="replace", target="group:g2", tags={"newtag": "newval"})
op = ReplaceAttributes(
operation="replace",
target="group:g2",
attributes=AssetAttributesModel(tags={"newtag": "newval"}),
)

newdefs = op.apply(defs, TemplatedValueResolver.default())
asset_graph = newdefs.get_asset_graph()
Expand All @@ -33,7 +38,11 @@ def test_replace_attributes() -> None:


def test_merge_attributes() -> None:
op = MergeAttributes(operation="merge", target="group:g2", tags={"newtag": "newval"})
op = MergeAttributes(
operation="merge",
target="group:g2",
attributes=AssetAttributesModel(tags={"newtag": "newval"}),
)

newdefs = op.apply(defs, TemplatedValueResolver.default())
asset_graph = newdefs.get_asset_graph()
Expand All @@ -43,7 +52,9 @@ def test_merge_attributes() -> None:


def test_render_attributes_asset_context() -> None:
op = MergeAttributes(tags={"group_name_tag": "group__{{ asset.group_name }}"})
op = MergeAttributes(
attributes=AssetAttributesModel(tags={"group_name_tag": "group__{{ asset.group_name }}"})
)

newdefs = op.apply(defs, TemplatedValueResolver.default().with_context(foo="theval"))
asset_graph = newdefs.get_asset_graph()
Expand All @@ -54,33 +65,68 @@ def test_render_attributes_asset_context() -> None:

def test_render_attributes_custom_context() -> None:
op = ReplaceAttributes(
operation="replace", target="group:g2", tags={"a": "{{ foo }}", "b": "prefix_{{ foo }}"}
operation="replace",
target="group:g2",
attributes=AssetAttributesModel(
tags={"a": "{{ foo }}", "b": "prefix_{{ foo }}"},
metadata="{{ metadata }}",
automation_condition="{{ custom_cron('@daily') }}",
),
)

newdefs = op.apply(defs, TemplatedValueResolver.default().with_context(foo="theval"))
def _custom_cron(s):
return AutomationCondition.cron_tick_passed(s) & ~AutomationCondition.in_progress()

metadata = {"a": 1, "b": "str", "d": 1.23}
newdefs = op.apply(
defs,
TemplatedValueResolver.default().with_context(
foo="theval", metadata=metadata, custom_cron=_custom_cron
),
)
asset_graph = newdefs.get_asset_graph()
assert asset_graph.get(AssetKey("a")).tags == {}
assert asset_graph.get(AssetKey("b")).tags == {"a": "theval", "b": "prefix_theval"}
assert asset_graph.get(AssetKey("c")).tags == {"a": "theval", "b": "prefix_theval"}
assert asset_graph.get(AssetKey("a")).metadata == {}
assert asset_graph.get(AssetKey("a")).automation_condition is None

for k in ["b", "c"]:
node = asset_graph.get(AssetKey(k))
assert node.tags == {"a": "theval", "b": "prefix_theval"}
assert node.metadata == metadata
assert node.automation_condition == _custom_cron("@daily")


@pytest.mark.parametrize(
"python,expected",
[
# default to merge and a * target
({"tags": {"a": "b"}}, MergeAttributes(target="*", tags={"a": "b"})),
(
{"operation": "replace", "tags": {"a": "b"}},
ReplaceAttributes(operation="replace", target="*", tags={"a": "b"}),
{"attributes": {"tags": {"a": "b"}}},
MergeAttributes(target="*", attributes=AssetAttributesModel(tags={"a": "b"})),
),
(
{"operation": "replace", "attributes": {"tags": {"a": "b"}}},
ReplaceAttributes(
operation="replace",
target="*",
attributes=AssetAttributesModel(tags={"a": "b"}),
),
),
# explicit target
(
{"tags": {"a": "b"}, "target": "group:g2"},
MergeAttributes(target="group:g2", tags={"a": "b"}),
{"attributes": {"tags": {"a": "b"}}, "target": "group:g2"},
MergeAttributes(
target="group:g2",
attributes=AssetAttributesModel(tags={"a": "b"}),
),
),
(
{"operation": "replace", "tags": {"a": "b"}, "target": "group:g2"},
ReplaceAttributes(operation="replace", target="group:g2", tags={"a": "b"}),
{"operation": "replace", "attributes": {"tags": {"a": "b"}}, "target": "group:g2"},
ReplaceAttributes(
operation="replace",
target="group:g2",
attributes=AssetAttributesModel(tags={"a": "b"}),
),
),
],
)
Expand Down