Skip to content

Commit

Permalink
[6/n][dagster-sling] Update docs with DagsterSlingTranslator.get_asse…
Browse files Browse the repository at this point in the history
…t_spec
  • Loading branch information
maximearmstrong committed Jan 17, 2025
1 parent 1ba176a commit dc63fa0
Showing 1 changed file with 24 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,10 @@ def get_asset_key(self, stream_definition: Mapping[str, Any]) -> AssetKey:
.. code-block:: python
class CustomSlingTranslator(DagsterSlingTranslator):
def get_asset_key_for_target(self, stream_definition) -> AssetKey:
def get_asset_spec(self, stream_definition) -> AssetKey:
default_spec = super().get_asset_spec(stream_definition)
map = {"stream1": "asset1", "stream2": "asset2"}
return AssetKey(map[stream_name])
return default_spec.replace_attributes(key=AssetKey(map[stream_name]))
"""
return self.get_asset_spec(stream_definition).key

Expand Down Expand Up @@ -182,6 +183,17 @@ def _default_asset_key_fn(self, stream_definition: Mapping[str, Any]) -> AssetKe
Returns:
AssetKey: The Dagster AssetKey for the replication stream.
Examples:
Using a custom mapping for streams:
.. code-block:: python
class CustomSlingTranslator(DagsterSlingTranslator):
def get_asset_spec(self, stream_definition) -> AssetKey:
default_spec = super().get_asset_spec(stream_definition)
map = {"stream1": "asset1", "stream2": "asset2"}
return default_spec.replace_attributes(key=AssetKey(map[stream_name]))
"""
config = stream_definition.get("config", {}) or {}
object_key = config.get("object")
Expand All @@ -208,13 +220,13 @@ def _default_asset_key_fn(self, stream_definition: Mapping[str, Any]) -> AssetKe
)
@public
def get_deps_asset_key(self, stream_definition: Mapping[str, Any]) -> Iterable[AssetKey]:
"""A function that takes a stream name from a Sling replication config and returns a
Dagster AssetKey for the dependencies of the replication stream.
"""A function that takes a stream definition from a Sling replication config and returns a
Dagster AssetKey for each dependency of the replication stream.
By default, this returns the stream name. For example, a stream named "public.accounts"
will create an AssetKey named "target_public_accounts" and a dependency named "public_accounts".
Override this function to customize how to map a Sling stream to a Dagster depenency.
Override this function to customize how to map a Sling stream to a Dagster dependency.
Alternatively, you can provide metadata in your Sling replication config to specify the
Dagster AssetKey for a stream as follows:
Expand All @@ -226,32 +238,21 @@ def get_deps_asset_key(self, stream_definition: Mapping[str, Any]) -> Iterable[A
deps: "sourcedb_users"
Args:
stream_name (str): The name of the stream.
stream_definition (str): The name of the stream.
Returns:
AssetKey: The Dagster AssetKey dependency for the replication stream.
Examples:
Using a custom mapping for streams:
.. code-block:: python
class CustomSlingTranslator(DagsterSlingTranslator):
def get_deps_asset_key(self, stream_name: str) -> AssetKey:
map = {"stream1": "asset1", "stream2": "asset2"}
return AssetKey(map[stream_name])
Iterable[AssetKey]: A list of Dagster AssetKey for each dependency of the replication stream.
"""
return [dep.asset_key for dep in self.get_asset_spec(stream_definition).deps]

def _default_deps_fn(self, stream_definition: Mapping[str, Any]) -> Iterable[AssetKey]:
"""A function that takes a stream name from a Sling replication config and returns a
Dagster AssetKey for the dependencies of the replication stream.
"""A function that takes a stream definition from a Sling replication config and returns a
Dagster AssetKey for each dependency of the replication stream.
By default, this returns the stream name. For example, a stream named "public.accounts"
will create an AssetKey named "target_public_accounts" and a dependency named "public_accounts".
Override this function to customize how to map a Sling stream to a Dagster depenency.
Override this function to customize how to map a Sling stream to a Dagster dependency.
Alternatively, you can provide metadata in your Sling replication config to specify the
Dagster AssetKey for a stream as follows:
Expand All @@ -263,10 +264,10 @@ def _default_deps_fn(self, stream_definition: Mapping[str, Any]) -> Iterable[Ass
deps: "sourcedb_users"
Args:
stream_name (str): The name of the stream.
stream_definition (str): The name of the stream.
Returns:
AssetKey: The Dagster AssetKey dependency for the replication stream.
Iterable[AssetKey]: A list of Dagster AssetKey for each dependency of the replication stream.
"""
config = stream_definition.get("config", {}) or {}
meta = config.get("meta", {})
Expand Down

0 comments on commit dc63fa0

Please sign in to comment.