diff --git a/python_modules/dagster/dagster/_config/pythonic_config/__init__.py b/python_modules/dagster/dagster/_config/pythonic_config/__init__.py index 4d24fde66ef4b..0a69bfab62daa 100644 --- a/python_modules/dagster/dagster/_config/pythonic_config/__init__.py +++ b/python_modules/dagster/dagster/_config/pythonic_config/__init__.py @@ -9,10 +9,12 @@ Dict, Generator, Generic, + List, Mapping, NamedTuple, Optional, Set, + Tuple, Type, TypeVar, Union, @@ -20,7 +22,7 @@ ) from pydantic import ConstrainedFloat, ConstrainedInt, ConstrainedStr -from typing_extensions import TypeAlias, TypeGuard, get_args +from typing_extensions import TypeAlias, TypeGuard, get_args, get_origin from dagster import ( Enum as DagsterEnum, @@ -446,6 +448,8 @@ class AllowDelayedDependencies: def _resolve_required_resource_keys( self, resource_mapping: Mapping[int, str] ) -> AbstractSet[str]: + from dagster._core.execution.build_resources import wrap_resource_for_execution + # All dependent resources which are not fully configured # must be specified to the Definitions object so that the # resource can be configured at runtime by the user @@ -468,11 +472,13 @@ def _resolve_required_resource_keys( _resolve_required_resource_keys_for_resource(v, resource_mapping) ) - resources, _ = separate_resource_params(self.__dict__) + resources, _ = separate_resource_params( + cast(Type[BaseModel], self.__class__), self.__dict__ + ) for v in resources.values(): nested_resource_required_keys.update( _resolve_required_resource_keys_for_resource( - coerce_to_resource(v), resource_mapping + wrap_resource_for_execution(v), resource_mapping ) ) @@ -604,14 +610,6 @@ def is_coercible_to_resource(val: Any) -> TypeGuard[CoercibleToResource]: return isinstance(val, (ResourceDefinition, ConfigurableResourceFactory, PartialResource)) -def coerce_to_resource( - coercible_to_resource: CoercibleToResource, -) -> ResourceDefinition: - if isinstance(coercible_to_resource, (ConfigurableResourceFactory, PartialResource)): - return coercible_to_resource.get_resource_definition() - return coercible_to_resource - - class ConfigurableResourceFactoryResourceDefinition(ResourceDefinition, AllowDelayedDependencies): def __init__( self, @@ -620,7 +618,7 @@ def __init__( config_schema: Any, description: Optional[str], resolve_resource_keys: Callable[[Mapping[int, str]], AbstractSet[str]], - nested_resources: Mapping[str, CoercibleToResource], + nested_resources: Mapping[str, Any], dagster_maintained: bool = False, ): super().__init__( @@ -640,7 +638,7 @@ def configurable_resource_cls(self) -> Type: @property def nested_resources( self, - ) -> Mapping[str, CoercibleToResource]: + ) -> Mapping[str, Any]: return self._nested_resources def _resolve_required_resource_keys( @@ -660,7 +658,7 @@ def __init__( config_schema: Any, description: Optional[str], resolve_resource_keys: Callable[[Mapping[int, str]], AbstractSet[str]], - nested_resources: Mapping[str, CoercibleToResource], + nested_resources: Mapping[str, Any], input_config_schema: Optional[Union[CoercableToConfigSchema, Type[Config]]] = None, output_config_schema: Optional[Union[CoercableToConfigSchema, Type[Config]]] = None, dagster_maintained: bool = False, @@ -694,7 +692,7 @@ def configurable_resource_cls(self) -> Type: @property def nested_resources( self, - ) -> Mapping[str, CoercibleToResource]: + ) -> Mapping[str, Any]: return self._nested_resources def _resolve_required_resource_keys( @@ -704,11 +702,11 @@ def _resolve_required_resource_keys( class ConfigurableResourceFactoryState(NamedTuple): - nested_partial_resources: Mapping[str, CoercibleToResource] + nested_partial_resources: Mapping[str, Any] resolved_config_dict: Dict[str, Any] config_schema: DefinitionConfigSchema schema: DagsterField - nested_resources: Dict[str, CoercibleToResource] + nested_resources: Dict[str, Any] resource_context: Optional[InitResourceContext] @@ -766,7 +764,7 @@ def asset_that_uses_database(database: ResourceParam[Database]): """ def __init__(self, **data: Any): - resource_pointers, data_without_resources = separate_resource_params(data) + resource_pointers, data_without_resources = separate_resource_params(self.__class__, data) schema = infer_schema_from_config_class( self.__class__, fields_to_omit=set(resource_pointers.keys()) @@ -861,7 +859,7 @@ def create_resource(self, context: InitResourceContext) -> TResValue: @property def nested_resources( self, - ) -> Mapping[str, CoercibleToResource]: + ) -> Mapping[str, Any]: return self._nested_resources @classmethod @@ -897,6 +895,8 @@ def _resolve_and_update_nested_resources( Returns a new instance of the resource. """ + from dagster._core.execution.build_resources import wrap_resource_for_execution + partial_resources_to_update: Dict[str, Any] = {} if self._nested_partial_resources: context_with_mapping = cast( @@ -918,10 +918,10 @@ def _resolve_and_update_nested_resources( # Also evaluate any resources that are not partial with contextlib.ExitStack() as stack: - resources_to_update, _ = separate_resource_params(self.__dict__) + resources_to_update, _ = separate_resource_params(self.__class__, self.__dict__) resources_to_update = { attr_name: _call_resource_fn_with_default( - stack, coerce_to_resource(resource), context + stack, wrap_resource_for_execution(resource), context ) for attr_name, resource in resources_to_update.items() if attr_name not in partial_resources_to_update @@ -1119,7 +1119,9 @@ def create_resource(self, context: InitResourceContext) -> TResValue: def _is_fully_configured(resource: CoercibleToResource) -> bool: - actual_resource = coerce_to_resource(resource) + from dagster._core.execution.build_resources import wrap_resource_for_execution + + actual_resource = wrap_resource_for_execution(resource) res = ( validate_config( actual_resource.config_schema.config_type, @@ -1134,11 +1136,11 @@ def _is_fully_configured(resource: CoercibleToResource) -> bool: class PartialResourceState(NamedTuple): - nested_partial_resources: Dict[str, CoercibleToResource] + nested_partial_resources: Dict[str, Any] config_schema: DagsterField resource_fn: Callable[[InitResourceContext], Any] description: Optional[str] - nested_resources: Dict[str, CoercibleToResource] + nested_resources: Dict[str, Any] class PartialResource(Generic[TResValue], AllowDelayedDependencies, MakeConfigCacheable): @@ -1150,7 +1152,7 @@ def __init__( resource_cls: Type[ConfigurableResourceFactory[TResValue]], data: Dict[str, Any], ): - resource_pointers, _data_without_resources = separate_resource_params(data) + resource_pointers, _data_without_resources = separate_resource_params(resource_cls, data) MakeConfigCacheable.__init__(self, data=data, resource_cls=resource_cls) # type: ignore # extends BaseModel, takes kwargs @@ -1178,13 +1180,13 @@ def resource_fn(context: InitResourceContext): @property def _nested_partial_resources( self, - ) -> Mapping[str, CoercibleToResource]: + ) -> Mapping[str, Any]: return self._state__internal__.nested_partial_resources @property def nested_resources( self, - ) -> Mapping[str, CoercibleToResource]: + ) -> Mapping[str, Any]: return self._state__internal__.nested_resources @cached_method @@ -1772,18 +1774,36 @@ def infer_schema_from_config_class( class SeparatedResourceParams(NamedTuple): - resources: Dict[str, CoercibleToResource] + resources: Dict[str, Any] non_resources: Dict[str, Any] -def separate_resource_params(data: Dict[str, Any]) -> SeparatedResourceParams: +def _is_annotated_as_resource_type(annotation: Type) -> bool: + """Determines if a field in a structured config class is annotated as a resource type or not.""" + is_annotated_as_resource_dependency = get_origin(annotation) == ResourceDependency or getattr( + annotation, "__metadata__", None + ) == ("resource_dependency",) + + return is_annotated_as_resource_dependency or safe_is_subclass( + annotation, (ResourceDefinition, ConfigurableResourceFactory) + ) + + +def separate_resource_params(cls: Type[BaseModel], data: Dict[str, Any]) -> SeparatedResourceParams: """Separates out the key/value inputs of fields in a structured config Resource class which - are themselves Resources and those which are not. + are marked as resources (ie, using ResourceDependency) from those which are not. """ - return SeparatedResourceParams( - resources={k: v for k, v in data.items() if is_coercible_to_resource(v)}, - non_resources={k: v for k, v in data.items() if not is_coercible_to_resource(v)}, + keys_by_alias = {field.alias: field for field in cls.__fields__.values()} + data_with_annotation: List[Tuple[str, Any, Type]] = [ + (k, v, keys_by_alias[k].annotation) for k, v in data.items() if k in keys_by_alias + ] + out = SeparatedResourceParams( + resources={k: v for k, v, t in data_with_annotation if _is_annotated_as_resource_type(t)}, + non_resources={ + k: v for k, v, t in data_with_annotation if not _is_annotated_as_resource_type(t) + }, ) + return out def _call_resource_fn_with_default( @@ -1811,14 +1831,14 @@ def _call_resource_fn_with_default( ) context = context.replace_config(cast(dict, evr.value)["config"]) - is_fn_generator = inspect.isgenerator(obj.resource_fn) or isinstance( - obj.resource_fn, contextlib.ContextDecorator - ) - if has_at_least_one_parameter(obj.resource_fn): # type: ignore[unreachable] + if has_at_least_one_parameter(obj.resource_fn): result = cast(ResourceFunctionWithContext, obj.resource_fn)(context) else: result = cast(ResourceFunctionWithoutContext, obj.resource_fn)() + is_fn_generator = inspect.isgenerator(obj.resource_fn) or isinstance( + obj.resource_fn, contextlib.ContextDecorator + ) if is_fn_generator: return stack.enter_context(cast(contextlib.AbstractContextManager, result)) else: diff --git a/python_modules/dagster/dagster/_config/pythonic_config/typing_utils.py b/python_modules/dagster/dagster/_config/pythonic_config/typing_utils.py index 30e5456d93bd4..6549a8e81cae4 100644 --- a/python_modules/dagster/dagster/_config/pythonic_config/typing_utils.py +++ b/python_modules/dagster/dagster/_config/pythonic_config/typing_utils.py @@ -2,7 +2,7 @@ import pydantic from pydantic import Field -from typing_extensions import dataclass_transform, get_origin +from typing_extensions import Annotated, dataclass_transform, get_origin from dagster._core.errors import DagsterInvalidDagsterTypeInPythonicConfigDefinitionError @@ -110,7 +110,7 @@ def __new__(cls, name, bases, namespaces, **kwargs) -> Any: # arg = get_args(annotations[field])[0] # If so, we treat it as a Union of a PartialResource and a Resource # for Pydantic's sake. - annotations[field] = Any + annotations[field] = Annotated[Any, "resource_dependency"] elif safe_is_subclass( annotations[field], LateBoundTypesForResourceTypeChecking.get_resource_type() ): @@ -118,8 +118,12 @@ def __new__(cls, name, bases, namespaces, **kwargs) -> Any: # and a Resource for Pydantic's sake, so that a user can pass in a partially # configured resource. base = annotations[field] - annotations[field] = Union[ - LateBoundTypesForResourceTypeChecking.get_partial_resource_type(base), base + annotations[field] = Annotated[ + Union[ + LateBoundTypesForResourceTypeChecking.get_partial_resource_type(base), + base, + ], + "resource_dependency", ] namespaces["__annotations__"] = annotations diff --git a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py index 639793082d0e9..7a1df3d2e6867 100644 --- a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py +++ b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py @@ -18,7 +18,6 @@ ConfigurableIOManagerFactoryResourceDefinition, ConfigurableResourceFactoryResourceDefinition, ResourceWithKeyMapping, - coerce_to_resource, ) from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.assets_job import ( @@ -67,6 +66,8 @@ def _env_vars_from_resource_defaults(resource_def: ResourceDefinition) -> Set[st resource's default config. This is used to extract environment variables from the top-level resources in a Definitions object. """ + from dagster._core.execution.build_resources import wrap_resource_for_execution + config_schema_default = cast( Mapping[str, Any], json.loads(resource_def.config_schema.default_value_as_json_str) @@ -86,7 +87,7 @@ def _env_vars_from_resource_defaults(resource_def: ResourceDefinition) -> Set[st nested_resources = resource_def.inner_resource.nested_resources for nested_resource in nested_resources.values(): env_vars = env_vars.union( - _env_vars_from_resource_defaults(coerce_to_resource(nested_resource)) + _env_vars_from_resource_defaults(wrap_resource_for_execution(nested_resource)) ) return env_vars diff --git a/python_modules/dagster/dagster/_core/execution/build_resources.py b/python_modules/dagster/dagster/_core/execution/build_resources.py index 36d8bebebeb2b..5d6022ac77199 100644 --- a/python_modules/dagster/dagster/_core/execution/build_resources.py +++ b/python_modules/dagster/dagster/_core/execution/build_resources.py @@ -115,22 +115,26 @@ def the_resource(): def wrap_resources_for_execution( resources: Optional[Mapping[str, Any]] = None ) -> Dict[str, ResourceDefinition]: + return ( + { + resource_key: wrap_resource_for_execution(resource) + for resource_key, resource in resources.items() + } + if resources + else {} + ) + + +def wrap_resource_for_execution(resource: Any) -> ResourceDefinition: from dagster._config.pythonic_config import ConfigurableResourceFactory, PartialResource - resources = check.opt_mapping_param(resources, "resources", key_type=str) - resource_defs = {} # Wrap instantiated resource values in a resource definition. # If an instantiated IO manager is provided, wrap it in an IO manager definition. - for resource_key, resource in resources.items(): - # Wrap instantiated resource values in a resource definition. - # If an instantiated IO manager is provided, wrap it in an IO manager definition. - if isinstance(resource, (ConfigurableResourceFactory, PartialResource)): - resource_defs[resource_key] = resource.get_resource_definition() - elif isinstance(resource, ResourceDefinition): - resource_defs[resource_key] = resource - elif isinstance(resource, IOManager): - resource_defs[resource_key] = IOManagerDefinition.hardcoded_io_manager(resource) - else: - resource_defs[resource_key] = ResourceDefinition.hardcoded_resource(resource) - - return resource_defs + if isinstance(resource, (ConfigurableResourceFactory, PartialResource)): + return resource.get_resource_definition() + elif isinstance(resource, ResourceDefinition): + return resource + elif isinstance(resource, IOManager): + return IOManagerDefinition.hardcoded_io_manager(resource) + else: + return ResourceDefinition.hardcoded_resource(resource) diff --git a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_general_pythonic_resources.py b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_general_pythonic_resources.py index 4afa8bf1c767f..b7fd5ad61099a 100644 --- a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_general_pythonic_resources.py +++ b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_general_pythonic_resources.py @@ -1,8 +1,7 @@ import enum -import json import sys from abc import ABC, abstractmethod -from typing import Any, Callable, List, Mapping, Optional +from typing import List, Mapping, Optional import mock import pytest @@ -14,7 +13,6 @@ ConfigurableResource, DagsterInstance, Definitions, - Field, IAttachDifferentObjectToOpContext, InitResourceContext, IOManager, @@ -87,7 +85,8 @@ def hello_world_asset(writer: WriterResource): writer.output("hello, world!") defs = Definitions( - assets=[hello_world_asset], resources={"writer": WriterResource(prefix="greeting: ")} + assets=[hello_world_asset], + resources={"writer": WriterResource(prefix="greeting: ")}, ) assert defs.get_implicit_global_asset_job_def().execute_in_process().success @@ -236,7 +235,8 @@ def create_resource(self, context): @op def check_resource_created( - resource_with_cleanup_1: ResourceParam[bool], resource_with_cleanup_2: ResourceParam[bool] + resource_with_cleanup_1: ResourceParam[bool], + resource_with_cleanup_2: ResourceParam[bool], ): assert resource_with_cleanup_1 is True assert resource_with_cleanup_2 is True @@ -424,327 +424,6 @@ def hello_world_asset(writer: WriterResource): assert out_txt == ["greeting: hello, world!"] -def test_nested_resources(): - out_txt = [] - - class Writer(ConfigurableResource, ABC): - @abstractmethod - def output(self, text: str) -> None: - pass - - class WriterResource(Writer): - def output(self, text: str) -> None: - out_txt.append(text) - - class PrefixedWriterResource(Writer): - prefix: str - - def output(self, text: str) -> None: - out_txt.append(f"{self.prefix}{text}") - - class JsonWriterResource( - Writer, - ): - base_writer: Writer - indent: int - - def output(self, obj: Any) -> None: - self.base_writer.output(json.dumps(obj, indent=self.indent)) - - @asset - def hello_world_asset(writer: JsonWriterResource): - writer.output({"hello": "world"}) - - # Construct a resource that is needed by another resource - writer_resource = WriterResource() - json_writer_resource = JsonWriterResource(indent=2, base_writer=writer_resource) - - assert ( - Definitions( - assets=[hello_world_asset], - resources={ - "writer": json_writer_resource, - }, - ) - .get_implicit_global_asset_job_def() - .execute_in_process() - .success - ) - - assert out_txt == ['{\n "hello": "world"\n}'] - - # Do it again, with a different nested resource - out_txt.clear() - prefixed_writer_resource = PrefixedWriterResource(prefix="greeting: ") - prefixed_json_writer_resource = JsonWriterResource( - indent=2, base_writer=prefixed_writer_resource - ) - - assert ( - Definitions( - assets=[hello_world_asset], - resources={ - "writer": prefixed_json_writer_resource, - }, - ) - .get_implicit_global_asset_job_def() - .execute_in_process() - .success - ) - - assert out_txt == ['greeting: {\n "hello": "world"\n}'] - - -def test_nested_resources_multiuse(): - class AWSCredentialsResource(ConfigurableResource): - username: str - password: str - - class S3Resource(ConfigurableResource): - aws_credentials: AWSCredentialsResource - bucket_name: str - - class EC2Resource(ConfigurableResource): - aws_credentials: AWSCredentialsResource - - completed = {} - - @asset - def my_asset(s3: S3Resource, ec2: EC2Resource): - assert s3.aws_credentials.username == "foo" - assert s3.aws_credentials.password == "bar" - assert s3.bucket_name == "my_bucket" - - assert ec2.aws_credentials.username == "foo" - assert ec2.aws_credentials.password == "bar" - - completed["yes"] = True - - aws_credentials = AWSCredentialsResource(username="foo", password="bar") - defs = Definitions( - assets=[my_asset], - resources={ - "s3": S3Resource(bucket_name="my_bucket", aws_credentials=aws_credentials), - "ec2": EC2Resource(aws_credentials=aws_credentials), - }, - ) - - assert defs.get_implicit_global_asset_job_def().execute_in_process().success - assert completed["yes"] - - -def test_nested_resources_runtime_config(): - class AWSCredentialsResource(ConfigurableResource): - username: str - password: str - - class S3Resource(ConfigurableResource): - aws_credentials: AWSCredentialsResource - bucket_name: str - - class EC2Resource(ConfigurableResource): - aws_credentials: AWSCredentialsResource - - completed = {} - - @asset - def my_asset(s3: S3Resource, ec2: EC2Resource): - assert s3.aws_credentials.username == "foo" - assert s3.aws_credentials.password == "bar" - assert s3.bucket_name == "my_bucket" - - assert ec2.aws_credentials.username == "foo" - assert ec2.aws_credentials.password == "bar" - - completed["yes"] = True - - aws_credentials = AWSCredentialsResource.configure_at_launch() - defs = Definitions( - assets=[my_asset], - resources={ - "aws_credentials": aws_credentials, - "s3": S3Resource(bucket_name="my_bucket", aws_credentials=aws_credentials), - "ec2": EC2Resource(aws_credentials=aws_credentials), - }, - ) - - assert ( - defs.get_implicit_global_asset_job_def() - .execute_in_process( - { - "resources": { - "aws_credentials": { - "config": { - "username": "foo", - "password": "bar", - } - } - } - } - ) - .success - ) - assert completed["yes"] - - -def test_nested_resources_runtime_config_complex(): - class CredentialsResource(ConfigurableResource): - username: str - password: str - - class DBConfigResource(ConfigurableResource): - creds: CredentialsResource - host: str - database: str - - class DBResource(ConfigurableResource): - config: DBConfigResource - - completed = {} - - @asset - def my_asset(db: DBResource): - assert db.config.creds.username == "foo" - assert db.config.creds.password == "bar" - assert db.config.host == "localhost" - assert db.config.database == "my_db" - completed["yes"] = True - - credentials = CredentialsResource.configure_at_launch() - db_config = DBConfigResource.configure_at_launch(creds=credentials) - db = DBResource(config=db_config) - - defs = Definitions( - assets=[my_asset], - resources={ - "credentials": credentials, - "db_config": db_config, - "db": db, - }, - ) - - assert ( - defs.get_implicit_global_asset_job_def() - .execute_in_process( - { - "resources": { - "credentials": { - "config": { - "username": "foo", - "password": "bar", - } - }, - "db_config": { - "config": { - "host": "localhost", - "database": "my_db", - } - }, - } - } - ) - .success - ) - assert completed["yes"] - - credentials = CredentialsResource.configure_at_launch() - db_config = DBConfigResource(creds=credentials, host="localhost", database="my_db") - db = DBResource(config=db_config) - - defs = Definitions( - assets=[my_asset], - resources={ - "credentials": credentials, - "db": db, - }, - ) - - assert ( - defs.get_implicit_global_asset_job_def() - .execute_in_process( - { - "resources": { - "credentials": { - "config": { - "username": "foo", - "password": "bar", - } - }, - } - } - ) - .success - ) - assert completed["yes"] - - -def test_enum_nested_resource_no_run_config() -> None: - class MyEnum(enum.Enum): - A = "a_value" - B = "b_value" - - class ResourceWithEnum(ConfigurableResource): - my_enum: MyEnum - - class OuterResourceWithResourceWithEnum(ConfigurableResource): - resource_with_enum: ResourceWithEnum - - @asset - def asset_with_outer_resource(outer_resource: OuterResourceWithResourceWithEnum): - return outer_resource.resource_with_enum.my_enum.value - - defs = Definitions( - assets=[asset_with_outer_resource], - resources={ - "outer_resource": OuterResourceWithResourceWithEnum( - resource_with_enum=ResourceWithEnum(my_enum=MyEnum.A) - ) - }, - ) - - a_job = defs.get_implicit_global_asset_job_def() - - result = a_job.execute_in_process() - assert result.success - assert result.output_for_node("asset_with_outer_resource") == "a_value" - - -def test_enum_nested_resource_run_config_override() -> None: - class MyEnum(enum.Enum): - A = "a_value" - B = "b_value" - - class ResourceWithEnum(ConfigurableResource): - my_enum: MyEnum - - class OuterResourceWithResourceWithEnum(ConfigurableResource): - resource_with_enum: ResourceWithEnum - - @asset - def asset_with_outer_resource(outer_resource: OuterResourceWithResourceWithEnum): - return outer_resource.resource_with_enum.my_enum.value - - resource_with_enum = ResourceWithEnum.configure_at_launch() - defs = Definitions( - assets=[asset_with_outer_resource], - resources={ - "resource_with_enum": resource_with_enum, - "outer_resource": OuterResourceWithResourceWithEnum( - resource_with_enum=resource_with_enum - ), - }, - ) - - a_job = defs.get_implicit_global_asset_job_def() - - # Case: I'm re-specifying the nested enum at runtime - expect the runtime config to override the resource config - result = a_job.execute_in_process( - run_config={"resources": {"resource_with_enum": {"config": {"my_enum": "B"}}}} - ) - assert result.success - assert result.output_for_node("asset_with_outer_resource") == "b_value" - - def test_basic_enum_override_with_resource_instance() -> None: class BasicEnum(enum.Enum): A = "a_value" @@ -757,7 +436,10 @@ class MyResource(ConfigurableResource): def setup_for_execution(self, context: InitResourceContext) -> None: setup_executed["yes"] = True - assert context.resource_config["my_enum"] in [BasicEnum.A.value, BasicEnum.B.value] + assert context.resource_config["my_enum"] in [ + BasicEnum.A.value, + BasicEnum.B.value, + ] @asset def asset_with_resource(context, my_resource: MyResource): @@ -874,160 +556,6 @@ def my_asset(my_resource: MyResource): assert completed["yes"] -def test_nested_function_resource(): - out_txt = [] - - @resource - def writer_resource(context): - def output(text: str) -> None: - out_txt.append(text) - - return output - - class PostfixWriterResource(ConfigurableResourceFactory[Callable[[str], None]]): - writer: ResourceDependency[Callable[[str], None]] - postfix: str - - def create_resource(self, context) -> Callable[[str], None]: - def output(text: str): - self.writer(f"{text}{self.postfix}") - - return output - - @asset - def my_asset(writer: ResourceParam[Callable[[str], None]]): - writer("foo") - writer("bar") - - defs = Definitions( - assets=[my_asset], - resources={ - "writer": PostfixWriterResource(writer=writer_resource, postfix="!"), - }, - ) - - assert defs.get_implicit_global_asset_job_def().execute_in_process().success - assert out_txt == ["foo!", "bar!"] - - -def test_nested_function_resource_configured(): - out_txt = [] - - @resource(config_schema={"prefix": Field(str, default_value="")}) - def writer_resource(context): - prefix = context.resource_config["prefix"] - - def output(text: str) -> None: - out_txt.append(f"{prefix}{text}") - - return output - - class PostfixWriterResource(ConfigurableResourceFactory[Callable[[str], None]]): - writer: ResourceDependency[Callable[[str], None]] - postfix: str - - def create_resource(self, context) -> Callable[[str], None]: - def output(text: str): - self.writer(f"{text}{self.postfix}") - - return output - - @asset - def my_asset(writer: ResourceParam[Callable[[str], None]]): - writer("foo") - writer("bar") - - defs = Definitions( - assets=[my_asset], - resources={ - "writer": PostfixWriterResource(writer=writer_resource, postfix="!"), - }, - ) - - assert defs.get_implicit_global_asset_job_def().execute_in_process().success - assert out_txt == ["foo!", "bar!"] - - out_txt.clear() - - defs = Definitions( - assets=[my_asset], - resources={ - "writer": PostfixWriterResource( - writer=writer_resource.configured({"prefix": "msg: "}), postfix="!" - ), - }, - ) - - assert defs.get_implicit_global_asset_job_def().execute_in_process().success - assert out_txt == ["msg: foo!", "msg: bar!"] - - -def test_nested_function_resource_runtime_config(): - out_txt = [] - - @resource(config_schema={"prefix": str}) - def writer_resource(context): - prefix = context.resource_config["prefix"] - - def output(text: str) -> None: - out_txt.append(f"{prefix}{text}") - - return output - - class PostfixWriterResource(ConfigurableResourceFactory[Callable[[str], None]]): - writer: ResourceDependency[Callable[[str], None]] - postfix: str - - def create_resource(self, context) -> Callable[[str], None]: - def output(text: str): - self.writer(f"{text}{self.postfix}") - - return output - - @asset - def my_asset(writer: ResourceParam[Callable[[str], None]]): - writer("foo") - writer("bar") - - with pytest.raises( - CheckError, - match="Any partially configured, nested resources must be provided to Definitions", - ): - # errors b/c writer_resource is not configured - # and not provided as a top-level resource to Definitions - defs = Definitions( - assets=[my_asset], - resources={ - "writer": PostfixWriterResource(writer=writer_resource, postfix="!"), - }, - ) - - defs = Definitions( - assets=[my_asset], - resources={ - "base_writer": writer_resource, - "writer": PostfixWriterResource(writer=writer_resource, postfix="!"), - }, - ) - - assert ( - defs.get_implicit_global_asset_job_def() - .execute_in_process( - { - "resources": { - "base_writer": { - "config": { - "prefix": "msg: ", - }, - }, - }, - } - ) - .success - ) - assert out_txt == ["msg: foo!", "msg: bar!"] - - def test_nested_config_class() -> None: # Validate that we can nest Config classes in a pythonic resource diff --git a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py new file mode 100644 index 0000000000000..440b990f3db11 --- /dev/null +++ b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py @@ -0,0 +1,596 @@ +import enum +import json +from abc import ABC, abstractmethod +from typing import Any, Callable, List + +import pytest +from dagster import ( + ConfigurableResource, + Definitions, + Field, + ResourceDependency, + ResourceParam, + asset, + resource, +) +from dagster._check import CheckError +from dagster._config.pythonic_config import ( + ConfigurableIOManager, + ConfigurableResourceFactory, +) +from dagster._core.storage.io_manager import IOManager + + +def test_nested_resources() -> None: + out_txt = [] + + class Writer(ConfigurableResource, ABC): + @abstractmethod + def output(self, text: str) -> None: + pass + + class WriterResource(Writer): + def output(self, text: str) -> None: + out_txt.append(text) + + class PrefixedWriterResource(Writer): + prefix: str + + def output(self, text: str) -> None: + out_txt.append(f"{self.prefix}{text}") + + class JsonWriterResource( + Writer, + ): + base_writer: Writer + indent: int + + def output(self, obj: Any) -> None: + self.base_writer.output(json.dumps(obj, indent=self.indent)) + + @asset + def hello_world_asset(writer: JsonWriterResource): + writer.output({"hello": "world"}) + + # Construct a resource that is needed by another resource + writer_resource = WriterResource() + json_writer_resource = JsonWriterResource(indent=2, base_writer=writer_resource) + + assert ( + Definitions( + assets=[hello_world_asset], + resources={ + "writer": json_writer_resource, + }, + ) + .get_implicit_global_asset_job_def() + .execute_in_process() + .success + ) + + assert out_txt == ['{\n "hello": "world"\n}'] + + # Do it again, with a different nested resource + out_txt.clear() + prefixed_writer_resource = PrefixedWriterResource(prefix="greeting: ") + prefixed_json_writer_resource = JsonWriterResource( + indent=2, base_writer=prefixed_writer_resource + ) + + assert ( + Definitions( + assets=[hello_world_asset], + resources={ + "writer": prefixed_json_writer_resource, + }, + ) + .get_implicit_global_asset_job_def() + .execute_in_process() + .success + ) + + assert out_txt == ['greeting: {\n "hello": "world"\n}'] + + +def test_nested_resources_multiuse() -> None: + class AWSCredentialsResource(ConfigurableResource): + username: str + password: str + + class S3Resource(ConfigurableResource): + aws_credentials: AWSCredentialsResource + bucket_name: str + + class EC2Resource(ConfigurableResource): + aws_credentials: AWSCredentialsResource + + completed = {} + + @asset + def my_asset(s3: S3Resource, ec2: EC2Resource): + assert s3.aws_credentials.username == "foo" + assert s3.aws_credentials.password == "bar" + assert s3.bucket_name == "my_bucket" + + assert ec2.aws_credentials.username == "foo" + assert ec2.aws_credentials.password == "bar" + + completed["yes"] = True + + aws_credentials = AWSCredentialsResource(username="foo", password="bar") + defs = Definitions( + assets=[my_asset], + resources={ + "s3": S3Resource(bucket_name="my_bucket", aws_credentials=aws_credentials), + "ec2": EC2Resource(aws_credentials=aws_credentials), + }, + ) + + assert defs.get_implicit_global_asset_job_def().execute_in_process().success + assert completed["yes"] + + +def test_nested_resources_runtime_config() -> None: + class AWSCredentialsResource(ConfigurableResource): + username: str + password: str + + class S3Resource(ConfigurableResource): + aws_credentials: AWSCredentialsResource + bucket_name: str + + class EC2Resource(ConfigurableResource): + aws_credentials: AWSCredentialsResource + + completed = {} + + @asset + def my_asset(s3: S3Resource, ec2: EC2Resource): + assert s3.aws_credentials.username == "foo" + assert s3.aws_credentials.password == "bar" + assert s3.bucket_name == "my_bucket" + + assert ec2.aws_credentials.username == "foo" + assert ec2.aws_credentials.password == "bar" + + completed["yes"] = True + + aws_credentials = AWSCredentialsResource.configure_at_launch() + defs = Definitions( + assets=[my_asset], + resources={ + "aws_credentials": aws_credentials, + "s3": S3Resource(bucket_name="my_bucket", aws_credentials=aws_credentials), + "ec2": EC2Resource(aws_credentials=aws_credentials), + }, + ) + + assert ( + defs.get_implicit_global_asset_job_def() + .execute_in_process( + { + "resources": { + "aws_credentials": { + "config": { + "username": "foo", + "password": "bar", + } + } + } + } + ) + .success + ) + assert completed["yes"] + + +def test_nested_resources_runtime_config_complex() -> None: + class CredentialsResource(ConfigurableResource): + username: str + password: str + + class DBConfigResource(ConfigurableResource): + creds: CredentialsResource + host: str + database: str + + class DBResource(ConfigurableResource): + config: DBConfigResource + + completed = {} + + @asset + def my_asset(db: DBResource): + assert db.config.creds.username == "foo" + assert db.config.creds.password == "bar" + assert db.config.host == "localhost" + assert db.config.database == "my_db" + completed["yes"] = True + + credentials = CredentialsResource.configure_at_launch() + db_config = DBConfigResource.configure_at_launch(creds=credentials) + db = DBResource(config=db_config) + + defs = Definitions( + assets=[my_asset], + resources={ + "credentials": credentials, + "db_config": db_config, + "db": db, + }, + ) + + assert ( + defs.get_implicit_global_asset_job_def() + .execute_in_process( + { + "resources": { + "credentials": { + "config": { + "username": "foo", + "password": "bar", + } + }, + "db_config": { + "config": { + "host": "localhost", + "database": "my_db", + } + }, + } + } + ) + .success + ) + assert completed["yes"] + + credentials = CredentialsResource.configure_at_launch() + db_config = DBConfigResource(creds=credentials, host="localhost", database="my_db") + db = DBResource(config=db_config) + + defs = Definitions( + assets=[my_asset], + resources={ + "credentials": credentials, + "db": db, + }, + ) + + assert ( + defs.get_implicit_global_asset_job_def() + .execute_in_process( + { + "resources": { + "credentials": { + "config": { + "username": "foo", + "password": "bar", + } + }, + } + } + ) + .success + ) + assert completed["yes"] + + +def test_nested_function_resource() -> None: + out_txt = [] + + @resource + def writer_resource(context): + def output(text: str) -> None: + out_txt.append(text) + + return output + + class PostfixWriterResource(ConfigurableResourceFactory[Callable[[str], None]]): + writer: ResourceDependency[Callable[[str], None]] + postfix: str + + def create_resource(self, context) -> Callable[[str], None]: + def output(text: str): + self.writer(f"{text}{self.postfix}") + + return output + + @asset + def my_asset(writer: ResourceParam[Callable[[str], None]]): + writer("foo") + writer("bar") + + defs = Definitions( + assets=[my_asset], + resources={ + "writer": PostfixWriterResource(writer=writer_resource, postfix="!"), + }, + ) + + assert defs.get_implicit_global_asset_job_def().execute_in_process().success + assert out_txt == ["foo!", "bar!"] + + +def test_nested_function_resource_configured() -> None: + out_txt = [] + + @resource(config_schema={"prefix": Field(str, default_value="")}) + def writer_resource(context): + prefix = context.resource_config["prefix"] + + def output(text: str) -> None: + out_txt.append(f"{prefix}{text}") + + return output + + class PostfixWriterResource(ConfigurableResourceFactory[Callable[[str], None]]): + writer: ResourceDependency[Callable[[str], None]] + postfix: str + + def create_resource(self, context) -> Callable[[str], None]: + def output(text: str): + self.writer(f"{text}{self.postfix}") + + return output + + @asset + def my_asset(writer: ResourceParam[Callable[[str], None]]): + writer("foo") + writer("bar") + + defs = Definitions( + assets=[my_asset], + resources={ + "writer": PostfixWriterResource(writer=writer_resource, postfix="!"), + }, + ) + + assert defs.get_implicit_global_asset_job_def().execute_in_process().success + assert out_txt == ["foo!", "bar!"] + + out_txt.clear() + + defs = Definitions( + assets=[my_asset], + resources={ + "writer": PostfixWriterResource( + writer=writer_resource.configured({"prefix": "msg: "}), postfix="!" + ), + }, + ) + + assert defs.get_implicit_global_asset_job_def().execute_in_process().success + assert out_txt == ["msg: foo!", "msg: bar!"] + + +def test_nested_function_resource_runtime_config() -> None: + out_txt = [] + + @resource(config_schema={"prefix": str}) + def writer_resource(context): + prefix = context.resource_config["prefix"] + + def output(text: str) -> None: + out_txt.append(f"{prefix}{text}") + + return output + + class PostfixWriterResource(ConfigurableResourceFactory[Callable[[str], None]]): + writer: ResourceDependency[Callable[[str], None]] + postfix: str + + def create_resource(self, context) -> Callable[[str], None]: + def output(text: str): + self.writer(f"{text}{self.postfix}") + + return output + + @asset + def my_asset(writer: ResourceParam[Callable[[str], None]]): + writer("foo") + writer("bar") + + with pytest.raises( + CheckError, + match="Any partially configured, nested resources must be provided to Definitions", + ): + # errors b/c writer_resource is not configured + # and not provided as a top-level resource to Definitions + defs = Definitions( + assets=[my_asset], + resources={ + "writer": PostfixWriterResource(writer=writer_resource, postfix="!"), + }, + ) + + defs = Definitions( + assets=[my_asset], + resources={ + "base_writer": writer_resource, + "writer": PostfixWriterResource(writer=writer_resource, postfix="!"), + }, + ) + + assert ( + defs.get_implicit_global_asset_job_def() + .execute_in_process( + { + "resources": { + "base_writer": { + "config": { + "prefix": "msg: ", + }, + }, + }, + } + ) + .success + ) + assert out_txt == ["msg: foo!", "msg: bar!"] + + +def test_nested_resource_raw_value() -> None: + class MyResourceWithDep(ConfigurableResource): + a_string: ResourceDependency[str] + + @resource + def string_resource(context) -> str: + return "foo" + + executed = {} + + @asset + def my_asset(my_resource: MyResourceWithDep): + assert my_resource.a_string == "foo" + executed["yes"] = True + + defs = Definitions( + assets=[my_asset], + resources={ + "my_resource": MyResourceWithDep(a_string=string_resource), + }, + ) + assert defs.get_implicit_global_asset_job_def().execute_in_process().success + assert executed["yes"] + + executed.clear() + + defs = Definitions( + assets=[my_asset], + resources={"my_resource": MyResourceWithDep(a_string="foo")}, + ) + assert defs.get_implicit_global_asset_job_def().execute_in_process().success + assert executed["yes"] + + +def test_nested_resource_raw_value_io_manager() -> None: + class MyMultiwriteIOManager(ConfigurableIOManager): + base_io_manager: ResourceDependency[IOManager] + mirror_io_manager: ResourceDependency[IOManager] + + def handle_output(self, context, obj) -> None: + self.base_io_manager.handle_output(context, obj) + self.mirror_io_manager.handle_output(context, obj) + + def load_input(self, context) -> Any: + return self.base_io_manager.load_input(context) + + log = [] + + class ConfigIOManager(ConfigurableIOManager): + path_prefix: List[str] + + def handle_output(self, context, obj) -> None: + log.append( + "ConfigIOManager handle_output " + + "/".join(self.path_prefix + list(context.asset_key.path)) + ) + + def load_input(self, context) -> Any: + log.append( + "ConfigIOManager load_input " + + "/".join(self.path_prefix + list(context.asset_key.path)) + ) + return "foo" + + class RawIOManager(IOManager): + def handle_output(self, context, obj) -> None: + log.append("RawIOManager handle_output " + "/".join(list(context.asset_key.path))) + + def load_input(self, context) -> Any: + log.append("RawIOManager load_input " + "/".join(list(context.asset_key.path))) + return "foo" + + @asset + def my_asset() -> str: + return "foo" + + @asset + def my_downstream_asset(my_asset: str) -> str: + return my_asset + "bar" + + defs = Definitions( + assets=[my_asset, my_downstream_asset], + resources={ + "io_manager": MyMultiwriteIOManager( + base_io_manager=ConfigIOManager(path_prefix=["base"]), + mirror_io_manager=RawIOManager(), + ), + }, + ) + + assert defs.get_implicit_global_asset_job_def().execute_in_process().success + assert log == [ + "ConfigIOManager handle_output base/my_asset", + "RawIOManager handle_output my_asset", + "ConfigIOManager load_input base/my_asset", + "ConfigIOManager handle_output base/my_downstream_asset", + "RawIOManager handle_output my_downstream_asset", + ] + + +def test_enum_nested_resource_no_run_config() -> None: + class MyEnum(enum.Enum): + A = "a_value" + B = "b_value" + + class ResourceWithEnum(ConfigurableResource): + my_enum: MyEnum + + class OuterResourceWithResourceWithEnum(ConfigurableResource): + resource_with_enum: ResourceWithEnum + + @asset + def asset_with_outer_resource(outer_resource: OuterResourceWithResourceWithEnum): + return outer_resource.resource_with_enum.my_enum.value + + defs = Definitions( + assets=[asset_with_outer_resource], + resources={ + "outer_resource": OuterResourceWithResourceWithEnum( + resource_with_enum=ResourceWithEnum(my_enum=MyEnum.A) + ) + }, + ) + + a_job = defs.get_implicit_global_asset_job_def() + + result = a_job.execute_in_process() + assert result.success + assert result.output_for_node("asset_with_outer_resource") == "a_value" + + +def test_enum_nested_resource_run_config_override() -> None: + class MyEnum(enum.Enum): + A = "a_value" + B = "b_value" + + class ResourceWithEnum(ConfigurableResource): + my_enum: MyEnum + + class OuterResourceWithResourceWithEnum(ConfigurableResource): + resource_with_enum: ResourceWithEnum + + @asset + def asset_with_outer_resource(outer_resource: OuterResourceWithResourceWithEnum): + return outer_resource.resource_with_enum.my_enum.value + + resource_with_enum = ResourceWithEnum.configure_at_launch() + defs = Definitions( + assets=[asset_with_outer_resource], + resources={ + "resource_with_enum": resource_with_enum, + "outer_resource": OuterResourceWithResourceWithEnum( + resource_with_enum=resource_with_enum + ), + }, + ) + + a_job = defs.get_implicit_global_asset_job_def() + + # Case: I'm re-specifying the nested enum at runtime - expect the runtime config to override the resource config + result = a_job.execute_in_process( + run_config={"resources": {"resource_with_enum": {"config": {"my_enum": "B"}}}} + ) + assert result.success + assert result.output_for_node("asset_with_outer_resource") == "b_value"