[FLINK-36192][autoscaler] Autoscaler supports adjusting the parallelism of source vertex based on the number of partitions in Kafka or Pulsar #879
Conversation
…of the Source to the number of partitions in kafka or pulsar
Thanks @huyuanfeng2018 for this PR! A couple suggestions and a comment for my understanding.
if (numPartitions <= 0) {
    // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
    // we try to adjust the parallelism such that it divides the maxParallelism without a
    // remainder => data is evenly spread across subtasks
    for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
        if (maxParallelism % p == 0) {
            return Tuple2.of(p, Optional.empty());
        }
    }
    // If parallelism adjustment fails, use originally computed parallelism
    return Tuple2.of(newParallelism, Optional.empty());
} else {
    // When we know the numPartitions at a vertex,
    // adjust the parallelism such that it divides the numPartitions without a remainder
    // => Data is evenly distributed among subtasks
    for (int p = newParallelism; p <= upperBound && p <= numPartitions; p++) {
        if (numPartitions % p == 0) {
            return Tuple2.of(p, Optional.empty());
        }
    }
This looks like code duplication to me. Only the for loop termination condition changed. We should be able to pass an argument to the for loop which we set based on the number of partitions.
We may even completely simplify like this:
Suggested change:

if (numPartitions <= 0) {
    // No partition information is available, assume numPartitions equals the number of key groups
    numPartitions = maxParallelism;
}
for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
    if (maxParallelism % p == 0) {
        return Tuple2.of(p, Optional.empty());
    }
}
// If parallelism adjustment fails, use originally computed parallelism
return Tuple2.of(newParallelism, Optional.empty());
With respect to p <= maxParallelism / 2:

When dealing with inputShipStrategies = hash, maxParallelism = 128, newParallelism = 78, I think newParallelism = 78 is acceptable, because not all tasks have a large state after keyBy. But for a vertex consuming Kafka, this becomes unacceptable.

Imagine Kafka with 128 partitions being consumed concurrently by 78 tasks :)
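For illustration only (not code from the PR), here is the arithmetic behind that concern, assuming partitions are assigned to subtasks as evenly as possible:

public class PartitionSpreadExample {
    public static void main(String[] args) {
        int numPartitions = 128;
        int parallelism = 78;
        int basePartitionsPerTask = numPartitions / parallelism;              // 1
        int tasksWithExtraPartition = numPartitions % parallelism;            // 50 subtasks read 2 partitions
        int tasksWithBasePartitions = parallelism - tasksWithExtraPartition;  // 28 subtasks read 1 partition
        System.out.printf(
                "%d subtasks read %d partitions, %d subtasks read %d partition(s)%n",
                tasksWithExtraPartition, basePartitionsPerTask + 1,
                tasksWithBasePartitions, basePartitionsPerTask);
        // The 50 busier subtasks become the bottleneck, so 78 subtasks perform roughly like 64
        // (where every subtask reads exactly 2 partitions) while reserving 14 extra slots.
    }
}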
Right, I missed that. I was trying to generalize the two code blocks. How about the following?
Suggested change:

if (numPartitions <= 0) {
    upperBound = Math.min(maxParallelism / 2, upperBound);
} else {
    upperBound = Math.min(numPartitions, upperBound);
    maxParallelism = numPartitions;
}
for (int p = newParallelism; p <= upperBound; p++) {
    if (maxParallelism % p == 0) {
        return Tuple2.of(p, Optional.empty());
    }
}
...
// Resource optimization logic follows (if we can't achieve optimal partitioning)
// (See review comment below)
...
// If parallelism adjustment fails, use originally computed parallelism
return Tuple2.of(newParallelism, Optional.empty());
> Right, I missed that. I was trying to generalize the two code blocks. How about the following?

Thanks, fine with me. However, I do not recommend overriding the values of maxParallelism and upperBound, so I added two new variables instead:
- adjustableMaxParallelism (the maxParallelism used during the adjustment process)
- adjustableUpperBound (the upperBound used during the adjustment process)
Can we call them like this?
- adjustableMaxParallelism => numKeyGroupsOrPartitions
- adjustableUpperBound => upperBoundForAlignment
Adjustable just doesn't tell someone who is unfamiliar with the code very much.
// If a suitable degree of parallelism cannot be found, return parallelismLowerLimit
var message =
        String.format(
                SCALE_LIMITED_MESSAGE_FORMAT,
                vertex,
                newParallelism,
                parallelismLowerLimit,
                String.format("parallelismLowerLimit : %s", parallelismLowerLimit));
return Tuple2.of(parallelismLowerLimit, Optional.of(message));
I'm not sure about this behavior. Why return the lower limit, instead of the originally computed target parallelism? I think we should retain this logic:
// If parallelism adjustment fails, use originally computed parallelism
return newParallelism;
I responded with the specific logic, but I think it's still worth discussing
if (numPartitions / p > numPartitions / newParallelism) {
    if (numPartitions % p != 0) {
        p += 1;
    }
Why p++ here? Maybe I'm overlooking something, but p already divides numPartitions without a remainder.
Let me expand on my thoughts on this point:

1. The first thing we need to consider is a p that divides the number of partitions evenly while p <= upperBound is satisfied.
2. If step 1 fails to produce a result, it means we cannot guarantee even consumption of the partitions when rounding up (due to uneven consumption, some tasks will become bottlenecks, so theoretically even taking the largest upperBound will not increase the processing rate). In that case we take the smallest value that keeps the processing rate comparable to the current one, satisfying numPartitions / newParallelism = numPartitions / p.

Here is an example: numPartitions = 35, newParallelism = 20, upperBound = 30.

step1: We start from 20 and go up. Since upperBound = 30, we cannot find a parallelism that consumes the partitions evenly; p = 30 and p = 20 show no change in the consumption rate.

step2: Since 35 / 20 = 1 remainder 15, a parallelism of 20 cannot consume Kafka evenly: 15 tasks would consume two partitions and 5 tasks would consume one partition. So the final result only requires that no task consumes more than two partitions; then our processing rate won't slow down. That is, (numPartitions / p == 2 && numPartitions % p == 0) || (numPartitions / (p-1) == 2 && numPartitions % (p-1) != 0).

So the p += 1 here is needed when the value found while searching downwards does not divide the partitions evenly; it needs +1 to meet the condition. 35 / 17 = 2 remainder 1, but then one task would end up consuming three partitions, so we add one: 17 + 1 = 18 is our final result (17 tasks consume 2 partitions each, 1 task consumes one partition).

step3: If p already drops below parallelismLowerLimit during this downward search, we should directly use parallelismLowerLimit as the final degree of parallelism.

However, the above is all theoretical. I am not sure whether this will cause negative effects (for example, if the data distribution across the partitions is itself uneven, step 2 may aggravate that), so I have reservations about this part of the logic; another approach is to directly return newParallelism.
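A minimal, self-contained sketch of the three steps described above (the adjust method and its signature are made up for this illustration; the PR's actual code lives in JobVertexScaler):

public class SourceParallelismAdjustment {

    static int adjust(int numPartitions, int newParallelism, int upperBound, int parallelismLowerLimit) {
        // Step 1: search upwards for a parallelism that divides the partitions evenly.
        for (int p = newParallelism; p <= upperBound && p <= numPartitions; p++) {
            if (numPartitions % p == 0) {
                return p;
            }
        }
        // Step 2: search downwards for the smallest parallelism whose busiest subtask
        // still reads no more partitions than at newParallelism.
        int p = newParallelism;
        for (; p > 0; p--) {
            if (numPartitions / p > numPartitions / newParallelism) {
                if (numPartitions % p != 0) {
                    // p does not divide the partitions evenly, so one subtask would get an
                    // extra partition; +1 keeps the busiest subtask's load unchanged.
                    p++;
                }
                break;
            }
        }
        // Step 3: never go below the configured lower limit.
        return Math.max(p, parallelismLowerLimit);
    }

    public static void main(String[] args) {
        // Example above: 35 partitions, proposed 20, upper bound 30 -> 18
        System.out.println(adjust(35, 20, 30, 1));
        // With parallelismLowerLimit = 19 the result is clamped to 19 (example discussed below)
        System.out.println(adjust(35, 24, 30, 19));
    }
}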
Thanks for explaining in detail. I misread some of the code. It is correct that we need to add +1 when we have found a parallelism which yields a greater value for num_partitions / p than the initial num_partitions / new_parallelism, because we have found the tipping point where we achieve the most utilization in terms of partitions per task.

I think we should return new_parallelism if all adaptation logic fails, because using a potentially very small configured lower parallelism could make things a lot worse due to resource constraints.
> Thanks for explaining in detail. I misread some of the code. It is correct that we need to add +1 when we have found a parallelism which yields a greater value for num_partitions / p than the initial num_partitions / new_parallelism, because we have found the tipping point where we achieve the most utilization in terms of partitions per task.
>
> I think we should return new_parallelism if all adaptation logic fails, because using a potentially very small configured lower parallelism could make things a lot worse due to resource constraints.
I want to explain the reason for using parallelismLowerLimit with an example:

numPartitions = 35, newParallelism = 24, upperBound = 30, parallelismLowerLimit = 19

Step 1 cannot produce a result, so it goes to step 2, but step 2 also cannot produce a result because parallelismLowerLimit = 19 while the value expected by step 2 is 18, so it will eventually return 19.
On second thought, it's probably ok to use the lower limit. As you said, we would already be approaching the limit. Most users will never run into this because they haven't configured a minimum parallelism.
@mxm I explained some logic. You can review again when you have time. Thank you very much!
Thanks @huyuanfeng2018 for the contribution, and @mxm for the review!
I left some comments, please take a look in your free time, thanks~
@@ -345,15 +356,22 @@ private boolean detectIneffectiveScaleUp(
 * <p>Also, in order to ensure the data is evenly spread across subtasks, we try to adjust the
 * parallelism for source and keyed vertex such that it divides the maxParallelism without a
 * remainder.
 *
 * <p>This method also attempts to adjust the parallelism to ensure it aligns well with the
 * number of partitions if a vertex has a known partition count.
Suggested change:

* number of source partitions if a source vertex has a known partition count.
agree
int numKeyGroupsOrPartitions = maxParallelism;
int upperBoundForAlignment;
if (numPartitions <= 0) {
    upperBoundForAlignment = Math.min(maxParallelism / 2, upperBound);
I'm curious why maxParallelism / 2?

Assuming the upstream edge of the vertex is keyBy (hash):
- The maxParallelism is 100
- newParallelism is 80

We will use 80 as the result, right? If so, it will run into the same issue as the source partitions.
Yes, but I think not all tasks after a hash have large Flink keyed state. In this scenario, misalignment has almost no impact on the task. I think this is the trade-off between performance and resources that the previous logic took into consideration. But this becomes less acceptable when consuming Kafka or Pulsar.

@mxm @1996fanrui Maybe we can discuss this logic further.
As I understand it, a key group is completely analogous to a source partition (Kafka or Pulsar): it determines how many partitions or groups one Flink subtask consumes.

The performance is unbalanced even without large state. For example, if the maxParallelism (number of keyGroups) is 100 and the actual parallelism is 70:
- 30 instances process 2 keyGroups each, and the remaining 40 instances process 1 keyGroup each.
- Assuming that the data of each keyGroup is balanced, the 30 instances processing 2 keyGroups will become the bottleneck of the job.

For this scenario, there is no difference between setting the parallelism to 50 or to 99.

IIUC, this situation is exactly the source partition problem you want to solve, and it applies to keyGroups in exactly the same way. Please correct me if anything is wrong.
Well, you are right, the specific entry point is KeyGroupStreamPartitioner. In this case, we can unify our logic.
@1996fanrui The code path you are pointing to hasn't been changed by this PR. It has merely been refactored. In the scenario of maxParallelism=100 and newParallelism=80, the resulting parallelism would always be 80. Are you asking to expand the source logic introduced here to hash keyed state?

> For this scenario, there is no difference when the parallelism is set to 50 and 99.

That depends on the amount of state in the key groups and other factors like hot keys. 50 could be the same as 99, it could also be much worse. For sources, the problem is much more amplified because they usually have pretty evenly balanced partitions and there is extra overhead to fetch the partition data.

In this scenario though, we should not be going down to 50; we probably should be going up to 100. I think the maxParallelism / 2 bound stems from the idea that the maximum parallelism won't be reached because the parallelism is set to a number <= maxParallelism / 2, which would mean it doesn't make sense to continue beyond maxParallelism / 2 because there aren't any more possible divisors. However, when parallelism > maxParallelism / 2, this logic is flawed because maxParallelism itself could be a possible divisor. We should really be going up to maxParallelism for an initial parallelism > maxParallelism / 2. We could just skip this (premature) optimization entirely.

I agree that we should replace the current key alignment logic with the generalized source logic introduced here. Something like this:
final int numKeyGroupsOrPartitions;
final int upperBoundForAlignment;
if (numSourcePartitions <= 0) {
    numKeyGroupsOrPartitions = maxParallelism;
    upperBoundForAlignment = Math.min(
            // Optimize the case where newParallelism <= maxParallelism / 2
            newParallelism > maxParallelism / 2 ? maxParallelism : maxParallelism / 2,
            upperBound);
} else {
    numKeyGroupsOrPartitions = numSourcePartitions;
    upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);
}

// When the shuffle type of vertex inputs contains keyBy or vertex is a source,
// we try to adjust the parallelism such that it divides
// the adjustableMaxParallelism without a remainder => data is evenly spread across subtasks
for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
    if (numKeyGroupsOrPartitions % p == 0) {
        return p;
    }
}

// When adjusting the parallelism after rounding up cannot be evenly divided by
// numSourcePartitions, try to find the smallest parallelism that can satisfy the
// current consumption rate.
int p = newParallelism;
for (; p > 0; p--) {
    if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
        if (numKeyGroupsOrPartitions % p != 0) {
            p++;
        }
        break;
    }
}

p = Math.max(p, parallelismLowerLimit);
return p;
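To make the keyed-vertex part of this proposal concrete, here is a minimal standalone trace (illustrative only, not the PR's code; it uses the 128/78 example from this thread and assumes no tighter upperBound is configured):

public class KeyedAlignmentTrace {
    public static void main(String[] args) {
        int maxParallelism = 128;
        int newParallelism = 78;
        int upperBound = 128; // assume parallelismUpperLimit does not constrain the result
        // With newParallelism above maxParallelism / 2, the search bound becomes 128 instead of 64,
        // so the divisor loop can now reach the clean divisor 128.
        int upperBoundForAlignment = Math.min(
                newParallelism > maxParallelism / 2 ? maxParallelism : maxParallelism / 2,
                upperBound);
        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
            if (maxParallelism % p == 0) {
                System.out.println("aligned parallelism: " + p); // prints 128
                break;
            }
        }
    }
}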
Hey @mxm, sorry for the late reply. Because there are too many comments, I missed this one.

> @1996fanrui The code path you are pointing to hasn't been changed by this PR. It has merely been refactored. In the scenario of maxParallelism=100 and newParallelism=80, the resulting parallelism would always be 80. Are you asking to expand the source logic introduced here to hash keyed state?

I'm asking about hash keyed state; I don't know why we recommend 80 instead of 100 as the result for the hash keyed state case. But I think you have answered my question in this comment.

> For this scenario, there is no difference when the parallelism is set to 50 and 99.
>
> That depends on the amount of state in the key groups and other factors like hot keys. 50 could be the same as 99, it could also be much worse. For sources, the problem is much more amplified because they usually have pretty evenly balanced partitions and there is extra overhead to fetch the partition data.

Good point! Hot keys may occur in a Flink job. In general, source partitions have no data skew.

> However, when parallelism > maxParallelism / 2, this logic is flawed because maxParallelism itself could be a possible divisor. We should really be going up to maxParallelism for an initial parallelism > maxParallelism / 2. We could just skip this (premature) optimization entirely.

This is exactly my question; I think we should use maxParallelism as the final parallelism (for the above example, 100 instead of 80).

> upperBoundForAlignment = Math.min(
>         // Optimize the case where newParallelism <= maxParallelism / 2
>         newParallelism > maxParallelism / 2 ? maxParallelism : maxParallelism / 2,
>         upperBound);

Great, this part solved my concern. Thank you~
if (numPartitions <= 0) {
    upperBoundForAlignment = Math.min(maxParallelism / 2, upperBound);
} else {
    upperBoundForAlignment = Math.min(numPartitions, upperBound);
Should the source vertex respect the maxParallelism?

If maxParallelism is 100, the source has 1000 partitions, and upperBound is 200, the new parallelism may be greater than 100, right? IIUC, the Flink job won't run when parallelism > maxParallelism, even without key groups.
upperBound is already the smaller of parallelismUpperLimit and maxParallelism.
Thanks @1996fanrui for your review, I have made some changes to the code, PTAL.
Great discussion @huyuanfeng2018 @1996fanrui! I think this is a good opportunity to unify the logic further and to address a gap with the current key alignment logic for hash partitioning.
Thanks @mxm for the review, LGTM for these suggestions, I fixed the code.
final int numKeyGroupsOrPartitions;
final int upperBoundForAlignment;
if (numSourcePartitions <= 0) {
    numKeyGroupsOrPartitions = maxParallelism;
    upperBoundForAlignment =
            Math.min(
                    // Optimize the case where newParallelism <= maxParallelism / 2
                    newParallelism > maxParallelism / 2
                            ? maxParallelism
                            : maxParallelism / 2,
                    upperBound);
} else {
    numKeyGroupsOrPartitions = numSourcePartitions;
    upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);
}
> // Optimize the case where newParallelism <= maxParallelism / 2

Why do we need this optimization? To reduce the number of iterations of the for loop?

I'm curious why the source partition branch doesn't use this optimization. If both source and key group could use it, does the following code work?
Suggested change:

var numKeyGroupsOrPartitions =
        numSourcePartitions <= 0 ? maxParallelism : numSourcePartitions;
var upperBoundForAlignment =
        Math.min(
                // Optimize the case where newParallelism <= maxParallelism / 2
                newParallelism > numKeyGroupsOrPartitions / 2
                        ? numKeyGroupsOrPartitions
                        : numKeyGroupsOrPartitions / 2,
                upperBound);
> Why do we need this optimization? To reduce the number of iterations of the for loop?

Yes, precisely. We had this optimization in place before, but it is only valid when newParallelism <= maxParallelism / 2.
// When the shuffle type of vertex inputs contains keyBy or vertex is a source,
// we try to adjust the parallelism such that it divides
// the adjustableMaxParallelism without a remainder => data is evenly spread across subtasks
Suggested change:

// the numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks
    if (numKeyGroupsOrPartitions % p == 0) {
        return p;
    }
}

// If parallelism adjustment fails, use originally computed parallelism
return newParallelism;
// When adjusting the parallelism after rounding up cannot be evenly divided by
// numKeyGroupsOrPartitions, try to find the smallest parallelism that can satisfy the
// current consumption rate.
int p = newParallelism;
for (; p > 0; p--) {
    if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
        if (numKeyGroupsOrPartitions % p != 0) {
            p++;
        }
        break;
    }
}
I found that our discussion cannot cover all cases while reviewing this part in detail.

For example: sourcePartition is 199 and the new parallelism is 99. IIUC, the final parallelism is 67 (every subtask consumes 3 source partitions, except for the last subtask), right?

But 100 makes more sense to me as the final parallelism (every subtask consumes 2 source partitions, except for the last subtask).
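For reference, a small standalone check of the busiest-subtask load for the parallelisms discussed here (assuming partitions are spread as evenly as possible; not code from the PR):

public class BusiestSubtaskLoad {
    // Maximum number of partitions any single subtask has to read when numPartitions
    // are spread as evenly as possible over `parallelism` subtasks.
    static int busiestLoad(int numPartitions, int parallelism) {
        return (numPartitions + parallelism - 1) / parallelism; // ceiling division
    }

    public static void main(String[] args) {
        System.out.println(busiestLoad(199, 99));  // 3: the originally computed parallelism
        System.out.println(busiestLoad(199, 67));  // 3: the fallback result described above
        System.out.println(busiestLoad(199, 100)); // 2: the alternative proposed above
    }
}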
Following up on #879 (comment): I found the current logic isn't perfect even if sourcePartitionNumber is 200.
Good observation. I think there are two opposing ideas in this discussion:

1. Reducing the parallelism as much as possible when we can't evenly balance partitions / key groups
2. Increasing the parallelism such that we get the most evenly balanced partitions / key groups, even if we cannot do it perfectly (#879 (comment))

Both are good ideas. For the sake of stability, I think we probably want to default to doing (2). I could imagine adding an option for (1), but I'm not sure it should be the default mode.
// we try to adjust the parallelism such that it divides
// the adjustableMaxParallelism without a remainder => data is evenly spread across subtasks
for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
    if (numKeyGroupsOrPartitions % p == 0) {
About this comment #879 (comment), I'm wondering whether the following change would be more reasonable.

Note: numKeyGroupsOrPartitions / p means how many source partitions or key groups every subtask consumes.

Suggested change:

if (numKeyGroupsOrPartitions % p == 0
        || numKeyGroupsOrPartitions / p < numKeyGroupsOrPartitions / newParallelism) {

For example: maxParallelism is 200 and the new parallelism is 60 (some subtasks consume 4 keyGroups, the rest consume 3 keyGroups).
- The final parallelism is 100 based on the main branch code, because we only return p when maxParallelism % p == 0.
- But I think 67 is more reasonable here (one subtask consumes 2 key groups; each of the remaining 66 subtasks consumes 3 key groups).

Also, it's a bit beyond the scope of this PR. I could file a separate PR if you think it makes sense. Of course, it's also acceptable to do it in this PR.
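A minimal standalone sketch of the relaxed stopping condition suggested above (the alignRelaxed wrapper and its signature are made up for illustration; this is not the committed code):

public class KeyGroupAlignmentSketch {
    static int alignRelaxed(int numKeyGroupsOrPartitions, int newParallelism, int upperBoundForAlignment) {
        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
            // Stop on a clean divisor, or once the per-subtask share
            // (numKeyGroupsOrPartitions / p, integer division as in the note above)
            // drops below the share at newParallelism.
            if (numKeyGroupsOrPartitions % p == 0
                    || numKeyGroupsOrPartitions / p < numKeyGroupsOrPartitions / newParallelism) {
                return p;
            }
        }
        return newParallelism;
    }

    public static void main(String[] args) {
        // 200 key groups, proposed parallelism 60: the divisor-only rule on main would
        // pick 100, while the relaxed condition stops at 67 (example above).
        System.out.println(alignRelaxed(200, 60, 100)); // 67
    }
}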
I think that makes sense, but it makes the scaling more aggressive and less balanced. If we want to be more conservative, maybe 100 is ok in this scenario, where there is actually a divisor without a remainder. When there isn't, I think what you propose is way better than just using the initially provided parallelism.
In summary, I'm proposing a two-step process, similar to what we do for the partitions: first try to find a parallelism that divides the key groups without a remainder, and if that fails, do what you propose.
What is the purpose of the change

Autoscaler adjusts the parallelism of the corresponding vertex according to the number of partitions in Kafka or Pulsar, so that the parallelism is a divisor of the number of partitions.

Brief change log

- ScalingMetric.NUM_PARTITIONS to record the partition count of Kafka or Pulsar
- org.apache.flink.autoscaler.JobVertexScaler.scale: this method also attempts to adjust the parallelism to ensure it aligns well with the number of partitions if a vertex has a known partition count
- org.apache.flink.autoscaler.JobVertexScaler.scale returns exception information that occurs during the adjustment process
- eventhandler will handle events where the final degree of parallelism does not meet expectations due to the number of partitions or maxParallelism limitations

Verifying this change

- org.apache.flink.autoscaler.JobVertexScalerTest#testNumPartitionsAdjustment and org.apache.flink.autoscaler.JobVertexScalerTest#testSendingScalingLimitedEvents

Does this pull request potentially affect one of the following parts:

- CustomResourceDescriptors: (no)
- Documentation