Skip to content

Commit

Permalink
Add TTLs to the lock manager implementation
Browse files Browse the repository at this point in the history
tmp
  • Loading branch information
adejanovski committed Dec 5, 2024
1 parent 81f6777 commit 42c96b3
Show file tree
Hide file tree
Showing 4 changed files with 307 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import io.cassandrareaper.storage.repairunit.IRepairUnitDao;
import io.cassandrareaper.storage.snapshot.ISnapshotDao;

import java.util.Set;
import java.util.UUID;

import io.dropwizard.lifecycle.Managed;

/**
Expand All @@ -34,6 +37,23 @@
public interface IStorageDao extends Managed,
IMetricsDao {

boolean lockRunningRepairsForNodes(
UUID repairId,
UUID segmentId,
Set<String> replicas);

boolean renewRunningRepairsForNodes(
UUID repairId,
UUID segmentId,
Set<String> replicas);

boolean releaseRunningRepairsForNodes(
UUID repairId,
UUID segmentId,
Set<String> replicas);

Set<UUID> getLockedSegmentsForRun(UUID runId);

boolean isStorageConnected();

IEventsDao getEventsDao();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.cassandrareaper.storage.events.IEventsDao;
import io.cassandrareaper.storage.events.MemoryEventsDao;
import io.cassandrareaper.storage.memory.MemoryStorageRoot;
import io.cassandrareaper.storage.memory.ReplicaLockManagerWithTtl;
import io.cassandrareaper.storage.metrics.MemoryMetricsDao;
import io.cassandrareaper.storage.repairrun.IRepairRunDao;
import io.cassandrareaper.storage.repairrun.MemoryRepairRunDao;
Expand All @@ -46,6 +47,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

Expand All @@ -61,12 +63,14 @@
*/
public final class MemoryStorageFacade implements IStorageDao {

private static final long LEAD_TIME = 90;
private static final Logger LOG = LoggerFactory.getLogger(MemoryStorageFacade.class);

private static final ReplicaLockManagerWithTtl REPAIR_RUN_LOCK_MANAGER = new ReplicaLockManagerWithTtl(LEAD_TIME);
/** Field evaluator to find transient attributes. This is needed to deal with persisting Guava collections objects
* that sometimes use the transient keyword for some of their implementation's backing stores**/
private static final PersistenceFieldEvaluator TRANSIENT_FIELD_EVALUATOR =
(clazz, field) -> !field.getName().startsWith("_");
private static final UUID REAPER_INSTANCE_ID = UUID.randomUUID();

private final EmbeddedStorageManager embeddedStorage;
private final MemoryStorageRoot memoryStorageRoot;
Expand Down Expand Up @@ -296,4 +300,24 @@ public Collection<RepairSegment> getRepairSegmentsByRunId(UUID runId) {
public Map<UUID, DiagEventSubscription> getSubscriptionsById() {
return this.memoryStorageRoot.getSubscriptionsById();
}

@Override
public boolean lockRunningRepairsForNodes(UUID runId, UUID segmentId, Set<String> replicas) {
return REPAIR_RUN_LOCK_MANAGER.lockRunningRepairsForNodes(runId, segmentId, replicas);
}

@Override
public boolean renewRunningRepairsForNodes(UUID runId, UUID segmentId, Set<String> replicas) {
return REPAIR_RUN_LOCK_MANAGER.renewRunningRepairsForNodes(runId, segmentId, replicas);
}

@Override
public boolean releaseRunningRepairsForNodes(UUID runId, UUID segmentId, Set<String> replicas) {
return REPAIR_RUN_LOCK_MANAGER.releaseRunningRepairsForNodes(runId, segmentId, replicas);
}

@Override
public Set<UUID> getLockedSegmentsForRun(UUID runId) {
return REPAIR_RUN_LOCK_MANAGER.getLockedSegmentsForRun(runId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Copyright 2024-2024 DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.cassandrareaper.storage.memory;

import java.util.Collections;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import com.google.common.annotations.VisibleForTesting;

public class ReplicaLockManagerWithTtl {

private final ConcurrentHashMap<String, LockInfo> replicaLocks = new ConcurrentHashMap<>();
private final ConcurrentHashMap<UUID, Set<UUID>> runToSegmentLocks = new ConcurrentHashMap<>();
private final Lock lock = new ReentrantLock();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

private final long ttlSeconds; // 1 minute

public ReplicaLockManagerWithTtl(long ttlSeconds) {
this.ttlSeconds = ttlSeconds;
// Schedule cleanup of expired locks
scheduler.scheduleAtFixedRate(this::cleanupExpiredLocks, 1, 1, TimeUnit.SECONDS);
}

public boolean lockRunningRepairsForNodes(UUID runId, UUID segmentId, Set<String> replicas) {
lock.lock();
try {
long currentTime = System.currentTimeMillis();
// Check if any replica is already locked by another runId
for (String replica : replicas) {
LockInfo lockInfo = replicaLocks.get(replica + runId);
if (lockInfo != null && lockInfo.expirationTime > currentTime && lockInfo.runId.equals(runId)) {
return false; // Replica is locked by another runId and not expired
}
}

// Lock the replicas for the given runId and segmentId
long expirationTime = currentTime + (ttlSeconds * 1000);
for (String replica : replicas) {
replicaLocks.put(replica + runId, new LockInfo(runId, expirationTime));
}

// Update runId to segmentId mapping
runToSegmentLocks.computeIfAbsent(runId, k -> ConcurrentHashMap.newKeySet()).add(segmentId);
return true;
} finally {
lock.unlock();
}
}

public boolean renewRunningRepairsForNodes(UUID runId, UUID segmentId, Set<String> replicas) {
lock.lock();
try {
long currentTime = System.currentTimeMillis();

// Check if all replicas are already locked by this runId
for (String replica : replicas) {
LockInfo lockInfo = replicaLocks.get(replica + runId);
if (lockInfo == null || !lockInfo.runId.equals(runId) || lockInfo.expirationTime <= currentTime) {
return false; // Some replica is not validly locked by this runId
}
}

// Renew the lock by extending the expiration time
long newExpirationTime = currentTime + (ttlSeconds * 1000);
for (String replica : replicas) {
replicaLocks.put(replica + runId, new LockInfo(runId, newExpirationTime));
}

// Ensure the segmentId is linked to the runId
runToSegmentLocks.computeIfAbsent(runId, k -> ConcurrentHashMap.newKeySet()).add(segmentId);
return true;
} finally {
lock.unlock();
}
}

public boolean releaseRunningRepairsForNodes(UUID runId, UUID segmentId, Set<String> replicas) {
lock.lock();
try {
// Remove the lock for replicas
for (String replica : replicas) {
LockInfo lockInfo = replicaLocks.get(replica);
if (lockInfo != null && lockInfo.runId.equals(runId)) {
replicaLocks.remove(replica);
}
}

// Remove the segmentId from the runId mapping
Set<UUID> segments = runToSegmentLocks.get(runId);
if (segments != null) {
segments.remove(segmentId);
if (segments.isEmpty()) {
runToSegmentLocks.remove(runId);
}
}
return true;
} finally {
lock.unlock();
}
}

public Set<UUID> getLockedSegmentsForRun(UUID runId) {
return runToSegmentLocks.getOrDefault(runId, Collections.emptySet());
}

@VisibleForTesting
public void cleanupExpiredLocks() {
lock.lock();
try {
long currentTime = System.currentTimeMillis();

// Remove expired locks from replicaLocks
replicaLocks.entrySet().removeIf(entry -> entry.getValue().expirationTime <= currentTime);

// Clean up runToSegmentLocks by removing segments with no active replicas
runToSegmentLocks.entrySet().removeIf(entry -> {
UUID runId = entry.getKey();
Set<UUID> segments = entry.getValue();

// Retain only active segments
segments.removeIf(segmentId -> {
boolean active = replicaLocks.values().stream()
.anyMatch(info -> info.runId.equals(runId));
return !active;
});
return segments.isEmpty();
});
} finally {
lock.unlock();
}
}

// Class to store lock information
private static class LockInfo {
UUID runId;
long expirationTime;

LockInfo(UUID runId, long expirationTime) {
this.runId = runId;
this.expirationTime = expirationTime;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2014-2017 Spotify AB
* Copyright 2016-2019 The Last Pickle Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.cassandrareaper.storage.memory;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;


public class ReplicaLockManagerWithTtlTest {

private ReplicaLockManagerWithTtl replicaLockManager;
private UUID runId;
private UUID segmentId;
private Set<String> replicas;
private Set<String> replicasOverlap;

@BeforeEach
public void setUp() {
replicaLockManager = new ReplicaLockManagerWithTtl(1); // TTL of 60 seconds
runId = UUID.randomUUID();
segmentId = UUID.randomUUID();
replicas = new HashSet<>(Arrays.asList("replica1", "replica2", "replica3"));
replicasOverlap = new HashSet<>(Arrays.asList("replica4", "replica2", "replica5"));
}

@Test
public void testLockRunningRepairsForNodes() {
assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas));
}

@Test
public void testLockRunningRepairsForNodesAlreadyLocked() {
UUID anotherRunId = UUID.randomUUID();
assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas));
assertFalse(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas));
}

@Test
public void testRenewRunningRepairsForNodes() {
assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas));
assertTrue(replicaLockManager.renewRunningRepairsForNodes(runId, segmentId, replicas));
}

@Test
public void testRenewRunningRepairsForNodesNotLocked() {
assertFalse(replicaLockManager.renewRunningRepairsForNodes(runId, segmentId, replicas));
}

@Test
public void testReleaseRunningRepairsForNodes() {
assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas));
assertTrue(replicaLockManager.releaseRunningRepairsForNodes(runId, segmentId, replicas));
}

@Test
public void testGetLockedSegmentsForRun() {
assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas));
Set<UUID> lockedSegments = replicaLockManager.getLockedSegmentsForRun(runId);
assertTrue(lockedSegments.contains(segmentId));
}

@Test
public void testCleanupExpiredLocks() throws InterruptedException {
assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas));
// We can lock the replica for a different run id
assertTrue(replicaLockManager.lockRunningRepairsForNodes(UUID.randomUUID(), UUID.randomUUID(), replicas));
// The following lock should fail because overlapping replicas are already locked
assertFalse(replicaLockManager.lockRunningRepairsForNodes(runId, UUID.randomUUID(), replicasOverlap));
Thread.sleep(1000); // Wait for TTL to expire
replicaLockManager.cleanupExpiredLocks();
// The following lock should succeed as the lock expired
assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicasOverlap));
}
}

0 comments on commit 42c96b3

Please sign in to comment.