Skip to content

Commit

Permalink
[1/n][dagster-sling] Move asset spec creation to DagsterSlingTranslat…
Browse files Browse the repository at this point in the history
…or.get_asset_spec
  • Loading branch information
maximearmstrong committed Jan 15, 2025
1 parent 88d0828 commit b4209ba
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from dagster import (
AssetsDefinition,
AssetSpec,
BackfillPolicy,
PartitionsDefinition,
_check as check,
Expand Down Expand Up @@ -124,23 +123,13 @@ def my_assets(context, sling: SlingResource):
op_tags=op_tags,
backfill_policy=backfill_policy,
specs=[
AssetSpec(
key=dagster_sling_translator.get_asset_key(stream),
deps=dagster_sling_translator.get_deps_asset_key(stream),
description=dagster_sling_translator.get_description(stream),
dagster_sling_translator.get_asset_spec(stream)
.replace_attributes(code_version=code_version)
.merge_attributes(
metadata={
**dagster_sling_translator.get_metadata(stream),
METADATA_KEY_TRANSLATOR: dagster_sling_translator,
METADATA_KEY_REPLICATION_CONFIG: replication_config,
},
tags=dagster_sling_translator.get_tags(stream),
kinds=dagster_sling_translator.get_kinds(stream),
group_name=dagster_sling_translator.get_group_name(stream),
freshness_policy=dagster_sling_translator.get_freshness_policy(stream),
auto_materialize_policy=dagster_sling_translator.get_auto_materialize_policy(
stream
),
code_version=code_version,
}
)
for stream in streams
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,34 @@
from dataclasses import dataclass
from typing import Any, Optional

from dagster import AssetKey, AutoMaterializePolicy, FreshnessPolicy, MetadataValue
from dagster import AssetKey, AssetSpec, AutoMaterializePolicy, FreshnessPolicy, MetadataValue
from dagster._annotations import public


@dataclass
class DagsterSlingTranslator:
target_prefix: str = "target"

@public
def get_asset_spec(self, stream_definition: Mapping[str, Any]) -> AssetSpec:
"""A function that takes a stream definition from a Sling replication config and returns a
Dagster AssetSpec.
The stream definition is a dictionary key/value pair where the key is the stream name and
the value is a dictionary representing the Sling Replication Stream Config.
"""
return AssetSpec(
key=self.get_asset_key(stream_definition),
deps=self.get_deps_asset_key(stream_definition),
description=self.get_description(stream_definition),
metadata=self.get_metadata(stream_definition),
tags=self.get_tags(stream_definition),
kinds=self.get_kinds(stream_definition),
group_name=self.get_group_name(stream_definition),
freshness_policy=self.get_freshness_policy(stream_definition),
auto_materialize_policy=self.get_auto_materialize_policy(stream_definition),
)

@public
def sanitize_stream_name(self, stream_name: str) -> str:
"""A function that takes a stream name from a Sling replication config and returns a
Expand Down

0 comments on commit b4209ba

Please sign in to comment.