Skip to content

Commit

Permalink
Support persist instance info collected by CloudInstanceInformationPr…
Browse files Browse the repository at this point in the history
…ocessor during auto-reg (apache#2622)

Add support to persist all instance information collected by CloudInstanceInformationProcessor in CloudInstanceInformation object. Add ability for CloudInstanceInformationProcessor to produce full DOMAIN field instead of appending _instanceName unless last character in CloudInstanceInformation.CloudInstanceField.FAULT_DOMAIN is '='.
  • Loading branch information
zpinto authored Sep 21, 2023
1 parent 0922e1f commit 045deb6
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.apache.helix.api.cloud;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import com.google.common.collect.ImmutableMap;

/**
* Generic interface for cloud instance information which builds on top of CloudInstanceInformation.
* This interface adds a new method, getAll(), which returns all the key value pairs of a specific cloud instance.
* We call suffix the name of this interface with V2 to preserve backwards compatibility for all classes
* that implement CloudInstanceInformation.
*/
public interface CloudInstanceInformationV2 extends CloudInstanceInformation {
/**
* Get all the key value pairs of a specific cloud instance
* @return A map of all the key value pairs
*/
ImmutableMap<String, String> getAll();
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.helix.PropertyKey;
import org.apache.helix.api.cloud.CloudInstanceInformation;
import org.apache.helix.api.cloud.CloudInstanceInformationProcessor;
import org.apache.helix.api.cloud.CloudInstanceInformationV2;
import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.HelixConfigScope;
Expand All @@ -58,6 +59,7 @@
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.task.TaskUtil;
import org.apache.helix.util.ConfigStringUtil;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.ZNRecordBucketizer;
Expand Down Expand Up @@ -208,25 +210,38 @@ private void joinCluster() {
InstanceConfig instanceConfig;
if (!ZKUtil.isInstanceSetup(_zkclient, _clusterName, _instanceName, _instanceType)) {
if (!autoJoin) {
throw new HelixException("Initial cluster structure is not set up for instance: "
+ _instanceName + ", instanceType: " + _instanceType);
throw new HelixException(
"Initial cluster structure is not set up for instance: " + _instanceName
+ ", instanceType: " + _instanceType);
}

InstanceConfig.Builder instanceConfigBuilder =
_helixManagerProperty.getDefaultInstanceConfigBuilder();
if (!autoRegistration) {
LOG.info(_instanceName + " is auto-joining cluster: " + _clusterName);
instanceConfig =
_helixManagerProperty.getDefaultInstanceConfigBuilder().build(_instanceName);
instanceConfig = instanceConfigBuilder.build(_instanceName);
} else {
LOG.info(_instanceName + " is auto-registering cluster: " + _clusterName);
CloudInstanceInformation cloudInstanceInformation = getCloudInstanceInformation();
String domain = cloudInstanceInformation.get(
CloudInstanceInformation.CloudInstanceField.FAULT_DOMAIN.name()) + _instanceName;
instanceConfig =
_helixManagerProperty.getDefaultInstanceConfigBuilder().build(_instanceName);
instanceConfig.setDomain(domain);
if (cloudInstanceInformation instanceof CloudInstanceInformationV2) {
CloudInstanceInformationV2 cloudInstanceInformationV2 =
(CloudInstanceInformationV2) cloudInstanceInformation;
cloudInstanceInformationV2.getAll().forEach(instanceConfigBuilder::addInstanceInfo);
}

String cloudInstanceInformationFaultDomain = cloudInstanceInformation.get(
CloudInstanceInformation.CloudInstanceField.FAULT_DOMAIN.name());
instanceConfig = instanceConfigBuilder.setDomain(
// Previously, the FAULT_DOMAIN was expected to end with the final DOMAIN field key without a value,
// like "rack=25, host=" or "cabinet=A, rack=25, host=". This is because ParticipantManager would append
// the _instanceName to populate the value. This check has been added to preserve backwards compatibility
// while also allowing the auto-registration to construct the full DOMAIN which includes the last value.
cloudInstanceInformationFaultDomain.endsWith(ConfigStringUtil.CONCATENATE_CONFIG_JOINER)
? cloudInstanceInformationFaultDomain + _instanceName
: cloudInstanceInformationFaultDomain).build(_instanceName);
}
instanceConfig
.validateTopologySettingInInstanceConfig(_configAccessor.getClusterConfig(_clusterName),
_instanceName);
instanceConfig.validateTopologySettingInInstanceConfig(
_configAccessor.getClusterConfig(_clusterName), _instanceName);
_helixAdmin.addInstance(_clusterName, instanceConfig);
} else {
_configAccessor.getInstanceConfig(_clusterName, _instanceName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public enum InstanceConfigProperty {
DOMAIN,
DELAY_REBALANCE_ENABLED,
MAX_CONCURRENT_TASK,
INSTANCE_INFO_MAP,
INSTANCE_CAPACITY_MAP,
TARGET_TASK_THREAD_POOL_SIZE,
INSTANCE_OPERATION
Expand Down Expand Up @@ -607,6 +608,29 @@ public void setTargetTaskThreadPoolSize(int targetTaskThreadPoolSize)
targetTaskThreadPoolSize);
}

/**
* Get the instance information map from the map fields.
* @return data map if it exists, or empty map
*/
public Map<String, String> getInstanceInfoMap() {
Map<String, String> instanceInfoMap =
_record.getMapField(InstanceConfigProperty.INSTANCE_INFO_MAP.name());
return instanceInfoMap != null ? instanceInfoMap : Collections.emptyMap();
}

/**
* Set instanceInfoMap to map of information about the instance that can be used
* to construct the DOMAIN field.
* @param instanceInfoMap Map of information about the instance. ie: { 'rack': 'rack-1', 'host': 'host-1' }
*/
private void setInstanceInfoMap(Map<String, String> instanceInfoMap) {
if (instanceInfoMap == null) {
_record.getMapFields().remove(InstanceConfigProperty.INSTANCE_INFO_MAP.name());
} else {
_record.setMapField(InstanceConfigProperty.INSTANCE_INFO_MAP.name(), instanceInfoMap);
}
}

/**
* Get the instance capacity information from the map fields.
* @return data map if it exists, or empty map
Expand Down Expand Up @@ -748,6 +772,7 @@ public static class Builder {
private int _weight = WEIGHT_NOT_SET;
private List<String> _tags = new ArrayList<>();
private boolean _instanceEnabled = HELIX_ENABLED_DEFAULT_VALUE;
private Map<String, String> _instanceInfoMap;
private Map<String, Integer> _instanceCapacityMap;

/**
Expand Down Expand Up @@ -794,6 +819,10 @@ public InstanceConfig build(String instanceId) {
instanceConfig.setInstanceEnabled(_instanceEnabled);
}

if (_instanceInfoMap != null) {
instanceConfig.setInstanceInfoMap(_instanceInfoMap);
}

if (_instanceCapacityMap != null) {
instanceConfig.setInstanceCapacityMap(_instanceCapacityMap);
}
Expand Down Expand Up @@ -861,6 +890,31 @@ public Builder setInstanceEnabled(boolean instanceEnabled) {
return this;
}

/**
* Set the INSTANCE_INFO_MAP for this instance
* @param instanceInfoMap the instance info map
* @return InstanceConfig.Builder
*/
public Builder setInstanceInfoMap(Map<String, String> instanceInfoMap) {
_instanceInfoMap = instanceInfoMap;
return this;
}

/**
* Add instance info to the INSTANCE_INFO_MAP.
* Only adds if the key does not already exist.
* @param key the key for the information
* @param value the value the information
* @return InstanceConfig.Builder
*/
public Builder addInstanceInfo(String key, String value) {
if (_instanceInfoMap == null) {
_instanceInfoMap = new HashMap<>();
}
_instanceInfoMap.putIfAbsent(key, value);
return this;
}

/**
* Set the capacity map for this instance
* @param instanceCapacityMap the capacity map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

public final class ConfigStringUtil {
private static final String CONCATENATE_CONFIG_SPLITTER = ",";
private static final String CONCATENATE_CONFIG_JOINER = "=";
public static final String CONCATENATE_CONFIG_JOINER = "=";

private ConfigStringUtil() {
throw new java.lang.UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,31 @@
* under the License.
*/

import com.google.common.collect.ImmutableMap;
import org.apache.helix.api.cloud.CloudInstanceInformation;
import org.apache.helix.api.cloud.CloudInstanceInformationV2;

/**
* This is a custom implementation of CloudInstanceInformation. It is used to test the functionality
* of Helix node auto-registration.
*/
public class CustomCloudInstanceInformation implements CloudInstanceInformation {
private final String _faultDomain;
public class CustomCloudInstanceInformation implements CloudInstanceInformationV2 {

public CustomCloudInstanceInformation(String faultDomain) {
_faultDomain = faultDomain;
public static final ImmutableMap<String, String> _cloudInstanceInfo =
ImmutableMap.of(CloudInstanceInformation.CloudInstanceField.FAULT_DOMAIN.name(),
"mz=0, host=localhost, container=containerId", "MAINTENANCE_ZONE", "0", "INSTANCE_NAME",
"localhost_something");

public CustomCloudInstanceInformation() {
}

@Override
public String get(String key) {
if (key.equals(CloudInstanceField.FAULT_DOMAIN.name())) {
return _faultDomain;
}
return null;
return _cloudInstanceInfo.get(key);
}

@Override
public ImmutableMap<String, String> getAll() {
return _cloudInstanceInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* It is used to test the functionality of Helix node auto-registration.
*/
public class CustomCloudInstanceInformationProcessor implements CloudInstanceInformationProcessor<String> {

public CustomCloudInstanceInformationProcessor(HelixCloudProperty helixCloudProperty) {
}

Expand All @@ -41,6 +42,6 @@ public List<String> fetchCloudInstanceInformation() {

@Override
public CloudInstanceInformation parseCloudInstanceInformation(List<String> responses) {
return new CustomCloudInstanceInformation("rack=A:123, host=");
return new CustomCloudInstanceInformation();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.helix.HelixManagerProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.api.cloud.CloudInstanceInformation;
import org.apache.helix.cloud.constants.CloudProvider;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
Expand Down Expand Up @@ -211,8 +212,11 @@ public void testAutoRegistrationCustomizedFullyQualifiedInfoProcessorPath() thro
// Check that live instance is added and instance config is populated with correct domain.
return null != manager.getHelixDataAccessor()
.getProperty(accessor.keyBuilder().liveInstance(instance5)) && manager.getConfigAccessor()
.getInstanceConfig(CLUSTER_NAME, instance5).getDomainAsString()
.equals("rack=A:123, host=" + instance5);
.getInstanceConfig(CLUSTER_NAME, instance5).getDomainAsString().equals(
CustomCloudInstanceInformation._cloudInstanceInfo.get(
CloudInstanceInformation.CloudInstanceField.FAULT_DOMAIN.name()))
&& manager.getConfigAccessor().getInstanceConfig(CLUSTER_NAME, instance5)
.getInstanceInfoMap().equals(CustomCloudInstanceInformation._cloudInstanceInfo);
}, 2000));

autoParticipant.syncStop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -173,11 +174,16 @@ public void testSetTargetTaskThreadPoolSizeIllegalArgument() {

@Test
public void testInstanceConfigBuilder() {

Map<String, String> instanceInfoMap = new HashMap<>();
instanceInfoMap.put("CAGE", "H");
Map<String, Integer> capacityDataMap = ImmutableMap.of("weight1", 1);
InstanceConfig instanceConfig =
new InstanceConfig.Builder().setHostName("testHost").setPort("1234").setDomain("foo=bar")
.setWeight(100).setInstanceEnabled(true).addTag("tag1").addTag("tag2")
.setInstanceEnabled(false).setInstanceCapacityMap(capacityDataMap).build("instance1");
.setInstanceEnabled(false).setInstanceInfoMap(instanceInfoMap)
.addInstanceInfo("CAGE", "G").addInstanceInfo("CABINET", "30")
.setInstanceCapacityMap(capacityDataMap).build("instance1");

Assert.assertEquals(instanceConfig.getId(), "instance1");
Assert.assertEquals(instanceConfig.getHostName(), "testHost");
Expand All @@ -187,6 +193,8 @@ public void testInstanceConfigBuilder() {
Assert.assertTrue(instanceConfig.getTags().contains("tag1"));
Assert.assertTrue(instanceConfig.getTags().contains("tag2"));
Assert.assertFalse(instanceConfig.getInstanceEnabled());
Assert.assertEquals(instanceConfig.getInstanceInfoMap().get("CAGE"), "H");
Assert.assertEquals(instanceConfig.getInstanceInfoMap().get("CABINET"), "30");
Assert.assertEquals(instanceConfig.getInstanceCapacityMap().get("weight1"), Integer.valueOf(1));
}
}

0 comments on commit 045deb6

Please sign in to comment.