From 87c90cb2c17cb22d9b220e170ce99e0f4d5c5b78 Mon Sep 17 00:00:00 2001 From: Michael Halberstadt Date: Thu, 20 Apr 2023 07:04:25 +0200 Subject: [PATCH] fix: mitigate the PartitionMigratingException When a node leaves or joins the cluster, Hazelcast will rebalance the cluster. If we unregister or register Entity verticles when the Hazelcast cluster is not in a safe state, we get PartitionMigratingExceptions. This commit attempts to postpone the registration and unregistration until the cluster is in a safe state. --- src/main/java/io/neonbee/NeonBee.java | 18 ++- .../neonbee/internal/WriteSafeRegistry.java | 2 +- .../cluster/entity/ClusterEntityRegistry.java | 7 +- .../HazelcastClusterEntityRegistry.java | 120 ++++++++++++++++++ .../HazelcastClusterEntityRegistryTest.java | 80 ++++++++++++ 5 files changed, 223 insertions(+), 4 deletions(-) create mode 100644 src/main/java/io/neonbee/internal/cluster/entity/HazelcastClusterEntityRegistry.java create mode 100644 src/test/java/io/neonbee/internal/cluster/entity/HazelcastClusterEntityRegistryTest.java diff --git a/src/main/java/io/neonbee/NeonBee.java b/src/main/java/io/neonbee/NeonBee.java index b8078aa80..07c11b71c 100644 --- a/src/main/java/io/neonbee/NeonBee.java +++ b/src/main/java/io/neonbee/NeonBee.java @@ -35,6 +35,8 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.partition.PartitionService; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.composite.CompositeMeterRegistry; @@ -62,6 +64,7 @@ import io.neonbee.internal.buffer.ImmutableBuffer; import io.neonbee.internal.cluster.ClusterHelper; import io.neonbee.internal.cluster.entity.ClusterEntityRegistry; +import io.neonbee.internal.cluster.entity.HazelcastClusterEntityRegistry; import io.neonbee.internal.codec.DataExceptionMessageCodec; import io.neonbee.internal.codec.DataQueryMessageCodec; import 
io.neonbee.internal.codec.EntityWrapperMessageCodec; @@ -100,8 +103,9 @@ import io.vertx.core.spi.cluster.ClusterManager; import io.vertx.core.spi.cluster.NodeListener; import io.vertx.micrometer.MicrometerMetricsOptions; +import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager; -@SuppressWarnings({ "PMD.CouplingBetweenObjects", "PMD.GodClass" }) +@SuppressWarnings({ "PMD.CouplingBetweenObjects", "PMD.GodClass", "PMD.ExcessiveImports" }) public class NeonBee { /* // @formatter:off * @@ -594,7 +598,17 @@ private Future deployModules() { this.healthRegistry = new HealthCheckRegistry(vertx); this.modelManager = new EntityModelManager(this); if (vertx.isClustered()) { - this.entityRegistry = new ClusterEntityRegistry(vertx, EntityVerticle.REGISTRY_NAME); + if (ClusterHelper.getHazelcastClusterManager(vertx).isPresent()) { + PartitionService partitionService = ClusterHelper.getHazelcastClusterManager(vertx) + .map(HazelcastClusterManager::getHazelcastInstance) + .map(HazelcastInstance::getPartitionService) + .orElseThrow(() -> new IllegalStateException( + "Failed to instantiate NeonBee with Hazelcast cluster manager.")); + this.entityRegistry = + new HazelcastClusterEntityRegistry(vertx, EntityVerticle.REGISTRY_NAME, partitionService); + } else { + this.entityRegistry = new ClusterEntityRegistry(vertx, EntityVerticle.REGISTRY_NAME); + } } else { this.entityRegistry = new WriteSafeRegistry<>(vertx, EntityVerticle.REGISTRY_NAME); } diff --git a/src/main/java/io/neonbee/internal/WriteSafeRegistry.java b/src/main/java/io/neonbee/internal/WriteSafeRegistry.java index 73637dd38..12a2cd525 100644 --- a/src/main/java/io/neonbee/internal/WriteSafeRegistry.java +++ b/src/main/java/io/neonbee/internal/WriteSafeRegistry.java @@ -98,7 +98,7 @@ private Future addValue(String sharedMapKey, Object value) { */ @Override public Future unregister(String sharedMapKey, T value) { - logger.debug("unregister value: \"{}\" from shared map: \"{}\"", sharedMapKey, value); + 
logger.info("unregister value: \"{}\" from shared map: \"{}\"", sharedMapKey, value); return lock(sharedMapKey, () -> removeValue(sharedMapKey, value)); } diff --git a/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java b/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java index facc06255..1e33b690e 100644 --- a/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java +++ b/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java @@ -118,7 +118,12 @@ Future removeClusteringInformation(String clusterNodeId) { */ public Future unregisterNode(String clusterNodeId) { return clusteringInformation.getSharedMap().compose(AsyncMap::entries).compose(map -> { - JsonArray registeredEntities = ((JsonArray) map.remove(clusterNodeId)).copy(); + JsonArray registeredEntities = (JsonArray) map.remove(clusterNodeId); + if (registeredEntities == null) { + return Future.succeededFuture(); + } + registeredEntities = registeredEntities.copy(); + List futureList = new ArrayList<>(registeredEntities.size()); for (Object o : registeredEntities) { if (remove(map, o)) { diff --git a/src/main/java/io/neonbee/internal/cluster/entity/HazelcastClusterEntityRegistry.java b/src/main/java/io/neonbee/internal/cluster/entity/HazelcastClusterEntityRegistry.java new file mode 100644 index 000000000..b8096c3c6 --- /dev/null +++ b/src/main/java/io/neonbee/internal/cluster/entity/HazelcastClusterEntityRegistry.java @@ -0,0 +1,120 @@ +package io.neonbee.internal.cluster.entity; + +import java.util.UUID; + +import com.hazelcast.partition.MigrationListener; +import com.hazelcast.partition.MigrationState; +import com.hazelcast.partition.PartitionService; +import com.hazelcast.partition.ReplicaMigrationEvent; + +import io.neonbee.logging.LoggingFacade; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; + +/** + * This ClusterEntityRegistry adds a Hazelcast-specific behavior. + * <p>

+ * This implementation delays the method calls {@link #register(String, String)}, {@link #unregister(String, String)} + * and {@link #unregisterNode(String)} while the cluster is not in a safe state and executes them as soon as Hazelcast + * reports a completed or failed partition replica migration. + */ +public class HazelcastClusterEntityRegistry extends ClusterEntityRegistry { + + private final LoggingFacade logger = LoggingFacade.create(); + + private final PartitionService partitionService; + + /** + * Create a new instance of {@link HazelcastClusterEntityRegistry}. + * + * @param vertx the {@link Vertx} instance + * @param registryName the name of the map registry + * @param partitionService the {@link PartitionService} + */ + public HazelcastClusterEntityRegistry(Vertx vertx, String registryName, PartitionService partitionService) { + super(vertx, registryName); + this.partitionService = partitionService; + } + + @Override + public Future<Void> register(String sharedMapKey, String value) { + return executeWhenClusterIsSafe("register key \"" + sharedMapKey + "\", value: \"" + value + "\"") + .compose(unused -> super.register(sharedMapKey, value)); + } + + @Override + public Future<Void> unregister(String sharedMapKey, String value) { + return executeWhenClusterIsSafe("unregister key \"" + sharedMapKey + "\", value: \"" + value + "\"") + .compose(unused -> super.unregister(sharedMapKey, value)); + } + + /** + * Unregister all registered entities for a node by ID. + * + * @param clusterNodeId the ID of the cluster node + * @return the future + */ + @Override + public Future<Void> unregisterNode(String clusterNodeId) { + return executeWhenClusterIsSafe("unregister node with ID: " + clusterNodeId) + .compose(unused -> super.unregisterNode(clusterNodeId)); + } + + /** + * Returns a future that completes immediately if the cluster is safe, or after the next replica migration event. 
+ * + * @param description a descriptive text that is used in the log messages + * @return a future that succeeds as soon as the operation may be executed + */ + private Future<Void> executeWhenClusterIsSafe(String description) { + if (partitionService.isClusterSafe()) { + logger.info("Executing \"{}\"", description); + return Future.succeededFuture(); + } + + Promise<ReplicaMigrationEvent> promise = Promise.promise(); + executeWhenClusterIsSafe(promise); + return promise.future() + .onSuccess(event -> logger.info( + "execute delayed execution for: \"{}\". The replica migration {} and took {} ms.", + description, event.isSuccess() ? "completed" : "failed", event.getElapsedTime())) + .mapEmpty(); + } + + /** + * Registers a migration listener that completes the promise with the first replica migration event it receives. + * The listener is removed again as soon as the promise has been completed.

+ * + * @param promise the {@link Promise} to complete with the received {@link ReplicaMigrationEvent} + */ + private void executeWhenClusterIsSafe(Promise<ReplicaMigrationEvent> promise) { + UUID migrationListenerUuid = partitionService.addMigrationListener(new MigrationListener() { + @Override + public void migrationStarted(MigrationState state) { + // intentionally empty: the start of a migration is not relevant here + } + + @Override + public void migrationFinished(MigrationState state) { + // intentionally empty; NOTE(review): signals the end of the whole migration - possibly the better trigger + } + + @Override + public void replicaMigrationCompleted(ReplicaMigrationEvent event) { + promise.tryComplete(event); // events fire per replica, complete() would throw on the 2nd one + } + + @Override + public void replicaMigrationFailed(ReplicaMigrationEvent event) { + promise.tryComplete(event); // events fire per replica, complete() would throw on the 2nd one + } + }); + + logger.info("Added migration listener with UUID: {}", migrationListenerUuid); + promise.future().onComplete(event -> { + partitionService.removeMigrationListener(migrationListenerUuid); + logger.info("Removed migration listener with UUID: {}", migrationListenerUuid); + }); + } +} diff --git a/src/test/java/io/neonbee/internal/cluster/entity/HazelcastClusterEntityRegistryTest.java b/src/test/java/io/neonbee/internal/cluster/entity/HazelcastClusterEntityRegistryTest.java new file mode 100644 index 000000000..416dcdc35 --- /dev/null +++ b/src/test/java/io/neonbee/internal/cluster/entity/HazelcastClusterEntityRegistryTest.java @@ -0,0 +1,80 @@ +package io.neonbee.internal.cluster.entity; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.UUID; +import java.util.function.Function; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; + +import com.hazelcast.partition.MigrationListener; +import com.hazelcast.partition.PartitionService; +import com.hazelcast.partition.ReplicaMigrationEvent; + 
+import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; + +@ExtendWith(VertxExtension.class) +class HazelcastClusterEntityRegistryTest { + + @Test + void register(Vertx vertx, VertxTestContext context) { + testDelayedMethodExecution(vertx, context, "7be56c89-9b3d-452b-8009-cc801a1bcc35", + registry -> registry.register("key", "value")); + } + + @Test + void unregister(Vertx vertx, VertxTestContext context) { + testDelayedMethodExecution(vertx, context, "43ebcf4f-0416-4016-8aa8-a55a2c20338f", + registry -> registry.unregister("key", "value")); + } + + @Test + void unregisterNode(Vertx vertx, VertxTestContext context) { + testDelayedMethodExecution(vertx, context, "8d670ef8-6eaa-44d5-b589-ac72fce9e606", + registry -> registry.unregisterNode("ClusterNodeID:0")); + } + + private static void testDelayedMethodExecution(Vertx vertx, VertxTestContext context, String uuid, + Function<HazelcastClusterEntityRegistry, Future<Void>> method) { + UUID migrationListenerUuid = UUID.fromString(uuid); + + PartitionService partitionServiceMock = mock(PartitionService.class); + when(partitionServiceMock.isClusterSafe()).thenReturn(Boolean.FALSE); + when(partitionServiceMock.addMigrationListener(any())) + .thenReturn(migrationListenerUuid); + + HazelcastClusterEntityRegistry registry = spy( + new HazelcastClusterEntityRegistry(vertx, "unitTestRegistry", partitionServiceMock)); + doReturn("ClusterNodeID:0").when(registry).getClusterNodeId(); + + method.apply(registry) + .onSuccess(unused -> context.verify(() -> { + // the migration listener must have been removed once the delayed call succeeded + verify(partitionServiceMock, times(1)).removeMigrationListener(migrationListenerUuid); + context.completeNow(); + })) + .onFailure(context::failNow); + + // capture the MigrationListener that the registry added to the mocked PartitionService + ArgumentCaptor<MigrationListener> captor = ArgumentCaptor.forClass(MigrationListener.class); + verify(partitionServiceMock).addMigrationListener(captor.capture()); + MigrationListener migrationListener = captor.getValue(); + 
+ // simulate Hazelcast reporting a successfully completed partition replica migration + ReplicaMigrationEvent event = mock(ReplicaMigrationEvent.class); + when(event.getElapsedTime()).thenReturn(10L); + when(event.isSuccess()).thenReturn(Boolean.TRUE); + migrationListener.replicaMigrationCompleted(event); + } +}