@@ -411,6 +411,92 @@ def test_filter_schemas():
411
411
assert tap_catalog ["streams" ][0 ]["stream" ] == altered_table_name
412
412
413
413
414
+ def test_incremental ():
415
+ """Test incremental replication state."""
416
+ table_name = "test_incremental"
417
+ engine = sqlalchemy .create_engine (SAMPLE_CONFIG ["sqlalchemy_url" ])
418
+
419
+ metadata_obj = MetaData ()
420
+ table = Table (
421
+ table_name ,
422
+ metadata_obj ,
423
+ Column ("id" , Integer ),
424
+ Column ("name" , String ),
425
+ Column ("updated_at" , DateTime ),
426
+ )
427
+ with engine .connect () as conn :
428
+ if table .exists (conn ):
429
+ table .drop (conn )
430
+ metadata_obj .create_all (conn )
431
+ insert = table .insert ().values (
432
+ [
433
+ {
434
+ "id" : 1 ,
435
+ "name" : "foo" ,
436
+ "updated_at" : datetime .datetime (
437
+ 2022 ,
438
+ 10 ,
439
+ 1 ,
440
+ tzinfo = datetime .timezone .utc ,
441
+ ),
442
+ },
443
+ {
444
+ "id" : 2 ,
445
+ "name" : "bar" ,
446
+ "updated_at" : datetime .datetime (
447
+ 2022 ,
448
+ 1 ,
449
+ 1 ,
450
+ tzinfo = datetime .timezone .utc ,
451
+ ),
452
+ },
453
+ {
454
+ "id" : 1 ,
455
+ "name" : "baz" ,
456
+ "updated_at" : datetime .datetime (
457
+ 2022 ,
458
+ 4 ,
459
+ 1 ,
460
+ tzinfo = datetime .timezone .utc ,
461
+ ),
462
+ },
463
+ ],
464
+ )
465
+ conn .execute (insert )
466
+
467
+ tap = TapPostgres (config = SAMPLE_CONFIG )
468
+ tap_catalog = json .loads (tap .catalog_json_text )
469
+ altered_table_name = f"public-{ table_name } "
470
+
471
+ for stream in tap_catalog ["streams" ]:
472
+ if stream .get ("stream" ) and altered_table_name not in stream ["stream" ]:
473
+ for metadata in stream ["metadata" ]:
474
+ metadata ["metadata" ]["selected" ] = False
475
+ else :
476
+ for metadata in stream ["metadata" ]:
477
+ metadata ["metadata" ]["selected" ] = True
478
+ if metadata ["breadcrumb" ] == []:
479
+ metadata ["metadata" ]["replication-method" ] = "INCREMENTAL"
480
+
481
+ test_runner = PostgresTestRunner (
482
+ tap_class = TapPostgres ,
483
+ config = SAMPLE_CONFIG ,
484
+ catalog = tap_catalog ,
485
+ )
486
+ test_runner .sync_all ()
487
+ assert test_runner .state_messages [- 1 ] == {
488
+ "type" : "STATE" ,
489
+ "value" : {
490
+ "bookmarks" : {
491
+ altered_table_name : {
492
+ "replication_key" : "updated_at" ,
493
+ "replication_key_value" : "2022-10-01T00:00:00+00:00" ,
494
+ },
495
+ },
496
+ },
497
+ }
498
+
499
+
414
500
class PostgresTestRunner (TapTestRunner ):
415
501
def run_sync_dry_run (self ) -> bool :
416
502
"""Dislike this function and how TestRunner does this so just hacking it here.
0 commit comments