
Commit

Bug fix for updating configChanged (#2900)
vaibhav-mittal-linkedin authored Sep 23, 2024
1 parent ab47064 commit f8bb6f3
Showing 2 changed files with 55 additions and 6 deletions.
@@ -169,14 +169,14 @@ private void cleanupPropertyStore(String instance) {
    boolean configChanged;
    // Remove all sealed, stopped, partially sealed and disabled replicas from DataNodeConfig
    configChanged = TaskUtils.removeIfPresent(dataNodeConfig.getSealedReplicas());
-   configChanged = configChanged || TaskUtils.removeIfPresent(dataNodeConfig.getStoppedReplicas());
-   configChanged = configChanged || TaskUtils.removeIfPresent(dataNodeConfig.getPartiallySealedReplicas());
-   configChanged = configChanged || TaskUtils.removeIfPresent(dataNodeConfig.getDisabledReplicas());
+   configChanged = TaskUtils.removeIfPresent(dataNodeConfig.getStoppedReplicas()) || configChanged;
+   configChanged = TaskUtils.removeIfPresent(dataNodeConfig.getPartiallySealedReplicas()) || configChanged;
+   configChanged = TaskUtils.removeIfPresent(dataNodeConfig.getDisabledReplicas()) || configChanged;
    Map<String, DataNodeConfig.DiskConfig> diskConfigs = dataNodeConfig.getDiskConfigs();

    // Remove all replicas for each disk in the DataNodeConfig
    for (DataNodeConfig.DiskConfig diskConfig : diskConfigs.values()) {
-     configChanged = configChanged || TaskUtils.removeIfPresent(diskConfig.getReplicaConfigs());
+     configChanged = TaskUtils.removeIfPresent(diskConfig.getReplicaConfigs()) || configChanged;
    }
    // Only set the DatanodeConfig in ZK if it has changed
    if (configChanged) {
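The change above fixes a short-circuit evaluation bug: in the old lines, once configChanged was already true, Java's || never evaluated the removeIfPresent(...) call on its right-hand side, so the stopped, partially sealed, disabled and per-disk replica entries could be left behind in the DataNodeConfig. Moving the call to the left of || makes it run unconditionally and only then ORs in the previous flag. The stand-alone sketch below illustrates the difference; it assumes (based only on how the diff uses it) that TaskUtils.removeIfPresent mutates the given collection and returns true when it removed something, and uses a local stand-in rather than the real Ambry helper.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ShortCircuitDemo {
  // Stand-in for TaskUtils.removeIfPresent: empties the list and reports whether anything was removed.
  static boolean removeIfPresent(List<String> replicas) {
    boolean removedSomething = !replicas.isEmpty();
    replicas.clear();
    return removedSomething;
  }

  public static void main(String[] args) {
    List<String> sealed = new ArrayList<>(Arrays.asList("p1"));
    List<String> stopped = new ArrayList<>(Arrays.asList("p2"));

    // Old order: configChanged is already true, so || short-circuits and
    // removeIfPresent(stopped) never runs -- "stopped" is never cleaned.
    boolean configChanged = removeIfPresent(sealed);
    configChanged = configChanged || removeIfPresent(stopped);
    System.out.println("old order, stopped = " + stopped);   // [p2]

    // New order: the removal runs unconditionally, then the flag is ORed in.
    sealed = new ArrayList<>(Arrays.asList("p1"));
    stopped = new ArrayList<>(Arrays.asList("p2"));
    configChanged = removeIfPresent(sealed);
    configChanged = removeIfPresent(stopped) || configChanged;
    System.out.println("new order, stopped = " + stopped);   // []
  }
}

An alternative with the same effect would be configChanged |= TaskUtils.removeIfPresent(...), since the boolean |= operator does not short-circuit.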
@@ -72,12 +72,19 @@ public void testPropertyStoreCleanUpTaskInstanceDownAndNotPresentInIdealState()
//Replica should be removed from property store for down host -> localhost_2
assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_2", PORT))
.getDiskConfigs().get("disk1").getReplicaConfigs().isEmpty());
assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_2", PORT))
.getDiskConfigs().get("disk2").getReplicaConfigs().isEmpty());

//Replica should be present in property store for up hosts -> localhost_1, localhost_3
assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_1", PORT))
.getDiskConfigs().get("disk1").getReplicaConfigs().containsKey("partition1"));
assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_1", PORT))
.getDiskConfigs().get("disk2").getReplicaConfigs().containsKey("partition2"));

assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_3", PORT))
.getDiskConfigs().get("disk1").getReplicaConfigs().containsKey("partition1"));
assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_3", PORT))
.getDiskConfigs().get("disk2").getReplicaConfigs().containsKey("partition2"));
}

/**
@@ -108,12 +115,19 @@ public void testPropertyStoreCleanUpTaskInstanceDownButPresentInIdealState() thr
//Replica should be present in property store for down host -> localhost_2
assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_2", PORT))
.getDiskConfigs().get("disk1").getReplicaConfigs().containsKey("partition1"));
assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_2", PORT))
.getDiskConfigs().get("disk2").getReplicaConfigs().containsKey("partition2"));

//Replica should be present in property store for up hosts -> localhost_1, localhost_3
assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_1", PORT))
.getDiskConfigs().get("disk1").getReplicaConfigs().containsKey("partition1"));
assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_1", PORT))
.getDiskConfigs().get("disk2").getReplicaConfigs().containsKey("partition2"));

assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_3", PORT))
.getDiskConfigs().get("disk1").getReplicaConfigs().containsKey("partition1"));
assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_3", PORT))
.getDiskConfigs().get("disk2").getReplicaConfigs().containsKey("partition2"));
}

/**
@@ -144,10 +158,18 @@ public void testPropertyStoreCleanUpTaskAllInstancesUp() throws IOException {
//Replica should be present in property store for all hosts -> localhost_1, localhost_2, localhost_3
assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_1", PORT))
.getDiskConfigs().get("disk1").getReplicaConfigs().containsKey("partition1"));
assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_1", PORT))
.getDiskConfigs().get("disk2").getReplicaConfigs().containsKey("partition2"));

assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_2", PORT))
.getDiskConfigs().get("disk1").getReplicaConfigs().containsKey("partition1"));
assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_2", PORT))
.getDiskConfigs().get("disk2").getReplicaConfigs().containsKey("partition2"));

assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_3", PORT))
.getDiskConfigs().get("disk1").getReplicaConfigs().containsKey("partition1"));
assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_3", PORT))
.getDiskConfigs().get("disk2").getReplicaConfigs().containsKey("partition2"));
}

/**
@@ -180,10 +202,18 @@ public void testPropertyStoreCleanUpTaskDeleteDataFromDataNodeConfigFalse() thro
//Replica should be present in property store for all hosts -> localhost_1, localhost_2, localhost_3
assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_1", PORT))
.getDiskConfigs().get("disk1").getReplicaConfigs().containsKey("partition1"));
assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_1", PORT))
.getDiskConfigs().get("disk2").getReplicaConfigs().containsKey("partition2"));

assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_2", PORT))
.getDiskConfigs().get("disk1").getReplicaConfigs().containsKey("partition1"));
assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_2", PORT))
.getDiskConfigs().get("disk2").getReplicaConfigs().containsKey("partition2"));

assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_3", PORT))
.getDiskConfigs().get("disk1").getReplicaConfigs().containsKey("partition1"));
assertTrue(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName("localhost_3", PORT))
.getDiskConfigs().get("disk2").getReplicaConfigs().containsKey("partition2"));
}

/**
@@ -210,8 +240,14 @@ private void testCluster1(HelixManager helixManager, DataNodeConfigSource dataNo
admin.addResource(CLUSTER_NAME, "resource1", idealState);

addReplicasToDisk(dataNodeConfigSource, "localhost_1", "disk1", "partition1");
addReplicasToDisk(dataNodeConfigSource, "localhost_1", "disk2", "partition2");

addReplicasToDisk(dataNodeConfigSource, "localhost_2", "disk1", "partition1");
addReplicasToDisk(dataNodeConfigSource, "localhost_2", "disk2", "partition2");

addReplicasToDisk(dataNodeConfigSource, "localhost_3", "disk1", "partition1");
addReplicasToDisk(dataNodeConfigSource, "localhost_3", "disk2", "partition2");


}

@@ -239,8 +275,13 @@ private void testCluster2(HelixManager helixManager, DataNodeConfigSource dataNo

// Add replicas to property store
addReplicasToDisk(dataNodeConfigSource, "localhost_1", "disk1", "partition1");
addReplicasToDisk(dataNodeConfigSource, "localhost_1", "disk2", "partition2");

addReplicasToDisk(dataNodeConfigSource, "localhost_2", "disk1", "partition1");
addReplicasToDisk(dataNodeConfigSource, "localhost_2", "disk2", "partition2");

addReplicasToDisk(dataNodeConfigSource, "localhost_3", "disk1", "partition1");
addReplicasToDisk(dataNodeConfigSource, "localhost_3", "disk2", "partition2");
}

/**
@@ -264,12 +305,20 @@ private void testCluster3(HelixManager helixManager, DataNodeConfigSource dataNo

// Add replicas to property store
addReplicasToDisk(dataNodeConfigSource, "localhost_1", "disk1", "partition1");
addReplicasToDisk(dataNodeConfigSource, "localhost_1", "disk2", "partition2");

addReplicasToDisk(dataNodeConfigSource, "localhost_2", "disk1", "partition1");
addReplicasToDisk(dataNodeConfigSource, "localhost_2", "disk2", "partition2");

addReplicasToDisk(dataNodeConfigSource, "localhost_3", "disk1", "partition1");
addReplicasToDisk(dataNodeConfigSource, "localhost_3", "disk2", "partition2");

}

- private DataNodeConfig getDataNodeConfig(String host) {
+ private DataNodeConfig getDataNodeConfig(DataNodeConfigSource dataNodeConfigSource, String host) {
+   if(dataNodeConfigSource.get(ClusterMapUtils.getInstanceName(host, PORT)) != null) {
+     return dataNodeConfigSource.get(ClusterMapUtils.getInstanceName(host, PORT));
+   }
    String instanceName = ClusterMapUtils.getInstanceName(host, PORT);
    return new DataNodeConfig(instanceName, host, PORT, DC, PORT + 1, PORT + 2, "rack", ClusterMapUtils.DEFAULT_XID);
  }
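The reworked test helper above is a get-or-create lookup: it returns the instance's existing config from the DataNodeConfigSource when one is already registered and only builds a fresh DataNodeConfig otherwise, so consecutive addReplicasToDisk calls for disk1 and disk2 accumulate on the same node config instead of each call starting from an empty one. A stand-alone sketch of that idiom, using a plain HashMap as a stand-in for the config source (all names below are illustrative, not Ambry API):

import java.util.HashMap;
import java.util.Map;

public class GetOrCreateDemo {
  // Hypothetical stand-in for DataNodeConfig: a per-instance map of disk -> partition.
  record NodeConfig(String instanceName, Map<String, String> replicasByDisk) {}

  // Stand-in for the DataNodeConfigSource backing store.
  static final Map<String, NodeConfig> store = new HashMap<>();

  // Reuse the stored config if present so replicas added earlier are kept; otherwise create an empty one.
  static NodeConfig getOrCreate(String instanceName) {
    return store.computeIfAbsent(instanceName, name -> new NodeConfig(name, new HashMap<>()));
  }

  public static void main(String[] args) {
    getOrCreate("localhost_1").replicasByDisk().put("disk1", "partition1");
    getOrCreate("localhost_1").replicasByDisk().put("disk2", "partition2");
    // Both disks survive because the second call reused the same config,
    // e.g. {disk1=partition1, disk2=partition2}
    System.out.println(store.get("localhost_1").replicasByDisk());
  }
}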
@@ -283,7 +332,7 @@ private void setDataNodeConfig(DataNodeConfig dataNodeConfig, String disk, Strin

  private void addReplicasToDisk(DataNodeConfigSource dataNodeConfigSource, String host, String disk, String partition) {
    // Add replicas to property store
-   DataNodeConfig dataNodeConfig = getDataNodeConfig(host);
+   DataNodeConfig dataNodeConfig = getDataNodeConfig(dataNodeConfigSource, host);
    setDataNodeConfig(dataNodeConfig, disk, partition, dataNodeConfigSource);
  }

