Skip to content

Commit 24bd881

Browse files
Add check for customized resources in isEvacuateFinished
- Add isInstanceDrained method in HelixAdmin - Expose the method via the instance update REST endpoint - Change the order of the conditional checks in isEvacuateFinished to improve latency
1 parent f87d948 commit 24bd881

File tree

10 files changed

+125
-18
lines changed

10 files changed

+125
-18
lines changed

helix-core/src/main/java/org/apache/helix/HelixAdmin.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -800,13 +800,25 @@ Map<String, Boolean> validateInstancesForWagedRebalance(String clusterName,
800800
/**
801801
* Return if instance operation 'Evacuate' is finished.
802802
* @param clusterName
803-
* @param instancesNames
804-
* @return Return true if there is no current state nor pending message on the instance.
803+
* @param instancesName
804+
* @return Return true if there are no FULL_AUTO or CUSTOMIZED resources in the current state nor
805+
* any pending message on the instance.
805806
*/
806-
default boolean isEvacuateFinished(String clusterName, String instancesNames) {
807+
default boolean isEvacuateFinished(String clusterName, String instancesName) {
807808
throw new UnsupportedOperationException("isEvacuateFinished is not implemented.");
808809
}
809810

811+
/**
812+
* Check to see if instance is drained.
813+
* @param clusterName
814+
* @param instanceName
815+
* @return Return true if there are no FULL_AUTO or CUSTOMIZED resources in the current state nor
816+
* any pending message on the instance.
817+
*/
818+
default boolean isInstanceDrained(String clusterName, String instanceName) {
819+
throw new UnsupportedOperationException("isInstanceDrained is not implemented.");
820+
}
821+
810822
/**
811823
* Check to see if swapping between two instances can be completed. Either the swapOut or
812824
* swapIn instance can be passed in.

helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@
2424
import java.util.Set;
2525

2626
import org.apache.helix.HelixDefinedState;
27+
import org.apache.helix.constants.InstanceConstants;
2728
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
2829
import org.apache.helix.controller.stages.CurrentStateOutput;
2930
import org.apache.helix.model.IdealState;
31+
import org.apache.helix.model.InstanceConfig;
3032
import org.apache.helix.model.LiveInstance;
3133
import org.apache.helix.model.Partition;
3234
import org.apache.helix.model.Resource;
@@ -132,10 +134,13 @@ private Map<String, String> computeCustomizedBestStateForPartition(
132134
boolean notInErrorState = currentStateMap != null
133135
&& !HelixDefinedState.ERROR.toString().equals(currentStateMap.get(instance));
134136
boolean enabled = !disabledInstancesForPartition.contains(instance) && isResourceEnabled;
135-
137+
InstanceConfig instanceConfig = cache.getInstanceConfigMap().get(instance);
138+
boolean isInstanceEvacuated = instanceConfig != null &&
139+
instanceConfig.getInstanceOperation().getOperation() == InstanceConstants.InstanceOperation.EVACUATE;
136140
// Note: if instance is not live, the mapping for that instance will not show up in
137141
// BestPossibleMapping (and ExternalView)
138-
if (assignableLiveInstancesMap.containsKey(instance) && notInErrorState) {
142+
// if instance is evacuated keep the instanceStateMap same as idealStateMap
143+
if ((assignableLiveInstancesMap.containsKey(instance) || isInstanceEvacuated) && notInErrorState) {
139144
if (enabled) {
140145
instanceStateMap.put(instance, idealStateMap.get(instance));
141146
} else {

helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -460,12 +460,17 @@ public void setInstanceOperation(String clusterName, String instanceName,
460460

461461
@Override
462462
public boolean isEvacuateFinished(String clusterName, String instanceName) {
463-
if (!instanceHasFullAutoCurrentStateOrMessage(clusterName, instanceName)) {
464-
InstanceConfig config = getInstanceConfig(clusterName, instanceName);
465-
return config != null && config.getInstanceOperation().getOperation()
466-
.equals(InstanceConstants.InstanceOperation.EVACUATE);
463+
InstanceConfig config = getInstanceConfig(clusterName, instanceName);
464+
if (config == null || config.getInstanceOperation().getOperation() !=
465+
InstanceConstants.InstanceOperation.EVACUATE ) {
466+
return false;
467467
}
468-
return false;
468+
return !instanceHasCurrentStateOrMessage(clusterName, instanceName);
469+
}
470+
471+
@Override
472+
public boolean isInstanceDrained(String clusterName, String instanceName) {
473+
return !instanceHasCurrentStateOrMessage(clusterName, instanceName);
469474
}
470475

471476
/**
@@ -721,7 +726,7 @@ public boolean completeSwapIfPossible(String clusterName, String instanceName,
721726

722727
@Override
723728
public boolean isReadyForPreparingJoiningCluster(String clusterName, String instanceName) {
724-
if (!instanceHasFullAutoCurrentStateOrMessage(clusterName, instanceName)) {
729+
if (!instanceHasCurrentStateOrMessage(clusterName, instanceName)) {
725730
InstanceConfig config = getInstanceConfig(clusterName, instanceName);
726731
return config != null && INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(
727732
config.getInstanceOperation().getOperation());
@@ -757,13 +762,14 @@ public boolean forceKillInstance(String clusterName, String instanceName, String
757762
}
758763

759764
/**
760-
* Return true if Instance has any current state or pending message. Otherwise, return false if instance is offline,
765+
* Return true if instance has any resource with FULL_AUTO or CUSTOMIZED rebalance mode in current state or
766+
* if instance has any pending message. Otherwise, return false if instance is offline,
761767
* instance has no active session, or if instance is online but has no current state or pending message.
762768
* @param clusterName
763769
* @param instanceName
764770
* @return
765771
*/
766-
private boolean instanceHasFullAutoCurrentStateOrMessage(String clusterName,
772+
private boolean instanceHasCurrentStateOrMessage(String clusterName,
767773
String instanceName) {
768774
HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor);
769775
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
@@ -803,13 +809,14 @@ private boolean instanceHasFullAutoCurrentStateOrMessage(String clusterName,
803809
return true;
804810
}
805811

806-
// Get set of FULL_AUTO resources
812+
// Get set of FULL_AUTO and CUSTOMIZED resources
807813
List<IdealState> idealStates = accessor.getChildValues(keyBuilder.idealStates(), true);
808-
Set<String> fullAutoResources = idealStates != null ? idealStates.stream()
809-
.filter(idealState -> idealState.getRebalanceMode() == RebalanceMode.FULL_AUTO)
814+
Set<String> resources = idealStates != null ? idealStates.stream()
815+
.filter(idealState -> idealState.getRebalanceMode() == RebalanceMode.FULL_AUTO ||
816+
idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED)
810817
.map(IdealState::getResourceName).collect(Collectors.toSet()) : Collections.emptySet();
811818

812-
return currentStates.stream().anyMatch(fullAutoResources::contains);
819+
return currentStates.stream().anyMatch(resources::contains);
813820
}
814821

815822
@Override

helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@
3030
import java.util.Map;
3131
import java.util.Set;
3232
import java.util.logging.Level;
33+
import java.util.stream.Collectors;
3334
import javax.management.MBeanServerConnection;
3435
import javax.management.ObjectName;
3536

3637
import org.apache.helix.BaseDataAccessor;
3738
import org.apache.helix.ConfigAccessor;
3839
import org.apache.helix.HelixAdmin;
40+
import org.apache.helix.HelixConstants;
3941
import org.apache.helix.HelixDataAccessor;
4042
import org.apache.helix.HelixManager;
4143
import org.apache.helix.HelixProperty;
@@ -75,6 +77,7 @@
7577
import org.apache.helix.tools.ClusterStateVerifier;
7678
import org.apache.helix.tools.StateModelConfigGenerator;
7779
import org.apache.helix.zookeeper.api.client.HelixZkClient;
80+
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
7881
import org.apache.helix.zookeeper.datamodel.ZNRecord;
7982
import org.apache.helix.zookeeper.impl.client.ZkClient;
8083
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
@@ -447,6 +450,36 @@ protected void createDBInSemiAuto(ClusterSetup clusterSetup, String clusterName,
447450
clusterSetup.getClusterManagementTool().setResourceIdealState(clusterName, dbName, is);
448451
}
449452

453+
protected void createResourceInCustomizedMode(ClusterSetup clusterSetup, String clusterName, String resourceName,
454+
Map<Integer, String> partitionInstanceMap) {
455+
IdealState idealState = new IdealState(resourceName);
456+
idealState.setNumPartitions(partitionInstanceMap.size());
457+
idealState.setStateModelDefRef(OnlineOfflineSMD.name);
458+
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
459+
partitionInstanceMap.forEach((partitionID, instanceName) -> {
460+
idealState.setPartitionState(resourceName + "_" + partitionID,
461+
instanceName, OnlineOfflineSMD.States.ONLINE.toString());
462+
});
463+
clusterSetup.addResourceToCluster(clusterName, resourceName, idealState);
464+
}
465+
466+
protected void removeAllResourcesFromInstance(MockParticipantManager participant, Set<String> excludeResourceNames) {
467+
RealmAwareZkClient zkClient = participant.getZkClient();
468+
String clusterName = participant.getClusterName();
469+
String instanceName = participant.getInstanceName();
470+
String sessionId = zkClient.getChildren(PropertyPathBuilder.instanceCurrentState(clusterName, instanceName)).get(0);
471+
List<String> resourceNames = zkClient.getChildren(
472+
PropertyPathBuilder.instanceCurrentState(clusterName, instanceName, sessionId)
473+
);
474+
for (String resourceName : resourceNames) {
475+
if (!excludeResourceNames.contains(resourceName)) {
476+
String resourcePath = PropertyPathBuilder.instanceCurrentState(clusterName,
477+
instanceName, sessionId, resourceName);
478+
zkClient.delete(resourcePath);
479+
}
480+
}
481+
}
482+
450483
/**
451484
* Validate there should be always minimal active replica and top state replica for each
452485
* partition.

helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomRebalancer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.helix.controller.rebalancer.CustomRebalancer;
2626
import org.apache.helix.controller.stages.CurrentStateOutput;
2727
import org.apache.helix.model.IdealState;
28+
import org.apache.helix.model.InstanceConfig;
2829
import org.apache.helix.model.LiveInstance;
2930
import org.apache.helix.model.OnlineOfflineSMD;
3031
import org.apache.helix.model.Partition;
@@ -70,7 +71,7 @@ public void testDisabledBootstrappingPartitions() {
7071
.thenReturn(ImmutableSet.of(instanceName));
7172
when(cache.getAssignableLiveInstances())
7273
.thenReturn(ImmutableMap.of(instanceName, new LiveInstance(instanceName)));
73-
74+
when(cache.getInstanceConfigMap()).thenReturn(ImmutableMap.of(instanceName, new InstanceConfig(instanceName)));
7475
CurrentStateOutput currOutput = new CurrentStateOutput();
7576
ResourceAssignment resourceAssignment =
7677
customRebalancer.computeBestPossiblePartitionState(cache, idealState, resource, currOutput);

helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,33 @@ public void testEvacuate() throws Exception {
282282
Assert.assertEquals(getEVs(), assignment);
283283
}
284284

285+
@Test
286+
public void testEvacuateWithCustomizedResource() throws Exception {
287+
System.out.println("START TestInstanceOperation.testEvacuateWithCustomizedResource() at " + new Date(System.currentTimeMillis()));
288+
for( String resource : _allDBs) {
289+
_gSetupTool.dropResourceFromCluster(CLUSTER_NAME, resource);
290+
}
291+
Assert.assertTrue(_clusterVerifier.verifyByPolling());
292+
String instanceToEvacuate = _participants.get(0).getInstanceName();
293+
String customizedDB = "CustomizedTestDB";
294+
Map<Integer, String> partitionInstanceMap = new HashMap<>();
295+
partitionInstanceMap.put(Integer.valueOf(0), _participants.get(0).getInstanceName());
296+
createResourceInCustomizedMode(_gSetupTool, CLUSTER_NAME, customizedDB, partitionInstanceMap);
297+
Assert.assertTrue(_clusterVerifier.verifyByPolling());
298+
_gSetupTool.getClusterManagementTool()
299+
.manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null, null);
300+
// evacuated instance
301+
_gSetupTool.getClusterManagementTool()
302+
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE);
303+
Assert.assertTrue(_clusterVerifier.verifyByPolling());
304+
Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));
305+
_gSetupTool.getClusterManagementTool()
306+
.manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null, null);
307+
// Drop customized DBs in cluster
308+
_gSetupTool.dropResourceFromCluster(CLUSTER_NAME, customizedDB);
309+
createTestDBs(DEFAULT_RESOURCE_DELAY_TIME);
310+
}
311+
285312
@Test(dependsOnMethods = "testEvacuate")
286313
public void testRevertEvacuation() throws Exception {
287314
System.out.println("START TestInstanceOperation.testRevertEvacuation() at " + new Date(System.currentTimeMillis()));

helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -577,6 +577,11 @@ public boolean isEvacuateFinished(String clusterName, String instancesNames) {
577577
return false;
578578
}
579579

580+
@Override
581+
public boolean isInstanceDrained(String clusterName, String instancesNames) {
582+
return false;
583+
}
584+
580585
@Override
581586
public boolean canCompleteSwap(String clusterName, String instancesNames) {
582587
return false;

helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public enum Command {
9090
completeSwapIfPossible,
9191
onDemandRebalance,
9292
isEvacuateFinished,
93+
isInstanceDrained,
9394
setPartitionsToError,
9495
forceKillInstance
9596
}

helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,16 @@ public Response updateInstance(@PathParam("clusterId") String clusterId,
508508
return serverError(e);
509509
}
510510
return OK(OBJECT_MAPPER.writeValueAsString(ImmutableMap.of("successful", evacuateFinished)));
511+
case isInstanceDrained:
512+
boolean instanceDrained;
513+
try {
514+
instanceDrained = admin.isInstanceDrained(clusterId, instanceName);
515+
} catch (HelixException e) {
516+
LOG.error(String.format("Encountered error when checking if instance is drained for cluster: "
517+
+ "{}, instance: {}", clusterId, instanceName), e);
518+
return serverError(e);
519+
}
520+
return OK(OBJECT_MAPPER.writeValueAsString(ImmutableMap.of("successful", instanceDrained)));
511521
case forceKillInstance:
512522
boolean instanceForceKilled = admin.forceKillInstance(clusterId, instanceName, reason, instanceOperationSource);
513523
if (!instanceForceKilled) {

helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,12 @@ public void updateInstance() throws Exception {
580580
Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
581581
Assert.assertTrue(evacuateFinishedResult.get("successful"));
582582

583+
response = new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=isInstanceDrained")
584+
.format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
585+
Map<String, Boolean> instanceDrainedResult = OBJECT_MAPPER.readValue(response.readEntity(String.class), Map.class);
586+
Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
587+
Assert.assertTrue(instanceDrainedResult.get("successful"));
588+
583589
// test isEvacuateFinished on instance with EVACUATE and no currentState
584590
// Create new instance so no currentState or messages assigned to it
585591
String test_instance_name = INSTANCE_NAME + "_foo";

0 commit comments

Comments
 (0)