diff --git a/src/server/src/main/java/io/cassandrareaper/service/RepairManager.java b/src/server/src/main/java/io/cassandrareaper/service/RepairManager.java index fdf5df234..63f461eb2 100644 --- a/src/server/src/main/java/io/cassandrareaper/service/RepairManager.java +++ b/src/server/src/main/java/io/cassandrareaper/service/RepairManager.java @@ -308,9 +308,7 @@ private void abortSegmentsWithNoLeaderNonIncremental(RepairRun repairRun, Collec if (context.storage instanceof IDistributedStorage || !repairRunners.containsKey(repairRun.getId())) { // When multiple Reapers are in use, we can get stuck segments when one instance is rebooted // Any segment in RUNNING or STARTED state but with no leader should be killed - Set leaders = context.storage instanceof IDistributedStorage - ? ((IDistributedStorage) context.storage).getLockedSegmentsForRun(repairRun.getId()) - : Collections.emptySet(); + Set leaders = context.storage.getLockedSegmentsForRun(repairRun.getId()); Collection orphanedSegments = runningSegments .stream() diff --git a/src/server/src/main/java/io/cassandrareaper/service/SegmentRunner.java b/src/server/src/main/java/io/cassandrareaper/service/SegmentRunner.java index 68bf551e8..5bd30fe99 100644 --- a/src/server/src/main/java/io/cassandrareaper/service/SegmentRunner.java +++ b/src/server/src/main/java/io/cassandrareaper/service/SegmentRunner.java @@ -869,10 +869,8 @@ private boolean takeLead(RepairSegment segment) { ? ((IDistributedStorage) context.storage).takeLead(leaderElectionId) : true; } else { - result = context.storage instanceof IDistributedStorage - ? ((IDistributedStorage) context.storage).lockRunningRepairsForNodes(this.repairRunner.getRepairRunId(), - segment.getId(), segment.getReplicas().keySet()) - : true; + result = context.storage.lockRunningRepairsForNodes(this.repairRunner.getRepairRunId(), + segment.getId(), segment.getReplicas().keySet()); } if (!result) { context.metricRegistry.counter(MetricRegistry.name(SegmentRunner.class, "takeLead", "failed")).inc(); @@ -895,10 +893,8 @@ private boolean renewLead(RepairSegment segment) { } return result; } else { - boolean resultLock2 = context.storage instanceof IDistributedStorage - ? ((IDistributedStorage) context.storage).renewRunningRepairsForNodes(this.repairRunner.getRepairRunId(), - segment.getId(), segment.getReplicas().keySet()) - : true; + boolean resultLock2 = context.storage.renewRunningRepairsForNodes(this.repairRunner.getRepairRunId(), + segment.getId(), segment.getReplicas().keySet()); if (!resultLock2) { context.metricRegistry.counter(MetricRegistry.name(SegmentRunner.class, "renewLead", "failed")).inc(); releaseLead(segment); @@ -912,13 +908,14 @@ private boolean renewLead(RepairSegment segment) { private void releaseLead(RepairSegment segment) { try (Timer.Context cx = context.metricRegistry.timer(MetricRegistry.name(SegmentRunner.class, "releaseLead")).time()) { - if (context.storage instanceof IDistributedStorage) { - if (repairUnit.getIncrementalRepair() && !repairUnit.getSubrangeIncrementalRepair()) { + + if (repairUnit.getIncrementalRepair() && !repairUnit.getSubrangeIncrementalRepair()) { + if (context.storage instanceof IDistributedStorage) { ((IDistributedStorage) context.storage).releaseLead(leaderElectionId); - } else { - ((IDistributedStorage) context.storage).releaseRunningRepairsForNodes(this.repairRunner.getRepairRunId(), - segment.getId(), segment.getReplicas().keySet()); } + } else { + context.storage.releaseRunningRepairsForNodes(this.repairRunner.getRepairRunId(), + segment.getId(), segment.getReplicas().keySet()); } } } diff --git a/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java b/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java index 298bf825b..fc48dd6b9 100644 --- a/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java +++ b/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java @@ -41,7 +41,6 @@ import io.cassandrareaper.storage.operations.IOperationsDao; import java.util.List; -import java.util.Set; import java.util.UUID; @@ -62,23 +61,6 @@ public interface IDistributedStorage extends IDistributedMetrics { void releaseLead(UUID leaderId); - boolean lockRunningRepairsForNodes( - UUID repairId, - UUID segmentId, - Set replicas); - - boolean renewRunningRepairsForNodes( - UUID repairId, - UUID segmentId, - Set replicas); - - boolean releaseRunningRepairsForNodes( - UUID repairId, - UUID segmentId, - Set replicas); - - Set getLockedSegmentsForRun(UUID runId); - int countRunningReapers(); List getRunningReapers(); diff --git a/src/server/src/main/java/io/cassandrareaper/storage/MemoryStorageFacade.java b/src/server/src/main/java/io/cassandrareaper/storage/MemoryStorageFacade.java index e91997e7d..93b7b3cb8 100644 --- a/src/server/src/main/java/io/cassandrareaper/storage/MemoryStorageFacade.java +++ b/src/server/src/main/java/io/cassandrareaper/storage/MemoryStorageFacade.java @@ -63,9 +63,8 @@ */ public final class MemoryStorageFacade implements IStorageDao { - private static final long LEAD_TIME = 90; + private static final long DEFAULT_LEAD_TIME = 90; private static final Logger LOG = LoggerFactory.getLogger(MemoryStorageFacade.class); - private static final ReplicaLockManagerWithTtl REPAIR_RUN_LOCK_MANAGER = new ReplicaLockManagerWithTtl(LEAD_TIME); /** Field evaluator to find transient attributes. This is needed to deal with persisting Guava collections objects * that sometimes use the transient keyword for some of their implementation's backing stores**/ private static final PersistenceFieldEvaluator TRANSIENT_FIELD_EVALUATOR = @@ -89,8 +88,9 @@ public final class MemoryStorageFacade implements IStorageDao { ); private final MemorySnapshotDao memSnapshotDao = new MemorySnapshotDao(); private final MemoryMetricsDao memMetricsDao = new MemoryMetricsDao(); + private final ReplicaLockManagerWithTtl repairRunLockManager; - public MemoryStorageFacade(String persistenceStoragePath) { + public MemoryStorageFacade(String persistenceStoragePath, long leadTime) { LOG.info("Using memory storage backend. Persistence storage path: {}", persistenceStoragePath); this.embeddedStorage = EmbeddedStorage.Foundation(Paths.get(persistenceStoragePath)) .onConnectionFoundation( @@ -107,10 +107,19 @@ public MemoryStorageFacade(String persistenceStoragePath) { LOG.info("Loading existing data from persistence storage"); this.memoryStorageRoot = (MemoryStorageRoot) this.embeddedStorage.root(); } + this.repairRunLockManager = new ReplicaLockManagerWithTtl(leadTime); } public MemoryStorageFacade() { - this("/tmp/" + UUID.randomUUID().toString()); + this("/tmp/" + UUID.randomUUID().toString(), DEFAULT_LEAD_TIME); + } + + public MemoryStorageFacade(String persistenceStoragePath) { + this(persistenceStoragePath, DEFAULT_LEAD_TIME); + } + + public MemoryStorageFacade(long leadTime) { + this("/tmp/" + UUID.randomUUID().toString(), leadTime); } @Override @@ -303,21 +312,21 @@ public Map getSubscriptionsById() { @Override public boolean lockRunningRepairsForNodes(UUID runId, UUID segmentId, Set replicas) { - return REPAIR_RUN_LOCK_MANAGER.lockRunningRepairsForNodes(runId, segmentId, replicas); + return repairRunLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas); } @Override public boolean renewRunningRepairsForNodes(UUID runId, UUID segmentId, Set replicas) { - return REPAIR_RUN_LOCK_MANAGER.renewRunningRepairsForNodes(runId, segmentId, replicas); + return repairRunLockManager.renewRunningRepairsForNodes(runId, segmentId, replicas); } @Override public boolean releaseRunningRepairsForNodes(UUID runId, UUID segmentId, Set replicas) { - return REPAIR_RUN_LOCK_MANAGER.releaseRunningRepairsForNodes(runId, segmentId, replicas); + return repairRunLockManager.releaseRunningRepairsForNodes(runId, segmentId, replicas); } @Override public Set getLockedSegmentsForRun(UUID runId) { - return REPAIR_RUN_LOCK_MANAGER.getLockedSegmentsForRun(runId); + return repairRunLockManager.getLockedSegmentsForRun(runId); } } diff --git a/src/server/src/test/java/io/cassandrareaper/service/RepairManagerTest.java b/src/server/src/test/java/io/cassandrareaper/service/RepairManagerTest.java index 412191e9b..f8992240d 100644 --- a/src/server/src/test/java/io/cassandrareaper/service/RepairManagerTest.java +++ b/src/server/src/test/java/io/cassandrareaper/service/RepairManagerTest.java @@ -26,7 +26,6 @@ import io.cassandrareaper.core.RepairUnit; import io.cassandrareaper.core.Segment; import io.cassandrareaper.management.ClusterFacade; -import io.cassandrareaper.storage.IDistributedStorage; import io.cassandrareaper.storage.IStorageDao; import io.cassandrareaper.storage.cassandra.CassandraStorageFacade; import io.cassandrareaper.storage.cluster.IClusterDao; @@ -144,7 +143,7 @@ public void abortRunningSegmentWithNoLeader() throws ReaperException, Interrupte Mockito.doNothing().when(context.repairManager).abortSegments(any(), any()); Mockito.doReturn(run).when(context.repairManager).startRepairRun(run); - when(((IDistributedStorage) context.storage).getLockedSegmentsForRun(any())).thenReturn(Collections.emptySet()); + when(context.storage.getLockedSegmentsForRun(any())).thenReturn(Collections.emptySet()); IRepairUnitDao mockedRepairUnitDao = mock(IRepairUnitDao.class); Mockito.when(((CassandraStorageFacade) context.storage).getRepairUnitDao()).thenReturn(mockedRepairUnitDao); Mockito.when(mockedRepairUnitDao.getRepairUnit(any(UUID.class))).thenReturn(cf); @@ -238,7 +237,7 @@ public void doNotAbortRunningSegmentWithLeader() throws ReaperException, Interru Mockito.when(((CassandraStorageFacade) context.storage).getRepairUnitDao()).thenReturn(mockedRepairUnitDao); Mockito.when(mockedRepairUnitDao.getRepairUnit(any(UUID.class))).thenReturn(cf); - when(((IDistributedStorage) context.storage).getLockedSegmentsForRun(any())).thenReturn( + when(context.storage.getLockedSegmentsForRun(any())).thenReturn( new HashSet(Arrays.asList(segment.getId()))); context.repairManager.resumeRunningRepairRuns(); diff --git a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java index afbd073f8..b6de95266 100644 --- a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java +++ b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java @@ -83,6 +83,7 @@ public final class RepairRunnerHangingTest { + private static final long LEAD_TIME = 1L; private static final Logger LOG = LoggerFactory.getLogger(RepairRunnerHangingTest.class); private static final Set TABLES = ImmutableSet.of("table1"); private static final List THREE_TOKENS = Lists.newArrayList( @@ -242,7 +243,7 @@ public void testHangingRepair() throws InterruptedException, ReaperException, JM final double intensity = 0.5f; final int repairThreadCount = 1; final int segmentTimeout = 1; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TIME); storage.getClusterDao().addCluster(cluster); RepairUnit cf = storage.getRepairUnitDao().addRepairUnit( RepairUnit.builder() @@ -397,7 +398,7 @@ public void testHangingRepairNewApi() throws InterruptedException, ReaperExcepti final double intensity = 0.5f; final int repairThreadCount = 1; final int segmentTimeout = 1; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TIME); storage.getClusterDao().addCluster(cluster); DateTimeUtils.setCurrentMillisFixed(timeRun); RepairUnit cf = storage.getRepairUnitDao().addRepairUnit( @@ -553,7 +554,7 @@ public void testDontFailRepairAfterTopologyChangeIncrementalRepair() throws Inte final int repairThreadCount = 1; final int segmentTimeout = 30; final List tokens = THREE_TOKENS; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TIME); AppContext context = new AppContext(); context.storage = storage; context.config = new ReaperApplicationConfiguration(); diff --git a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java index b7f16e250..d10a2f2a1 100644 --- a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java +++ b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java @@ -99,6 +99,7 @@ import static org.mockito.Mockito.when; public final class RepairRunnerTest { + private static final long LEAD_TIME = 1L; private static final Set TABLES = ImmutableSet.of("table1"); private static final Duration POLL_INTERVAL = Duration.TWO_SECONDS; private static final List THREE_TOKENS = Lists.newArrayList( @@ -223,7 +224,7 @@ public void testResumeRepair() throws InterruptedException, ReaperException, Mal final int repairThreadCount = 1; final int segmentTimeout = 30; final List tokens = THREE_TOKENS; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TIME); AppContext context = new AppContext(); context.storage = storage; context.config = new ReaperApplicationConfiguration(); @@ -333,8 +334,10 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept run.with().runState(RepairRun.RunState.RUNNING).startTime(DateTime.now()).build(runId)); context.repairManager.resumeRunningRepairRuns(); - Thread.sleep(1000); - assertEquals(RepairRun.RunState.DONE, storage.getRepairRunDao().getRepairRun(runId).get().getRunState()); + + await().with().pollInterval(POLL_INTERVAL).atMost(30, SECONDS).until(() -> { + return RepairRun.RunState.DONE == storage.getRepairRunDao().getRepairRun(runId).get().getRunState(); + }); } @Test(expected = ConditionTimeoutException.class) @@ -353,7 +356,7 @@ public void testTooManyPendingCompactions() final int repairThreadCount = 1; final int segmentTimeout = 30; final List tokens = THREE_TOKENS; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TIME); AppContext context = new AppContext(); context.storage = storage; context.config = new ReaperApplicationConfiguration(); @@ -546,7 +549,7 @@ public void testDontFailRepairAfterTopologyChange() throws InterruptedException, final int repairThreadCount = 1; final int segmentTimeout = 30; final List tokens = THREE_TOKENS; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TIME); AppContext context = new AppContext(); context.storage = storage; context.config = new ReaperApplicationConfiguration(); @@ -667,7 +670,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept context.repairManager.resumeRunningRepairRuns(); // The repair run should succeed despite the topology change. - await().with().atMost(20, TimeUnit.SECONDS).until(() -> { + await().with().atMost(60, TimeUnit.SECONDS).until(() -> { return RepairRun.RunState.DONE == storage.getRepairRunDao().getRepairRun(runId).get().getRunState(); }); } @@ -687,7 +690,7 @@ public void testSubrangeIncrementalRepair() throws InterruptedException, ReaperE final int repairThreadCount = 1; final int segmentTimeout = 30; final List tokens = THREE_TOKENS; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TIME); AppContext context = new AppContext(); context.storage = storage; context.config = new ReaperApplicationConfiguration(); @@ -962,7 +965,7 @@ public void getNodeMetricsInLocalDcAvailabilityForLocalDcNodeTest() throws Excep final int repairThreadCount = 1; final int segmentTimeout = 30; final List tokens = THREE_TOKENS; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TIME); AppContext context = new AppContext(); context.storage = storage; context.config = new ReaperApplicationConfiguration(); diff --git a/src/server/src/test/java/io/cassandrareaper/service/SegmentRunnerTest.java b/src/server/src/test/java/io/cassandrareaper/service/SegmentRunnerTest.java index c4cdbff11..a533df458 100644 --- a/src/server/src/test/java/io/cassandrareaper/service/SegmentRunnerTest.java +++ b/src/server/src/test/java/io/cassandrareaper/service/SegmentRunnerTest.java @@ -193,6 +193,7 @@ public JmxCassandraManagementProxy connectImpl(Node host) throws ReaperException RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -348,6 +349,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -496,6 +498,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -639,6 +642,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -782,6 +786,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -927,6 +932,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -1073,6 +1079,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -1205,6 +1212,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -1300,6 +1308,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx);