Skip to content

Commit

Permalink
add more test
Browse files Browse the repository at this point in the history
  • Loading branch information
benpankow committed Jun 21, 2023
1 parent a0dc4d9 commit 68ad4e6
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 47 deletions.
56 changes: 28 additions & 28 deletions python_modules/dagster/dagster/_config/pythonic_config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,8 @@ class AllowDelayedDependencies:
def _resolve_required_resource_keys(
self, resource_mapping: Mapping[int, str]
) -> AbstractSet[str]:
from dagster._core.execution.build_resources import wrap_resource_for_execution

# All dependent resources which are not fully configured
# must be specified to the Definitions object so that the
# resource can be configured at runtime by the user
Expand All @@ -470,11 +472,13 @@ def _resolve_required_resource_keys(
_resolve_required_resource_keys_for_resource(v, resource_mapping)
)

resources, _ = separate_resource_params(self.__class__, self.__dict__)
resources, _ = separate_resource_params(
cast(Type[BaseModel], self.__class__), self.__dict__
)
for v in resources.values():
nested_resource_required_keys.update(
_resolve_required_resource_keys_for_resource(
coerce_to_resource(v), resource_mapping
wrap_resource_for_execution(v), resource_mapping
)
)

Expand Down Expand Up @@ -606,14 +610,6 @@ def is_coercible_to_resource(val: Any) -> TypeGuard[CoercibleToResource]:
return isinstance(val, (ResourceDefinition, ConfigurableResourceFactory, PartialResource))


def coerce_to_resource(
coercible_to_resource: CoercibleToResource,
) -> ResourceDefinition:
from dagster._core.execution.build_resources import wrap_resources_for_execution

return wrap_resources_for_execution({"test": coercible_to_resource})["test"]


class ConfigurableResourceFactoryResourceDefinition(ResourceDefinition, AllowDelayedDependencies):
def __init__(
self,
Expand All @@ -622,7 +618,7 @@ def __init__(
config_schema: Any,
description: Optional[str],
resolve_resource_keys: Callable[[Mapping[int, str]], AbstractSet[str]],
nested_resources: Mapping[str, CoercibleToResource],
nested_resources: Mapping[str, Any],
dagster_maintained: bool = False,
):
super().__init__(
Expand All @@ -642,7 +638,7 @@ def configurable_resource_cls(self) -> Type:
@property
def nested_resources(
self,
) -> Mapping[str, CoercibleToResource]:
) -> Mapping[str, Any]:
return self._nested_resources

def _resolve_required_resource_keys(
Expand All @@ -662,7 +658,7 @@ def __init__(
config_schema: Any,
description: Optional[str],
resolve_resource_keys: Callable[[Mapping[int, str]], AbstractSet[str]],
nested_resources: Mapping[str, CoercibleToResource],
nested_resources: Mapping[str, Any],
input_config_schema: Optional[Union[CoercableToConfigSchema, Type[Config]]] = None,
output_config_schema: Optional[Union[CoercableToConfigSchema, Type[Config]]] = None,
dagster_maintained: bool = False,
Expand Down Expand Up @@ -696,7 +692,7 @@ def configurable_resource_cls(self) -> Type:
@property
def nested_resources(
self,
) -> Mapping[str, CoercibleToResource]:
) -> Mapping[str, Any]:
return self._nested_resources

def _resolve_required_resource_keys(
Expand All @@ -706,11 +702,11 @@ def _resolve_required_resource_keys(


class ConfigurableResourceFactoryState(NamedTuple):
nested_partial_resources: Mapping[str, CoercibleToResource]
nested_partial_resources: Mapping[str, Any]
resolved_config_dict: Dict[str, Any]
config_schema: DefinitionConfigSchema
schema: DagsterField
nested_resources: Dict[str, CoercibleToResource]
nested_resources: Dict[str, Any]
resource_context: Optional[InitResourceContext]


Expand Down Expand Up @@ -863,7 +859,7 @@ def create_resource(self, context: InitResourceContext) -> TResValue:
@property
def nested_resources(
self,
) -> Mapping[str, CoercibleToResource]:
) -> Mapping[str, Any]:
return self._nested_resources

@classmethod
Expand Down Expand Up @@ -899,6 +895,8 @@ def _resolve_and_update_nested_resources(
Returns a new instance of the resource.
"""
from dagster._core.execution.build_resources import wrap_resource_for_execution

partial_resources_to_update: Dict[str, Any] = {}
if self._nested_partial_resources:
context_with_mapping = cast(
Expand All @@ -923,7 +921,7 @@ def _resolve_and_update_nested_resources(
resources_to_update, _ = separate_resource_params(self.__class__, self.__dict__)
resources_to_update = {
attr_name: _call_resource_fn_with_default(
stack, coerce_to_resource(resource), context
stack, wrap_resource_for_execution(resource), context
)
for attr_name, resource in resources_to_update.items()
if attr_name not in partial_resources_to_update
Expand Down Expand Up @@ -1121,7 +1119,9 @@ def create_resource(self, context: InitResourceContext) -> TResValue:


def _is_fully_configured(resource: CoercibleToResource) -> bool:
actual_resource = coerce_to_resource(resource)
from dagster._core.execution.build_resources import wrap_resource_for_execution

actual_resource = wrap_resource_for_execution(resource)
res = (
validate_config(
actual_resource.config_schema.config_type,
Expand All @@ -1136,11 +1136,11 @@ def _is_fully_configured(resource: CoercibleToResource) -> bool:


class PartialResourceState(NamedTuple):
nested_partial_resources: Dict[str, CoercibleToResource]
nested_partial_resources: Dict[str, Any]
config_schema: DagsterField
resource_fn: Callable[[InitResourceContext], Any]
description: Optional[str]
nested_resources: Dict[str, CoercibleToResource]
nested_resources: Dict[str, Any]


class PartialResource(Generic[TResValue], AllowDelayedDependencies, MakeConfigCacheable):
Expand Down Expand Up @@ -1180,13 +1180,13 @@ def resource_fn(context: InitResourceContext):
@property
def _nested_partial_resources(
self,
) -> Mapping[str, CoercibleToResource]:
) -> Mapping[str, Any]:
return self._state__internal__.nested_partial_resources

@property
def nested_resources(
self,
) -> Mapping[str, CoercibleToResource]:
) -> Mapping[str, Any]:
return self._state__internal__.nested_resources

@cached_method
Expand Down Expand Up @@ -1774,7 +1774,7 @@ def infer_schema_from_config_class(


class SeparatedResourceParams(NamedTuple):
resources: Dict[str, CoercibleToResource]
resources: Dict[str, Any]
non_resources: Dict[str, Any]


Expand Down Expand Up @@ -1831,14 +1831,14 @@ def _call_resource_fn_with_default(
)
context = context.replace_config(cast(dict, evr.value)["config"])

is_fn_generator = inspect.isgenerator(obj.resource_fn) or isinstance(
obj.resource_fn, contextlib.ContextDecorator
)
if has_at_least_one_parameter(obj.resource_fn): # type: ignore[unreachable]
if has_at_least_one_parameter(obj.resource_fn):
result = cast(ResourceFunctionWithContext, obj.resource_fn)(context)
else:
result = cast(ResourceFunctionWithoutContext, obj.resource_fn)()

is_fn_generator = inspect.isgenerator(obj.resource_fn) or isinstance(
obj.resource_fn, contextlib.ContextDecorator
)
if is_fn_generator:
return stack.enter_context(cast(contextlib.AbstractContextManager, result))
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
ConfigurableIOManagerFactoryResourceDefinition,
ConfigurableResourceFactoryResourceDefinition,
ResourceWithKeyMapping,
coerce_to_resource,
)
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.assets_job import (
Expand Down Expand Up @@ -67,6 +66,8 @@ def _env_vars_from_resource_defaults(resource_def: ResourceDefinition) -> Set[st
resource's default config. This is used to extract environment variables from the top-level
resources in a Definitions object.
"""
from dagster._core.execution.build_resources import wrap_resource_for_execution

config_schema_default = cast(
Mapping[str, Any],
json.loads(resource_def.config_schema.default_value_as_json_str)
Expand All @@ -86,7 +87,7 @@ def _env_vars_from_resource_defaults(resource_def: ResourceDefinition) -> Set[st
nested_resources = resource_def.inner_resource.nested_resources
for nested_resource in nested_resources.values():
env_vars = env_vars.union(
_env_vars_from_resource_defaults(coerce_to_resource(nested_resource))
_env_vars_from_resource_defaults(wrap_resource_for_execution(nested_resource))
)

return env_vars
Expand Down
34 changes: 19 additions & 15 deletions python_modules/dagster/dagster/_core/execution/build_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,22 +115,26 @@ def the_resource():
def wrap_resources_for_execution(
resources: Optional[Mapping[str, Any]] = None
) -> Dict[str, ResourceDefinition]:
return (
{
resource_key: wrap_resource_for_execution(resource)
for resource_key, resource in resources.items()
}
if resources
else {}
)


def wrap_resource_for_execution(resource: Any) -> ResourceDefinition:
from dagster._config.pythonic_config import ConfigurableResourceFactory, PartialResource

resources = check.opt_mapping_param(resources, "resources", key_type=str)
resource_defs = {}
# Wrap instantiated resource values in a resource definition.
# If an instantiated IO manager is provided, wrap it in an IO manager definition.
for resource_key, resource in resources.items():
# Wrap instantiated resource values in a resource definition.
# If an instantiated IO manager is provided, wrap it in an IO manager definition.
if isinstance(resource, (ConfigurableResourceFactory, PartialResource)):
resource_defs[resource_key] = resource.get_resource_definition()
elif isinstance(resource, ResourceDefinition):
resource_defs[resource_key] = resource
elif isinstance(resource, IOManager):
resource_defs[resource_key] = IOManagerDefinition.hardcoded_io_manager(resource)
else:
resource_defs[resource_key] = ResourceDefinition.hardcoded_resource(resource)

return resource_defs
if isinstance(resource, (ConfigurableResourceFactory, PartialResource)):
return resource.get_resource_definition()
elif isinstance(resource, ResourceDefinition):
return resource
elif isinstance(resource, IOManager):
return IOManagerDefinition.hardcoded_io_manager(resource)
else:
return ResourceDefinition.hardcoded_resource(resource)
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
from abc import ABC, abstractmethod
from typing import Any, Callable
from typing import Any, Callable, List

import pytest
from dagster import (
Expand All @@ -13,7 +13,11 @@
resource,
)
from dagster._check import CheckError
from dagster._config.pythonic_config import ConfigurableResourceFactory
from dagster._config.pythonic_config import (
ConfigurableIOManager,
ConfigurableResourceFactory,
)
from dagster._core.storage.io_manager import IOManager


def test_nested_resources():
Expand Down Expand Up @@ -456,3 +460,69 @@ def my_asset(my_resource: MyResourceWithDep):
)
assert defs.get_implicit_global_asset_job_def().execute_in_process().success
assert executed["yes"]


def test_nested_resource_raw_value_io_manager() -> None:
class MyMultiwriteIOManager(ConfigurableIOManager):
base_io_manager: ResourceDependency[IOManager]
mirror_io_manager: ResourceDependency[IOManager]

def handle_output(self, context, obj) -> None:
self.base_io_manager.handle_output(context, obj)
self.mirror_io_manager.handle_output(context, obj)

def load_input(self, context) -> Any:
return self.base_io_manager.load_input(context)

log = []

class ConfigIOManager(ConfigurableIOManager):
path_prefix: List[str]

def handle_output(self, context, obj) -> None:
log.append(
"ConfigIOManager handle_output "
+ "/".join(self.path_prefix + list(context.asset_key.path))
)

def load_input(self, context) -> Any:
log.append(
"ConfigIOManager load_input "
+ "/".join(self.path_prefix + list(context.asset_key.path))
)
return "foo"

class RawIOManager(IOManager):
def handle_output(self, context, obj) -> None:
log.append("RawIOManager handle_output " + "/".join(list(context.asset_key.path)))

def load_input(self, context) -> Any:
log.append("RawIOManager load_input " + "/".join(list(context.asset_key.path)))
return "foo"

@asset
def my_asset() -> str:
return "foo"

@asset
def my_downstream_asset(my_asset: str) -> str:
return my_asset + "bar"

defs = Definitions(
assets=[my_asset, my_downstream_asset],
resources={
"io_manager": MyMultiwriteIOManager(
base_io_manager=ConfigIOManager(path_prefix=["base"]),
mirror_io_manager=RawIOManager(),
),
},
)

assert defs.get_implicit_global_asset_job_def().execute_in_process().success
assert log == [
"ConfigIOManager handle_output base/my_asset",
"RawIOManager handle_output my_asset",
"ConfigIOManager load_input base/my_asset",
"ConfigIOManager handle_output base/my_downstream_asset",
"RawIOManager handle_output my_downstream_asset",
]

0 comments on commit 68ad4e6

Please sign in to comment.