Skip to content

Commit

Permalink
[4/n][dagster-sling] Deprecate DagsterSlingTranslator.get_* methods
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Jan 16, 2025
1 parent c6cd280 commit c5ceba9
Showing 1 changed file with 39 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Any, Optional

from dagster import AssetKey, AssetSpec, AutoMaterializePolicy, FreshnessPolicy, MetadataValue
from dagster._annotations import public
from dagster._annotations import deprecated, public


@dataclass
Expand Down Expand Up @@ -53,6 +53,10 @@ def sanitize_stream_name(self, stream_name: str) -> str:
"""
return re.sub(r"[^a-zA-Z0-9_.]", "_", stream_name.replace('"', "").lower())

@deprecated(
breaking_version="1.11",
additional_warn_text="Use `DagsterSlingTranslator.get_asset_spec(...).key` instead.",
)
@public
def get_asset_key(self, stream_definition: Mapping[str, Any]) -> AssetKey:
"""A function that takes a stream definition from a Sling replication config and returns a
Expand Down Expand Up @@ -158,6 +162,12 @@ def _default_asset_key_fn(self, stream_definition: Mapping[str, Any]) -> AssetKe
sanitized_components = self.sanitize_stream_name(stream_name).split(".")
return AssetKey([self.target_prefix] + sanitized_components)

@deprecated(
breaking_version="1.11",
additional_warn_text=(
"Iterate over `DagsterSlingTranslator.get_asset_spec(...).deps` to access `AssetDep.asset_key` instead."
),
)
@public
def get_deps_asset_key(self, stream_definition: Mapping[str, Any]) -> Iterable[AssetKey]:
"""A function that takes a stream name from a Sling replication config and returns a
Expand Down Expand Up @@ -241,6 +251,10 @@ def _default_deps_fn(self, stream_definition: Mapping[str, Any]) -> Iterable[Ass
components = self.sanitize_stream_name(stream_name).split(".")
return [AssetKey(components)]

@deprecated(
breaking_version="1.11",
additional_warn_text="Use `DagsterSlingTranslator.get_asset_spec(...).description` instead.",
)
@public
def get_description(self, stream_definition: Mapping[str, Any]) -> Optional[str]:
"""Retrieves the description for a given stream definition.
Expand Down Expand Up @@ -279,6 +293,10 @@ def _default_description_fn(self, stream_definition: Mapping[str, Any]) -> Optio
description = meta.get("dagster", {}).get("description")
return description

@deprecated(
breaking_version="1.11",
additional_warn_text="Use `DagsterSlingTranslator.get_asset_spec(...).metadata` instead.",
)
@public
def get_metadata(self, stream_definition: Mapping[str, Any]) -> Mapping[str, Any]:
"""Retrieves the metadata for a given stream definition.
Expand Down Expand Up @@ -310,6 +328,10 @@ def _default_metadata_fn(self, stream_definition: Mapping[str, Any]) -> Mapping[
"""
return {"stream_config": MetadataValue.json(stream_definition.get("config", {}))}

@deprecated(
breaking_version="1.11",
additional_warn_text="Use `DagsterSlingTranslator.get_asset_spec(...).tags` instead.",
)
@public
def get_tags(self, stream_definition: Mapping[str, Any]) -> Mapping[str, Any]:
"""Retrieves the tags for a given stream definition.
Expand Down Expand Up @@ -341,6 +363,10 @@ def _default_tags_fn(self, stream_definition: Mapping[str, Any]) -> Mapping[str,
"""
return {}

@deprecated(
breaking_version="1.11",
additional_warn_text="Use `DagsterSlingTranslator.get_asset_spec(...).kinds` instead.",
)
@public
def get_kinds(self, stream_definition: Mapping[str, Any]) -> set[str]:
"""Retrieves the kinds for a given stream definition.
Expand Down Expand Up @@ -370,6 +396,10 @@ def _default_kinds_fn(self, stream_definition: Mapping[str, Any]) -> set[str]:
"""
return {"sling"}

@deprecated(
breaking_version="1.11",
additional_warn_text="Use `DagsterSlingTranslator.get_asset_spec(...).group_name` instead.",
)
@public
def get_group_name(self, stream_definition: Mapping[str, Any]) -> Optional[str]:
"""Retrieves the group name for a given stream definition.
Expand Down Expand Up @@ -403,6 +433,10 @@ def _default_group_name_fn(self, stream_definition: Mapping[str, Any]) -> Option
meta = config.get("meta", {})
return meta.get("dagster", {}).get("group")

@deprecated(
breaking_version="1.11",
additional_warn_text="Use `DagsterSlingTranslator.get_asset_spec(...).freshness_policy` instead.",
)
@public
def get_freshness_policy(
self, stream_definition: Mapping[str, Any]
Expand Down Expand Up @@ -452,6 +486,10 @@ def _default_freshness_policy_fn(
cron_schedule_timezone=freshness_policy_config.get("cron_schedule_timezone"),
)

@deprecated(
breaking_version="1.11",
additional_warn_text="Use `DagsterSlingTranslator.get_asset_spec(...).auto_materialize_policy` instead.",
)
@public
def get_auto_materialize_policy(
self, stream_definition: Mapping[str, Any]
Expand Down

0 comments on commit c5ceba9

Please sign in to comment.