Skip to content

Commit

Permalink
update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
benpankow committed Jun 22, 2023
1 parent 9769ad1 commit 929c237
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ def hello_world_asset(writer: WriterResource):
writer.output("hello, world!")

defs = Definitions(
assets=[hello_world_asset], resources={"writer": WriterResource(prefix="greeting: ")}
assets=[hello_world_asset],
resources={"writer": WriterResource(prefix="greeting: ")},
)

assert defs.get_implicit_global_asset_job_def().execute_in_process().success
Expand Down Expand Up @@ -234,7 +235,8 @@ def create_resource(self, context):

@op
def check_resource_created(
resource_with_cleanup_1: ResourceParam[bool], resource_with_cleanup_2: ResourceParam[bool]
resource_with_cleanup_1: ResourceParam[bool],
resource_with_cleanup_2: ResourceParam[bool],
):
assert resource_with_cleanup_1 is True
assert resource_with_cleanup_2 is True
Expand Down Expand Up @@ -434,7 +436,10 @@ class MyResource(ConfigurableResource):

def setup_for_execution(self, context: InitResourceContext) -> None:
setup_executed["yes"] = True
assert context.resource_config["my_enum"] in [BasicEnum.A.value, BasicEnum.B.value]
assert context.resource_config["my_enum"] in [
BasicEnum.A.value,
BasicEnum.B.value,
]

@asset
def asset_with_resource(context, my_resource: MyResource):
Expand Down Expand Up @@ -551,6 +556,89 @@ def my_asset(my_resource: MyResource):
assert completed["yes"]


def test_nested_config_class() -> None:
# Validate that we can nest Config classes in a pythonic resource

class User(Config):
name: str
age: int

class UsersResource(ConfigurableResource):
users: List[User]

executed = {}

@asset
def an_asset(users_resource: UsersResource):
assert len(users_resource.users) == 2
assert users_resource.users[0].name == "Bob"
assert users_resource.users[0].age == 25
assert users_resource.users[1].name == "Alice"
assert users_resource.users[1].age == 30

executed["yes"] = True

defs = Definitions(
assets=[an_asset],
resources={
"users_resource": UsersResource(
users=[
User(name="Bob", age=25),
User(name="Alice", age=30),
]
)
},
)

assert defs.get_implicit_global_asset_job_def().execute_in_process().success
assert executed["yes"]


def test_using_enum_simple() -> None:
executed = {}

class SimpleEnum(enum.Enum):
FOO = "foo"
BAR = "bar"

class MyResource(ConfigurableResource):
an_enum: SimpleEnum

@asset
def an_asset(my_resource: MyResource):
assert my_resource.an_enum == SimpleEnum.FOO
executed["yes"] = True

defs = Definitions(
assets=[an_asset],
resources={
"my_resource": MyResource(
an_enum=SimpleEnum.FOO,
)
},
)

assert defs.get_implicit_global_asset_job_def().execute_in_process().success
assert executed["yes"]
executed.clear()

defs = Definitions(
assets=[an_asset],
resources={
"my_resource": MyResource.configure_at_launch(),
},
)

assert (
defs.get_implicit_global_asset_job_def()
.execute_in_process(
{"resources": {"my_resource": {"config": {"an_enum": SimpleEnum.FOO.name}}}}
)
.success
)
assert executed["yes"]


def test_using_enum_complex() -> None:
executed = {}

Expand Down Expand Up @@ -995,41 +1083,3 @@ def _is_dagster_maintained(cls) -> bool:
return True

assert MyResource(my_value="foo")._is_dagster_maintained() # noqa: SLF001


def test_nested_config_class() -> None:
# Validate that we can nest Config classes in a pythonic resource

class User(Config):
name: str
age: int

class UsersResource(ConfigurableResource):
users: List[User]

executed = {}

@asset
def an_asset(users_resource: UsersResource):
assert len(users_resource.users) == 2
assert users_resource.users[0].name == "Bob"
assert users_resource.users[0].age == 25
assert users_resource.users[1].name == "Alice"
assert users_resource.users[1].age == 30

executed["yes"] = True

defs = Definitions(
assets=[an_asset],
resources={
"users_resource": UsersResource(
users=[
User(name="Bob", age=25),
User(name="Alice", age=30),
]
)
},
)

assert defs.get_implicit_global_asset_job_def().execute_in_process().success
assert executed["yes"]
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import enum
import json
from abc import ABC, abstractmethod
from typing import Any, Callable, List
Expand All @@ -20,7 +21,7 @@
from dagster._core.storage.io_manager import IOManager


def test_nested_resources():
def test_nested_resources() -> None:
out_txt = []

class Writer(ConfigurableResource, ABC):
Expand Down Expand Up @@ -91,7 +92,7 @@ def hello_world_asset(writer: JsonWriterResource):
assert out_txt == ['greeting: {\n "hello": "world"\n}']


def test_nested_resources_multiuse():
def test_nested_resources_multiuse() -> None:
class AWSCredentialsResource(ConfigurableResource):
username: str
password: str
Expand Down Expand Up @@ -129,7 +130,7 @@ def my_asset(s3: S3Resource, ec2: EC2Resource):
assert completed["yes"]


def test_nested_resources_runtime_config():
def test_nested_resources_runtime_config() -> None:
class AWSCredentialsResource(ConfigurableResource):
username: str
password: str
Expand Down Expand Up @@ -183,7 +184,7 @@ def my_asset(s3: S3Resource, ec2: EC2Resource):
assert completed["yes"]


def test_nested_resources_runtime_config_complex():
def test_nested_resources_runtime_config_complex() -> None:
class CredentialsResource(ConfigurableResource):
username: str
password: str
Expand Down Expand Up @@ -274,7 +275,7 @@ def my_asset(db: DBResource):
assert completed["yes"]


def test_nested_function_resource():
def test_nested_function_resource() -> None:
out_txt = []

@resource
Expand Down Expand Up @@ -310,7 +311,7 @@ def my_asset(writer: ResourceParam[Callable[[str], None]]):
assert out_txt == ["foo!", "bar!"]


def test_nested_function_resource_configured():
def test_nested_function_resource_configured() -> None:
out_txt = []

@resource(config_schema={"prefix": Field(str, default_value="")})
Expand Down Expand Up @@ -362,7 +363,7 @@ def my_asset(writer: ResourceParam[Callable[[str], None]]):
assert out_txt == ["msg: foo!", "msg: bar!"]


def test_nested_function_resource_runtime_config():
def test_nested_function_resource_runtime_config() -> None:
out_txt = []

@resource(config_schema={"prefix": str})
Expand Down Expand Up @@ -526,3 +527,70 @@ def my_downstream_asset(my_asset: str) -> str:
"ConfigIOManager handle_output base/my_downstream_asset",
"RawIOManager handle_output my_downstream_asset",
]


def test_enum_nested_resource_no_run_config() -> None:
class MyEnum(enum.Enum):
A = "a_value"
B = "b_value"

class ResourceWithEnum(ConfigurableResource):
my_enum: MyEnum

class OuterResourceWithResourceWithEnum(ConfigurableResource):
resource_with_enum: ResourceWithEnum

@asset
def asset_with_outer_resource(outer_resource: OuterResourceWithResourceWithEnum):
return outer_resource.resource_with_enum.my_enum.value

defs = Definitions(
assets=[asset_with_outer_resource],
resources={
"outer_resource": OuterResourceWithResourceWithEnum(
resource_with_enum=ResourceWithEnum(my_enum=MyEnum.A)
)
},
)

a_job = defs.get_implicit_global_asset_job_def()

result = a_job.execute_in_process()
assert result.success
assert result.output_for_node("asset_with_outer_resource") == "a_value"


def test_enum_nested_resource_run_config_override() -> None:
class MyEnum(enum.Enum):
A = "a_value"
B = "b_value"

class ResourceWithEnum(ConfigurableResource):
my_enum: MyEnum

class OuterResourceWithResourceWithEnum(ConfigurableResource):
resource_with_enum: ResourceWithEnum

@asset
def asset_with_outer_resource(outer_resource: OuterResourceWithResourceWithEnum):
return outer_resource.resource_with_enum.my_enum.value

resource_with_enum = ResourceWithEnum.configure_at_launch()
defs = Definitions(
assets=[asset_with_outer_resource],
resources={
"resource_with_enum": resource_with_enum,
"outer_resource": OuterResourceWithResourceWithEnum(
resource_with_enum=resource_with_enum
),
},
)

a_job = defs.get_implicit_global_asset_job_def()

# Case: I'm re-specifying the nested enum at runtime - expect the runtime config to override the resource config
result = a_job.execute_in_process(
run_config={"resources": {"resource_with_enum": {"config": {"my_enum": "B"}}}}
)
assert result.success
assert result.output_for_node("asset_with_outer_resource") == "b_value"

0 comments on commit 929c237

Please sign in to comment.