From c5ceba9d00b8800ab177d8b2bb5b38578025c9c0 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Wed, 15 Jan 2025 22:09:22 -0500 Subject: [PATCH] [4/n][dagster-sling] Deprecate DagsterSlingTranslator.get_* methods --- .../dagster_sling/dagster_sling_translator.py | 40 ++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/python_modules/libraries/dagster-sling/dagster_sling/dagster_sling_translator.py b/python_modules/libraries/dagster-sling/dagster_sling/dagster_sling_translator.py index 3384b74e2fa6e..6bda1447453bb 100644 --- a/python_modules/libraries/dagster-sling/dagster_sling/dagster_sling_translator.py +++ b/python_modules/libraries/dagster-sling/dagster_sling/dagster_sling_translator.py @@ -4,7 +4,7 @@ from typing import Any, Optional from dagster import AssetKey, AssetSpec, AutoMaterializePolicy, FreshnessPolicy, MetadataValue -from dagster._annotations import public +from dagster._annotations import deprecated, public @dataclass @@ -53,6 +53,10 @@ def sanitize_stream_name(self, stream_name: str) -> str: """ return re.sub(r"[^a-zA-Z0-9_.]", "_", stream_name.replace('"', "").lower()) + @deprecated( + breaking_version="1.11", + additional_warn_text="Use `DagsterSlingTranslator.get_asset_spec(...).key` instead.", + ) @public def get_asset_key(self, stream_definition: Mapping[str, Any]) -> AssetKey: """A function that takes a stream definition from a Sling replication config and returns a @@ -158,6 +162,12 @@ def _default_asset_key_fn(self, stream_definition: Mapping[str, Any]) -> AssetKe sanitized_components = self.sanitize_stream_name(stream_name).split(".") return AssetKey([self.target_prefix] + sanitized_components) + @deprecated( + breaking_version="1.11", + additional_warn_text=( + "Iterate over `DagsterSlingTranslator.get_asset_spec(...).deps` to access `AssetDep.asset_key` instead." + ), + ) @public def get_deps_asset_key(self, stream_definition: Mapping[str, Any]) -> Iterable[AssetKey]: """A function that takes a stream name from a Sling replication config and returns a @@ -241,6 +251,10 @@ def _default_deps_fn(self, stream_definition: Mapping[str, Any]) -> Iterable[Ass components = self.sanitize_stream_name(stream_name).split(".") return [AssetKey(components)] + @deprecated( + breaking_version="1.11", + additional_warn_text="Use `DagsterSlingTranslator.get_asset_spec(...).description` instead.", + ) @public def get_description(self, stream_definition: Mapping[str, Any]) -> Optional[str]: """Retrieves the description for a given stream definition. @@ -279,6 +293,10 @@ def _default_description_fn(self, stream_definition: Mapping[str, Any]) -> Optio description = meta.get("dagster", {}).get("description") return description + @deprecated( + breaking_version="1.11", + additional_warn_text="Use `DagsterSlingTranslator.get_asset_spec(...).metadata` instead.", + ) @public def get_metadata(self, stream_definition: Mapping[str, Any]) -> Mapping[str, Any]: """Retrieves the metadata for a given stream definition. @@ -310,6 +328,10 @@ def _default_metadata_fn(self, stream_definition: Mapping[str, Any]) -> Mapping[ """ return {"stream_config": MetadataValue.json(stream_definition.get("config", {}))} + @deprecated( + breaking_version="1.11", + additional_warn_text="Use `DagsterSlingTranslator.get_asset_spec(...).tags` instead.", + ) @public def get_tags(self, stream_definition: Mapping[str, Any]) -> Mapping[str, Any]: """Retrieves the tags for a given stream definition. @@ -341,6 +363,10 @@ def _default_tags_fn(self, stream_definition: Mapping[str, Any]) -> Mapping[str, """ return {} + @deprecated( + breaking_version="1.11", + additional_warn_text="Use `DagsterSlingTranslator.get_asset_spec(...).kinds` instead.", + ) @public def get_kinds(self, stream_definition: Mapping[str, Any]) -> set[str]: """Retrieves the kinds for a given stream definition. @@ -370,6 +396,10 @@ def _default_kinds_fn(self, stream_definition: Mapping[str, Any]) -> set[str]: """ return {"sling"} + @deprecated( + breaking_version="1.11", + additional_warn_text="Use `DagsterSlingTranslator.get_asset_spec(...).group_name` instead.", + ) @public def get_group_name(self, stream_definition: Mapping[str, Any]) -> Optional[str]: """Retrieves the group name for a given stream definition. @@ -403,6 +433,10 @@ def _default_group_name_fn(self, stream_definition: Mapping[str, Any]) -> Option meta = config.get("meta", {}) return meta.get("dagster", {}).get("group") + @deprecated( + breaking_version="1.11", + additional_warn_text="Use `DagsterSlingTranslator.get_asset_spec(...).freshness_policy` instead.", + ) @public def get_freshness_policy( self, stream_definition: Mapping[str, Any] @@ -452,6 +486,10 @@ def _default_freshness_policy_fn( cron_schedule_timezone=freshness_policy_config.get("cron_schedule_timezone"), ) + @deprecated( + breaking_version="1.11", + additional_warn_text="Use `DagsterSlingTranslator.get_asset_spec(...).auto_materialize_policy` instead.", + ) @public def get_auto_materialize_policy( self, stream_definition: Mapping[str, Any]