From 7b238a9e372c44efc16795619aeade423768cfd2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grant=20Pal=C3=A1u=20Spencer?= Date: Tue, 3 Sep 2024 12:54:56 -0700 Subject: [PATCH] Consider fault zones when sorting combinedPreferenceList (#2894) Consider fault zones when sorting combinedPreferenceList --- .../rebalancer/AbstractRebalancer.java | 52 +++- .../rebalancer/DelayedAutoRebalancer.java | 3 +- .../rebalancer/MaintenanceRebalancer.java | 4 +- ...ceListNodeComparatorWithTopologyAware.java | 229 ++++++++++++++++++ 4 files changed, 284 insertions(+), 4 deletions(-) create mode 100644 helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestPreferenceListNodeComparatorWithTopologyAware.java diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java index 477ef2032c..d78e0e816e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java @@ -557,12 +557,25 @@ protected static class PreferenceListNodeComparator implements Comparator _currentStateMap; protected final StateModelDefinition _stateModelDef; protected final List _preferenceList; + protected final ResourceControllerDataProvider _cache; + protected final Map _mzReplicaCountMap; public PreferenceListNodeComparator(Map currentStateMap, StateModelDefinition stateModelDef, List preferenceList) { _currentStateMap = currentStateMap; _stateModelDef = stateModelDef; _preferenceList = preferenceList; + _cache = null; + _mzReplicaCountMap = null; + } + + public PreferenceListNodeComparator(Map currentStateMap, StateModelDefinition stateModelDef, + List preferenceList, ResourceControllerDataProvider cache) { + _currentStateMap = currentStateMap; + _stateModelDef = stateModelDef; + _preferenceList = preferenceList; + _cache = cache; + _mzReplicaCountMap = populateMzReplicaCountMap(); } @Override @@ -570,7 +583,9 @@ public int compare(String ins1, String ins2) { // condition : // 1. both in preference list, keep the order in preference list // 2. one them in preference list, the one in preference list has higher priority - // 3. none of them in preference list, sort by state. + // 3. None of them in preference list, reverse sort by # of replicas in that mz + // a. node in MZ that has more replicas should be dropped first (lower priority) + // 4. Same # of replicas in each MZ, order by state if (_preferenceList.contains(ins1) && _preferenceList.contains(ins2)) { return _preferenceList.indexOf(ins1) - _preferenceList.indexOf(ins2); } else if (_preferenceList.contains(ins1)) { @@ -581,6 +596,22 @@ public int compare(String ins1, String ins2) { Integer p1 = Integer.MAX_VALUE; Integer p2 = Integer.MAX_VALUE; + // Order by MZ representation (overrepresented should be dropped first) + if (_mzReplicaCountMap != null && !_mzReplicaCountMap.isEmpty()) { + String faultZoneType = _cache.getClusterConfig().getFaultZoneType(); + String mz1 = _cache.getInstanceConfigMap().get(ins1) != null ? + _cache.getInstanceConfigMap().get(ins1).getDomainAsMap().get(faultZoneType) : null; + String mz2 = _cache.getInstanceConfigMap().get(ins2) != null ? + _cache.getInstanceConfigMap().get(ins2).getDomainAsMap().get(faultZoneType) : null; + + int mz1Count = _mzReplicaCountMap.getOrDefault(mz1, 0); + int mz2Count = _mzReplicaCountMap.getOrDefault(mz2, 0); + if (mz1Count != mz2Count) { + return mz1Count - mz2Count; + } + } + + // Order by state priority Map statesPriorityMap = _stateModelDef.getStatePriorityMap(); String state1 = _currentStateMap.get(ins1); String state2 = _currentStateMap.get(ins2); @@ -593,6 +624,25 @@ public int compare(String ins1, String ins2) { return p1.compareTo(p2); } + + private Map populateMzReplicaCountMap() { + if (_cache == null || _cache.getInstanceConfigMap() == null || + _cache.getClusterConfig() == null || !_cache.getClusterConfig().isTopologyAwareEnabled() ) { + return Collections.emptyMap(); + } + String faultZoneType = _cache.getClusterConfig().getFaultZoneType(); + + Map mzReplicaCountMap = new HashMap<>(); + for (String instance : _currentStateMap.keySet()) { + String mz = _cache.getInstanceConfigMap().get(instance).getDomainAsMap().get(faultZoneType); + if (mzReplicaCountMap.containsKey(mz)) { + mzReplicaCountMap.put(mz, mzReplicaCountMap.get(mz) + 1); + } else { + mzReplicaCountMap.put(mz, 1); + } + } + return mzReplicaCountMap; + } } // This is for a backward compatible workaround to fix diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java index d55a6eae83..939e894cb9 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java @@ -365,7 +365,8 @@ protected Map computeBestPossibleStateForPartition(Set l currentMapWithPreferenceList.keySet().retainAll(preferenceList); combinedPreferenceList.addAll(currentInstances); - combinedPreferenceList.sort(new PreferenceListNodeComparator(currentStateMap, stateModelDef, preferenceList)); + combinedPreferenceList.sort( + new PreferenceListNodeComparator(currentStateMap, stateModelDef, preferenceList, cache)); // if preference list is not empty, and we do have new intanceToAdd, we // should check if it has capacity to hold the partition. diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/MaintenanceRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/MaintenanceRebalancer.java index 9b5b4f9293..49d5fd3249 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/MaintenanceRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/MaintenanceRebalancer.java @@ -77,8 +77,8 @@ public IdealState computeNewIdealState(String resourceName, IdealState currentId * -------------------------------------------------------- * newPrefList = [C, B, A] => [A, B, C] */ - Collections.sort(preferenceList, new PreferenceListNodeComparator(stateMap, - stateModelDef, currentIdealState.getPreferenceList(partition.getPartitionName()))); + Collections.sort(preferenceList, new PreferenceListNodeComparator(stateMap, stateModelDef, + currentIdealState.getPreferenceList(partition.getPartitionName()), clusterData)); /** * Sort 2: Sort based on state-priority order: diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestPreferenceListNodeComparatorWithTopologyAware.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestPreferenceListNodeComparatorWithTopologyAware.java new file mode 100644 index 0000000000..151f77cfba --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestPreferenceListNodeComparatorWithTopologyAware.java @@ -0,0 +1,229 @@ +package org.apache.helix.controller.rebalancer; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.TestHelper; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.constants.InstanceConstants; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; +import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.OnlineOfflineSMD; +import org.apache.helix.model.Partition; +import org.apache.helix.model.StateModelDefinition; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestPreferenceListNodeComparatorWithTopologyAware extends ZkTestBase { + static final String CLUSTER_NAME = TestHelper.getTestClassName() + "_cluster"; + protected ClusterControllerManager _controller; + protected List _participants = new ArrayList<>(); + static final List ZONES = Arrays.asList("zone-0", "zone-1", "zone-2"); + private Map> _zoneToInstanceMap = new HashMap<>(); + static final int NUM_NODES_PER_ZONE = 3; + protected static final String ZONE = "zone"; + protected static final String HOST = "host"; + protected static final String LOGICAL_ID = "logicalId"; + protected static final String TOPOLOGY = String.format("%s/%s/%s", ZONE, HOST, LOGICAL_ID); + + + + @BeforeClass + public void beforeClass() throws Exception { + System.out.println("START " + TestHelper.getTestClassName() + " at " + new Date(System.currentTimeMillis())); + + _gSetupTool.addCluster(CLUSTER_NAME, true); + + for (String zoneId : ZONES) { + for (int j = 0; j < NUM_NODES_PER_ZONE; j++) { + String participantName = PARTICIPANT_PREFIX + "_" + zoneId + "_" + j; + InstanceConfig instanceConfig = new InstanceConfig.Builder().setDomain( + String.format("%s=%s, %s=%s, %s=%s", ZONE, zoneId, HOST, participantName, LOGICAL_ID, + UUID.randomUUID())).setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE) + .build(participantName); + + _gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, instanceConfig); + _gSetupTool.getClusterManagementTool().setInstanceZoneId(CLUSTER_NAME, participantName, zoneId); + // start dummy participants + MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName); + participant.syncStart(); + _participants.add(participant); + + if (!_zoneToInstanceMap.containsKey(zoneId)) { + _zoneToInstanceMap.put(zoneId, new ArrayList<>()); + } + _zoneToInstanceMap.get(zoneId).add(participantName); + } + } + + enableTopologyAwareRebalance(); + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + } + + @Test + public void testPrefrenceListNodeComparator() { + ResourceControllerDataProvider cache = new ResourceControllerDataProvider(CLUSTER_NAME); + cache.refresh(_controller.getHelixDataAccessor()); + StateModelDefinition stateModelDef = cache.getStateModelDef(OnlineOfflineSMD.name); + + // No nodes in preference list, so mz will be considered when sorting + List preferenceList = Collections.emptyList(); + + // All nodes have same state, so state comparator will return 0 if mz representation is identical + Map currentStateMap = new HashMap<>(); + currentStateMap.put(_zoneToInstanceMap.get(ZONES.get(0)).get(0), "ONLINE"); + currentStateMap.put(_zoneToInstanceMap.get(ZONES.get(1)).get(0), "ONLINE"); + currentStateMap.put(_zoneToInstanceMap.get(ZONES.get(1)).get(1), "ONLINE"); + currentStateMap.put(_zoneToInstanceMap.get(ZONES.get(1)).get(2), "ONLINE"); + currentStateMap.put(_zoneToInstanceMap.get(ZONES.get(2)).get(0), "ONLINE"); + + AbstractRebalancer.PreferenceListNodeComparator comparator = + new AbstractRebalancer.PreferenceListNodeComparator(currentStateMap, stateModelDef, preferenceList, cache); + + // Replicas in zone 0 should have higher priority than nodes in zone 1, because zone 1 has more replicas + Assert.assertTrue(comparator.compare(_zoneToInstanceMap.get(ZONES.get(0)).get(0), + _zoneToInstanceMap.get(ZONES.get(1)).get(0)) < 0); + Assert.assertTrue(comparator.compare(_zoneToInstanceMap.get(ZONES.get(0)).get(0), + _zoneToInstanceMap.get(ZONES.get(1)).get(1)) < 0); + // Similarly, replicas in zone 1 should have lower priority than nodes in zone 2 + Assert.assertTrue(comparator.compare(_zoneToInstanceMap.get(ZONES.get(1)).get(0), + _zoneToInstanceMap.get(ZONES.get(2)).get(0)) > 0); + Assert.assertTrue(comparator.compare(_zoneToInstanceMap.get(ZONES.get(1)).get(1), + _zoneToInstanceMap.get(ZONES.get(2)).get(0)) > 0); + // Replicas in the same zone should have equal priority + // Technically this gets ordered by state priority, but all states are the same, so it will also return 0 + Assert.assertEquals(comparator.compare(_zoneToInstanceMap.get(ZONES.get(1)).get(0), + _zoneToInstanceMap.get(ZONES.get(1)).get(1)), 0); + Assert.assertEquals(comparator.compare(_zoneToInstanceMap.get(ZONES.get(1)).get(1), + _zoneToInstanceMap.get(ZONES.get(1)).get(2)), 0); + // Replicas in zones with equal replica count should have equal priority + Assert.assertEquals(comparator.compare(_zoneToInstanceMap.get(ZONES.get(0)).get(0), + _zoneToInstanceMap.get(ZONES.get(2)).get(0)), 0); + } + + @Test + public void testComputeBestPossibleStateForPartition() { + String resourceName = "testResource"; + Partition partition = new Partition("testPartition"); + ResourceControllerDataProvider cache = new ResourceControllerDataProvider(CLUSTER_NAME); + cache.refresh(_controller.getHelixDataAccessor()); + Set liveInstances = cache.getLiveInstances().keySet(); + StateModelDefinition stateModelDef = cache.getStateModelDef(OnlineOfflineSMD.name); + CurrentStateOutput currentStateOutput = new CurrentStateOutput(); + Set disabledInstancesForPartition = new HashSet<>(); + IdealState idealState = new IdealState(resourceName); + idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO); + idealState.setRebalancerClassName(WagedRebalancer.class.getName()); + idealState.setReplicas("3"); + idealState.setMinActiveReplicas(2); + + // Create current state with 4 instances, 1 in zone-0, 1 in zone-1, 2 in zone-2 + // The instance in zone-0 is in the preference list + // The first instance in zone-1 is in the preference list, the second is not + // The instance in zone-2 is not in the preference list + currentStateOutput.setCurrentState(resourceName, partition, _zoneToInstanceMap.get(ZONES.get(0)).get(0), "ONLINE"); + currentStateOutput.setCurrentState(resourceName, partition, _zoneToInstanceMap.get(ZONES.get(1)).get(0), "ONLINE"); + currentStateOutput.setCurrentState(resourceName, partition, _zoneToInstanceMap.get(ZONES.get(1)).get(1), "ONLINE"); + currentStateOutput.setCurrentState(resourceName, partition, _zoneToInstanceMap.get(ZONES.get(2)).get(1), "ONLINE"); + + // Preference list contains first node from each zone (2 current states are not in preference list) + List preferenceList = Arrays.asList(_zoneToInstanceMap.get(ZONES.get(0)).get(0), + _zoneToInstanceMap.get(ZONES.get(1)).get(0), _zoneToInstanceMap.get(ZONES.get(2)).get(0)); + + // Should preferentially drop the replica from zone-2 that is not in the preference list. This is because another + // replica already exists in zone-2 but not in zone-1. + DelayedAutoRebalancer delayedAutoRebalancer = new DelayedAutoRebalancer(); + Map result = delayedAutoRebalancer.computeBestPossibleStateForPartition(liveInstances, stateModelDef, preferenceList, + currentStateOutput, disabledInstancesForPartition, idealState, cache.getClusterConfig(), partition, + cache.getAbnormalStateResolver(OnlineOfflineSMD.name), cache); + + // Zone-1 replica 1 should be dropped + Map expectedPartitionStates = new HashMap<>(); + expectedPartitionStates.put(_zoneToInstanceMap.get(ZONES.get(0)).get(0), "ONLINE"); + expectedPartitionStates.put(_zoneToInstanceMap.get(ZONES.get(1)).get(0), "ONLINE"); + expectedPartitionStates.put(_zoneToInstanceMap.get(ZONES.get(1)).get(1), "DROPPED"); + expectedPartitionStates.put(_zoneToInstanceMap.get(ZONES.get(2)).get(1), "ONLINE"); + Assert.assertEquals(result, expectedPartitionStates, "Partition movement different than expected"); + + // Rebuild current state to drop the replica in zone-1 that was not in preference list + currentStateOutput = new CurrentStateOutput(); + currentStateOutput.setCurrentState(resourceName, partition, _zoneToInstanceMap.get(ZONES.get(0)).get(0), "ONLINE"); + currentStateOutput.setCurrentState(resourceName, partition, _zoneToInstanceMap.get(ZONES.get(1)).get(0), "ONLINE"); + currentStateOutput.setCurrentState(resourceName, partition, _zoneToInstanceMap.get(ZONES.get(2)).get(1), "ONLINE"); + result = delayedAutoRebalancer.computeBestPossibleStateForPartition(liveInstances, stateModelDef, preferenceList, + currentStateOutput, disabledInstancesForPartition, idealState, cache.getClusterConfig(), partition, + cache.getAbnormalStateResolver(OnlineOfflineSMD.name), cache); + + // Zone-2 replica 0 should now be assigned as it's in the preference list + expectedPartitionStates = new HashMap<>(); + expectedPartitionStates.put(_zoneToInstanceMap.get(ZONES.get(0)).get(0), "ONLINE"); + expectedPartitionStates.put(_zoneToInstanceMap.get(ZONES.get(1)).get(0), "ONLINE"); + expectedPartitionStates.put(_zoneToInstanceMap.get(ZONES.get(2)).get(0), "ONLINE"); + expectedPartitionStates.put(_zoneToInstanceMap.get(ZONES.get(2)).get(1), "ONLINE"); + Assert.assertEquals(result, expectedPartitionStates, "Partition movement different than expected"); + + // Rebuild current state to assign Zone-2 replica 0 + currentStateOutput = new CurrentStateOutput(); + currentStateOutput.setCurrentState(resourceName, partition, _zoneToInstanceMap.get(ZONES.get(0)).get(0), "ONLINE"); + currentStateOutput.setCurrentState(resourceName, partition, _zoneToInstanceMap.get(ZONES.get(1)).get(0), "ONLINE"); + currentStateOutput.setCurrentState(resourceName, partition, _zoneToInstanceMap.get(ZONES.get(2)).get(0), "ONLINE"); + currentStateOutput.setCurrentState(resourceName, partition, _zoneToInstanceMap.get(ZONES.get(2)).get(1), "ONLINE"); + result = delayedAutoRebalancer.computeBestPossibleStateForPartition(liveInstances, stateModelDef, preferenceList, + currentStateOutput, disabledInstancesForPartition, idealState, cache.getClusterConfig(), partition, + cache.getAbnormalStateResolver(OnlineOfflineSMD.name), cache); + + // Zone-1 replica 1 should be told to drop as it is last replica no in preference list + expectedPartitionStates = new HashMap<>(); + expectedPartitionStates.put(_zoneToInstanceMap.get(ZONES.get(0)).get(0), "ONLINE"); + expectedPartitionStates.put(_zoneToInstanceMap.get(ZONES.get(1)).get(0), "ONLINE"); + expectedPartitionStates.put(_zoneToInstanceMap.get(ZONES.get(2)).get(0), "ONLINE"); + expectedPartitionStates.put(_zoneToInstanceMap.get(ZONES.get(2)).get(1), "DROPPED"); + Assert.assertEquals(result, expectedPartitionStates, "Partition movement different than expected"); + + // Rebuild current state to drop Zone-1 replica 1 + currentStateOutput = new CurrentStateOutput(); + currentStateOutput.setCurrentState(resourceName, partition, _zoneToInstanceMap.get(ZONES.get(0)).get(0), "ONLINE"); + currentStateOutput.setCurrentState(resourceName, partition, _zoneToInstanceMap.get(ZONES.get(1)).get(0), "ONLINE"); + currentStateOutput.setCurrentState(resourceName, partition, _zoneToInstanceMap.get(ZONES.get(2)).get(0), "ONLINE"); + result = delayedAutoRebalancer.computeBestPossibleStateForPartition(liveInstances, stateModelDef, preferenceList, + currentStateOutput, disabledInstancesForPartition, idealState, cache.getClusterConfig(), partition, + cache.getAbnormalStateResolver(OnlineOfflineSMD.name), cache); + + // Expect no movement from the currentState + expectedPartitionStates = new HashMap<>(); + expectedPartitionStates.put(_zoneToInstanceMap.get(ZONES.get(0)).get(0), "ONLINE"); + expectedPartitionStates.put(_zoneToInstanceMap.get(ZONES.get(1)).get(0), "ONLINE"); + expectedPartitionStates.put(_zoneToInstanceMap.get(ZONES.get(2)).get(0), "ONLINE"); + Assert.assertEquals(result, expectedPartitionStates, "Partition movement different than expected"); + } + + private void enableTopologyAwareRebalance() { + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME); + clusterConfig.setTopology(TOPOLOGY); + clusterConfig.setFaultZoneType(ZONE); + clusterConfig.setTopologyAwareEnabled(true); + configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + } +}