From 128db23085e5592619fa2886d634e64e87a35f23 Mon Sep 17 00:00:00 2001 From: benpankow Date: Wed, 14 Jun 2023 16:57:00 -0700 Subject: [PATCH 1/3] [pythonic resources] Improve nested resource evaluation logic --- .../_config/pythonic_config/__init__.py | 46 +- .../_config/pythonic_config/typing_utils.py | 12 +- .../test_general_pythonic_resources.py | 600 ++---------------- .../pythonic_resources/test_nesting.py | 458 +++++++++++++ 4 files changed, 538 insertions(+), 578 deletions(-) create mode 100644 python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py diff --git a/python_modules/dagster/dagster/_config/pythonic_config/__init__.py b/python_modules/dagster/dagster/_config/pythonic_config/__init__.py index 4d24fde66ef4b..6de4443e8f8d0 100644 --- a/python_modules/dagster/dagster/_config/pythonic_config/__init__.py +++ b/python_modules/dagster/dagster/_config/pythonic_config/__init__.py @@ -9,10 +9,12 @@ Dict, Generator, Generic, + List, Mapping, NamedTuple, Optional, Set, + Tuple, Type, TypeVar, Union, @@ -20,7 +22,7 @@ ) from pydantic import ConstrainedFloat, ConstrainedInt, ConstrainedStr -from typing_extensions import TypeAlias, TypeGuard, get_args +from typing_extensions import TypeAlias, TypeGuard, get_args, get_origin from dagster import ( Enum as DagsterEnum, @@ -468,7 +470,7 @@ def _resolve_required_resource_keys( _resolve_required_resource_keys_for_resource(v, resource_mapping) ) - resources, _ = separate_resource_params(self.__dict__) + resources, _ = separate_resource_params(self.__class__, self.__dict__) for v in resources.values(): nested_resource_required_keys.update( _resolve_required_resource_keys_for_resource( @@ -607,9 +609,9 @@ def is_coercible_to_resource(val: Any) -> TypeGuard[CoercibleToResource]: def coerce_to_resource( coercible_to_resource: CoercibleToResource, ) -> ResourceDefinition: - if isinstance(coercible_to_resource, (ConfigurableResourceFactory, PartialResource)): - return coercible_to_resource.get_resource_definition() - return coercible_to_resource + from dagster._core.execution.build_resources import wrap_resources_for_execution + + return wrap_resources_for_execution({"test": coercible_to_resource})["test"] class ConfigurableResourceFactoryResourceDefinition(ResourceDefinition, AllowDelayedDependencies): @@ -766,7 +768,7 @@ def asset_that_uses_database(database: ResourceParam[Database]): """ def __init__(self, **data: Any): - resource_pointers, data_without_resources = separate_resource_params(data) + resource_pointers, data_without_resources = separate_resource_params(self.__class__, data) schema = infer_schema_from_config_class( self.__class__, fields_to_omit=set(resource_pointers.keys()) @@ -918,7 +920,7 @@ def _resolve_and_update_nested_resources( # Also evaluate any resources that are not partial with contextlib.ExitStack() as stack: - resources_to_update, _ = separate_resource_params(self.__dict__) + resources_to_update, _ = separate_resource_params(self.__class__, self.__dict__) resources_to_update = { attr_name: _call_resource_fn_with_default( stack, coerce_to_resource(resource), context @@ -1150,7 +1152,7 @@ def __init__( resource_cls: Type[ConfigurableResourceFactory[TResValue]], data: Dict[str, Any], ): - resource_pointers, _data_without_resources = separate_resource_params(data) + resource_pointers, _data_without_resources = separate_resource_params(resource_cls, data) MakeConfigCacheable.__init__(self, data=data, resource_cls=resource_cls) # type: ignore # extends BaseModel, takes kwargs @@ -1776,14 +1778,32 @@ class SeparatedResourceParams(NamedTuple): non_resources: Dict[str, Any] -def separate_resource_params(data: Dict[str, Any]) -> SeparatedResourceParams: +def _is_annotated_as_resource_type(annotation: Type) -> bool: + """Determines if a field in a structured config class is annotated as a resource type or not.""" + is_annotated_as_resource_dependency = get_origin(annotation) == ResourceDependency or getattr( + annotation, "__metadata__", None + ) == ("resource_dependency",) + + return is_annotated_as_resource_dependency or safe_is_subclass( + annotation, (ResourceDefinition, ConfigurableResourceFactory) + ) + + +def separate_resource_params(cls: Type[BaseModel], data: Dict[str, Any]) -> SeparatedResourceParams: """Separates out the key/value inputs of fields in a structured config Resource class which - are themselves Resources and those which are not. + are marked as resources (ie, using ResourceDependency) from those which are not. """ - return SeparatedResourceParams( - resources={k: v for k, v in data.items() if is_coercible_to_resource(v)}, - non_resources={k: v for k, v in data.items() if not is_coercible_to_resource(v)}, + keys_by_alias = {field.alias: field for field in cls.__fields__.values()} + data_with_annotation: List[Tuple[str, Any, Type]] = [ + (k, v, keys_by_alias[k].annotation) for k, v in data.items() if k in keys_by_alias + ] + out = SeparatedResourceParams( + resources={k: v for k, v, t in data_with_annotation if _is_annotated_as_resource_type(t)}, + non_resources={ + k: v for k, v, t in data_with_annotation if not _is_annotated_as_resource_type(t) + }, ) + return out def _call_resource_fn_with_default( diff --git a/python_modules/dagster/dagster/_config/pythonic_config/typing_utils.py b/python_modules/dagster/dagster/_config/pythonic_config/typing_utils.py index 30e5456d93bd4..6549a8e81cae4 100644 --- a/python_modules/dagster/dagster/_config/pythonic_config/typing_utils.py +++ b/python_modules/dagster/dagster/_config/pythonic_config/typing_utils.py @@ -2,7 +2,7 @@ import pydantic from pydantic import Field -from typing_extensions import dataclass_transform, get_origin +from typing_extensions import Annotated, dataclass_transform, get_origin from dagster._core.errors import DagsterInvalidDagsterTypeInPythonicConfigDefinitionError @@ -110,7 +110,7 @@ def __new__(cls, name, bases, namespaces, **kwargs) -> Any: # arg = get_args(annotations[field])[0] # If so, we treat it as a Union of a PartialResource and a Resource # for Pydantic's sake. - annotations[field] = Any + annotations[field] = Annotated[Any, "resource_dependency"] elif safe_is_subclass( annotations[field], LateBoundTypesForResourceTypeChecking.get_resource_type() ): @@ -118,8 +118,12 @@ def __new__(cls, name, bases, namespaces, **kwargs) -> Any: # and a Resource for Pydantic's sake, so that a user can pass in a partially # configured resource. base = annotations[field] - annotations[field] = Union[ - LateBoundTypesForResourceTypeChecking.get_partial_resource_type(base), base + annotations[field] = Annotated[ + Union[ + LateBoundTypesForResourceTypeChecking.get_partial_resource_type(base), + base, + ], + "resource_dependency", ] namespaces["__annotations__"] = annotations diff --git a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_general_pythonic_resources.py b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_general_pythonic_resources.py index 4afa8bf1c767f..95063142ebcfa 100644 --- a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_general_pythonic_resources.py +++ b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_general_pythonic_resources.py @@ -1,8 +1,7 @@ import enum -import json import sys from abc import ABC, abstractmethod -from typing import Any, Callable, List, Mapping, Optional +from typing import List, Mapping, Optional import mock import pytest @@ -14,7 +13,6 @@ ConfigurableResource, DagsterInstance, Definitions, - Field, IAttachDifferentObjectToOpContext, InitResourceContext, IOManager, @@ -424,327 +422,6 @@ def hello_world_asset(writer: WriterResource): assert out_txt == ["greeting: hello, world!"] -def test_nested_resources(): - out_txt = [] - - class Writer(ConfigurableResource, ABC): - @abstractmethod - def output(self, text: str) -> None: - pass - - class WriterResource(Writer): - def output(self, text: str) -> None: - out_txt.append(text) - - class PrefixedWriterResource(Writer): - prefix: str - - def output(self, text: str) -> None: - out_txt.append(f"{self.prefix}{text}") - - class JsonWriterResource( - Writer, - ): - base_writer: Writer - indent: int - - def output(self, obj: Any) -> None: - self.base_writer.output(json.dumps(obj, indent=self.indent)) - - @asset - def hello_world_asset(writer: JsonWriterResource): - writer.output({"hello": "world"}) - - # Construct a resource that is needed by another resource - writer_resource = WriterResource() - json_writer_resource = JsonWriterResource(indent=2, base_writer=writer_resource) - - assert ( - Definitions( - assets=[hello_world_asset], - resources={ - "writer": json_writer_resource, - }, - ) - .get_implicit_global_asset_job_def() - .execute_in_process() - .success - ) - - assert out_txt == ['{\n "hello": "world"\n}'] - - # Do it again, with a different nested resource - out_txt.clear() - prefixed_writer_resource = PrefixedWriterResource(prefix="greeting: ") - prefixed_json_writer_resource = JsonWriterResource( - indent=2, base_writer=prefixed_writer_resource - ) - - assert ( - Definitions( - assets=[hello_world_asset], - resources={ - "writer": prefixed_json_writer_resource, - }, - ) - .get_implicit_global_asset_job_def() - .execute_in_process() - .success - ) - - assert out_txt == ['greeting: {\n "hello": "world"\n}'] - - -def test_nested_resources_multiuse(): - class AWSCredentialsResource(ConfigurableResource): - username: str - password: str - - class S3Resource(ConfigurableResource): - aws_credentials: AWSCredentialsResource - bucket_name: str - - class EC2Resource(ConfigurableResource): - aws_credentials: AWSCredentialsResource - - completed = {} - - @asset - def my_asset(s3: S3Resource, ec2: EC2Resource): - assert s3.aws_credentials.username == "foo" - assert s3.aws_credentials.password == "bar" - assert s3.bucket_name == "my_bucket" - - assert ec2.aws_credentials.username == "foo" - assert ec2.aws_credentials.password == "bar" - - completed["yes"] = True - - aws_credentials = AWSCredentialsResource(username="foo", password="bar") - defs = Definitions( - assets=[my_asset], - resources={ - "s3": S3Resource(bucket_name="my_bucket", aws_credentials=aws_credentials), - "ec2": EC2Resource(aws_credentials=aws_credentials), - }, - ) - - assert defs.get_implicit_global_asset_job_def().execute_in_process().success - assert completed["yes"] - - -def test_nested_resources_runtime_config(): - class AWSCredentialsResource(ConfigurableResource): - username: str - password: str - - class S3Resource(ConfigurableResource): - aws_credentials: AWSCredentialsResource - bucket_name: str - - class EC2Resource(ConfigurableResource): - aws_credentials: AWSCredentialsResource - - completed = {} - - @asset - def my_asset(s3: S3Resource, ec2: EC2Resource): - assert s3.aws_credentials.username == "foo" - assert s3.aws_credentials.password == "bar" - assert s3.bucket_name == "my_bucket" - - assert ec2.aws_credentials.username == "foo" - assert ec2.aws_credentials.password == "bar" - - completed["yes"] = True - - aws_credentials = AWSCredentialsResource.configure_at_launch() - defs = Definitions( - assets=[my_asset], - resources={ - "aws_credentials": aws_credentials, - "s3": S3Resource(bucket_name="my_bucket", aws_credentials=aws_credentials), - "ec2": EC2Resource(aws_credentials=aws_credentials), - }, - ) - - assert ( - defs.get_implicit_global_asset_job_def() - .execute_in_process( - { - "resources": { - "aws_credentials": { - "config": { - "username": "foo", - "password": "bar", - } - } - } - } - ) - .success - ) - assert completed["yes"] - - -def test_nested_resources_runtime_config_complex(): - class CredentialsResource(ConfigurableResource): - username: str - password: str - - class DBConfigResource(ConfigurableResource): - creds: CredentialsResource - host: str - database: str - - class DBResource(ConfigurableResource): - config: DBConfigResource - - completed = {} - - @asset - def my_asset(db: DBResource): - assert db.config.creds.username == "foo" - assert db.config.creds.password == "bar" - assert db.config.host == "localhost" - assert db.config.database == "my_db" - completed["yes"] = True - - credentials = CredentialsResource.configure_at_launch() - db_config = DBConfigResource.configure_at_launch(creds=credentials) - db = DBResource(config=db_config) - - defs = Definitions( - assets=[my_asset], - resources={ - "credentials": credentials, - "db_config": db_config, - "db": db, - }, - ) - - assert ( - defs.get_implicit_global_asset_job_def() - .execute_in_process( - { - "resources": { - "credentials": { - "config": { - "username": "foo", - "password": "bar", - } - }, - "db_config": { - "config": { - "host": "localhost", - "database": "my_db", - } - }, - } - } - ) - .success - ) - assert completed["yes"] - - credentials = CredentialsResource.configure_at_launch() - db_config = DBConfigResource(creds=credentials, host="localhost", database="my_db") - db = DBResource(config=db_config) - - defs = Definitions( - assets=[my_asset], - resources={ - "credentials": credentials, - "db": db, - }, - ) - - assert ( - defs.get_implicit_global_asset_job_def() - .execute_in_process( - { - "resources": { - "credentials": { - "config": { - "username": "foo", - "password": "bar", - } - }, - } - } - ) - .success - ) - assert completed["yes"] - - -def test_enum_nested_resource_no_run_config() -> None: - class MyEnum(enum.Enum): - A = "a_value" - B = "b_value" - - class ResourceWithEnum(ConfigurableResource): - my_enum: MyEnum - - class OuterResourceWithResourceWithEnum(ConfigurableResource): - resource_with_enum: ResourceWithEnum - - @asset - def asset_with_outer_resource(outer_resource: OuterResourceWithResourceWithEnum): - return outer_resource.resource_with_enum.my_enum.value - - defs = Definitions( - assets=[asset_with_outer_resource], - resources={ - "outer_resource": OuterResourceWithResourceWithEnum( - resource_with_enum=ResourceWithEnum(my_enum=MyEnum.A) - ) - }, - ) - - a_job = defs.get_implicit_global_asset_job_def() - - result = a_job.execute_in_process() - assert result.success - assert result.output_for_node("asset_with_outer_resource") == "a_value" - - -def test_enum_nested_resource_run_config_override() -> None: - class MyEnum(enum.Enum): - A = "a_value" - B = "b_value" - - class ResourceWithEnum(ConfigurableResource): - my_enum: MyEnum - - class OuterResourceWithResourceWithEnum(ConfigurableResource): - resource_with_enum: ResourceWithEnum - - @asset - def asset_with_outer_resource(outer_resource: OuterResourceWithResourceWithEnum): - return outer_resource.resource_with_enum.my_enum.value - - resource_with_enum = ResourceWithEnum.configure_at_launch() - defs = Definitions( - assets=[asset_with_outer_resource], - resources={ - "resource_with_enum": resource_with_enum, - "outer_resource": OuterResourceWithResourceWithEnum( - resource_with_enum=resource_with_enum - ), - }, - ) - - a_job = defs.get_implicit_global_asset_job_def() - - # Case: I'm re-specifying the nested enum at runtime - expect the runtime config to override the resource config - result = a_job.execute_in_process( - run_config={"resources": {"resource_with_enum": {"config": {"my_enum": "B"}}}} - ) - assert result.success - assert result.output_for_node("asset_with_outer_resource") == "b_value" - - def test_basic_enum_override_with_resource_instance() -> None: class BasicEnum(enum.Enum): A = "a_value" @@ -874,243 +551,6 @@ def my_asset(my_resource: MyResource): assert completed["yes"] -def test_nested_function_resource(): - out_txt = [] - - @resource - def writer_resource(context): - def output(text: str) -> None: - out_txt.append(text) - - return output - - class PostfixWriterResource(ConfigurableResourceFactory[Callable[[str], None]]): - writer: ResourceDependency[Callable[[str], None]] - postfix: str - - def create_resource(self, context) -> Callable[[str], None]: - def output(text: str): - self.writer(f"{text}{self.postfix}") - - return output - - @asset - def my_asset(writer: ResourceParam[Callable[[str], None]]): - writer("foo") - writer("bar") - - defs = Definitions( - assets=[my_asset], - resources={ - "writer": PostfixWriterResource(writer=writer_resource, postfix="!"), - }, - ) - - assert defs.get_implicit_global_asset_job_def().execute_in_process().success - assert out_txt == ["foo!", "bar!"] - - -def test_nested_function_resource_configured(): - out_txt = [] - - @resource(config_schema={"prefix": Field(str, default_value="")}) - def writer_resource(context): - prefix = context.resource_config["prefix"] - - def output(text: str) -> None: - out_txt.append(f"{prefix}{text}") - - return output - - class PostfixWriterResource(ConfigurableResourceFactory[Callable[[str], None]]): - writer: ResourceDependency[Callable[[str], None]] - postfix: str - - def create_resource(self, context) -> Callable[[str], None]: - def output(text: str): - self.writer(f"{text}{self.postfix}") - - return output - - @asset - def my_asset(writer: ResourceParam[Callable[[str], None]]): - writer("foo") - writer("bar") - - defs = Definitions( - assets=[my_asset], - resources={ - "writer": PostfixWriterResource(writer=writer_resource, postfix="!"), - }, - ) - - assert defs.get_implicit_global_asset_job_def().execute_in_process().success - assert out_txt == ["foo!", "bar!"] - - out_txt.clear() - - defs = Definitions( - assets=[my_asset], - resources={ - "writer": PostfixWriterResource( - writer=writer_resource.configured({"prefix": "msg: "}), postfix="!" - ), - }, - ) - - assert defs.get_implicit_global_asset_job_def().execute_in_process().success - assert out_txt == ["msg: foo!", "msg: bar!"] - - -def test_nested_function_resource_runtime_config(): - out_txt = [] - - @resource(config_schema={"prefix": str}) - def writer_resource(context): - prefix = context.resource_config["prefix"] - - def output(text: str) -> None: - out_txt.append(f"{prefix}{text}") - - return output - - class PostfixWriterResource(ConfigurableResourceFactory[Callable[[str], None]]): - writer: ResourceDependency[Callable[[str], None]] - postfix: str - - def create_resource(self, context) -> Callable[[str], None]: - def output(text: str): - self.writer(f"{text}{self.postfix}") - - return output - - @asset - def my_asset(writer: ResourceParam[Callable[[str], None]]): - writer("foo") - writer("bar") - - with pytest.raises( - CheckError, - match="Any partially configured, nested resources must be provided to Definitions", - ): - # errors b/c writer_resource is not configured - # and not provided as a top-level resource to Definitions - defs = Definitions( - assets=[my_asset], - resources={ - "writer": PostfixWriterResource(writer=writer_resource, postfix="!"), - }, - ) - - defs = Definitions( - assets=[my_asset], - resources={ - "base_writer": writer_resource, - "writer": PostfixWriterResource(writer=writer_resource, postfix="!"), - }, - ) - - assert ( - defs.get_implicit_global_asset_job_def() - .execute_in_process( - { - "resources": { - "base_writer": { - "config": { - "prefix": "msg: ", - }, - }, - }, - } - ) - .success - ) - assert out_txt == ["msg: foo!", "msg: bar!"] - - -def test_nested_config_class() -> None: - # Validate that we can nest Config classes in a pythonic resource - - class User(Config): - name: str - age: int - - class UsersResource(ConfigurableResource): - users: List[User] - - executed = {} - - @asset - def an_asset(users_resource: UsersResource): - assert len(users_resource.users) == 2 - assert users_resource.users[0].name == "Bob" - assert users_resource.users[0].age == 25 - assert users_resource.users[1].name == "Alice" - assert users_resource.users[1].age == 30 - - executed["yes"] = True - - defs = Definitions( - assets=[an_asset], - resources={ - "users_resource": UsersResource( - users=[ - User(name="Bob", age=25), - User(name="Alice", age=30), - ] - ) - }, - ) - - assert defs.get_implicit_global_asset_job_def().execute_in_process().success - assert executed["yes"] - - -def test_using_enum_simple() -> None: - executed = {} - - class SimpleEnum(enum.Enum): - FOO = "foo" - BAR = "bar" - - class MyResource(ConfigurableResource): - an_enum: SimpleEnum - - @asset - def an_asset(my_resource: MyResource): - assert my_resource.an_enum == SimpleEnum.FOO - executed["yes"] = True - - defs = Definitions( - assets=[an_asset], - resources={ - "my_resource": MyResource( - an_enum=SimpleEnum.FOO, - ) - }, - ) - - assert defs.get_implicit_global_asset_job_def().execute_in_process().success - assert executed["yes"] - executed.clear() - - defs = Definitions( - assets=[an_asset], - resources={ - "my_resource": MyResource.configure_at_launch(), - }, - ) - - assert ( - defs.get_implicit_global_asset_job_def() - .execute_in_process( - {"resources": {"my_resource": {"config": {"an_enum": SimpleEnum.FOO.name}}}} - ) - .success - ) - assert executed["yes"] - - def test_using_enum_complex() -> None: executed = {} @@ -1555,3 +995,41 @@ def _is_dagster_maintained(cls) -> bool: return True assert MyResource(my_value="foo")._is_dagster_maintained() # noqa: SLF001 + + +def test_nested_config_class() -> None: + # Validate that we can nest Config classes in a pythonic resource + + class User(Config): + name: str + age: int + + class UsersResource(ConfigurableResource): + users: List[User] + + executed = {} + + @asset + def an_asset(users_resource: UsersResource): + assert len(users_resource.users) == 2 + assert users_resource.users[0].name == "Bob" + assert users_resource.users[0].age == 25 + assert users_resource.users[1].name == "Alice" + assert users_resource.users[1].age == 30 + + executed["yes"] = True + + defs = Definitions( + assets=[an_asset], + resources={ + "users_resource": UsersResource( + users=[ + User(name="Bob", age=25), + User(name="Alice", age=30), + ] + ) + }, + ) + + assert defs.get_implicit_global_asset_job_def().execute_in_process().success + assert executed["yes"] diff --git a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py new file mode 100644 index 0000000000000..a9adcd441a0f9 --- /dev/null +++ b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py @@ -0,0 +1,458 @@ +import json +from abc import ABC, abstractmethod +from typing import Any, Callable + +import pytest +from dagster import ( + ConfigurableResource, + Definitions, + Field, + ResourceDependency, + ResourceParam, + asset, + resource, +) +from dagster._check import CheckError +from dagster._config.pythonic_config import ConfigurableResourceFactory + + +def test_nested_resources(): + out_txt = [] + + class Writer(ConfigurableResource, ABC): + @abstractmethod + def output(self, text: str) -> None: + pass + + class WriterResource(Writer): + def output(self, text: str) -> None: + out_txt.append(text) + + class PrefixedWriterResource(Writer): + prefix: str + + def output(self, text: str) -> None: + out_txt.append(f"{self.prefix}{text}") + + class JsonWriterResource( + Writer, + ): + base_writer: Writer + indent: int + + def output(self, obj: Any) -> None: + self.base_writer.output(json.dumps(obj, indent=self.indent)) + + @asset + def hello_world_asset(writer: JsonWriterResource): + writer.output({"hello": "world"}) + + # Construct a resource that is needed by another resource + writer_resource = WriterResource() + json_writer_resource = JsonWriterResource(indent=2, base_writer=writer_resource) + + assert ( + Definitions( + assets=[hello_world_asset], + resources={ + "writer": json_writer_resource, + }, + ) + .get_implicit_global_asset_job_def() + .execute_in_process() + .success + ) + + assert out_txt == ['{\n "hello": "world"\n}'] + + # Do it again, with a different nested resource + out_txt.clear() + prefixed_writer_resource = PrefixedWriterResource(prefix="greeting: ") + prefixed_json_writer_resource = JsonWriterResource( + indent=2, base_writer=prefixed_writer_resource + ) + + assert ( + Definitions( + assets=[hello_world_asset], + resources={ + "writer": prefixed_json_writer_resource, + }, + ) + .get_implicit_global_asset_job_def() + .execute_in_process() + .success + ) + + assert out_txt == ['greeting: {\n "hello": "world"\n}'] + + +def test_nested_resources_multiuse(): + class AWSCredentialsResource(ConfigurableResource): + username: str + password: str + + class S3Resource(ConfigurableResource): + aws_credentials: AWSCredentialsResource + bucket_name: str + + class EC2Resource(ConfigurableResource): + aws_credentials: AWSCredentialsResource + + completed = {} + + @asset + def my_asset(s3: S3Resource, ec2: EC2Resource): + assert s3.aws_credentials.username == "foo" + assert s3.aws_credentials.password == "bar" + assert s3.bucket_name == "my_bucket" + + assert ec2.aws_credentials.username == "foo" + assert ec2.aws_credentials.password == "bar" + + completed["yes"] = True + + aws_credentials = AWSCredentialsResource(username="foo", password="bar") + defs = Definitions( + assets=[my_asset], + resources={ + "s3": S3Resource(bucket_name="my_bucket", aws_credentials=aws_credentials), + "ec2": EC2Resource(aws_credentials=aws_credentials), + }, + ) + + assert defs.get_implicit_global_asset_job_def().execute_in_process().success + assert completed["yes"] + + +def test_nested_resources_runtime_config(): + class AWSCredentialsResource(ConfigurableResource): + username: str + password: str + + class S3Resource(ConfigurableResource): + aws_credentials: AWSCredentialsResource + bucket_name: str + + class EC2Resource(ConfigurableResource): + aws_credentials: AWSCredentialsResource + + completed = {} + + @asset + def my_asset(s3: S3Resource, ec2: EC2Resource): + assert s3.aws_credentials.username == "foo" + assert s3.aws_credentials.password == "bar" + assert s3.bucket_name == "my_bucket" + + assert ec2.aws_credentials.username == "foo" + assert ec2.aws_credentials.password == "bar" + + completed["yes"] = True + + aws_credentials = AWSCredentialsResource.configure_at_launch() + defs = Definitions( + assets=[my_asset], + resources={ + "aws_credentials": aws_credentials, + "s3": S3Resource(bucket_name="my_bucket", aws_credentials=aws_credentials), + "ec2": EC2Resource(aws_credentials=aws_credentials), + }, + ) + + assert ( + defs.get_implicit_global_asset_job_def() + .execute_in_process( + { + "resources": { + "aws_credentials": { + "config": { + "username": "foo", + "password": "bar", + } + } + } + } + ) + .success + ) + assert completed["yes"] + + +def test_nested_resources_runtime_config_complex(): + class CredentialsResource(ConfigurableResource): + username: str + password: str + + class DBConfigResource(ConfigurableResource): + creds: CredentialsResource + host: str + database: str + + class DBResource(ConfigurableResource): + config: DBConfigResource + + completed = {} + + @asset + def my_asset(db: DBResource): + assert db.config.creds.username == "foo" + assert db.config.creds.password == "bar" + assert db.config.host == "localhost" + assert db.config.database == "my_db" + completed["yes"] = True + + credentials = CredentialsResource.configure_at_launch() + db_config = DBConfigResource.configure_at_launch(creds=credentials) + db = DBResource(config=db_config) + + defs = Definitions( + assets=[my_asset], + resources={ + "credentials": credentials, + "db_config": db_config, + "db": db, + }, + ) + + assert ( + defs.get_implicit_global_asset_job_def() + .execute_in_process( + { + "resources": { + "credentials": { + "config": { + "username": "foo", + "password": "bar", + } + }, + "db_config": { + "config": { + "host": "localhost", + "database": "my_db", + } + }, + } + } + ) + .success + ) + assert completed["yes"] + + credentials = CredentialsResource.configure_at_launch() + db_config = DBConfigResource(creds=credentials, host="localhost", database="my_db") + db = DBResource(config=db_config) + + defs = Definitions( + assets=[my_asset], + resources={ + "credentials": credentials, + "db": db, + }, + ) + + assert ( + defs.get_implicit_global_asset_job_def() + .execute_in_process( + { + "resources": { + "credentials": { + "config": { + "username": "foo", + "password": "bar", + } + }, + } + } + ) + .success + ) + assert completed["yes"] + + +def test_nested_function_resource(): + out_txt = [] + + @resource + def writer_resource(context): + def output(text: str) -> None: + out_txt.append(text) + + return output + + class PostfixWriterResource(ConfigurableResourceFactory[Callable[[str], None]]): + writer: ResourceDependency[Callable[[str], None]] + postfix: str + + def create_resource(self, context) -> Callable[[str], None]: + def output(text: str): + self.writer(f"{text}{self.postfix}") + + return output + + @asset + def my_asset(writer: ResourceParam[Callable[[str], None]]): + writer("foo") + writer("bar") + + defs = Definitions( + assets=[my_asset], + resources={ + "writer": PostfixWriterResource(writer=writer_resource, postfix="!"), + }, + ) + + assert defs.get_implicit_global_asset_job_def().execute_in_process().success + assert out_txt == ["foo!", "bar!"] + + +def test_nested_function_resource_configured(): + out_txt = [] + + @resource(config_schema={"prefix": Field(str, default_value="")}) + def writer_resource(context): + prefix = context.resource_config["prefix"] + + def output(text: str) -> None: + out_txt.append(f"{prefix}{text}") + + return output + + class PostfixWriterResource(ConfigurableResourceFactory[Callable[[str], None]]): + writer: ResourceDependency[Callable[[str], None]] + postfix: str + + def create_resource(self, context) -> Callable[[str], None]: + def output(text: str): + self.writer(f"{text}{self.postfix}") + + return output + + @asset + def my_asset(writer: ResourceParam[Callable[[str], None]]): + writer("foo") + writer("bar") + + defs = Definitions( + assets=[my_asset], + resources={ + "writer": PostfixWriterResource(writer=writer_resource, postfix="!"), + }, + ) + + assert defs.get_implicit_global_asset_job_def().execute_in_process().success + assert out_txt == ["foo!", "bar!"] + + out_txt.clear() + + defs = Definitions( + assets=[my_asset], + resources={ + "writer": PostfixWriterResource( + writer=writer_resource.configured({"prefix": "msg: "}), postfix="!" + ), + }, + ) + + assert defs.get_implicit_global_asset_job_def().execute_in_process().success + assert out_txt == ["msg: foo!", "msg: bar!"] + + +def test_nested_function_resource_runtime_config(): + out_txt = [] + + @resource(config_schema={"prefix": str}) + def writer_resource(context): + prefix = context.resource_config["prefix"] + + def output(text: str) -> None: + out_txt.append(f"{prefix}{text}") + + return output + + class PostfixWriterResource(ConfigurableResourceFactory[Callable[[str], None]]): + writer: ResourceDependency[Callable[[str], None]] + postfix: str + + def create_resource(self, context) -> Callable[[str], None]: + def output(text: str): + self.writer(f"{text}{self.postfix}") + + return output + + @asset + def my_asset(writer: ResourceParam[Callable[[str], None]]): + writer("foo") + writer("bar") + + with pytest.raises( + CheckError, + match="Any partially configured, nested resources must be provided to Definitions", + ): + # errors b/c writer_resource is not configured + # and not provided as a top-level resource to Definitions + defs = Definitions( + assets=[my_asset], + resources={ + "writer": PostfixWriterResource(writer=writer_resource, postfix="!"), + }, + ) + + defs = Definitions( + assets=[my_asset], + resources={ + "base_writer": writer_resource, + "writer": PostfixWriterResource(writer=writer_resource, postfix="!"), + }, + ) + + assert ( + defs.get_implicit_global_asset_job_def() + .execute_in_process( + { + "resources": { + "base_writer": { + "config": { + "prefix": "msg: ", + }, + }, + }, + } + ) + .success + ) + assert out_txt == ["msg: foo!", "msg: bar!"] + + +def test_nested_resource_raw_value() -> None: + class MyResourceWithDep(ConfigurableResource): + a_string: ResourceDependency[str] + + @resource + def string_resource(context) -> str: + return "foo" + + executed = {} + + @asset + def my_asset(my_resource: MyResourceWithDep): + assert my_resource.a_string == "foo" + executed["yes"] = True + + defs = Definitions( + assets=[my_asset], + resources={ + "my_resource": MyResourceWithDep(a_string=string_resource), + }, + ) + assert defs.get_implicit_global_asset_job_def().execute_in_process().success + assert executed["yes"] + + executed.clear() + + defs = Definitions( + assets=[my_asset], + resources={"my_resource": MyResourceWithDep(a_string="foo")}, + ) + assert defs.get_implicit_global_asset_job_def().execute_in_process().success + assert executed["yes"] From 9769ad1a5a372ef1459bc73d9499362778a3cf53 Mon Sep 17 00:00:00 2001 From: benpankow Date: Thu, 15 Jun 2023 10:29:44 -0700 Subject: [PATCH 2/3] add more test --- .../_config/pythonic_config/__init__.py | 56 +++++++------- .../repository_data_builder.py | 5 +- .../_core/execution/build_resources.py | 34 +++++---- .../pythonic_resources/test_nesting.py | 74 ++++++++++++++++++- 4 files changed, 122 insertions(+), 47 deletions(-) diff --git a/python_modules/dagster/dagster/_config/pythonic_config/__init__.py b/python_modules/dagster/dagster/_config/pythonic_config/__init__.py index 6de4443e8f8d0..0a69bfab62daa 100644 --- a/python_modules/dagster/dagster/_config/pythonic_config/__init__.py +++ b/python_modules/dagster/dagster/_config/pythonic_config/__init__.py @@ -448,6 +448,8 @@ class AllowDelayedDependencies: def _resolve_required_resource_keys( self, resource_mapping: Mapping[int, str] ) -> AbstractSet[str]: + from dagster._core.execution.build_resources import wrap_resource_for_execution + # All dependent resources which are not fully configured # must be specified to the Definitions object so that the # resource can be configured at runtime by the user @@ -470,11 +472,13 @@ def _resolve_required_resource_keys( _resolve_required_resource_keys_for_resource(v, resource_mapping) ) - resources, _ = separate_resource_params(self.__class__, self.__dict__) + resources, _ = separate_resource_params( + cast(Type[BaseModel], self.__class__), self.__dict__ + ) for v in resources.values(): nested_resource_required_keys.update( _resolve_required_resource_keys_for_resource( - coerce_to_resource(v), resource_mapping + wrap_resource_for_execution(v), resource_mapping ) ) @@ -606,14 +610,6 @@ def is_coercible_to_resource(val: Any) -> TypeGuard[CoercibleToResource]: return isinstance(val, (ResourceDefinition, ConfigurableResourceFactory, PartialResource)) -def coerce_to_resource( - coercible_to_resource: CoercibleToResource, -) -> ResourceDefinition: - from dagster._core.execution.build_resources import wrap_resources_for_execution - - return wrap_resources_for_execution({"test": coercible_to_resource})["test"] - - class ConfigurableResourceFactoryResourceDefinition(ResourceDefinition, AllowDelayedDependencies): def __init__( self, @@ -622,7 +618,7 @@ def __init__( config_schema: Any, description: Optional[str], resolve_resource_keys: Callable[[Mapping[int, str]], AbstractSet[str]], - nested_resources: Mapping[str, CoercibleToResource], + nested_resources: Mapping[str, Any], dagster_maintained: bool = False, ): super().__init__( @@ -642,7 +638,7 @@ def configurable_resource_cls(self) -> Type: @property def nested_resources( self, - ) -> Mapping[str, CoercibleToResource]: + ) -> Mapping[str, Any]: return self._nested_resources def _resolve_required_resource_keys( @@ -662,7 +658,7 @@ def __init__( config_schema: Any, description: Optional[str], resolve_resource_keys: Callable[[Mapping[int, str]], AbstractSet[str]], - nested_resources: Mapping[str, CoercibleToResource], + nested_resources: Mapping[str, Any], input_config_schema: Optional[Union[CoercableToConfigSchema, Type[Config]]] = None, output_config_schema: Optional[Union[CoercableToConfigSchema, Type[Config]]] = None, dagster_maintained: bool = False, @@ -696,7 +692,7 @@ def configurable_resource_cls(self) -> Type: @property def nested_resources( self, - ) -> Mapping[str, CoercibleToResource]: + ) -> Mapping[str, Any]: return self._nested_resources def _resolve_required_resource_keys( @@ -706,11 +702,11 @@ def _resolve_required_resource_keys( class ConfigurableResourceFactoryState(NamedTuple): - nested_partial_resources: Mapping[str, CoercibleToResource] + nested_partial_resources: Mapping[str, Any] resolved_config_dict: Dict[str, Any] config_schema: DefinitionConfigSchema schema: DagsterField - nested_resources: Dict[str, CoercibleToResource] + nested_resources: Dict[str, Any] resource_context: Optional[InitResourceContext] @@ -863,7 +859,7 @@ def create_resource(self, context: InitResourceContext) -> TResValue: @property def nested_resources( self, - ) -> Mapping[str, CoercibleToResource]: + ) -> Mapping[str, Any]: return self._nested_resources @classmethod @@ -899,6 +895,8 @@ def _resolve_and_update_nested_resources( Returns a new instance of the resource. """ + from dagster._core.execution.build_resources import wrap_resource_for_execution + partial_resources_to_update: Dict[str, Any] = {} if self._nested_partial_resources: context_with_mapping = cast( @@ -923,7 +921,7 @@ def _resolve_and_update_nested_resources( resources_to_update, _ = separate_resource_params(self.__class__, self.__dict__) resources_to_update = { attr_name: _call_resource_fn_with_default( - stack, coerce_to_resource(resource), context + stack, wrap_resource_for_execution(resource), context ) for attr_name, resource in resources_to_update.items() if attr_name not in partial_resources_to_update @@ -1121,7 +1119,9 @@ def create_resource(self, context: InitResourceContext) -> TResValue: def _is_fully_configured(resource: CoercibleToResource) -> bool: - actual_resource = coerce_to_resource(resource) + from dagster._core.execution.build_resources import wrap_resource_for_execution + + actual_resource = wrap_resource_for_execution(resource) res = ( validate_config( actual_resource.config_schema.config_type, @@ -1136,11 +1136,11 @@ def _is_fully_configured(resource: CoercibleToResource) -> bool: class PartialResourceState(NamedTuple): - nested_partial_resources: Dict[str, CoercibleToResource] + nested_partial_resources: Dict[str, Any] config_schema: DagsterField resource_fn: Callable[[InitResourceContext], Any] description: Optional[str] - nested_resources: Dict[str, CoercibleToResource] + nested_resources: Dict[str, Any] class PartialResource(Generic[TResValue], AllowDelayedDependencies, MakeConfigCacheable): @@ -1180,13 +1180,13 @@ def resource_fn(context: InitResourceContext): @property def _nested_partial_resources( self, - ) -> Mapping[str, CoercibleToResource]: + ) -> Mapping[str, Any]: return self._state__internal__.nested_partial_resources @property def nested_resources( self, - ) -> Mapping[str, CoercibleToResource]: + ) -> Mapping[str, Any]: return self._state__internal__.nested_resources @cached_method @@ -1774,7 +1774,7 @@ def infer_schema_from_config_class( class SeparatedResourceParams(NamedTuple): - resources: Dict[str, CoercibleToResource] + resources: Dict[str, Any] non_resources: Dict[str, Any] @@ -1831,14 +1831,14 @@ def _call_resource_fn_with_default( ) context = context.replace_config(cast(dict, evr.value)["config"]) - is_fn_generator = inspect.isgenerator(obj.resource_fn) or isinstance( - obj.resource_fn, contextlib.ContextDecorator - ) - if has_at_least_one_parameter(obj.resource_fn): # type: ignore[unreachable] + if has_at_least_one_parameter(obj.resource_fn): result = cast(ResourceFunctionWithContext, obj.resource_fn)(context) else: result = cast(ResourceFunctionWithoutContext, obj.resource_fn)() + is_fn_generator = inspect.isgenerator(obj.resource_fn) or isinstance( + obj.resource_fn, contextlib.ContextDecorator + ) if is_fn_generator: return stack.enter_context(cast(contextlib.AbstractContextManager, result)) else: diff --git a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py index 5fcf568bdd643..53cc8e6e164c1 100644 --- a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py +++ b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py @@ -18,7 +18,6 @@ ConfigurableIOManagerFactoryResourceDefinition, ConfigurableResourceFactoryResourceDefinition, ResourceWithKeyMapping, - coerce_to_resource, ) from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.assets_job import ( @@ -67,6 +66,8 @@ def _env_vars_from_resource_defaults(resource_def: ResourceDefinition) -> Set[st resource's default config. This is used to extract environment variables from the top-level resources in a Definitions object. """ + from dagster._core.execution.build_resources import wrap_resource_for_execution + config_schema_default = cast( Mapping[str, Any], json.loads(resource_def.config_schema.default_value_as_json_str) @@ -86,7 +87,7 @@ def _env_vars_from_resource_defaults(resource_def: ResourceDefinition) -> Set[st nested_resources = resource_def.inner_resource.nested_resources for nested_resource in nested_resources.values(): env_vars = env_vars.union( - _env_vars_from_resource_defaults(coerce_to_resource(nested_resource)) + _env_vars_from_resource_defaults(wrap_resource_for_execution(nested_resource)) ) return env_vars diff --git a/python_modules/dagster/dagster/_core/execution/build_resources.py b/python_modules/dagster/dagster/_core/execution/build_resources.py index 36d8bebebeb2b..5d6022ac77199 100644 --- a/python_modules/dagster/dagster/_core/execution/build_resources.py +++ b/python_modules/dagster/dagster/_core/execution/build_resources.py @@ -115,22 +115,26 @@ def the_resource(): def wrap_resources_for_execution( resources: Optional[Mapping[str, Any]] = None ) -> Dict[str, ResourceDefinition]: + return ( + { + resource_key: wrap_resource_for_execution(resource) + for resource_key, resource in resources.items() + } + if resources + else {} + ) + + +def wrap_resource_for_execution(resource: Any) -> ResourceDefinition: from dagster._config.pythonic_config import ConfigurableResourceFactory, PartialResource - resources = check.opt_mapping_param(resources, "resources", key_type=str) - resource_defs = {} # Wrap instantiated resource values in a resource definition. # If an instantiated IO manager is provided, wrap it in an IO manager definition. - for resource_key, resource in resources.items(): - # Wrap instantiated resource values in a resource definition. - # If an instantiated IO manager is provided, wrap it in an IO manager definition. - if isinstance(resource, (ConfigurableResourceFactory, PartialResource)): - resource_defs[resource_key] = resource.get_resource_definition() - elif isinstance(resource, ResourceDefinition): - resource_defs[resource_key] = resource - elif isinstance(resource, IOManager): - resource_defs[resource_key] = IOManagerDefinition.hardcoded_io_manager(resource) - else: - resource_defs[resource_key] = ResourceDefinition.hardcoded_resource(resource) - - return resource_defs + if isinstance(resource, (ConfigurableResourceFactory, PartialResource)): + return resource.get_resource_definition() + elif isinstance(resource, ResourceDefinition): + return resource + elif isinstance(resource, IOManager): + return IOManagerDefinition.hardcoded_io_manager(resource) + else: + return ResourceDefinition.hardcoded_resource(resource) diff --git a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py index a9adcd441a0f9..02085274cb55f 100644 --- a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py +++ b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py @@ -1,6 +1,6 @@ import json from abc import ABC, abstractmethod -from typing import Any, Callable +from typing import Any, Callable, List import pytest from dagster import ( @@ -13,7 +13,11 @@ resource, ) from dagster._check import CheckError -from dagster._config.pythonic_config import ConfigurableResourceFactory +from dagster._config.pythonic_config import ( + ConfigurableIOManager, + ConfigurableResourceFactory, +) +from dagster._core.storage.io_manager import IOManager def test_nested_resources(): @@ -456,3 +460,69 @@ def my_asset(my_resource: MyResourceWithDep): ) assert defs.get_implicit_global_asset_job_def().execute_in_process().success assert executed["yes"] + + +def test_nested_resource_raw_value_io_manager() -> None: + class MyMultiwriteIOManager(ConfigurableIOManager): + base_io_manager: ResourceDependency[IOManager] + mirror_io_manager: ResourceDependency[IOManager] + + def handle_output(self, context, obj) -> None: + self.base_io_manager.handle_output(context, obj) + self.mirror_io_manager.handle_output(context, obj) + + def load_input(self, context) -> Any: + return self.base_io_manager.load_input(context) + + log = [] + + class ConfigIOManager(ConfigurableIOManager): + path_prefix: List[str] + + def handle_output(self, context, obj) -> None: + log.append( + "ConfigIOManager handle_output " + + "/".join(self.path_prefix + list(context.asset_key.path)) + ) + + def load_input(self, context) -> Any: + log.append( + "ConfigIOManager load_input " + + "/".join(self.path_prefix + list(context.asset_key.path)) + ) + return "foo" + + class RawIOManager(IOManager): + def handle_output(self, context, obj) -> None: + log.append("RawIOManager handle_output " + "/".join(list(context.asset_key.path))) + + def load_input(self, context) -> Any: + log.append("RawIOManager load_input " + "/".join(list(context.asset_key.path))) + return "foo" + + @asset + def my_asset() -> str: + return "foo" + + @asset + def my_downstream_asset(my_asset: str) -> str: + return my_asset + "bar" + + defs = Definitions( + assets=[my_asset, my_downstream_asset], + resources={ + "io_manager": MyMultiwriteIOManager( + base_io_manager=ConfigIOManager(path_prefix=["base"]), + mirror_io_manager=RawIOManager(), + ), + }, + ) + + assert defs.get_implicit_global_asset_job_def().execute_in_process().success + assert log == [ + "ConfigIOManager handle_output base/my_asset", + "RawIOManager handle_output my_asset", + "ConfigIOManager load_input base/my_asset", + "ConfigIOManager handle_output base/my_downstream_asset", + "RawIOManager handle_output my_downstream_asset", + ] From 929c2376bfe7ed3dbe94b1028f074f0888129a71 Mon Sep 17 00:00:00 2001 From: benpankow Date: Thu, 22 Jun 2023 13:29:27 -0700 Subject: [PATCH 3/3] update tests --- .../test_general_pythonic_resources.py | 132 ++++++++++++------ .../pythonic_resources/test_nesting.py | 82 ++++++++++- 2 files changed, 166 insertions(+), 48 deletions(-) diff --git a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_general_pythonic_resources.py b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_general_pythonic_resources.py index 95063142ebcfa..b7fd5ad61099a 100644 --- a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_general_pythonic_resources.py +++ b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_general_pythonic_resources.py @@ -85,7 +85,8 @@ def hello_world_asset(writer: WriterResource): writer.output("hello, world!") defs = Definitions( - assets=[hello_world_asset], resources={"writer": WriterResource(prefix="greeting: ")} + assets=[hello_world_asset], + resources={"writer": WriterResource(prefix="greeting: ")}, ) assert defs.get_implicit_global_asset_job_def().execute_in_process().success @@ -234,7 +235,8 @@ def create_resource(self, context): @op def check_resource_created( - resource_with_cleanup_1: ResourceParam[bool], resource_with_cleanup_2: ResourceParam[bool] + resource_with_cleanup_1: ResourceParam[bool], + resource_with_cleanup_2: ResourceParam[bool], ): assert resource_with_cleanup_1 is True assert resource_with_cleanup_2 is True @@ -434,7 +436,10 @@ class MyResource(ConfigurableResource): def setup_for_execution(self, context: InitResourceContext) -> None: setup_executed["yes"] = True - assert context.resource_config["my_enum"] in [BasicEnum.A.value, BasicEnum.B.value] + assert context.resource_config["my_enum"] in [ + BasicEnum.A.value, + BasicEnum.B.value, + ] @asset def asset_with_resource(context, my_resource: MyResource): @@ -551,6 +556,89 @@ def my_asset(my_resource: MyResource): assert completed["yes"] +def test_nested_config_class() -> None: + # Validate that we can nest Config classes in a pythonic resource + + class User(Config): + name: str + age: int + + class UsersResource(ConfigurableResource): + users: List[User] + + executed = {} + + @asset + def an_asset(users_resource: UsersResource): + assert len(users_resource.users) == 2 + assert users_resource.users[0].name == "Bob" + assert users_resource.users[0].age == 25 + assert users_resource.users[1].name == "Alice" + assert users_resource.users[1].age == 30 + + executed["yes"] = True + + defs = Definitions( + assets=[an_asset], + resources={ + "users_resource": UsersResource( + users=[ + User(name="Bob", age=25), + User(name="Alice", age=30), + ] + ) + }, + ) + + assert defs.get_implicit_global_asset_job_def().execute_in_process().success + assert executed["yes"] + + +def test_using_enum_simple() -> None: + executed = {} + + class SimpleEnum(enum.Enum): + FOO = "foo" + BAR = "bar" + + class MyResource(ConfigurableResource): + an_enum: SimpleEnum + + @asset + def an_asset(my_resource: MyResource): + assert my_resource.an_enum == SimpleEnum.FOO + executed["yes"] = True + + defs = Definitions( + assets=[an_asset], + resources={ + "my_resource": MyResource( + an_enum=SimpleEnum.FOO, + ) + }, + ) + + assert defs.get_implicit_global_asset_job_def().execute_in_process().success + assert executed["yes"] + executed.clear() + + defs = Definitions( + assets=[an_asset], + resources={ + "my_resource": MyResource.configure_at_launch(), + }, + ) + + assert ( + defs.get_implicit_global_asset_job_def() + .execute_in_process( + {"resources": {"my_resource": {"config": {"an_enum": SimpleEnum.FOO.name}}}} + ) + .success + ) + assert executed["yes"] + + def test_using_enum_complex() -> None: executed = {} @@ -995,41 +1083,3 @@ def _is_dagster_maintained(cls) -> bool: return True assert MyResource(my_value="foo")._is_dagster_maintained() # noqa: SLF001 - - -def test_nested_config_class() -> None: - # Validate that we can nest Config classes in a pythonic resource - - class User(Config): - name: str - age: int - - class UsersResource(ConfigurableResource): - users: List[User] - - executed = {} - - @asset - def an_asset(users_resource: UsersResource): - assert len(users_resource.users) == 2 - assert users_resource.users[0].name == "Bob" - assert users_resource.users[0].age == 25 - assert users_resource.users[1].name == "Alice" - assert users_resource.users[1].age == 30 - - executed["yes"] = True - - defs = Definitions( - assets=[an_asset], - resources={ - "users_resource": UsersResource( - users=[ - User(name="Bob", age=25), - User(name="Alice", age=30), - ] - ) - }, - ) - - assert defs.get_implicit_global_asset_job_def().execute_in_process().success - assert executed["yes"] diff --git a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py index 02085274cb55f..440b990f3db11 100644 --- a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py +++ b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py @@ -1,3 +1,4 @@ +import enum import json from abc import ABC, abstractmethod from typing import Any, Callable, List @@ -20,7 +21,7 @@ from dagster._core.storage.io_manager import IOManager -def test_nested_resources(): +def test_nested_resources() -> None: out_txt = [] class Writer(ConfigurableResource, ABC): @@ -91,7 +92,7 @@ def hello_world_asset(writer: JsonWriterResource): assert out_txt == ['greeting: {\n "hello": "world"\n}'] -def test_nested_resources_multiuse(): +def test_nested_resources_multiuse() -> None: class AWSCredentialsResource(ConfigurableResource): username: str password: str @@ -129,7 +130,7 @@ def my_asset(s3: S3Resource, ec2: EC2Resource): assert completed["yes"] -def test_nested_resources_runtime_config(): +def test_nested_resources_runtime_config() -> None: class AWSCredentialsResource(ConfigurableResource): username: str password: str @@ -183,7 +184,7 @@ def my_asset(s3: S3Resource, ec2: EC2Resource): assert completed["yes"] -def test_nested_resources_runtime_config_complex(): +def test_nested_resources_runtime_config_complex() -> None: class CredentialsResource(ConfigurableResource): username: str password: str @@ -274,7 +275,7 @@ def my_asset(db: DBResource): assert completed["yes"] -def test_nested_function_resource(): +def test_nested_function_resource() -> None: out_txt = [] @resource @@ -310,7 +311,7 @@ def my_asset(writer: ResourceParam[Callable[[str], None]]): assert out_txt == ["foo!", "bar!"] -def test_nested_function_resource_configured(): +def test_nested_function_resource_configured() -> None: out_txt = [] @resource(config_schema={"prefix": Field(str, default_value="")}) @@ -362,7 +363,7 @@ def my_asset(writer: ResourceParam[Callable[[str], None]]): assert out_txt == ["msg: foo!", "msg: bar!"] -def test_nested_function_resource_runtime_config(): +def test_nested_function_resource_runtime_config() -> None: out_txt = [] @resource(config_schema={"prefix": str}) @@ -526,3 +527,70 @@ def my_downstream_asset(my_asset: str) -> str: "ConfigIOManager handle_output base/my_downstream_asset", "RawIOManager handle_output my_downstream_asset", ] + + +def test_enum_nested_resource_no_run_config() -> None: + class MyEnum(enum.Enum): + A = "a_value" + B = "b_value" + + class ResourceWithEnum(ConfigurableResource): + my_enum: MyEnum + + class OuterResourceWithResourceWithEnum(ConfigurableResource): + resource_with_enum: ResourceWithEnum + + @asset + def asset_with_outer_resource(outer_resource: OuterResourceWithResourceWithEnum): + return outer_resource.resource_with_enum.my_enum.value + + defs = Definitions( + assets=[asset_with_outer_resource], + resources={ + "outer_resource": OuterResourceWithResourceWithEnum( + resource_with_enum=ResourceWithEnum(my_enum=MyEnum.A) + ) + }, + ) + + a_job = defs.get_implicit_global_asset_job_def() + + result = a_job.execute_in_process() + assert result.success + assert result.output_for_node("asset_with_outer_resource") == "a_value" + + +def test_enum_nested_resource_run_config_override() -> None: + class MyEnum(enum.Enum): + A = "a_value" + B = "b_value" + + class ResourceWithEnum(ConfigurableResource): + my_enum: MyEnum + + class OuterResourceWithResourceWithEnum(ConfigurableResource): + resource_with_enum: ResourceWithEnum + + @asset + def asset_with_outer_resource(outer_resource: OuterResourceWithResourceWithEnum): + return outer_resource.resource_with_enum.my_enum.value + + resource_with_enum = ResourceWithEnum.configure_at_launch() + defs = Definitions( + assets=[asset_with_outer_resource], + resources={ + "resource_with_enum": resource_with_enum, + "outer_resource": OuterResourceWithResourceWithEnum( + resource_with_enum=resource_with_enum + ), + }, + ) + + a_job = defs.get_implicit_global_asset_job_def() + + # Case: I'm re-specifying the nested enum at runtime - expect the runtime config to override the resource config + result = a_job.execute_in_process( + run_config={"resources": {"resource_with_enum": {"config": {"my_enum": "B"}}}} + ) + assert result.success + assert result.output_for_node("asset_with_outer_resource") == "b_value"