-
Notifications
You must be signed in to change notification settings - Fork 13.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MINOR: Eliminate warnings for AdminUtils #16470
Changes from 4 commits
aec8cde
73bca12
b2e3714
c50d997
6a3b3d5
2d06142
1ad9d29
87e1734
1e35470
6e36429
eaf9a5e
e2ff23c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -143,9 +143,9 @@ private static Map<Integer, List<Integer>> assignReplicasToBrokersRackUnaware(in | |
int fixedStartIndex, | ||
int startPartitionId) { | ||
Map<Integer, List<Integer>> ret = new HashMap<>(); | ||
int startIndex = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(brokerList.size()); | ||
int startIndex = determineIndex(fixedStartIndex, brokerList.size()); | ||
int currentPartitionId = Math.max(0, startPartitionId); | ||
int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(brokerList.size()); | ||
int nextReplicaShift = determineIndex(fixedStartIndex, brokerList.size()); | ||
for (int i = 0; i < nPartitions; i++) { | ||
if (currentPartitionId > 0 && (currentPartitionId % brokerList.size() == 0)) | ||
nextReplicaShift += 1; | ||
|
@@ -166,14 +166,14 @@ private static Map<Integer, List<Integer>> assignReplicasToBrokersRackAware(int | |
int fixedStartIndex, | ||
int startPartitionId) { | ||
Map<Integer, String> brokerRackMap = new HashMap<>(); | ||
brokerMetadatas.forEach(m -> brokerRackMap.put(m.id, m.rack.get())); | ||
brokerMetadatas.forEach(m -> brokerRackMap.put(m.id, m.rack.orElse(null))); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it expected that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Before calling this function, the parameter There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
How about moving the check into brokerMetadatas.forEach(m -> brokerRackMap.put(m.id, m.rack.orElseThrow(() -> new AdminOperationException("Not all brokers have rack information for replica rack aware assignment.")))); There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
int numRacks = new HashSet<>(brokerRackMap.values()).size(); | ||
List<Integer> arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap); | ||
int numBrokers = arrangedBrokerList.size(); | ||
Map<Integer, List<Integer>> ret = new HashMap<>(); | ||
int startIndex = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(arrangedBrokerList.size()); | ||
int startIndex = determineIndex(fixedStartIndex, arrangedBrokerList.size()); | ||
int currentPartitionId = Math.max(0, startPartitionId); | ||
int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(arrangedBrokerList.size()); | ||
int nextReplicaShift = determineIndex(fixedStartIndex, arrangedBrokerList.size()); | ||
for (int i = 0; i < nPartitions; i++) { | ||
if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size() == 0)) | ||
nextReplicaShift += 1; | ||
|
@@ -211,6 +211,20 @@ private static Map<Integer, List<Integer>> assignReplicasToBrokersRackAware(int | |
return ret; | ||
} | ||
|
||
/** | ||
* Determines the index value. | ||
* | ||
* If the provided fixed index value is greater than or equal to 0, returns the fixed index. | ||
* Otherwise, generates a random index between 0 (inclusive) and the broker size (exclusive). | ||
* | ||
* @param fixedIndex the fixed index value, used if it is greater than or equal to 0 | ||
* @param brokerSize the size of the broker list, used as the upper limit for generating a random index | ||
* @return the calculated index value | ||
*/ | ||
private static int determineIndex(int fixedIndex, int brokerSize) { | ||
return fixedIndex >= 0 ? fixedIndex : RAND.nextInt(brokerSize); | ||
} | ||
|
||
/** | ||
* Given broker and rack information, returns a list of brokers alternated by the rack. Assume | ||
* this is the rack and its brokers: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need to refactor this function,the original code is straightforward enough