diff --git a/helix-admin-webapp/pom.xml b/helix-admin-webapp/pom.xml
index 0bd098f2f0..31502c7367 100644
--- a/helix-admin-webapp/pom.xml
+++ b/helix-admin-webapp/pom.xml
@@ -90,7 +90,7 @@
com.thoughtworks.xstream
xstream
- 1.4.21
+ 1.4.19
com.fasterxml.jackson.core
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 5c2ef10f20..68bfba86c2 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -800,13 +800,25 @@ Map validateInstancesForWagedRebalance(String clusterName,
/**
* Return if instance operation 'Evacuate' is finished.
* @param clusterName
- * @param instancesNames
- * @return Return true if there is no current state nor pending message on the instance.
+ * @param instanceName name of the instance to check
+ * @return Return true if there are no FULL_AUTO or CUSTOMIZED resources in the current state and
+ * no pending messages on the instance.
*/
- default boolean isEvacuateFinished(String clusterName, String instancesNames) {
+ default boolean isEvacuateFinished(String clusterName, String instanceName) {
throw new UnsupportedOperationException("isEvacuateFinished is not implemented.");
}
+ /**
+ * Check to see if instance is drained. This default implementation is a placeholder that always
+ * throws; implementing classes are expected to override it.
+ * @param clusterName name of the cluster the instance belongs to
+ * @param instanceName name of the instance to check
+ * @return Return true if there are no FULL_AUTO or CUSTOMIZED resources in the current state and
+ * no pending messages on the instance.
+ * @throws UnsupportedOperationException if the implementing class does not override this method
+ */
+ default boolean isInstanceDrained(String clusterName, String instanceName) {
+ throw new UnsupportedOperationException("isInstanceDrained is not implemented.");
+ }
+
/**
* Check to see if swapping between two instances can be completed. Either the swapOut or
* swapIn instance can be passed in.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index 939d94aedf..3b2f543e70 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -24,9 +24,11 @@
import java.util.Set;
import org.apache.helix.HelixDefinedState;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
@@ -132,10 +134,14 @@ private Map computeCustomizedBestStateForPartition(
boolean notInErrorState = currentStateMap != null
&& !HelixDefinedState.ERROR.toString().equals(currentStateMap.get(instance));
boolean enabled = !disabledInstancesForPartition.contains(instance) && isResourceEnabled;
-
+ InstanceConfig instanceConfig = cache.getInstanceConfigMap().get(instance);
+ boolean hasEvacuatedOp = instanceConfig != null &&
+ instanceConfig.getInstanceOperation().getOperation() == InstanceConstants.InstanceOperation.EVACUATE;
+ boolean isAssignableForCustomizedResource = cache.getLiveInstances().containsKey(instance) && hasEvacuatedOp;
// Note: if instance is not live, the mapping for that instance will not show up in
// BestPossibleMapping (and ExternalView)
- if (assignableLiveInstancesMap.containsKey(instance) && notInErrorState) {
+ // A live instance whose operation is EVACUATE is still treated as assignable here, so its
+ // instanceStateMap entry keeps following the idealStateMap
+ if ((assignableLiveInstancesMap.containsKey(instance) || isAssignableForCustomizedResource) && notInErrorState) {
if (enabled) {
instanceStateMap.put(instance, idealStateMap.get(instance));
} else {
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 0d72ac4aaa..29216cf4b4 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -460,12 +460,17 @@ public void setInstanceOperation(String clusterName, String instanceName,
@Override
public boolean isEvacuateFinished(String clusterName, String instanceName) {
- if (!instanceHasFullAutoCurrentStateOrMessage(clusterName, instanceName)) {
- InstanceConfig config = getInstanceConfig(clusterName, instanceName);
- return config != null && config.getInstanceOperation().getOperation()
- .equals(InstanceConstants.InstanceOperation.EVACUATE);
+ InstanceConfig config = getInstanceConfig(clusterName, instanceName);
+ if (config == null || config.getInstanceOperation().getOperation() !=
+ InstanceConstants.InstanceOperation.EVACUATE ) {
+ return false;
}
- return false;
+ return !instanceHasCurrentStateOrMessage(clusterName, instanceName);
+ }
+
+ // Same current-state/pending-message check that isEvacuateFinished uses, but without requiring
+ // the instance operation to be EVACUATE. Per the helper's Javadoc it returns false for an
+ // offline instance or one without an active session, so such instances report as drained.
+ @Override
+ public boolean isInstanceDrained(String clusterName, String instanceName) {
+ return !instanceHasCurrentStateOrMessage(clusterName, instanceName);
}
/**
@@ -721,7 +726,7 @@ public boolean completeSwapIfPossible(String clusterName, String instanceName,
@Override
public boolean isReadyForPreparingJoiningCluster(String clusterName, String instanceName) {
- if (!instanceHasFullAutoCurrentStateOrMessage(clusterName, instanceName)) {
+ if (!instanceHasCurrentStateOrMessage(clusterName, instanceName)) {
InstanceConfig config = getInstanceConfig(clusterName, instanceName);
return config != null && INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(
config.getInstanceOperation().getOperation());
@@ -757,13 +762,14 @@ public boolean forceKillInstance(String clusterName, String instanceName, String
}
/**
- * Return true if Instance has any current state or pending message. Otherwise, return false if instance is offline,
+ * Return true if instance has any resource with FULL_AUTO or CUSTOMIZED rebalance mode in current state or
+ * if instance has any pending message. Otherwise, return false if instance is offline,
* instance has no active session, or if instance is online but has no current state or pending message.
* @param clusterName
* @param instanceName
* @return
*/
- private boolean instanceHasFullAutoCurrentStateOrMessage(String clusterName,
+ private boolean instanceHasCurrentStateOrMessage(String clusterName,
String instanceName) {
HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
@@ -803,13 +809,14 @@ private boolean instanceHasFullAutoCurrentStateOrMessage(String clusterName,
return true;
}
- // Get set of FULL_AUTO resources
+ // Get set of FULL_AUTO and CUSTOMIZED resources
List idealStates = accessor.getChildValues(keyBuilder.idealStates(), true);
- Set fullAutoResources = idealStates != null ? idealStates.stream()
- .filter(idealState -> idealState.getRebalanceMode() == RebalanceMode.FULL_AUTO)
+ Set resources = idealStates != null ? idealStates.stream()
+ .filter(idealState -> idealState.getRebalanceMode() == RebalanceMode.FULL_AUTO ||
+ idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED)
.map(IdealState::getResourceName).collect(Collectors.toSet()) : Collections.emptySet();
- return currentStates.stream().anyMatch(fullAutoResources::contains);
+ return currentStates.stream().anyMatch(resources::contains);
}
@Override
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index 278cdfc7ac..d6a761cf7d 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -30,12 +30,14 @@
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
+import java.util.stream.Collectors;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
@@ -75,6 +77,7 @@
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.StateModelConfigGenerator;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
@@ -447,6 +450,36 @@ protected void createDBInSemiAuto(ClusterSetup clusterSetup, String clusterName,
clusterSetup.getClusterManagementTool().setResourceIdealState(clusterName, dbName, is);
}
+ /**
+ * Add a resource in CUSTOMIZED rebalance mode that uses the OnlineOffline state model, with each
+ * given partition set to ONLINE on its mapped instance.
+ * @param clusterSetup setup tool used to add the resource to the cluster
+ * @param clusterName cluster the resource is added to
+ * @param resourceName name of the resource; partitions are named resourceName + "_" + partition id
+ * @param partitionInstanceMap partition id mapped to the name of the instance hosting it
+ */
+ protected void createResourceInCustomizedMode(ClusterSetup clusterSetup, String clusterName, String resourceName,
+ Map<Integer, String> partitionInstanceMap) {
+ IdealState idealState = new IdealState(resourceName);
+ idealState.setNumPartitions(partitionInstanceMap.size());
+ idealState.setStateModelDefRef(OnlineOfflineSMD.name);
+ idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+ // Typed map keeps instanceName a String, as setPartitionState is called with it below
+ partitionInstanceMap.forEach((partitionID, instanceName) -> {
+ idealState.setPartitionState(resourceName + "_" + partitionID,
+ instanceName, OnlineOfflineSMD.States.ONLINE.toString());
+ });
+ clusterSetup.addResourceToCluster(clusterName, resourceName, idealState);
+ }
+
+ /**
+ * Delete the current-state znode of every resource on the participant, except the resources
+ * named in excludeResourceNames.
+ * @param participant participant whose current state is cleaned up
+ * @param excludeResourceNames names of resources whose current state must be kept
+ */
+ protected void removeAllResourcesFromInstance(MockParticipantManager participant, Set<String> excludeResourceNames) {
+ RealmAwareZkClient zkClient = participant.getZkClient();
+ String clusterName = participant.getClusterName();
+ String instanceName = participant.getInstanceName();
+ // Uses the first session listed under the instance's current-state path.
+ // NOTE(review): assumes at least one session znode exists; get(0) throws otherwise - confirm callers
+ String sessionId = zkClient.getChildren(PropertyPathBuilder.instanceCurrentState(clusterName, instanceName)).get(0);
+ List<String> resourceNames = zkClient.getChildren(
+ PropertyPathBuilder.instanceCurrentState(clusterName, instanceName, sessionId)
+ );
+ for (String resourceName : resourceNames) {
+ if (!excludeResourceNames.contains(resourceName)) {
+ String resourcePath = PropertyPathBuilder.instanceCurrentState(clusterName,
+ instanceName, sessionId, resourceName);
+ zkClient.delete(resourcePath);
+ }
+ }
+ }
+
/**
* Validate there should be always minimal active replica and top state replica for each
* partition.
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomRebalancer.java
index 07a0a5de56..e8c7491af6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomRebalancer.java
@@ -25,6 +25,7 @@
import org.apache.helix.controller.rebalancer.CustomRebalancer;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.OnlineOfflineSMD;
import org.apache.helix.model.Partition;
@@ -70,7 +71,7 @@ public void testDisabledBootstrappingPartitions() {
.thenReturn(ImmutableSet.of(instanceName));
when(cache.getAssignableLiveInstances())
.thenReturn(ImmutableMap.of(instanceName, new LiveInstance(instanceName)));
-
+ when(cache.getInstanceConfigMap()).thenReturn(ImmutableMap.of(instanceName, new InstanceConfig(instanceName)));
CurrentStateOutput currOutput = new CurrentStateOutput();
ResourceAssignment resourceAssignment =
customRebalancer.computeBestPossiblePartitionState(cache, idealState, resource, currOutput);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index 83b7c01924..98f79bcb29 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -282,6 +282,33 @@ public void testEvacuate() throws Exception {
Assert.assertEquals(getEVs(), assignment);
}
+ /**
+ * Verify that evacuation is not reported as finished while the evacuating instance still holds
+ * a partition of a CUSTOMIZED resource.
+ */
+ @Test
+ public void testEvacuateWithCustomizedResource() throws Exception {
+ System.out.println("START TestInstanceOperation.testEvacuateWithCustomizedResource() at " + new Date(System.currentTimeMillis()));
+ // Drop the existing DBs so the customized resource is the only one created by this test
+ for (String resource : _allDBs) {
+ _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, resource);
+ }
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ String instanceToEvacuate = _participants.get(0).getInstanceName();
+ String customizedDB = "CustomizedTestDB";
+ // Single partition (id 0) placed on the instance that will be evacuated
+ Map<Integer, String> partitionInstanceMap = new HashMap<>();
+ partitionInstanceMap.put(0, instanceToEvacuate);
+ createResourceInCustomizedMode(_gSetupTool, CLUSTER_NAME, customizedDB, partitionInstanceMap);
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ _gSetupTool.getClusterManagementTool()
+ .manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null, null);
+ try {
+ // evacuated instance
+ _gSetupTool.getClusterManagementTool()
+ .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE);
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ // The customized resource is still hosted on the instance, so evacuation must not be finished
+ Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));
+ } finally {
+ // Always leave maintenance mode, even when an assertion above fails
+ _gSetupTool.getClusterManagementTool()
+ .manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null, null);
+ }
+ // NOTE(review): instanceToEvacuate keeps the EVACUATE operation after this test - confirm that
+ // later tests or per-test setup reset it
+ // Drop customized DBs in cluster
+ _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, customizedDB);
+ createTestDBs(DEFAULT_RESOURCE_DELAY_TIME);
+ }
+
@Test(dependsOnMethods = "testEvacuate")
public void testRevertEvacuation() throws Exception {
System.out.println("START TestInstanceOperation.testRevertEvacuation() at " + new Date(System.currentTimeMillis()));
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 5a1a8a5bcb..7f516345ae 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -577,6 +577,11 @@ public boolean isEvacuateFinished(String clusterName, String instancesNames) {
return false;
}
+ // Mock stub: always reports the instance as not drained.
+ @Override
+ public boolean isInstanceDrained(String clusterName, String instancesNames) {
+ return false;
+ }
+
@Override
public boolean canCompleteSwap(String clusterName, String instancesNames) {
return false;
diff --git a/helix-rest/pom.xml b/helix-rest/pom.xml
index 24ddf0d621..e13a8c2c67 100644
--- a/helix-rest/pom.xml
+++ b/helix-rest/pom.xml
@@ -121,7 +121,7 @@
com.thoughtworks.xstream
xstream
- 1.4.21
+ 1.4.19
com.fasterxml.jackson.core
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
index 847eb802fa..13cabb1ee4 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
@@ -90,6 +90,7 @@ public enum Command {
completeSwapIfPossible,
onDemandRebalance,
isEvacuateFinished,
+ isInstanceDrained,
setPartitionsToError,
forceKillInstance
}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index d82c177962..1a16e3b9e8 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -508,6 +508,16 @@ public Response updateInstance(@PathParam("clusterId") String clusterId,
return serverError(e);
}
return OK(OBJECT_MAPPER.writeValueAsString(ImmutableMap.of("successful", evacuateFinished)));
+ case isInstanceDrained:
+ // Responds with {"successful": <drained>}; a HelixException from the admin call becomes a server error
+ boolean instanceDrained;
+ try {
+ instanceDrained = admin.isInstanceDrained(clusterId, instanceName);
+ } catch (HelixException e) {
+ // String.format needs %s specifiers; "{}" placeholders would be emitted literally
+ LOG.error(String.format("Encountered error when checking if instance is drained for cluster: "
+ + "%s, instance: %s", clusterId, instanceName), e);
+ return serverError(e);
+ }
+ return OK(OBJECT_MAPPER.writeValueAsString(ImmutableMap.of("successful", instanceDrained)));
case forceKillInstance:
boolean instanceForceKilled = admin.forceKillInstance(clusterId, instanceName, reason, instanceOperationSource);
if (!instanceForceKilled) {
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
index 32f47baae4..3ed432f00a 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
@@ -580,6 +580,12 @@ public void updateInstance() throws Exception {
Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
Assert.assertTrue(evacuateFinishedResult.get("successful"));
+ // isInstanceDrained is expected to return OK with {"successful": true} for this instance
+ response = new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=isInstanceDrained")
+ .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
+ // Check the status first so a failed request is reported as such, not as a JSON parse error
+ Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+ Map<String, Boolean> instanceDrainedResult = OBJECT_MAPPER.readValue(response.readEntity(String.class), Map.class);
+ Assert.assertTrue(instanceDrainedResult.get("successful"));
+
// test isEvacuateFinished on instance with EVACUATE and no currentState
// Create new instance so no currentState or messages assigned to it
String test_instance_name = INSTANCE_NAME + "_foo";