From d7019c40a77cce58225c9f5318d375421dda368f Mon Sep 17 00:00:00 2001 From: maheshnikam <55378196+nikam14@users.noreply.github.com> Date: Wed, 20 Mar 2024 15:45:27 +0530 Subject: [PATCH 1/4] Removed thread safety issues in SnapshotSegmentAbortedTxnProcessorImpl.java --- .../impl/SnapshotSegmentAbortedTxnProcessorImpl.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java index 385500dfbe9e7..d7323761d74b9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java @@ -372,7 +372,8 @@ public void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Ob If there is no segment index, the persistent worker will write segment begin from 0. */ if (indexes.size() != 0) { - persistentWorker.sequenceID.set(indexes.get(indexes.lastKey()).sequenceID + 1); + PositionImpl lastKey = indexes.lastKey(); + persistentWorker.sequenceID.set(indexes.get(lastKey).sequenceID + 1); } /* Append the aborted txn IDs in the index metadata @@ -496,7 +497,8 @@ private void handleSnapshotSegmentEntry(Entry entry) { TransactionBufferSnapshotSegment snapshotSegment = Schema.AVRO(TransactionBufferSnapshotSegment.class) .decode(Unpooled.wrappedBuffer(headersAndPayload).nioBuffer()); - TxnIDData lastTxn = snapshotSegment.getAborts().get(snapshotSegment.getAborts().size() - 1); + long size = snapshotSegment.getAborts().size(); + TxnIDData lastTxn = snapshotSegment.getAborts().get(size - 1); segmentIndex.put(new PositionImpl(snapshotSegment.getPersistentPositionLedgerId(), snapshotSegment.getPersistentPositionEntryId()), new TxnID(lastTxn.getMostSigBits(), lastTxn.getLeastSigBits())); @@ -881,4 +883,4 @@ private List convertTypeToTxnIDData(List abortedTxns) { return segment; } -} \ No newline at end of file +} From 64a4a572c332fb0269eb2b85988dd15f56eb880e Mon Sep 17 00:00:00 2001 From: maheshnikam <55378196+nikam14@users.noreply.github.com> Date: Thu, 21 Mar 2024 16:20:24 +0530 Subject: [PATCH 2/4] Update SnapshotSegmentAbortedTxnProcessorImpl.java --- .../impl/SnapshotSegmentAbortedTxnProcessorImpl.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java index d7323761d74b9..20068dfc16ac6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java @@ -75,7 +75,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcess * Stored the unsealed aborted transaction IDs Whose size is always less than the snapshotSegmentCapacity. * It will be persistent as a snapshot segment when its size reach the configured capacity. */ - private LinkedList unsealedTxnIds; + private volatile LinkedList unsealedTxnIds; /** * The map is used to clear the aborted transaction IDs persistent in the expired ledger. @@ -381,7 +381,9 @@ public void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Ob So that we can trim the expired snapshot segment in aborts according to the latest transaction IDs in the segmentIndex. */ - unsealedTxnIds.forEach(txnID -> aborts.put(txnID, txnID)); + synchronized(unsealedTxnIds){ + unsealedTxnIds.forEach(txnID -> aborts.put(txnID, txnID)); + } return CompletableFuture.completedFuture(finalStartReadCursorPosition); }).exceptionally(ex -> { log.error("[{}] Failed to recover snapshot segment", this.topic.getName(), ex); @@ -463,7 +465,7 @@ public CompletableFuture clearAbortedTxnSnapshot() { persistentWorker::clearSnapshotSegmentAndIndexes); } - public TransactionBufferStats generateSnapshotStats(boolean segmentStats) { + public synchronized TransactionBufferStats generateSnapshotStats(boolean segmentStats) { TransactionBufferStats transactionBufferStats = new TransactionBufferStats(); transactionBufferStats.totalAbortedTransactions = this.aborts.size(); transactionBufferStats.lastSnapshotTimestamps = this.lastSnapshotTimestamps; @@ -721,7 +723,7 @@ private CompletableFuture takeSnapshotSegmentAsync(LinkedList seale return res; } - private CompletableFuture writeSnapshotSegmentAsync(LinkedList segment, + private synchronized CompletableFuture writeSnapshotSegmentAsync(LinkedList segment, PositionImpl abortedMarkerPersistentPosition) { TransactionBufferSnapshotSegment transactionBufferSnapshotSegment = new TransactionBufferSnapshotSegment(); transactionBufferSnapshotSegment.setAborts(convertTypeToTxnIDData(segment)); From 282d9a6b0803e224e6fb57f8512cd539a0204511 Mon Sep 17 00:00:00 2001 From: maheshnikam <55378196+nikam14@users.noreply.github.com> Date: Fri, 22 Mar 2024 13:32:13 +0530 Subject: [PATCH 3/4] Update SnapshotSegmentAbortedTxnProcessorImpl.java --- .../buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java index 20068dfc16ac6..d5dbe083d9537 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java @@ -381,7 +381,7 @@ public void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Ob So that we can trim the expired snapshot segment in aborts according to the latest transaction IDs in the segmentIndex. */ - synchronized(unsealedTxnIds){ + synchronized (unsealedTxnIds) { unsealedTxnIds.forEach(txnID -> aborts.put(txnID, txnID)); } return CompletableFuture.completedFuture(finalStartReadCursorPosition); From 9158036f63b27e03fdb217887d7660c8608fcf0f Mon Sep 17 00:00:00 2001 From: maheshnikam <55378196+nikam14@users.noreply.github.com> Date: Fri, 22 Mar 2024 22:53:11 +0530 Subject: [PATCH 4/4] Update SnapshotSegmentAbortedTxnProcessorImpl.java --- .../buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java index d5dbe083d9537..69ba6c01d74d3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java @@ -499,7 +499,7 @@ private void handleSnapshotSegmentEntry(Entry entry) { TransactionBufferSnapshotSegment snapshotSegment = Schema.AVRO(TransactionBufferSnapshotSegment.class) .decode(Unpooled.wrappedBuffer(headersAndPayload).nioBuffer()); - long size = snapshotSegment.getAborts().size(); + int size = snapshotSegment.getAborts().size(); TxnIDData lastTxn = snapshotSegment.getAborts().get(size - 1); segmentIndex.put(new PositionImpl(snapshotSegment.getPersistentPositionLedgerId(), snapshotSegment.getPersistentPositionEntryId()),