From 886964aa38b58a993fc860cbc7304d5b2eb5d014 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Mon, 28 Aug 2023 12:15:25 -0600 Subject: [PATCH 1/3] Add incremental test --- tests/test_core.py | 86 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/tests/test_core.py b/tests/test_core.py index c7949639..1ae44381 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -411,6 +411,92 @@ def test_filter_schemas(): assert tap_catalog["streams"][0]["stream"] == altered_table_name +def test_incremental(): + """Test incremental replication state.""" + table_name = "test_incremental" + engine = sqlalchemy.create_engine(SAMPLE_CONFIG["sqlalchemy_url"]) + + metadata_obj = MetaData() + table = Table( + table_name, + metadata_obj, + Column("id", Integer), + Column("name", String), + Column("updated_at", DateTime), + ) + with engine.connect() as conn: + if table.exists(conn): + table.drop(conn) + metadata_obj.create_all(conn) + insert = table.insert().values( + [ + { + "id": 1, + "name": "foo", + "updated_at": datetime.datetime( + 2022, + 10, + 1, + tzinfo=datetime.timezone.utc, + ), + }, + { + "id": 2, + "name": "bar", + "updated_at": datetime.datetime( + 2022, + 1, + 1, + tzinfo=datetime.timezone.utc, + ), + }, + { + "id": 1, + "name": "baz", + "updated_at": datetime.datetime( + 2022, + 4, + 1, + tzinfo=datetime.timezone.utc, + ), + }, + ], + ) + conn.execute(insert) + + tap = TapPostgres(config=SAMPLE_CONFIG) + tap_catalog = json.loads(tap.catalog_json_text) + altered_table_name = f"public-{table_name}" + + for stream in tap_catalog["streams"]: + if stream.get("stream") and altered_table_name not in stream["stream"]: + for metadata in stream["metadata"]: + metadata["metadata"]["selected"] = False + else: + for metadata in stream["metadata"]: + metadata["metadata"]["selected"] = True + if metadata["breadcrumb"] == []: + metadata["metadata"]["replication-method"] = "INCREMENTAL" + + test_runner = PostgresTestRunner( + tap_class=TapPostgres, + config=SAMPLE_CONFIG, + catalog=tap_catalog, + ) + test_runner.sync_all() + assert test_runner.state_messages[-1] == { + "type": "STATE", + "value": { + "bookmarks": { + altered_table_name: { + "replication_key": "updated_at", + "replication_key_value": "2022-10-01T00:00:00+00:00", + }, + }, + }, + } + + class PostgresTestRunner(TapTestRunner): def run_sync_dry_run(self) -> bool: """Dislike this function and how TestRunner does this so just hacking it here. From cef7cee72766c812bf83e8c659243fb23d4a0aad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Mon, 28 Aug 2023 12:20:30 -0600 Subject: [PATCH 2/3] Add replication key to metadata --- tests/test_core.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/test_core.py b/tests/test_core.py index 1ae44381..f88244f0 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -424,6 +424,8 @@ def test_incremental(): Column("name", String), Column("updated_at", DateTime), ) + replication_key = "updated_at" + with engine.connect() as conn: if table.exists(conn): table.drop(conn) @@ -477,6 +479,7 @@ def test_incremental(): metadata["metadata"]["selected"] = True if metadata["breadcrumb"] == []: metadata["metadata"]["replication-method"] = "INCREMENTAL" + metadata["metadata"]["replication-key"] = replication_key test_runner = PostgresTestRunner( tap_class=TapPostgres, @@ -489,7 +492,7 @@ def test_incremental(): "value": { "bookmarks": { altered_table_name: { - "replication_key": "updated_at", + "replication_key": replication_key, "replication_key_value": "2022-10-01T00:00:00+00:00", }, }, From 7d107035deffa6c471e20c7ab06857280cf36a4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Tue, 20 Feb 2024 15:32:56 -0600 Subject: [PATCH 3/3] Merge --- tests/test_core.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/tests/test_core.py b/tests/test_core.py index f88244f0..c547abd7 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -414,21 +414,20 @@ def test_filter_schemas(): def test_incremental(): """Test incremental replication state.""" table_name = "test_incremental" - engine = sqlalchemy.create_engine(SAMPLE_CONFIG["sqlalchemy_url"]) + engine = sa.create_engine(SAMPLE_CONFIG["sqlalchemy_url"]) - metadata_obj = MetaData() - table = Table( + metadata_obj = sa.MetaData() + table = sa.Table( table_name, metadata_obj, - Column("id", Integer), - Column("name", String), - Column("updated_at", DateTime), + sa.Column("id", sa.Integer), + sa.Column("name", sa.String), + sa.Column("updated_at", sa.DateTime), ) replication_key = "updated_at" with engine.connect() as conn: - if table.exists(conn): - table.drop(conn) + table.drop(conn, checkfirst=True) metadata_obj.create_all(conn) insert = table.insert().values( [