diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java index f0aab090bb247..6a412112c3f8a 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java @@ -233,13 +233,13 @@ private void checkHerder(SourceAndTarget sourceAndTarget) { private void addHerder(SourceAndTarget sourceAndTarget) { log.info("creating herder for {}", sourceAndTarget.toString()); Map workerProps = config.workerConfig(sourceAndTarget); + DistributedConfig distributedConfig = new DistributedConfig(workerProps); String encodedSource = encodePath(sourceAndTarget.source()); String encodedTarget = encodePath(sourceAndTarget.target()); List restNamespace = List.of(encodedSource, encodedTarget); String workerId = generateWorkerId(sourceAndTarget); Plugins plugins = new Plugins(workerProps); plugins.compareAndSwapWithDelegatingLoader(); - DistributedConfig distributedConfig = new DistributedConfig(workerProps); String kafkaClusterId = distributedConfig.kafkaClusterId(); String clientIdBase = ConnectUtils.clientIdBase(distributedConfig); // Create the admin client to be shared by all backing stores for this herder diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java index 8c0d30b1c992e..5a8bc5e08ae4f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java @@ -114,14 +114,15 @@ public Connect startConnect(Map workerProps) { log.info("Kafka Connect worker initializing ..."); long initStart = time.hiResClockMs(); + T config = createConfig(workerProps); + log.debug("Kafka cluster ID: {}", config.kafkaClusterId()); + WorkerInfo initInfo = new WorkerInfo(); initInfo.logAll(); log.info("Scanning for plugin classes. This might take a moment ..."); Plugins plugins = new Plugins(workerProps); plugins.compareAndSwapWithDelegatingLoader(); - T config = createConfig(workerProps); - log.debug("Kafka cluster ID: {}", config.kafkaClusterId()); RestClient restClient = new RestClient(config);