diff --git a/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java b/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java index d9b2fdfa4dc8..53fb9764e097 100644 --- a/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java +++ b/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java @@ -130,8 +130,6 @@ public static Map> assignReplicasToBrokers(Collection b.id).collect(Collectors.toList()), fixedStartIndex, startPartitionId); else { - if (brokerMetadatas.stream().anyMatch(b -> !b.rack.isPresent())) - throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment."); return assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex, startPartitionId); } @@ -166,7 +164,7 @@ private static Map> assignReplicasToBrokersRackAware(int int fixedStartIndex, int startPartitionId) { Map brokerRackMap = new HashMap<>(); - brokerMetadatas.forEach(m -> brokerRackMap.put(m.id, m.rack.orElse(null))); + brokerMetadatas.forEach(m -> brokerRackMap.put(m.id, m.rack.orElseThrow(() -> new AdminOperationException("Not all brokers have rack information for replica rack aware assignment.")))); int numRacks = new HashSet<>(brokerRackMap.values()).size(); List arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap); int numBrokers = arrangedBrokerList.size();