Skip to content

Commit b1965ee

Browse files
committed
[FLINK-10883] Failing batch jobs with NoResourceAvailableException when slot request times out
Instead of failing the ExecutionGraph with a generic TimeoutException if a slot request times out, this commit changes the exception to a more meaningful NoResourceAvailableException.
1 parent 0de4ef3 commit b1965ee

File tree

2 files changed

+50
-20
lines changed

2 files changed

+50
-20
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
4242
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
4343
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
44+
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
4445
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
4546
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
4647
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -427,7 +428,18 @@ public CompletableFuture<Void> scheduleForExecution(
427428
deploymentFuture.whenComplete(
428429
(Void ignored, Throwable failure) -> {
429430
if (failure != null) {
430-
markFailed(ExceptionUtils.stripCompletionException(failure));
431+
final Throwable stripCompletionException = ExceptionUtils.stripCompletionException(failure);
432+
final Throwable schedulingFailureCause;
433+
434+
if (stripCompletionException instanceof TimeoutException) {
435+
schedulingFailureCause = new NoResourceAvailableException(
436+
"Could not allocate enough slots within timeout of " + allocationTimeout + " to run the job. " +
437+
"Please make sure that the cluster has enough resources.");
438+
} else {
439+
schedulingFailureCause = stripCompletionException;
440+
}
441+
442+
markFailed(schedulingFailureCause);
431443
}
432444
});
433445

flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java

+37-19
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,42 @@ public void runJobWithMultipleRpcServices() throws Exception {
101101
}
102102

103103
@Test
104-
public void testHandleJobsWhenNotEnoughSlot() throws Exception {
104+
public void testHandleStreamingJobsWhenNotEnoughSlot() throws Exception {
105+
try {
106+
setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode.EAGER);
107+
fail("Job should fail.");
108+
} catch (JobExecutionException e) {
109+
assertTrue(findThrowableWithMessage(e, "Job execution failed.").isPresent());
110+
assertTrue(findThrowable(e, NoResourceAvailableException.class).isPresent());
111+
assertTrue(findThrowableWithMessage(e, "Slots required: 2, slots allocated: 1").isPresent());
112+
}
113+
}
114+
115+
@Test
116+
public void testHandleBatchJobsWhenNotEnoughSlot() throws Exception {
117+
try {
118+
setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode.LAZY_FROM_SOURCES);
119+
fail("Job should fail.");
120+
} catch (JobExecutionException e) {
121+
assertTrue(findThrowableWithMessage(e, "Job execution failed.").isPresent());
122+
assertTrue(findThrowable(e, NoResourceAvailableException.class).isPresent());
123+
assertTrue(findThrowableWithMessage(e, "Could not allocate enough slots").isPresent());
124+
}
125+
}
126+
127+
private void setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode scheduleMode) throws Exception {
128+
final JobVertex vertex = new JobVertex("Test Vertex");
129+
vertex.setParallelism(2);
130+
vertex.setMaxParallelism(2);
131+
vertex.setInvokableClass(BlockingNoOpInvokable.class);
132+
133+
final JobGraph jobGraph = new JobGraph("Test Job", vertex);
134+
jobGraph.setScheduleMode(scheduleMode);
135+
136+
runHandleJobsWhenNotEnoughSlots(jobGraph);
137+
}
138+
139+
private void runHandleJobsWhenNotEnoughSlots(final JobGraph jobGraph) throws Exception {
105140
final Configuration configuration = getDefaultConfiguration();
106141
configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 100L);
107142

@@ -114,24 +149,7 @@ public void testHandleJobsWhenNotEnoughSlot() throws Exception {
114149
try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
115150
miniCluster.start();
116151

117-
final JobVertex vertex = new JobVertex("Test Vertex");
118-
vertex.setParallelism(2);
119-
vertex.setMaxParallelism(2);
120-
vertex.setInvokableClass(BlockingNoOpInvokable.class);
121-
122-
final JobGraph jobGraph = new JobGraph("Test Job", vertex);
123-
jobGraph.setScheduleMode(ScheduleMode.EAGER);
124-
125-
try {
126-
miniCluster.executeJobBlocking(jobGraph);
127-
128-
fail("Job should fail.");
129-
} catch (JobExecutionException e) {
130-
assertTrue(findThrowableWithMessage(e, "Job execution failed.").isPresent());
131-
132-
assertTrue(findThrowable(e, NoResourceAvailableException.class).isPresent());
133-
assertTrue(findThrowableWithMessage(e, "Slots required: 2, slots allocated: 1").isPresent());
134-
}
152+
miniCluster.executeJobBlocking(jobGraph);
135153
}
136154
}
137155

0 commit comments

Comments
 (0)