def test_incremental():
    """Test incremental replication state.

    Recreates the ``test_incremental`` table with three rows whose
    ``updated_at`` values are deliberately inserted out of chronological
    order, selects only that stream in the tap catalog with INCREMENTAL
    replication keyed on ``updated_at``, runs a full sync, and asserts that
    the last STATE message bookmarks the greatest ``updated_at`` value.
    """
    table_name = "test_incremental"
    engine = sa.create_engine(SAMPLE_CONFIG["sqlalchemy_url"])

    metadata_obj = sa.MetaData()
    table = sa.Table(
        table_name,
        metadata_obj,
        sa.Column("id", sa.Integer),
        sa.Column("name", sa.String),
        sa.Column("updated_at", sa.DateTime),
    )
    replication_key = "updated_at"

    # engine.begin() commits the DDL and the INSERT when the block exits.
    # A bare engine.connect() only persists them under SQLAlchemy 1.x legacy
    # autocommit; with 2.0-style connections the work is rolled back on
    # close, and the tap (which opens its own connection) would see nothing.
    with engine.begin() as conn:
        table.drop(conn, checkfirst=True)
        metadata_obj.create_all(conn)
        insert = table.insert().values(
            [
                {
                    "id": 1,
                    "name": "foo",
                    "updated_at": datetime.datetime(
                        2022,
                        10,
                        1,
                        tzinfo=datetime.timezone.utc,
                    ),
                },
                {
                    "id": 2,
                    "name": "bar",
                    "updated_at": datetime.datetime(
                        2022,
                        1,
                        1,
                        tzinfo=datetime.timezone.utc,
                    ),
                },
                {
                    # NOTE(review): id 1 is reused here; the table declares
                    # no primary key so the insert is accepted. Confirm
                    # whether 3 was intended.
                    "id": 1,
                    "name": "baz",
                    "updated_at": datetime.datetime(
                        2022,
                        4,
                        1,
                        tzinfo=datetime.timezone.utc,
                    ),
                },
            ],
        )
        conn.execute(insert)

    tap = TapPostgres(config=SAMPLE_CONFIG)
    tap_catalog = json.loads(tap.catalog_json_text)
    altered_table_name = f"public-{table_name}"

    # Deselect every stream except the one under test, and switch that one
    # to incremental replication on the chosen key.
    for stream in tap_catalog["streams"]:
        if stream.get("stream") and altered_table_name not in stream["stream"]:
            for metadata in stream["metadata"]:
                metadata["metadata"]["selected"] = False
        else:
            for metadata in stream["metadata"]:
                metadata["metadata"]["selected"] = True
                # The empty breadcrumb is the stream-level metadata entry.
                if metadata["breadcrumb"] == []:
                    metadata["metadata"]["replication-method"] = "INCREMENTAL"
                    metadata["metadata"]["replication-key"] = replication_key

    test_runner = PostgresTestRunner(
        tap_class=TapPostgres,
        config=SAMPLE_CONFIG,
        catalog=tap_catalog,
    )
    test_runner.sync_all()

    # The final bookmark must be the maximum updated_at (the "foo" row),
    # even though that row was inserted first.
    assert test_runner.state_messages[-1] == {
        "type": "STATE",
        "value": {
            "bookmarks": {
                altered_table_name: {
                    "replication_key": replication_key,
                    "replication_key_value": "2022-10-01T00:00:00+00:00",
                },
            },
        },
    }