recoveredTerminalJobInfos);
+
+ /**
+ * Gets the unique identifier of the application.
+ *
+ * @return the application id
+ */
+ ApplicationID getApplicationId();
+
+ /**
+ * Gets the name of the application.
+ *
+ * @return the application name
+ */
+ String getName();
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationStoreUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationStoreUtil.java
new file mode 100644
index 0000000000000..0ec9f9d47c51c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationStoreUtil.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.ApplicationID;
+
+/**
+ * ApplicationStore utility interfaces. For example, convert a name (e.g. ZooKeeper path, key name in
+ * Kubernetes ConfigMap) to {@link ApplicationID}, or vice versa.
+ */
+public interface ApplicationStoreUtil {
+
+ /**
+ * Get the name in external storage from application id.
+ *
+ * @param applicationId application id
+ * @return Key name in ConfigMap or child path name in ZooKeeper
+ */
+ String applicationIdToName(ApplicationID applicationId);
+
+ /**
+ * Get the application id from name.
+ *
+ * @param name Key name in ConfigMap or child path name in ZooKeeper
+ * @return parsed application id.
+ */
+ ApplicationID nameToApplicationId(String name);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationWriter.java
new file mode 100644
index 0000000000000..f49956b412e63
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ApplicationWriter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableApplicationResource;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/** Allows to store and remove applications. */
+@Internal
+public interface ApplicationWriter extends GloballyCleanableApplicationResource {
+
+ /**
+ * Adds the {@link ApplicationStoreEntry} instance.
+ *
+ * <p>If an application with the same {@link ApplicationID} exists, it is replaced.
+ */
+ void putApplication(ApplicationStoreEntry application) throws Exception;
+
+ @Override
+ default CompletableFuture<Void> globalCleanupAsync(
+ ApplicationID applicationId, Executor executor) {
+ return CompletableFuture.completedFuture(null);
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultApplicationStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultApplicationStore.java
new file mode 100644
index 0000000000000..fc47b69bbad14
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultApplicationStore.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.runtime.persistence.ResourceVersion;
+import org.apache.flink.runtime.persistence.StateHandleStore;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Default implementation for {@link ApplicationStore}. Combined with different {@link
+ * StateHandleStore}, we could persist the applications to various distributed storage.
+ */
+public class DefaultApplicationStore<R extends ResourceVersion<R>> implements ApplicationStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultApplicationStore.class);
+
+ private final Object lock = new Object();
+
+ /** The set of IDs of all added applications. */
+ @GuardedBy("lock")
+ private final Set<ApplicationID> addedApplications = new HashSet<>();
+
+ /** Submitted applications handle store. */
+ private final StateHandleStore<ApplicationStoreEntry, R> applicationStateHandleStore;
+
+ private final ApplicationStoreUtil applicationStoreUtil;
+
+ /** Flag indicating whether this instance is running. */
+ @GuardedBy("lock")
+ private volatile boolean running;
+
+ public DefaultApplicationStore(
+ StateHandleStore<ApplicationStoreEntry, R> stateHandleStore,
+ ApplicationStoreUtil applicationStoreUtil) {
+ this.applicationStateHandleStore = checkNotNull(stateHandleStore);
+ this.applicationStoreUtil = checkNotNull(applicationStoreUtil);
+
+ this.running = false;
+ }
+
+ @Override
+ public void start() throws Exception {
+ synchronized (lock) {
+ if (!running) {
+ running = true;
+ }
+ }
+ }
+
+ @Override
+ public void stop() throws Exception {
+ synchronized (lock) {
+ if (running) {
+ running = false;
+ LOG.info("Stopping DefaultApplicationStore.");
+ Exception exception = null;
+
+ try {
+ applicationStateHandleStore.releaseAll();
+ } catch (Exception e) {
+ exception = e;
+ }
+
+ if (exception != null) {
+ throw new FlinkException(
+ "Could not properly stop the DefaultApplicationStore.", exception);
+ }
+ }
+ }
+ }
+
+ @Override
+ public Optional<ApplicationStoreEntry> recoverApplication(ApplicationID applicationId)
+ throws Exception {
+ checkNotNull(applicationId, "Application ID");
+
+ LOG.debug("Recovering application {} from {}.", applicationId, applicationStateHandleStore);
+
+ final String name = applicationStoreUtil.applicationIdToName(applicationId);
+
+ synchronized (lock) {
+ verifyIsRunning();
+
+ boolean success = false;
+
+ RetrievableStateHandle<ApplicationStoreEntry> applicationRetrievableStateHandle;
+
+ try {
+ try {
+ applicationRetrievableStateHandle =
+ applicationStateHandleStore.getAndLock(name);
+ } catch (StateHandleStore.NotExistException ignored) {
+ success = true;
+ return Optional.empty();
+ } catch (Exception e) {
+ throw new FlinkException(
+ "Could not retrieve the submitted application state handle "
+ + "for "
+ + name
+ + " from the submitted application store.",
+ e);
+ }
+
+ ApplicationStoreEntry application;
+ try {
+ application = applicationRetrievableStateHandle.retrieveState();
+ } catch (ClassNotFoundException cnfe) {
+ throw new FlinkException(
+ "Could not retrieve submitted application from state handle under "
+ + name
+ + ". This indicates that you are trying to recover from state written by an "
+ + "older Flink version which is not compatible. Try cleaning the state handle store.",
+ cnfe);
+ } catch (IOException ioe) {
+ throw new FlinkException(
+ "Could not retrieve submitted application from state handle under "
+ + name
+ + ". This indicates that the retrieved state handle is broken. Try cleaning the state handle "
+ + "store.",
+ ioe);
+ }
+
+ addedApplications.add(applicationId);
+
+ LOG.info("Recovered {} ({}).", application.getName(), applicationId);
+
+ success = true;
+ return Optional.of(application);
+ } finally {
+ if (!success) {
+ applicationStateHandleStore.release(name);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void putApplication(ApplicationStoreEntry application) throws Exception {
+ checkNotNull(application, "Application");
+
+ final ApplicationID applicationID = application.getApplicationId();
+ final String name = applicationStoreUtil.applicationIdToName(applicationID);
+
+ LOG.debug("Adding application {} to {}.", applicationID, applicationStateHandleStore);
+
+ boolean success = false;
+
+ while (!success) {
+ synchronized (lock) {
+ verifyIsRunning();
+
+ final R currentVersion = applicationStateHandleStore.exists(name);
+
+ if (!currentVersion.isExisting()) {
+ try {
+ applicationStateHandleStore.addAndLock(name, application);
+
+ addedApplications.add(applicationID);
+
+ success = true;
+ } catch (StateHandleStore.AlreadyExistException ignored) {
+ LOG.warn(
+ "{} already exists in {}.",
+ application,
+ applicationStateHandleStore);
+ }
+ } else if (addedApplications.contains(applicationID)) {
+ try {
+ applicationStateHandleStore.replace(name, currentVersion, application);
+ LOG.info("Updated {} in {}.", application, getClass().getSimpleName());
+
+ success = true;
+ } catch (StateHandleStore.NotExistException ignored) {
+ LOG.warn(
+ "{} does not exists in {}.",
+ application,
+ applicationStateHandleStore);
+ }
+ } else {
+ throw new IllegalStateException(
+ "Trying to update an application you didn't "
+ + "#getAllSubmittedApplications() or #putApplication() yourself before.");
+ }
+ }
+ }
+
+ LOG.info("Added {} to {}.", application, applicationStateHandleStore);
+ }
+
+ @Override
+ public CompletableFuture<Void> globalCleanupAsync(
+ ApplicationID applicationId, Executor executor) {
+ checkNotNull(applicationId, "Application ID");
+
+ return runAsyncWithLockAssertRunning(
+ () -> {
+ LOG.debug(
+ "Removing application {} from {}.",
+ applicationId,
+ applicationStateHandleStore);
+
+ final String name = applicationStoreUtil.applicationIdToName(applicationId);
+ releaseAndRemoveOrThrowCompletionException(applicationId, name);
+
+ addedApplications.remove(applicationId);
+
+ LOG.info(
+ "Removed application {} from {}.",
+ applicationId,
+ applicationStateHandleStore);
+ },
+ executor);
+ }
+
+ @GuardedBy("lock")
+ private void releaseAndRemoveOrThrowCompletionException(
+ ApplicationID applicationId, String applicationName) {
+ boolean success;
+ try {
+ success = applicationStateHandleStore.releaseAndTryRemove(applicationName);
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
+
+ if (!success) {
+ throw new CompletionException(
+ new FlinkException(
+ String.format(
+ "Could not remove application with application id %s from %s.",
+ applicationId, applicationStateHandleStore)));
+ }
+ }
+
+ private CompletableFuture<Void> runAsyncWithLockAssertRunning(
+ ThrowingRunnable<Exception> runnable, Executor executor) {
+ return CompletableFuture.runAsync(
+ () -> {
+ synchronized (lock) {
+ verifyIsRunning();
+ try {
+ runnable.run();
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
+ }
+ },
+ executor);
+ }
+
+ @Override
+ public Collection<ApplicationID> getApplicationIds() throws Exception {
+ LOG.debug("Retrieving all stored application ids from {}.", applicationStateHandleStore);
+
+ final Collection<String> names;
+ try {
+ names = applicationStateHandleStore.getAllHandles();
+ } catch (Exception e) {
+ throw new Exception(
+ "Failed to retrieve all application ids from "
+ + applicationStateHandleStore
+ + ".",
+ e);
+ }
+
+ final List<ApplicationID> applicationIds = new ArrayList<>(names.size());
+
+ for (String name : names) {
+ try {
+ applicationIds.add(applicationStoreUtil.nameToApplicationId(name));
+ } catch (Exception exception) {
+ LOG.warn(
+ "Could not parse application id from {}. This indicates a malformed name.",
+ name,
+ exception);
+ }
+ }
+
+ LOG.info(
+ "Retrieved application ids {} from {}",
+ applicationIds,
+ applicationStateHandleStore);
+
+ return applicationIds;
+ }
+
+ /** Verifies that the state is running. */
+ private void verifyIsRunning() {
+ checkState(running, "Not running. Forgot to call start()?");
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HaServicesJobPersistenceComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HaServicesPersistenceComponentFactory.java
similarity index 76%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HaServicesJobPersistenceComponentFactory.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HaServicesPersistenceComponentFactory.java
index 064184da682ee..44ca012e4557f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HaServicesJobPersistenceComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HaServicesPersistenceComponentFactory.java
@@ -18,19 +18,20 @@
package org.apache.flink.runtime.jobmanager;
+import org.apache.flink.runtime.highavailability.ApplicationResultStore;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.SupplierWithException;
/**
- * {@link JobPersistenceComponentFactory} implementation which creates a {@link ExecutionPlanStore}
+ * {@link PersistenceComponentFactory} implementation which creates a {@link ExecutionPlanStore}
* using the provided {@link HighAvailabilityServices}.
*/
-public class HaServicesJobPersistenceComponentFactory implements JobPersistenceComponentFactory {
+public class HaServicesPersistenceComponentFactory implements PersistenceComponentFactory {
private final HighAvailabilityServices highAvailabilityServices;
- public HaServicesJobPersistenceComponentFactory(
+ public HaServicesPersistenceComponentFactory(
HighAvailabilityServices highAvailabilityServices) {
this.highAvailabilityServices = highAvailabilityServices;
}
@@ -45,6 +46,17 @@ public JobResultStore createJobResultStore() {
return create(highAvailabilityServices::getJobResultStore, JobResultStore.class);
}
+ @Override
+ public ApplicationStore createApplicationStore() {
+ return create(highAvailabilityServices::getApplicationStore, ApplicationStore.class);
+ }
+
+ @Override
+ public ApplicationResultStore createApplicationResultStore() {
+ return create(
+ highAvailabilityServices::getApplicationResultStore, ApplicationResultStore.class);
+ }
+
private <T> T create(SupplierWithException<T, ? extends Exception> supplier, Class<T> clazz) {
try {
return supplier.get();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobPersistenceComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PersistenceComponentFactory.java
similarity index 73%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobPersistenceComponentFactory.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PersistenceComponentFactory.java
index 7834d7d39722b..256f972db4059 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobPersistenceComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PersistenceComponentFactory.java
@@ -18,10 +18,11 @@
package org.apache.flink.runtime.jobmanager;
+import org.apache.flink.runtime.highavailability.ApplicationResultStore;
import org.apache.flink.runtime.highavailability.JobResultStore;
/** Factory for components that are responsible for persisting a job for recovery. */
-public interface JobPersistenceComponentFactory {
+public interface PersistenceComponentFactory {
/**
* Creates a {@link ExecutionPlanStore}.
@@ -36,4 +37,18 @@ public interface JobPersistenceComponentFactory {
* @return a {@code JobResultStore} instance.
*/
JobResultStore createJobResultStore();
+
+ /**
+ * Creates {@link ApplicationStore} instances.
+ *
+ * @return a {@code ApplicationStore} instance.
+ */
+ ApplicationStore createApplicationStore();
+
+ /**
+ * Creates {@link ApplicationResultStore} instances.
+ *
+ * @return a {@code ApplicationResultStore} instance.
+ */
+ ApplicationResultStore createApplicationResultStore();
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneApplicationStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneApplicationStore.java
new file mode 100644
index 0000000000000..d35f3ed7e7ee4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneApplicationStore.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.ApplicationID;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+
+/**
+ * {@link ApplicationStore} instances for JobManagers running in {@link HighAvailabilityMode#NONE}.
+ *
+ * <p>All operations are NoOps, because {@link ApplicationStoreEntry} instances cannot be recovered
+ * in this recovery mode.
+ */
+public class StandaloneApplicationStore implements ApplicationStore {
+
+ @Override
+ public void start() throws Exception {
+ // Nothing to do
+ }
+
+ @Override
+ public void stop() {
+ // Nothing to do
+ }
+
+ @Override
+ public void putApplication(ApplicationStoreEntry application) {
+ // Nothing to do
+ }
+
+ @Override
+ public Collection<ApplicationID> getApplicationIds() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Optional<ApplicationStoreEntry> recoverApplication(ApplicationID applicationID) {
+ return Optional.empty();
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperApplicationStoreUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperApplicationStoreUtil.java
new file mode 100644
index 0000000000000..3682ead2e9327
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperApplicationStoreUtil.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+
+/** Singleton {@link ApplicationStoreUtil} implementation for ZooKeeper. */
+public enum ZooKeeperApplicationStoreUtil implements ApplicationStoreUtil {
+ INSTANCE;
+
+ @Override
+ public String applicationIdToName(ApplicationID applicationId) {
+ return ZooKeeperUtils.getPathForApplication(applicationId);
+ }
+
+ @Override
+ public ApplicationID nameToApplicationId(String name) {
+ return ApplicationID.fromHexString(name);
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 7d2defb29fb82..0b834ca503277 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -20,6 +20,8 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.ApplicationState;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
@@ -35,6 +37,7 @@
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.application.AbstractApplication;
+import org.apache.flink.runtime.application.ArchivedApplication;
import org.apache.flink.runtime.application.SingleJobApplication;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobClient;
@@ -1182,6 +1185,18 @@ private CompletableFuture createBlobServerAddress(
.thenCompose(Function.identity());
}
+ // ------------------------------------------------------------------------
+ // Accessing applications
+ // ------------------------------------------------------------------------
+
+ public CompletableFuture<ApplicationState> getApplicationStatus(ApplicationID applicationId) {
+ return runDispatcherCommand(
+ dispatcherGateway ->
+ dispatcherGateway
+ .requestApplication(applicationId, rpcTimeout)
+ .thenApply(ArchivedApplication::getApplicationStatus));
+ }
+
// ------------------------------------------------------------------------
// factories - can be overridden by subclasses to alter behavior
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ApplicationResultDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ApplicationResultDeserializer.java
new file mode 100644
index 0000000000000..223a83b54c88f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ApplicationResultDeserializer.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.ApplicationState;
+import org.apache.flink.runtime.highavailability.ApplicationResult;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * JSON deserializer for {@link ApplicationResult}.
+ *
+ * @see ApplicationResultSerializer
+ */
+public class ApplicationResultDeserializer extends StdDeserializer<ApplicationResult> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ApplicationIDDeserializer applicationIdDeserializer =
+ new ApplicationIDDeserializer();
+
+ private final SerializedThrowableDeserializer serializedThrowableDeserializer =
+ new SerializedThrowableDeserializer();
+
+ public ApplicationResultDeserializer() {
+ super(ApplicationResult.class);
+ }
+
+ @Override
+ public ApplicationResult deserialize(final JsonParser p, final DeserializationContext ctxt)
+ throws IOException {
+ ApplicationID applicationId = null;
+ ApplicationState applicationState = null;
+ String applicationName = "unknown";
+ long startTime = -1;
+ long endTime = -1;
+
+ while (true) {
+ final JsonToken jsonToken = p.nextToken();
+ assertNotEndOfInput(p, jsonToken);
+ if (jsonToken == JsonToken.END_OBJECT) {
+ break;
+ }
+
+ final String fieldName = p.getValueAsString();
+ switch (fieldName) {
+ case ApplicationResultSerializer.FIELD_NAME_APPLICATION_ID:
+ assertNextToken(p, JsonToken.VALUE_STRING);
+ applicationId = applicationIdDeserializer.deserialize(p, ctxt);
+ break;
+ case ApplicationResultSerializer.FIELD_NAME_APPLICATION_STATE:
+ assertNextToken(p, JsonToken.VALUE_STRING);
+ applicationState = ApplicationState.valueOf(p.getValueAsString());
+ break;
+ case ApplicationResultSerializer.FIELD_NAME_APPLICATION_NAME:
+ assertNextToken(p, JsonToken.VALUE_STRING);
+ applicationName = p.getValueAsString();
+ break;
+ case ApplicationResultSerializer.FIELD_NAME_START_TIME:
+ assertNextToken(p, JsonToken.VALUE_NUMBER_INT);
+ startTime = p.getLongValue();
+ break;
+ case ApplicationResultSerializer.FIELD_NAME_END_TIME:
+ assertNextToken(p, JsonToken.VALUE_NUMBER_INT);
+ endTime = p.getLongValue();
+ break;
+ default:
+ // ignore unknown fields
+ }
+ }
+
+ try {
+ return new ApplicationResult.Builder()
+ .applicationId(applicationId)
+ .applicationState(applicationState)
+ .applicationName(applicationName)
+ .startTime(startTime)
+ .endTime(endTime)
+ .build();
+ } catch (final RuntimeException e) {
+ throw new JsonMappingException(
+ null, "Could not deserialize " + ApplicationResult.class.getSimpleName(), e);
+ }
+ }
+
+ /** Asserts that the provided JsonToken is not null, i.e., not at the end of the input. */
+ private static void assertNotEndOfInput(
+ final JsonParser p, @Nullable final JsonToken jsonToken) {
+ checkState(jsonToken != null, "Unexpected end of input at %s", p.getCurrentLocation());
+ }
+
+ /** Advances the token and asserts that it matches the required {@link JsonToken}. */
+ private static void assertNextToken(final JsonParser p, final JsonToken requiredJsonToken)
+ throws IOException {
+ final JsonToken jsonToken = p.nextToken();
+ if (jsonToken != requiredJsonToken) {
+ throw new JsonMappingException(
+ p, String.format("Expected token %s (was %s)", requiredJsonToken, jsonToken));
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ApplicationResultSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ApplicationResultSerializer.java
new file mode 100644
index 0000000000000..dc73a18f4913a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ApplicationResultSerializer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.runtime.highavailability.ApplicationResult;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/**
+ * JSON serializer for {@link ApplicationResult}.
+ *
+ * @see ApplicationResultDeserializer
+ */
+public class ApplicationResultSerializer extends StdSerializer<ApplicationResult> {
+
+ private static final long serialVersionUID = 1L;
+
+ static final String FIELD_NAME_APPLICATION_ID = "application-id";
+
+ static final String FIELD_NAME_APPLICATION_STATE = "application-state";
+
+ static final String FIELD_NAME_APPLICATION_NAME = "application-name";
+
+ static final String FIELD_NAME_START_TIME = "start-time";
+
+ static final String FIELD_NAME_END_TIME = "end-time";
+
+ private final ApplicationIDSerializer applicationIdSerializer = new ApplicationIDSerializer();
+
+ public ApplicationResultSerializer() {
+ super(ApplicationResult.class);
+ }
+
+ @Override
+ public void serialize(
+ final ApplicationResult result,
+ final JsonGenerator gen,
+ final SerializerProvider provider)
+ throws IOException {
+
+ gen.writeStartObject();
+
+ gen.writeFieldName(FIELD_NAME_APPLICATION_ID);
+ applicationIdSerializer.serialize(result.getApplicationId(), gen, provider);
+
+ gen.writeFieldName(FIELD_NAME_APPLICATION_STATE);
+ gen.writeString(result.getApplicationState().name());
+
+ gen.writeFieldName(FIELD_NAME_APPLICATION_NAME);
+ gen.writeString(result.getApplicationName());
+
+ gen.writeNumberField(FIELD_NAME_START_TIME, result.getStartTime());
+ gen.writeNumberField(FIELD_NAME_END_TIME, result.getEndTime());
+
+ gen.writeEndObject();
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index a1ed8f78f0e3e..e814c35987efa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.util;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -35,9 +36,13 @@
import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointStoreUtil;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
+import org.apache.flink.runtime.jobmanager.ApplicationStore;
+import org.apache.flink.runtime.jobmanager.ApplicationStoreEntry;
+import org.apache.flink.runtime.jobmanager.DefaultApplicationStore;
import org.apache.flink.runtime.jobmanager.DefaultExecutionPlanStore;
import org.apache.flink.runtime.jobmanager.ExecutionPlanStore;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.jobmanager.ZooKeeperApplicationStoreUtil;
import org.apache.flink.runtime.jobmanager.ZooKeeperExecutionPlanStoreUtil;
import org.apache.flink.runtime.jobmanager.ZooKeeperExecutionPlanStoreWatcher;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
@@ -102,6 +107,9 @@ public class ZooKeeperUtils {
public static final String HA_STORAGE_SUBMITTED_EXECUTION_PLAN_PREFIX =
"submittedExecutionPlan";
+ /** The prefix of the submitted application file. */
+ public static final String HA_STORAGE_SUBMITTED_APPLICATION_PREFIX = "submittedApplication";
+
/** The prefix of the completed checkpoint file. */
public static final String HA_STORAGE_COMPLETED_CHECKPOINT = "completedCheckpoint";
@@ -561,6 +569,44 @@ public static ExecutionPlanStore createExecutionPlans(
ZooKeeperExecutionPlanStoreUtil.INSTANCE);
}
+ /**
+ * Creates a {@link DefaultApplicationStore} instance with {@link ZooKeeperStateHandleStore},
+ * and {@link ZooKeeperApplicationStoreUtil}.
+ *
+ * @param client The {@link CuratorFramework} ZooKeeper client to use
+ * @param configuration {@link Configuration} object
+ * @return {@link DefaultApplicationStore} instance
+ * @throws Exception if the submitted application store cannot be created
+ */
+    public static ApplicationStore createApplicationStore(
+            CuratorFramework client, Configuration configuration) throws Exception {
+
+        checkNotNull(client, "CuratorFramework client");
+        checkNotNull(configuration, "Configuration");
+
+        RetrievableStateStorageHelper<ApplicationStoreEntry> stateStorage =
+                createFileSystemStateStorage(
+                        configuration, HA_STORAGE_SUBMITTED_APPLICATION_PREFIX);
+
+        // ZooKeeper submitted applications root dir
+        String zooKeeperApplicationsPath =
+                configuration.get(HighAvailabilityOptions.HA_ZOOKEEPER_APPLICATIONS_PATH);
+
+        // Ensure that the applications path exists
+        client.newNamespaceAwareEnsurePath(zooKeeperApplicationsPath)
+                .ensure(client.getZookeeperClient());
+
+        // All operations will have the path as root
+        CuratorFramework facade =
+                client.usingNamespace(client.getNamespace() + zooKeeperApplicationsPath);
+
+        final ZooKeeperStateHandleStore<ApplicationStoreEntry>
+                zooKeeperApplicationStateHandleStore =
+                        new ZooKeeperStateHandleStore<>(facade, stateStorage);
+
+        return new DefaultApplicationStore<>(
+                zooKeeperApplicationStateHandleStore, ZooKeeperApplicationStoreUtil.INSTANCE);
+    }
+
/**
* Creates a {@link DefaultCompletedCheckpointStore} instance with {@link
* ZooKeeperStateHandleStore}.
@@ -616,6 +662,12 @@ public static String getPathForJob(JobID jobId) {
return String.format("/%s", jobId);
}
+    /** Converts the given application id into a ZooKeeper path: a leading slash followed by the id. */
+    public static String getPathForApplication(ApplicationID applicationId) {
+        checkNotNull(applicationId, "Application ID");
+        return "/" + applicationId;
+    }
+
/**
* Creates an instance of {@link ZooKeeperStateHandleStore}.
*
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
index b50a1826367a0..8c16371c29a4d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
@@ -26,8 +26,10 @@
import org.apache.flink.runtime.dispatcher.cleanup.CheckpointResourcesCleanupRunnerFactory;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatServicesImpl;
+import org.apache.flink.runtime.highavailability.EmbeddedApplicationResultStore;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore;
+import org.apache.flink.runtime.jobmanager.StandaloneApplicationStore;
import org.apache.flink.runtime.jobmanager.StandaloneExecutionPlanStore;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.rpc.RpcUtils;
@@ -97,6 +99,8 @@ public void setUp() throws Exception {
haServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService());
haServices.setExecutionPlanStore(new StandaloneExecutionPlanStore());
haServices.setJobResultStore(new EmbeddedJobResultStore());
+ haServices.setApplicationStore(new StandaloneApplicationStore());
+ haServices.setApplicationResultStore(new EmbeddedApplicationResultStore());
configuration = new Configuration();
blobServer =
@@ -110,6 +114,8 @@ protected TestingDispatcher.Builder createTestingDispatcherBuilder() {
.setHighAvailabilityServices(haServices)
.setExecutionPlanWriter(haServices.getExecutionPlanStore())
.setJobResultStore(haServices.getJobResultStore())
+ .setApplicationWriter(haServices.getApplicationStore())
+ .setApplicationResultStore(haServices.getApplicationResultStore())
.setJobManagerRunnerFactory(JobMasterServiceLeadershipRunnerFactory.INSTANCE)
.setCleanupRunnerFactory(CheckpointResourcesCleanupRunnerFactory.INSTANCE)
.setFatalErrorHandler(testingFatalErrorHandlerResource.getFatalErrorHandler())
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationResourceCleanupTest.java
new file mode 100644
index 0000000000000..269593e26e809
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationResourceCleanupTest.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.ApplicationState;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobUtils;
+import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
+import org.apache.flink.runtime.dispatcher.cleanup.TestingApplicationResourceCleanerFactory;
+import org.apache.flink.runtime.highavailability.ApplicationResultEntry;
+import org.apache.flink.runtime.highavailability.ApplicationResultStore;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testutils.TestingApplicationResultStore;
+import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Reference;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Tests the application resource cleanup by the {@link Dispatcher}. */
+@ExtendWith(TestLoggerExtension.class)
+public class DispatcherApplicationResourceCleanupTest {
+
+    @TempDir public Path tempDir;
+
+    @RegisterExtension
+    final TestingFatalErrorHandlerExtension testingFatalErrorHandlerResource =
+            new TestingFatalErrorHandlerExtension();
+
+    private static final Duration timeout = Duration.ofSeconds(10L);
+
+    private static TestingRpcService rpcService;
+
+    private ApplicationID applicationId;
+
+    private TestingDispatcher dispatcher;
+
+    private DispatcherGateway dispatcherGateway;
+
+    private BlobServer blobServer;
+
+    // Completed with the id of the application whose global cleanup was triggered.
+    private CompletableFuture<ApplicationID> globalCleanupFuture;
+
+    @BeforeAll
+    public static void setupClass() {
+        rpcService = new TestingRpcService();
+    }
+
+    @BeforeEach
+    void setup() throws Exception {
+        applicationId = new ApplicationID();
+
+        globalCleanupFuture = new CompletableFuture<>();
+
+        blobServer =
+                BlobUtils.createBlobServer(
+                        new Configuration(),
+                        Reference.owned(tempDir.toFile()),
+                        new TestingBlobStoreBuilder().createTestingBlobStore());
+    }
+
+    @AfterEach
+    void teardown() throws Exception {
+        if (dispatcher != null) {
+            dispatcher.close();
+        }
+
+        if (blobServer != null) {
+            blobServer.close();
+        }
+    }
+
+    @AfterAll
+    static void teardownClass() throws ExecutionException, InterruptedException {
+        if (rpcService != null) {
+            rpcService.closeAsync().get();
+        }
+    }
+
+    @Test
+    void testGlobalCleanupWhenApplicationFinished() throws Exception {
+        startDispatcher(createTestingDispatcherBuilder());
+
+        submitApplicationAndWait();
+
+        dispatcher.notifyApplicationStatusChange(applicationId, ApplicationState.FINISHED);
+
+        dispatcher
+                .getApplicationTerminationFuture(applicationId)
+                .get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+
+        assertGlobalCleanupTriggered(applicationId);
+    }
+
+    @Test
+    void testApplicationMarkedAsDirtyBeforeCleanup() throws Exception {
+        final OneShotLatch markAsDirtyLatch = new OneShotLatch();
+
+        final TestingDispatcher.Builder dispatcherBuilder =
+                createTestingDispatcherBuilder()
+                        .setApplicationResultStore(
+                                TestingApplicationResultStore.builder()
+                                        .withCreateDirtyResultConsumer(
+                                                ignoredApplicationResultEntry -> {
+                                                    try {
+                                                        markAsDirtyLatch.await();
+                                                    } catch (InterruptedException e) {
+                                                        Thread.currentThread().interrupt();
+                                                        return FutureUtils.completedExceptionally(
+                                                                e);
+                                                    }
+                                                    return FutureUtils.completedVoidFuture();
+                                                })
+                                        .build());
+        startDispatcher(dispatcherBuilder);
+
+        submitApplicationAndWait();
+
+        dispatcher.notifyApplicationStatusChange(applicationId, ApplicationState.FINISHED);
+
+        assertThatNoCleanupWasTriggered();
+
+        // Trigger the latch to allow dirty result creation to complete
+        markAsDirtyLatch.trigger();
+
+        // Now cleanup should be triggered
+        assertGlobalCleanupTriggered(applicationId);
+    }
+
+    @Test
+    void testApplicationMarkedAsCleanAfterCleanup() throws Exception {
+        final CompletableFuture<ApplicationID> markAsCleanFuture = new CompletableFuture<>();
+
+        final ApplicationResultStore applicationResultStore =
+                TestingApplicationResultStore.builder()
+                        .withMarkResultAsCleanConsumer(
+                                applicationId -> {
+                                    markAsCleanFuture.complete(applicationId);
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .build();
+
+        final OneShotLatch cleanupLatch = new OneShotLatch();
+
+        final TestingApplicationResourceCleanerFactory resourceCleanerFactory =
+                TestingApplicationResourceCleanerFactory.builder()
+                        .withGloballyCleanableResource(
+                                (applicationId, ignoredExecutor) -> {
+                                    try {
+                                        cleanupLatch.await();
+                                    } catch (InterruptedException e) {
+                                        throw new RuntimeException(e);
+                                    }
+
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .build();
+
+        final TestingDispatcher.Builder dispatcherBuilder =
+                createTestingDispatcherBuilder()
+                        .setApplicationResultStore(applicationResultStore)
+                        .setApplicationResourceCleanerFactory(resourceCleanerFactory);
+
+        startDispatcher(dispatcherBuilder);
+
+        submitApplicationAndWait();
+
+        dispatcher.notifyApplicationStatusChange(applicationId, ApplicationState.FINISHED);
+
+        // Mark as clean should not have been called yet
+        assertFalse(markAsCleanFuture.isDone());
+
+        // Trigger cleanup
+        cleanupLatch.trigger();
+
+        // Wait for cleanup to complete
+        dispatcher
+                .getApplicationTerminationFuture(applicationId)
+                .get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+
+        // Verify mark as clean was called
+        assertEquals(
+                applicationId, markAsCleanFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    void testDispatcherTerminationTerminatesRunningApplications() throws Exception {
+        startDispatcher(createTestingDispatcherBuilder());
+
+        submitApplicationAndWait();
+
+        dispatcher.closeAsync().get();
+
+        assertThrows(
+                CancellationException.class,
+                () -> dispatcher.getApplicationTerminationFuture(applicationId).get());
+    }
+
+    @Test
+    void testFatalErrorIfApplicationCannotBeMarkedDirtyInApplicationResultStore() throws Exception {
+        final ApplicationResultStore applicationResultStore =
+                TestingApplicationResultStore.builder()
+                        .withCreateDirtyResultConsumer(
+                                applicationResult ->
+                                        FutureUtils.completedExceptionally(
+                                                new IOException("Expected IOException.")))
+                        .build();
+
+        startDispatcher(
+                createTestingDispatcherBuilder().setApplicationResultStore(applicationResultStore));
+
+        submitApplicationAndWait();
+
+        dispatcher.notifyApplicationStatusChange(applicationId, ApplicationState.FINISHED);
+
+        // Fatal error should be reported
+        final CompletableFuture<? extends Throwable> errorFuture =
+                testingFatalErrorHandlerResource.getTestingFatalErrorHandler().getErrorFuture();
+        assertThat(errorFuture.get()).isInstanceOf(FlinkException.class);
+
+        testingFatalErrorHandlerResource.getTestingFatalErrorHandler().clearError();
+    }
+
+    @Test
+    void testErrorHandlingIfApplicationCannotBeMarkedAsCleanInApplicationResultStore()
+            throws Exception {
+        final CompletableFuture<ApplicationResultEntry> dirtyApplicationFuture =
+                new CompletableFuture<>();
+        final ApplicationResultStore applicationResultStore =
+                TestingApplicationResultStore.builder()
+                        .withCreateDirtyResultConsumer(
+                                applicationResultEntry -> {
+                                    dirtyApplicationFuture.complete(applicationResultEntry);
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .withMarkResultAsCleanConsumer(
+                                applicationId ->
+                                        FutureUtils.completedExceptionally(
+                                                new IOException("Expected IOException.")))
+                        .build();
+
+        startDispatcher(
+                createTestingDispatcherBuilder().setApplicationResultStore(applicationResultStore));
+
+        submitApplicationAndWait();
+
+        dispatcher.notifyApplicationStatusChange(applicationId, ApplicationState.FINISHED);
+
+        // No fatal error should be reported (mark as clean failure is handled gracefully)
+        final CompletableFuture<? extends Throwable> errorFuture =
+                testingFatalErrorHandlerResource.getTestingFatalErrorHandler().getErrorFuture();
+        try {
+            errorFuture.get(100, TimeUnit.MILLISECONDS);
+            fail("No error should have been reported.");
+        } catch (TimeoutException e) {
+            // expected
+        }
+
+        // Dirty result should have been created
+        assertEquals(
+                applicationId,
+                dirtyApplicationFuture
+                        .get(timeout.toMillis(), TimeUnit.MILLISECONDS)
+                        .getApplicationId());
+    }
+
+    @Test
+    void testArchivingFinishedApplicationToHistoryServer() throws Exception {
+        final CompletableFuture<Acknowledge> archiveFuture = new CompletableFuture<>();
+
+        final TestingDispatcher.Builder dispatcherBuilder =
+                createTestingDispatcherBuilder()
+                        .setHistoryServerArchivist(
+                                TestingHistoryServerArchivist.builder()
+                                        .setArchiveApplicationFunction(
+                                                archivedApplication -> archiveFuture)
+                                        .build());
+
+        startDispatcher(dispatcherBuilder);
+
+        submitApplicationAndWait();
+
+        dispatcher.notifyApplicationStatusChange(applicationId, ApplicationState.FINISHED);
+
+        // Before the archiving is finished, the cleanup is not finished and the application is not
+        // terminated
+        assertThatNoCleanupWasTriggered();
+        assertFalse(dispatcher.getApplicationTerminationFuture(applicationId).isDone());
+
+        // Complete archiving
+        archiveFuture.complete(Acknowledge.get());
+
+        // Once the archive is finished, the cleanup is finished and the application is terminated.
+        assertGlobalCleanupTriggered(applicationId);
+        dispatcher.getApplicationTerminationFuture(applicationId).join();
+    }
+
+    /** Builds and starts the dispatcher under test and retrieves its self gateway. */
+    private void startDispatcher(TestingDispatcher.Builder dispatcherBuilder) throws Exception {
+        dispatcher = dispatcherBuilder.build(rpcService);
+
+        dispatcher.start();
+
+        dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+    }
+
+    /**
+     * Creates a dispatcher builder whose global cleanup completes {@code globalCleanupFuture} with
+     * the id of the cleaned-up application.
+     */
+    private TestingDispatcher.Builder createTestingDispatcherBuilder() {
+        return TestingDispatcher.builder()
+                .setBlobServer(blobServer)
+                .setFatalErrorHandler(
+                        testingFatalErrorHandlerResource.getTestingFatalErrorHandler())
+                .setApplicationResourceCleanerFactory(
+                        TestingApplicationResourceCleanerFactory.builder()
+                                .withGloballyCleanableResource(
+                                        (applicationId, ignoredExecutor) -> {
+                                            globalCleanupFuture.complete(applicationId);
+                                            return FutureUtils.completedVoidFuture();
+                                        })
+                                .build());
+    }
+
+    private void assertThatNoCleanupWasTriggered() {
+        assertThat(globalCleanupFuture.isDone()).isFalse();
+    }
+
+    private void assertGlobalCleanupTriggered(ApplicationID applicationId)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        assertThat(globalCleanupFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS))
+                .isEqualTo(applicationId);
+    }
+
+    private CompletableFuture<Acknowledge> submitApplication() {
+        return dispatcherGateway.submitApplication(
+                TestingApplication.builder().setApplicationId(applicationId).build(), timeout);
+    }
+
+    private void submitApplicationAndWait() {
+        submitApplication().join();
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java
new file mode 100644
index 0000000000000..15748868ffd4c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherApplicationTest.java
@@ -0,0 +1,848 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.ApplicationState;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.application.AbstractApplication;
+import org.apache.flink.runtime.application.ArchivedApplication;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.client.DuplicateApplicationSubmissionException;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.dispatcher.cleanup.CheckpointResourcesCleanupRunnerFactory;
+import org.apache.flink.runtime.dispatcher.cleanup.TestingCleanupRunnerFactory;
+import org.apache.flink.runtime.highavailability.ApplicationResult;
+import org.apache.flink.runtime.highavailability.ApplicationResultEntry;
+import org.apache.flink.runtime.highavailability.EmbeddedApplicationResultStore;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobmanager.StandaloneApplicationStore;
+import org.apache.flink.runtime.jobmanager.StandaloneExecutionPlanStore;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.FlinkApplicationTerminatedWithoutCancellationException;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.testutils.TestingApplicationResultStore;
+import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for the {@link Dispatcher} component that are related to applications. */
+@ExtendWith(TestLoggerExtension.class)
+public class DispatcherApplicationTest {
+
+ static TestingRpcService rpcService;
+
+ static final Duration TIMEOUT = Duration.ofMinutes(1L);
+
+ @TempDir public Path tempDir;
+
+ @RegisterExtension
+ final TestingFatalErrorHandlerExtension testingFatalErrorHandlerResource =
+ new TestingFatalErrorHandlerExtension();
+
+ Configuration configuration;
+
+ BlobServer blobServer;
+
+ TestingHighAvailabilityServices haServices;
+
+ private ApplicationID applicationId;
+
+ private JobGraph jobGraph;
+
+ private JobID jobId;
+
+ /** Instance under test. */
+ private TestingDispatcher dispatcher;
+
+ /** Creates the RPC service shared by all tests of this class; released in {@code teardownClass}. */
+ @BeforeAll
+ public static void setupClass() {
+ rpcService = new TestingRpcService();
+ }
+
+ @BeforeEach
+ void beforeEach() throws Exception {
+ // Non-HA (standalone/embedded) stores are sufficient for these dispatcher tests.
+ haServices = new TestingHighAvailabilityServices();
+ haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
+ haServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService());
+ haServices.setExecutionPlanStore(new StandaloneExecutionPlanStore());
+ haServices.setJobResultStore(new EmbeddedJobResultStore());
+ haServices.setApplicationStore(new StandaloneApplicationStore());
+ haServices.setApplicationResultStore(new EmbeddedApplicationResultStore());
+
+ configuration = new Configuration();
+ blobServer = new BlobServer(configuration, tempDir.toFile(), new VoidBlobStore());
+ jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
+ jobId = jobGraph.getJobID();
+ // The application id reuses the job id's hex representation so the single test
+ // job and its application share the same identifier.
+ applicationId = ApplicationID.fromHexString(jobId.toHexString());
+ jobGraph.setApplicationId(applicationId);
+ }
+
+ @AfterEach
+ void teardown() throws Exception {
+ // Close the dispatcher first, before the blob server it may still be using.
+ if (dispatcher != null) {
+ dispatcher.close();
+ }
+
+ if (blobServer != null) {
+ blobServer.close();
+ }
+ }
+
+ /** Shuts down the class-wide RPC service created in {@code setupClass}. */
+ @AfterAll
+ static void teardownClass() throws ExecutionException, InterruptedException {
+ if (rpcService != null) {
+ rpcService.closeAsync().get();
+ }
+ }
+
+    @Test
+    void testApplicationStatusChange_ArchiveNotCalledForNonTerminalStatus() throws Exception {
+        // Completed only if the history-server archivist is invoked.
+        final CompletableFuture<Void> archiveApplicationFuture = new CompletableFuture<>();
+        dispatcher =
+                createTestingDispatcherBuilder()
+                        .setHistoryServerArchivist(
+                                TestingHistoryServerArchivist.builder()
+                                        .setArchiveApplicationFunction(
+                                                archivedApplication -> {
+                                                    archiveApplicationFuture.complete(null);
+                                                    return CompletableFuture.completedFuture(null);
+                                                })
+                                        .build())
+                        .build(rpcService);
+        dispatcher.start();
+        submitApplicationAndMockApplicationStatusChange(ApplicationState.RUNNING);
+
+        // verify that archive application is not called
+        assertFalse(archiveApplicationFuture.isDone());
+        assertFalse(dispatcher.getApplicationTerminationFuture(applicationId).isDone());
+    }
+
+ @Test
+ void testApplicationStatusChange_ArchiveCalledForTerminalStatus() throws Exception {
+ final CompletableFuture