Skip to content

Commit

Permalink
[pythonic resources] Improve nested resource evaluation logic
Browse files Browse the repository at this point in the history
  • Loading branch information
benpankow committed Jun 21, 2023
1 parent c7dde26 commit a0dc4d9
Show file tree
Hide file tree
Showing 4 changed files with 538 additions and 578 deletions.
46 changes: 33 additions & 13 deletions python_modules/dagster/dagster/_config/pythonic_config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@
Dict,
Generator,
Generic,
List,
Mapping,
NamedTuple,
Optional,
Set,
Tuple,
Type,
TypeVar,
Union,
cast,
)

from pydantic import ConstrainedFloat, ConstrainedInt, ConstrainedStr
from typing_extensions import TypeAlias, TypeGuard, get_args
from typing_extensions import TypeAlias, TypeGuard, get_args, get_origin

from dagster import (
Enum as DagsterEnum,
Expand Down Expand Up @@ -468,7 +470,7 @@ def _resolve_required_resource_keys(
_resolve_required_resource_keys_for_resource(v, resource_mapping)
)

resources, _ = separate_resource_params(self.__dict__)
resources, _ = separate_resource_params(self.__class__, self.__dict__)
for v in resources.values():
nested_resource_required_keys.update(
_resolve_required_resource_keys_for_resource(
Expand Down Expand Up @@ -607,9 +609,9 @@ def is_coercible_to_resource(val: Any) -> TypeGuard[CoercibleToResource]:
def coerce_to_resource(
coercible_to_resource: CoercibleToResource,
) -> ResourceDefinition:
if isinstance(coercible_to_resource, (ConfigurableResourceFactory, PartialResource)):
return coercible_to_resource.get_resource_definition()
return coercible_to_resource
from dagster._core.execution.build_resources import wrap_resources_for_execution

return wrap_resources_for_execution({"test": coercible_to_resource})["test"]


class ConfigurableResourceFactoryResourceDefinition(ResourceDefinition, AllowDelayedDependencies):
Expand Down Expand Up @@ -766,7 +768,7 @@ def asset_that_uses_database(database: ResourceParam[Database]):
"""

def __init__(self, **data: Any):
resource_pointers, data_without_resources = separate_resource_params(data)
resource_pointers, data_without_resources = separate_resource_params(self.__class__, data)

schema = infer_schema_from_config_class(
self.__class__, fields_to_omit=set(resource_pointers.keys())
Expand Down Expand Up @@ -918,7 +920,7 @@ def _resolve_and_update_nested_resources(

# Also evaluate any resources that are not partial
with contextlib.ExitStack() as stack:
resources_to_update, _ = separate_resource_params(self.__dict__)
resources_to_update, _ = separate_resource_params(self.__class__, self.__dict__)
resources_to_update = {
attr_name: _call_resource_fn_with_default(
stack, coerce_to_resource(resource), context
Expand Down Expand Up @@ -1150,7 +1152,7 @@ def __init__(
resource_cls: Type[ConfigurableResourceFactory[TResValue]],
data: Dict[str, Any],
):
resource_pointers, _data_without_resources = separate_resource_params(data)
resource_pointers, _data_without_resources = separate_resource_params(resource_cls, data)

MakeConfigCacheable.__init__(self, data=data, resource_cls=resource_cls) # type: ignore # extends BaseModel, takes kwargs

Expand Down Expand Up @@ -1776,14 +1778,32 @@ class SeparatedResourceParams(NamedTuple):
non_resources: Dict[str, Any]


def separate_resource_params(data: Dict[str, Any]) -> SeparatedResourceParams:
def _is_annotated_as_resource_type(annotation: Type) -> bool:
"""Determines if a field in a structured config class is annotated as a resource type or not."""
is_annotated_as_resource_dependency = get_origin(annotation) == ResourceDependency or getattr(
annotation, "__metadata__", None
) == ("resource_dependency",)

return is_annotated_as_resource_dependency or safe_is_subclass(
annotation, (ResourceDefinition, ConfigurableResourceFactory)
)


def separate_resource_params(cls: Type[BaseModel], data: Dict[str, Any]) -> SeparatedResourceParams:
"""Separates out the key/value inputs of fields in a structured config Resource class which
are themselves Resources and those which are not.
are marked as resources (ie, using ResourceDependency) from those which are not.
"""
return SeparatedResourceParams(
resources={k: v for k, v in data.items() if is_coercible_to_resource(v)},
non_resources={k: v for k, v in data.items() if not is_coercible_to_resource(v)},
keys_by_alias = {field.alias: field for field in cls.__fields__.values()}
data_with_annotation: List[Tuple[str, Any, Type]] = [
(k, v, keys_by_alias[k].annotation) for k, v in data.items() if k in keys_by_alias
]
out = SeparatedResourceParams(
resources={k: v for k, v, t in data_with_annotation if _is_annotated_as_resource_type(t)},
non_resources={
k: v for k, v, t in data_with_annotation if not _is_annotated_as_resource_type(t)
},
)
return out


def _call_resource_fn_with_default(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pydantic
from pydantic import Field
from typing_extensions import dataclass_transform, get_origin
from typing_extensions import Annotated, dataclass_transform, get_origin

from dagster._core.errors import DagsterInvalidDagsterTypeInPythonicConfigDefinitionError

Expand Down Expand Up @@ -110,16 +110,20 @@ def __new__(cls, name, bases, namespaces, **kwargs) -> Any:
# arg = get_args(annotations[field])[0]
# If so, we treat it as a Union of a PartialResource and a Resource
# for Pydantic's sake.
annotations[field] = Any
annotations[field] = Annotated[Any, "resource_dependency"]
elif safe_is_subclass(
annotations[field], LateBoundTypesForResourceTypeChecking.get_resource_type()
):
# If the annotation is a Resource, we treat it as a Union of a PartialResource
# and a Resource for Pydantic's sake, so that a user can pass in a partially
# configured resource.
base = annotations[field]
annotations[field] = Union[
LateBoundTypesForResourceTypeChecking.get_partial_resource_type(base), base
annotations[field] = Annotated[
Union[
LateBoundTypesForResourceTypeChecking.get_partial_resource_type(base),
base,
],
"resource_dependency",
]

namespaces["__annotations__"] = annotations
Expand Down
Loading

0 comments on commit a0dc4d9

Please sign in to comment.