Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory store leader election - REAP-2 #1533

Merged
merged 8 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,7 @@ private void abortSegmentsWithNoLeaderNonIncremental(RepairRun repairRun, Collec
if (context.storage instanceof IDistributedStorage || !repairRunners.containsKey(repairRun.getId())) {
// When multiple Reapers are in use, we can get stuck segments when one instance is rebooted
// Any segment in RUNNING or STARTED state but with no leader should be killed
Set<UUID> leaders = context.storage instanceof IDistributedStorage
? ((IDistributedStorage) context.storage).getLockedSegmentsForRun(repairRun.getId())
: Collections.emptySet();
Set<UUID> leaders = context.storage.getLockedSegmentsForRun(repairRun.getId());

Collection<RepairSegment> orphanedSegments = runningSegments
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -869,10 +869,8 @@ private boolean takeLead(RepairSegment segment) {
? ((IDistributedStorage) context.storage).takeLead(leaderElectionId)
: true;
} else {
result = context.storage instanceof IDistributedStorage
? ((IDistributedStorage) context.storage).lockRunningRepairsForNodes(this.repairRunner.getRepairRunId(),
segment.getId(), segment.getReplicas().keySet())
: true;
result = context.storage.lockRunningRepairsForNodes(this.repairRunner.getRepairRunId(),
segment.getId(), segment.getReplicas().keySet());
}
if (!result) {
context.metricRegistry.counter(MetricRegistry.name(SegmentRunner.class, "takeLead", "failed")).inc();
Expand All @@ -895,10 +893,8 @@ private boolean renewLead(RepairSegment segment) {
}
return result;
} else {
boolean resultLock2 = context.storage instanceof IDistributedStorage
? ((IDistributedStorage) context.storage).renewRunningRepairsForNodes(this.repairRunner.getRepairRunId(),
segment.getId(), segment.getReplicas().keySet())
: true;
boolean resultLock2 = context.storage.renewRunningRepairsForNodes(this.repairRunner.getRepairRunId(),
segment.getId(), segment.getReplicas().keySet());
if (!resultLock2) {
context.metricRegistry.counter(MetricRegistry.name(SegmentRunner.class, "renewLead", "failed")).inc();
releaseLead(segment);
Expand All @@ -912,13 +908,14 @@ private boolean renewLead(RepairSegment segment) {
private void releaseLead(RepairSegment segment) {
try (Timer.Context cx
= context.metricRegistry.timer(MetricRegistry.name(SegmentRunner.class, "releaseLead")).time()) {
if (context.storage instanceof IDistributedStorage) {
if (repairUnit.getIncrementalRepair() && !repairUnit.getSubrangeIncrementalRepair()) {

if (repairUnit.getIncrementalRepair() && !repairUnit.getSubrangeIncrementalRepair()) {
if (context.storage instanceof IDistributedStorage) {
((IDistributedStorage) context.storage).releaseLead(leaderElectionId);
} else {
((IDistributedStorage) context.storage).releaseRunningRepairsForNodes(this.repairRunner.getRepairRunId(),
segment.getId(), segment.getReplicas().keySet());
}
} else {
context.storage.releaseRunningRepairsForNodes(this.repairRunner.getRepairRunId(),
segment.getId(), segment.getReplicas().keySet());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import io.cassandrareaper.storage.operations.IOperationsDao;

import java.util.List;
import java.util.Set;
import java.util.UUID;


Expand All @@ -62,23 +61,6 @@ public interface IDistributedStorage extends IDistributedMetrics {

void releaseLead(UUID leaderId);

boolean lockRunningRepairsForNodes(
UUID repairId,
UUID segmentId,
Set<String> replicas);

boolean renewRunningRepairsForNodes(
UUID repairId,
UUID segmentId,
Set<String> replicas);

boolean releaseRunningRepairsForNodes(
UUID repairId,
UUID segmentId,
Set<String> replicas);

Set<UUID> getLockedSegmentsForRun(UUID runId);

int countRunningReapers();

List<UUID> getRunningReapers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import io.cassandrareaper.storage.repairunit.IRepairUnitDao;
import io.cassandrareaper.storage.snapshot.ISnapshotDao;

import java.util.Set;
import java.util.UUID;

import io.dropwizard.lifecycle.Managed;

/**
Expand All @@ -34,6 +37,23 @@
public interface IStorageDao extends Managed,
IMetricsDao {

boolean lockRunningRepairsForNodes(
Miles-Garnsey marked this conversation as resolved.
Show resolved Hide resolved
UUID repairId,
UUID segmentId,
Set<String> replicas);

boolean renewRunningRepairsForNodes(
UUID repairId,
UUID segmentId,
Set<String> replicas);

boolean releaseRunningRepairsForNodes(
UUID repairId,
UUID segmentId,
Set<String> replicas);

Set<UUID> getLockedSegmentsForRun(UUID runId);

boolean isStorageConnected();

IEventsDao getEventsDao();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.cassandrareaper.storage.events.IEventsDao;
import io.cassandrareaper.storage.events.MemoryEventsDao;
import io.cassandrareaper.storage.memory.MemoryStorageRoot;
import io.cassandrareaper.storage.memory.ReplicaLockManagerWithTtl;
import io.cassandrareaper.storage.metrics.MemoryMetricsDao;
import io.cassandrareaper.storage.repairrun.IRepairRunDao;
import io.cassandrareaper.storage.repairrun.MemoryRepairRunDao;
Expand All @@ -46,9 +47,11 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import com.google.common.io.Files;
import org.eclipse.serializer.persistence.types.PersistenceFieldEvaluator;
import org.eclipse.store.storage.embedded.types.EmbeddedStorage;
import org.eclipse.store.storage.embedded.types.EmbeddedStorageManager;
Expand All @@ -61,12 +64,13 @@
*/
public final class MemoryStorageFacade implements IStorageDao {

private static final long DEFAULT_LEAD_TIME = 90;
private static final Logger LOG = LoggerFactory.getLogger(MemoryStorageFacade.class);

/** Field evaluator to find transient attributes. This is needed to deal with persisting Guava collections objects
* that sometimes use the transient keyword for some of their implementation's backing stores**/
private static final PersistenceFieldEvaluator TRANSIENT_FIELD_EVALUATOR =
(clazz, field) -> !field.getName().startsWith("_");
private static final UUID REAPER_INSTANCE_ID = UUID.randomUUID();

private final EmbeddedStorageManager embeddedStorage;
private final MemoryStorageRoot memoryStorageRoot;
Expand All @@ -85,8 +89,9 @@ public final class MemoryStorageFacade implements IStorageDao {
);
private final MemorySnapshotDao memSnapshotDao = new MemorySnapshotDao();
private final MemoryMetricsDao memMetricsDao = new MemoryMetricsDao();
private final ReplicaLockManagerWithTtl repairRunLockManager;
Miles-Garnsey marked this conversation as resolved.
Show resolved Hide resolved

public MemoryStorageFacade(String persistenceStoragePath) {
public MemoryStorageFacade(String persistenceStoragePath, long leadTime) {
LOG.info("Using memory storage backend. Persistence storage path: {}", persistenceStoragePath);
this.embeddedStorage = EmbeddedStorage.Foundation(Paths.get(persistenceStoragePath))
.onConnectionFoundation(
Expand All @@ -103,10 +108,19 @@ public MemoryStorageFacade(String persistenceStoragePath) {
LOG.info("Loading existing data from persistence storage");
this.memoryStorageRoot = (MemoryStorageRoot) this.embeddedStorage.root();
}
this.repairRunLockManager = new ReplicaLockManagerWithTtl(leadTime);
}

public MemoryStorageFacade() {
this("/tmp/" + UUID.randomUUID().toString());
this(Files.createTempDir().getAbsolutePath(), DEFAULT_LEAD_TIME);
}

public MemoryStorageFacade(String persistenceStoragePath) {
this(persistenceStoragePath, DEFAULT_LEAD_TIME);
}

public MemoryStorageFacade(long leadTime) {
this(Files.createTempDir().getAbsolutePath(), leadTime);
}

@Override
Expand Down Expand Up @@ -296,4 +310,25 @@ public Collection<RepairSegment> getRepairSegmentsByRunId(UUID runId) {
public Map<UUID, DiagEventSubscription> getSubscriptionsById() {
return this.memoryStorageRoot.getSubscriptionsById();
}

@Override
public boolean lockRunningRepairsForNodes(UUID runId, UUID segmentId, Set<String> replicas) {
return repairRunLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas);
}

@Override
public boolean renewRunningRepairsForNodes(UUID runId, UUID segmentId, Set<String> replicas) {
return repairRunLockManager.renewRunningRepairsForNodes(runId, segmentId, replicas);
}

@Override
public boolean releaseRunningRepairsForNodes(UUID runId, UUID segmentId, Set<String> replicas) {
LOG.info("Releasing locks for runId: {}, segmentId: {}, replicas: {}", runId, segmentId, replicas);
return repairRunLockManager.releaseRunningRepairsForNodes(runId, segmentId, replicas);
}

@Override
public Set<UUID> getLockedSegmentsForRun(UUID runId) {
return repairRunLockManager.getLockedSegmentsForRun(runId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Copyright 2024-2024 DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.cassandrareaper.storage.memory;

import java.util.Collections;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import com.google.common.annotations.VisibleForTesting;

public class ReplicaLockManagerWithTtl {

private final ConcurrentHashMap<String, LockInfo> replicaLocks = new ConcurrentHashMap<>();
Miles-Garnsey marked this conversation as resolved.
Show resolved Hide resolved
private final ConcurrentHashMap<UUID, Set<UUID>> runToSegmentLocks = new ConcurrentHashMap<>();
private final Lock lock = new ReentrantLock();
Miles-Garnsey marked this conversation as resolved.
Show resolved Hide resolved
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
Miles-Garnsey marked this conversation as resolved.
Show resolved Hide resolved

private final long ttlSeconds; // 1 minute

public ReplicaLockManagerWithTtl(long ttlSeconds) {
this.ttlSeconds = ttlSeconds;
// Schedule cleanup of expired locks
scheduler.scheduleAtFixedRate(this::cleanupExpiredLocks, 1, 1, TimeUnit.SECONDS);
}

private String getReplicaLockKey(String replica, UUID runId) {
return replica + runId;
}

public boolean lockRunningRepairsForNodes(UUID runId, UUID segmentId, Set<String> replicas) {
Miles-Garnsey marked this conversation as resolved.
Show resolved Hide resolved
lock.lock();
Miles-Garnsey marked this conversation as resolved.
Show resolved Hide resolved
try {
long currentTime = System.currentTimeMillis();
// Check if any replica is already locked by another runId
Miles-Garnsey marked this conversation as resolved.
Show resolved Hide resolved
for (String replica : replicas) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think the pattern in this codebase is to the use the streams API instead of for loops where possible?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair point, I'm refactoring this to use streams.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you might have missed this one in your latest commit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're good now, no more loop.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like there is still a loop here for (String replica : replicas) { was this something you wanted to change?

LockInfo lockInfo = replicaLocks.get(getReplicaLockKey(replica, runId));
if (lockInfo != null && lockInfo.expirationTime > currentTime && lockInfo.runId.equals(runId)) {
return false; // Replica is locked by another runId and not expired
}
}

// Lock the replicas for the given runId and segmentId
long expirationTime = currentTime + (ttlSeconds * 1000);
Miles-Garnsey marked this conversation as resolved.
Show resolved Hide resolved
for (String replica : replicas) {
replicaLocks.put(getReplicaLockKey(replica, runId), new LockInfo(runId, expirationTime));
}

// Update runId to segmentId mapping
runToSegmentLocks.computeIfAbsent(runId, k -> ConcurrentHashMap.newKeySet()).add(segmentId);
return true;
} finally {
lock.unlock();
}
}

public boolean renewRunningRepairsForNodes(UUID runId, UUID segmentId, Set<String> replicas) {
lock.lock();
try {
long currentTime = System.currentTimeMillis();

// Check if all replicas are already locked by this runId
for (String replica : replicas) {
LockInfo lockInfo = replicaLocks.get(getReplicaLockKey(replica, runId));
if (lockInfo == null || !lockInfo.runId.equals(runId) || lockInfo.expirationTime <= currentTime) {
return false; // Some replica is not validly locked by this runId
}
}

// Renew the lock by extending the expiration time
long newExpirationTime = currentTime + (ttlSeconds * 1000);
for (String replica : replicas) {
replicaLocks.put(getReplicaLockKey(replica, runId), new LockInfo(runId, newExpirationTime));
}

// Ensure the segmentId is linked to the runId
runToSegmentLocks.computeIfAbsent(runId, k -> ConcurrentHashMap.newKeySet()).add(segmentId);
return true;
} finally {
lock.unlock();
}
}

public boolean releaseRunningRepairsForNodes(UUID runId, UUID segmentId, Set<String> replicas) {
lock.lock();
try {
// Remove the lock for replicas
for (String replica : replicas) {
LockInfo lockInfo = replicaLocks.get(getReplicaLockKey(replica, runId));
if (lockInfo != null && lockInfo.runId.equals(runId)) {
Miles-Garnsey marked this conversation as resolved.
Show resolved Hide resolved
replicaLocks.remove(getReplicaLockKey(replica, runId));
}
}

// Remove the segmentId from the runId mapping
Set<UUID> segments = runToSegmentLocks.get(runId);
if (segments != null) {
segments.remove(segmentId);
if (segments.isEmpty()) {
runToSegmentLocks.remove(runId);
}
}
return true;
} finally {
lock.unlock();
}
}

public Set<UUID> getLockedSegmentsForRun(UUID runId) {
return runToSegmentLocks.getOrDefault(runId, Collections.emptySet());
}

@VisibleForTesting
public void cleanupExpiredLocks() {
lock.lock();
try {
long currentTime = System.currentTimeMillis();

// Remove expired locks from replicaLocks
replicaLocks.entrySet().removeIf(entry -> entry.getValue().expirationTime <= currentTime);

// Clean up runToSegmentLocks by removing segments with no active replicas
runToSegmentLocks.entrySet().removeIf(entry -> {
UUID runId = entry.getKey();
Set<UUID> segments = entry.getValue();

// Retain only active segments
segments.removeIf(segmentId -> {
boolean active = replicaLocks.values().stream()
.anyMatch(info -> info.runId.equals(runId));
return !active;
});
return segments.isEmpty();
});
} finally {
lock.unlock();
}
}

// Class to store lock information
private static class LockInfo {
UUID runId;
long expirationTime;

LockInfo(UUID runId, long expirationTime) {
this.runId = runId;
this.expirationTime = expirationTime;
}
}
}
Loading
Loading