applicationIds =
+ BlobUtils.listExistingApplications(storageDir.deref().toPath());
+
+ final long expiryTimeout = System.currentTimeMillis() + cleanupInterval;
+ for (ApplicationID applicationId : applicationIds) {
+ registerApplicationWithExpiry(applicationId, expiryTimeout);
+ }
+ }
+ }
+
+ private void registerApplicationWithExpiry(ApplicationID applicationId, long expiryTimeout) {
+ checkNotNull(applicationId);
+ synchronized (jobRefCounters) {
+ final RefCount refCount =
+ applicationRefCounters.computeIfAbsent(
+ applicationId, ignored -> new RefCount());
+
+ refCount.keepUntil = expiryTimeout;
+ }
+ }
+
/**
* Registers use of job-related BLOBs.
*
* Using any other method to access BLOBs, e.g. {@link #getFile}, is only valid within calls
- * to registerJob(JobID) and {@link #releaseJob(JobID)}.
+ * to registerJob(JobID) and {@link #releaseJob(JobID, ApplicationID)}.
*
* @param jobId ID of the job this blob belongs to
- * @see #releaseJob(JobID)
+ * @see #releaseJob(JobID, ApplicationID)
*/
@Override
- public void registerJob(JobID jobId) {
+ public void registerJob(JobID jobId, ApplicationID applicationId) {
checkNotNull(jobId);
synchronized (jobRefCounters) {
@@ -208,6 +235,17 @@ public void registerJob(JobID jobId) {
ref.keepUntil = -1;
}
++ref.references;
+
+ // also register under application because the job may use application-level blobs
+ RefCount applicationRef = applicationRefCounters.get(applicationId);
+ if (applicationRef == null) {
+ applicationRef = new RefCount();
+ applicationRefCounters.put(applicationId, applicationRef);
+ } else {
+ // reset cleanup timeout
+ applicationRef.keepUntil = -1;
+ }
+ ++applicationRef.references;
}
}
@@ -215,19 +253,21 @@ public void registerJob(JobID jobId) {
* Unregisters use of job-related BLOBs and allow them to be released.
*
* @param jobId ID of the job this blob belongs to
- * @see #registerJob(JobID)
+ * @see #registerJob(JobID, ApplicationID)
*/
@Override
- public void releaseJob(JobID jobId) {
+ public void releaseJob(JobID jobId, ApplicationID applicationId) {
checkNotNull(jobId);
synchronized (jobRefCounters) {
+ String warning =
+ "improper use of releaseJob() without a matching number of registerJob() calls for jobId "
+ + jobId;
+
RefCount ref = jobRefCounters.get(jobId);
if (ref == null || ref.references == 0) {
- log.warn(
- "improper use of releaseJob() without a matching number of registerJob() calls for jobId "
- + jobId);
+ log.warn(warning);
return;
}
@@ -235,6 +275,19 @@ public void releaseJob(JobID jobId) {
if (ref.references == 0) {
ref.keepUntil = System.currentTimeMillis() + cleanupInterval;
}
+
+ // make sure application related data can be cleaned up
+ RefCount applicationRef = applicationRefCounters.get(applicationId);
+
+ if (applicationRef == null || applicationRef.references == 0) {
+ log.warn(warning);
+ return;
+ }
+
+ --applicationRef.references;
+ if (applicationRef.references == 0) {
+ applicationRef.keepUntil = System.currentTimeMillis() + cleanupInterval;
+ }
}
}
@@ -439,6 +492,21 @@ public File getStorageLocation(JobID jobId, BlobKey key) throws IOException {
return BlobUtils.getStorageLocation(storageDir.deref(), jobId, key);
}
+ /**
+ * Returns a file handle to the file associated with the given blob key on the blob server.
+ *
+ * @param applicationId ID of the application this blob belongs to (or null if
+ * job-unrelated)
+ * @param key identifying the file
+ * @return file handle to the file
+ * @throws IOException if creating the directory fails
+ */
+ @VisibleForTesting
+ public File getStorageLocation(ApplicationID applicationId, BlobKey key) throws IOException {
+ checkNotNull(applicationId);
+ return BlobUtils.getStorageLocation(storageDir.deref(), applicationId, key);
+ }
+
/**
* Returns the job reference counters - for testing purposes only!
*
@@ -505,6 +573,56 @@ public void run() {
}
}
}
+
+ // also clean up application blobs
+ runApplicationsCleanup();
+ }
+ }
+ }
+
+ private void runApplicationsCleanup() {
+ Iterator<Map.Entry<ApplicationID, RefCount>> entryIter =
+ applicationRefCounters.entrySet().iterator();
+ final long currentTimeMillis = System.currentTimeMillis();
+
+ while (entryIter.hasNext()) {
+ Map.Entry<ApplicationID, RefCount> entry = entryIter.next();
+ RefCount ref = entry.getValue();
+
+ if (ref.references <= 0 && ref.keepUntil > 0 && currentTimeMillis >= ref.keepUntil) {
+ ApplicationID applicationId = entry.getKey();
+
+ final File localFile =
+ new File(
+ BlobUtils.getStorageLocationPath(
+ storageDir.deref().getAbsolutePath(), applicationId));
+
+ /*
+ * NOTE: normally it is not required to acquire the write lock to delete the job's
+ * storage directory since there should be no one accessing it with the ref
+ * counter being 0 - acquire it just in case, to always be on the safe side
+ */
+ readWriteLock.writeLock().lock();
+
+ boolean success = false;
+ try {
+ FileUtils.deleteDirectory(localFile);
+ success = true;
+ } catch (Throwable t) {
+ log.warn(
+ "Failed to locally delete application directory "
+ + localFile.getAbsolutePath(),
+ t);
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+
+ // let's only remove this directory from cleanup if the cleanup was
+ // successful
+ // (does not need the write lock)
+ if (success) {
+ entryIter.remove();
+ }
}
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index d0c0f2242456a..355b264262e38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -51,7 +51,10 @@
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.dispatcher.cleanup.ApplicationResourceCleaner;
+import org.apache.flink.runtime.dispatcher.cleanup.ApplicationResourceCleanerFactory;
import org.apache.flink.runtime.dispatcher.cleanup.CleanupRunnerFactory;
+import org.apache.flink.runtime.dispatcher.cleanup.DispatcherApplicationResourceCleanerFactory;
import org.apache.flink.runtime.dispatcher.cleanup.DispatcherResourceCleanerFactory;
import org.apache.flink.runtime.dispatcher.cleanup.ResourceCleaner;
import org.apache.flink.runtime.dispatcher.cleanup.ResourceCleanerFactory;
@@ -62,6 +65,9 @@
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.ApplicationResult;
+import org.apache.flink.runtime.highavailability.ApplicationResultEntry;
+import org.apache.flink.runtime.highavailability.ApplicationResultStore;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.JobResultEntry;
import org.apache.flink.runtime.highavailability.JobResultStore;
@@ -70,6 +76,8 @@
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.ApplicationStoreEntry;
+import org.apache.flink.runtime.jobmanager.ApplicationWriter;
import org.apache.flink.runtime.jobmanager.ExecutionPlanWriter;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult;
@@ -136,7 +144,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -173,6 +180,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint
private final ExecutionPlanWriter executionPlanWriter;
private final JobResultStore jobResultStore;
+ private final ApplicationWriter applicationWriter;
+ private final ApplicationResultStore applicationResultStore;
private final HighAvailabilityServices highAvailabilityServices;
private final GatewayRetriever resourceManagerGatewayRetriever;
@@ -185,6 +194,14 @@ public abstract class Dispatcher extends FencedRpcEndpoint
private final OnMainThreadJobManagerRunnerRegistry jobManagerRunnerRegistry;
+ /**
+ * Map of applications that were suspended in a previous execution attempt, which must be
+ * recovered in the current execution attempt.
+ */
+ private final Map<ApplicationID, AbstractApplication> suspendedApplications = new HashMap<>();
+
+ private final Collection<ApplicationResult> recoveredDirtyApplicationResults;
+
/**
* Map of jobs that were suspended in a previous application execution attempt, which may be
* recovered or deprecated in the current execution attempt.
@@ -227,6 +244,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint
private final ResourceCleaner localResourceCleaner;
private final ResourceCleaner globalResourceCleaner;
+ private final ApplicationResourceCleaner applicationResourceCleaner;
+
private final Duration webTimeout;
private final Map jobClientExpiredTimestamp = new HashMap<>();
@@ -243,7 +262,13 @@ public abstract class Dispatcher extends FencedRpcEndpoint
private final Map<ApplicationID, AbstractApplication> applications = new HashMap<>();
/** ExecutionGraphInfo for the terminated job whose application is not terminated yet. */
- private final Map<JobID, CompletableFuture<ExecutionGraphInfo>> partialExecutionGraphInfoStore =
+ private final Map<JobID, ExecutionGraphInfo> partialExecutionGraphInfoStore = new HashMap<>();
+
+ /** Futures to control the termination workflow for applications and jobs. */
+ private final Map<JobID, CompletableFuture<Void>> jobCreateDirtyResultFutures = new HashMap<>();
+
+ private final Map<JobID, CompletableFuture<Void>> jobMarkResultCleanFutures = new HashMap<>();
+ private final Map<ApplicationID, CompletableFuture<Void>> applicationCreateDirtyResultFutures =
new HashMap<>();
private final Map<ApplicationID, CompletableFuture<Void>> applicationTerminationFutures =
@@ -262,6 +287,8 @@ public Dispatcher(
DispatcherId fencingToken,
Collection recoveredJobs,
Collection recoveredDirtyJobs,
+ Collection<AbstractApplication> recoveredApplications,
+ Collection<ApplicationResult> recoveredDirtyApplicationResults,
DispatcherBootstrapFactory dispatcherBootstrapFactory,
DispatcherServices dispatcherServices)
throws Exception {
@@ -270,6 +297,8 @@ public Dispatcher(
fencingToken,
recoveredJobs,
recoveredDirtyJobs,
+ recoveredApplications,
+ recoveredDirtyApplicationResults,
dispatcherBootstrapFactory,
dispatcherServices,
new DefaultJobManagerRunnerRegistry(INITIAL_JOB_MANAGER_RUNNER_REGISTRY_CAPACITY));
@@ -280,6 +309,8 @@ private Dispatcher(
DispatcherId fencingToken,
Collection recoveredJobs,
Collection recoveredDirtyJobs,
+ Collection<AbstractApplication> recoveredApplications,
+ Collection<ApplicationResult> recoveredDirtyApplicationResults,
DispatcherBootstrapFactory dispatcherBootstrapFactory,
DispatcherServices dispatcherServices,
JobManagerRunnerRegistry jobManagerRunnerRegistry)
@@ -289,10 +320,13 @@ private Dispatcher(
fencingToken,
recoveredJobs,
recoveredDirtyJobs,
+ recoveredApplications,
+ recoveredDirtyApplicationResults,
dispatcherBootstrapFactory,
dispatcherServices,
jobManagerRunnerRegistry,
- new DispatcherResourceCleanerFactory(jobManagerRunnerRegistry, dispatcherServices));
+ new DispatcherResourceCleanerFactory(jobManagerRunnerRegistry, dispatcherServices),
+ new DispatcherApplicationResourceCleanerFactory(dispatcherServices));
}
@VisibleForTesting
@@ -301,13 +335,18 @@ protected Dispatcher(
DispatcherId fencingToken,
Collection recoveredJobs,
Collection recoveredDirtyJobs,
+ Collection<AbstractApplication> recoveredApplications,
+ Collection<ApplicationResult> recoveredDirtyApplicationResults,
DispatcherBootstrapFactory dispatcherBootstrapFactory,
DispatcherServices dispatcherServices,
JobManagerRunnerRegistry jobManagerRunnerRegistry,
- ResourceCleanerFactory resourceCleanerFactory)
+ ResourceCleanerFactory resourceCleanerFactory,
+ ApplicationResourceCleanerFactory applicationResourceCleanerFactory)
throws Exception {
super(rpcService, RpcServiceUtils.createRandomName(DISPATCHER_NAME), fencingToken);
assertRecoveredJobsAndDirtyJobResults(recoveredJobs, recoveredDirtyJobs);
+ assertRecoveredApplicationsAndDirtyApplicationResults(
+ recoveredApplications, recoveredDirtyApplicationResults);
this.configuration = dispatcherServices.getConfiguration();
this.highAvailabilityServices = dispatcherServices.getHighAvailabilityServices();
@@ -319,6 +358,8 @@ protected Dispatcher(
this.failureEnrichers = dispatcherServices.getFailureEnrichers();
this.executionPlanWriter = dispatcherServices.getExecutionPlanWriter();
this.jobResultStore = dispatcherServices.getJobResultStore();
+ this.applicationWriter = dispatcherServices.getApplicationWriter();
+ this.applicationResultStore = dispatcherServices.getApplicationResultStore();
this.jobManagerMetricGroup = dispatcherServices.getJobManagerMetricGroup();
this.metricServiceQueryAddress = dispatcherServices.getMetricQueryServiceAddress();
this.ioExecutor = dispatcherServices.getIoExecutor();
@@ -347,6 +388,12 @@ protected Dispatcher(
this.dispatcherBootstrapFactory = checkNotNull(dispatcherBootstrapFactory);
+ this.recoveredDirtyApplicationResults = new ArrayList<>(recoveredDirtyApplicationResults);
+
+ for (AbstractApplication application : recoveredApplications) {
+ this.suspendedApplications.put(application.getApplicationId(), application);
+ }
+
for (ExecutionPlan executionPlan : recoveredJobs) {
final JobID jobId = executionPlan.getJobID();
final ApplicationID applicationId =
@@ -379,6 +426,12 @@ protected Dispatcher(
recoveredJobs.stream().map(ExecutionPlan::getJobID).collect(Collectors.toSet()),
dispatcherServices.getIoExecutor());
+ this.blobServer.retainApplications(
+ recoveredApplications.stream()
+ .map(AbstractApplication::getApplicationId)
+ .collect(Collectors.toSet()),
+ dispatcherServices.getIoExecutor());
+
this.dispatcherCachedOperationsHandler =
new DispatcherCachedOperationsHandler(
dispatcherServices.getOperationCaches(),
@@ -390,6 +443,9 @@ protected Dispatcher(
resourceCleanerFactory.createLocalResourceCleaner(this.getMainThreadExecutor());
this.globalResourceCleaner =
resourceCleanerFactory.createGlobalResourceCleaner(this.getMainThreadExecutor());
+ this.applicationResourceCleaner =
+ applicationResourceCleanerFactory.createApplicationResourceCleaner(
+ this.getMainThreadExecutor());
this.webTimeout = configuration.get(WebOptions.TIMEOUT);
@@ -428,24 +484,53 @@ public void onStart() throws Exception {
this::onFatalError);
if (dispatcherBootstrap instanceof ApplicationBootstrap) {
- // defer starting recovered jobs, as they might be skipped based on user logic
- internalSubmitApplication(((ApplicationBootstrap) dispatcherBootstrap).getApplication())
- .get();
+ // Application Mode
+ checkState(suspendedApplications.isEmpty());
+ checkState(recoveredDirtyApplicationResults.size() <= 1);
+
+ AbstractApplication application =
+ ((ApplicationBootstrap) dispatcherBootstrap).getApplication();
+ if (!recoveredDirtyApplicationResults.isEmpty()) {
+ // the application is already terminated
+ ApplicationResult applicationResult =
+ recoveredDirtyApplicationResults.iterator().next();
+ checkState(application.getApplicationId().equals(applicationResult.getApplicationId()));
+
+ startApplicationCleanup();
+ } else {
+ // defer starting recovered jobs, as they might be skipped based on user logic
+ internalSubmitApplication(application).get();
+ }
} else {
- // Jobs cannot be associated with applications because applications do not yet exist
- // during recovery. This is a temporary workaround and will be removed once application
- // recovery is supported.
- recoveredDirtyJobResultsByApplicationId.values().stream()
- .flatMap(Collection::stream)
- .forEach(jobResult -> runJobWithCleanupRunner(jobResult, false));
-
- // start recovered jobs by wrapping them into a SingleJobApplication
- for (ExecutionPlan recoveredJob : suspendedJobs.values()) {
- runRecoveredJob(recoveredJob, true);
+ // Session Mode
+ startApplicationCleanup();
+
+ // start suspended applications
+ for (AbstractApplication suspendedApplication : suspendedApplications.values()) {
+ // defer starting recovered jobs, as they might be skipped based on user logic
+ internalSubmitApplication(suspendedApplication).get();
+ }
+
+ // start suspended jobs that do not belong to any application (previously submitted in a
+ // SingleJobApplication) by wrapping them into a SingleJobApplication
+ Iterator<Map.Entry<JobID, ExecutionPlan>> jobIterator =
+ suspendedJobs.entrySet().iterator();
+ while (jobIterator.hasNext()) {
+ Map.Entry<JobID, ExecutionPlan> entry = jobIterator.next();
+ ExecutionPlan recoveredJob = entry.getValue();
+ ApplicationID applicationId = recoveredJob.getApplicationId().orElse(null);
+
+ if (!suspendedApplications.containsKey(applicationId)) {
+ runRecoveredJob(recoveredJob, true);
+ jobIterator.remove();
+ suspendedJobIdsByApplicationId.remove(applicationId);
+ }
}
- suspendedJobs.clear();
- suspendedJobIdsByApplicationId.clear();
}
+
+ checkState(recoveredDirtyJobResultsByApplicationId.isEmpty());
+ recoveredDirtyApplicationResults.clear();
+ suspendedApplications.clear();
}
private void startDispatcherServices() throws Exception {
@@ -480,6 +565,26 @@ private static void assertRecoveredJobsAndDirtyJobResults(
"There should be no overlap between the recovered ExecutionPlans and the passed dirty JobResults based on their job ID.");
}
+ private static void assertRecoveredApplicationsAndDirtyApplicationResults(
+ Collection<AbstractApplication> recoveredApplications,
+ Collection<ApplicationResult> recoveredDirtyApplicationResults) {
+ final Set<ApplicationID> applicationIdsOfFinishedApplications =
+ recoveredDirtyApplicationResults.stream()
+ .map(ApplicationResult::getApplicationId)
+ .collect(Collectors.toSet());
+
+ final boolean noRecoveredApplicationHasDirtyApplicationResult =
+ recoveredApplications.stream()
+ .noneMatch(
+ recoveredApplication ->
+ applicationIdsOfFinishedApplications.contains(
+ recoveredApplication.getApplicationId()));
+
+ Preconditions.checkArgument(
+ noRecoveredApplicationHasDirtyApplicationResult,
+ "There should be no overlap between the recovered Applications and the passed dirty ApplicationResults based on their application ID.");
+ }
+
/**
* Runs a recovered job in HA mode.
*
@@ -565,6 +670,51 @@ private void runJobWithCleanupRunner(
}
}
+ private void startApplicationCleanup() {
+ for (ApplicationResult applicationResult : recoveredDirtyApplicationResults) {
+ ApplicationID applicationId = applicationResult.getApplicationId();
+ ApplicationState applicationState = applicationResult.getApplicationState();
+
+ Map<JobID, ExecutionGraphInfo> jobs = new HashMap<>();
+ Collection<JobResult> dirtyJobResults =
+ recoveredDirtyJobResultsByApplicationId.remove(applicationId);
+ if (dirtyJobResults != null) {
+ for (JobResult jobResult : dirtyJobResults) {
+ JobID jobId = jobResult.getJobId();
+ ExecutionGraphInfo executionGraphInfo =
+ new ExecutionGraphInfo(
+ ArchivedExecutionGraph.createFrom(jobResult, -1));
+ jobs.put(jobId, executionGraphInfo);
+
+ runJobWithCleanupRunner(jobResult, false);
+ }
+ }
+
+ long[] stateTimestamps = new long[ApplicationState.values().length];
+ stateTimestamps[ApplicationState.CREATED.ordinal()] = applicationResult.getStartTime();
+ stateTimestamps[applicationState.ordinal()] = applicationResult.getEndTime();
+
+ ArchivedApplication sparseArchivedApplication =
+ new ArchivedApplication(
+ applicationId,
+ applicationResult.getApplicationName(),
+ applicationState,
+ stateTimestamps,
+ jobs,
+ Collections.emptyList());
+
+ writeToArchivedApplicationStore(sparseArchivedApplication);
+
+ // the dirty result already exists
+ // create a completed future to make sure the jobs can be marked clean
+ applicationCreateDirtyResultFutures.put(
+ applicationId, FutureUtils.completedVoidFuture());
+ applicationTerminationFutures.put(applicationId, new CompletableFuture<>());
+
+ removeApplication(applicationId, jobs.keySet());
+ }
+ }
+
private void handleStartDispatcherServicesException(Exception e) throws Exception {
try {
stopDispatcherServices();
@@ -726,6 +876,21 @@ public CompletableFuture submitApplication(
new DuplicateApplicationSubmissionException(applicationId));
}
+ Optional optionalApplicationStoreEntry =
+ application.getApplicationStoreEntry();
+ if (optionalApplicationStoreEntry.isPresent()) {
+ try {
+ applicationWriter.putApplication(optionalApplicationStoreEntry.get());
+ } catch (Exception e) {
+ String msg =
+ String.format(
+ "Could not persist application %s to the ApplicationStore.",
+ applicationId);
+ log.warn(msg);
+ throw new CompletionException(new RuntimeException(msg, e));
+ }
+ }
+
return internalSubmitApplication(application);
}
@@ -737,6 +902,7 @@ private CompletableFuture internalSubmitApplication(
applications.put(applicationId, application);
application.registerStatusListener(this);
+ applicationCreateDirtyResultFutures.put(applicationId, new CompletableFuture<>());
applicationTerminationFutures.put(applicationId, new CompletableFuture<>());
// cleanup dirty job results by creating a JobManagerRunner during application submission to
@@ -768,94 +934,24 @@ public void notifyApplicationStatusChange(
"The application (" + applicationId + ") has already terminated.");
AbstractApplication application = applications.get(applicationId);
- Set remainingSuspendedJobIds =
- suspendedJobIdsByApplicationId.remove(applicationId);
- if (remainingSuspendedJobIds != null) {
- for (JobID jobId : remainingSuspendedJobIds) {
- final ExecutionPlan executionPlan = suspendedJobs.remove(jobId);
- checkNotNull(executionPlan);
-
- final JobResult jobResult =
- new JobResult.Builder()
- .jobId(jobId)
- .jobStatus(JobStatus.FAILED)
- .netRuntime(0)
- .serializedThrowable(
- new SerializedThrowable(
- new FlinkException(
- "Job recovery is not needed.")))
- .jobName(executionPlan.getName())
- .applicationId(applicationId)
- .build();
- runJobWithCleanupRunner(jobResult);
- }
- }
- log.info(
- "Archiving application ({}) with terminal state {}.", applicationId, newStatus);
- long[] stateTimestamps = new long[ApplicationState.values().length];
- for (ApplicationState applicationState : ApplicationState.values()) {
- final int ordinal = applicationState.ordinal();
- stateTimestamps[ordinal] = application.getStatusTimestamp(applicationState);
- }
+ // Step 1: cleanup remaining suspended jobs for the application
+ cleanupRemainingSuspendedJobs(applicationId);
- List> jobFutures =
- application.getJobs().stream()
- .map(
- jobId -> {
- checkState(
- partialExecutionGraphInfoStore.containsKey(jobId),
- "ExecutionGraphInfo for job %s does not exist.",
- jobId);
- return partialExecutionGraphInfoStore.get(jobId);
- })
- .collect(Collectors.toList());
-
- // wait for all jobs to be stored, then archive the application
- CompletableFuture> applicationArchivingFuture =
- FutureUtils.combineAll(jobFutures)
+ // Step 2: finalize the archived application after all jobs are marked as terminated
+ CompletableFuture<ArchivedApplication> applicationArchivingFuture =
+ FutureUtils.waitForAll(
+ application.getJobs().stream()
+ .map(jobCreateDirtyResultFutures::get)
+ .collect(Collectors.toList()))
.thenComposeAsync(
- combinedJobs -> {
- Map jobs = new HashMap<>();
- for (ExecutionGraphInfo executionGraphInfo : combinedJobs) {
- jobs.put(
- executionGraphInfo.getJobId(),
- executionGraphInfo);
- partialExecutionGraphInfoStore.remove(
- executionGraphInfo.getJobId());
- }
-
- // record job exception for SingleJobApplication
- if (application instanceof SingleJobApplication) {
- jobs.values()
- .forEach(
- executionGraphInfo -> {
- ErrorInfo errorInfo =
- executionGraphInfo
- .getArchivedExecutionGraph()
- .getFailureInfo();
- if (errorInfo != null) {
- application
- .addExceptionHistoryEntry(
- errorInfo
- .getException(),
- executionGraphInfo
- .getJobId());
- }
- });
- }
-
- ArchivedApplication archivedApplication =
- new ArchivedApplication(
- application.getApplicationId(),
- application.getName(),
- application.getApplicationStatus(),
- stateTimestamps,
- jobs,
- application.getExceptionHistory());
-
+ ignored -> {
+ final ArchivedApplication archivedApplication =
+ finalizeArchivedApplication(application, newStatus);
applications.remove(applicationId);
+
writeToArchivedApplicationStore(archivedApplication);
+
return historyServerArchivist
.archiveApplication(archivedApplication)
.exceptionally(
@@ -865,16 +961,95 @@ public void notifyApplicationStatusChange(
applicationId,
throwable);
return null;
- });
+ })
+ .thenApply(ack -> archivedApplication);
},
getMainThreadExecutor());
- applicationArchivingFuture.thenRunAsync(
- () -> applicationTerminationFutures.get(applicationId).complete(null),
- getMainThreadExecutor());
+ // Step 3: create a dirty result for the application
+ CompletableFuture<Void> applicationDirtyResultFuture =
+ applicationArchivingFuture.thenCompose(
+ this::registerGloballyTerminatedApplicationInApplicationResultStore);
+
+ // Step 4: perform application cleanup and mark result clean after all cleanup
+ // (including job cleanup) is done
+ applicationDirtyResultFuture.thenCompose(
+ ignored -> removeApplication(applicationId, application.getJobs()));
}
}
+ private void cleanupRemainingSuspendedJobs(ApplicationID applicationId) {
+ Set<JobID> remainingSuspendedJobIds = suspendedJobIdsByApplicationId.remove(applicationId);
+ if (remainingSuspendedJobIds != null) {
+ for (JobID jobId : remainingSuspendedJobIds) {
+ final ExecutionPlan executionPlan = suspendedJobs.remove(jobId);
+ checkNotNull(executionPlan);
+
+ final JobResult jobResult =
+ new JobResult.Builder()
+ .jobId(jobId)
+ .jobStatus(JobStatus.FAILED)
+ .netRuntime(0)
+ .serializedThrowable(
+ new SerializedThrowable(
+ new FlinkException("Job recovery is not needed.")))
+ .jobName(executionPlan.getName())
+ .applicationId(applicationId)
+ .build();
+ runJobWithCleanupRunner(jobResult);
+ }
+ }
+ }
+
+ private ArchivedApplication finalizeArchivedApplication(
+ AbstractApplication application, ApplicationState status) {
+ final ApplicationID applicationId = application.getApplicationId();
+
+ log.info("Archiving application ({}) with terminal state {}.", applicationId, status);
+
+ long[] stateTimestamps = new long[ApplicationState.values().length];
+ for (ApplicationState applicationState : ApplicationState.values()) {
+ final int ordinal = applicationState.ordinal();
+ stateTimestamps[ordinal] = application.getStatusTimestamp(applicationState);
+ }
+
+ final Map<JobID, ExecutionGraphInfo> jobs = new HashMap<>(application.getJobs().size());
+
+ for (JobID jobId : application.getJobs()) {
+ ExecutionGraphInfo executionGraphInfo =
+ checkNotNull(
+ partialExecutionGraphInfoStore.remove(jobId),
+ "ExecutionGraphInfo for job %s does not exist.",
+ jobId);
+ jobs.put(jobId, executionGraphInfo);
+ }
+
+ // record job exception for SingleJobApplication
+ if (application instanceof SingleJobApplication) {
+ jobs.values()
+ .forEach(
+ executionGraphInfo -> {
+ ErrorInfo errorInfo =
+ executionGraphInfo
+ .getArchivedExecutionGraph()
+ .getFailureInfo();
+ if (errorInfo != null) {
+ application.addExceptionHistoryEntry(
+ errorInfo.getException(),
+ executionGraphInfo.getJobId());
+ }
+ });
+ }
+
+ return new ArchivedApplication(
+ applicationId,
+ application.getName(),
+ status,
+ stateTimestamps,
+ jobs,
+ application.getExceptionHistory());
+ }
+
private void writeToArchivedApplicationStore(ArchivedApplication archivedApplication) {
try {
archivedApplicationStore.put(archivedApplication);
@@ -887,6 +1062,83 @@ private void writeToArchivedApplicationStore(ArchivedApplication archivedApplica
}
}
+ private CompletableFuture<Void> registerGloballyTerminatedApplicationInApplicationResultStore(
+ ArchivedApplication application) {
+ final ApplicationID applicationId = application.getApplicationId();
+
+ return applicationResultStore
+ .hasCleanApplicationResultEntryAsync(applicationId)
+ .thenCompose(
+ hasCleanResult -> {
+ if (hasCleanResult) {
+ log.warn(
+ "Application {} is already marked as clean but clean up was triggered again.",
+ applicationId);
+ return FutureUtils.completedVoidFuture();
+ }
+
+ return applicationResultStore
+ .hasDirtyApplicationResultEntryAsync(applicationId)
+ .thenCompose(
+ hasDirtyResult -> {
+ if (hasDirtyResult) {
+ return FutureUtils.completedVoidFuture();
+ }
+
+ return applicationResultStore
+ .createDirtyResultAsync(
+ new ApplicationResultEntry(
+ ApplicationResult
+ .createFrom(
+ application)));
+ });
+ })
+ .handleAsync(
+ (ignored, error) -> {
+ if (error != null) {
+ fatalErrorHandler.onFatalError(
+ new FlinkException(
+ String.format(
+ "The application %s couldn't be marked as pre-cleanup finished in ApplicationResultStore.",
+ applicationId),
+ error));
+ }
+ applicationCreateDirtyResultFutures.get(applicationId).complete(null);
+ return null;
+ },
+ getMainThreadExecutor());
+ }
+
+ private CompletableFuture<Void> removeApplication(
+ ApplicationID applicationId, Collection<JobID> jobs) {
+ return applicationResourceCleaner
+ .cleanupAsync(applicationId)
+ .thenCombine(
+ FutureUtils.waitForAll(
+ jobs.stream()
+ .map(jobMarkResultCleanFutures::get)
+ .collect(Collectors.toList())),
+ (unused1, unused2) ->
+ applicationResultStore.markResultAsCleanAsync(applicationId))
+ .handle(
+ (ignored, t) -> {
+ if (t == null) {
+ log.debug(
+ "Cleanup for the application {} has finished. Application has been marked as clean.",
+ applicationId);
+ } else {
+ log.warn(
+ "Could not properly mark application {} result as clean.",
+ applicationId,
+ t);
+ }
+ return null;
+ })
+ .thenRunAsync(
+ () -> applicationTerminationFutures.get(applicationId).complete(null),
+ getMainThreadExecutor());
+ }
+
@VisibleForTesting
Map getApplications() {
return applications;
@@ -955,7 +1207,6 @@ private void associateJobWithApplication(JobID jobId, ApplicationID applicationI
checkState(applications.containsKey(applicationId));
applications.get(applicationId).addJob(jobId);
- jobIdsToApplicationIds.put(jobId, applicationId);
}
private CompletableFuture handleTermination(
@@ -1051,7 +1302,9 @@ private void runJob(
final JobID jobId = jobManagerRunner.getJobID();
- partialExecutionGraphInfoStore.put(jobId, new CompletableFuture<>());
+ jobCreateDirtyResultFutures.put(jobId, new CompletableFuture<>());
+ jobMarkResultCleanFutures.put(jobId, new CompletableFuture<>());
+ jobIdsToApplicationIds.put(jobId, applicationId);
if (associateJobWithApplication) {
associateJobWithApplication(jobId, applicationId);
@@ -1314,22 +1567,7 @@ private Collection getCompletedJobDetails() {
}
private Stream getPartialExecutionGraphInfo() {
- return partialExecutionGraphInfoStore.values().stream()
- .filter(CompletableFuture::isDone)
- .map(
- executionGraphInfoFuture -> {
- try {
- ExecutionGraphInfo executionGraphInfo =
- executionGraphInfoFuture.getNow(null);
- if (executionGraphInfo != null) {
- return executionGraphInfo;
- }
- } catch (Exception e) {
- log.error("ExecutionGraphInfo future completed with exception", e);
- }
- return null;
- })
- .filter(Objects::nonNull);
+ return partialExecutionGraphInfoStore.values().stream();
}
@Override
@@ -1453,17 +1691,9 @@ private ExecutionGraphInfo getExecutionGraphInfoFromStore(Throwable t, JobID job
}
private Optional getExecutionGraphInfoFromStore(JobID jobId) {
- final CompletableFuture executionGraphInfoFuture =
- partialExecutionGraphInfoStore.get(jobId);
- if (executionGraphInfoFuture != null && executionGraphInfoFuture.isDone()) {
- try {
- ExecutionGraphInfo executionGraphInfo = executionGraphInfoFuture.getNow(null);
- if (executionGraphInfo != null) {
- return Optional.of(executionGraphInfo);
- }
- } catch (Exception e) {
- log.error("Error while retrieving ExecutionGraphInfo for job {}", jobId, e);
- }
+ final ExecutionGraphInfo executionGraphInfo = partialExecutionGraphInfoStore.get(jobId);
+ if (executionGraphInfo != null) {
+ return Optional.of(executionGraphInfo);
}
// check whether the job belongs to a completed application
@@ -1850,19 +2080,14 @@ void registerJobManagerRunnerTerminationFuture(
private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState cleanupJobState) {
if (cleanupJobState.isGlobalCleanup()) {
final ApplicationID applicationId = jobIdsToApplicationIds.get(jobId);
- final CompletableFuture> applicationTerminationFuture;
- if (applicationTerminationFutures.containsKey(applicationId)) {
- applicationTerminationFuture = applicationTerminationFutures.get(applicationId);
- } else {
- // This can occur during cleanup for recovered dirty job results in session mode
- applicationTerminationFuture = CompletableFuture.completedFuture(null);
- }
+ final CompletableFuture> applicationCreateDirtyResultFuture =
+ applicationCreateDirtyResultFutures.get(applicationId);
- // wait for the application termination before marking the job result as clean
+ // wait for the application dirty result creation before marking the job result as clean
return globalResourceCleaner
.cleanupAsync(jobId)
.thenCombine(
- applicationTerminationFuture,
+ applicationCreateDirtyResultFuture,
(unused1, unused2) -> jobResultStore.markResultAsCleanAsync(jobId))
.handle(
(unusedVoid, e) -> {
@@ -1870,11 +2095,13 @@ private CompletableFuture removeJob(JobID jobId, CleanupJobState cleanupJo
log.debug(
"Cleanup for the job '{}' has finished. Job has been marked as clean.",
jobId);
+ jobMarkResultCleanFutures.get(jobId).complete(null);
} else {
log.warn(
"Could not properly mark job {} result as clean.",
jobId,
e);
+ jobMarkResultCleanFutures.get(jobId).completeExceptionally(e);
}
return null;
})
@@ -1927,6 +2154,7 @@ private void terminateRunningApplications() {
for (ApplicationID applicationId : applicationsToRemove) {
applications.get(applicationId).dispose();
+ applicationCreateDirtyResultFutures.get(applicationId).cancel(false);
applicationTerminationFutures.get(applicationId).cancel(false);
}
}
@@ -2013,6 +2241,7 @@ private CompletableFuture registerGloballyTerminatedJobInJobRes
executionGraphInfo.getJobId()),
error));
}
+ jobCreateDirtyResultFutures.get(jobId).complete(null);
return CleanupJobState.globalCleanup(terminalJobStatus);
},
getMainThreadExecutor(jobId));
@@ -2070,19 +2299,7 @@ private CompletableFuture createDirtyJobResultEntryAsync(
* @param executionGraphInfo the execution graph information to be stored temporarily
*/
private void writeToExecutionGraphInfoStore(ExecutionGraphInfo executionGraphInfo) {
- final JobID jobId = executionGraphInfo.getJobId();
- if (!partialExecutionGraphInfoStore.containsKey(jobId)) {
- // This can only occur in tests for job termination without submitting the job
- final CompletableFuture<ExecutionGraphInfo> executionGraphInfoFuture =
- new CompletableFuture<>();
- executionGraphInfoFuture.complete(executionGraphInfo);
- partialExecutionGraphInfoStore.put(jobId, executionGraphInfoFuture);
- return;
- }
-
- partialExecutionGraphInfoStore
- .get(executionGraphInfo.getJobId())
- .complete(executionGraphInfo);
+ partialExecutionGraphInfoStore.put(executionGraphInfo.getJobId(), executionGraphInfo);
}
private CompletableFuture archiveExecutionGraphToHistoryServer(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
index b7cc00e2a77d6..33a38deb58082 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.dispatcher;
+import org.apache.flink.runtime.application.AbstractApplication;
+import org.apache.flink.runtime.highavailability.ApplicationResult;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
@@ -33,8 +35,10 @@ Dispatcher createDispatcher(
DispatcherId fencingToken,
Collection<ExecutionPlan> recoveredJobs,
Collection<JobResult> recoveredDirtyJobResults,
+ Collection<AbstractApplication> recoveredApplications,
+ Collection<ApplicationResult> recoveredDirtyApplicationResults,
DispatcherBootstrapFactory dispatcherBootstrapFactory,
- PartialDispatcherServicesWithJobPersistenceComponents
- partialDispatcherServicesWithJobPersistenceComponents)
+ PartialDispatcherServicesWithPersistenceComponents
+ partialDispatcherServicesWithPersistenceComponents)
throws Exception;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
index a4faf1300fe16..629a0c8d2fc54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
@@ -23,8 +23,10 @@
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.dispatcher.cleanup.CleanupRunnerFactory;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.ApplicationResultStore;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.JobResultStore;
+import org.apache.flink.runtime.jobmanager.ApplicationWriter;
import org.apache.flink.runtime.jobmanager.ExecutionPlanWriter;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -66,6 +68,10 @@ public class DispatcherServices {
private final JobResultStore jobResultStore;
+ private final ApplicationWriter applicationWriter;
+
+ private final ApplicationResultStore applicationResultStore;
+
private final JobManagerRunnerFactory jobManagerRunnerFactory;
private final CleanupRunnerFactory cleanupRunnerFactory;
@@ -88,6 +94,8 @@ public class DispatcherServices {
JobManagerMetricGroup jobManagerMetricGroup,
ExecutionPlanWriter planWriter,
JobResultStore jobResultStore,
+ ApplicationWriter applicationWriter,
+ ApplicationResultStore applicationResultStore,
JobManagerRunnerFactory jobManagerRunnerFactory,
CleanupRunnerFactory cleanupRunnerFactory,
Executor ioExecutor,
@@ -111,6 +119,9 @@ public class DispatcherServices {
Preconditions.checkNotNull(jobManagerMetricGroup, "JobManagerMetricGroup");
this.executionPlanWriter = Preconditions.checkNotNull(planWriter, "ExecutionPlanWriter");
this.jobResultStore = Preconditions.checkNotNull(jobResultStore, "JobResultStore");
+ this.applicationWriter = Preconditions.checkNotNull(applicationWriter, "ApplicationWriter");
+ this.applicationResultStore =
+ Preconditions.checkNotNull(applicationResultStore, "ApplicationResultStore");
this.jobManagerRunnerFactory =
Preconditions.checkNotNull(jobManagerRunnerFactory, "JobManagerRunnerFactory");
this.cleanupRunnerFactory =
@@ -172,6 +183,14 @@ public JobResultStore getJobResultStore() {
return jobResultStore;
}
+ public ApplicationWriter getApplicationWriter() {
+ return applicationWriter;
+ }
+
+ public ApplicationResultStore getApplicationResultStore() {
+ return applicationResultStore;
+ }
+
JobManagerRunnerFactory getJobManagerRunnerFactory() {
return jobManagerRunnerFactory;
}
@@ -189,31 +208,32 @@ public Collection getFailureEnrichers() {
}
public static DispatcherServices from(
- PartialDispatcherServicesWithJobPersistenceComponents
- partialDispatcherServicesWithJobPersistenceComponents,
+ PartialDispatcherServicesWithPersistenceComponents
+ partialDispatcherServicesWithPersistenceComponents,
JobManagerRunnerFactory jobManagerRunnerFactory,
CleanupRunnerFactory cleanupRunnerFactory) {
return new DispatcherServices(
- partialDispatcherServicesWithJobPersistenceComponents.getConfiguration(),
- partialDispatcherServicesWithJobPersistenceComponents.getHighAvailabilityServices(),
- partialDispatcherServicesWithJobPersistenceComponents
+ partialDispatcherServicesWithPersistenceComponents.getConfiguration(),
+ partialDispatcherServicesWithPersistenceComponents.getHighAvailabilityServices(),
+ partialDispatcherServicesWithPersistenceComponents
.getResourceManagerGatewayRetriever(),
- partialDispatcherServicesWithJobPersistenceComponents.getBlobServer(),
- partialDispatcherServicesWithJobPersistenceComponents.getHeartbeatServices(),
- partialDispatcherServicesWithJobPersistenceComponents.getArchivedApplicationStore(),
- partialDispatcherServicesWithJobPersistenceComponents.getFatalErrorHandler(),
- partialDispatcherServicesWithJobPersistenceComponents.getHistoryServerArchivist(),
- partialDispatcherServicesWithJobPersistenceComponents
- .getMetricQueryServiceAddress(),
- partialDispatcherServicesWithJobPersistenceComponents.getOperationCaches(),
- partialDispatcherServicesWithJobPersistenceComponents
+ partialDispatcherServicesWithPersistenceComponents.getBlobServer(),
+ partialDispatcherServicesWithPersistenceComponents.getHeartbeatServices(),
+ partialDispatcherServicesWithPersistenceComponents.getArchivedApplicationStore(),
+ partialDispatcherServicesWithPersistenceComponents.getFatalErrorHandler(),
+ partialDispatcherServicesWithPersistenceComponents.getHistoryServerArchivist(),
+ partialDispatcherServicesWithPersistenceComponents.getMetricQueryServiceAddress(),
+ partialDispatcherServicesWithPersistenceComponents.getOperationCaches(),
+ partialDispatcherServicesWithPersistenceComponents
.getJobManagerMetricGroupFactory()
.create(),
- partialDispatcherServicesWithJobPersistenceComponents.getExecutionPlanWriter(),
- partialDispatcherServicesWithJobPersistenceComponents.getJobResultStore(),
+ partialDispatcherServicesWithPersistenceComponents.getExecutionPlanWriter(),
+ partialDispatcherServicesWithPersistenceComponents.getJobResultStore(),
+ partialDispatcherServicesWithPersistenceComponents.getApplicationWriter(),
+ partialDispatcherServicesWithPersistenceComponents.getApplicationResultStore(),
jobManagerRunnerFactory,
cleanupRunnerFactory,
- partialDispatcherServicesWithJobPersistenceComponents.getIoExecutor(),
- partialDispatcherServicesWithJobPersistenceComponents.getFailureEnrichers());
+ partialDispatcherServicesWithPersistenceComponents.getIoExecutor(),
+ partialDispatcherServicesWithPersistenceComponents.getFailureEnrichers());
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
index ed564eed5d8d6..4b6ba4a44f92a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
@@ -92,7 +92,9 @@ public JobManagerRunner createJobManagerRunner(
final LibraryCacheManager.ClassLoaderLease classLoaderLease =
jobManagerServices
.getLibraryCacheManager()
- .registerClassLoaderLease(executionPlan.getJobID());
+ .registerClassLoaderLease(
+ executionPlan.getJobID(),
+ executionPlan.getApplicationId().orElseThrow());
final ClassLoader userCodeClassLoader =
classLoaderLease
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobPersistenceComponents.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithPersistenceComponents.java
similarity index 77%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobPersistenceComponents.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithPersistenceComponents.java
index 4f701bbffd0cc..3dc05b0881884 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobPersistenceComponents.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithPersistenceComponents.java
@@ -22,8 +22,10 @@
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.ApplicationResultStore;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.JobResultStore;
+import org.apache.flink.runtime.jobmanager.ApplicationWriter;
import org.apache.flink.runtime.jobmanager.ExecutionPlanWriter;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -35,13 +37,14 @@
import java.util.concurrent.Executor;
/** {@link DispatcherFactory} services container. */
-public class PartialDispatcherServicesWithJobPersistenceComponents
- extends PartialDispatcherServices {
+public class PartialDispatcherServicesWithPersistenceComponents extends PartialDispatcherServices {
private final ExecutionPlanWriter executionPlanWriter;
private final JobResultStore jobResultStore;
+ private final ApplicationWriter applicationWriter;
+ private final ApplicationResultStore applicationResultStore;
- private PartialDispatcherServicesWithJobPersistenceComponents(
+ private PartialDispatcherServicesWithPersistenceComponents(
Configuration configuration,
HighAvailabilityServices highAvailabilityServices,
GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
@@ -56,7 +59,9 @@ private PartialDispatcherServicesWithJobPersistenceComponents(
DispatcherOperationCaches operationCaches,
Collection<FailureEnricher> failureEnrichers,
ExecutionPlanWriter executionPlanWriter,
- JobResultStore jobResultStore) {
+ JobResultStore jobResultStore,
+ ApplicationWriter applicationWriter,
+ ApplicationResultStore applicationResultStore) {
super(
configuration,
highAvailabilityServices,
@@ -73,6 +78,8 @@ private PartialDispatcherServicesWithJobPersistenceComponents(
failureEnrichers);
this.executionPlanWriter = executionPlanWriter;
this.jobResultStore = jobResultStore;
+ this.applicationWriter = applicationWriter;
+ this.applicationResultStore = applicationResultStore;
}
public ExecutionPlanWriter getExecutionPlanWriter() {
@@ -83,11 +90,21 @@ public JobResultStore getJobResultStore() {
return jobResultStore;
}
- public static PartialDispatcherServicesWithJobPersistenceComponents from(
+ public ApplicationWriter getApplicationWriter() {
+ return applicationWriter;
+ }
+
+ public ApplicationResultStore getApplicationResultStore() {
+ return applicationResultStore;
+ }
+
+ public static PartialDispatcherServicesWithPersistenceComponents from(
PartialDispatcherServices partialDispatcherServices,
ExecutionPlanWriter executionPlanWriter,
- JobResultStore jobResultStore) {
- return new PartialDispatcherServicesWithJobPersistenceComponents(
+ JobResultStore jobResultStore,
+ ApplicationWriter applicationWriter,
+ ApplicationResultStore applicationResultStore) {
+ return new PartialDispatcherServicesWithPersistenceComponents(
partialDispatcherServices.getConfiguration(),
partialDispatcherServices.getHighAvailabilityServices(),
partialDispatcherServices.getResourceManagerGatewayRetriever(),
@@ -102,6 +119,8 @@ public static PartialDispatcherServicesWithJobPersistenceComponents from(
partialDispatcherServices.getOperationCaches(),
partialDispatcherServices.getFailureEnrichers(),
executionPlanWriter,
- jobResultStore);
+ jobResultStore,
+ applicationWriter,
+ applicationResultStore);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
index 239349c60f609..a11b912aa6661 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
@@ -18,7 +18,9 @@
package org.apache.flink.runtime.dispatcher;
+import org.apache.flink.runtime.application.AbstractApplication;
import org.apache.flink.runtime.dispatcher.cleanup.CheckpointResourcesCleanupRunnerFactory;
+import org.apache.flink.runtime.highavailability.ApplicationResult;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
@@ -35,9 +37,11 @@ public StandaloneDispatcher createDispatcher(
DispatcherId fencingToken,
Collection<ExecutionPlan> recoveredJobs,
Collection<JobResult> recoveredDirtyJobResults,
+ Collection<AbstractApplication> recoveredApplications,
+ Collection<ApplicationResult> recoveredDirtyApplicationResults,
DispatcherBootstrapFactory dispatcherBootstrapFactory,
- PartialDispatcherServicesWithJobPersistenceComponents
- partialDispatcherServicesWithJobPersistenceComponents)
+ PartialDispatcherServicesWithPersistenceComponents
+ partialDispatcherServicesWithPersistenceComponents)
throws Exception {
// create the default dispatcher
return new StandaloneDispatcher(
@@ -45,9 +49,11 @@ public StandaloneDispatcher createDispatcher(
fencingToken,
recoveredJobs,
recoveredDirtyJobResults,
+ recoveredApplications,
+ recoveredDirtyApplicationResults,
dispatcherBootstrapFactory,
DispatcherServices.from(
- partialDispatcherServicesWithJobPersistenceComponents,
+ partialDispatcherServicesWithPersistenceComponents,
JobMasterServiceLeadershipRunnerFactory.INSTANCE,
CheckpointResourcesCleanupRunnerFactory.INSTANCE));
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 58b34f8bd972e..9abb133482c9f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.dispatcher;
+import org.apache.flink.runtime.application.AbstractApplication;
+import org.apache.flink.runtime.highavailability.ApplicationResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobResult;
@@ -37,6 +39,8 @@ public StandaloneDispatcher(
DispatcherId fencingToken,
Collection<ExecutionPlan> recoveredJobs,
Collection<JobResult> recoveredDirtyJobResults,
+ Collection<AbstractApplication> recoveredApplications,
+ Collection<ApplicationResult> recoveredDirtyApplicationResults,
DispatcherBootstrapFactory dispatcherBootstrapFactory,
DispatcherServices dispatcherServices)
throws Exception {
@@ -45,6 +49,8 @@ public StandaloneDispatcher(
fencingToken,
recoveredJobs,
recoveredDirtyJobResults,
+ recoveredApplications,
+ recoveredDirtyApplicationResults,
dispatcherBootstrapFactory,
dispatcherServices);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ApplicationResourceCleaner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ApplicationResourceCleaner.java
new file mode 100644
index 0000000000000..e2962e7ccb29c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ApplicationResourceCleaner.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.ApplicationID;
+
+import java.util.concurrent.CompletableFuture;
+
+/** {@code ApplicationResourceCleaner} executes instances on the given {@code ApplicationID}. */
+@FunctionalInterface
+public interface ApplicationResourceCleaner {
+
+ /**
+ * Cleans application-related data from resources asynchronously.
+ *
+ * @param applicationId The {@link ApplicationID} referring to the application for which the
+ * data shall be cleaned up.
+ * @return the cleanup result future.
+ */
+ CompletableFuture<Void> cleanupAsync(ApplicationID applicationId);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ApplicationResourceCleanerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ApplicationResourceCleanerFactory.java
new file mode 100644
index 0000000000000..1266a86bb490a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/ApplicationResourceCleanerFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@code ApplicationResourceCleanerFactory} provides methods to create {@link
+ * ApplicationResourceCleaner} for application cleanup.
+ *
+ * @see GloballyCleanableApplicationResource
+ */
+public interface ApplicationResourceCleanerFactory {
+
+ /**
+ * Creates {@link ApplicationResourceCleaner} that initiates {@link
+ * GloballyCleanableApplicationResource#globalCleanupAsync(ApplicationID, Executor)} calls.
+ *
+ * @param mainThreadExecutor Used for validating that the {@link
+ * GloballyCleanableApplicationResource#globalCleanupAsync(ApplicationID, Executor)} is
+ * called from the main thread.
+ */
+ ApplicationResourceCleaner createApplicationResourceCleaner(
+ ComponentMainThreadExecutor mainThreadExecutor);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
index 0c6b263ac14a7..b3128b4f654f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
@@ -218,17 +218,6 @@ private static JobStatus getJobStatus(JobResult jobResult) {
private static ExecutionGraphInfo generateExecutionGraphInfo(
JobResult jobResult, long initializationTimestamp) {
return new ExecutionGraphInfo(
- ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
- jobResult.getJobId(),
- jobResult.getJobName(),
- getJobStatus(jobResult),
- null,
- jobResult.getSerializedThrowable().orElse(null),
- null,
- jobResult.getStartTime() < 0
- ? initializationTimestamp
- : jobResult.getStartTime(),
- jobResult.getEndTime(),
- jobResult.getApplicationId().orElse(null)));
+ ArchivedExecutionGraph.createFrom(jobResult, initializationTimestamp));
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultApplicationResourceCleaner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultApplicationResourceCleaner.java
new file mode 100644
index 0000000000000..a19df4fd1ffa9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultApplicationResourceCleaner.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.RetryStrategy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/**
+ * {@code DefaultApplicationResourceCleaner} is the default implementation of {@link
+ * ApplicationResourceCleaner}. It will try to clean up any resource that was added. Failure will
+ * result in an individual retry of the cleanup. The overall cleanup result succeeds after all
+ * subtasks succeeded.
+ */
+public class DefaultApplicationResourceCleaner<T> implements ApplicationResourceCleaner {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DefaultApplicationResourceCleaner.class);
+
+ private final ComponentMainThreadExecutor mainThreadExecutor;
+ private final Executor cleanupExecutor;
+ private final CleanupFn<T> cleanupFn;
+
+ private final Collection<CleanupWithLabel<T>> regularCleanup;
+
+ private final RetryStrategy retryStrategy;
+
+ public static Builder<GloballyCleanableApplicationResource> forGloballyCleanableResources(
+ ComponentMainThreadExecutor mainThreadExecutor,
+ Executor cleanupExecutor,
+ RetryStrategy retryStrategy) {
+ return forCleanableResources(
+ mainThreadExecutor,
+ cleanupExecutor,
+ GloballyCleanableApplicationResource::globalCleanupAsync,
+ retryStrategy);
+ }
+
+ @VisibleForTesting
+ static <T> Builder<T> forCleanableResources(
+ ComponentMainThreadExecutor mainThreadExecutor,
+ Executor cleanupExecutor,
+ CleanupFn<T> cleanupFunction,
+ RetryStrategy retryStrategy) {
+ return new Builder<>(mainThreadExecutor, cleanupExecutor, cleanupFunction, retryStrategy);
+ }
+
+ @VisibleForTesting
+ @FunctionalInterface
+ interface CleanupFn<T> {
+ CompletableFuture<Void> cleanupAsync(
+ T resource, ApplicationID applicationId, Executor cleanupExecutor);
+ }
+
+ /**
+ * {@code Builder} for creating {@code DefaultApplicationResourceCleaner} instances.
+ *
+ * @param <T> The functional interface that's being translated into the internally used {@link
+ * CleanupFn}.
+ */
+ public static class Builder<T> {
+
+ private final ComponentMainThreadExecutor mainThreadExecutor;
+ private final Executor cleanupExecutor;
+ private final CleanupFn<T> cleanupFn;
+
+ private final RetryStrategy retryStrategy;
+
+ private final Collection<CleanupWithLabel<T>> prioritizedCleanup = new ArrayList<>();
+ private final Collection<CleanupWithLabel<T>> regularCleanup = new ArrayList<>();
+
+ private Builder(
+ ComponentMainThreadExecutor mainThreadExecutor,
+ Executor cleanupExecutor,
+ CleanupFn<T> cleanupFn,
+ RetryStrategy retryStrategy) {
+ this.mainThreadExecutor = mainThreadExecutor;
+ this.cleanupExecutor = cleanupExecutor;
+ this.cleanupFn = cleanupFn;
+ this.retryStrategy = retryStrategy;
+ }
+
+ /**
+ * Prioritized cleanups run before their regular counterparts. This method enables the
+ * caller to model dependencies between cleanup tasks. The order in which cleanable
+ * resources are added matters, i.e. if two cleanable resources are added as prioritized
+ * cleanup tasks, the resource being added first will block the cleanup of the second
+ * resource. All prioritized cleanup resources will run and finish before any resource that
+ * is added using {@link #withRegularCleanup(String, Object)} is started.
+ *
+ * @param label The label being used when logging errors in the given cleanup.
+ * @param prioritizedCleanup The cleanup callback that is going to be prioritized.
+ */
+ public Builder<T> withPrioritizedCleanup(String label, T prioritizedCleanup) {
+ this.prioritizedCleanup.add(new CleanupWithLabel<>(prioritizedCleanup, label));
+ return this;
+ }
+
+ /**
+ * Regular cleanups are resources for which the cleanup is triggered after all prioritized
+ * cleanups succeeded. All added regular cleanups will run concurrently to each other.
+ *
+ * @param label The label being used when logging errors in the given cleanup.
+ * @param regularCleanup The cleanup callback that is going to run after all prioritized
+ * cleanups are finished.
+ * @see #withPrioritizedCleanup(String, Object)
+ */
+ public Builder<T> withRegularCleanup(String label, T regularCleanup) {
+ this.regularCleanup.add(new CleanupWithLabel<>(regularCleanup, label));
+ return this;
+ }
+
+ public DefaultApplicationResourceCleaner<T> build() {
+ return new DefaultApplicationResourceCleaner<>(
+ mainThreadExecutor,
+ cleanupExecutor,
+ cleanupFn,
+ prioritizedCleanup,
+ regularCleanup,
+ retryStrategy);
+ }
+ }
+
+ private DefaultApplicationResourceCleaner(
+ ComponentMainThreadExecutor mainThreadExecutor,
+ Executor cleanupExecutor,
+ CleanupFn<T> cleanupFn,
+ Collection<CleanupWithLabel<T>> prioritizedCleanup,
+ Collection<CleanupWithLabel<T>> regularCleanup,
+ RetryStrategy retryStrategy) {
+ this.mainThreadExecutor = mainThreadExecutor;
+ this.cleanupExecutor = cleanupExecutor;
+ this.cleanupFn = cleanupFn;
+ this.regularCleanup = regularCleanup;
+ this.retryStrategy = retryStrategy;
+ }
+
+ @Override
+ public CompletableFuture<Void> cleanupAsync(ApplicationID applicationId) {
+ mainThreadExecutor.assertRunningInMainThread();
+
+ CompletableFuture<Void> cleanupFuture = FutureUtils.completedVoidFuture();
+
+ return cleanupFuture.thenCompose(
+ ignoredValue ->
+ FutureUtils.completeAll(
+ regularCleanup.stream()
+ .map(
+ cleanupWithLabel ->
+ withRetry(
+ applicationId,
+ cleanupWithLabel.getLabel(),
+ cleanupWithLabel.getCleanup()))
+ .collect(Collectors.toList())));
+ }
+
+ private CompletableFuture<Void> withRetry(
+ ApplicationID applicationId, String label, T cleanup) {
+ return FutureUtils.retryWithDelay(
+ () ->
+ cleanupFn
+ .cleanupAsync(cleanup, applicationId, cleanupExecutor)
+ .whenComplete(
+ (value, throwable) -> {
+ if (throwable != null) {
+ final String logMessage =
+ String.format(
+ "Cleanup of %s failed for application %s due to a %s: %s",
+ label,
+ applicationId,
+ throwable
+ .getClass()
+ .getSimpleName(),
+ throwable.getMessage());
+ if (LOG.isTraceEnabled()) {
+ LOG.warn(logMessage, throwable);
+ } else {
+ LOG.warn(logMessage);
+ }
+ }
+ }),
+ retryStrategy,
+ mainThreadExecutor);
+ }
+
+ /**
+ * {@code CleanupWithLabel} makes it possible to attach a label to a given cleanup that can be
+ * used as human-readable representation of the corresponding cleanup.
+ *
+ * @param <CLEANUP_TYPE> The type of cleanup.
+ */
+ private static class CleanupWithLabel<CLEANUP_TYPE> {
+
+ private final CLEANUP_TYPE cleanup;
+ private final String label;
+
+ public CleanupWithLabel(CLEANUP_TYPE cleanup, String label) {
+ this.cleanup = cleanup;
+ this.label = label;
+ }
+
+ public CLEANUP_TYPE getCleanup() {
+ return cleanup;
+ }
+
+ public String getLabel() {
+ return label;
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherApplicationResourceCleanerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherApplicationResourceCleanerFactory.java
new file mode 100644
index 0000000000000..570e1360cf8c4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherApplicationResourceCleanerFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.dispatcher.DispatcherServices;
+import org.apache.flink.runtime.jobmanager.ApplicationWriter;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.RetryStrategy;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@code DispatcherApplicationResourceCleanerFactory} instantiates {@link
+ * ApplicationResourceCleaner} instances that clean cleanable resources from the {@link
+ * org.apache.flink.runtime.dispatcher.Dispatcher}.
+ */
+public class DispatcherApplicationResourceCleanerFactory
+ implements ApplicationResourceCleanerFactory {
+
+ private static final String APPLICATION_STORE_LABEL = "ApplicationStore";
+ private static final String BLOB_SERVER_LABEL = "BlobServer";
+
+ private final Executor cleanupExecutor;
+ private final RetryStrategy retryStrategy;
+
+ private final ApplicationWriter applicationWriter;
+ private final BlobServer blobServer;
+
+ public DispatcherApplicationResourceCleanerFactory(DispatcherServices dispatcherServices) {
+ this(
+ dispatcherServices.getIoExecutor(),
+ CleanupRetryStrategyFactory.INSTANCE.createRetryStrategy(
+ dispatcherServices.getConfiguration()),
+ dispatcherServices.getApplicationWriter(),
+ dispatcherServices.getBlobServer());
+ }
+
+ @VisibleForTesting
+ public DispatcherApplicationResourceCleanerFactory(
+ Executor cleanupExecutor,
+ RetryStrategy retryStrategy,
+ ApplicationWriter applicationWriter,
+ BlobServer blobServer) {
+ this.cleanupExecutor = Preconditions.checkNotNull(cleanupExecutor);
+ this.retryStrategy = Preconditions.checkNotNull(retryStrategy);
+ this.applicationWriter = Preconditions.checkNotNull(applicationWriter);
+ this.blobServer = Preconditions.checkNotNull(blobServer);
+ }
+
+ @Override
+ public ApplicationResourceCleaner createApplicationResourceCleaner(
+ ComponentMainThreadExecutor mainThreadExecutor) {
+ return DefaultApplicationResourceCleaner.forGloballyCleanableResources(
+ mainThreadExecutor, cleanupExecutor, retryStrategy)
+ .withRegularCleanup(APPLICATION_STORE_LABEL, applicationWriter)
+ .withRegularCleanup(BLOB_SERVER_LABEL, blobServer)
+ .build();
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/AbstractDispatcherLeaderProcess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/AbstractDispatcherLeaderProcess.java
index f84ca53baa64f..dbc181faaa05b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/AbstractDispatcherLeaderProcess.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/AbstractDispatcherLeaderProcess.java
@@ -21,10 +21,14 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.application.AbstractApplication;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.highavailability.ApplicationResult;
+import org.apache.flink.runtime.highavailability.ApplicationResultStore;
import org.apache.flink.runtime.highavailability.JobResultStore;
+import org.apache.flink.runtime.jobmanager.ApplicationStore;
import org.apache.flink.runtime.jobmanager.ExecutionPlanWriter;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -260,8 +264,12 @@ DispatcherGatewayService create(
DispatcherId dispatcherId,
Collection<ExecutionPlan> recoveredJobs,
Collection<JobResult> recoveredDirtyJobResults,
+ Collection<AbstractApplication> recoveredApplications,
+ Collection<ApplicationResult> recoveredDirtyApplicationResults,
ExecutionPlanWriter executionPlanWriter,
- JobResultStore jobResultStore);
+ JobResultStore jobResultStore,
+ ApplicationStore applicationStore,
+ ApplicationResultStore applicationResultStore);
}
/** An accessor of the {@link DispatcherGateway}. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java
index b0e2c712e4493..1767ae6b6d89b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherGatewayServiceFactory.java
@@ -18,13 +18,17 @@
package org.apache.flink.runtime.dispatcher.runner;
+import org.apache.flink.runtime.application.AbstractApplication;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherFactory;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.NoOpDispatcherBootstrap;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
-import org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithJobPersistenceComponents;
+import org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithPersistenceComponents;
+import org.apache.flink.runtime.highavailability.ApplicationResult;
+import org.apache.flink.runtime.highavailability.ApplicationResultStore;
import org.apache.flink.runtime.highavailability.JobResultStore;
+import org.apache.flink.runtime.jobmanager.ApplicationStore;
import org.apache.flink.runtime.jobmanager.ExecutionPlanWriter;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.rpc.RpcService;
@@ -57,8 +61,12 @@ public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
DispatcherId fencingToken,
Collection<ExecutionPlan> recoveredJobs,
Collection<JobResult> recoveredDirtyJobResults,
+ Collection<AbstractApplication> recoveredApplications,
+ Collection<ApplicationResult> recoveredDirtyApplicationResults,
ExecutionPlanWriter executionPlanWriter,
- JobResultStore jobResultStore) {
+ JobResultStore jobResultStore,
+ ApplicationStore applicationStore,
+ ApplicationResultStore applicationResultStore) {
final Dispatcher dispatcher;
try {
@@ -68,12 +76,16 @@ public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
fencingToken,
recoveredJobs,
recoveredDirtyJobResults,
+ recoveredApplications,
+ recoveredDirtyApplicationResults,
(dispatcherGateway, scheduledExecutor, errorHandler) ->
new NoOpDispatcherBootstrap(),
- PartialDispatcherServicesWithJobPersistenceComponents.from(
+ PartialDispatcherServicesWithPersistenceComponents.from(
partialDispatcherServices,
executionPlanWriter,
- jobResultStore));
+ jobResultStore,
+ applicationStore,
+ applicationResultStore));
} catch (Exception e) {
throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerFactory.java
index fdd9333df4740..d0a2e4192172f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerFactory.java
@@ -20,7 +20,7 @@
import org.apache.flink.runtime.dispatcher.DispatcherFactory;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
-import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory;
+import org.apache.flink.runtime.jobmanager.PersistenceComponentFactory;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
@@ -43,7 +43,7 @@ public DefaultDispatcherRunnerFactory(
public DispatcherRunner createDispatcherRunner(
LeaderElection leaderElection,
FatalErrorHandler fatalErrorHandler,
- JobPersistenceComponentFactory jobPersistenceComponentFactory,
+ PersistenceComponentFactory persistenceComponentFactory,
Executor ioExecutor,
RpcService rpcService,
PartialDispatcherServices partialDispatcherServices)
@@ -51,7 +51,7 @@ public DispatcherRunner createDispatcherRunner(
final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory =
dispatcherLeaderProcessFactoryFactory.createFactory(
- jobPersistenceComponentFactory,
+ persistenceComponentFactory,
ioExecutor,
rpcService,
partialDispatcherServices,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherLeaderProcessFactoryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherLeaderProcessFactoryFactory.java
index 222c349870100..e630c066a3d0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherLeaderProcessFactoryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherLeaderProcessFactoryFactory.java
@@ -20,7 +20,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
-import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory;
+import org.apache.flink.runtime.jobmanager.PersistenceComponentFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
@@ -31,7 +31,7 @@
public interface DispatcherLeaderProcessFactoryFactory {
DispatcherLeaderProcessFactory createFactory(
- JobPersistenceComponentFactory jobPersistenceComponentFactory,
+ PersistenceComponentFactory persistenceComponentFactory,
Executor ioExecutor,
RpcService rpcService,
PartialDispatcherServices partialDispatcherServices,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherRunnerFactory.java
index 887ae8209e7a8..ed8f7900128cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherRunnerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DispatcherRunnerFactory.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.dispatcher.runner;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
-import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory;
+import org.apache.flink.runtime.jobmanager.PersistenceComponentFactory;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
@@ -32,7 +32,7 @@ public interface DispatcherRunnerFactory {
DispatcherRunner createDispatcherRunner(
LeaderElection leaderElection,
FatalErrorHandler fatalErrorHandler,
- JobPersistenceComponentFactory jobPersistenceComponentFactory,
+ PersistenceComponentFactory persistenceComponentFactory,
Executor ioExecutor,
RpcService rpcService,
PartialDispatcherServices partialDispatcherServices)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
index a01c32f905881..22504c20235bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
@@ -18,16 +18,26 @@
package org.apache.flink.runtime.dispatcher.runner;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
+import org.apache.flink.api.common.JobInfo;
+import org.apache.flink.api.common.JobInfoImpl;
+import org.apache.flink.configuration.RpcOptions;
+import org.apache.flink.runtime.application.AbstractApplication;
+import org.apache.flink.runtime.application.SingleJobApplication;
+import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.highavailability.ApplicationResult;
+import org.apache.flink.runtime.highavailability.ApplicationResultStore;
import org.apache.flink.runtime.highavailability.JobResultStore;
+import org.apache.flink.runtime.jobmanager.ApplicationStore;
+import org.apache.flink.runtime.jobmanager.ApplicationStoreEntry;
import org.apache.flink.runtime.jobmanager.ExecutionPlanStore;
import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.FlinkApplicationNotFoundException;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
@@ -36,15 +46,19 @@
import org.apache.flink.util.function.FunctionUtils;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
+import java.util.function.Function;
import java.util.stream.Collectors;
/**
@@ -60,6 +74,12 @@ public class SessionDispatcherLeaderProcess extends AbstractDispatcherLeaderProc
private final JobResultStore jobResultStore;
+ private final ApplicationStore applicationStore;
+
+ private final ApplicationResultStore applicationResultStore;
+
+ private final BlobServer blobServer;
+
private final Executor ioExecutor;
private CompletableFuture<Void> onGoingRecoveryOperation = FutureUtils.completedVoidFuture();
@@ -69,6 +89,9 @@ private SessionDispatcherLeaderProcess(
DispatcherGatewayServiceFactory dispatcherGatewayServiceFactory,
ExecutionPlanStore executionPlanStore,
JobResultStore jobResultStore,
+ ApplicationStore applicationStore,
+ ApplicationResultStore applicationResultStore,
+ BlobServer blobServer,
Executor ioExecutor,
FatalErrorHandler fatalErrorHandler) {
super(leaderSessionId, fatalErrorHandler);
@@ -76,6 +99,9 @@ private SessionDispatcherLeaderProcess(
this.dispatcherGatewayServiceFactory = dispatcherGatewayServiceFactory;
this.executionPlanStore = executionPlanStore;
this.jobResultStore = jobResultStore;
+ this.applicationStore = applicationStore;
+ this.applicationResultStore = applicationResultStore;
+ this.blobServer = blobServer;
this.ioExecutor = ioExecutor;
}
@@ -83,8 +109,7 @@ private SessionDispatcherLeaderProcess(
protected void onStart() {
startServices();
- onGoingRecoveryOperation =
- createDispatcherBasedOnRecoveredExecutionPlansAndRecoveredDirtyJobResults();
+ onGoingRecoveryOperation = createDispatcherBasedOnRecoveredApplicationsAndJobs();
}
private void startServices() {
@@ -98,46 +123,93 @@ private void startServices() {
getClass().getSimpleName()),
e);
}
+
+ try {
+ applicationStore.start();
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(
+ String.format(
+ "Could not start %s when trying to start the %s.",
+ applicationStore.getClass().getSimpleName(),
+ getClass().getSimpleName()),
+ e);
+ }
}
private void createDispatcherIfRunning(
Collection<ExecutionPlan> executionPlans,
- Collection<JobResult> recoveredDirtyJobResults) {
+ Collection<JobResult> recoveredDirtyJobResults,
+ Collection<AbstractApplication> recoveredApplications,
+ Collection<ApplicationResult> recoveredDirtyApplicationResults) {
runIfStateIs(
- State.RUNNING, () -> createDispatcher(executionPlans, recoveredDirtyJobResults));
+ State.RUNNING,
+ () ->
+ createDispatcher(
+ executionPlans,
+ recoveredDirtyJobResults,
+ recoveredApplications,
+ recoveredDirtyApplicationResults));
}
private void createDispatcher(
Collection<ExecutionPlan> executionPlans,
- Collection<JobResult> recoveredDirtyJobResults) {
+ Collection<JobResult> recoveredDirtyJobResults,
+ Collection<AbstractApplication> recoveredApplications,
+ Collection<ApplicationResult> recoveredDirtyApplicationResults) {
final DispatcherGatewayService dispatcherService =
dispatcherGatewayServiceFactory.create(
DispatcherId.fromUuid(getLeaderSessionId()),
executionPlans,
recoveredDirtyJobResults,
+ recoveredApplications,
+ recoveredDirtyApplicationResults,
executionPlanStore,
- jobResultStore);
+ jobResultStore,
+ applicationStore,
+ applicationResultStore);
completeDispatcherSetup(dispatcherService);
}
- private CompletableFuture<Void>
- createDispatcherBasedOnRecoveredExecutionPlansAndRecoveredDirtyJobResults() {
- // TODO support application recovery which may require fetching user jar from blob server
-
- final CompletableFuture<Collection<JobResult>> dirtyJobsFuture =
+ private CompletableFuture<Void> createDispatcherBasedOnRecoveredApplicationsAndJobs() {
+ final CompletableFuture<Collection<JobResult>> dirtyJobResultsFuture =
CompletableFuture.supplyAsync(this::getDirtyJobResultsIfRunning, ioExecutor);
- return dirtyJobsFuture
- .thenApplyAsync(
- dirtyJobs ->
- this.recoverJobsIfRunning(
- dirtyJobs.stream()
+ final CompletableFuture<Collection<ExecutionPlan>> recoveredJobsFuture =
+ dirtyJobResultsFuture.thenApplyAsync(
+ dirtyJobResults ->
+ recoverJobsIfRunning(
+ dirtyJobResults.stream()
.map(JobResult::getJobId)
.collect(Collectors.toSet())),
- ioExecutor)
- .thenAcceptBoth(dirtyJobsFuture, this::createDispatcherIfRunning)
+ ioExecutor);
+
+ final CompletableFuture<Collection<ApplicationResult>> dirtyApplicationResultsFuture =
+ CompletableFuture.supplyAsync(
+ this::getDirtyApplicationResultsIfRunning, ioExecutor);
+
+ final CompletableFuture<Collection<AbstractApplication>> recoveredApplicationsFuture =
+ dirtyApplicationResultsFuture.thenCombineAsync(
+ recoveredJobsFuture,
+ (dirtyApplicationResults, recoveredJobs) -> {
+ return recoverApplicationsIfRunning(
+ dirtyApplicationResults.stream()
+ .map(ApplicationResult::getApplicationId)
+ .collect(Collectors.toSet()),
+ recoveredJobs,
+ dirtyJobResultsFuture.join());
+ },
+ ioExecutor);
+
+ return recoveredApplicationsFuture
+ .thenRun(
+ () ->
+ createDispatcherIfRunning(
+ recoveredJobsFuture.join(),
+ dirtyJobResultsFuture.join(),
+ recoveredApplicationsFuture.join(),
+ dirtyApplicationResultsFuture.join()))
.handle(this::onErrorIfRunning);
}
@@ -205,6 +277,128 @@ private Collection getDirtyJobResults() {
}
}
+ private Collection<ApplicationResult> getDirtyApplicationResultsIfRunning() {
+ return supplyUnsynchronizedIfRunning(this::getDirtyApplicationResults)
+ .orElse(Collections.emptyList());
+ }
+
+ private Collection<ApplicationResult> getDirtyApplicationResults() {
+ try {
+ return applicationResultStore.getDirtyResults();
+ } catch (IOException e) {
+ throw new FlinkRuntimeException(
+ "Could not retrieve ApplicationResults from ApplicationResultStore", e);
+ }
+ }
+
+ private Collection<AbstractApplication> recoverApplicationsIfRunning(
+ Set<ApplicationID> recoveredDirtyApplicationResults,
+ Collection<ExecutionPlan> recoveredJobs,
+ Collection<JobResult> dirtyJobResults) {
+ return supplyUnsynchronizedIfRunning(
+ () ->
+ recoverApplications(
+ recoveredDirtyApplicationResults,
+ recoveredJobs,
+ dirtyJobResults))
+ .orElse(Collections.emptyList());
+ }
+
+ private Collection<AbstractApplication> recoverApplications(
+ Set<ApplicationID> recoveredDirtyApplicationResults,
+ Collection<ExecutionPlan> recoveredJobs,
+ Collection<JobResult> dirtyJobResults) {
+ log.info("Recover all persisted applications that are not finished, yet.");
+ final Collection<ApplicationID> applicationIds = getApplicationIds();
+ final Collection<AbstractApplication> recoveredApplications = new ArrayList<>();
+
+ final Map<ApplicationID, Collection<JobInfo>> recoveredJobInfosByApplication =
+ new HashMap<>();
+ for (ExecutionPlan executionPlan : recoveredJobs) {
+ ApplicationID applicationId =
+ executionPlan
+ .getApplicationId()
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Application ID is missing in the recovered execution plan. This suggests the job was submitted through an unsupported or incomplete path."));
+ recoveredJobInfosByApplication
+ .computeIfAbsent(applicationId, k -> new ArrayList<>())
+ .add(new JobInfoImpl(executionPlan.getJobID(), executionPlan.getName()));
+ }
+
+ final Map<ApplicationID, Collection<JobInfo>> recoveredTerminalJobInfosByApplication =
+ new HashMap<>();
+ for (JobResult jobResult : dirtyJobResults) {
+ ApplicationID applicationId =
+ jobResult
+ .getApplicationId()
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Application ID is missing in the recovered job result. This suggests the job was submitted through an unsupported or incomplete path."));
+ recoveredTerminalJobInfosByApplication
+ .computeIfAbsent(applicationId, k -> new ArrayList<>())
+ .add(new JobInfoImpl(jobResult.getJobId(), jobResult.getJobName()));
+ }
+
+ for (ApplicationID applicationId : applicationIds) {
+ if (!recoveredDirtyApplicationResults.contains(applicationId)) {
+ Collection<JobInfo> recoveredJobInfos =
+ recoveredJobInfosByApplication.getOrDefault(
+ applicationId, Collections.emptyList());
+ Collection<JobInfo> recoveredTerminalJobInfos =
+ recoveredTerminalJobInfosByApplication.getOrDefault(
+ applicationId, Collections.emptyList());
+ tryRecoverApplication(applicationId, recoveredJobInfos, recoveredTerminalJobInfos)
+ .ifPresent(recoveredApplications::add);
+ } else {
+ log.info(
+ "Skipping recovery of an application with id {}, because it already reached a globally terminal state",
+ applicationId);
+ }
+ }
+
+ log.info("Successfully recovered {} persisted applications.", recoveredApplications.size());
+
+ return recoveredApplications;
+ }
+
+ private Collection<ApplicationID> getApplicationIds() {
+ try {
+ return applicationStore.getApplicationIds();
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(
+ "Could not retrieve application ids of persisted applications.", e);
+ }
+ }
+
+ private Optional<AbstractApplication> tryRecoverApplication(
+ ApplicationID applicationId,
+ Collection<JobInfo> recoveredJobInfos,
+ Collection<JobInfo> recoveredTerminalJobInfos) {
+ log.info("Trying to recover application with id {}.", applicationId);
+ try {
+ Optional<ApplicationStoreEntry> applicationStoreEntry =
+ applicationStore.recoverApplication(applicationId);
+ if (applicationStoreEntry.isEmpty()) {
+ log.info(
+ "Skipping recovery of application with id {}, because it already finished in a previous execution",
+ applicationId);
+ return Optional.empty();
+ }
+
+ return Optional.of(
+ applicationStoreEntry
+ .get()
+ .getApplication(
+ blobServer, recoveredJobInfos, recoveredTerminalJobInfos));
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(
+ String.format("Could not recover application with id %s.", applicationId), e);
+ }
+ }
+
@Override
protected CompletableFuture onClose() {
return CompletableFuture.runAsync(this::stopServices, ioExecutor);
@@ -213,6 +407,7 @@ protected CompletableFuture onClose() {
private void stopServices() {
try {
executionPlanStore.stop();
+ applicationStore.stop();
} catch (Exception e) {
ExceptionUtils.rethrow(e);
}
@@ -251,28 +446,40 @@ private Optional> submitAddedJobIfRunning(ExecutionPlan
private CompletableFuture<Void> submitAddedJob(ExecutionPlan executionPlan) {
final DispatcherGateway dispatcherGateway = getDispatcherGatewayInternal();
+ final JobID jobId = executionPlan.getJobID();
+ final Duration timeout =
+ executionPlan.getJobConfiguration().get(RpcOptions.ASK_TIMEOUT_DURATION);
+ // Skip job submission if its associated application exists, as the application will handle
+ // the job recovery
+ ApplicationID applicationId = executionPlan.getApplicationId().orElse(null);
return dispatcherGateway
- .submitJob(executionPlan, RpcUtils.INF_TIMEOUT)
- .thenApply(FunctionUtils.nullFn())
- .exceptionally(this::filterOutDuplicateJobSubmissionException);
- }
-
- private Void filterOutDuplicateJobSubmissionException(Throwable throwable) {
- final Throwable strippedException = ExceptionUtils.stripCompletionException(throwable);
- if (strippedException instanceof DuplicateJobSubmissionException) {
- final DuplicateJobSubmissionException duplicateJobSubmissionException =
- (DuplicateJobSubmissionException) strippedException;
-
- log.debug(
- "Ignore recovered job {} because the job is currently being executed.",
- duplicateJobSubmissionException.getJobID(),
- duplicateJobSubmissionException);
-
- return null;
- } else {
- throw new CompletionException(throwable);
- }
+ .requestApplication(applicationId, timeout)
+ .handle(
+ (ignored, throwable) -> {
+ if (throwable == null) {
+ log.debug(
+ "Ignore recovered job {} because the associated application {} exists.",
+ jobId,
+ applicationId);
+ return FutureUtils.completedVoidFuture();
+ }
+
+ final Throwable strippedException =
+ ExceptionUtils.stripCompletionException(throwable);
+ if (strippedException instanceof FlinkApplicationNotFoundException) {
+ log.info(
+ "Submitting job {} added to the {} by another process by wrapping it in a SingleJobApplication",
+ jobId,
+ executionPlanStore.getClass().getSimpleName());
+ return dispatcherGateway
+ .submitApplication(
+ new SingleJobApplication(executionPlan), timeout)
+ .thenApply(FunctionUtils.nullFn());
+ }
+ throw new CompletionException(throwable);
+ })
+ .thenCompose(Function.identity());
}
private DispatcherGateway getDispatcherGatewayInternal() {
@@ -322,6 +529,9 @@ public static SessionDispatcherLeaderProcess create(
DispatcherGatewayServiceFactory dispatcherFactory,
ExecutionPlanStore executionPlanStore,
JobResultStore jobResultStore,
+ ApplicationStore applicationStore,
+ ApplicationResultStore applicationResultStore,
+ BlobServer blobServer,
Executor ioExecutor,
FatalErrorHandler fatalErrorHandler) {
return new SessionDispatcherLeaderProcess(
@@ -329,6 +539,9 @@ public static SessionDispatcherLeaderProcess create(
dispatcherFactory,
executionPlanStore,
jobResultStore,
+ applicationStore,
+ applicationResultStore,
+ blobServer,
ioExecutor,
fatalErrorHandler);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessFactory.java
index a239746b799da..777c66e0f5aab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessFactory.java
@@ -19,7 +19,8 @@
package org.apache.flink.runtime.dispatcher.runner;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.jobmanager.PersistenceComponentFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import java.util.UUID;
@@ -31,18 +32,21 @@ public class SessionDispatcherLeaderProcessFactory implements DispatcherLeaderPr
private final AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory
dispatcherGatewayServiceFactory;
- private final JobPersistenceComponentFactory jobPersistenceComponentFactory;
+ private final PersistenceComponentFactory persistenceComponentFactory;
+ private final BlobServer blobServer;
private final Executor ioExecutor;
private final FatalErrorHandler fatalErrorHandler;
public SessionDispatcherLeaderProcessFactory(
AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory
dispatcherGatewayServiceFactory,
- JobPersistenceComponentFactory jobPersistenceComponentFactory,
+ PersistenceComponentFactory persistenceComponentFactory,
+ BlobServer blobServer,
Executor ioExecutor,
FatalErrorHandler fatalErrorHandler) {
this.dispatcherGatewayServiceFactory = dispatcherGatewayServiceFactory;
- this.jobPersistenceComponentFactory = jobPersistenceComponentFactory;
+ this.persistenceComponentFactory = persistenceComponentFactory;
+ this.blobServer = blobServer;
this.ioExecutor = ioExecutor;
this.fatalErrorHandler = fatalErrorHandler;
}
@@ -52,8 +56,11 @@ public DispatcherLeaderProcess create(UUID leaderSessionID) {
return SessionDispatcherLeaderProcess.create(
leaderSessionID,
dispatcherGatewayServiceFactory,
- jobPersistenceComponentFactory.createExecutionPlanStore(),
- jobPersistenceComponentFactory.createJobResultStore(),
+ persistenceComponentFactory.createExecutionPlanStore(),
+ persistenceComponentFactory.createJobResultStore(),
+ persistenceComponentFactory.createApplicationStore(),
+ persistenceComponentFactory.createApplicationResultStore(),
+ blobServer,
ioExecutor,
fatalErrorHandler);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessFactoryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessFactoryFactory.java
index ee2fb081647fe..99fb4667eec3d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessFactoryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessFactoryFactory.java
@@ -20,7 +20,7 @@
import org.apache.flink.runtime.dispatcher.DispatcherFactory;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
-import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory;
+import org.apache.flink.runtime.jobmanager.PersistenceComponentFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
@@ -38,7 +38,7 @@ private SessionDispatcherLeaderProcessFactoryFactory(DispatcherFactory dispatche
@Override
public DispatcherLeaderProcessFactory createFactory(
- JobPersistenceComponentFactory jobPersistenceComponentFactory,
+ PersistenceComponentFactory persistenceComponentFactory,
Executor ioExecutor,
RpcService rpcService,
PartialDispatcherServices partialDispatcherServices,
@@ -50,7 +50,8 @@ public DispatcherLeaderProcessFactory createFactory(
return new SessionDispatcherLeaderProcessFactory(
dispatcherGatewayServiceFactory,
- jobPersistenceComponentFactory,
+ persistenceComponentFactory,
+ partialDispatcherServices.getBlobServer(),
ioExecutor,
fatalErrorHandler);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
index 0408c4af5c301..60abff3464afd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
@@ -37,7 +37,7 @@
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobmanager.HaServicesJobPersistenceComponentFactory;
+import org.apache.flink.runtime.jobmanager.HaServicesPersistenceComponentFactory;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
@@ -226,7 +226,7 @@ public DispatcherResourceManagerComponent create(
dispatcherRunnerFactory.createDispatcherRunner(
highAvailabilityServices.getDispatcherLeaderElection(),
fatalErrorHandler,
- new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),
+ new HaServicesPersistenceComponentFactory(highAvailabilityServices),
ioExecutor,
rpcService,
partialDispatcherServices);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index eea9650fbba39..30874f75b2198 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.execution.librarycache;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.PermanentBlobService;
@@ -92,10 +93,10 @@ public BlobLibraryCacheManager(
}
@Override
- public ClassLoaderLease registerClassLoaderLease(JobID jobId) {
+ public ClassLoaderLease registerClassLoaderLease(JobID jobId, ApplicationID applicationId) {
synchronized (lockObject) {
return cacheEntries
- .computeIfAbsent(jobId, jobID -> new LibraryCacheEntry(jobId))
+ .computeIfAbsent(jobId, jobID -> new LibraryCacheEntry(jobId, applicationId))
.obtainLease();
}
}
@@ -210,6 +211,8 @@ private static Consumer createClassLoadingExceptionHandler(
private final class LibraryCacheEntry {
private final JobID jobId;
+ private final ApplicationID applicationId;
+
@GuardedBy("lockObject")
private int referenceCount;
@@ -220,8 +223,9 @@ private final class LibraryCacheEntry {
@GuardedBy("lockObject")
private boolean isReleased;
- private LibraryCacheEntry(JobID jobId) {
+ private LibraryCacheEntry(JobID jobId, ApplicationID applicationId) {
this.jobId = jobId;
+ this.applicationId = applicationId;
referenceCount = 0;
this.resolvedClassLoader = null;
this.isReleased = false;
@@ -241,7 +245,7 @@ private UserCodeClassLoader getOrResolveClassLoader(
systemClassLoader
? ClassLoader.getSystemClassLoader()
: createUserCodeClassLoader(
- jobId, libraries, classPaths),
+ jobId, applicationId, libraries, classPaths),
libraries,
classPaths,
systemClassLoader);
@@ -256,6 +260,7 @@ private UserCodeClassLoader getOrResolveClassLoader(
@GuardedBy("lockObject")
private URLClassLoader createUserCodeClassLoader(
JobID jobId,
+ ApplicationID applicationId,
Collection<PermanentBlobKey> requiredJarFiles,
Collection<URL> requiredClasspaths)
throws IOException {
@@ -265,7 +270,19 @@ private URLClassLoader createUserCodeClassLoader(
int count = 0;
// add URLs to locally cached JAR files
for (PermanentBlobKey key : requiredJarFiles) {
- libraryURLs[count] = blobService.getFile(jobId, key).toURI().toURL();
+ try {
+ // first try job specific libraries
+ libraryURLs[count] = blobService.getFile(jobId, key).toURI().toURL();
+ } catch (Exception e) {
+ // then try application specific libraries
+ LOG.info(
+ "Cannot get job specific blob {} for job {}. Checking application specific blob with application id {}.",
+ key,
+ jobId,
+ applicationId);
+ libraryURLs[count] =
+ blobService.getFile(applicationId, key).toURI().toURL();
+ }
++count;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
index d38301ea32ac1..e7f9e82f1a6c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.execution.librarycache;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.util.UserCodeClassLoader;
@@ -43,9 +44,10 @@ public interface LibraryCacheManager {
* job will be valid as long as there exists a valid lease for this job.
*
* @param jobId jobId for which to register a new class loader lease
+ * @param applicationId applicationId to which the job belongs
* @return a new class loader lease for the given job
*/
- ClassLoaderLease registerClassLoaderLease(JobID jobId);
+ ClassLoaderLease registerClassLoaderLease(JobID jobId, ApplicationID applicationId);
/**
* Shuts the library cache manager down. Thereby it will close all open {@link ClassLoaderLease}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index e259385507fab..da553564a3640 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -32,6 +32,7 @@
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
@@ -401,6 +402,28 @@ public static ArchivedExecutionGraph createFrom(
executionGraph.getApplicationId().orElse(null));
}
+ /**
+ * Create a {@link ArchivedExecutionGraph} from the given {@link JobResult}.
+ *
+ * @param jobResult to create the ArchivedExecutionGraph from
+ * @param initializationTimestamp optionally overrides the initialization timestamp if the
+ * jobResult does not have a valid one
+ * @return ArchivedExecutionGraph created from the given jobResult
+ */
+ public static ArchivedExecutionGraph createFrom(
+ JobResult jobResult, long initializationTimestamp) {
+ return createSparseArchivedExecutionGraph(
+ jobResult.getJobId(),
+ jobResult.getJobName(),
+ jobResult.getJobStatus().orElseThrow(),
+ null,
+ jobResult.getSerializedThrowable().orElse(null),
+ null,
+ jobResult.getStartTime() < 0 ? initializationTimestamp : jobResult.getStartTime(),
+ jobResult.getEndTime(),
+ jobResult.getApplicationId().orElse(null));
+ }
+
/**
* Create a sparse ArchivedExecutionGraph for a job. Most fields will be empty, only job status
* and error-related fields are set.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
index 24201fa28205a..1c0a416f27394 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
@@ -23,6 +23,7 @@
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.jobmanager.ApplicationStore;
import org.apache.flink.runtime.jobmanager.ExecutionPlanStore;
import org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService;
import org.apache.flink.runtime.leaderelection.LeaderElection;
@@ -67,6 +68,8 @@ public abstract class AbstractHaServices implements HighAvailabilityServices {
private final JobResultStore jobResultStore;
+ private final ApplicationResultStore applicationResultStore;
+
private final DefaultLeaderElectionService leaderElectionService;
protected AbstractHaServices(
@@ -74,12 +77,14 @@ protected AbstractHaServices(
LeaderElectionDriverFactory driverFactory,
Executor ioExecutor,
BlobStoreService blobStoreService,
- JobResultStore jobResultStore) {
+ JobResultStore jobResultStore,
+ ApplicationResultStore applicationResultStore) {
this.configuration = checkNotNull(config);
this.ioExecutor = checkNotNull(ioExecutor);
this.blobStoreService = checkNotNull(blobStoreService);
this.jobResultStore = checkNotNull(jobResultStore);
+ this.applicationResultStore = checkNotNull(applicationResultStore);
this.leaderElectionService = new DefaultLeaderElectionService(driverFactory);
}
@@ -145,6 +150,16 @@ public JobResultStore getJobResultStore() throws Exception {
return jobResultStore;
}
+ @Override
+ public ApplicationStore getApplicationStore() throws Exception {
+ return createApplicationStore();
+ }
+
+ @Override
+ public ApplicationResultStore getApplicationResultStore() throws Exception {
+ return applicationResultStore;
+ }
+
@Override
public BlobStore createBlobStore() {
return blobStoreService;
@@ -248,6 +263,14 @@ public CompletableFuture globalCleanupAsync(JobID jobID, Executor executor
*/
protected abstract ExecutionPlanStore createExecutionPlanStore() throws Exception;
+ /**
+ * Create the submitted application store for the job manager.
+ *
+ * @return Submitted application store
+ * @throws Exception if the submitted application store could not be created
+ */
+ protected abstract ApplicationStore createApplicationStore() throws Exception;
+
/**
* Closes the components which is used for external operations(e.g. Zookeeper Client, Kubernetes
* Client).
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeApplicationResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeApplicationResultStore.java
new file mode 100644
index 0000000000000..5fbdacff80707
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeApplicationResultStore.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/** An abstract class for threadsafe implementations of the {@link ApplicationResultStore}. */
+public abstract class AbstractThreadsafeApplicationResultStore implements ApplicationResultStore {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AbstractThreadsafeApplicationResultStore.class);
+
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+ private final Executor ioExecutor;
+
+ protected AbstractThreadsafeApplicationResultStore(Executor ioExecutor) {
+ this.ioExecutor = ioExecutor;
+ }
+
+ @Override
+ public CompletableFuture<Void> createDirtyResultAsync(
+ ApplicationResultEntry applicationResultEntry) {
+ return hasApplicationResultEntryAsync(applicationResultEntry.getApplicationId())
+ .thenAccept(
+ hasApplicationResultEntry ->
+ Preconditions.checkState(
+ !hasApplicationResultEntry,
+ "Application result store already contains an entry for application %s",
+ applicationResultEntry.getApplicationId()))
+ .thenCompose(
+ ignoredVoid ->
+ withWriteLockAsync(
+ () -> createDirtyResultInternal(applicationResultEntry)));
+ }
+
+ @GuardedBy("readWriteLock")
+ protected abstract void createDirtyResultInternal(ApplicationResultEntry applicationResultEntry)
+ throws IOException;
+
+ @Override
+ public CompletableFuture<Void> markResultAsCleanAsync(ApplicationID applicationId) {
+ return hasCleanApplicationResultEntryAsync(applicationId)
+ .thenCompose(
+ hasCleanApplicationResultEntry -> {
+ if (hasCleanApplicationResultEntry) {
+ LOG.debug(
+ "The application {} is already marked as clean. No action required.",
+ applicationId);
+ return FutureUtils.completedVoidFuture();
+ }
+
+ return withWriteLockAsync(
+ () -> markResultAsCleanInternal(applicationId));
+ });
+ }
+
+ @GuardedBy("readWriteLock")
+ protected abstract void markResultAsCleanInternal(ApplicationID applicationId)
+ throws IOException, NoSuchElementException;
+
+ @Override
+ public CompletableFuture<Boolean> hasApplicationResultEntryAsync(ApplicationID applicationId) {
+ return withReadLockAsync(
+ () ->
+ hasDirtyApplicationResultEntryInternal(applicationId)
+ || hasCleanApplicationResultEntryInternal(applicationId));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> hasDirtyApplicationResultEntryAsync(
+ ApplicationID applicationId) {
+ return withReadLockAsync(() -> hasDirtyApplicationResultEntryInternal(applicationId));
+ }
+
+ @GuardedBy("readWriteLock")
+ protected abstract boolean hasDirtyApplicationResultEntryInternal(ApplicationID applicationId)
+ throws IOException;
+
+ @Override
+ public CompletableFuture<Boolean> hasCleanApplicationResultEntryAsync(
+ ApplicationID applicationId) {
+ return withReadLockAsync(() -> hasCleanApplicationResultEntryInternal(applicationId));
+ }
+
+ @GuardedBy("readWriteLock")
+ protected abstract boolean hasCleanApplicationResultEntryInternal(ApplicationID applicationId)
+ throws IOException;
+
+ @Override
+ public Set<ApplicationResultEntry> getDirtyResults() throws IOException {
+ return withReadLock(this::getDirtyResultsInternal);
+ }
+
+ @GuardedBy("readWriteLock")
+ protected abstract Set<ApplicationResultEntry> getDirtyResultsInternal() throws IOException;
+
+ private CompletableFuture withWriteLockAsync(ThrowingRunnable