From 0485097b5f915fda1da91d3c9759887e84b72e22 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 16 Jan 2025 13:39:20 -0500 Subject: [PATCH] [6/n][dagster-sling] Update docs with DagsterSlingTranslator.get_asset_spec --- .../dagster_sling/dagster_sling_translator.py | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/python_modules/libraries/dagster-sling/dagster_sling/dagster_sling_translator.py b/python_modules/libraries/dagster-sling/dagster_sling/dagster_sling_translator.py index 3d4f9b8926e7e..2ec936a5ab8d5 100644 --- a/python_modules/libraries/dagster-sling/dagster_sling/dagster_sling_translator.py +++ b/python_modules/libraries/dagster-sling/dagster_sling/dagster_sling_translator.py @@ -140,9 +140,10 @@ def get_asset_key(self, stream_definition: Mapping[str, Any]) -> AssetKey: .. code-block:: python class CustomSlingTranslator(DagsterSlingTranslator): - def get_asset_key_for_target(self, stream_definition: Mapping[str, Any]) -> AssetKey: + def get_asset_spec(self, stream_definition: Mapping[str, Any]) -> AssetKey: + default_spec = super().get_asset_spec(stream_definition) map = {"stream1": "asset1", "stream2": "asset2"} - return AssetKey(map[stream_definition["name"]]) + return default_spec.replace_attributes(key=AssetKey(map[stream_definition["name"]])) """ return self.get_asset_spec(stream_definition).key @@ -180,6 +181,17 @@ def _default_asset_key_fn(self, stream_definition: Mapping[str, Any]) -> AssetKe Returns: AssetKey: The Dagster AssetKey for the replication stream. + + Examples: + Using a custom mapping for streams: + + .. code-block:: python + + class CustomSlingTranslator(DagsterSlingTranslator): + def get_asset_spec(self, stream_definition: Mapping[str, Any]) -> AssetKey: + default_spec = super().get_asset_spec(stream_definition) + map = {"stream1": "asset1", "stream2": "asset2"} + return default_spec.replace_attributes(key=AssetKey(map[stream_definition["name"]])) """ config = stream_definition.get("config", {}) or {} object_key = config.get("object") @@ -206,8 +218,8 @@ def _default_asset_key_fn(self, stream_definition: Mapping[str, Any]) -> AssetKe ) @public def get_deps_asset_key(self, stream_definition: Mapping[str, Any]) -> Iterable[AssetKey]: - """A function that takes a stream name from a Sling replication config and returns a - Dagster AssetKey for the dependencies of the replication stream. + """A function that takes a stream definition from a Sling replication config and returns a + Dagster AssetKey for each dependency of the replication stream. By default, this returns the stream name. For example, a stream named "public.accounts" will create an AssetKey named "target_public_accounts" and a dependency named "public_accounts". @@ -227,24 +239,13 @@ def get_deps_asset_key(self, stream_definition: Mapping[str, Any]) -> Iterable[A stream_definition (Mapping[str, Any]): A dictionary representing the stream definition Returns: - AssetKey: The Dagster AssetKey dependency for the replication stream. - - Examples: - Using a custom mapping for streams: - - .. code-block:: python - - class CustomSlingTranslator(DagsterSlingTranslator): - def get_deps_asset_key(self, stream_definition: Mapping[str, Any]) -> AssetKey: - map = {"stream1": "asset1", "stream2": "asset2"} - return AssetKey(map[stream_definition["name"]]) - + Iterable[AssetKey]: A list of Dagster AssetKey for each dependency of the replication stream. """ return [dep.asset_key for dep in self.get_asset_spec(stream_definition).deps] def _default_deps_fn(self, stream_definition: Mapping[str, Any]) -> Iterable[AssetKey]: """A function that takes a stream definition from a Sling replication config and returns a - Dagster AssetKey for the dependencies of the replication stream. + Dagster AssetKey for each dependency of the replication stream. This returns the stream name. For example, a stream named "public.accounts" will create an AssetKey named "target_public_accounts" and a dependency named "public_accounts". @@ -263,7 +264,7 @@ def _default_deps_fn(self, stream_definition: Mapping[str, Any]) -> Iterable[Ass stream_definition (Mapping[str, Any]): A dictionary representing the stream definition Returns: - AssetKey: The Dagster AssetKey dependency for the replication stream. + Iterable[AssetKey]: A list of Dagster AssetKey for each dependency of the replication stream. """ config = stream_definition.get("config", {}) or {} meta = config.get("meta", {})