diff --git a/python_modules/dagster/dagster/_config/pythonic_config/__init__.py b/python_modules/dagster/dagster/_config/pythonic_config/__init__.py index 6de4443e8f8d0..0a69bfab62daa 100644 --- a/python_modules/dagster/dagster/_config/pythonic_config/__init__.py +++ b/python_modules/dagster/dagster/_config/pythonic_config/__init__.py @@ -448,6 +448,8 @@ class AllowDelayedDependencies: def _resolve_required_resource_keys( self, resource_mapping: Mapping[int, str] ) -> AbstractSet[str]: + from dagster._core.execution.build_resources import wrap_resource_for_execution + # All dependent resources which are not fully configured # must be specified to the Definitions object so that the # resource can be configured at runtime by the user @@ -470,11 +472,13 @@ def _resolve_required_resource_keys( _resolve_required_resource_keys_for_resource(v, resource_mapping) ) - resources, _ = separate_resource_params(self.__class__, self.__dict__) + resources, _ = separate_resource_params( + cast(Type[BaseModel], self.__class__), self.__dict__ + ) for v in resources.values(): nested_resource_required_keys.update( _resolve_required_resource_keys_for_resource( - coerce_to_resource(v), resource_mapping + wrap_resource_for_execution(v), resource_mapping ) ) @@ -606,14 +610,6 @@ def is_coercible_to_resource(val: Any) -> TypeGuard[CoercibleToResource]: return isinstance(val, (ResourceDefinition, ConfigurableResourceFactory, PartialResource)) -def coerce_to_resource( - coercible_to_resource: CoercibleToResource, -) -> ResourceDefinition: - from dagster._core.execution.build_resources import wrap_resources_for_execution - - return wrap_resources_for_execution({"test": coercible_to_resource})["test"] - - class ConfigurableResourceFactoryResourceDefinition(ResourceDefinition, AllowDelayedDependencies): def __init__( self, @@ -622,7 +618,7 @@ def __init__( config_schema: Any, description: Optional[str], resolve_resource_keys: Callable[[Mapping[int, str]], AbstractSet[str]], - nested_resources: Mapping[str, CoercibleToResource], + nested_resources: Mapping[str, Any], dagster_maintained: bool = False, ): super().__init__( @@ -642,7 +638,7 @@ def configurable_resource_cls(self) -> Type: @property def nested_resources( self, - ) -> Mapping[str, CoercibleToResource]: + ) -> Mapping[str, Any]: return self._nested_resources def _resolve_required_resource_keys( @@ -662,7 +658,7 @@ def __init__( config_schema: Any, description: Optional[str], resolve_resource_keys: Callable[[Mapping[int, str]], AbstractSet[str]], - nested_resources: Mapping[str, CoercibleToResource], + nested_resources: Mapping[str, Any], input_config_schema: Optional[Union[CoercableToConfigSchema, Type[Config]]] = None, output_config_schema: Optional[Union[CoercableToConfigSchema, Type[Config]]] = None, dagster_maintained: bool = False, @@ -696,7 +692,7 @@ def configurable_resource_cls(self) -> Type: @property def nested_resources( self, - ) -> Mapping[str, CoercibleToResource]: + ) -> Mapping[str, Any]: return self._nested_resources def _resolve_required_resource_keys( @@ -706,11 +702,11 @@ def _resolve_required_resource_keys( class ConfigurableResourceFactoryState(NamedTuple): - nested_partial_resources: Mapping[str, CoercibleToResource] + nested_partial_resources: Mapping[str, Any] resolved_config_dict: Dict[str, Any] config_schema: DefinitionConfigSchema schema: DagsterField - nested_resources: Dict[str, CoercibleToResource] + nested_resources: Dict[str, Any] resource_context: Optional[InitResourceContext] @@ -863,7 +859,7 @@ def create_resource(self, context: InitResourceContext) -> TResValue: @property def nested_resources( self, - ) -> Mapping[str, CoercibleToResource]: + ) -> Mapping[str, Any]: return self._nested_resources @classmethod @@ -899,6 +895,8 @@ def _resolve_and_update_nested_resources( Returns a new instance of the resource. """ + from dagster._core.execution.build_resources import wrap_resource_for_execution + partial_resources_to_update: Dict[str, Any] = {} if self._nested_partial_resources: context_with_mapping = cast( @@ -923,7 +921,7 @@ def _resolve_and_update_nested_resources( resources_to_update, _ = separate_resource_params(self.__class__, self.__dict__) resources_to_update = { attr_name: _call_resource_fn_with_default( - stack, coerce_to_resource(resource), context + stack, wrap_resource_for_execution(resource), context ) for attr_name, resource in resources_to_update.items() if attr_name not in partial_resources_to_update @@ -1121,7 +1119,9 @@ def create_resource(self, context: InitResourceContext) -> TResValue: def _is_fully_configured(resource: CoercibleToResource) -> bool: - actual_resource = coerce_to_resource(resource) + from dagster._core.execution.build_resources import wrap_resource_for_execution + + actual_resource = wrap_resource_for_execution(resource) res = ( validate_config( actual_resource.config_schema.config_type, @@ -1136,11 +1136,11 @@ def _is_fully_configured(resource: CoercibleToResource) -> bool: class PartialResourceState(NamedTuple): - nested_partial_resources: Dict[str, CoercibleToResource] + nested_partial_resources: Dict[str, Any] config_schema: DagsterField resource_fn: Callable[[InitResourceContext], Any] description: Optional[str] - nested_resources: Dict[str, CoercibleToResource] + nested_resources: Dict[str, Any] class PartialResource(Generic[TResValue], AllowDelayedDependencies, MakeConfigCacheable): @@ -1180,13 +1180,13 @@ def resource_fn(context: InitResourceContext): @property def _nested_partial_resources( self, - ) -> Mapping[str, CoercibleToResource]: + ) -> Mapping[str, Any]: return self._state__internal__.nested_partial_resources @property def nested_resources( self, - ) -> Mapping[str, CoercibleToResource]: + ) -> Mapping[str, Any]: return self._state__internal__.nested_resources @cached_method @@ -1774,7 +1774,7 @@ def infer_schema_from_config_class( class SeparatedResourceParams(NamedTuple): - resources: Dict[str, CoercibleToResource] + resources: Dict[str, Any] non_resources: Dict[str, Any] @@ -1831,14 +1831,14 @@ def _call_resource_fn_with_default( ) context = context.replace_config(cast(dict, evr.value)["config"]) - is_fn_generator = inspect.isgenerator(obj.resource_fn) or isinstance( - obj.resource_fn, contextlib.ContextDecorator - ) - if has_at_least_one_parameter(obj.resource_fn): # type: ignore[unreachable] + if has_at_least_one_parameter(obj.resource_fn): result = cast(ResourceFunctionWithContext, obj.resource_fn)(context) else: result = cast(ResourceFunctionWithoutContext, obj.resource_fn)() + is_fn_generator = inspect.isgenerator(obj.resource_fn) or isinstance( + obj.resource_fn, contextlib.ContextDecorator + ) if is_fn_generator: return stack.enter_context(cast(contextlib.AbstractContextManager, result)) else: diff --git a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py index 5fcf568bdd643..53cc8e6e164c1 100644 --- a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py +++ b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py @@ -18,7 +18,6 @@ ConfigurableIOManagerFactoryResourceDefinition, ConfigurableResourceFactoryResourceDefinition, ResourceWithKeyMapping, - coerce_to_resource, ) from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.assets_job import ( @@ -67,6 +66,8 @@ def _env_vars_from_resource_defaults(resource_def: ResourceDefinition) -> Set[st resource's default config. This is used to extract environment variables from the top-level resources in a Definitions object. """ + from dagster._core.execution.build_resources import wrap_resource_for_execution + config_schema_default = cast( Mapping[str, Any], json.loads(resource_def.config_schema.default_value_as_json_str) @@ -86,7 +87,7 @@ def _env_vars_from_resource_defaults(resource_def: ResourceDefinition) -> Set[st nested_resources = resource_def.inner_resource.nested_resources for nested_resource in nested_resources.values(): env_vars = env_vars.union( - _env_vars_from_resource_defaults(coerce_to_resource(nested_resource)) + _env_vars_from_resource_defaults(wrap_resource_for_execution(nested_resource)) ) return env_vars diff --git a/python_modules/dagster/dagster/_core/execution/build_resources.py b/python_modules/dagster/dagster/_core/execution/build_resources.py index 36d8bebebeb2b..5d6022ac77199 100644 --- a/python_modules/dagster/dagster/_core/execution/build_resources.py +++ b/python_modules/dagster/dagster/_core/execution/build_resources.py @@ -115,22 +115,26 @@ def the_resource(): def wrap_resources_for_execution( resources: Optional[Mapping[str, Any]] = None ) -> Dict[str, ResourceDefinition]: + return ( + { + resource_key: wrap_resource_for_execution(resource) + for resource_key, resource in resources.items() + } + if resources + else {} + ) + + +def wrap_resource_for_execution(resource: Any) -> ResourceDefinition: from dagster._config.pythonic_config import ConfigurableResourceFactory, PartialResource - resources = check.opt_mapping_param(resources, "resources", key_type=str) - resource_defs = {} # Wrap instantiated resource values in a resource definition. # If an instantiated IO manager is provided, wrap it in an IO manager definition. - for resource_key, resource in resources.items(): - # Wrap instantiated resource values in a resource definition. - # If an instantiated IO manager is provided, wrap it in an IO manager definition. - if isinstance(resource, (ConfigurableResourceFactory, PartialResource)): - resource_defs[resource_key] = resource.get_resource_definition() - elif isinstance(resource, ResourceDefinition): - resource_defs[resource_key] = resource - elif isinstance(resource, IOManager): - resource_defs[resource_key] = IOManagerDefinition.hardcoded_io_manager(resource) - else: - resource_defs[resource_key] = ResourceDefinition.hardcoded_resource(resource) - - return resource_defs + if isinstance(resource, (ConfigurableResourceFactory, PartialResource)): + return resource.get_resource_definition() + elif isinstance(resource, ResourceDefinition): + return resource + elif isinstance(resource, IOManager): + return IOManagerDefinition.hardcoded_io_manager(resource) + else: + return ResourceDefinition.hardcoded_resource(resource) diff --git a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py index a9adcd441a0f9..02085274cb55f 100644 --- a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py +++ b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py @@ -1,6 +1,6 @@ import json from abc import ABC, abstractmethod -from typing import Any, Callable +from typing import Any, Callable, List import pytest from dagster import ( @@ -13,7 +13,11 @@ resource, ) from dagster._check import CheckError -from dagster._config.pythonic_config import ConfigurableResourceFactory +from dagster._config.pythonic_config import ( + ConfigurableIOManager, + ConfigurableResourceFactory, +) +from dagster._core.storage.io_manager import IOManager def test_nested_resources(): @@ -456,3 +460,69 @@ def my_asset(my_resource: MyResourceWithDep): ) assert defs.get_implicit_global_asset_job_def().execute_in_process().success assert executed["yes"] + + +def test_nested_resource_raw_value_io_manager() -> None: + class MyMultiwriteIOManager(ConfigurableIOManager): + base_io_manager: ResourceDependency[IOManager] + mirror_io_manager: ResourceDependency[IOManager] + + def handle_output(self, context, obj) -> None: + self.base_io_manager.handle_output(context, obj) + self.mirror_io_manager.handle_output(context, obj) + + def load_input(self, context) -> Any: + return self.base_io_manager.load_input(context) + + log = [] + + class ConfigIOManager(ConfigurableIOManager): + path_prefix: List[str] + + def handle_output(self, context, obj) -> None: + log.append( + "ConfigIOManager handle_output " + + "/".join(self.path_prefix + list(context.asset_key.path)) + ) + + def load_input(self, context) -> Any: + log.append( + "ConfigIOManager load_input " + + "/".join(self.path_prefix + list(context.asset_key.path)) + ) + return "foo" + + class RawIOManager(IOManager): + def handle_output(self, context, obj) -> None: + log.append("RawIOManager handle_output " + "/".join(list(context.asset_key.path))) + + def load_input(self, context) -> Any: + log.append("RawIOManager load_input " + "/".join(list(context.asset_key.path))) + return "foo" + + @asset + def my_asset() -> str: + return "foo" + + @asset + def my_downstream_asset(my_asset: str) -> str: + return my_asset + "bar" + + defs = Definitions( + assets=[my_asset, my_downstream_asset], + resources={ + "io_manager": MyMultiwriteIOManager( + base_io_manager=ConfigIOManager(path_prefix=["base"]), + mirror_io_manager=RawIOManager(), + ), + }, + ) + + assert defs.get_implicit_global_asset_job_def().execute_in_process().success + assert log == [ + "ConfigIOManager handle_output base/my_asset", + "RawIOManager handle_output my_asset", + "ConfigIOManager load_input base/my_asset", + "ConfigIOManager handle_output base/my_downstream_asset", + "RawIOManager handle_output my_downstream_asset", + ]