Skip to content

Commit

Permalink
[6/n][dagster-sling] Update docs with DagsterSlingTranslator.get_asse…
Browse files Browse the repository at this point in the history
…t_spec
  • Loading branch information
maximearmstrong committed Jan 20, 2025
1 parent b1b03d6 commit 0485097
Showing 1 changed file with 19 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,10 @@ def get_asset_key(self, stream_definition: Mapping[str, Any]) -> AssetKey:
.. code-block:: python
class CustomSlingTranslator(DagsterSlingTranslator):
def get_asset_key_for_target(self, stream_definition: Mapping[str, Any]) -> AssetKey:
def get_asset_spec(self, stream_definition: Mapping[str, Any]) -> AssetKey:
default_spec = super().get_asset_spec(stream_definition)
map = {"stream1": "asset1", "stream2": "asset2"}
return AssetKey(map[stream_definition["name"]])
return default_spec.replace_attributes(key=AssetKey(map[stream_definition["name"]]))
"""
return self.get_asset_spec(stream_definition).key

Expand Down Expand Up @@ -180,6 +181,17 @@ def _default_asset_key_fn(self, stream_definition: Mapping[str, Any]) -> AssetKe
Returns:
AssetKey: The Dagster AssetKey for the replication stream.
Examples:
Using a custom mapping for streams:
.. code-block:: python
class CustomSlingTranslator(DagsterSlingTranslator):
def get_asset_spec(self, stream_definition: Mapping[str, Any]) -> AssetKey:
default_spec = super().get_asset_spec(stream_definition)
map = {"stream1": "asset1", "stream2": "asset2"}
return default_spec.replace_attributes(key=AssetKey(map[stream_definition["name"]]))
"""
config = stream_definition.get("config", {}) or {}
object_key = config.get("object")
Expand All @@ -206,8 +218,8 @@ def _default_asset_key_fn(self, stream_definition: Mapping[str, Any]) -> AssetKe
)
@public
def get_deps_asset_key(self, stream_definition: Mapping[str, Any]) -> Iterable[AssetKey]:
"""A function that takes a stream name from a Sling replication config and returns a
Dagster AssetKey for the dependencies of the replication stream.
"""A function that takes a stream definition from a Sling replication config and returns a
Dagster AssetKey for each dependency of the replication stream.
By default, this returns the stream name. For example, a stream named "public.accounts"
will create an AssetKey named "target_public_accounts" and a dependency named "public_accounts".
Expand All @@ -227,24 +239,13 @@ def get_deps_asset_key(self, stream_definition: Mapping[str, Any]) -> Iterable[A
stream_definition (Mapping[str, Any]): A dictionary representing the stream definition
Returns:
AssetKey: The Dagster AssetKey dependency for the replication stream.
Examples:
Using a custom mapping for streams:
.. code-block:: python
class CustomSlingTranslator(DagsterSlingTranslator):
def get_deps_asset_key(self, stream_definition: Mapping[str, Any]) -> AssetKey:
map = {"stream1": "asset1", "stream2": "asset2"}
return AssetKey(map[stream_definition["name"]])
Iterable[AssetKey]: A list of Dagster AssetKey for each dependency of the replication stream.
"""
return [dep.asset_key for dep in self.get_asset_spec(stream_definition).deps]

def _default_deps_fn(self, stream_definition: Mapping[str, Any]) -> Iterable[AssetKey]:
"""A function that takes a stream definition from a Sling replication config and returns a
Dagster AssetKey for the dependencies of the replication stream.
Dagster AssetKey for each dependency of the replication stream.
This returns the stream name. For example, a stream named "public.accounts"
will create an AssetKey named "target_public_accounts" and a dependency named "public_accounts".
Expand All @@ -263,7 +264,7 @@ def _default_deps_fn(self, stream_definition: Mapping[str, Any]) -> Iterable[Ass
stream_definition (Mapping[str, Any]): A dictionary representing the stream definition
Returns:
AssetKey: The Dagster AssetKey dependency for the replication stream.
Iterable[AssetKey]: A list of Dagster AssetKey for each dependency of the replication stream.
"""
config = stream_definition.get("config", {}) or {}
meta = config.get("meta", {})
Expand Down

0 comments on commit 0485097

Please sign in to comment.