Skip to content

Commit

Permalink
[improve][broker] Support cleanup replication cluster and `allowed …
Browse files Browse the repository at this point in the history
…cluster` when cluster metadata teardown (apache#23561)
  • Loading branch information
Demogorgon314 authored Nov 12, 2024
1 parent d7433d0 commit 0969869
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.pulsar;

import com.google.protobuf.InvalidProtocolBufferException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import lombok.Cleanup;
import org.apache.bookkeeper.client.BKException;
Expand All @@ -29,12 +31,18 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TenantResources;
import org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.docs.tools.CmdGenerateDocs;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
Expand Down Expand Up @@ -153,12 +161,45 @@ public static void main(String[] args) throws Exception {
MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis)
.configFilePath(arguments.configurationStoreConfigPath)
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build());
deleteRecursively(configMetadataStore, "/admin/clusters/" + arguments.cluster).join();
PulsarResources resources = new PulsarResources(metadataStore, configMetadataStore);
// Cleanup replication cluster from all tenants and namespaces
TenantResources tenantResources = resources.getTenantResources();
NamespaceResources namespaceResources = resources.getNamespaceResources();
List<String> tenants = tenantResources.listTenants();
for (String tenant : tenants) {
List<String> namespaces = namespaceResources.listNamespacesAsync(tenant).get();
for (String namespace : namespaces) {
namespaceResources.setPolicies(NamespaceName.get(tenant, namespace), policies -> {
policies.replication_clusters.remove(arguments.cluster);
return policies;
});
}
removeCurrentClusterFromAllowedClusters(tenantResources, tenant, arguments.cluster);
}
try {
resources.getClusterResources().deleteCluster(arguments.cluster);
} catch (MetadataStoreException.NotFoundException ex) {
// Ignore if the cluster does not exist
log.info("Cluster metadata for '{}' does not exist.", arguments.cluster);
}
}

log.info("Cluster metadata for '{}' teardown.", arguments.cluster);
}

private static void removeCurrentClusterFromAllowedClusters(
TenantResources tenantResources, String tenant, String curCluster)
throws MetadataStoreException, InterruptedException, ExecutionException {
Optional<TenantInfo> tenantInfoOptional = tenantResources.getTenant(tenant);
if (tenantInfoOptional.isEmpty()) {
return;
}
tenantResources.updateTenantAsync(tenant, ti -> {
ti.getAllowedClusters().remove(curCluster);
return ti;
}).get();
}

private static CompletableFuture<Void> deleteRecursively(MetadataStore metadataStore, String path) {
return metadataStore.getChildren(path)
.thenCompose(children -> FutureUtil.waitForAll(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.util.SortedMap;
import org.apache.pulsar.PulsarClusterMetadataSetup;
import org.apache.pulsar.PulsarClusterMetadataTeardown;
Expand Down Expand Up @@ -54,7 +55,7 @@ void cleanup() {
@Test
public void testSetupClusterMetadataAndTeardown() throws Exception {
String[] args1 = {
"--cluster", "testReSetupClusterMetadata-cluster",
"--cluster", "cluster1",
"--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(),
"--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(),
"--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf",
Expand All @@ -65,7 +66,7 @@ public void testSetupClusterMetadataAndTeardown() throws Exception {
};
PulsarClusterMetadataSetup.main(args1);
SortedMap<String, String> data1 = localZkS.dumpData();
String clusterDataJson = data1.get("/admin/clusters/testReSetupClusterMetadata-cluster");
String clusterDataJson = data1.get("/admin/clusters/cluster1");
assertNotNull(clusterDataJson);
ClusterData clusterData = ObjectMapperFactory
.getMapper()
Expand All @@ -78,13 +79,78 @@ public void testSetupClusterMetadataAndTeardown() throws Exception {
assertFalse(clusterData.isBrokerClientTlsEnabled());

String[] args2 = {
"--cluster", "testReSetupClusterMetadata-cluster",
"--cluster", "cluster1",
"--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(),
"--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(),
"--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf",
};
PulsarClusterMetadataTeardown.main(args2);
SortedMap<String, String> data2 = localZkS.dumpData();
assertFalse(data2.containsKey("/admin/clusters/testReSetupClusterMetadata-cluster"));
assertFalse(data2.containsKey("/admin/clusters/cluster1"));
}

@Test
public void testSetupMultipleClusterMetadataAndTeardown() throws Exception {
String[] cluster1Args = {
"--cluster", "cluster1",
"--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(),
"--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(),
"--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf",
"--web-service-url", "http://127.0.0.1:8080",
"--web-service-url-tls", "https://127.0.0.1:8443",
"--broker-service-url", "pulsar://127.0.0.1:6650",
"--broker-service-url-tls", "pulsar+ssl://127.0.0.1:6651"
};
PulsarClusterMetadataSetup.main(cluster1Args);
String[] cluster2Args = {
"--cluster", "cluster2",
"--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(),
"--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(),
"--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf",
"--web-service-url", "http://127.0.0.1:8081",
"--web-service-url-tls", "https://127.0.0.1:8445",
"--broker-service-url", "pulsar://127.0.0.1:6651",
"--broker-service-url-tls", "pulsar+ssl://127.0.0.1:6652"
};
PulsarClusterMetadataSetup.main(cluster2Args);
SortedMap<String, String> data1 = localZkS.dumpData();
String clusterDataJson = data1.get("/admin/clusters/cluster1");
assertNotNull(clusterDataJson);
ClusterData clusterData = ObjectMapperFactory
.getMapper()
.reader()
.readValue(clusterDataJson, ClusterData.class);
assertEquals(clusterData.getServiceUrl(), "http://127.0.0.1:8080");
assertEquals(clusterData.getServiceUrlTls(), "https://127.0.0.1:8443");
assertEquals(clusterData.getBrokerServiceUrl(), "pulsar://127.0.0.1:6650");
assertEquals(clusterData.getBrokerServiceUrlTls(), "pulsar+ssl://127.0.0.1:6651");
assertFalse(clusterData.isBrokerClientTlsEnabled());

String[] args2 = {
"--cluster", "cluster1",
"--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(),
"--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(),
"--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf",
};
PulsarClusterMetadataTeardown.main(args2);
SortedMap<String, String> data2 = localZkS.dumpData();
assertFalse(data2.containsKey("/admin/clusters/cluster1"));
assertTrue(data2.containsKey("/admin/clusters/cluster2"));

assertTrue(data2.containsKey("/admin/policies/public"));
assertFalse(data2.get("/admin/policies/public").contains("cluster1"));
assertTrue(data2.get("/admin/policies/public").contains("cluster2"));

assertTrue(data2.containsKey("/admin/policies/pulsar"));
assertFalse(data2.get("/admin/policies/pulsar").contains("cluster1"));
assertTrue(data2.get("/admin/policies/pulsar").contains("cluster2"));

assertTrue(data2.containsKey("/admin/policies/public/default"));
assertFalse(data2.get("/admin/policies/public/default").contains("cluster1"));
assertTrue(data2.get("/admin/policies/public/default").contains("cluster2"));

assertTrue(data2.containsKey("/admin/policies/pulsar/system"));
assertFalse(data2.get("/admin/policies/pulsar/system").contains("cluster1"));
assertTrue(data2.get("/admin/policies/pulsar/system").contains("cluster2"));
}
}

0 comments on commit 0969869

Please sign in to comment.