-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix: mitigate the PartitionMigratingException
When a node leaves or joins the cluster hazelcast will rebalance the cluster. If we unregister or register Entity verticles when the hazelcast cluster is not in a safe state we get PartitionMigratingException's. This commit attempts to postpone the registration and unregistration until the cluster is in a safe state.
- Loading branch information
Showing
5 changed files
with
223 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
121 changes: 121 additions & 0 deletions
121
src/main/java/io/neonbee/internal/cluster/entity/HazelcastClusterEntityRegistry.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
package io.neonbee.internal.cluster.entity; | ||
|
||
import java.util.UUID; | ||
|
||
import com.hazelcast.partition.MigrationListener; | ||
import com.hazelcast.partition.MigrationState; | ||
import com.hazelcast.partition.PartitionService; | ||
import com.hazelcast.partition.ReplicaMigrationEvent; | ||
|
||
import io.neonbee.logging.LoggingFacade; | ||
import io.vertx.core.Future; | ||
import io.vertx.core.Promise; | ||
import io.vertx.core.Vertx; | ||
|
||
/** | ||
* This ClusterEntityRegistry adds a Hazelcast-specific behavior. | ||
* <p> | ||
* This implementation delays the method calls {@link #register(String, String)}, {@link #unregisterNode(String)} and | ||
* {@link #unregisterNode(String)} when the cluster is not in a safe state and executes the methdos when the Hazelcast | ||
* cluster partition migration process is complete. | ||
*/ | ||
public class HazelcastClusterEntityRegistry extends ClusterEntityRegistry { | ||
|
||
private final LoggingFacade logger = LoggingFacade.create(); | ||
|
||
private final PartitionService partitionService; | ||
|
||
/** | ||
* Create a new instance of {@link HazelcastClusterEntityRegistry}. | ||
* | ||
* @param vertx the {@link Vertx} instance | ||
* @param registryName the name of the map registry | ||
* @param partitionService the {@link PartitionService} | ||
*/ | ||
public HazelcastClusterEntityRegistry(Vertx vertx, String registryName, PartitionService partitionService) { | ||
super(vertx, registryName); | ||
this.partitionService = partitionService; | ||
} | ||
|
||
@Override | ||
public Future<Void> register(String sharedMapKey, String value) { | ||
return executeWhenClusterIsSafe("register key \"" + sharedMapKey + "\", value: \"" + value + "\"") | ||
.compose(unused -> super.register(sharedMapKey, value)); | ||
} | ||
|
||
@Override | ||
public Future<Void> unregister(String sharedMapKey, String value) { | ||
return executeWhenClusterIsSafe("unregister key \"" + sharedMapKey + "\", value: \"" + value + "\"") | ||
.compose(unused -> super.unregister(sharedMapKey, value)); | ||
} | ||
|
||
/** | ||
* Unregister all registered entities for a node by ID. | ||
* | ||
* @param clusterNodeId the ID of the cluster node | ||
* @return the future | ||
*/ | ||
@Override | ||
public Future<Void> unregisterNode(String clusterNodeId) { | ||
return executeWhenClusterIsSafe("unregister node with ID: " + clusterNodeId) | ||
.compose(unused -> super.unregisterNode(clusterNodeId)); | ||
} | ||
|
||
/** | ||
* Returns a future that is completed when the Cluster reaches a safe state. | ||
* | ||
* @param description a descriptive text that is appended to the log messages | ||
* @return the future | ||
*/ | ||
private Future<Void> executeWhenClusterIsSafe(String description) { | ||
Promise<ReplicaMigrationEvent> promise = Promise.promise(); | ||
executeWhenClusterIsSafe(promise, description); | ||
return promise.future() | ||
.onSuccess(event -> logger.info( | ||
"execute delayed execution for: \"{}\". The replica migration {} and took {} ms.", | ||
description, event.isSuccess() ? "completed" : "failed", event.getElapsedTime())) | ||
.mapEmpty(); | ||
} | ||
|
||
/** | ||
* This method delays the de-registration if the cluster is not in a save state. | ||
* <p> | ||
* | ||
* @param promise {@link Promise} to completed the function. | ||
* @param description a descriptive string that is appended to the log messages | ||
*/ | ||
private void executeWhenClusterIsSafe(Promise<ReplicaMigrationEvent> promise, String description) { | ||
if (partitionService.isClusterSafe()) { | ||
logger.info("Executing \"{}\"", description); | ||
promise.complete(); | ||
} else { | ||
UUID migrationListenerUuid = partitionService.addMigrationListener(new MigrationListener() { | ||
@Override | ||
public void migrationStarted(MigrationState state) { | ||
// not interested in | ||
} | ||
|
||
@Override | ||
public void migrationFinished(MigrationState state) { | ||
// not interested in | ||
} | ||
|
||
@Override | ||
public void replicaMigrationCompleted(ReplicaMigrationEvent event) { | ||
promise.complete(event); | ||
} | ||
|
||
@Override | ||
public void replicaMigrationFailed(ReplicaMigrationEvent event) { | ||
promise.complete(event); | ||
} | ||
}); | ||
|
||
logger.info("Added migration listener with UUID: {}", migrationListenerUuid); | ||
promise.future().onComplete(event -> { | ||
partitionService.removeMigrationListener(migrationListenerUuid); | ||
logger.info("Removed migration listener with UUID: {}", migrationListenerUuid); | ||
}); | ||
} | ||
} | ||
} |
80 changes: 80 additions & 0 deletions
80
src/test/java/io/neonbee/internal/cluster/entity/HazelcastClusterEntityRegistryTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
package io.neonbee.internal.cluster.entity; | ||
|
||
import static org.mockito.ArgumentMatchers.any; | ||
import static org.mockito.Mockito.doReturn; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.spy; | ||
import static org.mockito.Mockito.times; | ||
import static org.mockito.Mockito.verify; | ||
import static org.mockito.Mockito.when; | ||
|
||
import java.util.UUID; | ||
import java.util.function.Function; | ||
|
||
import org.junit.jupiter.api.Test; | ||
import org.junit.jupiter.api.extension.ExtendWith; | ||
import org.mockito.ArgumentCaptor; | ||
|
||
import com.hazelcast.partition.MigrationListener; | ||
import com.hazelcast.partition.PartitionService; | ||
import com.hazelcast.partition.ReplicaMigrationEvent; | ||
|
||
import io.vertx.core.Future; | ||
import io.vertx.core.Vertx; | ||
import io.vertx.junit5.VertxExtension; | ||
import io.vertx.junit5.VertxTestContext; | ||
|
||
@ExtendWith(VertxExtension.class) | ||
class HazelcastClusterEntityRegistryTest { | ||
|
||
@Test | ||
void register(Vertx vertx, VertxTestContext context) { | ||
testDelayedMethodExecution(vertx, context, "7be56c89-9b3d-452b-8009-cc801a1bcc35", | ||
registry -> registry.register("key", "value")); | ||
} | ||
|
||
@Test | ||
void unregister(Vertx vertx, VertxTestContext context) { | ||
testDelayedMethodExecution(vertx, context, "43ebcf4f-0416-4016-8aa8-a55a2c20338f", | ||
registry -> registry.unregister("key", "value")); | ||
} | ||
|
||
@Test | ||
void unregisterNode(Vertx vertx, VertxTestContext context) { | ||
testDelayedMethodExecution(vertx, context, "8d670ef8-6eaa-44d5-b589-ac72fce9e606", | ||
registry -> registry.unregisterNode("ClusterNodeID:0")); | ||
} | ||
|
||
private static void testDelayedMethodExecution(Vertx vertx, VertxTestContext context, String uuid, | ||
Function<HazelcastClusterEntityRegistry, Future<Void>> method) { | ||
UUID migrationListenerUuid = UUID.fromString(uuid); | ||
|
||
PartitionService partitionServiceMock = mock(PartitionService.class); | ||
when(partitionServiceMock.isClusterSafe()).thenReturn(Boolean.FALSE); | ||
when(partitionServiceMock.addMigrationListener(any())) | ||
.thenReturn(migrationListenerUuid); | ||
|
||
HazelcastClusterEntityRegistry registry = spy( | ||
new HazelcastClusterEntityRegistry(vertx, "unitTestRegistry", partitionServiceMock)); | ||
doReturn("ClusterNodeID:0").when(registry).getClusterNodeId(); | ||
|
||
method.apply(registry) | ||
.onSuccess(unused -> context.verify(() -> { | ||
// ensure that the MigrationListener is removed | ||
verify(partitionServiceMock, times(1)).removeMigrationListener(migrationListenerUuid); | ||
context.completeNow(); | ||
})) | ||
.onFailure(context::failNow); | ||
|
||
// get the MigrationListener | ||
ArgumentCaptor<MigrationListener> captor = ArgumentCaptor.forClass(MigrationListener.class); | ||
verify(partitionServiceMock).addMigrationListener(captor.capture()); | ||
MigrationListener migrationListener = captor.getValue(); | ||
|
||
// send the event, that the partition replica migration is completed | ||
ReplicaMigrationEvent event = mock(ReplicaMigrationEvent.class); | ||
when(event.getElapsedTime()).thenReturn(10L); | ||
when(event.isSuccess()).thenReturn(Boolean.TRUE); | ||
migrationListener.replicaMigrationCompleted(event); | ||
} | ||
} |