From b4209ba7476d53689d6f5c02a3a0e76bc8d185eb Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Wed, 15 Jan 2025 15:45:31 -0500 Subject: [PATCH] [1/n][dagster-sling] Move asset spec creation to DagsterSlingTranslator.get_asset_spec --- .../dagster_sling/asset_decorator.py | 19 ++++------------ .../dagster_sling/dagster_sling_translator.py | 22 ++++++++++++++++++- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/python_modules/libraries/dagster-sling/dagster_sling/asset_decorator.py b/python_modules/libraries/dagster-sling/dagster_sling/asset_decorator.py index d04458043194c..cd1a294ee5d53 100644 --- a/python_modules/libraries/dagster-sling/dagster_sling/asset_decorator.py +++ b/python_modules/libraries/dagster-sling/dagster_sling/asset_decorator.py @@ -4,7 +4,6 @@ from dagster import ( AssetsDefinition, - AssetSpec, BackfillPolicy, PartitionsDefinition, _check as check, @@ -124,23 +123,13 @@ def my_assets(context, sling: SlingResource): op_tags=op_tags, backfill_policy=backfill_policy, specs=[ - AssetSpec( - key=dagster_sling_translator.get_asset_key(stream), - deps=dagster_sling_translator.get_deps_asset_key(stream), - description=dagster_sling_translator.get_description(stream), + dagster_sling_translator.get_asset_spec(stream) + .replace_attributes(code_version=code_version) + .merge_attributes( metadata={ - **dagster_sling_translator.get_metadata(stream), METADATA_KEY_TRANSLATOR: dagster_sling_translator, METADATA_KEY_REPLICATION_CONFIG: replication_config, - }, - tags=dagster_sling_translator.get_tags(stream), - kinds=dagster_sling_translator.get_kinds(stream), - group_name=dagster_sling_translator.get_group_name(stream), - freshness_policy=dagster_sling_translator.get_freshness_policy(stream), - auto_materialize_policy=dagster_sling_translator.get_auto_materialize_policy( - stream - ), - code_version=code_version, + } ) for stream in streams ], diff --git a/python_modules/libraries/dagster-sling/dagster_sling/dagster_sling_translator.py b/python_modules/libraries/dagster-sling/dagster_sling/dagster_sling_translator.py index 4fca62104bb13..685c7377690e0 100644 --- a/python_modules/libraries/dagster-sling/dagster_sling/dagster_sling_translator.py +++ b/python_modules/libraries/dagster-sling/dagster_sling/dagster_sling_translator.py @@ -3,7 +3,7 @@ from dataclasses import dataclass from typing import Any, Optional -from dagster import AssetKey, AutoMaterializePolicy, FreshnessPolicy, MetadataValue +from dagster import AssetKey, AssetSpec, AutoMaterializePolicy, FreshnessPolicy, MetadataValue from dagster._annotations import public @@ -11,6 +11,26 @@ class DagsterSlingTranslator: target_prefix: str = "target" + @public + def get_asset_spec(self, stream_definition: Mapping[str, Any]) -> AssetSpec: + """A function that takes a stream definition from a Sling replication config and returns a + Dagster AssetSpec. + + The stream definition is a dictionary key/value pair where the key is the stream name and + the value is a dictionary representing the Sling Replication Stream Config. + """ + return AssetSpec( + key=self.get_asset_key(stream_definition), + deps=self.get_deps_asset_key(stream_definition), + description=self.get_description(stream_definition), + metadata=self.get_metadata(stream_definition), + tags=self.get_tags(stream_definition), + kinds=self.get_kinds(stream_definition), + group_name=self.get_group_name(stream_definition), + freshness_policy=self.get_freshness_policy(stream_definition), + auto_materialize_policy=self.get_auto_materialize_policy(stream_definition), + ) + @public def sanitize_stream_name(self, stream_name: str) -> str: """A function that takes a stream name from a Sling replication config and returns a