Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pythonic resources] Improve nested resource evaluation logic #14807

Merged
merged 3 commits into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 57 additions & 37 deletions python_modules/dagster/dagster/_config/pythonic_config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@
Dict,
Generator,
Generic,
List,
Mapping,
NamedTuple,
Optional,
Set,
Tuple,
Type,
TypeVar,
Union,
cast,
)

from pydantic import ConstrainedFloat, ConstrainedInt, ConstrainedStr
from typing_extensions import TypeAlias, TypeGuard, get_args
from typing_extensions import TypeAlias, TypeGuard, get_args, get_origin

from dagster import (
Enum as DagsterEnum,
Expand Down Expand Up @@ -446,6 +448,8 @@ class AllowDelayedDependencies:
def _resolve_required_resource_keys(
self, resource_mapping: Mapping[int, str]
) -> AbstractSet[str]:
from dagster._core.execution.build_resources import wrap_resource_for_execution

# All dependent resources which are not fully configured
# must be specified to the Definitions object so that the
# resource can be configured at runtime by the user
Expand All @@ -468,11 +472,13 @@ def _resolve_required_resource_keys(
_resolve_required_resource_keys_for_resource(v, resource_mapping)
)

resources, _ = separate_resource_params(self.__dict__)
resources, _ = separate_resource_params(
cast(Type[BaseModel], self.__class__), self.__dict__
)
for v in resources.values():
nested_resource_required_keys.update(
_resolve_required_resource_keys_for_resource(
coerce_to_resource(v), resource_mapping
wrap_resource_for_execution(v), resource_mapping
)
)

Expand Down Expand Up @@ -604,14 +610,6 @@ def is_coercible_to_resource(val: Any) -> TypeGuard[CoercibleToResource]:
return isinstance(val, (ResourceDefinition, ConfigurableResourceFactory, PartialResource))


def coerce_to_resource(
    coercible_to_resource: CoercibleToResource,
) -> ResourceDefinition:
    """Return the ResourceDefinition corresponding to a coercible value.

    Instances of ConfigurableResourceFactory or PartialResource are converted
    by calling their ``get_resource_definition()`` method; any other value is
    handed back unchanged.
    """
    if not isinstance(coercible_to_resource, (ConfigurableResourceFactory, PartialResource)):
        return coercible_to_resource
    return coercible_to_resource.get_resource_definition()


class ConfigurableResourceFactoryResourceDefinition(ResourceDefinition, AllowDelayedDependencies):
def __init__(
self,
Expand All @@ -620,7 +618,7 @@ def __init__(
config_schema: Any,
description: Optional[str],
resolve_resource_keys: Callable[[Mapping[int, str]], AbstractSet[str]],
nested_resources: Mapping[str, CoercibleToResource],
nested_resources: Mapping[str, Any],
dagster_maintained: bool = False,
):
super().__init__(
Expand All @@ -640,7 +638,7 @@ def configurable_resource_cls(self) -> Type:
@property
def nested_resources(
self,
) -> Mapping[str, CoercibleToResource]:
) -> Mapping[str, Any]:
return self._nested_resources

def _resolve_required_resource_keys(
Expand All @@ -660,7 +658,7 @@ def __init__(
config_schema: Any,
description: Optional[str],
resolve_resource_keys: Callable[[Mapping[int, str]], AbstractSet[str]],
nested_resources: Mapping[str, CoercibleToResource],
nested_resources: Mapping[str, Any],
input_config_schema: Optional[Union[CoercableToConfigSchema, Type[Config]]] = None,
output_config_schema: Optional[Union[CoercableToConfigSchema, Type[Config]]] = None,
dagster_maintained: bool = False,
Expand Down Expand Up @@ -694,7 +692,7 @@ def configurable_resource_cls(self) -> Type:
@property
def nested_resources(
self,
) -> Mapping[str, CoercibleToResource]:
) -> Mapping[str, Any]:
return self._nested_resources

def _resolve_required_resource_keys(
Expand All @@ -704,11 +702,11 @@ def _resolve_required_resource_keys(


class ConfigurableResourceFactoryState(NamedTuple):
nested_partial_resources: Mapping[str, CoercibleToResource]
nested_partial_resources: Mapping[str, Any]
resolved_config_dict: Dict[str, Any]
config_schema: DefinitionConfigSchema
schema: DagsterField
nested_resources: Dict[str, CoercibleToResource]
nested_resources: Dict[str, Any]
resource_context: Optional[InitResourceContext]


Expand Down Expand Up @@ -766,7 +764,7 @@ def asset_that_uses_database(database: ResourceParam[Database]):
"""

def __init__(self, **data: Any):
resource_pointers, data_without_resources = separate_resource_params(data)
resource_pointers, data_without_resources = separate_resource_params(self.__class__, data)

schema = infer_schema_from_config_class(
self.__class__, fields_to_omit=set(resource_pointers.keys())
Expand Down Expand Up @@ -861,7 +859,7 @@ def create_resource(self, context: InitResourceContext) -> TResValue:
@property
def nested_resources(
self,
) -> Mapping[str, CoercibleToResource]:
) -> Mapping[str, Any]:
return self._nested_resources

@classmethod
Expand Down Expand Up @@ -897,6 +895,8 @@ def _resolve_and_update_nested_resources(

Returns a new instance of the resource.
"""
from dagster._core.execution.build_resources import wrap_resource_for_execution

partial_resources_to_update: Dict[str, Any] = {}
if self._nested_partial_resources:
context_with_mapping = cast(
Expand All @@ -918,10 +918,10 @@ def _resolve_and_update_nested_resources(

# Also evaluate any resources that are not partial
with contextlib.ExitStack() as stack:
resources_to_update, _ = separate_resource_params(self.__dict__)
resources_to_update, _ = separate_resource_params(self.__class__, self.__dict__)
resources_to_update = {
attr_name: _call_resource_fn_with_default(
stack, coerce_to_resource(resource), context
stack, wrap_resource_for_execution(resource), context
)
for attr_name, resource in resources_to_update.items()
if attr_name not in partial_resources_to_update
Expand Down Expand Up @@ -1119,7 +1119,9 @@ def create_resource(self, context: InitResourceContext) -> TResValue:


def _is_fully_configured(resource: CoercibleToResource) -> bool:
actual_resource = coerce_to_resource(resource)
from dagster._core.execution.build_resources import wrap_resource_for_execution

actual_resource = wrap_resource_for_execution(resource)
res = (
validate_config(
actual_resource.config_schema.config_type,
Expand All @@ -1134,11 +1136,11 @@ def _is_fully_configured(resource: CoercibleToResource) -> bool:


class PartialResourceState(NamedTuple):
nested_partial_resources: Dict[str, CoercibleToResource]
nested_partial_resources: Dict[str, Any]
config_schema: DagsterField
resource_fn: Callable[[InitResourceContext], Any]
description: Optional[str]
nested_resources: Dict[str, CoercibleToResource]
nested_resources: Dict[str, Any]


class PartialResource(Generic[TResValue], AllowDelayedDependencies, MakeConfigCacheable):
Expand All @@ -1150,7 +1152,7 @@ def __init__(
resource_cls: Type[ConfigurableResourceFactory[TResValue]],
data: Dict[str, Any],
):
resource_pointers, _data_without_resources = separate_resource_params(data)
resource_pointers, _data_without_resources = separate_resource_params(resource_cls, data)

MakeConfigCacheable.__init__(self, data=data, resource_cls=resource_cls) # type: ignore # extends BaseModel, takes kwargs

Expand Down Expand Up @@ -1178,13 +1180,13 @@ def resource_fn(context: InitResourceContext):
@property
def _nested_partial_resources(
self,
) -> Mapping[str, CoercibleToResource]:
) -> Mapping[str, Any]:
return self._state__internal__.nested_partial_resources

@property
def nested_resources(
self,
) -> Mapping[str, CoercibleToResource]:
) -> Mapping[str, Any]:
return self._state__internal__.nested_resources

@cached_method
Expand Down Expand Up @@ -1772,18 +1774,36 @@ def infer_schema_from_config_class(


class SeparatedResourceParams(NamedTuple):
resources: Dict[str, CoercibleToResource]
resources: Dict[str, Any]
non_resources: Dict[str, Any]


def separate_resource_params(data: Dict[str, Any]) -> SeparatedResourceParams:
def _is_annotated_as_resource_type(annotation: Type) -> bool:
    """Report whether a structured config field annotation denotes a resource.

    An annotation counts as a resource type when any of the following holds:

    * its origin (per ``get_origin``) equals ``ResourceDependency``;
    * its ``__metadata__`` attribute equals ``("resource_dependency",)``;
    * ``safe_is_subclass`` accepts it as a subclass of ``ResourceDefinition``
      or ``ConfigurableResourceFactory``.
    """
    if get_origin(annotation) == ResourceDependency:
        return True

    # Annotations without a ``__metadata__`` attribute fall through to the subclass check.
    if getattr(annotation, "__metadata__", None) == ("resource_dependency",):
        return True

    return safe_is_subclass(annotation, (ResourceDefinition, ConfigurableResourceFactory))


def separate_resource_params(cls: Type[BaseModel], data: Dict[str, Any]) -> SeparatedResourceParams:
"""Separates out the key/value inputs of fields in a structured config Resource class which
are themselves Resources and those which are not.
are marked as resources (ie, using ResourceDependency) from those which are not.
"""
return SeparatedResourceParams(
resources={k: v for k, v in data.items() if is_coercible_to_resource(v)},
non_resources={k: v for k, v in data.items() if not is_coercible_to_resource(v)},
keys_by_alias = {field.alias: field for field in cls.__fields__.values()}
data_with_annotation: List[Tuple[str, Any, Type]] = [
(k, v, keys_by_alias[k].annotation) for k, v in data.items() if k in keys_by_alias
]
out = SeparatedResourceParams(
resources={k: v for k, v, t in data_with_annotation if _is_annotated_as_resource_type(t)},
non_resources={
k: v for k, v, t in data_with_annotation if not _is_annotated_as_resource_type(t)
},
)
return out


def _call_resource_fn_with_default(
Expand Down Expand Up @@ -1811,14 +1831,14 @@ def _call_resource_fn_with_default(
)
context = context.replace_config(cast(dict, evr.value)["config"])

is_fn_generator = inspect.isgenerator(obj.resource_fn) or isinstance(
obj.resource_fn, contextlib.ContextDecorator
)
if has_at_least_one_parameter(obj.resource_fn): # type: ignore[unreachable]
if has_at_least_one_parameter(obj.resource_fn):
result = cast(ResourceFunctionWithContext, obj.resource_fn)(context)
else:
result = cast(ResourceFunctionWithoutContext, obj.resource_fn)()

is_fn_generator = inspect.isgenerator(obj.resource_fn) or isinstance(
obj.resource_fn, contextlib.ContextDecorator
)
if is_fn_generator:
return stack.enter_context(cast(contextlib.AbstractContextManager, result))
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pydantic
from pydantic import Field
from typing_extensions import dataclass_transform, get_origin
from typing_extensions import Annotated, dataclass_transform, get_origin

from dagster._core.errors import DagsterInvalidDagsterTypeInPythonicConfigDefinitionError

Expand Down Expand Up @@ -110,16 +110,20 @@ def __new__(cls, name, bases, namespaces, **kwargs) -> Any:
# arg = get_args(annotations[field])[0]
# If so, we treat it as a Union of a PartialResource and a Resource
# for Pydantic's sake.
annotations[field] = Any
annotations[field] = Annotated[Any, "resource_dependency"]
elif safe_is_subclass(
annotations[field], LateBoundTypesForResourceTypeChecking.get_resource_type()
):
# If the annotation is a Resource, we treat it as a Union of a PartialResource
# and a Resource for Pydantic's sake, so that a user can pass in a partially
# configured resource.
base = annotations[field]
annotations[field] = Union[
LateBoundTypesForResourceTypeChecking.get_partial_resource_type(base), base
annotations[field] = Annotated[
Union[
LateBoundTypesForResourceTypeChecking.get_partial_resource_type(base),
base,
],
"resource_dependency",
]

namespaces["__annotations__"] = annotations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
ConfigurableIOManagerFactoryResourceDefinition,
ConfigurableResourceFactoryResourceDefinition,
ResourceWithKeyMapping,
coerce_to_resource,
)
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.assets_job import (
Expand Down Expand Up @@ -67,6 +66,8 @@ def _env_vars_from_resource_defaults(resource_def: ResourceDefinition) -> Set[st
resource's default config. This is used to extract environment variables from the top-level
resources in a Definitions object.
"""
from dagster._core.execution.build_resources import wrap_resource_for_execution

config_schema_default = cast(
Mapping[str, Any],
json.loads(resource_def.config_schema.default_value_as_json_str)
Expand All @@ -86,7 +87,7 @@ def _env_vars_from_resource_defaults(resource_def: ResourceDefinition) -> Set[st
nested_resources = resource_def.inner_resource.nested_resources
for nested_resource in nested_resources.values():
env_vars = env_vars.union(
_env_vars_from_resource_defaults(coerce_to_resource(nested_resource))
_env_vars_from_resource_defaults(wrap_resource_for_execution(nested_resource))
)

return env_vars
Expand Down
34 changes: 19 additions & 15 deletions python_modules/dagster/dagster/_core/execution/build_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,22 +115,26 @@ def the_resource():
def wrap_resources_for_execution(
    resources: Optional[Mapping[str, Any]] = None
) -> Dict[str, ResourceDefinition]:
    """Apply ``wrap_resource_for_execution`` to every value of ``resources``.

    Keys are preserved. An empty dict is returned when ``resources`` is
    ``None`` or empty.
    """
    if not resources:
        return {}

    wrapped = {}
    for resource_key, resource in resources.items():
        wrapped[resource_key] = wrap_resource_for_execution(resource)
    return wrapped


def wrap_resource_for_execution(resource: Any) -> ResourceDefinition:
from dagster._config.pythonic_config import ConfigurableResourceFactory, PartialResource

resources = check.opt_mapping_param(resources, "resources", key_type=str)
resource_defs = {}
# Wrap instantiated resource values in a resource definition.
# If an instantiated IO manager is provided, wrap it in an IO manager definition.
for resource_key, resource in resources.items():
# Wrap instantiated resource values in a resource definition.
# If an instantiated IO manager is provided, wrap it in an IO manager definition.
if isinstance(resource, (ConfigurableResourceFactory, PartialResource)):
resource_defs[resource_key] = resource.get_resource_definition()
elif isinstance(resource, ResourceDefinition):
resource_defs[resource_key] = resource
elif isinstance(resource, IOManager):
resource_defs[resource_key] = IOManagerDefinition.hardcoded_io_manager(resource)
else:
resource_defs[resource_key] = ResourceDefinition.hardcoded_resource(resource)

return resource_defs
if isinstance(resource, (ConfigurableResourceFactory, PartialResource)):
return resource.get_resource_definition()
elif isinstance(resource, ResourceDefinition):
return resource
elif isinstance(resource, IOManager):
return IOManagerDefinition.hardcoded_io_manager(resource)
else:
return ResourceDefinition.hardcoded_resource(resource)
Comment on lines +118 to +140
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a behavior change or a refactor?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor, so that we're able to wrap just a single resource in some cases.

Loading