fix: mitigate the PartitionMigratingException #313

Closed
wants to merge 1 commit into from
19 changes: 17 additions & 2 deletions src/main/java/io/neonbee/NeonBee.java
@@ -33,6 +33,7 @@
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.hazelcast.core.HazelcastInstance;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
@@ -71,6 +72,7 @@
import io.neonbee.internal.json.ConfigurableJsonFactory.ConfigurableJsonCodec;
import io.neonbee.internal.json.ImmutableJsonArray;
import io.neonbee.internal.json.ImmutableJsonObject;
import io.neonbee.internal.registry.HazelcastClusterSafeRegistry;
import io.neonbee.internal.registry.Registry;
import io.neonbee.internal.registry.SelfCleaningRegistry;
import io.neonbee.internal.registry.SelfCleaningRegistryHook;
@@ -101,6 +103,7 @@
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeListener;
import io.vertx.micrometer.MicrometerMetricsOptions;
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;

@SuppressWarnings({ "PMD.CouplingBetweenObjects", "PMD.GodClass" })
public class NeonBee {
@@ -293,8 +296,20 @@ static Future<NeonBee> create(Function<VertxOptions, Future<Vertx>> vertxFactory
}

Future<HealthCheckRegistry> healthCheckRegistryFuture = HealthCheckRegistry.create(vertx);
Future<SelfCleaningRegistry<String>> entityRegistryFuture =
SelfCleaningRegistry.create(vertx, EntityVerticle.REGISTRY_NAME);
Future<Registry<String>> entityRegistryFuture =
SelfCleaningRegistry.<String>create(vertx, EntityVerticle.REGISTRY_NAME)
.map(registry -> {
if (ClusterHelper.getHazelcastClusterManager(vertx).isPresent()) {
return ClusterHelper.getHazelcastClusterManager(vertx)
.map(HazelcastClusterManager::getHazelcastInstance)
.map(HazelcastInstance::getPartitionService)
.map(partitionService -> (Registry<String>) new HazelcastClusterSafeRegistry<>(
registry, partitionService))
.orElse(registry);
} else {
return registry;
}
});
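For illustration only, the wrapping decision above could be read as the following standalone helper (the method name and extraction are hypothetical and not part of this change); the Optional chain already covers the presence check, so the registry is wrapped only when a Hazelcast cluster manager is in use:

// Hypothetical helper, equivalent to the mapping above: wrap the registry with
// HazelcastClusterSafeRegistry only if a HazelcastClusterManager is present,
// otherwise return the plain SelfCleaningRegistry unchanged.
private static Registry<String> wrapIfHazelcast(Vertx vertx, Registry<String> registry) {
    return ClusterHelper.getHazelcastClusterManager(vertx)
            .map(HazelcastClusterManager::getHazelcastInstance)
            .map(HazelcastInstance::getPartitionService)
            .<Registry<String>>map(partitionService ->
                    new HazelcastClusterSafeRegistry<>(registry, partitionService))
            .orElse(registry);
}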

// create a NeonBee instance, hook registry and close handler
Future<NeonBee> neonBeeFuture = all(configFuture, healthCheckRegistryFuture, entityRegistryFuture)
@@ -0,0 +1,97 @@
package io.neonbee.internal.cluster.hazelcast;

import java.util.UUID;

import com.hazelcast.partition.MigrationListener;
import com.hazelcast.partition.MigrationState;
import com.hazelcast.partition.PartitionService;
import com.hazelcast.partition.ReplicaMigrationEvent;

import io.neonbee.logging.LoggingFacade;
import io.vertx.core.Future;
import io.vertx.core.Promise;

public class HazelcastMigration {

private final LoggingFacade logger = LoggingFacade.create();

private final PartitionService partitionService;

/**
* Create a new HazelcastMigration instance.
*
* @param partitionService the Hazelcast PartitionService instance
*/
public HazelcastMigration(PartitionService partitionService) {
this.partitionService = partitionService;
}

/**
* Returns a future that completes once the Hazelcast replica migration has finished, whether it completed
* successfully or failed.
*
* @param description a descriptive text that is appended to the log messages
* @return a future that is completed when the replica migration is finished. If the cluster is already in a safe
* state, a succeeded future is returned immediately.
*/
public Future<Void> onReplicaMigrationFinished(String description) {
if (partitionService.isClusterSafe()) {
logger.info("Executing \"{}\"", description);
return Future.succeededFuture();
}

return onReplicaMigrationFinished()
.onSuccess(event -> logger.info(
"Executing delayed operation for: \"{}\". The replica migration {} and took {} ms.", description,
event.isSuccess() ? "completed" : "failed", event.getElapsedTime()))
.mapEmpty();
}

private Future<ReplicaMigrationEvent> onReplicaMigrationFinished() {
// FIXME: verify that the returned future completes immediately when no migration is in progress

Promise<ReplicaMigrationEvent> promise = Promise.promise();
onReplicaMigrationFinished(promise);
return promise.future();
}

/**
* Completes the given promise when the Hazelcast replica migration has completed, or fails it if the replica
* migration failed.
*
* @param promise the {@link Promise} to complete once the replica migration has finished
*/
private void onReplicaMigrationFinished(Promise<ReplicaMigrationEvent> promise) {
UUID migrationListenerUuid = partitionService.addMigrationListener(new MigrationListener() {
@Override
public void migrationStarted(MigrationState state) {
// not relevant for this listener
}

@Override
public void migrationFinished(MigrationState state) {
// not relevant for this listener
}

@Override
public void replicaMigrationCompleted(ReplicaMigrationEvent event) {
promise.complete(event);
}

@Override
public void replicaMigrationFailed(ReplicaMigrationEvent event) {
promise.fail(new HazelcastReplicaMigrationException("Hazelcast replica migration failed", event));
}
});

logger.info("Added migration listener with UUID: {}", migrationListenerUuid);
promise.future().onComplete(event -> {
if (partitionService.removeMigrationListener(migrationListenerUuid)) {
logger.info("Removed migration listener with UUID: {}", migrationListenerUuid);
} else {
logger.error("Failed to remove migration listener with UUID: {}", migrationListenerUuid);
}
});
}
}
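A hedged usage sketch of this class (the hazelcastInstance and registry variables are assumed to be in scope and are not part of this change):

// Illustrative only: defer a registry write until Hazelcast reports that the replica migration has finished.
HazelcastMigration migration = new HazelcastMigration(hazelcastInstance.getPartitionService());
Future<Void> registered = migration
        .onReplicaMigrationFinished("register key \"example\"")
        .compose(unused -> registry.register("example", "node-1"));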
@@ -0,0 +1,46 @@
package io.neonbee.internal.cluster.hazelcast;

import com.hazelcast.partition.ReplicaMigrationEvent;

/**
* HazelcastReplicaMigrationException is thrown when the Hazelcast migration fails.
*/
public class HazelcastReplicaMigrationException extends RuntimeException {

private static final long serialVersionUID = 4354100497375749139L;

private final ReplicaMigrationEvent replicaMigrationEvent;

/**
* Constructs a new HazelcastReplicaMigrationException exception with the specified detail message and event.
*
* @param message the detail message (which is saved for later retrieval by the {@link #getMessage()} method).
* @param event the ReplicaMigrationEvent
*/
public HazelcastReplicaMigrationException(String message, ReplicaMigrationEvent event) {
super(message);
this.replicaMigrationEvent = event;
}

/**
* Constructs a new HazelcastReplicaMigrationException exception with the specified detail message, cause and event.
*
* @param message the detail message (which is saved for later retrieval by the {@link #getMessage()} method).
* @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method). (A {@code null}
* value is permitted, and indicates that the cause is nonexistent or unknown.)
* @param event the ReplicaMigrationEvent
*/
public HazelcastReplicaMigrationException(String message, Throwable cause, ReplicaMigrationEvent event) {
super(message, cause);
this.replicaMigrationEvent = event;
}

/**
* Get the ReplicaMigrationEvent from the failed replica migration.
*
* @return the ReplicaMigrationEvent
*/
public ReplicaMigrationEvent getReplicaMigrationEvent() {
return replicaMigrationEvent;
}
}
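For context, a hedged sketch of how a caller might inspect the failed migration event carried by this exception (the migration variable and LOGGER are assumptions made for the example):

// Illustrative only: log the failed ReplicaMigrationEvent and propagate the failure.
migration.onReplicaMigrationFinished("unregister node")
        .recover(throwable -> {
            if (throwable instanceof HazelcastReplicaMigrationException) {
                ReplicaMigrationEvent event =
                        ((HazelcastReplicaMigrationException) throwable).getReplicaMigrationEvent();
                LOGGER.warn("Replica migration failed after {} ms", event.getElapsedTime());
            }
            return Future.failedFuture(throwable);
        });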
@@ -22,6 +22,8 @@ private static SharedDataAccessor getSharedDataAccessor(Vertx vertx) {
* @param futureSupplier supplier for the future to be secured by the lock
* @return the futureSupplier
*/
// FIXME: this lock method could lead to unwanted lock waits, because every caller that uses the same key has to
// wait to acquire the lock, even when the operations are unrelated
public static Future<Void> lock(Vertx vertx, String key, Supplier<Future<Void>> futureSupplier) {
LOGGER.debug("Get lock for key \"{}\"", key);
return getSharedDataAccessor(vertx)
@@ -0,0 +1,79 @@
package io.neonbee.internal.registry;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import com.hazelcast.partition.PartitionService;

import io.neonbee.internal.cluster.hazelcast.HazelcastMigration;
import io.vertx.core.Future;

/**
* This registry adds Hazelcast-specific behavior.
* <p>
* This wrapper delays register and unregister calls while the cluster is not in a safe state and executes them once
* the Hazelcast partition migration process has completed.
*/
public class HazelcastClusterSafeRegistry<T> implements Registry<T> {

private final HazelcastMigration hazelcastMigration;

private final Registry<T> registry;

/**
* Create a new HazelcastClusterSafeRegistry. This registry delegates all calls to the underlying registry once the
* cluster is in a safe state.
*
* @param registry the underlying registry
* @param partitionService the {@link PartitionService}
*/
public HazelcastClusterSafeRegistry(Registry<T> registry, PartitionService partitionService) {
this.registry = registry;
hazelcastMigration = new HazelcastMigration(partitionService);
}

@Override
public Future<Void> register(String key, T value) {
return this.hazelcastMigration
.onReplicaMigrationFinished("register key \"" + key + "\", value: \"" + value + "\"")
.compose(unused -> this.registry.register(key, value));
}

@Override
public Future<Void> register(String key, Collection<T> values) {
return this.hazelcastMigration
.onReplicaMigrationFinished("register key \"" + key + "\", values: \"" + values + "\"")
.compose(unused -> this.registry.register(key, values));
}

@Override
public Future<Void> unregister(String key, T value) {
return this.hazelcastMigration
.onReplicaMigrationFinished("unregister key \"" + key + "\", value: \"" + value + "\"")
.compose(unused -> this.registry.unregister(key, value));
}

@Override
public Future<Void> unregister(String key, Collection<T> values) {
return this.hazelcastMigration
.onReplicaMigrationFinished("unregister key \"" + key + "\", values: \"" + values + "\"")
.compose(unused -> this.registry.unregister(key, values));
}

@Override
public Future<List<T>> get(String key) {
return this.registry.get(key);
}

@Override
public Future<Optional<T>> getAny(String key) {
return this.registry.getAny(key);
}

@Override
public Future<Set<String>> getKeys() {
return this.registry.getKeys();
}
}
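A minimal usage sketch (plainRegistry, hazelcastInstance, and the key/value are placeholders, not part of this change):

// Illustrative only: wrap an existing registry so that writes wait for a safe cluster state.
Registry<String> safeRegistry =
        new HazelcastClusterSafeRegistry<>(plainRegistry, hazelcastInstance.getPartitionService());
Future<Void> registered = safeRegistry.register("EntityVerticle", "verticle-address");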
@@ -0,0 +1,30 @@
package io.neonbee.internal.registry;

import com.hazelcast.partition.PartitionService;

import io.neonbee.internal.cluster.hazelcast.HazelcastMigration;
import io.vertx.core.Future;
import io.vertx.core.Vertx;

public class HazelcastClusterSafeRegistryController extends SelfCleaningRegistryController {
private final HazelcastMigration hazelcastMigration;

/**
* Creates a SelfCleaningRegistryController that is aware of the Hazelcast cluster.
* <p>
* This controller delays the cleanUpAllRegistriesForNode call if the Hazelcast cluster is not in a safe state.
*
* @param vertx the Vertx instance
* @param partitionService Hazelcast PartitionService instance
*/
public HazelcastClusterSafeRegistryController(Vertx vertx, PartitionService partitionService) {
super(vertx);
this.hazelcastMigration = new HazelcastMigration(partitionService);
}

@Override
public Future<Void> cleanUpAllRegistriesForNode(String nodeId) {
return hazelcastMigration.onReplicaMigrationFinished("clean up all registries for node with ID: " + nodeId)
.compose(v -> super.cleanUpAllRegistriesForNode(nodeId));
}
}
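A hedged sketch of how this controller defers clean-up for a departed node (the node ID and hazelcastInstance are placeholders):

// Illustrative only: clean-up for a departed node waits until the partition migration
// triggered by that node leaving has finished.
SelfCleaningRegistryController controller =
        new HazelcastClusterSafeRegistryController(vertx, hazelcastInstance.getPartitionService());
Future<Void> cleanedUp = controller.cleanUpAllRegistriesForNode("left-node-id");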
@@ -2,13 +2,17 @@

import static io.neonbee.hook.HookType.CLUSTER_NODE_ID;

import com.hazelcast.core.HazelcastInstance;

import io.neonbee.NeonBee;
import io.neonbee.hook.Hook;
import io.neonbee.hook.HookContext;
import io.neonbee.hook.HookType;
import io.neonbee.internal.cluster.ClusterHelper;
import io.neonbee.logging.LoggingFacade;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;

public class SelfCleaningRegistryHook {
private static final LoggingFacade LOGGER = LoggingFacade.create();
@@ -22,7 +26,7 @@ public class SelfCleaningRegistryHook {
*/
@Hook(HookType.BEFORE_SHUTDOWN)
public void unregisterOnShutdown(NeonBee neonBee, HookContext hookContext, Promise<Void> promise) {
SelfCleaningRegistryController controller = new SelfCleaningRegistryController(neonBee.getVertx());
SelfCleaningRegistryController controller = getController(neonBee.getVertx());

String nodeId = controller.getNodeId();
LOGGER.debug("Execute BEFORE_SHUTDOWN hook for SelfCleaningRegistry on node \"{}\"", nodeId);
@@ -42,7 +46,7 @@ public void cleanup(NeonBee neonBee, HookContext hookContext, Promise<Void> promise) {
public void cleanup(NeonBee neonBee, HookContext hookContext, Promise<Void> promise) {
if (ClusterHelper.isLeader(neonBee.getVertx())) {
String nodeId = hookContext.get(CLUSTER_NODE_ID);
SelfCleaningRegistryController controller = new SelfCleaningRegistryController(neonBee.getVertx());
SelfCleaningRegistryController controller = getController(neonBee.getVertx());

String currentNodeId = controller.getNodeId();
LOGGER.debug("Execute NODE_LEFT hook for SelfCleaningRegistry on node \"{}\" for node \"{}\"",
@@ -56,4 +60,13 @@ public void cleanup(NeonBee neonBee, HookContext hookContext, Promise<Void> promise) {
promise.complete();
}
}

private static SelfCleaningRegistryController getController(Vertx vertx) {
return ClusterHelper.getHazelcastClusterManager(vertx)
.map(HazelcastClusterManager::getHazelcastInstance)
.map(HazelcastInstance::getPartitionService)
.map(partitionService -> (SelfCleaningRegistryController) new HazelcastClusterSafeRegistryController(
vertx, partitionService))
.orElseGet(() -> new SelfCleaningRegistryController(vertx));
}
}