File tree Expand file tree Collapse file tree 1 file changed +6
-8
lines changed Expand file tree Collapse file tree 1 file changed +6
-8
lines changed Original file line number Diff line number Diff line change @@ -379,7 +379,7 @@ def _increment_stream_state(
379
379
f"stream(replication method={ self .replication_method } )"
380
380
)
381
381
raise ValueError (msg )
382
- treat_as_sorted = self .is_sorted ()
382
+ treat_as_sorted = self .is_sorted
383
383
if not treat_as_sorted and self .state_partitioning_keys is not None :
384
384
# Streams with custom state partitioning are not resumable.
385
385
treat_as_sorted = False
@@ -475,13 +475,11 @@ def consume(self, message) -> dict | None:
475
475
elif message_payload ["action" ] in delete_actions :
476
476
for column in message_payload ["identity" ]:
477
477
row .update ({column ["name" ]: column ["value" ]})
478
- row .update (
479
- {
480
- "_sdc_deleted_at" : datetime .datetime .utcnow ().strftime (
481
- r"%Y-%m-%dT%H:%M:%SZ"
482
- )
483
- }
484
- )
478
+ row .update ({
479
+ "_sdc_deleted_at" : datetime .datetime .utcnow ().strftime (
480
+ r"%Y-%m-%dT%H:%M:%SZ"
481
+ )
482
+ })
485
483
row .update ({"_sdc_lsn" : message .data_start })
486
484
elif message_payload ["action" ] in truncate_actions :
487
485
self .logger .debug (
You can’t perform that action at this time.
0 commit comments