From a06b828ff9e101708961b1bff849700acc0d915f Mon Sep 17 00:00:00 2001 From: benpankow Date: Tue, 11 Apr 2023 20:54:38 -0700 Subject: [PATCH] add comments, tests --- .../_config/pythonic_config/__init__.py | 108 ++++----- .../repository_data_builder.py | 5 +- .../test_pythonic_config_resources.py | 221 +++++++++++++++++- 3 files changed, 275 insertions(+), 59 deletions(-) diff --git a/python_modules/dagster/dagster/_config/pythonic_config/__init__.py b/python_modules/dagster/dagster/_config/pythonic_config/__init__.py index a6a548f72ad8b..c43c840638199 100644 --- a/python_modules/dagster/dagster/_config/pythonic_config/__init__.py +++ b/python_modules/dagster/dagster/_config/pythonic_config/__init__.py @@ -178,16 +178,23 @@ class Config: """ -def _recursively_apply_field_defaults(old_field: Field, additional_default_values: Any) -> Field: +def _apply_defaults_to_schema_field(old_field: Field, additional_default_values: Any) -> Field: + """Given a config Field and a set of default values (usually a dictionary or raw default value), + return a new Field with the default values applied to it (and recursively to any sub-fields). + """ + # If the field has subfields and the default value is a dictionary, iterate + # over the subfields and apply the defaults to them. if isinstance(old_field.config_type, _ConfigHasFields) and isinstance( additional_default_values, dict ): updated_sub_fields = { - k: _recursively_apply_field_defaults( + k: _apply_defaults_to_schema_field( sub_field, additional_default_values.get(k, FIELD_NO_DEFAULT_PROVIDED) ) for k, sub_field in old_field.config_type.fields.items() } + + # We also apply a new default value to the field if all of its subfields have defaults new_default = ( old_field.default_value if old_field.default_provided else FIELD_NO_DEFAULT_PROVIDED ) @@ -210,26 +217,11 @@ def _recursively_apply_field_defaults(old_field: Field, additional_default_value return copy_with_default(old_field, additional_default_values) -# This is from https://github.com/dagster-io/dagster/pull/11470 -def _apply_defaults_to_schema_field(field: Field, additional_default_values: Any) -> Field: - # This work by validating the top-level config and then - # just setting it at that top-level field. Config fields - # can actually take nested values so we only need to set it - # at a single level - - # evr = validate_config(field.config_type, additional_default_values) - - # if not evr.success: - # raise DagsterInvalidConfigError( - # "Incorrect values passed to .configured", - # evr.errors, - # additional_default_values, - # ) - - return _recursively_apply_field_defaults(field, additional_default_values) - - def copy_with_default(old_field: Field, new_config_value: Any) -> Field: + """Copies a Field, but replaces the default value with the provided value. + Also updates the is_required flag depending on whether the new config value is + actually specified. + """ return Field( config=old_field.config_type, default_value=old_field.default_value @@ -281,6 +273,11 @@ def _resolve_required_resource_keys_for_resource( return resource.required_resource_keys +class SeparatedResourceParams(NamedTuple): + resources: Dict[str, Any] + non_resources: Dict[str, Any] + + class AllowDelayedDependencies: _nested_partial_resources: Mapping[str, ResourceDefinition] = {} @@ -309,7 +306,7 @@ def _resolve_required_resource_keys( _resolve_required_resource_keys_for_resource(v, resource_mapping) ) - resources, _ = separate_resource_params(self.__class__, self.__dict__) + resources, _ = self.separate_resource_params(self.__dict__) for v in resources.values(): nested_resource_required_keys.update( _resolve_required_resource_keys_for_resource(v, resource_mapping) @@ -320,6 +317,10 @@ def _resolve_required_resource_keys( ) return out + @abstractmethod + def separate_resource_params(self, data: Dict[str, Any]) -> SeparatedResourceParams: + raise NotImplementedError() + class InitResourceContextWithKeyMapping(InitResourceContext): """Passes along a mapping from ResourceDefinition id to resource key alongside the @@ -484,7 +485,7 @@ def asset_that_uses_database(database: ResourceParam[Database]): """ def __init__(self, **data: Any): - resource_pointers, data_without_resources = separate_resource_params(self.__class__, data) + resource_pointers, data_without_resources = self.separate_resource_params(data) schema = infer_schema_from_config_class( self.__class__, fields_to_omit=set(resource_pointers.keys()) @@ -601,7 +602,7 @@ def _resolve_and_update_nested_resources( } # Also evaluate any resources that are not partial - resources_to_update, _ = separate_resource_params(self.__class__, self.__dict__) + resources_to_update, _ = self.separate_resource_params(self.__dict__) resources_to_update = { attr_name: _call_resource_fn_with_default(resource_def, context) for attr_name, resource_def in resources_to_update.items() @@ -640,6 +641,26 @@ def my_resource(context: InitResourceContext) -> MyResource: """ return cls(**context.resource_config or {}).create_resource(context) + @classmethod + def separate_resource_params_on_cls(cls, data: Dict[str, Any]) -> SeparatedResourceParams: + """Separates out the key/value inputs of fields in a structured config Resource class which + are themselves Resources and those which are not. + """ + resources = {} + non_resources = {} + for k, v in data.items(): + field = getattr(cls, "__fields__", {}).get(k) + if field and _is_resource_dependency(field.annotation): + resources[k] = v + elif isinstance(v, ResourceDefinition): + resources[k] = v + else: + non_resources[k] = v + return SeparatedResourceParams(resources=resources, non_resources=non_resources) + + def separate_resource_params(self, data: Dict[str, Any]) -> SeparatedResourceParams: + return self.__class__.separate_resource_params_on_cls(data) + class ConfigurableResource(ConfigurableResourceFactory[TResValue]): """Base class for Dagster resources that utilize structured config. @@ -710,12 +731,16 @@ def __init__( data: Dict[str, Any], is_partial: bool = False, ): - resource_pointers, data_without_resources = separate_resource_params(self.__class__, data) + self._resource_cls = resource_cls + resource_pointers, data_without_resources = self.separate_resource_params(data) if not is_partial and data_without_resources: + resource_name = resource_cls.__name__ + parameter_names = list(data_without_resources.keys()) raise DagsterInvalidDefinitionError( - f"Resource {resource_cls.__name__} is not marked as partial, but was passed " - f"non-resource parameters: {list(data_without_resources.keys())}." + f"'{resource_name}.configure_at_launch' was called but non-resource parameters" + f" were passed: {parameter_names}. Did you mean to call '{resource_name}.partial'" + " instead?" ) MakeConfigCacheable.__init__(self, data=data, resource_cls=resource_cls) # type: ignore # extends BaseModel, takes kwargs @@ -743,9 +768,12 @@ def resource_fn(context: InitResourceContext): config_schema=curried_schema, description=resource_cls.__doc__, ) - + self._resource_cls = resource_cls self._nested_resources = {k: v for k, v in resource_pointers.items()} + def separate_resource_params(self, data: Dict[str, Any]) -> SeparatedResourceParams: + return self._resource_cls.separate_resource_params_on_cls(data) + @property def nested_resources(self) -> Mapping[str, ResourceDefinition]: return self._nested_resources @@ -1216,11 +1244,6 @@ def infer_schema_from_config_class( return Field(config=shape_cls(fields), description=description or docstring) -class SeparatedResourceParams(NamedTuple): - resources: Dict[str, Any] - non_resources: Dict[str, Any] - - def _is_resource_dependency(typ: Type) -> bool: return ( safe_is_subclass(typ, ResourceDependency) @@ -1230,25 +1253,6 @@ def _is_resource_dependency(typ: Type) -> bool: ) -def separate_resource_params( - cls: Type[AllowDelayedDependencies], data: Dict[str, Any] -) -> SeparatedResourceParams: - """Separates out the key/value inputs of fields in a structured config Resource class which - are themselves Resources and those which are not. - """ - resources = {} - non_resources = {} - for k, v in data.items(): - field = getattr(cls, "__fields__", {}).get(k) - if field and _is_resource_dependency(field.annotation): - resources[k] = v - elif isinstance(v, ResourceDefinition): - resources[k] = v - else: - non_resources[k] = v - return SeparatedResourceParams(resources=resources, non_resources=non_resources) - - def _call_resource_fn_with_default(obj: ResourceDefinition, context: InitResourceContext) -> Any: if isinstance(obj.config_schema, ConfiguredDefinitionConfigSchema): value = cast(Dict[str, Any], obj.config_schema.resolve_config({}).value) diff --git a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py index bfab89fbe874b..79b09eb41b8ab 100644 --- a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py +++ b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py @@ -18,7 +18,6 @@ ConfigurableResource, PartialResource, ResourceWithKeyMapping, - separate_resource_params, ) from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.assets_job import ( @@ -80,8 +79,8 @@ def _env_vars_from_resource_defaults(resource_def: ResourceDefinition) -> Set[st if isinstance(resource_def, ResourceWithKeyMapping) and isinstance( resource_def.inner_resource, (ConfigurableResource, PartialResource) ): - nested_resources = separate_resource_params( - resource_def.inner_resource.__class__, resource_def.inner_resource.__dict__ + nested_resources = resource_def.inner_resource.separate_resource_params( + resource_def.inner_resource.__dict__ ).resources for nested_resource in nested_resources.values(): env_vars = env_vars.union(_env_vars_from_resource_defaults(nested_resource)) diff --git a/python_modules/dagster/dagster_tests/core_tests/resource_tests/test_pythonic_config_resources.py b/python_modules/dagster/dagster_tests/core_tests/resource_tests/test_pythonic_config_resources.py index ed079dd9151c1..8a41dd0f2ff00 100644 --- a/python_modules/dagster/dagster_tests/core_tests/resource_tests/test_pythonic_config_resources.py +++ b/python_modules/dagster/dagster_tests/core_tests/resource_tests/test_pythonic_config_resources.py @@ -731,7 +731,7 @@ def my_asset(my_resource: MyResource): assert completed["yes"] -def test_nested_function_resource22(): +def test_nested_function_resource(): out_txt = [] @resource @@ -2448,7 +2448,7 @@ def string_resource_function_style(context: InitResourceContext) -> str: assert string_resource_function_style(build_init_resource_context()) == "foo" -def test_structured_resource_partial_config() -> None: +def test_structured_resource_partial_config_empty() -> None: out_txt = [] class WriterResource(ConfigurableResource): @@ -2475,7 +2475,20 @@ def hello_world_asset(writer: WriterResource): ) assert out_txt == [">hello, world!<"] - out_txt.clear() + +def test_structured_resource_partial_config_basic() -> None: + out_txt = [] + + class WriterResource(ConfigurableResource): + prefix: str + postfix: str + + def output(self, text: str) -> None: + out_txt.append(f"{self.prefix}{text}{self.postfix}") + + @asset + def hello_world_asset(writer: WriterResource): + writer.output("hello, world!") # One param set as partial defs = Definitions( @@ -2499,7 +2512,21 @@ def hello_world_asset(writer: WriterResource): assert defs.get_implicit_global_asset_job_def().execute_in_process().success assert out_txt == ["{hello, world!}"] - out_txt.clear() + + +def test_structured_resource_partial_config_overriding() -> None: + out_txt = [] + + class WriterResource(ConfigurableResource): + prefix: str + postfix: str + + def output(self, text: str) -> None: + out_txt.append(f"{self.prefix}{text}{self.postfix}") + + @asset + def hello_world_asset(writer: WriterResource): + writer.output("hello, world!") # Overriding partial param defs = Definitions( @@ -2528,3 +2555,189 @@ def hello_world_asset(writer: WriterResource): ) assert out_txt == ["*hello, world!*"] out_txt.clear() + + +def test_nested_resources_partial_config() -> None: + class AWSCredentialsResource(ConfigurableResource): + username: str + password: str + + class S3Resource(ConfigurableResource): + aws_credentials: AWSCredentialsResource + bucket_name: str + + class EC2Resource(ConfigurableResource): + aws_credentials: AWSCredentialsResource + + completed = {} + + @asset + def my_asset(s3: S3Resource, ec2: EC2Resource): + assert s3.aws_credentials.username == "foo" + assert s3.aws_credentials.password == "bar" + assert s3.bucket_name == "my_bucket" + + assert ec2.aws_credentials.username == "foo" + assert ec2.aws_credentials.password == "bar" + + completed["yes"] = True + + aws_credentials = AWSCredentialsResource.partial(username="foo") + defs = Definitions( + assets=[my_asset], + resources={ + "aws_credentials": aws_credentials, + "s3": S3Resource(bucket_name="my_bucket", aws_credentials=aws_credentials), + "ec2": EC2Resource(aws_credentials=aws_credentials), + }, + ) + + assert ( + defs.get_implicit_global_asset_job_def() + .execute_in_process( + { + "resources": { + "aws_credentials": { + "config": { + "password": "bar", + } + } + } + } + ) + .success + ) + assert completed["yes"] + + +def test_nested_resources_partial_config_complex() -> None: + class CredentialsResource(ConfigurableResource): + username: str + password: str + + class DBConfigResource(ConfigurableResource): + creds: CredentialsResource + host: str + database: str + + class DBResource(ConfigurableResource): + config: DBConfigResource + + completed = {} + + @asset + def my_asset(db: DBResource): + assert db.config.creds.username == "foo" + assert db.config.creds.password == "bar" + assert db.config.host == "localhost" + assert db.config.database == "my_db" + completed["yes"] = True + + credentials = CredentialsResource.partial(username="foo") + db_config = DBConfigResource.partial(creds=credentials, host="localhost") + db = DBResource(config=db_config) + + defs = Definitions( + assets=[my_asset], + resources={ + "credentials": credentials, + "db_config": db_config, + "db": db, + }, + ) + + assert ( + defs.get_implicit_global_asset_job_def() + .execute_in_process( + { + "resources": { + "credentials": { + "config": { + "password": "bar", + } + }, + "db_config": { + "config": { + "host": "localhost", + "database": "my_db", + } + }, + } + } + ) + .success + ) + assert completed["yes"] + + +def test_structured_configure_at_launch_error() -> None: + # Cannot pass any configuration information into a resource when calling + # configure_at_launch, you should instead call partial + + out_txt = [] + + class WriterResource(ConfigurableResource): + prefix: str + postfix: str + + def output(self, text: str) -> None: + out_txt.append(f"{self.prefix}{text}{self.postfix}") + + @asset + def hello_world_asset(writer: WriterResource): + writer.output("hello, world!") + + with pytest.raises( + DagsterInvalidDefinitionError, + match=( + "'WriterResource.configure_at_launch' was called but non-resource parameters were" + " passed: \\['prefix'\\]. Did you mean to call 'WriterResource.partial' instead?" + ), + ): + Definitions( + assets=[hello_world_asset], + resources={"writer": WriterResource.configure_at_launch(prefix="(")}, + ) + + with pytest.raises( + DagsterInvalidDefinitionError, + match=( + "'WriterResource.configure_at_launch' was called but non-resource parameters were" + " passed: \\['prefix', 'postfix'\\]. Did you mean to call 'WriterResource.partial'" + " instead?" + ), + ): + Definitions( + assets=[hello_world_asset], + resources={"writer": WriterResource.configure_at_launch(prefix="(", postfix=")")}, + ) + + +def test_structured_resource_partial_config_missing() -> None: + out_txt = [] + + class WriterResource(ConfigurableResource): + prefix: str + postfix: str + + def output(self, text: str) -> None: + out_txt.append(f"{self.prefix}{text}{self.postfix}") + + @asset + def hello_world_asset(writer: WriterResource): + writer.output("hello, world!") + + # One param set as partial + defs = Definitions( + assets=[hello_world_asset], + resources={"writer": WriterResource.partial(prefix="(")}, + ) + + with pytest.raises( + DagsterInvalidConfigError, + match=( + 'Missing required config entry "resources" at the root. Sample config for missing' + " entry: {'resources': {'writer': {'config': {'postfix': '...'}}}}" + ), + ): + assert defs.get_implicit_global_asset_job_def().execute_in_process().success