Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pythonic resources] Partial config support for resources #12870

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,65 +12,98 @@
Field,
Selector,
)
from dagster._config.config_type import Array, ConfigType, Noneable
from dagster._config.field_utils import FIELD_NO_DEFAULT_PROVIDED, Map, convert_potential_field
from dagster._config.post_process import resolve_defaults
from dagster._config.config_type import Array, ConfigType, ConfigTypeKind, Noneable
from dagster._config.field_utils import (
FIELD_NO_DEFAULT_PROVIDED,
Map,
_ConfigHasFields,
convert_potential_field,
)
from dagster._config.pythonic_config.attach_other_object_to_context import (
IAttachDifferentObjectToOpContext as IAttachDifferentObjectToOpContext,
)
from dagster._config.pythonic_config.type_check_utils import safe_is_subclass
from dagster._config.source import BoolSource, IntSource, StringSource
from dagster._config.validate import validate_config
from dagster._core.definitions.definition_config_schema import DefinitionConfigSchema
from dagster._core.errors import (
DagsterInvalidConfigDefinitionError,
DagsterInvalidConfigError,
DagsterInvalidDefinitionError,
DagsterInvalidPythonicConfigDefinitionError,
)
from dagster._model.pydantic_compat_layer import ModelFieldCompat, PydanticUndefined, model_fields
from dagster._utils.typing_api import is_closed_python_optional_type


def _create_new_default_from_subfields(
    old_field: Field, updated_sub_fields: dict[str, Field], additional_default_values: dict
) -> Any:
    """Derive a replacement default for ``old_field`` from its updated subfields.

    A composite default can be assembled in two situations:

    * every subfield either carries a default or is not required, or
    * ``old_field`` is a Selector and at least one subfield carries a default.

    When neither holds, the field keeps its existing default if it has one, and
    otherwise reports ``FIELD_NO_DEFAULT_PROVIDED``.

    Args:
        old_field: The field whose default is being recomputed.
        updated_sub_fields: Subfields of ``old_field`` keyed by name, after new
            defaults have been applied to them.
        additional_default_values: Newly supplied default values; entries are
            overridden by the defaults of same-named subfields.

    Returns:
        A dict default built from the supplied values and the subfield defaults,
        the pre-existing default of ``old_field``, or ``FIELD_NO_DEFAULT_PROVIDED``.
    """
    sub_fields = updated_sub_fields.values()

    # The Selector check is only evaluated when the first condition fails,
    # mirroring the short-circuit order of a single boolean expression.
    can_compose_default = all(
        candidate.default_provided or not candidate.is_required for candidate in sub_fields
    ) or (
        old_field.config_type.kind == ConfigTypeKind.SELECTOR
        and any(candidate.default_provided for candidate in sub_fields)
    )

    if not can_compose_default:
        if old_field.default_provided:
            return old_field.default_value
        return FIELD_NO_DEFAULT_PROVIDED

    # Start from the explicitly supplied values, then layer subfield defaults on top.
    composed_default = {**additional_default_values}
    for name, candidate in updated_sub_fields.items():
        if candidate.default_provided:
            composed_default[name] = candidate.default_value
    return composed_default


# This is from https://github.com/dagster-io/dagster/pull/11470
def _apply_defaults_to_schema_field(old_field: Field, additional_default_values: Any) -> Field:
    """Given a config Field and a set of default values (usually a dictionary or raw default value),
    return a new Field with the default values applied to it (and recursively to any sub-fields).

    Args:
        old_field: The field to derive the new field from.
        additional_default_values: The new default to apply. A dict is distributed
            key-by-key over the subfields of ``old_field`` when its config type has
            fields; any other value is applied to ``old_field`` directly.

    Returns:
        A new ``Field`` carrying the applied defaults. It is only marked required
        when no default could be determined and ``old_field`` was itself required.
    """
    # Any config type which does not have subfields or which doesn't have a new default value
    # to apply can be copied directly.
    if not isinstance(old_field.config_type, _ConfigHasFields) or not isinstance(
        additional_default_values, dict
    ):
        return copy_with_default(old_field, additional_default_values)

    # If the field has subfields and the default value is a dictionary, iterate
    # over the subfields and apply the defaults to them. Subfields without an entry
    # in the dictionary receive the FIELD_NO_DEFAULT_PROVIDED sentinel.
    updated_sub_fields = {
        k: _apply_defaults_to_schema_field(
            sub_field, additional_default_values.get(k, FIELD_NO_DEFAULT_PROVIDED)
        )
        for k, sub_field in old_field.config_type.fields.items()
    }

    # We also compute a replacement default value, if possible.
    new_default_value = _create_new_default_from_subfields(
        old_field, updated_sub_fields, additional_default_values
    )

    # Rebuild the config type from the same class so the new field wraps the
    # updated subfields rather than the originals.
    return Field(
        config=old_field.config_type.__class__(fields=updated_sub_fields),
        default_value=new_default_value,
        is_required=new_default_value == FIELD_NO_DEFAULT_PROVIDED and old_field.is_required,
        description=old_field.description,
    )


def copy_with_default(old_field: Field, new_config_value: Any) -> Field:
    """Copies a Field, but replaces the default value with the provided value.
    Also updates the is_required flag depending on whether the new config value is
    actually specified.

    Args:
        old_field: The field to copy the config type and description from.
        new_config_value: The new default, or ``FIELD_NO_DEFAULT_PROVIDED`` when
            no new value was supplied.

    Returns:
        A new ``Field``. When no new value is supplied, the default of ``old_field``
        is kept if it has one, and the field stays required only if ``old_field``
        was required. A supplied value always makes the field optional.
    """
    no_new_value = new_config_value == FIELD_NO_DEFAULT_PROVIDED
    # Each keyword is passed exactly once; repeating `default_value` or
    # `is_required` in the same call is a SyntaxError.
    return Field(
        config=old_field.config_type,
        default_value=old_field.default_value
        if no_new_value and old_field.default_provided
        else new_config_value,
        is_required=no_new_value and old_field.is_required,
        description=old_field.description,
    )

Expand Down
51 changes: 41 additions & 10 deletions python_modules/dagster/dagster/_config/pythonic_config/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import dagster._check as check
from dagster import Field as DagsterField
from dagster._annotations import deprecated
from dagster._annotations import deprecated, experimental
from dagster._config.field_utils import config_dictionary_from_values
from dagster._config.pythonic_config.attach_other_object_to_context import (
IAttachDifferentObjectToOpContext as IAttachDifferentObjectToOpContext,
Expand All @@ -41,6 +41,20 @@
ConfiguredDefinitionConfigSchema,
DefinitionConfigSchema,
)
from dagster._core.errors import DagsterInvalidConfigError, DagsterInvalidDefinitionError
from dagster._core.execution.context.init import InitResourceContext, build_init_resource_context
from dagster._model.pydantic_compat_layer import model_fields
from dagster._utils.cached_method import cached_method
from dagster._utils.typing_api import is_closed_python_optional_type

try:
from functools import cached_property # type: ignore # (py37 compat)
except ImportError:

class cached_property:
pass


from dagster._core.definitions.resource_definition import (
ResourceDefinition,
ResourceFunction,
Expand All @@ -49,12 +63,7 @@
has_at_least_one_parameter,
)
from dagster._core.definitions.resource_requirement import ResourceRequirement
from dagster._core.errors import DagsterInvalidConfigError, DagsterInvalidDefinitionError
from dagster._core.execution.context.init import InitResourceContext, build_init_resource_context
from dagster._model.pydantic_compat_layer import model_fields
from dagster._record import record
from dagster._utils.cached_method import cached_method
from dagster._utils.typing_api import is_closed_python_optional_type

T_Self = TypeVar("T_Self", bound="ConfigurableResourceFactory")
ResourceId: TypeAlias = int
Expand Down Expand Up @@ -322,6 +331,14 @@ def configure_at_launch(cls: "type[T_Self]", **kwargs) -> "PartialResource[T_Sel
"""
return PartialResource(cls, data=kwargs)

@experimental
@classmethod
def partial(cls: "type[T_Self]", **kwargs) -> "PartialResource[T_Self]":
    """Create a partially configured version of this resource.

    The keyword arguments given here are recorded on the returned
    ``PartialResource``; the remaining config fields are set at runtime.

    Args:
        **kwargs: Values to pre-populate on the resource.

    Returns:
        A ``PartialResource`` wrapping this class, constructed with
        ``is_partial=True``.
    """
    preset_values = kwargs
    return PartialResource(cls, data=preset_values, is_partial=True)

def _with_updated_values(
self, values: Optional[Mapping[str, Any]]
) -> "ConfigurableResourceFactory[TResValue]":
Expand Down Expand Up @@ -654,8 +671,18 @@ def __init__(
self,
resource_cls: type[ConfigurableResourceFactory[TResValue]],
data: dict[str, Any],
is_partial: bool = False,
):
resource_pointers, _data_without_resources = separate_resource_params(resource_cls, data)
resource_pointers, data_without_resources = separate_resource_params(resource_cls, data)

if not is_partial and data_without_resources:
resource_name = resource_cls.__name__
parameter_names = list(data_without_resources.keys())
raise DagsterInvalidDefinitionError(
f"'{resource_name}.configure_at_launch' was called but non-resource parameters"
f" were passed: {parameter_names}. Did you mean to call '{resource_name}.partial'"
" instead?"
)

super().__init__(data=data, resource_cls=resource_cls) # type: ignore # extends BaseModel, takes kwargs

Expand All @@ -668,15 +695,19 @@ def resource_fn(context: InitResourceContext):
) # So that collisions are resolved in favor of the latest provided run config
return instantiated._get_initialize_and_run_fn()(context) # noqa: SLF001

schema = infer_schema_from_config_class(
resource_cls, fields_to_omit=set(resource_pointers.keys())
)
resolved_config_dict = config_dictionary_from_values(data_without_resources, schema)
curried_schema = _curry_config_schema(schema, resolved_config_dict)

self._state__internal__ = PartialResourceState(
# We keep track of any resources we depend on which are not fully configured
# so that we can retrieve them at runtime
nested_partial_resources={
k: v for k, v in resource_pointers.items() if (not _is_fully_configured(v))
},
config_schema=infer_schema_from_config_class(
resource_cls, fields_to_omit=set(resource_pointers.keys())
),
config_schema=curried_schema.as_field(),
resource_fn=resource_fn,
description=resource_cls.__doc__,
nested_resources={k: v for k, v in resource_pointers.items()},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ def asset_with_resource(context, my_resource: MyResource):

result_two = materialize(
[asset_with_resource],
resources={"my_resource": MyResource.configure_at_launch(my_enum=AnotherEnum.A)},
resources={"my_resource": MyResource.partial(my_enum=AnotherEnum.A)},
run_config={"resources": {"my_resource": {"config": {"my_enum": "B"}}}},
)

Expand Down
Loading
Loading