From fede2a75afbe2ac5e54d7e6e419af3b6206c42e4 Mon Sep 17 00:00:00 2001 From: LoganZhu Date: Fri, 28 Jun 2024 15:51:21 +0800 Subject: [PATCH] MINOR: Eliminate warnings for AdminUtils --- .../main/java/org/apache/kafka/admin/AdminUtils.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java b/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java index d9b2fdfa4dc8..8c1d5a5cae1e 100644 --- a/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java +++ b/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java @@ -128,12 +128,10 @@ public static Map> assignReplicasToBrokers(Collection b.rack.isPresent())) return assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.stream().map(b -> b.id).collect(Collectors.toList()), fixedStartIndex, - startPartitionId); + startPartitionId); else { - if (brokerMetadatas.stream().anyMatch(b -> !b.rack.isPresent())) - throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment."); return assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex, - startPartitionId); + startPartitionId); } } @@ -166,7 +164,7 @@ private static Map> assignReplicasToBrokersRackAware(int int fixedStartIndex, int startPartitionId) { Map brokerRackMap = new HashMap<>(); - brokerMetadatas.forEach(m -> brokerRackMap.put(m.id, m.rack.orElse(null))); + brokerMetadatas.forEach(m -> brokerRackMap.put(m.id, m.rack.orElseThrow(() -> new AdminOperationException("Not all brokers have rack information for replica rack aware assignment.")))); int numRacks = new HashSet<>(brokerRackMap.values()).size(); List arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap); int numBrokers = arrangedBrokerList.size(); @@ -196,7 +194,7 @@ private static Map> assignReplicasToBrokersRackAware(int // that do not have any replica, or // 2. the broker has already assigned a replica AND there is one or more brokers that do not have replica assigned if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size() == numRacks) - && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size() == numBrokers)) { + && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size() == numBrokers)) { replicaBuffer.add(broker); racksWithReplicas.add(rack); brokersWithReplicas.add(broker);