Skip to content

Commit 8348f7d

Browse files
authored
[FLINK-33977][runtime] Support minimizing the number of TMs during downscaling in the adaptive scheduler
Also enable this strategy by default via the introduced config option
1 parent 62f6c0f commit 8348f7d

File tree

13 files changed

+438
-57
lines changed

13 files changed

+438
-57
lines changed

docs/layouts/shortcodes/generated/all_jobmanager_section.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414
<td>Integer</td>
1515
<td>Configure the minimum increase in parallelism for a job to scale up.</td>
1616
</tr>
17+
<tr>
18+
<td><h5>jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers</h5></td>
19+
<td style="word-wrap: break-word;">true</td>
20+
<td>Boolean</td>
21+
<td>This parameter defines whether the adaptive scheduler prioritizes using the minimum number of <code class="highlighter-rouge">TaskManagers</code> when scheduling tasks.<br />Note, this parameter is suitable if <code class="highlighter-rouge">state.backend.local-recovery</code> is not enabled. More details about this configuration are available at <a href="https://issues.apache.org/jira/browse/FLINK-33977">FLINK-33977</a>.</td>
22+
</tr>
1723
<tr>
1824
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
1925
<td style="word-wrap: break-word;">10 s</td>

docs/layouts/shortcodes/generated/expert_scheduling_section.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@
6868
<td>Integer</td>
6969
<td>Configure the minimum increase in parallelism for a job to scale up.</td>
7070
</tr>
71+
<tr>
72+
<td><h5>jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers</h5></td>
73+
<td style="word-wrap: break-word;">true</td>
74+
<td>Boolean</td>
75+
<td>This parameter defines whether the adaptive scheduler prioritizes using the minimum number of <code class="highlighter-rouge">TaskManagers</code> when scheduling tasks.<br />Note, this parameter is suitable if <code class="highlighter-rouge">state.backend.local-recovery</code> is not enabled. More details about this configuration are available at <a href="https://issues.apache.org/jira/browse/FLINK-33977">FLINK-33977</a>.</td>
76+
</tr>
7177
<tr>
7278
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
7379
<td style="word-wrap: break-word;">10 s</td>

docs/layouts/shortcodes/generated/job_manager_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414
<td>Integer</td>
1515
<td>Configure the minimum increase in parallelism for a job to scale up.</td>
1616
</tr>
17+
<tr>
18+
<td><h5>jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers</h5></td>
19+
<td style="word-wrap: break-word;">true</td>
20+
<td>Boolean</td>
21+
<td>This parameter defines whether the adaptive scheduler prioritizes using the minimum number of <code class="highlighter-rouge">TaskManagers</code> when scheduling tasks.<br />Note, this parameter is suitable if <code class="highlighter-rouge">state.backend.local-recovery</code> is not enabled. More details about this configuration are available at <a href="https://issues.apache.org/jira/browse/FLINK-33977">FLINK-33977</a>.</td>
22+
</tr>
1723
<tr>
1824
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
1925
<td style="word-wrap: break-word;">10 s</td>

flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.configuration;
2020

21+
import org.apache.flink.annotation.Experimental;
2122
import org.apache.flink.annotation.Internal;
2223
import org.apache.flink.annotation.PublicEvolving;
2324
import org.apache.flink.annotation.docs.Documentation;
@@ -26,6 +27,7 @@
2627

2728
import java.time.Duration;
2829

30+
import static org.apache.flink.configuration.CheckpointingOptions.LOCAL_RECOVERY;
2931
import static org.apache.flink.configuration.ConfigOptions.key;
3032
import static org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint.ALL_PRODUCERS_FINISHED;
3133
import static org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint.ONLY_FINISHED_PRODUCERS;
@@ -678,6 +680,31 @@ public InlineElement getDescription() {
678680
code(SchedulerType.AdaptiveBatch.name()))
679681
.build());
680682

683+
@Experimental
684+
@Documentation.Section({
685+
Documentation.Sections.EXPERT_SCHEDULING,
686+
Documentation.Sections.ALL_JOB_MANAGER
687+
})
688+
public static final ConfigOption<Boolean> SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED =
689+
key("jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers")
690+
.booleanType()
691+
.defaultValue(true)
692+
.withDescription(
693+
Description.builder()
694+
.text(
695+
"This parameter defines whether the adaptive scheduler prioritizes "
696+
+ "using the minimum number of %s when scheduling tasks.",
697+
code("TaskManagers"))
698+
.linebreak()
699+
.text(
700+
"Note, this parameter is suitable if %s is not enabled. "
701+
+ "More details about this configuration are available at %s.",
702+
code(LOCAL_RECOVERY.key()),
703+
link(
704+
"https://issues.apache.org/jira/browse/FLINK-33977",
705+
"FLINK-33977"))
706+
.build());
707+
681708
/** @deprecated Use {@link BatchExecutionOptions#SPECULATIVE_ENABLED}. */
682709
@Deprecated
683710
@Documentation.ExcludeFromDocumentation("Hidden for deprecated")

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.flink.api.common.time.Time;
2222
import org.apache.flink.configuration.Configuration;
23+
import org.apache.flink.configuration.DeploymentOptions;
2324
import org.apache.flink.configuration.JobManagerOptions;
2425
import org.apache.flink.core.failure.FailureEnricher;
2526
import org.apache.flink.runtime.blob.BlobWriter;
@@ -47,6 +48,8 @@
4748

4849
import org.slf4j.Logger;
4950

51+
import javax.annotation.Nullable;
52+
5053
import java.util.Collection;
5154
import java.util.concurrent.Executor;
5255
import java.util.concurrent.ScheduledExecutorService;
@@ -103,7 +106,11 @@ public SchedulerNG createInstance(
103106
jobGraph.getJobID());
104107

105108
final SlotSharingSlotAllocator slotAllocator =
106-
createSlotSharingSlotAllocator(declarativeSlotPool);
109+
createSlotSharingSlotAllocator(
110+
declarativeSlotPool,
111+
jobMasterConfiguration.get(DeploymentOptions.TARGET),
112+
jobMasterConfiguration.get(
113+
JobManagerOptions.SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED));
107114

108115
final ExecutionGraphFactory executionGraphFactory =
109116
new DefaultExecutionGraphFactory(
@@ -145,10 +152,14 @@ public JobManagerOptions.SchedulerType getSchedulerType() {
145152
}
146153

147154
public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(
148-
DeclarativeSlotPool declarativeSlotPool) {
155+
DeclarativeSlotPool declarativeSlotPool,
156+
@Nullable String executionTarget,
157+
boolean minimalTaskManagerPreferred) {
149158
return SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
150159
declarativeSlotPool::reserveFreeSlot,
151160
declarativeSlotPool::freeReservedSlot,
152-
declarativeSlotPool::containsFreeSlot);
161+
declarativeSlotPool::containsFreeSlot,
162+
executionTarget,
163+
minimalTaskManagerPreferred);
153164
}
154165
}
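flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java

Lines changed: 84 additions & 0 deletions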
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.scheduler.adaptive.allocator;
20+
21+
import org.apache.flink.runtime.instance.SlotSharingGroupId;
22+
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
23+
import org.apache.flink.runtime.jobmaster.SlotInfo;
24+
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
25+
26+
import java.util.Collection;
27+
import java.util.HashMap;
28+
import java.util.HashSet;
29+
import java.util.List;
30+
import java.util.Map;
31+
import java.util.Set;
32+
import java.util.stream.Collectors;
33+
34+
import static org.apache.flink.util.Preconditions.checkState;
35+
36+
/** Static helpers for slot allocation: minimum-slot checks and creation of execution slot sharing groups. */
37+
class AllocatorUtil {
38+
39+
private AllocatorUtil() {}
40+
41+
static Map<SlotSharingGroupId, SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
42+
getSlotSharingGroupMetaInfos(JobInformation jobInformation) {
43+
return SlotSharingSlotAllocator.SlotSharingGroupMetaInfo.from(jobInformation.getVertices());
44+
}
45+
46+
static int getMinimumRequiredSlots(
47+
Map<SlotSharingGroupId, SlotSharingSlotAllocator.SlotSharingGroupMetaInfo>
48+
slotSharingGroupMetaInfos) {
49+
return slotSharingGroupMetaInfos.values().stream()
50+
.map(SlotSharingSlotAllocator.SlotSharingGroupMetaInfo::getMaxLowerBound)
51+
.reduce(0, Integer::sum);
52+
}
53+
54+
static void checkMinimumRequiredSlots(
55+
JobInformation jobInformation, Collection<? extends SlotInfo> freeSlots) {
56+
final int minimumRequiredSlots =
57+
getMinimumRequiredSlots(getSlotSharingGroupMetaInfos(jobInformation));
58+
checkState(
59+
freeSlots.size() >= minimumRequiredSlots,
60+
"Not enough slots to allocate all the execution slot sharing groups (have: %s, need: %s)",
61+
freeSlots.size(),
62+
minimumRequiredSlots);
63+
}
64+
65+
static List<SlotSharingSlotAllocator.ExecutionSlotSharingGroup>
66+
createExecutionSlotSharingGroups(
67+
VertexParallelism vertexParallelism, SlotSharingGroup slotSharingGroup) {
68+
final Map<Integer, Set<ExecutionVertexID>> sharedSlotToVertexAssignment = new HashMap<>();
69+
slotSharingGroup
70+
.getJobVertexIds()
71+
.forEach(
72+
jobVertexId -> {
73+
int parallelism = vertexParallelism.getParallelism(jobVertexId);
74+
for (int subtaskIdx = 0; subtaskIdx < parallelism; subtaskIdx++) {
75+
sharedSlotToVertexAssignment
76+
.computeIfAbsent(subtaskIdx, ignored -> new HashSet<>())
77+
.add(new ExecutionVertexID(jobVertexId, subtaskIdx));
78+
}
79+
});
80+
return sharedSlotToVertexAssignment.values().stream()
81+
.map(SlotSharingSlotAllocator.ExecutionSlotSharingGroup::new)
82+
.collect(Collectors.toList());
83+
}
84+
}

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java

Lines changed: 95 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,60 +18,132 @@
1818

1919
package org.apache.flink.runtime.scheduler.adaptive.allocator;
2020

21+
import org.apache.flink.annotation.VisibleForTesting;
2122
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
2223
import org.apache.flink.runtime.jobmaster.SlotInfo;
2324
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
2425
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
25-
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
26+
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
27+
28+
import javax.annotation.Nullable;
2629

2730
import java.util.ArrayList;
2831
import java.util.Collection;
29-
import java.util.HashMap;
30-
import java.util.HashSet;
32+
import java.util.Comparator;
3133
import java.util.Iterator;
3234
import java.util.List;
3335
import java.util.Map;
3436
import java.util.Set;
3537
import java.util.stream.Collectors;
3638

37-
/** Simple {@link SlotAssigner} that treats all slots and slot sharing groups equally. */
39+
import static java.util.function.Function.identity;
40+
import static org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.checkMinimumRequiredSlots;
41+
import static org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.createExecutionSlotSharingGroups;
42+
43+
/**
44+
* Simple {@link SlotAssigner} that treats all slots and slot sharing groups equally. Specifically,
45+
* when the cluster is deployed in application mode and the {@link
46+
* org.apache.flink.configuration.JobManagerOptions#SCHEDULER_PREFER_MINIMAL_TASKMANAGERS_ENABLED}
47+
* is enabled, execution slot sharing groups are preferentially assigned to the minimal number of
48+
* task managers.
49+
*/
3850
public class DefaultSlotAssigner implements SlotAssigner {
3951

52+
@VisibleForTesting static final String APPLICATION_MODE_EXECUTION_TARGET = "embedded";
53+
54+
private final @Nullable String executionTarget;
55+
private final boolean minimalTaskManagerPreferred;
56+
57+
DefaultSlotAssigner(@Nullable String executionTarget, boolean minimalTaskManagerPreferred) {
58+
this.executionTarget = executionTarget;
59+
this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
60+
}
61+
4062
@Override
4163
public Collection<SlotAssignment> assignSlots(
4264
JobInformation jobInformation,
4365
Collection<? extends SlotInfo> freeSlots,
4466
VertexParallelism vertexParallelism,
4567
JobAllocationsInformation previousAllocations) {
46-
List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
68+
checkMinimumRequiredSlots(jobInformation, freeSlots);
69+
70+
final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
4771
for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
4872
allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
4973
}
5074

51-
Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
75+
final Collection<? extends SlotInfo> pickedSlots =
76+
pickSlotsIfNeeded(allGroups.size(), freeSlots);
77+
78+
Iterator<? extends SlotInfo> iterator = pickedSlots.iterator();
5279
Collection<SlotAssignment> assignments = new ArrayList<>();
5380
for (ExecutionSlotSharingGroup group : allGroups) {
5481
assignments.add(new SlotAssignment(iterator.next(), group));
5582
}
5683
return assignments;
5784
}
5885

59-
static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
60-
VertexParallelism vertexParallelism, SlotSharingGroup slotSharingGroup) {
61-
final Map<Integer, Set<ExecutionVertexID>> sharedSlotToVertexAssignment = new HashMap<>();
62-
slotSharingGroup
63-
.getJobVertexIds()
64-
.forEach(
65-
jobVertexId -> {
66-
int parallelism = vertexParallelism.getParallelism(jobVertexId);
67-
for (int subtaskIdx = 0; subtaskIdx < parallelism; subtaskIdx++) {
68-
sharedSlotToVertexAssignment
69-
.computeIfAbsent(subtaskIdx, ignored -> new HashSet<>())
70-
.add(new ExecutionVertexID(jobVertexId, subtaskIdx));
71-
}
72-
});
73-
return sharedSlotToVertexAssignment.values().stream()
74-
.map(ExecutionSlotSharingGroup::new)
75-
.collect(Collectors.toList());
86+
@VisibleForTesting
87+
Collection<? extends SlotInfo> pickSlotsIfNeeded(
88+
int requestExecutionSlotSharingGroups, Collection<? extends SlotInfo> freeSlots) {
89+
Collection<? extends SlotInfo> pickedSlots = freeSlots;
90+
if (APPLICATION_MODE_EXECUTION_TARGET.equalsIgnoreCase(executionTarget)
91+
&& minimalTaskManagerPreferred
92+
// Skip the grouping and sorting work unless there are more free slots than requested groups.
93+
&& freeSlots.size() > requestExecutionSlotSharingGroups) {
94+
final Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsPerTaskExecutor =
95+
getSlotsPerTaskExecutor(freeSlots);
96+
pickedSlots =
97+
pickSlotsInMinimalTaskExecutors(
98+
slotsPerTaskExecutor, requestExecutionSlotSharingGroups);
99+
}
100+
return pickedSlots;
101+
}
102+
103+
/**
104+
* In order to minimize the number of task executors used at the resource manager side in the
105+
* application-mode and release more task executors in a timely manner, it is a good choice to
106+
* prioritize selecting slots on task executors with the most available slots.
107+
*
108+
* @param slotsPerTaskExecutor The slots per task manager.
109+
* @return An iterator over the task managers, ordered by their number of free slots descending.
110+
*/
111+
private Iterator<TaskManagerLocation> getSortedTaskExecutors(
112+
Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsPerTaskExecutor) {
113+
final Comparator<TaskManagerLocation> taskExecutorComparator =
114+
(leftTml, rightTml) ->
115+
Integer.compare(
116+
slotsPerTaskExecutor.get(rightTml).size(),
117+
slotsPerTaskExecutor.get(leftTml).size());
118+
return slotsPerTaskExecutor.keySet().stream().sorted(taskExecutorComparator).iterator();
119+
}
120+
121+
/**
122+
* Pick the target slots to assign to the requested groups.
123+
*
124+
* @param slotsByTaskExecutor slots per task executor.
125+
* @param requestedGroups the number of requested execution slot sharing groups.
126+
* @return the target slots that are distributed on the minimal task executors.
127+
*/
128+
private Collection<? extends SlotInfo> pickSlotsInMinimalTaskExecutors(
129+
Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsByTaskExecutor,
130+
int requestedGroups) {
131+
final List<SlotInfo> pickedSlots = new ArrayList<>();
132+
final Iterator<TaskManagerLocation> sortedTaskExecutors =
133+
getSortedTaskExecutors(slotsByTaskExecutor);
134+
while (pickedSlots.size() < requestedGroups) {
135+
Set<? extends SlotInfo> slotInfos = slotsByTaskExecutor.get(sortedTaskExecutors.next());
136+
pickedSlots.addAll(slotInfos);
137+
}
138+
return pickedSlots;
139+
}
140+
141+
private Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> getSlotsPerTaskExecutor(
142+
Collection<? extends SlotInfo> slots) {
143+
return slots.stream()
144+
.collect(
145+
Collectors.groupingBy(
146+
SlotInfo::getTaskManagerLocation,
147+
Collectors.mapping(identity(), Collectors.toSet())));
76148
}
77149
}

0 commit comments

Comments
 (0)