Skip to content

Commit

Permalink
Fix release lead on segments
Browse files Browse the repository at this point in the history
  • Loading branch information
adejanovski committed Dec 9, 2024
1 parent 83bde15 commit d0fb42c
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ public boolean renewRunningRepairsForNodes(UUID runId, UUID segmentId, Set<Strin

@Override
public boolean releaseRunningRepairsForNodes(UUID runId, UUID segmentId, Set<String> replicas) {
LOG.info("Releasing locks for runId: {}, segmentId: {}, replicas: {}", runId, segmentId, replicas);
return repairRunLockManager.releaseRunningRepairsForNodes(runId, segmentId, replicas);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,17 @@ public ReplicaLockManagerWithTtl(long ttlSeconds) {
scheduler.scheduleAtFixedRate(this::cleanupExpiredLocks, 1, 1, TimeUnit.SECONDS);
}

private String getReplicaLockKey(String replica, UUID runId) {
return replica + runId;
}

public boolean lockRunningRepairsForNodes(UUID runId, UUID segmentId, Set<String> replicas) {
lock.lock();
try {
long currentTime = System.currentTimeMillis();
// Check if any replica is already locked by another runId
for (String replica : replicas) {
LockInfo lockInfo = replicaLocks.get(replica + runId);
LockInfo lockInfo = replicaLocks.get(getReplicaLockKey(replica, runId));
if (lockInfo != null && lockInfo.expirationTime > currentTime && lockInfo.runId.equals(runId)) {
return false; // Replica is locked by another runId and not expired
}
Expand All @@ -58,7 +62,7 @@ public boolean lockRunningRepairsForNodes(UUID runId, UUID segmentId, Set<String
// Lock the replicas for the given runId and segmentId
long expirationTime = currentTime + (ttlSeconds * 1000);
for (String replica : replicas) {
replicaLocks.put(replica + runId, new LockInfo(runId, expirationTime));
replicaLocks.put(getReplicaLockKey(replica, runId), new LockInfo(runId, expirationTime));
}

// Update runId to segmentId mapping
Expand All @@ -76,7 +80,7 @@ public boolean renewRunningRepairsForNodes(UUID runId, UUID segmentId, Set<Strin

// Check if all replicas are already locked by this runId
for (String replica : replicas) {
LockInfo lockInfo = replicaLocks.get(replica + runId);
LockInfo lockInfo = replicaLocks.get(getReplicaLockKey(replica, runId));
if (lockInfo == null || !lockInfo.runId.equals(runId) || lockInfo.expirationTime <= currentTime) {
return false; // Some replica is not validly locked by this runId
}
Expand All @@ -85,7 +89,7 @@ public boolean renewRunningRepairsForNodes(UUID runId, UUID segmentId, Set<Strin
// Renew the lock by extending the expiration time
long newExpirationTime = currentTime + (ttlSeconds * 1000);
for (String replica : replicas) {
replicaLocks.put(replica + runId, new LockInfo(runId, newExpirationTime));
replicaLocks.put(getReplicaLockKey(replica, runId), new LockInfo(runId, newExpirationTime));
}

// Ensure the segmentId is linked to the runId
Expand All @@ -101,9 +105,9 @@ public boolean releaseRunningRepairsForNodes(UUID runId, UUID segmentId, Set<Str
try {
// Remove the lock for replicas
for (String replica : replicas) {
LockInfo lockInfo = replicaLocks.get(replica);
LockInfo lockInfo = replicaLocks.get(getReplicaLockKey(replica, runId));
if (lockInfo != null && lockInfo.runId.equals(runId)) {
replicaLocks.remove(replica);
replicaLocks.remove(getReplicaLockKey(replica, runId));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept
run.with().runState(RepairRun.RunState.RUNNING).startTime(DateTime.now()).build(runId));
context.repairManager.resumeRunningRepairRuns();

await().with().atMost(20, TimeUnit.SECONDS).until(() -> {
await().with().atMost(120, TimeUnit.SECONDS).until(() -> {
return RepairRun.RunState.DONE == storage.getRepairRunDao().getRepairRun(runId).get().getRunState();
});
}
Expand Down

0 comments on commit d0fb42c

Please sign in to comment.