Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/trunk' into trunk
Browse files Browse the repository at this point in the history
  • Loading branch information
LoganZhuZzz committed Jun 28, 2024
2 parents be9f534 + fede2a7 commit 87fa82e
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ public static Map<Integer, List<Integer>> assignReplicasToBrokers(Collection<Bro
throw new InvalidReplicationFactorException("Replication factor: " + replicationFactor + " larger than available brokers: " + brokerMetadatas.size() + ".");
if (brokerMetadatas.stream().noneMatch(b -> b.rack.isPresent()))
return assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.stream().map(b -> b.id).collect(Collectors.toList()), fixedStartIndex,
startPartitionId);
startPartitionId);
else {
return assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
startPartitionId);
startPartitionId);
}
}

Expand Down Expand Up @@ -194,7 +194,7 @@ private static Map<Integer, List<Integer>> assignReplicasToBrokersRackAware(int
// that do not have any replica, or
// 2. the broker has already assigned a replica AND there is one or more brokers that do not have replica assigned
if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size() == numRacks)
&& (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size() == numBrokers)) {
&& (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size() == numBrokers)) {
replicaBuffer.add(broker);
racksWithReplicas.add(rack);
brokersWithReplicas.add(broker);
Expand Down

0 comments on commit 87fa82e

Please sign in to comment.