From 90499717ccdf916bfeebfdd03ada055d530ec86a Mon Sep 17 00:00:00 2001 From: lamhv Date: Sun, 2 Jun 2024 17:07:27 +0700 Subject: [PATCH] fix loss data as described in DBZ-7915 issue. --- .../io/debezium/server/pubsub/PubSubChangeConsumer.java | 9 +++++++-- .../debezium/server/pubsub/PubSubLiteChangeConsumer.java | 8 +++++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java index ce7ec664..1f24bc64 100644 --- a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java +++ b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java @@ -236,8 +236,6 @@ public void handleBatch(List> records, RecordCommitt PubsubMessage message = buildPubSubMessage(record); deliveries.add(publisher.publish(message)); - - committer.markProcessed(record); } List messageIds; try { @@ -247,7 +245,14 @@ public void handleBatch(List> records, RecordCommitt throw new DebeziumException(e); } LOGGER.trace("Sent messages with ids: {}", messageIds); + + LOGGER.trace("Marking {} records as processed.", records.size()); + for (ChangeEvent record : records) { + committer.markProcessed(record); + } + committer.markBatchFinished(); + LOGGER.trace("Batch marked finished"); } private PubsubMessage buildPubSubMessage(ChangeEvent record) { diff --git a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java index 6a07a23c..84d58b43 100644 --- a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java +++ b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java @@ -132,7 +132,6 @@ public void handleBatch(List> records, RecordCommitt PubsubMessage message = buildPubSubMessage(record); deliveries.add(publisher.publish(message)); - committer.markProcessed(record); } List messageIds; try { @@ -142,7 +141,14 @@ public void handleBatch(List> records, RecordCommitt throw new DebeziumException(e); } LOGGER.trace("Sent messages with ids: {}", messageIds); + + LOGGER.trace("Marking {} records as processed.", records.size()); + for (ChangeEvent record : records) { + committer.markProcessed(record); + } + committer.markBatchFinished(); + LOGGER.trace("Batch marked finished"); } private PubsubMessage buildPubSubMessage(ChangeEvent record) {