Skip to content

Commit

Permalink
[pythonic resources] Improve nested resource evaluation logic (#14807)
Browse files Browse the repository at this point in the history
## Summary

This PR attempts to address a couple shortcomings with the Pythonic
resource system which are exposed in #14751.

First, the logic to determine whether a field on a resource represents a
nested resource or a config field is not a good heuristic currently.
Right now, the check only assesses whether the value is a resource type
- but this fails in the case that a user wants to provide a raw value
(ie an instance of `IOManager`). This PR updates this check to inspect
the annotation on the field, and treat it as a resource only if the
annotation indicates as much.

Second, the initialization logic for nested resources
(`coerce_to_resource`) doesn't properly handle raw values such as a
plain `IOManager`. This PR updates the logic in `coerce_to_resource` to
call out to `wrap_resources_for_execution`, which can successfully deal
with these cases.

## Example

The following resource previously would not accept a raw `IOManager` as
input, only a resource which produces one:
```python
class MyResourceNeedsIOManager(ConfigurableIOManager):
    base_io_manager: ResourceDependency[IOManager]


@io_manager
def my_io_manager(context) -> IOManager:
    pass

# previously OK, since my_io_manager is a resource
MyResourceNeedsIOManager(base_io_manager=my_io_manager)


class MyIOManager(IOManager):
    pass

# previously errors, since MyIOManager is not a resource
MyResourceNeedsIOManager(base_io_manager=MyIOManager())
```

The new code will coerce `MyIOManager` to a resource by wrapping it,
since it is annotated as a `ResourceDependency`. This gives the user
more flexibility when specifying resource-resource dependencies to input
raw values or mocks.

## Test Plan

Existing unit tests succeed. New unit tests of nesting behavior with raw
values (such as an IOManager).
  • Loading branch information
benpankow authored Jun 27, 2023
1 parent 959a4dd commit 625034a
Show file tree
Hide file tree
Showing 6 changed files with 692 additions and 539 deletions.
94 changes: 57 additions & 37 deletions python_modules/dagster/dagster/_config/pythonic_config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@
Dict,
Generator,
Generic,
List,
Mapping,
NamedTuple,
Optional,
Set,
Tuple,
Type,
TypeVar,
Union,
cast,
)

from pydantic import ConstrainedFloat, ConstrainedInt, ConstrainedStr
from typing_extensions import TypeAlias, TypeGuard, get_args
from typing_extensions import TypeAlias, TypeGuard, get_args, get_origin

from dagster import (
Enum as DagsterEnum,
Expand Down Expand Up @@ -446,6 +448,8 @@ class AllowDelayedDependencies:
def _resolve_required_resource_keys(
self, resource_mapping: Mapping[int, str]
) -> AbstractSet[str]:
from dagster._core.execution.build_resources import wrap_resource_for_execution

# All dependent resources which are not fully configured
# must be specified to the Definitions object so that the
# resource can be configured at runtime by the user
Expand All @@ -468,11 +472,13 @@ def _resolve_required_resource_keys(
_resolve_required_resource_keys_for_resource(v, resource_mapping)
)

resources, _ = separate_resource_params(self.__dict__)
resources, _ = separate_resource_params(
cast(Type[BaseModel], self.__class__), self.__dict__
)
for v in resources.values():
nested_resource_required_keys.update(
_resolve_required_resource_keys_for_resource(
coerce_to_resource(v), resource_mapping
wrap_resource_for_execution(v), resource_mapping
)
)

Expand Down Expand Up @@ -604,14 +610,6 @@ def is_coercible_to_resource(val: Any) -> TypeGuard[CoercibleToResource]:
return isinstance(val, (ResourceDefinition, ConfigurableResourceFactory, PartialResource))


def coerce_to_resource(
coercible_to_resource: CoercibleToResource,
) -> ResourceDefinition:
if isinstance(coercible_to_resource, (ConfigurableResourceFactory, PartialResource)):
return coercible_to_resource.get_resource_definition()
return coercible_to_resource


class ConfigurableResourceFactoryResourceDefinition(ResourceDefinition, AllowDelayedDependencies):
def __init__(
self,
Expand All @@ -620,7 +618,7 @@ def __init__(
config_schema: Any,
description: Optional[str],
resolve_resource_keys: Callable[[Mapping[int, str]], AbstractSet[str]],
nested_resources: Mapping[str, CoercibleToResource],
nested_resources: Mapping[str, Any],
dagster_maintained: bool = False,
):
super().__init__(
Expand All @@ -640,7 +638,7 @@ def configurable_resource_cls(self) -> Type:
@property
def nested_resources(
self,
) -> Mapping[str, CoercibleToResource]:
) -> Mapping[str, Any]:
return self._nested_resources

def _resolve_required_resource_keys(
Expand All @@ -660,7 +658,7 @@ def __init__(
config_schema: Any,
description: Optional[str],
resolve_resource_keys: Callable[[Mapping[int, str]], AbstractSet[str]],
nested_resources: Mapping[str, CoercibleToResource],
nested_resources: Mapping[str, Any],
input_config_schema: Optional[Union[CoercableToConfigSchema, Type[Config]]] = None,
output_config_schema: Optional[Union[CoercableToConfigSchema, Type[Config]]] = None,
dagster_maintained: bool = False,
Expand Down Expand Up @@ -694,7 +692,7 @@ def configurable_resource_cls(self) -> Type:
@property
def nested_resources(
self,
) -> Mapping[str, CoercibleToResource]:
) -> Mapping[str, Any]:
return self._nested_resources

def _resolve_required_resource_keys(
Expand All @@ -704,11 +702,11 @@ def _resolve_required_resource_keys(


class ConfigurableResourceFactoryState(NamedTuple):
nested_partial_resources: Mapping[str, CoercibleToResource]
nested_partial_resources: Mapping[str, Any]
resolved_config_dict: Dict[str, Any]
config_schema: DefinitionConfigSchema
schema: DagsterField
nested_resources: Dict[str, CoercibleToResource]
nested_resources: Dict[str, Any]
resource_context: Optional[InitResourceContext]


Expand Down Expand Up @@ -766,7 +764,7 @@ def asset_that_uses_database(database: ResourceParam[Database]):
"""

def __init__(self, **data: Any):
resource_pointers, data_without_resources = separate_resource_params(data)
resource_pointers, data_without_resources = separate_resource_params(self.__class__, data)

schema = infer_schema_from_config_class(
self.__class__, fields_to_omit=set(resource_pointers.keys())
Expand Down Expand Up @@ -861,7 +859,7 @@ def create_resource(self, context: InitResourceContext) -> TResValue:
@property
def nested_resources(
self,
) -> Mapping[str, CoercibleToResource]:
) -> Mapping[str, Any]:
return self._nested_resources

@classmethod
Expand Down Expand Up @@ -897,6 +895,8 @@ def _resolve_and_update_nested_resources(
Returns a new instance of the resource.
"""
from dagster._core.execution.build_resources import wrap_resource_for_execution

partial_resources_to_update: Dict[str, Any] = {}
if self._nested_partial_resources:
context_with_mapping = cast(
Expand All @@ -918,10 +918,10 @@ def _resolve_and_update_nested_resources(

# Also evaluate any resources that are not partial
with contextlib.ExitStack() as stack:
resources_to_update, _ = separate_resource_params(self.__dict__)
resources_to_update, _ = separate_resource_params(self.__class__, self.__dict__)
resources_to_update = {
attr_name: _call_resource_fn_with_default(
stack, coerce_to_resource(resource), context
stack, wrap_resource_for_execution(resource), context
)
for attr_name, resource in resources_to_update.items()
if attr_name not in partial_resources_to_update
Expand Down Expand Up @@ -1119,7 +1119,9 @@ def create_resource(self, context: InitResourceContext) -> TResValue:


def _is_fully_configured(resource: CoercibleToResource) -> bool:
actual_resource = coerce_to_resource(resource)
from dagster._core.execution.build_resources import wrap_resource_for_execution

actual_resource = wrap_resource_for_execution(resource)
res = (
validate_config(
actual_resource.config_schema.config_type,
Expand All @@ -1134,11 +1136,11 @@ def _is_fully_configured(resource: CoercibleToResource) -> bool:


class PartialResourceState(NamedTuple):
nested_partial_resources: Dict[str, CoercibleToResource]
nested_partial_resources: Dict[str, Any]
config_schema: DagsterField
resource_fn: Callable[[InitResourceContext], Any]
description: Optional[str]
nested_resources: Dict[str, CoercibleToResource]
nested_resources: Dict[str, Any]


class PartialResource(Generic[TResValue], AllowDelayedDependencies, MakeConfigCacheable):
Expand All @@ -1150,7 +1152,7 @@ def __init__(
resource_cls: Type[ConfigurableResourceFactory[TResValue]],
data: Dict[str, Any],
):
resource_pointers, _data_without_resources = separate_resource_params(data)
resource_pointers, _data_without_resources = separate_resource_params(resource_cls, data)

MakeConfigCacheable.__init__(self, data=data, resource_cls=resource_cls) # type: ignore # extends BaseModel, takes kwargs

Expand Down Expand Up @@ -1178,13 +1180,13 @@ def resource_fn(context: InitResourceContext):
@property
def _nested_partial_resources(
self,
) -> Mapping[str, CoercibleToResource]:
) -> Mapping[str, Any]:
return self._state__internal__.nested_partial_resources

@property
def nested_resources(
self,
) -> Mapping[str, CoercibleToResource]:
) -> Mapping[str, Any]:
return self._state__internal__.nested_resources

@cached_method
Expand Down Expand Up @@ -1772,18 +1774,36 @@ def infer_schema_from_config_class(


class SeparatedResourceParams(NamedTuple):
resources: Dict[str, CoercibleToResource]
resources: Dict[str, Any]
non_resources: Dict[str, Any]


def separate_resource_params(data: Dict[str, Any]) -> SeparatedResourceParams:
def _is_annotated_as_resource_type(annotation: Type) -> bool:
"""Determines if a field in a structured config class is annotated as a resource type or not."""
is_annotated_as_resource_dependency = get_origin(annotation) == ResourceDependency or getattr(
annotation, "__metadata__", None
) == ("resource_dependency",)

return is_annotated_as_resource_dependency or safe_is_subclass(
annotation, (ResourceDefinition, ConfigurableResourceFactory)
)


def separate_resource_params(cls: Type[BaseModel], data: Dict[str, Any]) -> SeparatedResourceParams:
"""Separates out the key/value inputs of fields in a structured config Resource class which
are themselves Resources and those which are not.
are marked as resources (ie, using ResourceDependency) from those which are not.
"""
return SeparatedResourceParams(
resources={k: v for k, v in data.items() if is_coercible_to_resource(v)},
non_resources={k: v for k, v in data.items() if not is_coercible_to_resource(v)},
keys_by_alias = {field.alias: field for field in cls.__fields__.values()}
data_with_annotation: List[Tuple[str, Any, Type]] = [
(k, v, keys_by_alias[k].annotation) for k, v in data.items() if k in keys_by_alias
]
out = SeparatedResourceParams(
resources={k: v for k, v, t in data_with_annotation if _is_annotated_as_resource_type(t)},
non_resources={
k: v for k, v, t in data_with_annotation if not _is_annotated_as_resource_type(t)
},
)
return out


def _call_resource_fn_with_default(
Expand Down Expand Up @@ -1811,14 +1831,14 @@ def _call_resource_fn_with_default(
)
context = context.replace_config(cast(dict, evr.value)["config"])

is_fn_generator = inspect.isgenerator(obj.resource_fn) or isinstance(
obj.resource_fn, contextlib.ContextDecorator
)
if has_at_least_one_parameter(obj.resource_fn): # type: ignore[unreachable]
if has_at_least_one_parameter(obj.resource_fn):
result = cast(ResourceFunctionWithContext, obj.resource_fn)(context)
else:
result = cast(ResourceFunctionWithoutContext, obj.resource_fn)()

is_fn_generator = inspect.isgenerator(obj.resource_fn) or isinstance(
obj.resource_fn, contextlib.ContextDecorator
)
if is_fn_generator:
return stack.enter_context(cast(contextlib.AbstractContextManager, result))
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pydantic
from pydantic import Field
from typing_extensions import dataclass_transform, get_origin
from typing_extensions import Annotated, dataclass_transform, get_origin

from dagster._core.errors import DagsterInvalidDagsterTypeInPythonicConfigDefinitionError

Expand Down Expand Up @@ -110,16 +110,20 @@ def __new__(cls, name, bases, namespaces, **kwargs) -> Any:
# arg = get_args(annotations[field])[0]
# If so, we treat it as a Union of a PartialResource and a Resource
# for Pydantic's sake.
annotations[field] = Any
annotations[field] = Annotated[Any, "resource_dependency"]
elif safe_is_subclass(
annotations[field], LateBoundTypesForResourceTypeChecking.get_resource_type()
):
# If the annotation is a Resource, we treat it as a Union of a PartialResource
# and a Resource for Pydantic's sake, so that a user can pass in a partially
# configured resource.
base = annotations[field]
annotations[field] = Union[
LateBoundTypesForResourceTypeChecking.get_partial_resource_type(base), base
annotations[field] = Annotated[
Union[
LateBoundTypesForResourceTypeChecking.get_partial_resource_type(base),
base,
],
"resource_dependency",
]

namespaces["__annotations__"] = annotations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
ConfigurableIOManagerFactoryResourceDefinition,
ConfigurableResourceFactoryResourceDefinition,
ResourceWithKeyMapping,
coerce_to_resource,
)
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.assets_job import (
Expand Down Expand Up @@ -67,6 +66,8 @@ def _env_vars_from_resource_defaults(resource_def: ResourceDefinition) -> Set[st
resource's default config. This is used to extract environment variables from the top-level
resources in a Definitions object.
"""
from dagster._core.execution.build_resources import wrap_resource_for_execution

config_schema_default = cast(
Mapping[str, Any],
json.loads(resource_def.config_schema.default_value_as_json_str)
Expand All @@ -86,7 +87,7 @@ def _env_vars_from_resource_defaults(resource_def: ResourceDefinition) -> Set[st
nested_resources = resource_def.inner_resource.nested_resources
for nested_resource in nested_resources.values():
env_vars = env_vars.union(
_env_vars_from_resource_defaults(coerce_to_resource(nested_resource))
_env_vars_from_resource_defaults(wrap_resource_for_execution(nested_resource))
)

return env_vars
Expand Down
34 changes: 19 additions & 15 deletions python_modules/dagster/dagster/_core/execution/build_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,22 +115,26 @@ def the_resource():
def wrap_resources_for_execution(
resources: Optional[Mapping[str, Any]] = None
) -> Dict[str, ResourceDefinition]:
return (
{
resource_key: wrap_resource_for_execution(resource)
for resource_key, resource in resources.items()
}
if resources
else {}
)


def wrap_resource_for_execution(resource: Any) -> ResourceDefinition:
from dagster._config.pythonic_config import ConfigurableResourceFactory, PartialResource

resources = check.opt_mapping_param(resources, "resources", key_type=str)
resource_defs = {}
# Wrap instantiated resource values in a resource definition.
# If an instantiated IO manager is provided, wrap it in an IO manager definition.
for resource_key, resource in resources.items():
# Wrap instantiated resource values in a resource definition.
# If an instantiated IO manager is provided, wrap it in an IO manager definition.
if isinstance(resource, (ConfigurableResourceFactory, PartialResource)):
resource_defs[resource_key] = resource.get_resource_definition()
elif isinstance(resource, ResourceDefinition):
resource_defs[resource_key] = resource
elif isinstance(resource, IOManager):
resource_defs[resource_key] = IOManagerDefinition.hardcoded_io_manager(resource)
else:
resource_defs[resource_key] = ResourceDefinition.hardcoded_resource(resource)

return resource_defs
if isinstance(resource, (ConfigurableResourceFactory, PartialResource)):
return resource.get_resource_definition()
elif isinstance(resource, ResourceDefinition):
return resource
elif isinstance(resource, IOManager):
return IOManagerDefinition.hardcoded_io_manager(resource)
else:
return ResourceDefinition.hardcoded_resource(resource)
Loading

0 comments on commit 625034a

Please sign in to comment.