diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java index 385500dfbe9e7..69ba6c01d74d3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java @@ -75,7 +75,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcess * Stored the unsealed aborted transaction IDs Whose size is always less than the snapshotSegmentCapacity. * It will be persistent as a snapshot segment when its size reach the configured capacity. */ - private LinkedList unsealedTxnIds; + private volatile LinkedList unsealedTxnIds; /** * The map is used to clear the aborted transaction IDs persistent in the expired ledger. @@ -372,7 +372,8 @@ public void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Ob If there is no segment index, the persistent worker will write segment begin from 0. */ if (indexes.size() != 0) { - persistentWorker.sequenceID.set(indexes.get(indexes.lastKey()).sequenceID + 1); + PositionImpl lastKey = indexes.lastKey(); + persistentWorker.sequenceID.set(indexes.get(lastKey).sequenceID + 1); } /* Append the aborted txn IDs in the index metadata @@ -380,7 +381,9 @@ public void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Ob So that we can trim the expired snapshot segment in aborts according to the latest transaction IDs in the segmentIndex. */ - unsealedTxnIds.forEach(txnID -> aborts.put(txnID, txnID)); + synchronized (unsealedTxnIds) { + unsealedTxnIds.forEach(txnID -> aborts.put(txnID, txnID)); + } return CompletableFuture.completedFuture(finalStartReadCursorPosition); }).exceptionally(ex -> { log.error("[{}] Failed to recover snapshot segment", this.topic.getName(), ex); @@ -462,7 +465,7 @@ public CompletableFuture clearAbortedTxnSnapshot() { persistentWorker::clearSnapshotSegmentAndIndexes); } - public TransactionBufferStats generateSnapshotStats(boolean segmentStats) { + public synchronized TransactionBufferStats generateSnapshotStats(boolean segmentStats) { TransactionBufferStats transactionBufferStats = new TransactionBufferStats(); transactionBufferStats.totalAbortedTransactions = this.aborts.size(); transactionBufferStats.lastSnapshotTimestamps = this.lastSnapshotTimestamps; @@ -496,7 +499,8 @@ private void handleSnapshotSegmentEntry(Entry entry) { TransactionBufferSnapshotSegment snapshotSegment = Schema.AVRO(TransactionBufferSnapshotSegment.class) .decode(Unpooled.wrappedBuffer(headersAndPayload).nioBuffer()); - TxnIDData lastTxn = snapshotSegment.getAborts().get(snapshotSegment.getAborts().size() - 1); + int size = snapshotSegment.getAborts().size(); + TxnIDData lastTxn = snapshotSegment.getAborts().get(size - 1); segmentIndex.put(new PositionImpl(snapshotSegment.getPersistentPositionLedgerId(), snapshotSegment.getPersistentPositionEntryId()), new TxnID(lastTxn.getMostSigBits(), lastTxn.getLeastSigBits())); @@ -719,7 +723,7 @@ private CompletableFuture takeSnapshotSegmentAsync(LinkedList seale return res; } - private CompletableFuture writeSnapshotSegmentAsync(LinkedList segment, + private synchronized CompletableFuture writeSnapshotSegmentAsync(LinkedList segment, PositionImpl abortedMarkerPersistentPosition) { TransactionBufferSnapshotSegment transactionBufferSnapshotSegment = new TransactionBufferSnapshotSegment(); transactionBufferSnapshotSegment.setAborts(convertTypeToTxnIDData(segment)); @@ -881,4 +885,4 @@ private List convertTypeToTxnIDData(List abortedTxns) { return segment; } -} \ No newline at end of file +}