Skip to content

Commit

Permalink
[apache/helix] -- Updated Clients to use shorter timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
himanshukandwal committed Dec 13, 2023
1 parent c62bd3b commit c305726
Show file tree
Hide file tree
Showing 17 changed files with 40 additions and 72 deletions.
24 changes: 16 additions & 8 deletions helix-core/src/test/java/org/apache/helix/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
Expand Down Expand Up @@ -218,9 +219,7 @@ private static Method getMethod(String name) {

public static boolean verifyEmptyCurStateAndExtView(String clusterName, String resourceName,
Set<String> instanceNames, String zkAddr) {
HelixZkClient zkClient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
zkClient.setZkSerializer(new ZNRecordSerializer());
HelixZkClient zkClient = createZkClient(zkAddr);

try {
ZKHelixDataAccessor accessor =
Expand Down Expand Up @@ -275,8 +274,7 @@ public static void setupCluster(String clusterName, String zkAddr, int startPort
public static void setupCluster(String clusterName, String zkAddr, int startPort,
String participantNamePrefix, String resourceNamePrefix, int resourceNb, int partitionNb,
int nodesNb, int replica, String stateModelDef, RebalanceMode mode, boolean doRebalance) {
HelixZkClient zkClient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
HelixZkClient zkClient = createZkClient(zkAddr);
try {
if (zkClient.exists("/" + clusterName)) {
LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
Expand Down Expand Up @@ -336,9 +334,7 @@ public static void dropCluster(String clusterName, RealmAwareZkClient zkClient,
*/
public static void verifyState(String clusterName, String zkAddr,
Map<String, Set<String>> stateMap, String state) {
HelixZkClient zkClient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
zkClient.setZkSerializer(new ZNRecordSerializer());
HelixZkClient zkClient = createZkClient(zkAddr);

try {
ZKHelixDataAccessor accessor =
Expand Down Expand Up @@ -859,4 +855,16 @@ public static void printZkListeners(HelixZkClient client) throws Exception {
}
LOG.info("}");
}

public static HelixZkClient createZkClient(String zkAddress) {
HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig()
.setZkSerializer(new org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer())
.setConnectInitTimeout(1000L)
.setOperationRetryTimeout(1000L);

HelixZkClient.ZkConnectionConfig zkConnectionConfig = new HelixZkClient.ZkConnectionConfig(zkAddress)
.setSessionTimeout(1000);

return DedicatedZkClientFactory.getInstance().buildZkClient(zkConnectionConfig, clientConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import org.apache.helix.controller.HierarchicalDataHolder;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

Expand All @@ -34,8 +33,7 @@ public class TestHierarchicalDataStore extends ZkUnitTestBase {
})

public void testHierarchicalDataStore() {
_zkClient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
_zkClient = TestHelper.createZkClient(ZK_ADDR);

String path = "/tmp/testHierarchicalDataStore";
FileFilter filter = null;
Expand Down
12 changes: 2 additions & 10 deletions helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void beforeSuite() throws Exception {
private static synchronized void startZooKeeper(int i) {
String zkAddress = ZK_PREFIX + (ZK_START_PORT + i);
_zkServerMap.computeIfAbsent(zkAddress, ZkTestBase::createZookeeperServer);
_helixZkClientMap.computeIfAbsent(zkAddress, ZkTestBase::createZkClient);
_helixZkClientMap.computeIfAbsent(zkAddress, TestHelper::createZkClient);
_clusterSetupMap.computeIfAbsent(zkAddress, key -> new ClusterSetup(_helixZkClientMap.get(key)));
_baseDataAccessorMap.computeIfAbsent(zkAddress, key -> new ZkBaseDataAccessor(_helixZkClientMap.get(key)));
}
Expand All @@ -200,13 +200,6 @@ private static ZkServer createZookeeperServer(String zkAddress) {
}
}

private static HelixZkClient createZkClient(String zkAddress) {
HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
clientConfig.setZkSerializer(new ZNRecordSerializer());
return DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig);
}

@AfterSuite
public void afterSuite() throws IOException {
// Clean up all JMX objects
Expand Down Expand Up @@ -767,8 +760,7 @@ public EmptyZkVerifier(String clusterName, String resourceName) {
_clusterName = clusterName;
_resourceName = resourceName;

_zkClient = DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
_zkClient = TestHelper.createZkClient(ZK_ADDR);
_zkClient.setZkSerializer(new ZNRecordSerializer());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import org.apache.helix.controller.rebalancer.TestConstraintRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.crushMapping.CardDealingAdjustmentAlgorithmV2;
import org.apache.helix.controller.rebalancer.topology.InstanceNode;
import org.apache.helix.controller.rebalancer.topology.Node;
Expand Down Expand Up @@ -70,9 +69,6 @@ public class TestCardDealingAdjustmentAlgorithmV2 {

@BeforeClass
public void setUpTopology() {
java.util.logging.Logger topJavaLogger = java.util.logging.Logger.getLogger("");
topJavaLogger.setLevel(Level.INFO);

_topology = mock(Topology.class);
LOGGER.info("Default ZONES: " + Arrays.deepToString(DEFAULT_ZONES));
when(_topology.getFaultZones()).thenReturn(createFaultZones(DEFAULT_ZONES));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ public static void setupCluster(String uniqClusterName, String zkAddr, int numRe
public static void setupCluster(String uniqClusterName, String zkAddr, int numResources,
int numPartitionsPerResource, int numInstances, int replica, boolean doRebalance)
throws Exception {
HelixZkClient zkClient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
HelixZkClient zkClient = TestHelper.createZkClient(ZK_ADDR);

try {
zkClient.setZkSerializer(new ZNRecordSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ public class TestZkConnectionLost extends TaskTestBase {
public void beforeClass() throws Exception {
ZkServer zkServer = TestHelper.startZkServer(_zkAddr);
_zkServerRef.set(zkServer);
_zkClient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr));
_zkClient = TestHelper.createZkClient(_zkAddr);
_zkClient.setZkSerializer(new ZNRecordSerializer());
_setupTool = new ClusterSetup(_zkClient);
_participants = new MockParticipantManager[_numNodes];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ public void beforeClass() throws Exception {
for (int i = 0; i < NUM_ZK; i++) {
String zkAddress = ZK_PREFIX + (ZK_START_PORT + i);
ZK_SERVER_MAP.put(zkAddress, TestHelper.startZkServer(zkAddress));
ZK_CLIENT_MAP.put(zkAddress, DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer())));
ZK_CLIENT_MAP.put(zkAddress, TestHelper.createZkClient(zkAddress));

// One cluster per ZkServer created
_rawRoutingData.put(zkAddress, Collections.singletonList("/" + CLUSTER_LIST.get(i)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,7 @@ public TaggedZkVerifier(String clusterName, String resourceName, String[] tagged
_taggedNodes = taggedNodes;
_isEmptyAllowed = isEmptyAllowed;

_zkClient = DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
_zkClient = TestHelper.createZkClient(ZK_ADDR);
_zkClient.setZkSerializer(new ZNRecordSerializer());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,7 @@ public void testZNRecordSizeLimitUseZNRecordStreamingSerializer() {
String className = getShortClassName();

ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer();
HelixZkClient zkClient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
HelixZkClient zkClient = TestHelper.createZkClient(ZK_ADDR);

try {
zkClient.setZkSerializer(serializer);
Expand Down Expand Up @@ -471,8 +470,7 @@ public void testZNRecordStreamingSerializerWriteSizeLimit() throws Exception {
System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);

ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer();
HelixZkClient zkClient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
HelixZkClient zkClient = TestHelper.createZkClient(ZK_ADDR);

try {
zkClient.setZkSerializer(serializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ public class TestZkBucketDataAccessor extends ZkTestBase {
@BeforeClass
public void beforeClass() {
// Initialize ZK accessors for testing
HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
HelixZkClient zkClient = TestHelper.createZkClient(ZK_ADDR);
zkClient.setZkSerializer(new ZkSerializer() {
@Override
public byte[] serialize(Object data) throws ZkMarshallingError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ public void testSessionExpirationWithSharedZkClient() throws Exception {
String clusterName = className + "_" + methodName;

// init external base data accessor
HelixZkClient sharedZkclient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
HelixZkClient sharedZkclient = TestHelper.createZkClient(ZK_ADDR);
sharedZkclient.setZkSerializer(new ZNRecordSerializer());
ZkBaseDataAccessor<ZNRecord> sharedBaseAccessor = new ZkBaseDataAccessor<>(sharedZkclient);

Expand Down Expand Up @@ -87,8 +86,7 @@ public void testSessionExpirationWithSharedZkClient() throws Exception {
_gZkClient, true);

// dup shared ZkClient
HelixZkClient dupZkclient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
HelixZkClient dupZkclient = TestHelper.createZkClient(ZK_ADDR);

// kill the session to make sure shared zkClient re-installs watcher
final long sessionId = dupZkclient.getSessionId();
Expand Down Expand Up @@ -132,9 +130,7 @@ public void testHappyPathExtOpZkCacheBaseDataAccessor() throws Exception {
String clusterName = className + "_" + methodName;

// init external base data accessor
HelixZkClient extZkclient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
extZkclient.setZkSerializer(new ZNRecordSerializer());
HelixZkClient extZkclient = TestHelper.createZkClient(ZK_ADDR);
ZkBaseDataAccessor<ZNRecord> extBaseAccessor = new ZkBaseDataAccessor<>(extZkclient);

// init zkCacheBaseDataAccessor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ public void testZkCacheCallbackExternalOpNoChroot() throws Exception {
String clusterName = className + "_" + methodName;

// init external base data accessor
HelixZkClient zkclient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
zkclient.setZkSerializer(new ZNRecordSerializer());
HelixZkClient zkclient = TestHelper.createZkClient(ZK_ADDR);
ZkBaseDataAccessor<ZNRecord> extBaseAccessor = new ZkBaseDataAccessor<>(zkclient);

// init zkCacheDataAccessor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
*/

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;

import com.google.common.collect.ImmutableList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.ImmutableList;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
Expand All @@ -60,11 +60,10 @@
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.mockito.Mockito.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;


public class TestHelixTaskExecutor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.TestHelper;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
Expand All @@ -55,8 +56,7 @@ public class MockController {
public MockController(String src, String zkServer, String cluster) {
srcName = src;
clusterName = cluster;
client = DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkServer));
client = TestHelper.createZkClient(zkServer);
client.setZkSerializer(new ZNRecordSerializer());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,17 +215,8 @@ public void beforeSuite()
// TODO: use logging.properties file to config java.util.logging.Logger levels
java.util.logging.Logger topJavaLogger = java.util.logging.Logger.getLogger("");
topJavaLogger.setLevel(Level.WARNING);

HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();

clientConfig.setZkSerializer(new ZNRecordSerializer());
_gZkClient = DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR), clientConfig);

clientConfig.setZkSerializer(new ZNRecordSerializer());
_gZkClientTestNS = DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddrTestNS), clientConfig);

_gZkClient = TestHelper.createZkClient(ZK_ADDR);
_gZkClientTestNS = TestHelper.createZkClient(_zkAddrTestNS);
_gSetupTool = new ClusterSetup(_gZkClient);
_configAccessor = new ConfigAccessor(_gZkClient);
_baseAccessor = new ZkBaseDataAccessor<>(_gZkClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import javax.ws.rs.core.Response;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.helix.TestHelper;
import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.rest.common.ContextPropertyKeys;
Expand Down Expand Up @@ -108,9 +109,7 @@ public void beforeClass() throws Exception {
_httpClient = HttpClients.createDefault();

// Start zkclient to verify leader election behavior
_zkClient = DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddrTestNS),
new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
_zkClient = TestHelper.createZkClient(_zkAddrTestNS);
}

@AfterClass
Expand Down

0 comments on commit c305726

Please sign in to comment.