Skip to content

Commit f87d948

Browse files
authored
Improve setInstanceOperation performance (#3017)
This change switches to a parallel/async read of all instance configs through HelixDataAccessor, and avoids calling findInstancesWithMatchingLogicalId for instance operation transitions that do not require that check.
1 parent e25ac5d commit f87d948

File tree

2 files changed

+142
-46
lines changed

2 files changed

+142
-46
lines changed

helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ public void addInstance(String clusterName, InstanceConfig instanceConfig) {
211211
}
212212

213213
List<InstanceConfig> matchingLogicalIdInstances =
214-
InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor, clusterName,
214+
InstanceUtil.findInstancesWithMatchingLogicalId(_baseDataAccessor, clusterName,
215215
instanceConfig);
216216
if (matchingLogicalIdInstances.size() > 1) {
217217
throw new HelixException(
@@ -224,7 +224,8 @@ public void addInstance(String clusterName, InstanceConfig instanceConfig) {
224224
InstanceConstants.InstanceOperation attemptedInstanceOperation =
225225
instanceConfig.getInstanceOperation().getOperation();
226226
try {
227-
InstanceUtil.validateInstanceOperationTransition(_configAccessor, clusterName, instanceConfig,
227+
InstanceUtil.validateInstanceOperationTransition(_baseDataAccessor, clusterName,
228+
instanceConfig,
228229
InstanceConstants.InstanceOperation.UNKNOWN, attemptedInstanceOperation);
229230
} catch (HelixException e) {
230231
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.UNKNOWN);
@@ -616,7 +617,7 @@ public boolean canCompleteSwap(String clusterName, String instanceName) {
616617
}
617618

618619
List<InstanceConfig> swappingInstances =
619-
InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor, clusterName,
620+
InstanceUtil.findInstancesWithMatchingLogicalId(_baseDataAccessor, clusterName,
620621
instanceConfig);
621622
if (swappingInstances.size() != 1) {
622623
logger.warn(
@@ -655,7 +656,7 @@ public boolean completeSwapIfPossible(String clusterName, String instanceName,
655656
}
656657

657658
List<InstanceConfig> swappingInstances =
658-
InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor, clusterName,
659+
InstanceUtil.findInstancesWithMatchingLogicalId(_baseDataAccessor, clusterName,
659660
instanceConfig);
660661
if (swappingInstances.size() != 1) {
661662
logger.warn(

helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java

Lines changed: 137 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,25 @@
2020
*/
2121

2222
import java.util.List;
23-
import java.util.Map;
24-
import java.util.function.Function;
2523
import java.util.stream.Collectors;
2624

25+
import javax.annotation.Nullable;
26+
2727
import com.google.common.collect.ImmutableMap;
2828
import org.apache.helix.AccessOption;
2929
import org.apache.helix.BaseDataAccessor;
3030
import org.apache.helix.ConfigAccessor;
31+
import org.apache.helix.HelixDataAccessor;
3132
import org.apache.helix.HelixException;
3233
import org.apache.helix.PropertyPathBuilder;
3334
import org.apache.helix.constants.InstanceConstants;
35+
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
36+
import org.apache.helix.model.ClusterConfig;
3437
import org.apache.helix.model.ClusterTopologyConfig;
3538
import org.apache.helix.model.HelixConfigScope;
3639
import org.apache.helix.model.InstanceConfig;
3740
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
3841
import org.apache.helix.zookeeper.datamodel.ZNRecord;
39-
import org.apache.helix.zookeeper.zkclient.DataUpdater;
4042

4143
public class InstanceUtil {
4244

@@ -45,27 +47,43 @@ private InstanceUtil() {
4547
}
4648

4749
// Validators for instance operation transitions
48-
private static final Function<List<InstanceConfig>, Boolean> ALWAYS_ALLOWED =
49-
(matchingInstances) -> true;
50-
private static final Function<List<InstanceConfig>, Boolean> ALL_MATCHES_ARE_UNKNOWN =
51-
(matchingInstances) -> matchingInstances.isEmpty() || matchingInstances.stream().allMatch(
52-
instance -> instance.getInstanceOperation().getOperation()
53-
.equals(InstanceConstants.InstanceOperation.UNKNOWN));
54-
private static final Function<List<InstanceConfig>, Boolean> ALL_MATCHES_ARE_UNKNOWN_OR_EVACUATE =
55-
(matchingInstances) -> matchingInstances.isEmpty() || matchingInstances.stream().allMatch(
56-
instance -> instance.getInstanceOperation().getOperation()
57-
.equals(InstanceConstants.InstanceOperation.UNKNOWN)
58-
|| instance.getInstanceOperation().getOperation()
59-
.equals(InstanceConstants.InstanceOperation.EVACUATE));
60-
private static final Function<List<InstanceConfig>, Boolean> ANY_MATCH_ENABLE_OR_DISABLE =
61-
(matchingInstances) -> !matchingInstances.isEmpty() && matchingInstances.stream().anyMatch(
62-
instance -> instance.getInstanceOperation().getOperation()
63-
.equals(InstanceConstants.InstanceOperation.ENABLE) || instance.getInstanceOperation()
64-
.getOperation().equals(InstanceConstants.InstanceOperation.DISABLE));
50+
private static final InstanceOperationValidator ALWAYS_ALLOWED =
51+
(baseDataAccessor, configAccessor, clusterName, instanceConfig) -> true;
52+
private static final InstanceOperationValidator ALL_MATCHES_ARE_UNKNOWN =
53+
(baseDataAccessor, configAccessor, clusterName, instanceConfig) -> {
54+
List<InstanceConfig> matchingInstances =
55+
findInstancesWithMatchingLogicalId(baseDataAccessor, configAccessor, clusterName,
56+
instanceConfig);
57+
return matchingInstances.isEmpty() || matchingInstances.stream().allMatch(
58+
instance -> instance.getInstanceOperation().getOperation()
59+
.equals(InstanceConstants.InstanceOperation.UNKNOWN));
60+
};
61+
private static final InstanceOperationValidator ALL_MATCHES_ARE_UNKNOWN_OR_EVACUATE =
62+
(baseDataAccessor, configAccessor, clusterName, instanceConfig) -> {
63+
List<InstanceConfig> matchingInstances =
64+
findInstancesWithMatchingLogicalId(baseDataAccessor, configAccessor, clusterName,
65+
instanceConfig);
66+
return matchingInstances.isEmpty() || matchingInstances.stream().allMatch(instance ->
67+
instance.getInstanceOperation().getOperation()
68+
.equals(InstanceConstants.InstanceOperation.UNKNOWN)
69+
|| instance.getInstanceOperation().getOperation()
70+
.equals(InstanceConstants.InstanceOperation.EVACUATE));
71+
};
72+
private static final InstanceOperationValidator ANY_MATCH_ENABLE_OR_DISABLE =
73+
(baseDataAccessor, configAccessor, clusterName, instanceConfig) -> {
74+
List<InstanceConfig> matchingInstances =
75+
findInstancesWithMatchingLogicalId(baseDataAccessor, configAccessor, clusterName,
76+
instanceConfig);
77+
return !matchingInstances.isEmpty() && matchingInstances.stream().anyMatch(instance ->
78+
instance.getInstanceOperation().getOperation()
79+
.equals(InstanceConstants.InstanceOperation.ENABLE)
80+
|| instance.getInstanceOperation().getOperation()
81+
.equals(InstanceConstants.InstanceOperation.DISABLE));
82+
};
6583

6684
// Validator map for valid instance operation transitions <currentOperation>:<targetOperation>:<validator>
67-
private static final ImmutableMap<InstanceConstants.InstanceOperation, ImmutableMap<InstanceConstants.InstanceOperation, Function<List<InstanceConfig>, Boolean>>>
68-
validInstanceOperationTransitions =
85+
private static final ImmutableMap<InstanceConstants.InstanceOperation, ImmutableMap<InstanceConstants.InstanceOperation, InstanceOperationValidator>>
86+
VALID_INSTANCE_OPERATION_TRANSITIONS =
6987
ImmutableMap.of(InstanceConstants.InstanceOperation.ENABLE,
7088
// ENABLE and DISABLE can be set to UNKNOWN when matching instance is in SWAP_IN and set to ENABLE in a transaction.
7189
ImmutableMap.of(InstanceConstants.InstanceOperation.ENABLE, ALWAYS_ALLOWED,
@@ -100,22 +118,55 @@ private InstanceUtil() {
100118
* @param instanceConfig The current instance configuration
101119
* @param currentOperation The current operation
102120
* @param targetOperation The target operation
121+
* @deprecated Use {@link #validateInstanceOperationTransition(BaseDataAccessor, String, InstanceConfig, InstanceConstants.InstanceOperation, InstanceConstants.InstanceOperation)}
122+
* instead for better performance.
103123
*/
124+
@Deprecated
104125
public static void validateInstanceOperationTransition(ConfigAccessor configAccessor,
105126
String clusterName, InstanceConfig instanceConfig,
106127
InstanceConstants.InstanceOperation currentOperation,
107128
InstanceConstants.InstanceOperation targetOperation) {
108-
// Check if the current operation and target operation are in the valid transitions map
109-
if (!validInstanceOperationTransitions.containsKey(currentOperation)
110-
|| !validInstanceOperationTransitions.get(currentOperation).containsKey(targetOperation)) {
129+
130+
validateInstanceOperationTransition(null, configAccessor, clusterName, instanceConfig,
131+
currentOperation, targetOperation);
132+
}
133+
134+
/**
135+
* Validates if the transition from the current operation to the target operation is valid.
136+
*
137+
* @param baseDataAccessor The BaseDataAccessor instance
138+
* @param clusterName The cluster name
139+
* @param instanceConfig The current instance configuration
140+
* @param currentOperation The current operation
141+
* @param targetOperation The target operation
142+
*/
143+
public static void validateInstanceOperationTransition(
144+
BaseDataAccessor<ZNRecord> baseDataAccessor, String clusterName,
145+
InstanceConfig instanceConfig,
146+
InstanceConstants.InstanceOperation currentOperation,
147+
InstanceConstants.InstanceOperation targetOperation) {
148+
149+
validateInstanceOperationTransition(baseDataAccessor, null, clusterName, instanceConfig,
150+
currentOperation, targetOperation);
151+
}
152+
153+
private static void validateInstanceOperationTransition(
154+
@Nullable BaseDataAccessor<ZNRecord> baseDataAccessor,
155+
@Nullable ConfigAccessor configAccessor, String clusterName, InstanceConfig instanceConfig,
156+
InstanceConstants.InstanceOperation currentOperation,
157+
InstanceConstants.InstanceOperation targetOperation) {
158+
ImmutableMap<InstanceConstants.InstanceOperation, InstanceOperationValidator> transitionMap =
159+
VALID_INSTANCE_OPERATION_TRANSITIONS.get(currentOperation);
160+
161+
if (transitionMap == null || !transitionMap.containsKey(targetOperation)) {
111162
throw new HelixException(
112163
"Invalid instance operation transition from " + currentOperation + " to "
113164
+ targetOperation);
114165
}
115166

116-
// Throw exception if the validation fails
117-
if (!validInstanceOperationTransitions.get(currentOperation).get(targetOperation)
118-
.apply(findInstancesWithMatchingLogicalId(configAccessor, clusterName, instanceConfig))) {
167+
InstanceOperationValidator validator = transitionMap.get(targetOperation);
168+
if (validator == null || !validator.validate(baseDataAccessor, configAccessor, clusterName,
169+
instanceConfig)) {
119170
throw new HelixException(
120171
"Failed validation for instance operation transition from " + currentOperation + " to "
121172
+ targetOperation);
@@ -130,6 +181,7 @@ public static void validateInstanceOperationTransition(ConfigAccessor configAcce
130181
* @param instanceConfig The instance configuration to match
131182
* @return A list of matching instances
132183
*/
184+
@Deprecated
133185
public static List<InstanceConfig> findInstancesWithMatchingLogicalId(
134186
ConfigAccessor configAccessor, String clusterName, InstanceConfig instanceConfig) {
135187
String logicalIdKey =
@@ -148,17 +200,57 @@ public static List<InstanceConfig> findInstancesWithMatchingLogicalId(
148200
.collect(Collectors.toList());
149201
}
150202

203+
/**
204+
* Finds the instances that have a matching logical ID with the given instance.
205+
*
206+
* @param clusterName The cluster name
207+
* @param instanceConfig The instance configuration to match
208+
* @return A list of matching instances
209+
*/
210+
public static List<InstanceConfig> findInstancesWithMatchingLogicalId(
211+
BaseDataAccessor<ZNRecord> baseDataAccessor, String clusterName,
212+
InstanceConfig instanceConfig) {
213+
HelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor);
214+
215+
ClusterConfig clusterConfig =
216+
helixDataAccessor.getProperty(helixDataAccessor.keyBuilder().clusterConfig());
217+
String logicalIdKey =
218+
ClusterTopologyConfig.createFromClusterConfig(clusterConfig).getEndNodeType();
219+
220+
List<InstanceConfig> instanceConfigs =
221+
helixDataAccessor.getChildValues(helixDataAccessor.keyBuilder().instanceConfigs(), true);
222+
223+
// Retrieve and filter instances with matching logical ID
224+
return instanceConfigs.stream().filter(potentialInstanceConfig ->
225+
!potentialInstanceConfig.getInstanceName().equals(instanceConfig.getInstanceName())
226+
&& potentialInstanceConfig.getLogicalId(logicalIdKey)
227+
.equals(instanceConfig.getLogicalId(logicalIdKey))).collect(Collectors.toList());
228+
}
229+
230+
private static List<InstanceConfig> findInstancesWithMatchingLogicalId(
231+
@Nullable BaseDataAccessor<ZNRecord> baseDataAccessor,
232+
@Nullable ConfigAccessor configAccessor, String clusterName, InstanceConfig instanceConfig) {
233+
if (baseDataAccessor == null && configAccessor == null) {
234+
throw new HelixException(
235+
"Both BaseDataAccessor and ConfigAccessor cannot be null at the same time");
236+
}
237+
238+
return baseDataAccessor != null ? findInstancesWithMatchingLogicalId(baseDataAccessor,
239+
clusterName, instanceConfig)
240+
: findInstancesWithMatchingLogicalId(configAccessor, clusterName, instanceConfig);
241+
}
242+
151243
/**
152244
* Sets the instance operation for the given instance.
153245
*
154246
* @param configAccessor The ConfigAccessor instance
155-
* @param baseAccessor The BaseDataAccessor instance
247+
* @param baseDataAccessor The BaseDataAccessor instance
156248
* @param clusterName The cluster name
157249
* @param instanceName The instance name
158250
* @param instanceOperation The instance operation to set
159251
*/
160252
public static void setInstanceOperation(ConfigAccessor configAccessor,
161-
BaseDataAccessor<ZNRecord> baseAccessor, String clusterName, String instanceName,
253+
BaseDataAccessor<ZNRecord> baseDataAccessor, String clusterName, String instanceName,
162254
InstanceConfig.InstanceOperation instanceOperation) {
163255
String path = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
164256

@@ -170,29 +262,32 @@ public static void setInstanceOperation(ConfigAccessor configAccessor,
170262
}
171263

172264
// Validate the instance operation transition
173-
validateInstanceOperationTransition(configAccessor, clusterName, instanceConfig,
265+
validateInstanceOperationTransition(baseDataAccessor, configAccessor, clusterName,
266+
instanceConfig,
174267
instanceConfig.getInstanceOperation().getOperation(),
175268
instanceOperation == null ? InstanceConstants.InstanceOperation.ENABLE
176269
: instanceOperation.getOperation());
177270

178271
// Update the instance operation
179-
boolean succeeded = baseAccessor.update(path, new DataUpdater<ZNRecord>() {
180-
@Override
181-
public ZNRecord update(ZNRecord currentData) {
182-
if (currentData == null) {
183-
throw new HelixException("Cluster: " + clusterName + ", instance: " + instanceName
184-
+ ", participant config is null");
185-
}
186-
187-
InstanceConfig config = new InstanceConfig(currentData);
188-
config.setInstanceOperation(instanceOperation);
189-
return config.getRecord();
272+
boolean succeeded = baseDataAccessor.update(path, currentData -> {
273+
if (currentData == null) {
274+
throw new HelixException("Cluster: " + clusterName + ", instance: " + instanceName
275+
+ ", participant config is null");
190276
}
277+
278+
InstanceConfig config = new InstanceConfig(currentData);
279+
config.setInstanceOperation(instanceOperation);
280+
return config.getRecord();
191281
}, AccessOption.PERSISTENT);
192282

193283
if (!succeeded) {
194284
throw new HelixException(
195285
"Failed to update instance operation. Please check if instance is disabled.");
196286
}
197287
}
288+
289+
private interface InstanceOperationValidator {
290+
boolean validate(@Nullable BaseDataAccessor<ZNRecord> baseDataAccessor,
291+
@Nullable ConfigAccessor configAccessor, String clusterName, InstanceConfig instanceConfig);
292+
}
198293
}

0 commit comments

Comments
 (0)