Skip to content

Commit

Permalink
Optimize the logic of obtaining parallelism downwards
Browse files Browse the repository at this point in the history
  • Loading branch information
huyuanfeng committed Sep 12, 2024
1 parent 745f499 commit a19bca8
Showing 1 changed file with 15 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ protected static int scale(

// Cap parallelism at either maxParallelism(number of key groups or source partitions) or
// parallelism upper limit
int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
final int upperBound = Math.min(maxParallelism, parallelismUpperLimit);

// Apply min/max parallelism
newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);
Expand All @@ -423,9 +423,8 @@ protected static int scale(
}

// When the shuffle type of vertex inputs contains keyBy or vertex is a source,
// we try to adjust the parallelism such that it divides the adjustableMaxParallelism
// without a
// remainder => data is evenly spread across subtasks
// we try to adjust the parallelism such that it divides
// the adjustableMaxParallelism without a remainder => data is evenly spread across subtasks
for (int p = newParallelism; p <= adjustableUpperBound; p++) {
if (adjustableMaxParallelism % p == 0) {
return p;
Expand All @@ -437,32 +436,26 @@ protected static int scale(
// When adjust the parallelism after rounding up cannot be evenly divided by source
// numPartitions, Try to find the smallest parallelism that can satisfy the current
// consumption rate.
for (int p = newParallelism; p > parallelismLowerLimit; p--) {
if (numPartitions / p > numPartitions / newParallelism) {
if (numPartitions % p != 0) {
p++;
int finalParallelism = newParallelism;
for (; finalParallelism > parallelismLowerLimit; finalParallelism--) {
if (numPartitions / finalParallelism > numPartitions / newParallelism) {
if (numPartitions % finalParallelism != 0) {
finalParallelism++;
}
consumer.accept(
String.format(
SCALE_LIMITED_MESSAGE_FORMAT,
vertex,
newParallelism,
p,
String.format(
"numPartitions : %s,upperBound(maxParallelism or parallelismUpperLimit): %s",
numPartitions, upperBound)));
return p;
break;
}
}

consumer.accept(
String.format(
SCALE_LIMITED_MESSAGE_FORMAT,
vertex,
newParallelism,
parallelismLowerLimit,
String.format("parallelismLowerLimit : %s", parallelismLowerLimit)));
return parallelismLowerLimit;
finalParallelism,
String.format(
"numPartitions : %s,upperBound(maxParallelism or parallelismUpperLimit): %s, "
+ "parallelismLowerLimit: %s.",
numPartitions, upperBound, parallelismLowerLimit)));
return finalParallelism;
}

// If parallelism adjustment fails, use originally computed parallelism
Expand Down

0 comments on commit a19bca8

Please sign in to comment.