Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
Expand All @@ -27,65 +28,119 @@
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.function.ThrowingRunnable;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests the finish behaviour of the {@link ExecutionGraph}. */
class ExecutionGraphFinishTest {

/**
* Dedicated single-threaded main-thread executor; using forMainThread() here would run the
* deployment callbacks on the I/O thread and race the test (FLINK-38536).
*/
@RegisterExtension
static final TestExecutorExtension<ScheduledExecutorService> JM_MAIN_THREAD_EXECUTOR_RESOURCE =
TestingUtils.jmMainThreadExecutorExtension();

@RegisterExtension
static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
TestingUtils.defaultExecutorExtension();

@Test
void testJobFinishes() throws Exception {

JobGraph jobGraph =
JobGraphTestUtils.streamingJobGraph(
ExecutionGraphTestUtils.createJobVertex("Task1", 2, NoOpInvokable.class),
ExecutionGraphTestUtils.createJobVertex("Task2", 2, NoOpInvokable.class));

final ComponentMainThreadExecutor mainThreadExecutor =
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
JM_MAIN_THREAD_EXECUTOR_RESOURCE.getExecutor());

SchedulerBase scheduler =
new DefaultSchedulerBuilder(
jobGraph,
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
EXECUTOR_RESOURCE.getExecutor())
jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
.build();

ExecutionGraph eg = scheduler.getExecutionGraph();

scheduler.startScheduling();
ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);

Iterator<ExecutionJobVertex> jobVertices = eg.getVerticesTopologically().iterator();

ExecutionJobVertex sender = jobVertices.next();
ExecutionJobVertex receiver = jobVertices.next();

List<ExecutionVertex> senderVertices = Arrays.asList(sender.getTaskVertices());
List<ExecutionVertex> receiverVertices = Arrays.asList(receiver.getTaskVertices());
runInMainThread(mainThreadExecutor, scheduler::startScheduling);
runInMainThread(
mainThreadExecutor, () -> ExecutionGraphTestUtils.switchAllVerticesToRunning(eg));

final ExecutionJobVertex[] orderedVertices =
supplyInMainThread(
mainThreadExecutor,
() -> {
Iterator<ExecutionJobVertex> jobVertices =
eg.getVerticesTopologically().iterator();
return new ExecutionJobVertex[] {
jobVertices.next(), jobVertices.next()
};
});
ExecutionJobVertex sender = orderedVertices[0];
ExecutionJobVertex receiver = orderedVertices[1];

List<ExecutionVertex> senderVertices =
supplyInMainThread(
mainThreadExecutor, () -> Arrays.asList(sender.getTaskVertices()));
List<ExecutionVertex> receiverVertices =
supplyInMainThread(
mainThreadExecutor, () -> Arrays.asList(receiver.getTaskVertices()));

// test getNumExecutionVertexFinished
senderVertices.get(0).getCurrentExecutionAttempt().markFinished();
assertThat(sender.getNumExecutionVertexFinished()).isOne();
assertThat(eg.getState()).isEqualTo(JobStatus.RUNNING);

senderVertices.get(1).getCurrentExecutionAttempt().markFinished();
assertThat(sender.getNumExecutionVertexFinished()).isEqualTo(2);
assertThat(eg.getState()).isEqualTo(JobStatus.RUNNING);
runInMainThread(
mainThreadExecutor,
() -> senderVertices.get(0).getCurrentExecutionAttempt().markFinished());
assertThat(supplyInMainThread(mainThreadExecutor, sender::getNumExecutionVertexFinished))
.isOne();
assertThat(supplyInMainThread(mainThreadExecutor, eg::getState))
.isEqualTo(JobStatus.RUNNING);

runInMainThread(
mainThreadExecutor,
() -> senderVertices.get(1).getCurrentExecutionAttempt().markFinished());
assertThat(supplyInMainThread(mainThreadExecutor, sender::getNumExecutionVertexFinished))
.isEqualTo(2);
assertThat(supplyInMainThread(mainThreadExecutor, eg::getState))
.isEqualTo(JobStatus.RUNNING);

// test job finishes
receiverVertices.get(0).getCurrentExecutionAttempt().markFinished();
receiverVertices.get(1).getCurrentExecutionAttempt().markFinished();
assertThat(eg.getNumFinishedVertices()).isEqualTo(4);
assertThat(eg.getState()).isEqualTo(JobStatus.FINISHED);
runInMainThread(
mainThreadExecutor,
() -> {
receiverVertices.get(0).getCurrentExecutionAttempt().markFinished();
receiverVertices.get(1).getCurrentExecutionAttempt().markFinished();
});
assertThat(eg.waitUntilTerminal()).isEqualTo(JobStatus.FINISHED);
assertThat(supplyInMainThread(mainThreadExecutor, eg::getNumFinishedVertices)).isEqualTo(4);
assertThat(supplyInMainThread(mainThreadExecutor, eg::getState))
.isEqualTo(JobStatus.FINISHED);
}

/**
 * Submits {@code action} to the given main-thread executor and waits for it to complete.
 *
 * @param mainThreadExecutor executor the action is handed to
 * @param action work to execute; checked exceptions are wrapped via {@link
 *     ThrowingRunnable#unchecked}
 * @throws Exception if waiting for the submitted action fails, e.g. because the action threw
 *     or the calling thread was interrupted
 */
private static void runInMainThread(
        final ComponentMainThreadExecutor mainThreadExecutor,
        final ThrowingRunnable<? extends Throwable> action)
        throws Exception {
    final Runnable uncheckedAction = ThrowingRunnable.unchecked(action);
    final CompletableFuture<Void> completion =
            CompletableFuture.runAsync(uncheckedAction, mainThreadExecutor);
    // Block the caller so that subsequent test steps observe the action's effects.
    completion.get();
}

/**
 * Evaluates {@code supplier} on the given main-thread executor and returns its result once it
 * is available.
 *
 * @param mainThreadExecutor executor the supplier is handed to
 * @param supplier computation producing the value to return
 * @param <T> type of the supplied value
 * @return the value produced by {@code supplier}
 * @throws Exception if waiting for the result fails, e.g. because the supplier threw or the
 *     calling thread was interrupted
 */
private static <T> T supplyInMainThread(
        final ComponentMainThreadExecutor mainThreadExecutor, final Supplier<T> supplier)
        throws Exception {
    final CompletableFuture<T> pendingValue =
            CompletableFuture.supplyAsync(supplier, mainThreadExecutor);
    // Block the caller until the executor has produced the value.
    return pendingValue.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
Expand All @@ -27,13 +28,14 @@
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.function.ThrowingRunnable;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.createScheduler;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -47,7 +49,14 @@
* only when the execution graph reaches the successful final state.
*/
class FinalizeOnMasterTest {
private static final Logger LOG = LoggerFactory.getLogger(FinalizeOnMasterTest.class);

/**
* Dedicated single-threaded main-thread executor; using forMainThread() here would run the
* deployment callbacks on the I/O thread and race the test (FLINK-38536).
*/
@RegisterExtension
static final TestExecutorExtension<ScheduledExecutorService> JM_MAIN_THREAD_EXECUTOR_RESOURCE =
TestingUtils.jmMainThreadExecutorExtension();

@RegisterExtension
static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
Expand All @@ -63,31 +72,35 @@ void testFinalizeIsCalledUponSuccess() throws Exception {
vertex2.setInvokableClass(NoOpInvokable.class);
vertex2.setParallelism(2);

final ComponentMainThreadExecutor mainThreadExecutor =
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
JM_MAIN_THREAD_EXECUTOR_RESOURCE.getExecutor());

final SchedulerBase scheduler =
createScheduler(
JobGraphTestUtils.streamingJobGraph(vertex1, vertex2),
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
mainThreadExecutor,
EXECUTOR_RESOURCE.getExecutor());
scheduler.startScheduling();

final ExecutionGraph eg = scheduler.getExecutionGraph();
if (!eg.getState().equals(JobStatus.RUNNING)) {
ErrorInfo ei = eg.getFailureInfo();
LOG.info("Unexpected state found: Exception as string " + ei.getExceptionAsString());
LOG.info("Unexpected state found: ErrorInfo as string " + ei);
}
assertThat(eg.getState()).isEqualTo(JobStatus.RUNNING);

ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
runInMainThread(mainThreadExecutor, scheduler::startScheduling);
assertThat(supplyInMainThread(mainThreadExecutor, eg::getState))
.isEqualTo(JobStatus.RUNNING);

runInMainThread(
mainThreadExecutor, () -> ExecutionGraphTestUtils.switchAllVerticesToRunning(eg));

// move all vertices to finished state
ExecutionGraphTestUtils.finishAllVertices(eg);
runInMainThread(mainThreadExecutor, () -> ExecutionGraphTestUtils.finishAllVertices(eg));
assertThat(eg.waitUntilTerminal()).isEqualTo(JobStatus.FINISHED);

// waitUntilTerminal acts as the barrier, so the assertions below are safe off the main
// thread.
verify(vertex1, times(1)).finalizeOnMaster(any(FinalizeOnMasterContext.class));
verify(vertex2, times(1)).finalizeOnMaster(any(FinalizeOnMasterContext.class));

assertThat(eg.getRegisteredExecutions()).isEmpty();
assertThat(supplyInMainThread(mainThreadExecutor, eg::getRegisteredExecutions)).isEmpty();
}

@Test
Expand All @@ -96,28 +109,55 @@ void testFinalizeIsNotCalledUponFailure() throws Exception {
vertex.setInvokableClass(NoOpInvokable.class);
vertex.setParallelism(1);

final ComponentMainThreadExecutor mainThreadExecutor =
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
JM_MAIN_THREAD_EXECUTOR_RESOURCE.getExecutor());

final SchedulerBase scheduler =
createScheduler(
JobGraphTestUtils.streamingJobGraph(vertex),
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
mainThreadExecutor,
EXECUTOR_RESOURCE.getExecutor());
scheduler.startScheduling();

final ExecutionGraph eg = scheduler.getExecutionGraph();

assertThat(eg.getState()).isEqualTo(JobStatus.RUNNING);
runInMainThread(mainThreadExecutor, scheduler::startScheduling);
assertThat(supplyInMainThread(mainThreadExecutor, eg::getState))
.isEqualTo(JobStatus.RUNNING);

ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
runInMainThread(
mainThreadExecutor, () -> ExecutionGraphTestUtils.switchAllVerticesToRunning(eg));

// fail the execution
final Execution exec =
eg.getJobVertex(vertex.getID()).getTaskVertices()[0].getCurrentExecutionAttempt();
exec.fail(new Exception("test"));
runInMainThread(
mainThreadExecutor,
() -> {
final Execution exec =
eg.getJobVertex(vertex.getID())
.getTaskVertices()[0]
.getCurrentExecutionAttempt();
exec.fail(new Exception("test"));
});

assertThat(eg.waitUntilTerminal()).isEqualTo(JobStatus.FAILED);

verify(vertex, times(0)).finalizeOnMaster(any(FinalizeOnMasterContext.class));

assertThat(eg.getRegisteredExecutions()).isEmpty();
assertThat(supplyInMainThread(mainThreadExecutor, eg::getRegisteredExecutions)).isEmpty();
}

/**
 * Hands {@code action} to the given main-thread executor and blocks the caller until the action
 * has finished.
 *
 * @param mainThreadExecutor executor that runs the action
 * @param action work to execute; checked exceptions are wrapped via {@link
 *     ThrowingRunnable#unchecked}
 * @throws Exception if waiting for the submitted action fails, e.g. because the action threw
 *     or the calling thread was interrupted
 */
private static void runInMainThread(
        final ComponentMainThreadExecutor mainThreadExecutor,
        final ThrowingRunnable<? extends Throwable> action)
        throws Exception {
    final Runnable wrappedAction = ThrowingRunnable.unchecked(action);
    final CompletableFuture<Void> done =
            CompletableFuture.runAsync(wrappedAction, mainThreadExecutor);
    // Wait here so the test only proceeds after the action has run.
    done.get();
}

/**
 * Computes a value by running {@code supplier} on the given main-thread executor and blocks the
 * caller until that value is available.
 *
 * @param mainThreadExecutor executor that runs the supplier
 * @param supplier computation producing the value to return
 * @param <T> type of the supplied value
 * @return the value produced by {@code supplier}
 * @throws Exception if waiting for the result fails, e.g. because the supplier threw or the
 *     calling thread was interrupted
 */
private static <T> T supplyInMainThread(
        final ComponentMainThreadExecutor mainThreadExecutor, final Supplier<T> supplier)
        throws Exception {
    final CompletableFuture<T> futureValue =
            CompletableFuture.supplyAsync(supplier, mainThreadExecutor);
    // Wait here until the executor has evaluated the supplier.
    return futureValue.get();
}
}