Skip to content

Commit e7ec03b

Browse files
Add preserveOrder check for CustomChecks
1 parent 8cc47cd commit e7ec03b

File tree

2 files changed

+74
-24
lines changed

2 files changed

+74
-24
lines changed

helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java

Lines changed: 55 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clust
394394
toBeStoppedInstances, preserveOrder);
395395
// custom check, includes partition check.
396396
batchCustomInstanceStoppableCheck(clusterId, instancesForCustomInstanceLevelChecks,
397-
toBeStoppedInstances, finalStoppableChecks, getMapFromJsonPayload(jsonContent));
397+
toBeStoppedInstances, finalStoppableChecks, getMapFromJsonPayload(jsonContent), preserveOrder);
398398
return finalStoppableChecks;
399399
}
400400

@@ -502,12 +502,12 @@ private List<String> batchHelixInstanceStoppableCheck(String clusterId,
502502
addMinActiveReplicaChecks(clusterId, helixInstanceChecks, toBeStoppedInstances);
503503

504504
// finalStoppableChecks contains the instances that do not pass this health check
505-
return filterInstancesForNextCheck(helixInstanceChecks, finalStoppableChecks);
505+
return filterInstancesForNextCheck(helixInstanceChecks, finalStoppableChecks, preserveOrder);
506506
}
507507

508508
private List<String> batchCustomInstanceStoppableCheck(String clusterId, List<String> instances,
509509
Set<String> toBeStoppedInstances, Map<String, StoppableCheck> finalStoppableChecks,
510-
Map<String, String> customPayLoads) {
510+
Map<String, String> customPayLoads, boolean preserveOrder) {
511511
if (instances.isEmpty()) {
512512
// if all instances failed at previous checks, then all following checks are not required.
513513
return instances;
@@ -536,7 +536,7 @@ private List<String> batchCustomInstanceStoppableCheck(String clusterId, List<St
536536
Map<String, StoppableCheck> clusterLevelCustomCheckResult =
537537
performAggregatedCustomCheck(clusterId, instanceIdsForCustomCheck,
538538
restConfig.getCompleteConfiguredHealthUrl().get(), customPayLoads,
539-
toBeStoppedInstances);
539+
toBeStoppedInstances, preserveOrder);
540540
List<String> instancesForNextCheck = new ArrayList<>();
541541
clusterLevelCustomCheckResult.forEach((instance, stoppableCheck) -> {
542542
addStoppableCheck(finalStoppableChecks, instance, stoppableCheck);
@@ -553,11 +553,15 @@ private List<String> batchCustomInstanceStoppableCheck(String clusterId, List<St
553553
List<String> instancesForCustomPartitionLevelChecks = instanceIdsForCustomCheck;
554554
if (!_skipHealthCheckCategories.contains(StoppableCheck.Category.CUSTOM_INSTANCE_CHECK)) {
555555
Map<String, Future<StoppableCheck>> customInstanceLevelChecks = instances.stream().collect(
556-
Collectors.toMap(Function.identity(), instance -> POOL.submit(
557-
() -> performCustomInstanceCheck(clusterId, instance, restConfig.getBaseUrl(instance),
558-
customPayLoads))));
556+
Collectors.toMap(
557+
Function.identity(),
558+
instance -> POOL.submit(() -> performCustomInstanceCheck(clusterId, instance, restConfig.getBaseUrl(instance), customPayLoads)),
559+
(existing, replacement) -> existing,
560+
// Use LinkedHashMap when preserveOrder is true to maintain the original order of instances
561+
preserveOrder ? LinkedHashMap::new : HashMap::new
562+
));
559563
instancesForCustomPartitionLevelChecks =
560-
filterInstancesForNextCheck(customInstanceLevelChecks, finalStoppableChecks);
564+
filterInstancesForNextCheck(customInstanceLevelChecks, finalStoppableChecks, preserveOrder);
561565
}
562566

563567
if (!instancesForCustomPartitionLevelChecks.isEmpty() && !_skipHealthCheckCategories.contains(
@@ -638,7 +642,7 @@ private Map<String, MaintenanceManagementInstanceInfo> batchInstanceHealthCheck(
638642
// custom check, includes custom Instance check and partition check.
639643
instancesForNext =
640644
batchCustomInstanceStoppableCheck(clusterId, instancesForNext, Collections.emptySet(),
641-
finalStoppableChecks, healthCheckConfig);
645+
finalStoppableChecks, healthCheckConfig, false);
642646
} else {
643647
throw new UnsupportedOperationException(healthCheck + " is not supported yet!");
644648
}
@@ -675,18 +679,31 @@ private void addStoppableCheck(Map<String, StoppableCheck> stoppableChecks, Stri
675679
private List<String> filterInstancesForNextCheck(
676680
Map<String, Future<StoppableCheck>> futureStoppableCheckByInstance,
677681
Map<String, StoppableCheck> finalStoppableCheckByInstance) {
678-
List<String> instancesForNextCheck = new ArrayList<>();
679-
for (Map.Entry<String, Future<StoppableCheck>> entry : futureStoppableCheckByInstance
680-
.entrySet()) {
682+
return filterInstancesForNextCheck(futureStoppableCheckByInstance, finalStoppableCheckByInstance, false);
683+
}
684+
685+
/**
686+
* Filters instances that have passed the stoppable check for the next check stage.
687+
* When preserveOrder is true, the original order of instances is maintained.
688+
*
689+
* @param futureStoppableCheckByInstance Map of instance name to future stoppable check
690+
* @param finalStoppableCheckByInstance Map to collect the final stoppable check results
691+
* @param preserveOrder Whether to preserve the original order of instances
692+
* @return List of instances that passed the stoppable check
693+
*/
694+
private List<String> filterInstancesForNextCheck(
695+
Map<String, Future<StoppableCheck>> futureStoppableCheckByInstance,
696+
Map<String, StoppableCheck> finalStoppableCheckByInstance,
697+
boolean preserveOrder) {
698+
// Use LinkedHashMap when preserveOrder is true to maintain the original order of instances
699+
Map<String, Boolean> instanceStoppableMap = preserveOrder ? new LinkedHashMap<>() : new HashMap<>();
700+
701+
for (Map.Entry<String, Future<StoppableCheck>> entry : futureStoppableCheckByInstance.entrySet()) {
681702
String instance = entry.getKey();
682703
try {
683704
StoppableCheck stoppableCheck = entry.getValue().get();
684705
addStoppableCheck(finalStoppableCheckByInstance, instance, stoppableCheck);
685-
if (stoppableCheck.isStoppable() || isNonBlockingCheck(stoppableCheck)) {
686-
// instance passed this around of check or mandatory all checks
687-
// will be checked in the next round
688-
instancesForNextCheck.add(instance);
689-
}
706+
instanceStoppableMap.put(instance, stoppableCheck.isStoppable() || isNonBlockingCheck(stoppableCheck));
690707
} catch (Exception e) {
691708
String errorMessage =
692709
String.format("Failed to get StoppableChecks in parallel. Instance: %s", instance);
@@ -695,6 +712,16 @@ private List<String> filterInstancesForNextCheck(
695712
}
696713
}
697714

715+
// Filter instances that passed the check
716+
List<String> instancesForNextCheck = new ArrayList<>();
717+
for (Map.Entry<String, Boolean> entry : instanceStoppableMap.entrySet()) {
718+
if (entry.getValue()) {
719+
// Instance passed this round of check or mandatory all checks
720+
// will be checked in the next round
721+
instancesForNextCheck.add(entry.getKey());
722+
}
723+
}
724+
698725
return instancesForNextCheck;
699726
}
700727

@@ -785,8 +812,10 @@ private Map<String, StoppableCheck> performPartitionsCheck(List<String> instance
785812

786813
private Map<String, StoppableCheck> performAggregatedCustomCheck(String clusterId,
787814
List<String> instances, String url, Map<String, String> customPayLoads,
788-
Set<String> toBeStoppedInstances) {
789-
Map<String, StoppableCheck> aggregatedStoppableChecks = new HashMap<>();
815+
Set<String> toBeStoppedInstances, boolean preserveOrder) {
816+
// Use LinkedHashMap when preserveOrder is true to maintain the original order of instances
817+
Map<String, StoppableCheck> aggregatedStoppableChecks = preserveOrder ?
818+
new LinkedHashMap<>() : new HashMap<>();
790819
try {
791820
Map<String, List<String>> customCheckResult =
792821
_customRestClient.getAggregatedStoppableCheck(url, instances, toBeStoppedInstances,
@@ -799,9 +828,13 @@ private Map<String, StoppableCheck> performAggregatedCustomCheck(String clusterI
799828
}
800829
} catch (IOException ex) {
801830
LOG.error("Custom client side aggregated health check for {} failed.", clusterId, ex);
802-
return instances.stream().collect(Collectors.toMap(Function.identity(),
831+
return instances.stream().collect(Collectors.toMap(
832+
Function.identity(),
803833
instance -> new StoppableCheck(false, Collections.singletonList(instance),
804-
StoppableCheck.Category.CUSTOM_AGGREGATED_CHECK)));
834+
StoppableCheck.Category.CUSTOM_AGGREGATED_CHECK),
835+
(existing, replacement) -> existing,
836+
// Use LinkedHashMap when preserveOrder is true to maintain the original order of instances
837+
preserveOrder ? LinkedHashMap::new : HashMap::new));
805838
}
806839
return aggregatedStoppableChecks;
807840
}
@@ -912,7 +945,7 @@ protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String
912945
return healthStatus;
913946
}
914947

915-
// Adds the result of the min_active replica check for each stoppable check passed in futureStoppableCheckByInstance
948+
916949
private void addMinActiveReplicaChecks(String clusterId, Map<String, Future<StoppableCheck>> futureStoppableCheckByInstance,
917950
Set<String> toBeStoppedInstances) {
918951
// Do not perform check if in the skip list

helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,25 @@ private StoppableInstancesSelector(String clusterId, List<String> orderOfZone,
7575
_dataAccessor = dataAccessor;
7676
}
7777

78+
/**
79+
* Evaluates and collects stoppable instances within a specified or determined zone based on the order of zones.
80+
* If _orderOfZone is specified, the method targets the first non-empty zone; otherwise, it targets the zone with
81+
* the highest instance count. The method iterates through instances, performing stoppable checks, and records
82+
* reasons for non-stoppability.
83+
*
84+
* @param instances A list of instances to be evaluated.
85+
* @param toBeStoppedInstances A list of instances presumed to be already stopped
86+
* @return An ObjectNode containing:
87+
* - 'stoppableNode': List of instances that can be stopped.
88+
* - 'instance_not_stoppable_with_reasons': A map with the instance name as the key and
89+
* a list of reasons for non-stoppability as the value.
90+
* @throws IOException
91+
*/
92+
public ObjectNode getStoppableInstancesInSingleZone(List<String> instances,
93+
List<String> toBeStoppedInstances) throws IOException {
94+
return getStoppableInstancesInSingleZone(instances, toBeStoppedInstances, false);
95+
}
96+
7897
/**
7998
* Evaluates and collects stoppable instances within a specified or determined zone based on the order of zones.
8099
* If _orderOfZone is specified, the method targets the first non-empty zone; otherwise, it targets the zone with
@@ -115,7 +134,6 @@ public ObjectNode getStoppableInstancesInSingleZone(List<String> instances,
115134
*
116135
* @param instances A list of instances to be evaluated.
117136
* @param toBeStoppedInstances A list of instances presumed to be already stopped
118-
* @param preserveOrder Indicates whether to preserve the original order of instances
119137
* @return An ObjectNode containing:
120138
* - 'stoppableNode': List of instances that can be stopped.
121139
* - 'instance_not_stoppable_with_reasons': A map with the instance name as the key and
@@ -153,7 +171,6 @@ public ObjectNode getStoppableInstancesCrossZones(List<String> instances,
153171
*
154172
* @param instances A list of instance to be evaluated.
155173
* @param toBeStoppedInstances A list of instances presumed to be already stopped
156-
* @param preserveOrder Indicates whether to preserve the original order of instances
157174
* @return An ObjectNode containing:
158175
* - 'stoppableNode': List of instances that can be stopped.
159176
* - 'instance_not_stoppable_with_reasons': A map with the instance name as the key and

0 commit comments

Comments
 (0)