diff --git a/python_modules/libraries/dagster-sling/dagster_sling/resources.py b/python_modules/libraries/dagster-sling/dagster_sling/resources.py index d04cc56b2e57c..026558c79fd2b 100644 --- a/python_modules/libraries/dagster-sling/dagster_sling/resources.py +++ b/python_modules/libraries/dagster-sling/dagster_sling/resources.py @@ -198,7 +198,7 @@ def _get_replication_streams_for_context( streams = streams_with_default_dagster_meta(raw_streams, replication_config) selected_asset_keys = context.selected_asset_keys for stream in streams: - asset_key = dagster_sling_translator.get_asset_key(stream) + asset_key = dagster_sling_translator.get_asset_spec(stream).key if asset_key in selected_asset_keys: context_streams.update({stream["name"]: stream["config"]}) @@ -401,7 +401,7 @@ def _replicate( # TODO: In the future, it'd be nice to yield these materializations as they come in # rather than waiting until the end of the replication for stream in stream_definitions: - asset_key = dagster_sling_translator.get_asset_key(stream) + asset_key = dagster_sling_translator.get_asset_spec(stream).key object_key = (stream.get("config") or {}).get("object") destination_stream_name = object_key or stream["name"] diff --git a/python_modules/libraries/dagster-sling/dagster_sling_tests/test_asset_decorator.py b/python_modules/libraries/dagster-sling/dagster_sling_tests/test_asset_decorator.py index ab124e73cae6c..e1a9abc94108f 100644 --- a/python_modules/libraries/dagster-sling/dagster_sling_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-sling/dagster_sling_tests/test_asset_decorator.py @@ -230,15 +230,9 @@ class CustomSlingTranslator(DagsterSlingTranslator): def get_asset_spec(self, stream_definition: Mapping[str, Any]) -> AssetSpec: default_spec = super().get_asset_spec(stream_definition) return default_spec.replace_attributes( - kinds=["sling", "foo"], tags={"custom_tag": "custom_value"} + kinds={"sling", "foo"}, tags={"custom_tag": "custom_value"} ) - def get_tags(self, stream_definition): - return {"custom_tag": "custom_value"} - - def get_kinds(self, stream_definition): - return ["sling", "foo"] - @sling_assets( replication_config=replication_config_path, dagster_sling_translator=CustomSlingTranslator(), diff --git a/python_modules/libraries/dagster-sling/dagster_sling_tests/test_dagster_sling_translator.py b/python_modules/libraries/dagster-sling/dagster_sling_tests/test_dagster_sling_translator.py index 14e084b9d05e7..3cec6130515a4 100644 --- a/python_modules/libraries/dagster-sling/dagster_sling_tests/test_dagster_sling_translator.py +++ b/python_modules/libraries/dagster-sling/dagster_sling_tests/test_dagster_sling_translator.py @@ -28,16 +28,16 @@ def test_sling_translator_sanitize(test, expected): ), ], ) -def test_get_asset_key(stream, expected): +def test_asset_key_from_get_asset_spec(stream, expected): translator = DagsterSlingTranslator() - assert translator.get_asset_key(stream) == AssetKey.from_user_string(expected) + assert translator.get_asset_spec(stream).key == AssetKey.from_user_string(expected) -def test_get_asset_key_error(): +def test_asset_key_from_get_asset_spec_error(): stream_definition = {"name": "foo", "config": {"meta": {"dagster": {"asset_key": "foo$bar"}}}} translator = DagsterSlingTranslator() with pytest.raises(ValueError): - translator.get_asset_key(stream_definition) + translator.get_asset_spec(stream_definition).key # noqa @pytest.mark.parametrize( @@ -55,16 +55,18 @@ def test_get_asset_key_error(): ), ], ) -def test_get_deps_asset_key(stream, expected): +def test_deps_asset_key_from_get_asset_spec(stream, expected): translator = DagsterSlingTranslator() - assert translator.get_deps_asset_key(stream) == [AssetKey.from_user_string(e) for e in expected] + assert [dep.asset_key for dep in translator.get_asset_spec(stream).deps] == [ + AssetKey.from_user_string(e) for e in expected + ] -def test_get_deps_asset_key_error(): +def test_deps_asset_key_from_get_asset_spec_error(): stream_definition = {"name": "foo", "config": {"meta": {"dagster": {"deps": "foo$bar"}}}} translator = DagsterSlingTranslator() with pytest.raises(ValueError): - translator.get_deps_asset_key(stream_definition) + translator.get_asset_spec(stream_definition).deps # noqa @pytest.mark.parametrize( @@ -78,9 +80,9 @@ def test_get_deps_asset_key_error(): ), ], ) -def test_get_description(stream, expected): +def test_description_from_get_asset_spec(stream, expected): translator = DagsterSlingTranslator() - assert translator.get_description(stream) == expected + assert translator.get_asset_spec(stream).description == expected @pytest.mark.parametrize( @@ -93,10 +95,12 @@ def test_get_description(stream, expected): ), ], ) -def test_get_metadata(stream, expected): +def test_metadata_from_get_asset_spec(stream, expected): translator = DagsterSlingTranslator() stream = {"name": "foo", "config": {"foo": "bar"}} - assert translator.get_metadata(stream) == {"stream_config": JsonMetadataValue(stream["config"])} + assert translator.get_asset_spec(stream).metadata == { + "stream_config": JsonMetadataValue(stream["config"]) + } @pytest.mark.parametrize( @@ -109,9 +113,9 @@ def test_get_metadata(stream, expected): ), ], ) -def test_get_group_name(stream, expected): +def test_group_name_from_get_asset_spec(stream, expected): translator = DagsterSlingTranslator() - assert translator.get_group_name(stream) == expected + assert translator.get_asset_spec(stream).group_name == expected @pytest.mark.parametrize( @@ -127,9 +131,9 @@ def test_get_group_name(stream, expected): ), ], ) -def test_get_freshness_policy(stream, expected): +def test_freshness_policy_from_get_asset_spec(stream, expected): translator = DagsterSlingTranslator() - assert translator.get_freshness_policy(stream) == expected + assert translator.get_asset_spec(stream).freshness_policy == expected @pytest.mark.parametrize( @@ -145,6 +149,6 @@ def test_get_freshness_policy(stream, expected): ), ], ) -def test_get_auto_materialize_policy(stream, expected): +def test_auto_materialize_policy_from_get_asset_spec(stream, expected): translator = DagsterSlingTranslator() - assert translator.get_auto_materialize_policy(stream) == expected + assert translator.get_asset_spec(stream).auto_materialize_policy == expected