From aec8cde90092f930be53a14357925d864e2a2a0f Mon Sep 17 00:00:00 2001 From: andrew Date: Thu, 27 Jun 2024 17:48:56 +0800 Subject: [PATCH 1/5] MINOR: Eliminate warnings for AdminUtils --- .../org/apache/kafka/admin/AdminUtils.java | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java b/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java index ad69e1617586..2d32e5803bff 100644 --- a/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java +++ b/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java @@ -130,7 +130,7 @@ public static Map> assignReplicasToBrokers(Collection b.id).collect(Collectors.toList()), fixedStartIndex, startPartitionId); else { - if (brokerMetadatas.stream().anyMatch(b -> !b.rack.isPresent())) + if (brokerMetadatas.stream().anyMatch(b -> b.rack.isEmpty())) throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment."); return assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex, startPartitionId); @@ -143,9 +143,9 @@ private static Map> assignReplicasToBrokersRackUnaware(in int fixedStartIndex, int startPartitionId) { Map> ret = new HashMap<>(); - int startIndex = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(brokerList.size()); + int startIndex = determineIndex(fixedStartIndex, brokerList.size()); int currentPartitionId = Math.max(0, startPartitionId); - int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(brokerList.size()); + int nextReplicaShift = determineIndex(fixedStartIndex, brokerList.size()); for (int i = 0; i < nPartitions; i++) { if (currentPartitionId > 0 && (currentPartitionId % brokerList.size() == 0)) nextReplicaShift += 1; @@ -166,14 +166,14 @@ private static Map> assignReplicasToBrokersRackAware(int int fixedStartIndex, int startPartitionId) { Map brokerRackMap = new HashMap<>(); - brokerMetadatas.forEach(m -> brokerRackMap.put(m.id, m.rack.get())); + brokerMetadatas.forEach(m -> brokerRackMap.put(m.id, m.rack.orElse(null))); int numRacks = new HashSet<>(brokerRackMap.values()).size(); List arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap); int numBrokers = arrangedBrokerList.size(); Map> ret = new HashMap<>(); - int startIndex = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(arrangedBrokerList.size()); + int startIndex = determineIndex(fixedStartIndex, arrangedBrokerList.size()); int currentPartitionId = Math.max(0, startPartitionId); - int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(arrangedBrokerList.size()); + int nextReplicaShift = determineIndex(fixedStartIndex, arrangedBrokerList.size()); for (int i = 0; i < nPartitions; i++) { if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size() == 0)) nextReplicaShift += 1; @@ -211,6 +211,20 @@ private static Map> assignReplicasToBrokersRackAware(int return ret; } + /** + * Determines the index value. + * + * If the provided fixed index value is greater than or equal to 0, returns the fixed index. + * Otherwise, generates a random index between 0 (inclusive) and the broker size (exclusive). + * + * @param fixedIndex the fixed index value, used if it is greater than or equal to 0 + * @param brokerSize the size of the broker list, used as the upper limit for generating a random index + * @return the calculated index value + */ + private static int determineIndex(int fixedIndex, int brokerSize) { + return fixedIndex >= 0 ? fixedIndex : RAND.nextInt(brokerSize); + } + /** * Given broker and rack information, returns a list of brokers alternated by the rack. Assume * this is the rack and its brokers: From b2e37147e908001de30d30ff2d0ead9dc1c949fb Mon Sep 17 00:00:00 2001 From: andrew Date: Thu, 27 Jun 2024 18:40:34 +0800 Subject: [PATCH 2/5] MINOR: Fix build issues --- .../src/main/java/org/apache/kafka/admin/AdminUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java b/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java index 2d32e5803bff..e1faa16a9d1c 100644 --- a/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java +++ b/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java @@ -130,7 +130,7 @@ public static Map> assignReplicasToBrokers(Collection b.id).collect(Collectors.toList()), fixedStartIndex, startPartitionId); else { - if (brokerMetadatas.stream().anyMatch(b -> b.rack.isEmpty())) + if (brokerMetadatas.stream().anyMatch(b -> !b.rack.isPresent())) throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment."); return assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex, startPartitionId); From 6a3b3d5b76a1cb9e16c04518fceeddfb1278c8b8 Mon Sep 17 00:00:00 2001 From: andrew Date: Thu, 27 Jun 2024 20:23:10 +0800 Subject: [PATCH 3/5] MINOR: Original code is straightforward enough, revert and refactor. --- .../org/apache/kafka/admin/AdminUtils.java | 22 ++++--------------- 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java b/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java index e1faa16a9d1c..d9b2fdfa4dc8 100644 --- a/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java +++ b/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java @@ -143,9 +143,9 @@ private static Map> assignReplicasToBrokersRackUnaware(in int fixedStartIndex, int startPartitionId) { Map> ret = new HashMap<>(); - int startIndex = determineIndex(fixedStartIndex, brokerList.size()); + int startIndex = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(brokerList.size()); int currentPartitionId = Math.max(0, startPartitionId); - int nextReplicaShift = determineIndex(fixedStartIndex, brokerList.size()); + int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(brokerList.size()); for (int i = 0; i < nPartitions; i++) { if (currentPartitionId > 0 && (currentPartitionId % brokerList.size() == 0)) nextReplicaShift += 1; @@ -171,9 +171,9 @@ private static Map> assignReplicasToBrokersRackAware(int List arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap); int numBrokers = arrangedBrokerList.size(); Map> ret = new HashMap<>(); - int startIndex = determineIndex(fixedStartIndex, arrangedBrokerList.size()); + int startIndex = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(arrangedBrokerList.size()); int currentPartitionId = Math.max(0, startPartitionId); - int nextReplicaShift = determineIndex(fixedStartIndex, arrangedBrokerList.size()); + int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(arrangedBrokerList.size()); for (int i = 0; i < nPartitions; i++) { if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size() == 0)) nextReplicaShift += 1; @@ -211,20 +211,6 @@ private static Map> assignReplicasToBrokersRackAware(int return ret; } - /** - * Determines the index value. - * - * If the provided fixed index value is greater than or equal to 0, returns the fixed index. - * Otherwise, generates a random index between 0 (inclusive) and the broker size (exclusive). - * - * @param fixedIndex the fixed index value, used if it is greater than or equal to 0 - * @param brokerSize the size of the broker list, used as the upper limit for generating a random index - * @return the calculated index value - */ - private static int determineIndex(int fixedIndex, int brokerSize) { - return fixedIndex >= 0 ? fixedIndex : RAND.nextInt(brokerSize); - } - /** * Given broker and rack information, returns a list of brokers alternated by the rack. Assume * this is the rack and its brokers: From 87e17347ae3d30558b5388f9e0f6d7eac1997155 Mon Sep 17 00:00:00 2001 From: LoganZhu Date: Fri, 28 Jun 2024 18:20:32 +0800 Subject: [PATCH 4/5] MINOR: Revert unrelated changes --- .../src/main/java/org/apache/kafka/admin/AdminUtils.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java b/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java index d9b2fdfa4dc8..53fb9764e097 100644 --- a/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java +++ b/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java @@ -130,8 +130,6 @@ public static Map> assignReplicasToBrokers(Collection b.id).collect(Collectors.toList()), fixedStartIndex, startPartitionId); else { - if (brokerMetadatas.stream().anyMatch(b -> !b.rack.isPresent())) - throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment."); return assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex, startPartitionId); } @@ -166,7 +164,7 @@ private static Map> assignReplicasToBrokersRackAware(int int fixedStartIndex, int startPartitionId) { Map brokerRackMap = new HashMap<>(); - brokerMetadatas.forEach(m -> brokerRackMap.put(m.id, m.rack.orElse(null))); + brokerMetadatas.forEach(m -> brokerRackMap.put(m.id, m.rack.orElseThrow(() -> new AdminOperationException("Not all brokers have rack information for replica rack aware assignment.")))); int numRacks = new HashSet<>(brokerRackMap.values()).size(); List arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap); int numBrokers = arrangedBrokerList.size(); From e2ff23cada0ba0b551cbb7105e59d6e1ed28eaaa Mon Sep 17 00:00:00 2001 From: LoganZhu Date: Sun, 30 Jun 2024 13:56:57 +0800 Subject: [PATCH 5/5] MINOR: Add unit test for AdminUtils --- .../unit/kafka/admin/AdminRackAwareTest.scala | 25 ++++++++++++++++++- .../unit/kafka/admin/RackAwareTest.scala | 2 +- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala index 9a3aaa31bc26..015331c1d8e1 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala @@ -18,7 +18,8 @@ package kafka.admin import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.admin.{AdminUtils, BrokerMetadata} -import org.apache.kafka.common.errors.InvalidReplicationFactorException +import org.apache.kafka.common.errors.{InvalidPartitionsException, InvalidReplicationFactorException} +import org.apache.kafka.server.common.AdminOperationException import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test @@ -225,4 +226,26 @@ class AdminRackAwareTest extends RackAwareTest with Logging { val actualAssignment = CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 3, 0, -1)) assertEquals(expectedAssignment, actualAssignment) } + + @Test + def testAssignReplicasToBrokersWithInvalidParameters(): Unit = { + val rackMap = Map(0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> null) + val brokerMetadatas = toBrokerMetadata(rackMap) + + // test 0 partition + assertThrows(classOf[InvalidPartitionsException], + () => AdminUtils.assignReplicasToBrokers(brokerMetadatas, 0, 0, -1, -1)) + + // test 0 replication factor + assertThrows(classOf[InvalidReplicationFactorException], + () => AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 0, -1, -1)) + + // test wrong replication factor + assertThrows(classOf[InvalidReplicationFactorException], + () => AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, brokerMetadatas.size() + 1, -1, -1)) + + // test wrong brokerMetadatas + assertThrows(classOf[AdminOperationException], + () => AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, brokerMetadatas.size(), -1, -1)) + } } diff --git a/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala b/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala index 17ce08a39bfe..62df52273741 100644 --- a/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala +++ b/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala @@ -81,7 +81,7 @@ trait RackAwareTest { def toBrokerMetadata(rackMap: Map[Int, String], brokersWithoutRack: Seq[Int] = Seq.empty): util.Collection[BrokerMetadata] = { val res = rackMap.toSeq.map { case (brokerId, rack) => - new BrokerMetadata(brokerId, Optional.of(rack)) + new BrokerMetadata(brokerId, Optional.ofNullable(rack)) } ++ brokersWithoutRack.map { brokerId => new BrokerMetadata(brokerId, Optional.empty()) }.sortBy(_.id)