diff --git a/python_modules/libraries/dagster-sling/dagster_sling/dagster_sling_translator.py b/python_modules/libraries/dagster-sling/dagster_sling/dagster_sling_translator.py index faf32b939f231..a9c4fb333eaa1 100644 --- a/python_modules/libraries/dagster-sling/dagster_sling/dagster_sling_translator.py +++ b/python_modules/libraries/dagster-sling/dagster_sling/dagster_sling_translator.py @@ -140,9 +140,10 @@ def get_asset_key(self, stream_definition: Mapping[str, Any]) -> AssetKey: .. code-block:: python class CustomSlingTranslator(DagsterSlingTranslator): - def get_asset_key_for_target(self, stream_definition) -> AssetKey: + def get_asset_spec(self, stream_definition) -> AssetKey: + default_spec = super().get_asset_spec(stream_definition) map = {"stream1": "asset1", "stream2": "asset2"} - return AssetKey(map[stream_name]) + return default_spec.replace_attributes(key=AssetKey(map[stream_name])) """ return self.get_asset_spec(stream_definition).key @@ -182,6 +183,17 @@ def _default_asset_key_fn(self, stream_definition: Mapping[str, Any]) -> AssetKe Returns: AssetKey: The Dagster AssetKey for the replication stream. + + Examples: + Using a custom mapping for streams: + + .. code-block:: python + + class CustomSlingTranslator(DagsterSlingTranslator): + def get_asset_spec(self, stream_definition) -> AssetKey: + default_spec = super().get_asset_spec(stream_definition) + map = {"stream1": "asset1", "stream2": "asset2"} + return default_spec.replace_attributes(key=AssetKey(map[stream_name])) """ config = stream_definition.get("config", {}) or {} object_key = config.get("object") @@ -208,13 +220,13 @@ def _default_asset_key_fn(self, stream_definition: Mapping[str, Any]) -> AssetKe ) @public def get_deps_asset_key(self, stream_definition: Mapping[str, Any]) -> Iterable[AssetKey]: - """A function that takes a stream name from a Sling replication config and returns a - Dagster AssetKey for the dependencies of the replication stream. + """A function that takes a stream definition from a Sling replication config and returns a + Dagster AssetKey for each dependency of the replication stream. By default, this returns the stream name. For example, a stream named "public.accounts" will create an AssetKey named "target_public_accounts" and a dependency named "public_accounts". - Override this function to customize how to map a Sling stream to a Dagster depenency. + Override this function to customize how to map a Sling stream to a Dagster dependency. Alternatively, you can provide metadata in your Sling replication config to specify the Dagster AssetKey for a stream as follows: @@ -226,32 +238,21 @@ def get_deps_asset_key(self, stream_definition: Mapping[str, Any]) -> Iterable[A deps: "sourcedb_users" Args: - stream_name (str): The name of the stream. + stream_definition (str): The name of the stream. Returns: - AssetKey: The Dagster AssetKey dependency for the replication stream. - - Examples: - Using a custom mapping for streams: - - .. code-block:: python - - class CustomSlingTranslator(DagsterSlingTranslator): - def get_deps_asset_key(self, stream_name: str) -> AssetKey: - map = {"stream1": "asset1", "stream2": "asset2"} - return AssetKey(map[stream_name]) - + Iterable[AssetKey]: A list of Dagster AssetKey for each dependency of the replication stream. """ return [dep.asset_key for dep in self.get_asset_spec(stream_definition).deps] def _default_deps_fn(self, stream_definition: Mapping[str, Any]) -> Iterable[AssetKey]: - """A function that takes a stream name from a Sling replication config and returns a - Dagster AssetKey for the dependencies of the replication stream. + """A function that takes a stream definition from a Sling replication config and returns a + Dagster AssetKey for each dependency of the replication stream. By default, this returns the stream name. For example, a stream named "public.accounts" will create an AssetKey named "target_public_accounts" and a dependency named "public_accounts". - Override this function to customize how to map a Sling stream to a Dagster depenency. + Override this function to customize how to map a Sling stream to a Dagster dependency. Alternatively, you can provide metadata in your Sling replication config to specify the Dagster AssetKey for a stream as follows: @@ -263,10 +264,10 @@ def _default_deps_fn(self, stream_definition: Mapping[str, Any]) -> Iterable[Ass deps: "sourcedb_users" Args: - stream_name (str): The name of the stream. + stream_definition (str): The name of the stream. Returns: - AssetKey: The Dagster AssetKey dependency for the replication stream. + Iterable[AssetKey]: A list of Dagster AssetKey for each dependency of the replication stream. """ config = stream_definition.get("config", {}) or {} meta = config.get("meta", {})