-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[components] Templating for asset_attributes #26633
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
import functools | ||
import json | ||
import os | ||
from typing import AbstractSet, Any, Mapping, Optional, Sequence, Type, TypeVar, Union | ||
from typing import AbstractSet, Any, Callable, Mapping, Optional, Sequence, Type, TypeVar, Union | ||
|
||
import dagster._check as check | ||
from dagster._record import record | ||
|
@@ -40,6 +41,9 @@ def _env(key: str) -> Optional[str]: | |
return os.environ.get(key) | ||
|
||
|
||
ShouldRenderFn = Callable[[Sequence[Union[str, int]]], bool] | ||
|
||
|
||
@record | ||
class TemplatedValueResolver: | ||
context: Mapping[str, Any] | ||
|
@@ -51,11 +55,49 @@ def default() -> "TemplatedValueResolver": | |
def with_context(self, **additional_context) -> "TemplatedValueResolver": | ||
return TemplatedValueResolver(context={**self.context, **additional_context}) | ||
|
||
def resolve(self, val: Any) -> Any: | ||
def _resolve_value(self, val: Any) -> Any: | ||
return NativeTemplate(val).render(**self.context) if isinstance(val, str) else val | ||
|
||
def _resolve( | ||
self, | ||
val: Any, | ||
valpath: Optional[Sequence[Union[str, int]]], | ||
should_render: Callable[[Sequence[Union[str, int]]], bool], | ||
) -> Any: | ||
if valpath is not None and not should_render(valpath): | ||
return val | ||
elif isinstance(val, dict): | ||
return { | ||
k: self._resolve(v, [*valpath, k] if valpath is not None else None, should_render) | ||
for k, v in val.items() | ||
} | ||
elif isinstance(val, list): | ||
return [ | ||
self._resolve(v, [*valpath, i] if valpath is not None else None, should_render) | ||
for i, v in enumerate(val) | ||
] | ||
else: | ||
return self._resolve_value(val) | ||
|
||
def _should_render( | ||
def resolve(self, val: Any) -> Any: | ||
"""Given a raw value, preprocesses it by rendering any templated values.""" | ||
return self._resolve(val, None, lambda _: True) | ||
|
||
def resolve_params(self, val: T, target_type: Type) -> T: | ||
"""Given a raw value, preprocesses it by rendering any templated values that are not marked as deferred in the target_type's json schema.""" | ||
json_schema = ( | ||
target_type.model_json_schema() if issubclass(target_type, BaseModel) else None | ||
) | ||
if json_schema is None: | ||
should_render = lambda _: True | ||
else: | ||
should_render = functools.partial( | ||
has_required_scope, json_schema=json_schema, subschema=json_schema | ||
) | ||
return self._resolve(val, [], should_render=should_render) | ||
|
||
|
||
def has_required_scope( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think part of my confusion is that this function name is a bit ambigious. Does it mean that there is a field that has the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
That's right yeah -- basically should_render is a function that reads the json schema and determines if anything along the path has some required scope There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
valpath: Sequence[Union[str, int]], json_schema: Mapping[str, Any], subschema: Mapping[str, Any] | ||
) -> bool: | ||
# List[ComplexType] (e.g.) will contain a reference to the complex type schema in the | ||
|
@@ -70,7 +112,7 @@ def _should_render( | |
|
||
# Optional[ComplexType] (e.g.) will contain multiple schemas in the "anyOf" field | ||
if "anyOf" in subschema: | ||
return all(_should_render(valpath, json_schema, inner) for inner in subschema["anyOf"]) | ||
return all(has_required_scope(valpath, json_schema, inner) for inner in subschema["anyOf"]) | ||
|
||
el = valpath[0] | ||
if isinstance(el, str): | ||
|
@@ -89,30 +131,4 @@ def _should_render( | |
return subschema.get("additionalProperties", True) | ||
|
||
_, *rest = valpath | ||
return _should_render(rest, json_schema, inner) | ||
|
||
|
||
def _render_values( | ||
value_resolver: TemplatedValueResolver, | ||
val: Any, | ||
valpath: Sequence[Union[str, int]], | ||
json_schema: Optional[Mapping[str, Any]], | ||
) -> Any: | ||
if json_schema and not _should_render(valpath, json_schema, json_schema): | ||
return val | ||
elif isinstance(val, dict): | ||
return { | ||
k: _render_values(value_resolver, v, [*valpath, k], json_schema) for k, v in val.items() | ||
} | ||
elif isinstance(val, list): | ||
return [ | ||
_render_values(value_resolver, v, [*valpath, i], json_schema) for i, v in enumerate(val) | ||
] | ||
else: | ||
return value_resolver.resolve(val) | ||
|
||
|
||
def preprocess_value(renderer: TemplatedValueResolver, val: T, target_type: Type) -> T: | ||
"""Given a raw value, preprocesses it by rendering any templated values that are not marked as deferred in the target_type's json schema.""" | ||
json_schema = target_type.model_json_schema() if issubclass(target_type, BaseModel) else None | ||
return _render_values(renderer, val, [], json_schema) | ||
return has_required_scope(rest, json_schema, inner) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not following this
should_render
stuff. Under what conditions will this be true versus false?