Skip to content

Commit

Permalink
Minimal rework and fix for all tests
Browse files Browse the repository at this point in the history
  • Loading branch information
adejanovski committed Dec 5, 2024
1 parent 42c96b3 commit 83bde15
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,7 @@ private void abortSegmentsWithNoLeaderNonIncremental(RepairRun repairRun, Collec
if (context.storage instanceof IDistributedStorage || !repairRunners.containsKey(repairRun.getId())) {
// When multiple Reapers are in use, we can get stuck segments when one instance is rebooted
// Any segment in RUNNING or STARTED state but with no leader should be killed
Set<UUID> leaders = context.storage instanceof IDistributedStorage
? ((IDistributedStorage) context.storage).getLockedSegmentsForRun(repairRun.getId())
: Collections.emptySet();
Set<UUID> leaders = context.storage.getLockedSegmentsForRun(repairRun.getId());

Collection<RepairSegment> orphanedSegments = runningSegments
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -869,10 +869,8 @@ private boolean takeLead(RepairSegment segment) {
? ((IDistributedStorage) context.storage).takeLead(leaderElectionId)
: true;
} else {
result = context.storage instanceof IDistributedStorage
? ((IDistributedStorage) context.storage).lockRunningRepairsForNodes(this.repairRunner.getRepairRunId(),
segment.getId(), segment.getReplicas().keySet())
: true;
result = context.storage.lockRunningRepairsForNodes(this.repairRunner.getRepairRunId(),
segment.getId(), segment.getReplicas().keySet());
}
if (!result) {
context.metricRegistry.counter(MetricRegistry.name(SegmentRunner.class, "takeLead", "failed")).inc();
Expand All @@ -895,10 +893,8 @@ private boolean renewLead(RepairSegment segment) {
}
return result;
} else {
boolean resultLock2 = context.storage instanceof IDistributedStorage
? ((IDistributedStorage) context.storage).renewRunningRepairsForNodes(this.repairRunner.getRepairRunId(),
segment.getId(), segment.getReplicas().keySet())
: true;
boolean resultLock2 = context.storage.renewRunningRepairsForNodes(this.repairRunner.getRepairRunId(),
segment.getId(), segment.getReplicas().keySet());
if (!resultLock2) {
context.metricRegistry.counter(MetricRegistry.name(SegmentRunner.class, "renewLead", "failed")).inc();
releaseLead(segment);
Expand All @@ -912,13 +908,14 @@ private boolean renewLead(RepairSegment segment) {
private void releaseLead(RepairSegment segment) {
try (Timer.Context cx
= context.metricRegistry.timer(MetricRegistry.name(SegmentRunner.class, "releaseLead")).time()) {
if (context.storage instanceof IDistributedStorage) {
if (repairUnit.getIncrementalRepair() && !repairUnit.getSubrangeIncrementalRepair()) {

if (repairUnit.getIncrementalRepair() && !repairUnit.getSubrangeIncrementalRepair()) {
if (context.storage instanceof IDistributedStorage) {
((IDistributedStorage) context.storage).releaseLead(leaderElectionId);
} else {
((IDistributedStorage) context.storage).releaseRunningRepairsForNodes(this.repairRunner.getRepairRunId(),
segment.getId(), segment.getReplicas().keySet());
}
} else {
context.storage.releaseRunningRepairsForNodes(this.repairRunner.getRepairRunId(),
segment.getId(), segment.getReplicas().keySet());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import io.cassandrareaper.storage.operations.IOperationsDao;

import java.util.List;
import java.util.Set;
import java.util.UUID;


Expand All @@ -62,23 +61,6 @@ public interface IDistributedStorage extends IDistributedMetrics {

void releaseLead(UUID leaderId);

boolean lockRunningRepairsForNodes(
UUID repairId,
UUID segmentId,
Set<String> replicas);

boolean renewRunningRepairsForNodes(
UUID repairId,
UUID segmentId,
Set<String> replicas);

boolean releaseRunningRepairsForNodes(
UUID repairId,
UUID segmentId,
Set<String> replicas);

Set<UUID> getLockedSegmentsForRun(UUID runId);

int countRunningReapers();

List<UUID> getRunningReapers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@
*/
public final class MemoryStorageFacade implements IStorageDao {

private static final long LEAD_TIME = 90;
private static final long DEFAULT_LEAD_TIME = 90;
private static final Logger LOG = LoggerFactory.getLogger(MemoryStorageFacade.class);
private static final ReplicaLockManagerWithTtl REPAIR_RUN_LOCK_MANAGER = new ReplicaLockManagerWithTtl(LEAD_TIME);
/** Field evaluator to find transient attributes. This is needed to deal with persisting Guava collections objects
* that sometimes use the transient keyword for some of their implementation's backing stores**/
private static final PersistenceFieldEvaluator TRANSIENT_FIELD_EVALUATOR =
Expand All @@ -89,8 +88,9 @@ public final class MemoryStorageFacade implements IStorageDao {
);
private final MemorySnapshotDao memSnapshotDao = new MemorySnapshotDao();
private final MemoryMetricsDao memMetricsDao = new MemoryMetricsDao();
private final ReplicaLockManagerWithTtl repairRunLockManager;

public MemoryStorageFacade(String persistenceStoragePath) {
public MemoryStorageFacade(String persistenceStoragePath, long leadTime) {
LOG.info("Using memory storage backend. Persistence storage path: {}", persistenceStoragePath);
this.embeddedStorage = EmbeddedStorage.Foundation(Paths.get(persistenceStoragePath))
.onConnectionFoundation(
Expand All @@ -107,10 +107,19 @@ public MemoryStorageFacade(String persistenceStoragePath) {
LOG.info("Loading existing data from persistence storage");
this.memoryStorageRoot = (MemoryStorageRoot) this.embeddedStorage.root();
}
this.repairRunLockManager = new ReplicaLockManagerWithTtl(leadTime);
}

public MemoryStorageFacade() {
this("/tmp/" + UUID.randomUUID().toString());
this("/tmp/" + UUID.randomUUID().toString(), DEFAULT_LEAD_TIME);
}

public MemoryStorageFacade(String persistenceStoragePath) {
this(persistenceStoragePath, DEFAULT_LEAD_TIME);
}

public MemoryStorageFacade(long leadTime) {
this("/tmp/" + UUID.randomUUID().toString(), leadTime);
}

@Override
Expand Down Expand Up @@ -303,21 +312,21 @@ public Map<UUID, DiagEventSubscription> getSubscriptionsById() {

@Override
public boolean lockRunningRepairsForNodes(UUID runId, UUID segmentId, Set<String> replicas) {
return REPAIR_RUN_LOCK_MANAGER.lockRunningRepairsForNodes(runId, segmentId, replicas);
return repairRunLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas);
}

@Override
public boolean renewRunningRepairsForNodes(UUID runId, UUID segmentId, Set<String> replicas) {
return REPAIR_RUN_LOCK_MANAGER.renewRunningRepairsForNodes(runId, segmentId, replicas);
return repairRunLockManager.renewRunningRepairsForNodes(runId, segmentId, replicas);
}

@Override
public boolean releaseRunningRepairsForNodes(UUID runId, UUID segmentId, Set<String> replicas) {
return REPAIR_RUN_LOCK_MANAGER.releaseRunningRepairsForNodes(runId, segmentId, replicas);
return repairRunLockManager.releaseRunningRepairsForNodes(runId, segmentId, replicas);
}

@Override
public Set<UUID> getLockedSegmentsForRun(UUID runId) {
return REPAIR_RUN_LOCK_MANAGER.getLockedSegmentsForRun(runId);
return repairRunLockManager.getLockedSegmentsForRun(runId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.cassandrareaper.core.RepairUnit;
import io.cassandrareaper.core.Segment;
import io.cassandrareaper.management.ClusterFacade;
import io.cassandrareaper.storage.IDistributedStorage;
import io.cassandrareaper.storage.IStorageDao;
import io.cassandrareaper.storage.cassandra.CassandraStorageFacade;
import io.cassandrareaper.storage.cluster.IClusterDao;
Expand Down Expand Up @@ -144,7 +143,7 @@ public void abortRunningSegmentWithNoLeader() throws ReaperException, Interrupte
Mockito.doNothing().when(context.repairManager).abortSegments(any(), any());
Mockito.doReturn(run).when(context.repairManager).startRepairRun(run);

when(((IDistributedStorage) context.storage).getLockedSegmentsForRun(any())).thenReturn(Collections.emptySet());
when(context.storage.getLockedSegmentsForRun(any())).thenReturn(Collections.emptySet());
IRepairUnitDao mockedRepairUnitDao = mock(IRepairUnitDao.class);
Mockito.when(((CassandraStorageFacade) context.storage).getRepairUnitDao()).thenReturn(mockedRepairUnitDao);
Mockito.when(mockedRepairUnitDao.getRepairUnit(any(UUID.class))).thenReturn(cf);
Expand Down Expand Up @@ -238,7 +237,7 @@ public void doNotAbortRunningSegmentWithLeader() throws ReaperException, Interru
Mockito.when(((CassandraStorageFacade) context.storage).getRepairUnitDao()).thenReturn(mockedRepairUnitDao);
Mockito.when(mockedRepairUnitDao.getRepairUnit(any(UUID.class))).thenReturn(cf);

when(((IDistributedStorage) context.storage).getLockedSegmentsForRun(any())).thenReturn(
when(context.storage.getLockedSegmentsForRun(any())).thenReturn(
new HashSet<UUID>(Arrays.asList(segment.getId())));

context.repairManager.resumeRunningRepairRuns();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@

public final class RepairRunnerHangingTest {

private static final long LEAD_TIME = 1L;
private static final Logger LOG = LoggerFactory.getLogger(RepairRunnerHangingTest.class);
private static final Set<String> TABLES = ImmutableSet.of("table1");
private static final List<BigInteger> THREE_TOKENS = Lists.newArrayList(
Expand Down Expand Up @@ -242,7 +243,7 @@ public void testHangingRepair() throws InterruptedException, ReaperException, JM
final double intensity = 0.5f;
final int repairThreadCount = 1;
final int segmentTimeout = 1;
final IStorageDao storage = new MemoryStorageFacade();
final IStorageDao storage = new MemoryStorageFacade(LEAD_TIME);
storage.getClusterDao().addCluster(cluster);
RepairUnit cf = storage.getRepairUnitDao().addRepairUnit(
RepairUnit.builder()
Expand Down Expand Up @@ -397,7 +398,7 @@ public void testHangingRepairNewApi() throws InterruptedException, ReaperExcepti
final double intensity = 0.5f;
final int repairThreadCount = 1;
final int segmentTimeout = 1;
final IStorageDao storage = new MemoryStorageFacade();
final IStorageDao storage = new MemoryStorageFacade(LEAD_TIME);
storage.getClusterDao().addCluster(cluster);
DateTimeUtils.setCurrentMillisFixed(timeRun);
RepairUnit cf = storage.getRepairUnitDao().addRepairUnit(
Expand Down Expand Up @@ -553,7 +554,7 @@ public void testDontFailRepairAfterTopologyChangeIncrementalRepair() throws Inte
final int repairThreadCount = 1;
final int segmentTimeout = 30;
final List<BigInteger> tokens = THREE_TOKENS;
final IStorageDao storage = new MemoryStorageFacade();
final IStorageDao storage = new MemoryStorageFacade(LEAD_TIME);
AppContext context = new AppContext();
context.storage = storage;
context.config = new ReaperApplicationConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import static org.mockito.Mockito.when;

public final class RepairRunnerTest {
private static final long LEAD_TIME = 1L;
private static final Set<String> TABLES = ImmutableSet.of("table1");
private static final Duration POLL_INTERVAL = Duration.TWO_SECONDS;
private static final List<BigInteger> THREE_TOKENS = Lists.newArrayList(
Expand Down Expand Up @@ -223,7 +224,7 @@ public void testResumeRepair() throws InterruptedException, ReaperException, Mal
final int repairThreadCount = 1;
final int segmentTimeout = 30;
final List<BigInteger> tokens = THREE_TOKENS;
final IStorageDao storage = new MemoryStorageFacade();
final IStorageDao storage = new MemoryStorageFacade(LEAD_TIME);
AppContext context = new AppContext();
context.storage = storage;
context.config = new ReaperApplicationConfiguration();
Expand Down Expand Up @@ -333,8 +334,10 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept
run.with().runState(RepairRun.RunState.RUNNING).startTime(DateTime.now()).build(runId));

context.repairManager.resumeRunningRepairRuns();
Thread.sleep(1000);
assertEquals(RepairRun.RunState.DONE, storage.getRepairRunDao().getRepairRun(runId).get().getRunState());

await().with().pollInterval(POLL_INTERVAL).atMost(30, SECONDS).until(() -> {
return RepairRun.RunState.DONE == storage.getRepairRunDao().getRepairRun(runId).get().getRunState();
});
}

@Test(expected = ConditionTimeoutException.class)
Expand All @@ -353,7 +356,7 @@ public void testTooManyPendingCompactions()
final int repairThreadCount = 1;
final int segmentTimeout = 30;
final List<BigInteger> tokens = THREE_TOKENS;
final IStorageDao storage = new MemoryStorageFacade();
final IStorageDao storage = new MemoryStorageFacade(LEAD_TIME);
AppContext context = new AppContext();
context.storage = storage;
context.config = new ReaperApplicationConfiguration();
Expand Down Expand Up @@ -546,7 +549,7 @@ public void testDontFailRepairAfterTopologyChange() throws InterruptedException,
final int repairThreadCount = 1;
final int segmentTimeout = 30;
final List<BigInteger> tokens = THREE_TOKENS;
final IStorageDao storage = new MemoryStorageFacade();
final IStorageDao storage = new MemoryStorageFacade(LEAD_TIME);
AppContext context = new AppContext();
context.storage = storage;
context.config = new ReaperApplicationConfiguration();
Expand Down Expand Up @@ -667,7 +670,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept
context.repairManager.resumeRunningRepairRuns();

// The repair run should succeed despite the topology change.
await().with().atMost(20, TimeUnit.SECONDS).until(() -> {
await().with().atMost(60, TimeUnit.SECONDS).until(() -> {
return RepairRun.RunState.DONE == storage.getRepairRunDao().getRepairRun(runId).get().getRunState();
});
}
Expand All @@ -687,7 +690,7 @@ public void testSubrangeIncrementalRepair() throws InterruptedException, ReaperE
final int repairThreadCount = 1;
final int segmentTimeout = 30;
final List<BigInteger> tokens = THREE_TOKENS;
final IStorageDao storage = new MemoryStorageFacade();
final IStorageDao storage = new MemoryStorageFacade(LEAD_TIME);
AppContext context = new AppContext();
context.storage = storage;
context.config = new ReaperApplicationConfiguration();
Expand Down Expand Up @@ -962,7 +965,7 @@ public void getNodeMetricsInLocalDcAvailabilityForLocalDcNodeTest() throws Excep
final int repairThreadCount = 1;
final int segmentTimeout = 30;
final List<BigInteger> tokens = THREE_TOKENS;
final IStorageDao storage = new MemoryStorageFacade();
final IStorageDao storage = new MemoryStorageFacade(LEAD_TIME);
AppContext context = new AppContext();
context.storage = storage;
context.config = new ReaperApplicationConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ public JmxCassandraManagementProxy connectImpl(Node host) throws ReaperException
RepairRunner rr = mock(RepairRunner.class);
RepairUnit ru = mock(RepairUnit.class);
when(ru.getKeyspaceName()).thenReturn("reaper");
when(rr.getRepairRunId()).thenReturn(runId);

ClusterFacade clusterFacade = mock(ClusterFacade.class);
when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx);
Expand Down Expand Up @@ -348,6 +349,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept
RepairRunner rr = mock(RepairRunner.class);
RepairUnit ru = mock(RepairUnit.class);
when(ru.getKeyspaceName()).thenReturn("reaper");
when(rr.getRepairRunId()).thenReturn(runId);

ClusterFacade clusterFacade = mock(ClusterFacade.class);
when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx);
Expand Down Expand Up @@ -496,6 +498,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept
RepairRunner rr = mock(RepairRunner.class);
RepairUnit ru = mock(RepairUnit.class);
when(ru.getKeyspaceName()).thenReturn("reaper");
when(rr.getRepairRunId()).thenReturn(runId);

ClusterFacade clusterFacade = mock(ClusterFacade.class);
when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx);
Expand Down Expand Up @@ -639,6 +642,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept
RepairRunner rr = mock(RepairRunner.class);
RepairUnit ru = mock(RepairUnit.class);
when(ru.getKeyspaceName()).thenReturn("reaper");
when(rr.getRepairRunId()).thenReturn(runId);

ClusterFacade clusterFacade = mock(ClusterFacade.class);
when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx);
Expand Down Expand Up @@ -782,6 +786,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept
RepairRunner rr = mock(RepairRunner.class);
RepairUnit ru = mock(RepairUnit.class);
when(ru.getKeyspaceName()).thenReturn("reaper");
when(rr.getRepairRunId()).thenReturn(runId);

ClusterFacade clusterFacade = mock(ClusterFacade.class);
when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx);
Expand Down Expand Up @@ -927,6 +932,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept
RepairRunner rr = mock(RepairRunner.class);
RepairUnit ru = mock(RepairUnit.class);
when(ru.getKeyspaceName()).thenReturn("reaper");
when(rr.getRepairRunId()).thenReturn(runId);

ClusterFacade clusterFacade = mock(ClusterFacade.class);
when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx);
Expand Down Expand Up @@ -1073,6 +1079,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept
RepairRunner rr = mock(RepairRunner.class);
RepairUnit ru = mock(RepairUnit.class);
when(ru.getKeyspaceName()).thenReturn("reaper");
when(rr.getRepairRunId()).thenReturn(runId);

ClusterFacade clusterFacade = mock(ClusterFacade.class);
when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx);
Expand Down Expand Up @@ -1205,6 +1212,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept
RepairRunner rr = mock(RepairRunner.class);
RepairUnit ru = mock(RepairUnit.class);
when(ru.getKeyspaceName()).thenReturn("reaper");
when(rr.getRepairRunId()).thenReturn(runId);

ClusterFacade clusterFacade = mock(ClusterFacade.class);
when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx);
Expand Down Expand Up @@ -1300,6 +1308,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept
RepairRunner rr = mock(RepairRunner.class);
RepairUnit ru = mock(RepairUnit.class);
when(ru.getKeyspaceName()).thenReturn("reaper");
when(rr.getRepairRunId()).thenReturn(runId);

ClusterFacade clusterFacade = mock(ClusterFacade.class);
when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx);
Expand Down

0 comments on commit 83bde15

Please sign in to comment.