Skip to content

Commit

Permalink
fix: mitigate the PartitionMigratingException
Browse files Browse the repository at this point in the history
When a node leaves or joins the cluster hazelcast will rebalance the
cluster. If we unregister or register Entity verticles when the
hazelcast cluster is not in a safe state we get
PartitionMigratingException's. This commit attempts to postpone the
registration and unregistration until the cluster is in a safe state.
  • Loading branch information
halber committed May 11, 2023
1 parent d8b237a commit 87c90cb
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 4 deletions.
18 changes: 16 additions & 2 deletions src/main/java/io/neonbee/NeonBee.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.partition.PartitionService;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
Expand Down Expand Up @@ -62,6 +64,7 @@
import io.neonbee.internal.buffer.ImmutableBuffer;
import io.neonbee.internal.cluster.ClusterHelper;
import io.neonbee.internal.cluster.entity.ClusterEntityRegistry;
import io.neonbee.internal.cluster.entity.HazelcastClusterEntityRegistry;
import io.neonbee.internal.codec.DataExceptionMessageCodec;
import io.neonbee.internal.codec.DataQueryMessageCodec;
import io.neonbee.internal.codec.EntityWrapperMessageCodec;
Expand Down Expand Up @@ -100,8 +103,9 @@
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeListener;
import io.vertx.micrometer.MicrometerMetricsOptions;
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;

@SuppressWarnings({ "PMD.CouplingBetweenObjects", "PMD.GodClass" })
@SuppressWarnings({ "PMD.CouplingBetweenObjects", "PMD.GodClass", "PMD.ExcessiveImports" })
public class NeonBee {
/* // @formatter:off
*
Expand Down Expand Up @@ -594,7 +598,17 @@ private Future<Void> deployModules() {
this.healthRegistry = new HealthCheckRegistry(vertx);
this.modelManager = new EntityModelManager(this);
if (vertx.isClustered()) {
this.entityRegistry = new ClusterEntityRegistry(vertx, EntityVerticle.REGISTRY_NAME);
if (ClusterHelper.getHazelcastClusterManager(vertx).isPresent()) {
PartitionService partitionService = ClusterHelper.getHazelcastClusterManager(vertx)
.map(HazelcastClusterManager::getHazelcastInstance)
.map(HazelcastInstance::getPartitionService)
.orElseThrow(() -> new IllegalStateException(
"Failed to instantiate NeonBee with Hazelcast cluster manager."));
this.entityRegistry =
new HazelcastClusterEntityRegistry(vertx, EntityVerticle.REGISTRY_NAME, partitionService);
} else {
this.entityRegistry = new ClusterEntityRegistry(vertx, EntityVerticle.REGISTRY_NAME);
}
} else {
this.entityRegistry = new WriteSafeRegistry<>(vertx, EntityVerticle.REGISTRY_NAME);
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/neonbee/internal/WriteSafeRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private Future<Void> addValue(String sharedMapKey, Object value) {
*/
@Override
public Future<Void> unregister(String sharedMapKey, T value) {
logger.debug("unregister value: \"{}\" from shared map: \"{}\"", sharedMapKey, value);
logger.info("unregister value: \"{}\" from shared map: \"{}\"", sharedMapKey, value);

return lock(sharedMapKey, () -> removeValue(sharedMapKey, value));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,12 @@ Future<Void> removeClusteringInformation(String clusterNodeId) {
*/
public Future<Void> unregisterNode(String clusterNodeId) {
return clusteringInformation.getSharedMap().compose(AsyncMap::entries).compose(map -> {
JsonArray registeredEntities = ((JsonArray) map.remove(clusterNodeId)).copy();
JsonArray registeredEntities = (JsonArray) map.remove(clusterNodeId);
if (registeredEntities == null) {
return Future.succeededFuture();
}
registeredEntities = registeredEntities.copy();

List<Future> futureList = new ArrayList<>(registeredEntities.size());
for (Object o : registeredEntities) {
if (remove(map, o)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package io.neonbee.internal.cluster.entity;

import java.util.UUID;

import com.hazelcast.partition.MigrationListener;
import com.hazelcast.partition.MigrationState;
import com.hazelcast.partition.PartitionService;
import com.hazelcast.partition.ReplicaMigrationEvent;

import io.neonbee.logging.LoggingFacade;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;

/**
* This ClusterEntityRegistry adds a Hazelcast-specific behavior.
* <p>
* This implementation delays the method calls {@link #register(String, String)}, {@link #unregisterNode(String)} and
* {@link #unregisterNode(String)} when the cluster is not in a safe state and executes the methdos when the Hazelcast
* cluster partition migration process is complete.
*/
public class HazelcastClusterEntityRegistry extends ClusterEntityRegistry {

private final LoggingFacade logger = LoggingFacade.create();

private final PartitionService partitionService;

/**
* Create a new instance of {@link HazelcastClusterEntityRegistry}.
*
* @param vertx the {@link Vertx} instance
* @param registryName the name of the map registry
* @param partitionService the {@link PartitionService}
*/
public HazelcastClusterEntityRegistry(Vertx vertx, String registryName, PartitionService partitionService) {
super(vertx, registryName);
this.partitionService = partitionService;
}

@Override
public Future<Void> register(String sharedMapKey, String value) {
return executeWhenClusterIsSafe("register key \"" + sharedMapKey + "\", value: \"" + value + "\"")
.compose(unused -> super.register(sharedMapKey, value));
}

@Override
public Future<Void> unregister(String sharedMapKey, String value) {
return executeWhenClusterIsSafe("unregister key \"" + sharedMapKey + "\", value: \"" + value + "\"")
.compose(unused -> super.unregister(sharedMapKey, value));
}

/**
* Unregister all registered entities for a node by ID.
*
* @param clusterNodeId the ID of the cluster node
* @return the future
*/
@Override
public Future<Void> unregisterNode(String clusterNodeId) {
return executeWhenClusterIsSafe("unregister node with ID: " + clusterNodeId)
.compose(unused -> super.unregisterNode(clusterNodeId));
}

/**
* Returns a future that is completed when the Cluster reaches a safe state.
*
* @param description a descriptive text that is appended to the log messages
* @return the future
*/
private Future<Void> executeWhenClusterIsSafe(String description) {
if (partitionService.isClusterSafe()) {
logger.info("Executing \"{}\"", description);
return Future.succeededFuture();
}

Promise<ReplicaMigrationEvent> promise = Promise.promise();
executeWhenClusterIsSafe(promise);
return promise.future()
.onSuccess(event -> logger.info(
"execute delayed execution for: \"{}\". The replica migration {} and took {} ms.",
description, event.isSuccess() ? "completed" : "failed", event.getElapsedTime()))
.mapEmpty();
}

/**
* This method delays the de-registration if the cluster is not in a save state.
* <p>
*
* @param promise {@link Promise} to completed the function.
*/
private void executeWhenClusterIsSafe(Promise<ReplicaMigrationEvent> promise) {
UUID migrationListenerUuid = partitionService.addMigrationListener(new MigrationListener() {
@Override
public void migrationStarted(MigrationState state) {
// not interested in
}

@Override
public void migrationFinished(MigrationState state) {
// not interested in
}

@Override
public void replicaMigrationCompleted(ReplicaMigrationEvent event) {
promise.complete(event);
}

@Override
public void replicaMigrationFailed(ReplicaMigrationEvent event) {
promise.complete(event);
}
});

logger.info("Added migration listener with UUID: {}", migrationListenerUuid);
promise.future().onComplete(event -> {
partitionService.removeMigrationListener(migrationListenerUuid);
logger.info("Removed migration listener with UUID: {}", migrationListenerUuid);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.neonbee.internal.cluster.entity;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.UUID;
import java.util.function.Function;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;

import com.hazelcast.partition.MigrationListener;
import com.hazelcast.partition.PartitionService;
import com.hazelcast.partition.ReplicaMigrationEvent;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;

@ExtendWith(VertxExtension.class)
class HazelcastClusterEntityRegistryTest {

@Test
void register(Vertx vertx, VertxTestContext context) {
testDelayedMethodExecution(vertx, context, "7be56c89-9b3d-452b-8009-cc801a1bcc35",
registry -> registry.register("key", "value"));
}

@Test
void unregister(Vertx vertx, VertxTestContext context) {
testDelayedMethodExecution(vertx, context, "43ebcf4f-0416-4016-8aa8-a55a2c20338f",
registry -> registry.unregister("key", "value"));
}

@Test
void unregisterNode(Vertx vertx, VertxTestContext context) {
testDelayedMethodExecution(vertx, context, "8d670ef8-6eaa-44d5-b589-ac72fce9e606",
registry -> registry.unregisterNode("ClusterNodeID:0"));
}

private static void testDelayedMethodExecution(Vertx vertx, VertxTestContext context, String uuid,
Function<HazelcastClusterEntityRegistry, Future<Void>> method) {
UUID migrationListenerUuid = UUID.fromString(uuid);

PartitionService partitionServiceMock = mock(PartitionService.class);
when(partitionServiceMock.isClusterSafe()).thenReturn(Boolean.FALSE);
when(partitionServiceMock.addMigrationListener(any()))
.thenReturn(migrationListenerUuid);

HazelcastClusterEntityRegistry registry = spy(
new HazelcastClusterEntityRegistry(vertx, "unitTestRegistry", partitionServiceMock));
doReturn("ClusterNodeID:0").when(registry).getClusterNodeId();

method.apply(registry)
.onSuccess(unused -> context.verify(() -> {
// ensure that the MigrationListener is removed
verify(partitionServiceMock, times(1)).removeMigrationListener(migrationListenerUuid);
context.completeNow();
}))
.onFailure(context::failNow);

// get the MigrationListener
ArgumentCaptor<MigrationListener> captor = ArgumentCaptor.forClass(MigrationListener.class);
verify(partitionServiceMock).addMigrationListener(captor.capture());
MigrationListener migrationListener = captor.getValue();

// send the event, that the partition replica migration is completed
ReplicaMigrationEvent event = mock(ReplicaMigrationEvent.class);
when(event.getElapsedTime()).thenReturn(10L);
when(event.isSuccess()).thenReturn(Boolean.TRUE);
migrationListener.replicaMigrationCompleted(event);
}
}

0 comments on commit 87c90cb

Please sign in to comment.