From 1696cf7d96338678ff1def781a3f367c0911c154 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Tue, 19 Nov 2024 19:21:16 -0600 Subject: [PATCH 1/4] fix: Respect standard Singer stream metadata for key properties, replication key and replication method - https://hub.meltano.com/singer/spec/#metadata - https://github.com/singer-io/getting-started/blob/master/docs/DISCOVERY_MODE.md#metadata --- singer_sdk/_singerlib/catalog.py | 1 + singer_sdk/streams/core.py | 25 +++++++++++++++++++++---- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/singer_sdk/_singerlib/catalog.py b/singer_sdk/_singerlib/catalog.py index 986272e3c..c1281e091 100644 --- a/singer_sdk/_singerlib/catalog.py +++ b/singer_sdk/_singerlib/catalog.py @@ -87,6 +87,7 @@ class StreamMetadata(Metadata): """Stream metadata.""" table_key_properties: t.Sequence[str] | None = None + replication_key: str | None = None forced_replication_method: str | None = None valid_replication_keys: list[str] | None = None schema_name: str | None = None diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index 009726b7f..fc99b82b0 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -56,6 +56,7 @@ if t.TYPE_CHECKING: import logging + from singer_sdk._singerlib.catalog import StreamMetadata from singer_sdk.helpers import types from singer_sdk.helpers._compat import Traversable from singer_sdk.tap_base import Tap @@ -1277,10 +1278,26 @@ def apply_catalog(self, catalog: singer.Catalog) -> None: catalog_entry = catalog.get_stream(self.name) if catalog_entry: - self.primary_keys = catalog_entry.key_properties - self.replication_key = catalog_entry.replication_key - if catalog_entry.replication_method: - self.forced_replication_method = catalog_entry.replication_method + stream_metadata: StreamMetadata | None + if stream_metadata := catalog_entry.metadata.get(()): # type: ignore[assignment] + table_key_properties = stream_metadata.table_key_properties + table_replication_key = stream_metadata.replication_key + table_replication_method = stream_metadata.forced_replication_method + else: + table_key_properties = None + table_replication_key = None + table_replication_method = None + + self.primary_keys = catalog_entry.key_properties or table_key_properties + self.replication_key = ( + catalog_entry.replication_key or table_replication_key + ) + + replication_method = ( + catalog_entry.replication_method or table_replication_method + ) + if replication_method: + self.forced_replication_method = replication_method def _get_state_partition_context( self, From f884924a263fc15ca5bbd29e26fa050299af7d82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Tue, 19 Nov 2024 20:22:57 -0600 Subject: [PATCH 2/4] Add test --- tests/core/test_streams.py | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/tests/core/test_streams.py b/tests/core/test_streams.py index 038f87657..b9a158bb4 100644 --- a/tests/core/test_streams.py +++ b/tests/core/test_streams.py @@ -106,6 +106,43 @@ def test_stream_apply_catalog(stream: Stream): assert stream.forced_replication_method == REPLICATION_FULL_TABLE +def test_stream_apply_catalog__singer_standard(stream: Stream): + """Applying a catalog to a stream should overwrite fields.""" + assert stream.primary_keys == [] + assert stream.replication_key == "updatedAt" + assert stream.replication_method == REPLICATION_INCREMENTAL + assert stream.forced_replication_method is None + + stream.apply_catalog( + catalog=Catalog.from_dict( + { + "streams": [ + { + "tap_stream_id": stream.name, + "stream": stream.name, + "schema": stream.schema, + "metadata": [ + { + "breadcrumb": [], + "metadata": { + "table-key-properties": ["id"], + "replication-key": None, + "forced-replication-method": REPLICATION_FULL_TABLE, + }, + }, + ], + }, + ], + }, + ), + ) + + assert stream.primary_keys == ["id"] + assert stream.replication_key is None + assert stream.replication_method == REPLICATION_FULL_TABLE + assert stream.forced_replication_method == REPLICATION_FULL_TABLE + + @pytest.mark.parametrize( "stream_name,forced_replication_method,bookmark_value,expected_starting_value", [ From 72478fda99355ee65276cb4fca331ba1b64d561b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Tue, 19 Nov 2024 20:33:53 -0600 Subject: [PATCH 3/4] Fix typo in docs --- docs/implementation/catalog_metadata.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/implementation/catalog_metadata.md b/docs/implementation/catalog_metadata.md index b255a5530..090a38969 100644 --- a/docs/implementation/catalog_metadata.md +++ b/docs/implementation/catalog_metadata.md @@ -1,6 +1,6 @@ # Catalog Metadata -The SDK automatically generates catalog metadata during catalog discovery. Selection rules overrided by a user will be respected. +The SDK automatically generates catalog metadata during catalog discovery. Selection rules overridden by a user will be respected. Primary key properties may not be deselected, as these are required for `key_properties` to be declared in stream messages. From 3f22f38448e26822cb24e52fe22bbbc31ce5d2ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Tue, 19 Nov 2024 20:45:06 -0600 Subject: [PATCH 4/4] Test non-null replication key override --- tests/core/test_streams.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/core/test_streams.py b/tests/core/test_streams.py index b9a158bb4..6dabf1f28 100644 --- a/tests/core/test_streams.py +++ b/tests/core/test_streams.py @@ -126,7 +126,7 @@ def test_stream_apply_catalog__singer_standard(stream: Stream): "breadcrumb": [], "metadata": { "table-key-properties": ["id"], - "replication-key": None, + "replication-key": "newReplicationKey", "forced-replication-method": REPLICATION_FULL_TABLE, }, }, @@ -138,7 +138,7 @@ def test_stream_apply_catalog__singer_standard(stream: Stream): ) assert stream.primary_keys == ["id"] - assert stream.replication_key is None + assert stream.replication_key == "newReplicationKey" assert stream.replication_method == REPLICATION_FULL_TABLE assert stream.forced_replication_method == REPLICATION_FULL_TABLE