Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][txn] Fix SnapshotSegmentAbortedTxnProcessorImpl thread safety issue #2

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcess
* Stored the unsealed aborted transaction IDs Whose size is always less than the snapshotSegmentCapacity.
* It will be persistent as a snapshot segment when its size reach the configured capacity.
*/
private LinkedList<TxnID> unsealedTxnIds;
private volatile LinkedList<TxnID> unsealedTxnIds;

/**
* The map is used to clear the aborted transaction IDs persistent in the expired ledger.
Expand Down Expand Up @@ -372,15 +372,18 @@ public void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Ob
If there is no segment index, the persistent worker will write segment begin from 0.
*/
if (indexes.size() != 0) {
persistentWorker.sequenceID.set(indexes.get(indexes.lastKey()).sequenceID + 1);
PositionImpl lastKey = indexes.lastKey();
persistentWorker.sequenceID.set(indexes.get(lastKey).sequenceID + 1);
}
/*
Append the aborted txn IDs in the index metadata
can keep the order of the aborted txn in the aborts.
So that we can trim the expired snapshot segment in aborts
according to the latest transaction IDs in the segmentIndex.
*/
unsealedTxnIds.forEach(txnID -> aborts.put(txnID, txnID));
synchronized (unsealedTxnIds) {
unsealedTxnIds.forEach(txnID -> aborts.put(txnID, txnID));
}
return CompletableFuture.completedFuture(finalStartReadCursorPosition);
}).exceptionally(ex -> {
log.error("[{}] Failed to recover snapshot segment", this.topic.getName(), ex);
Expand Down Expand Up @@ -462,7 +465,7 @@ public CompletableFuture<Void> clearAbortedTxnSnapshot() {
persistentWorker::clearSnapshotSegmentAndIndexes);
}

public TransactionBufferStats generateSnapshotStats(boolean segmentStats) {
public synchronized TransactionBufferStats generateSnapshotStats(boolean segmentStats) {
TransactionBufferStats transactionBufferStats = new TransactionBufferStats();
transactionBufferStats.totalAbortedTransactions = this.aborts.size();
transactionBufferStats.lastSnapshotTimestamps = this.lastSnapshotTimestamps;
Expand Down Expand Up @@ -496,7 +499,8 @@ private void handleSnapshotSegmentEntry(Entry entry) {
TransactionBufferSnapshotSegment snapshotSegment = Schema.AVRO(TransactionBufferSnapshotSegment.class)
.decode(Unpooled.wrappedBuffer(headersAndPayload).nioBuffer());

TxnIDData lastTxn = snapshotSegment.getAborts().get(snapshotSegment.getAborts().size() - 1);
int size = snapshotSegment.getAborts().size();
TxnIDData lastTxn = snapshotSegment.getAborts().get(size - 1);
segmentIndex.put(new PositionImpl(snapshotSegment.getPersistentPositionLedgerId(),
snapshotSegment.getPersistentPositionEntryId()),
new TxnID(lastTxn.getMostSigBits(), lastTxn.getLeastSigBits()));
Expand Down Expand Up @@ -719,7 +723,7 @@ private CompletableFuture<Void> takeSnapshotSegmentAsync(LinkedList<TxnID> seale
return res;
}

private CompletableFuture<Void> writeSnapshotSegmentAsync(LinkedList<TxnID> segment,
private synchronized CompletableFuture<Void> writeSnapshotSegmentAsync(LinkedList<TxnID> segment,
PositionImpl abortedMarkerPersistentPosition) {
TransactionBufferSnapshotSegment transactionBufferSnapshotSegment = new TransactionBufferSnapshotSegment();
transactionBufferSnapshotSegment.setAborts(convertTypeToTxnIDData(segment));
Expand Down Expand Up @@ -881,4 +885,4 @@ private List<TxnIDData> convertTypeToTxnIDData(List<TxnID> abortedTxns) {
return segment;
}

}
}
Loading