COMPLETED_APPLICATION_STORE_MAX_CAPACITY =
+ key("completed-application-store.max-capacity")
+ .intType()
+ .defaultValue(Integer.MAX_VALUE)
+ .withDeprecatedKeys("jobstore.max-capacity")
+ .withDescription(
+ "The max number of completed applications that can be kept in the store. "
+ + "NOTICE: if memory store keeps too many applications in session cluster, it may cause FullGC or OOM in jm.");
+
+ /**
+ * Config parameter determining the store implementation in session cluster.
+ *
+ * FLINK-38845 replaces job store with application store because every job is now associated
+ * with an application. The legacy job store expires jobs individually, risking partial loss of
+ * an application's job history and breaking application-level consistency. The application
+ * store manages and expires applications (with all its jobs) as a whole, ensuring complete,
+ * consistent, and queryable application state until explicitly discarded.
+ */
+ @Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
+ public static final ConfigOption
+ COMPLETED_APPLICATION_STORE_TYPE =
+ key("completed-application-store.type")
+ .enumType(ArchivedApplicationStoreType.class)
+ .defaultValue(ArchivedApplicationStoreType.File)
+ .withDeprecatedKeys("jobstore.type")
+ .withDescription(
+ Description.builder()
+ .text(
+ "Determines which store implementation is used in session cluster. Accepted values are:")
+ .list(
+ text(
+ "'File': the file store keeps the completed applications in files"),
+ text(
+ "'Memory': the memory store keeps the completed applications in memory. You"
+ + " may need to limit the %s to mitigate FullGC or OOM when there are too many applications",
+ code(
+ COMPLETED_APPLICATION_STORE_MAX_CAPACITY
+ .key())))
+ .build());
+
+ /** Type of archived application store implementation. */
+ public enum ArchivedApplicationStoreType {
+ File,
+ Memory
+ }
+
/**
* Flag indicating whether JobManager would retrieve canonical host name of TaskManager during
* registration.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java b/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java
index 4acadee9a2d20..aef3f17b893d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java
@@ -31,10 +31,12 @@
import org.slf4j.LoggerFactory;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -63,6 +65,14 @@ public abstract class AbstractApplication implements Serializable {
private final Set jobs = new HashSet<>();
+ /**
+ * List of registered application status listeners that will be notified via {@link
+ * ApplicationStatusListener#notifyApplicationStatusChange} when the application state changes.
+ * For example, the Dispatcher registers itself as a listener to perform operations such as
+ * archiving when the application reaches a terminal state.
+ */
+ private final transient List statusListeners = new ArrayList<>();
+
public AbstractApplication(ApplicationID applicationId) {
this.applicationId = checkNotNull(applicationId);
this.statusTimestamps = new long[ApplicationState.values().length];
@@ -130,6 +140,15 @@ public ApplicationState getApplicationStatus() {
return applicationState;
}
+ /**
+ * Registers a status listener.
+ *
+ * This method is not thread-safe and should not be called concurrently.
+ */
+ public void registerStatusListener(ApplicationStatusListener listener) {
+ statusListeners.add(listener);
+ }
+
// ------------------------------------------------------------------------
// State Transitions
// ------------------------------------------------------------------------
@@ -196,6 +215,8 @@ void transitionState(ApplicationState targetState) {
targetState);
this.statusTimestamps[targetState.ordinal()] = System.currentTimeMillis();
this.applicationState = targetState;
+ statusListeners.forEach(
+ listener -> listener.notifyApplicationStatusChange(applicationId, targetState));
}
private void validateTransition(ApplicationState targetState) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/application/ApplicationStatusListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/application/ApplicationStatusListener.java
new file mode 100644
index 0000000000000..b4c9996af8085
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/application/ApplicationStatusListener.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.application;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.ApplicationState;
+
+/** Interface for listeners that monitor the status of an application. */
+public interface ApplicationStatusListener {
+
+ /**
+ * This method is called whenever the status of the application changes.
+ *
+ * @param applicationId The ID of the job.
+ * @param newStatus The status the application switched to.
+ */
+ void notifyApplicationStatusChange(ApplicationID applicationId, ApplicationState newStatus);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/application/ArchivedApplication.java b/flink-runtime/src/main/java/org/apache/flink/runtime/application/ArchivedApplication.java
index f9c2fa8b85871..771366bc22aef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/application/ArchivedApplication.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/application/ArchivedApplication.java
@@ -20,11 +20,12 @@
import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.ApplicationState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import java.io.Serializable;
import java.util.Arrays;
-import java.util.Collection;
+import java.util.Map;
/** Read-only information about an {@link AbstractApplication}. */
public class ArchivedApplication implements Serializable {
@@ -39,14 +40,14 @@ public class ArchivedApplication implements Serializable {
private final long[] statusTimestamps;
- private final Collection jobs;
+ private final Map jobs;
public ArchivedApplication(
ApplicationID applicationId,
String applicationName,
ApplicationState applicationState,
long[] statusTimestamps,
- Collection jobs) {
+ Map jobs) {
this.applicationId = applicationId;
this.applicationName = applicationName;
this.applicationState = applicationState;
@@ -70,7 +71,7 @@ public long getStatusTimestamp(ApplicationState status) {
return this.statusTimestamps[status.ordinal()];
}
- public Collection getJobs() {
+ public Map getJobs() {
return jobs;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStore.java
similarity index 52%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStore.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStore.java
index 1d6356773a485..a5b6315466303 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStore.java
@@ -18,64 +18,71 @@
package org.apache.flink.runtime.dispatcher;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.application.ArchivedApplication;
+import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
-import javax.annotation.Nullable;
-
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
+import java.util.Optional;
-/** Interface for a {@link ExecutionGraphInfo} store. */
-public interface ExecutionGraphInfoStore extends Closeable {
+/** Interface for a {@link ArchivedApplication} store. */
+public interface ArchivedApplicationStore extends Closeable {
/**
- * Returns the current number of stored {@link ExecutionGraphInfo} instances.
+ * Returns the current number of stored {@link ArchivedApplication} instances.
*
- * @return Current number of stored {@link ExecutionGraphInfo} instances
+ * @return Current number of stored {@link ArchivedApplication} instances
*/
int size();
/**
- * Get the {@link ExecutionGraphInfo} for the given job id. Null if it isn't stored.
+ * Get the {@link ArchivedApplication} for the given application id. Empty if it isn't stored.
*
- * @param jobId identifying the serializable execution graph to retrieve
- * @return The stored serializable execution graph or null
+ * @param applicationId identifying the serializable application to retrieve
+ * @return The stored serializable application or empty
*/
- @Nullable
- ExecutionGraphInfo get(JobID jobId);
+ Optional get(ApplicationID applicationId);
/**
- * Store the given {@link ExecutionGraphInfo} in the store.
+ * Store the given {@link ArchivedApplication} in the store.
*
- * @param executionGraphInfo to store
- * @throws IOException if the serializable execution graph could not be stored in the store
+ * @param archivedApplication to store
+ * @throws IOException if the serializable archived application could not be stored in the store
*/
- void put(ExecutionGraphInfo executionGraphInfo) throws IOException;
+ void put(ArchivedApplication archivedApplication) throws IOException;
/**
- * Return the {@link JobsOverview} for all stored/past jobs.
+ * Return the collection of {@link ApplicationDetails} of all currently stored applications.
*
- * @return Jobs overview for all stored/past jobs
+ * @return Collection of application details of all currently stored applications
*/
- JobsOverview getStoredJobsOverview();
+ Collection getApplicationDetails();
+
+ /**
+ * Get the {@link ExecutionGraphInfo} for the given job id. Empty if it isn't stored.
+ *
+ * @param jobId identifying the serializable execution graph to retrieve
+ * @return The stored serializable execution graph or empty
+ */
+ Optional getExecutionGraphInfo(JobID jobId);
/**
* Return the collection of {@link JobDetails} of all currently stored jobs.
*
* @return Collection of job details of all currently stored jobs
*/
- Collection getAvailableJobDetails();
+ Collection getJobDetails();
/**
- * Return the {@link JobDetails}} for the given job.
+ * Return the {@link JobsOverview} for all stored/past jobs.
*
- * @param jobId identifying the job for which to retrieve the {@link JobDetails}
- * @return {@link JobDetails} of the requested job or null if the job is not available
+ * @return Jobs overview for all stored/past jobs
*/
- @Nullable
- JobDetails getAvailableJobDetails(JobID jobId);
+ JobsOverview getJobsOverview();
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index b31a743793ffe..05e74a1f20e79 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -39,6 +39,7 @@
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.application.AbstractApplication;
+import org.apache.flink.runtime.application.ApplicationStatusListener;
import org.apache.flink.runtime.application.ArchivedApplication;
import org.apache.flink.runtime.application.SingleJobApplication;
import org.apache.flink.runtime.blob.BlobServer;
@@ -143,6 +144,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -153,7 +155,7 @@
* case of a master failure. Furthermore, it knows about the state of the Flink session cluster.
*/
public abstract class Dispatcher extends FencedRpcEndpoint
- implements DispatcherGateway {
+ implements DispatcherGateway, ApplicationStatusListener {
@VisibleForTesting @Internal
public static final ConfigOption CLIENT_ALIVENESS_CHECK_DURATION =
@@ -187,7 +189,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint
private final DispatcherBootstrapFactory dispatcherBootstrapFactory;
- private final ExecutionGraphInfoStore executionGraphInfoStore;
+ private final ArchivedApplicationStore archivedApplicationStore;
private final JobManagerRunnerFactory jobManagerRunnerFactory;
private final CleanupRunnerFactory cleanupRunnerFactory;
@@ -229,6 +231,10 @@ public abstract class Dispatcher extends FencedRpcEndpoint
private final Map> recoveredApplicationJobIds = new HashMap<>();
+ /** ExecutionGraphInfo for the terminated job whose application is not terminated yet. */
+ private final Map> partialExecutionGraphInfoStore =
+ new HashMap<>();
+
/** Enum to distinguish between initial job submission and re-submission for recovery. */
protected enum ExecutionType {
SUBMISSION,
@@ -311,7 +317,7 @@ protected Dispatcher(
this.historyServerArchivist = dispatcherServices.getHistoryServerArchivist();
- this.executionGraphInfoStore = dispatcherServices.getArchivedExecutionGraphStore();
+ this.archivedApplicationStore = dispatcherServices.getArchivedApplicationStore();
this.jobManagerRunnerFactory = dispatcherServices.getJobManagerRunnerFactory();
this.cleanupRunnerFactory = dispatcherServices.getCleanupRunnerFactory();
@@ -659,6 +665,7 @@ private CompletableFuture internalSubmitApplication(
if (jobs != null) {
jobs.forEach(application::addJob);
}
+ application.registerStatusListener(this);
return application.execute(
getSelfGateway(DispatcherGateway.class),
getRpcService().getScheduledExecutor(),
@@ -666,6 +673,72 @@ private CompletableFuture internalSubmitApplication(
this::onFatalError);
}
+ @Override
+ public void notifyApplicationStatusChange(
+ ApplicationID applicationId, ApplicationState newStatus) {
+ if (newStatus.isTerminalState()) {
+ checkState(
+ applications.containsKey(applicationId),
+ "Application %s does not exist.",
+ applicationId);
+
+ AbstractApplication application = applications.get(applicationId);
+ long[] stateTimestamps = new long[ApplicationState.values().length];
+ for (ApplicationState applicationState : ApplicationState.values()) {
+ final int ordinal = applicationState.ordinal();
+ stateTimestamps[ordinal] = application.getStatusTimestamp(applicationState);
+ }
+
+ List> jobFutures =
+ application.getJobs().stream()
+ .map(
+ jobId -> {
+ checkState(
+ partialExecutionGraphInfoStore.containsKey(jobId),
+ "ExecutionGraphInfo for job %s does not exist.",
+ jobId);
+ return partialExecutionGraphInfoStore.get(jobId);
+ })
+ .collect(Collectors.toList());
+
+ // wait for all jobs to be stored
+ FutureUtils.combineAll(jobFutures)
+ .thenAcceptAsync(
+ combinedJobs -> {
+ Map jobs = new HashMap<>();
+ for (ExecutionGraphInfo executionGraphInfo : combinedJobs) {
+ jobs.put(executionGraphInfo.getJobId(), executionGraphInfo);
+ partialExecutionGraphInfoStore.remove(
+ executionGraphInfo.getJobId());
+ }
+
+ ArchivedApplication archivedApplication =
+ new ArchivedApplication(
+ application.getApplicationId(),
+ application.getName(),
+ application.getApplicationStatus(),
+ stateTimestamps,
+ jobs);
+
+ applications.remove(applicationId);
+ writeToArchivedApplicationStore(archivedApplication);
+ },
+ getMainThreadExecutor());
+ }
+ }
+
+ private void writeToArchivedApplicationStore(ArchivedApplication archivedApplication) {
+ try {
+ archivedApplicationStore.put(archivedApplication);
+ } catch (IOException e) {
+ log.info(
+ "Could not store completed application {}({}).",
+ archivedApplication.getApplicationName(),
+ archivedApplication.getApplicationId(),
+ e);
+ }
+ }
+
@VisibleForTesting
Map getApplications() {
return applications;
@@ -811,6 +884,8 @@ private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionTy
final JobID jobId = jobManagerRunner.getJobID();
+ partialExecutionGraphInfoStore.put(jobId, new CompletableFuture<>());
+
final CompletableFuture cleanupJobStateFuture =
jobManagerRunner
.getResultFuture()
@@ -947,8 +1022,10 @@ public CompletableFuture cancelJob(JobID jobId, Duration timeout) {
return maybeJob.get().cancel(timeout);
}
- final ExecutionGraphInfo executionGraphInfo = executionGraphInfoStore.get(jobId);
- if (executionGraphInfo != null) {
+ final Optional optionalExecutionGraphInfo =
+ getExecutionGraphInfoFromStore(jobId);
+ if (optionalExecutionGraphInfo.isPresent()) {
+ final ExecutionGraphInfo executionGraphInfo = optionalExecutionGraphInfo.get();
final JobStatus jobStatus = executionGraphInfo.getArchivedExecutionGraph().getState();
if (jobStatus == JobStatus.CANCELED) {
return CompletableFuture.completedFuture(Acknowledge.get());
@@ -1001,7 +1078,7 @@ public CompletableFuture requestClusterOverview(Duration timeou
CompletableFuture> allJobsFuture =
allOptionalJobsFuture.thenApply(this::flattenOptionalCollection);
- final JobsOverview completedJobsOverview = executionGraphInfoStore.getStoredJobsOverview();
+ final JobsOverview completedJobsOverview = getCompletedJobsOverview();
return allJobsFuture.thenCombine(
taskManagerOverviewFuture,
@@ -1012,6 +1089,16 @@ public CompletableFuture requestClusterOverview(Duration timeou
});
}
+ private JobsOverview getCompletedJobsOverview() {
+ Collection allJobStatus =
+ getPartialExecutionGraphInfo()
+ .map(ExecutionGraphInfo::getArchivedExecutionGraph)
+ .map(ArchivedExecutionGraph::getState)
+ .collect(Collectors.toList());
+ return JobsOverview.create(allJobStatus)
+ .combine(archivedApplicationStore.getJobsOverview());
+ }
+
@Override
public CompletableFuture requestMultipleJobDetails(Duration timeout) {
List>> individualOptionalJobDetails =
@@ -1024,8 +1111,7 @@ public CompletableFuture requestMultipleJobDetails(Duration
CompletableFuture> combinedJobDetails =
optionalCombinedJobDetails.thenApply(this::flattenOptionalCollection);
- final Collection completedJobDetails =
- executionGraphInfoStore.getAvailableJobDetails();
+ final Collection completedJobDetails = getCompletedJobDetails();
return combinedJobDetails.thenApply(
(Collection runningJobDetails) -> {
@@ -1049,6 +1135,34 @@ public CompletableFuture requestMultipleJobDetails(Duration
});
}
+ private Collection getCompletedJobDetails() {
+ return Stream.concat(
+ getPartialExecutionGraphInfo()
+ .map(ExecutionGraphInfo::getArchivedExecutionGraph)
+ .map(JobDetails::createDetailsForJob),
+ archivedApplicationStore.getJobDetails().stream())
+ .collect(Collectors.toList());
+ }
+
+ private Stream getPartialExecutionGraphInfo() {
+ return partialExecutionGraphInfoStore.values().stream()
+ .filter(CompletableFuture::isDone)
+ .map(
+ executionGraphInfoFuture -> {
+ try {
+ ExecutionGraphInfo executionGraphInfo =
+ executionGraphInfoFuture.getNow(null);
+ if (executionGraphInfo != null) {
+ return executionGraphInfo;
+ }
+ } catch (Exception e) {
+ log.error("ExecutionGraphInfo future completed with exception", e);
+ }
+ return null;
+ })
+ .filter(Objects::nonNull);
+ }
+
@Override
public CompletableFuture requestMultipleApplicationDetails(
Duration timeout) {
@@ -1061,23 +1175,38 @@ public CompletableFuture requestMultipleApplication
ApplicationDetails
::fromArchivedApplication))
.collect(Collectors.toList());
+
+ final Collection completedApplicationDetails =
+ archivedApplicationStore.getApplicationDetails();
+
return FutureUtils.combineAll(applicationDetailsFutures)
.thenCompose(
- combinedApplicationDetails ->
- CompletableFuture.completedFuture(
- new MultipleApplicationsDetails(
- combinedApplicationDetails)));
+ runningApplicationDetails -> {
+ Collection combinedApplicationDetails =
+ Stream.concat(
+ runningApplicationDetails.stream(),
+ completedApplicationDetails.stream())
+ .collect(Collectors.toList());
+ return CompletableFuture.completedFuture(
+ new MultipleApplicationsDetails(combinedApplicationDetails));
+ });
}
@Override
public CompletableFuture requestApplication(
ApplicationID applicationId, Duration timeout) {
- if (!applications.containsKey(applicationId)) {
- return FutureUtils.completedExceptionally(
- new FlinkApplicationNotFoundException(applicationId));
+ if (applications.containsKey(applicationId)) {
+ return requestApplication(applications.get(applicationId), timeout);
}
- return requestApplication(applications.get(applicationId), timeout);
+ // is it a completed application?
+ return archivedApplicationStore
+ .get(applicationId)
+ .map(CompletableFuture::completedFuture)
+ .orElseGet(
+ () ->
+ FutureUtils.completedExceptionally(
+ new FlinkApplicationNotFoundException(applicationId)));
}
private CompletableFuture requestApplication(
@@ -1088,28 +1217,9 @@ private CompletableFuture requestApplication(
stateTimestamps[ordinal] = application.getStatusTimestamp(applicationState);
}
- List> jobFutures =
+ List> jobFutures =
application.getJobs().stream()
- .map(
- jobId ->
- requestExecutionGraphInfo(jobId, timeout)
- .thenApply(
- ExecutionGraphInfo
- ::getArchivedExecutionGraph)
- .exceptionally(
- t -> {
- if (t
- instanceof
- FlinkJobNotFoundException) {
- log.warn(
- "Ignore job {} which may be expired when requesting application {}",
- jobId,
- application
- .getApplicationId());
- return null;
- }
- throw new CompletionException(t);
- }))
+ .map(jobId -> requestExecutionGraphInfo(jobId, timeout))
.collect(Collectors.toList());
return FutureUtils.combineAll(jobFutures)
@@ -1122,25 +1232,12 @@ private CompletableFuture requestApplication(
application.getApplicationStatus(),
stateTimestamps,
combinedJobs.stream()
- .filter(Objects::nonNull)
- .collect(Collectors.toSet()))));
- }
-
- private CompletableFuture requestJobDetails(JobID jobId, Duration timeout) {
- Optional maybeJob = getJobManagerRunner(jobId);
- return maybeJob.map(job -> job.requestJobDetails(timeout))
- .orElseGet(
- () -> {
- // is it a completed job?
- final JobDetails jobDetails =
- executionGraphInfoStore.getAvailableJobDetails(jobId);
- if (jobDetails == null) {
- return FutureUtils.completedExceptionally(
- new FlinkJobNotFoundException(jobId));
- } else {
- return CompletableFuture.completedFuture(jobDetails);
- }
- });
+ .collect(
+ Collectors.toMap(
+ ExecutionGraphInfo
+ ::getJobId,
+ executionGraphInfo ->
+ executionGraphInfo)))));
}
@Override
@@ -1150,14 +1247,19 @@ public CompletableFuture requestJobStatus(JobID jobId, Duration timeo
.orElseGet(
() -> {
// is it a completed job?
- final JobDetails jobDetails =
- executionGraphInfoStore.getAvailableJobDetails(jobId);
- if (jobDetails == null) {
- return FutureUtils.completedExceptionally(
- new FlinkJobNotFoundException(jobId));
- } else {
- return CompletableFuture.completedFuture(jobDetails.getStatus());
- }
+ final Optional optionalJobDetails =
+ getExecutionGraphInfoFromStore(jobId)
+ .map(ExecutionGraphInfo::getArchivedExecutionGraph)
+ .map(JobDetails::createDetailsForJob);
+ return optionalJobDetails
+ .map(
+ jobDetails ->
+ CompletableFuture.completedFuture(
+ jobDetails.getStatus()))
+ .orElseGet(
+ () ->
+ FutureUtils.completedExceptionally(
+ new FlinkJobNotFoundException(jobId)));
});
}
@@ -1172,12 +1274,31 @@ public CompletableFuture requestExecutionGraphInfo(
private ExecutionGraphInfo getExecutionGraphInfoFromStore(Throwable t, JobID jobId) {
// check whether it is a completed job
- final ExecutionGraphInfo executionGraphInfo = executionGraphInfoStore.get(jobId);
- if (executionGraphInfo == null) {
- throw new CompletionException(ExceptionUtils.stripCompletionException(t));
- } else {
- return executionGraphInfo;
+ Optional optionalExecutionGraphInfo =
+ getExecutionGraphInfoFromStore(jobId);
+ if (optionalExecutionGraphInfo.isPresent()) {
+ return optionalExecutionGraphInfo.get();
}
+
+ throw new CompletionException(ExceptionUtils.stripCompletionException(t));
+ }
+
+ private Optional getExecutionGraphInfoFromStore(JobID jobId) {
+ final CompletableFuture executionGraphInfoFuture =
+ partialExecutionGraphInfoStore.get(jobId);
+ if (executionGraphInfoFuture != null && executionGraphInfoFuture.isDone()) {
+ try {
+ ExecutionGraphInfo executionGraphInfo = executionGraphInfoFuture.getNow(null);
+ if (executionGraphInfo != null) {
+ return Optional.of(executionGraphInfo);
+ }
+ } catch (Exception e) {
+ log.error("Error while retrieving ExecutionGraphInfo for job {}", jobId, e);
+ }
+ }
+
+ // check whether the job belongs to a completed application
+ return archivedApplicationStore.getExecutionGraphInfo(jobId);
}
@Override
@@ -1195,14 +1316,20 @@ public CompletableFuture requestCheckpointStats(
@Override
public CompletableFuture requestJobResult(JobID jobId, Duration timeout) {
if (!jobManagerRunnerRegistry.isRegistered(jobId)) {
- final ExecutionGraphInfo executionGraphInfo = executionGraphInfoStore.get(jobId);
-
- if (executionGraphInfo == null) {
- return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
- } else {
- return CompletableFuture.completedFuture(
- JobResult.createFrom(executionGraphInfo.getArchivedExecutionGraph()));
- }
+ final Optional optionalExecutionGraphInfo =
+ getExecutionGraphInfoFromStore(jobId);
+
+ return optionalExecutionGraphInfo
+ .map(
+ executionGraphInfo ->
+ CompletableFuture.completedFuture(
+ JobResult.createFrom(
+ executionGraphInfo
+ .getArchivedExecutionGraph())))
+ .orElseGet(
+ () ->
+ FutureUtils.completedExceptionally(
+ new FlinkJobNotFoundException(jobId)));
}
final JobManagerRunner jobManagerRunner = jobManagerRunnerRegistry.get(jobId);
@@ -1732,16 +1859,28 @@ private CompletableFuture createDirtyJobResultEntryAsync(
new JobResultEntry(JobResult.createFrom(executionGraph)));
}
+ /**
+ * Writes the ExecutionGraphInfo to the temporary store, which is for jobs whose associated
+ * application has not yet reached a terminal state. The ExecutionGraphInfo will be transferred
+ * to the ArchivedApplicationStore together with the ArchivedApplication when the application
+ * reaches a terminal state.
+ *
+ * @param executionGraphInfo the execution graph information to be stored temporarily
+ */
private void writeToExecutionGraphInfoStore(ExecutionGraphInfo executionGraphInfo) {
- try {
- executionGraphInfoStore.put(executionGraphInfo);
- } catch (IOException e) {
- log.info(
- "Could not store completed job {}({}).",
- executionGraphInfo.getArchivedExecutionGraph().getJobName(),
- executionGraphInfo.getArchivedExecutionGraph().getJobID(),
- e);
+ final JobID jobId = executionGraphInfo.getJobId();
+ if (!partialExecutionGraphInfoStore.containsKey(jobId)) {
+ // This can only occur in tests for job termination without submitting the job
+ final CompletableFuture executionGraphInfoFuture =
+ new CompletableFuture<>();
+ executionGraphInfoFuture.complete(executionGraphInfo);
+ partialExecutionGraphInfoStore.put(jobId, executionGraphInfoFuture);
+ return;
}
+
+ partialExecutionGraphInfoStore
+ .get(executionGraphInfo.getJobId())
+ .complete(executionGraphInfo);
}
private CompletableFuture archiveExecutionGraphToHistoryServer(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
index 600573b52dbe6..a4faf1300fe16 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
@@ -52,7 +52,7 @@ public class DispatcherServices {
private final JobManagerMetricGroup jobManagerMetricGroup;
- private final ExecutionGraphInfoStore executionGraphInfoStore;
+ private final ArchivedApplicationStore archivedApplicationStroe;
private final FatalErrorHandler fatalErrorHandler;
@@ -80,7 +80,7 @@ public class DispatcherServices {
GatewayRetriever resourceManagerGatewayRetriever,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
- ExecutionGraphInfoStore executionGraphInfoStore,
+ ArchivedApplicationStore archivedApplicationStroe,
FatalErrorHandler fatalErrorHandler,
HistoryServerArchivist historyServerArchivist,
@Nullable String metricQueryServiceAddress,
@@ -100,8 +100,8 @@ public class DispatcherServices {
resourceManagerGatewayRetriever, "ResourceManagerGatewayRetriever");
this.blobServer = Preconditions.checkNotNull(blobServer, "BlobServer");
this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices, "HeartBeatServices");
- this.executionGraphInfoStore =
- Preconditions.checkNotNull(executionGraphInfoStore, "ExecutionGraphInfoStore");
+ this.archivedApplicationStroe =
+ Preconditions.checkNotNull(archivedApplicationStroe, "ArchivedApplicationStore");
this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler, "FatalErrorHandler");
this.historyServerArchivist =
Preconditions.checkNotNull(historyServerArchivist, "HistoryServerArchivist");
@@ -143,8 +143,8 @@ public JobManagerMetricGroup getJobManagerMetricGroup() {
return jobManagerMetricGroup;
}
- public ExecutionGraphInfoStore getArchivedExecutionGraphStore() {
- return executionGraphInfoStore;
+ public ArchivedApplicationStore getArchivedApplicationStore() {
+ return archivedApplicationStroe;
}
public FatalErrorHandler getFatalErrorHandler() {
@@ -200,8 +200,7 @@ public static DispatcherServices from(
.getResourceManagerGatewayRetriever(),
partialDispatcherServicesWithJobPersistenceComponents.getBlobServer(),
partialDispatcherServicesWithJobPersistenceComponents.getHeartbeatServices(),
- partialDispatcherServicesWithJobPersistenceComponents
- .getArchivedExecutionGraphStore(),
+ partialDispatcherServicesWithJobPersistenceComponents.getArchivedApplicationStore(),
partialDispatcherServicesWithJobPersistenceComponents.getFatalErrorHandler(),
partialDispatcherServicesWithJobPersistenceComponents.getHistoryServerArchivist(),
partialDispatcherServicesWithJobPersistenceComponents
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedApplicationStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedApplicationStore.java
new file mode 100644
index 0000000000000..002c816b88057
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedApplicationStore.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.ApplicationState;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.application.ArchivedApplication;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.apache.flink.shaded.guava33.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava33.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava33.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava33.com.google.common.cache.CacheLoader;
+import org.apache.flink.shaded.guava33.com.google.common.cache.LoadingCache;
+import org.apache.flink.shaded.guava33.com.google.common.cache.RemovalListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Store for {@link ArchivedApplication} instances. The store writes the archived application to
+ * disk and keeps the most recently used applications in a memory cache for faster serving.
+ * Moreover, the stored applications are periodically cleaned up.
+ */
+public class FileArchivedApplicationStore implements ArchivedApplicationStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileArchivedApplicationStore.class);
+
+ private final File storageDir;
+
+ private final Cache applicationDetailsCache;
+
+ private final Cache jobDetailsCache;
+
+ private final LoadingCache archivedApplicationCache;
+
+ private final Map jobIdToApplicationId = new HashMap<>();
+
+ private final Map> applicationIdToJobIds = new HashMap<>();
+
+ private final ScheduledFuture> cleanupFuture;
+
+ private int numFinishedJobs;
+
+ private int numFailedJobs;
+
+ private int numCanceledJobs;
+
+ public FileArchivedApplicationStore(
+ File rootDir,
+ Duration expirationTime,
+ int maximumCapacity,
+ long maximumCacheSizeBytes,
+ ScheduledExecutor scheduledExecutor,
+ Ticker ticker)
+ throws IOException {
+
+ final File storageDirectory = initArchivedApplicationStorageDirectory(rootDir);
+
+ LOG.info(
+ "Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.",
+ FileArchivedApplicationStore.class.getSimpleName(),
+ storageDirectory,
+ expirationTime.toMillis(),
+ maximumCacheSizeBytes);
+
+ this.storageDir = Preconditions.checkNotNull(storageDirectory);
+ Preconditions.checkArgument(
+ storageDirectory.exists() && storageDirectory.isDirectory(),
+ "The storage directory must exist and be a directory.");
+ this.applicationDetailsCache =
+ CacheBuilder.newBuilder()
+ .expireAfterWrite(expirationTime.toMillis(), TimeUnit.MILLISECONDS)
+ .maximumSize(maximumCapacity)
+ .removalListener(
+ (RemovalListener)
+ notification ->
+ deleteArchivedApplication(notification.getKey()))
+ .ticker(ticker)
+ .build();
+ this.jobDetailsCache = CacheBuilder.newBuilder().build();
+
+ this.archivedApplicationCache =
+ CacheBuilder.newBuilder()
+ .maximumWeight(maximumCacheSizeBytes)
+ .weigher(this::calculateSize)
+ .build(
+ new CacheLoader() {
+ @Override
+ public ArchivedApplication load(ApplicationID applicationId)
+ throws Exception {
+ return loadArchivedApplication(applicationId);
+ }
+ });
+
+ this.cleanupFuture =
+ scheduledExecutor.scheduleWithFixedDelay(
+ () -> {
+ applicationDetailsCache.cleanUp();
+ jobDetailsCache.cleanUp();
+ },
+ expirationTime.toMillis(),
+ expirationTime.toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ this.numFinishedJobs = 0;
+ this.numFailedJobs = 0;
+ this.numCanceledJobs = 0;
+ }
+
+ @Override
+ public int size() {
+ return Math.toIntExact(applicationDetailsCache.size());
+ }
+
+ @Override
+ public Optional get(ApplicationID applicationId) {
+ try {
+ return Optional.of(archivedApplicationCache.get(applicationId));
+ } catch (ExecutionException e) {
+ LOG.debug(
+ "Could not load archived application for application id {}.", applicationId, e);
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public void put(ArchivedApplication archivedApplication) throws IOException {
+ final ApplicationID applicationId = archivedApplication.getApplicationId();
+
+ final ApplicationState status = archivedApplication.getApplicationStatus();
+ final String name = archivedApplication.getApplicationName();
+
+ Preconditions.checkArgument(
+ status.isTerminalState(),
+ "The application "
+ + name
+ + '('
+ + applicationId
+ + ") is not in a terminal state. Instead it is in state "
+ + status
+ + '.');
+
+ for (ExecutionGraphInfo executionGraphInfo : archivedApplication.getJobs().values()) {
+ final JobID jobId = executionGraphInfo.getArchivedExecutionGraph().getJobID();
+
+ final JobStatus jobStatus = executionGraphInfo.getArchivedExecutionGraph().getState();
+ switch (jobStatus) {
+ case FINISHED:
+ numFinishedJobs++;
+ break;
+ case CANCELED:
+ numCanceledJobs++;
+ break;
+ case FAILED:
+ numFailedJobs++;
+ break;
+ case SUSPENDED:
+ break;
+ default:
+ throw new IllegalStateException(
+ "The job "
+ + executionGraphInfo.getArchivedExecutionGraph().getJobName()
+ + '('
+ + jobId
+ + ") should have been in a known terminal state. "
+ + "Instead it was in state "
+ + jobStatus
+ + '.');
+ }
+
+ jobIdToApplicationId.put(jobId, applicationId);
+ applicationIdToJobIds
+ .computeIfAbsent(applicationId, ignored -> new ArrayList<>())
+ .add(jobId);
+ jobDetailsCache.put(
+ jobId,
+ JobDetails.createDetailsForJob(executionGraphInfo.getArchivedExecutionGraph()));
+ }
+
+ storeArchivedApplication(archivedApplication);
+
+ final ApplicationDetails detailsForApplication =
+ ApplicationDetails.fromArchivedApplication(archivedApplication);
+
+ applicationDetailsCache.put(applicationId, detailsForApplication);
+ archivedApplicationCache.put(applicationId, archivedApplication);
+ }
+
+ @Override
+ public Collection getApplicationDetails() {
+ return applicationDetailsCache.asMap().values();
+ }
+
+ @Override
+ public Optional getExecutionGraphInfo(JobID jobId) {
+ try {
+ final ApplicationID applicationId = jobIdToApplicationId.get(jobId);
+ if (applicationId == null) {
+ return Optional.empty();
+ }
+ return Optional.of(archivedApplicationCache.get(applicationId))
+ .map(archivedApplication -> archivedApplication.getJobs().get(jobId));
+ } catch (ExecutionException e) {
+ LOG.debug(
+ "Could not load archived execution graph information for job id {}.", jobId, e);
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public Collection getJobDetails() {
+ return jobDetailsCache.asMap().values();
+ }
+
+ @Override
+ public JobsOverview getJobsOverview() {
+ return new JobsOverview(0, numFinishedJobs, numCanceledJobs, numFailedJobs);
+ }
+
+ @Override
+ public void close() throws IOException {
+ cleanupFuture.cancel(false);
+
+ applicationDetailsCache.invalidateAll();
+
+ // clean up the storage directory
+ FileUtils.deleteFileOrDirectory(storageDir);
+ }
+
+ // --------------------------------------------------------------
+ // Internal methods
+ // --------------------------------------------------------------
+
+ private int calculateSize(
+ ApplicationID applicationId, ArchivedApplication archivedApplication) {
+ final File archivedApplicationFile = getArchivedApplicationFile(applicationId);
+
+ if (archivedApplicationFile.exists()) {
+ return Math.toIntExact(archivedApplicationFile.length());
+ } else {
+ LOG.debug(
+ "Could not find archived application file for {}. Estimating the size instead.",
+ applicationId);
+ int estimatedSize = 0;
+ for (ExecutionGraphInfo executionGraphInfo : archivedApplication.getJobs().values()) {
+ final ArchivedExecutionGraph archivedExecutionGraph =
+ executionGraphInfo.getArchivedExecutionGraph();
+ estimatedSize +=
+ archivedExecutionGraph.getAllVertices().size() * 1000
+ + archivedExecutionGraph.getAccumulatorsSerialized().size() * 1000;
+ }
+ return estimatedSize;
+ }
+ }
+
+ private ArchivedApplication loadArchivedApplication(ApplicationID applicationId)
+ throws IOException, ClassNotFoundException {
+ final File archivedApplicationFile = getArchivedApplicationFile(applicationId);
+
+ if (archivedApplicationFile.exists()) {
+ try (FileInputStream fileInputStream = new FileInputStream(archivedApplicationFile)) {
+ return InstantiationUtil.deserializeObject(
+ fileInputStream, getClass().getClassLoader());
+ }
+ } else {
+ throw new FileNotFoundException(
+ "Could not find file for archived application "
+ + applicationId
+ + ". This indicates that the file either has been deleted or never written.");
+ }
+ }
+
+ private void storeArchivedApplication(ArchivedApplication archivedApplication)
+ throws IOException {
+ final File archivedApplicationFile =
+ getArchivedApplicationFile(archivedApplication.getApplicationId());
+
+ try (FileOutputStream fileOutputStream = new FileOutputStream(archivedApplicationFile)) {
+ InstantiationUtil.serializeObject(fileOutputStream, archivedApplication);
+ }
+ }
+
+ private File getArchivedApplicationFile(ApplicationID applicationId) {
+ return new File(storageDir, applicationId.toString());
+ }
+
+ private void deleteArchivedApplication(ApplicationID applicationId) {
+ Preconditions.checkNotNull(applicationId);
+
+ final File archivedApplicationFile = getArchivedApplicationFile(applicationId);
+
+ try {
+ FileUtils.deleteFileOrDirectory(archivedApplicationFile);
+ } catch (IOException e) {
+ LOG.debug("Could not delete file {}.", archivedApplicationFile, e);
+ }
+
+ archivedApplicationCache.invalidate(applicationId);
+ applicationDetailsCache.invalidate(applicationId);
+ List jobIds = applicationIdToJobIds.remove(applicationId);
+ if (jobIds != null) {
+ jobIds.forEach(
+ jobId -> {
+ jobIdToApplicationId.remove(jobId);
+ jobDetailsCache.invalidate(jobId);
+ });
+ }
+ }
+
+ private static File initArchivedApplicationStorageDirectory(File tmpDir) throws IOException {
+ final int maxAttempts = 10;
+
+ for (int attempt = 0; attempt < maxAttempts; attempt++) {
+ final File storageDirectory =
+ new File(tmpDir, "archivedApplicationStore-" + UUID.randomUUID());
+
+ if (storageDirectory.mkdir()) {
+ return storageDirectory;
+ }
+ }
+
+ throw new IOException(
+ "Could not create archivedApplicationStorage directory in " + tmpDir + '.');
+ }
+
+ // --------------------------------------------------------------
+ // Testing methods
+ // --------------------------------------------------------------
+
+ @VisibleForTesting
+ File getStorageDir() {
+ return storageDir;
+ }
+
+ @VisibleForTesting
+ LoadingCache getArchivedApplicationCache() {
+ return archivedApplicationCache;
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStore.java
deleted file mode 100644
index 518a151ed2553..0000000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStore.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.dispatcher;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
-import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
-import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.concurrent.ScheduledExecutor;
-
-import org.apache.flink.shaded.guava33.com.google.common.base.Ticker;
-import org.apache.flink.shaded.guava33.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava33.com.google.common.cache.CacheBuilder;
-import org.apache.flink.shaded.guava33.com.google.common.cache.CacheLoader;
-import org.apache.flink.shaded.guava33.com.google.common.cache.LoadingCache;
-import org.apache.flink.shaded.guava33.com.google.common.cache.RemovalListener;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Store for {@link ExecutionGraphInfo} instances. The store writes the archived execution graph
- * information to disk and keeps the most recently used execution graphs in a memory cache for
- * faster serving. Moreover, the stored execution graphs are periodically cleaned up.
- */
-public class FileExecutionGraphInfoStore implements ExecutionGraphInfoStore {
-
- private static final Logger LOG = LoggerFactory.getLogger(FileExecutionGraphInfoStore.class);
-
- private final File storageDir;
-
- private final Cache jobDetailsCache;
-
- private final LoadingCache executionGraphInfoCache;
-
- private final ScheduledFuture> cleanupFuture;
-
- private int numFinishedJobs;
-
- private int numFailedJobs;
-
- private int numCanceledJobs;
-
- public FileExecutionGraphInfoStore(
- File rootDir,
- Duration expirationTime,
- int maximumCapacity,
- long maximumCacheSizeBytes,
- ScheduledExecutor scheduledExecutor,
- Ticker ticker)
- throws IOException {
-
- final File storageDirectory = initExecutionGraphStorageDirectory(rootDir);
-
- LOG.info(
- "Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.",
- FileExecutionGraphInfoStore.class.getSimpleName(),
- storageDirectory,
- expirationTime.toMillis(),
- maximumCacheSizeBytes);
-
- this.storageDir = Preconditions.checkNotNull(storageDirectory);
- Preconditions.checkArgument(
- storageDirectory.exists() && storageDirectory.isDirectory(),
- "The storage directory must exist and be a directory.");
- this.jobDetailsCache =
- CacheBuilder.newBuilder()
- .expireAfterWrite(expirationTime.toMillis(), TimeUnit.MILLISECONDS)
- .maximumSize(maximumCapacity)
- .removalListener(
- (RemovalListener)
- notification ->
- deleteExecutionGraphFile(notification.getKey()))
- .ticker(ticker)
- .build();
-
- this.executionGraphInfoCache =
- CacheBuilder.newBuilder()
- .maximumWeight(maximumCacheSizeBytes)
- .weigher(this::calculateSize)
- .build(
- new CacheLoader() {
- @Override
- public ExecutionGraphInfo load(JobID jobId) throws Exception {
- return loadExecutionGraph(jobId);
- }
- });
-
- this.cleanupFuture =
- scheduledExecutor.scheduleWithFixedDelay(
- jobDetailsCache::cleanUp,
- expirationTime.toMillis(),
- expirationTime.toMillis(),
- TimeUnit.MILLISECONDS);
-
- this.numFinishedJobs = 0;
- this.numFailedJobs = 0;
- this.numCanceledJobs = 0;
- }
-
- @Override
- public int size() {
- return Math.toIntExact(jobDetailsCache.size());
- }
-
- @Override
- @Nullable
- public ExecutionGraphInfo get(JobID jobId) {
- try {
- return executionGraphInfoCache.get(jobId);
- } catch (ExecutionException e) {
- LOG.debug(
- "Could not load archived execution graph information for job id {}.", jobId, e);
- return null;
- }
- }
-
- @Override
- public void put(ExecutionGraphInfo executionGraphInfo) throws IOException {
- final JobID jobId = executionGraphInfo.getJobId();
-
- final ArchivedExecutionGraph archivedExecutionGraph =
- executionGraphInfo.getArchivedExecutionGraph();
- final JobStatus jobStatus = archivedExecutionGraph.getState();
- final String jobName = archivedExecutionGraph.getJobName();
-
- Preconditions.checkArgument(
- jobStatus.isTerminalState(),
- "The job "
- + jobName
- + '('
- + jobId
- + ") is not in a terminal state. Instead it is in state "
- + jobStatus
- + '.');
-
- switch (jobStatus) {
- case FINISHED:
- numFinishedJobs++;
- break;
- case CANCELED:
- numCanceledJobs++;
- break;
- case FAILED:
- numFailedJobs++;
- break;
- case SUSPENDED:
- break;
- default:
- throw new IllegalStateException(
- "The job "
- + jobName
- + '('
- + jobId
- + ") should have been in a known terminal state. "
- + "Instead it was in state "
- + jobStatus
- + '.');
- }
-
- // write the ArchivedExecutionGraph to disk
- storeExecutionGraphInfo(executionGraphInfo);
-
- final JobDetails detailsForJob = JobDetails.createDetailsForJob(archivedExecutionGraph);
-
- jobDetailsCache.put(jobId, detailsForJob);
- executionGraphInfoCache.put(jobId, executionGraphInfo);
- }
-
- @Override
- public JobsOverview getStoredJobsOverview() {
- return new JobsOverview(0, numFinishedJobs, numCanceledJobs, numFailedJobs);
- }
-
- @Override
- public Collection getAvailableJobDetails() {
- return jobDetailsCache.asMap().values();
- }
-
- @Nullable
- @Override
- public JobDetails getAvailableJobDetails(JobID jobId) {
- return jobDetailsCache.getIfPresent(jobId);
- }
-
- @Override
- public void close() throws IOException {
- cleanupFuture.cancel(false);
-
- jobDetailsCache.invalidateAll();
-
- // clean up the storage directory
- FileUtils.deleteFileOrDirectory(storageDir);
- }
-
- // --------------------------------------------------------------
- // Internal methods
- // --------------------------------------------------------------
-
- private int calculateSize(JobID jobId, ExecutionGraphInfo serializableExecutionGraphInfo) {
- final File executionGraphInfoFile = getExecutionGraphFile(jobId);
-
- if (executionGraphInfoFile.exists()) {
- return Math.toIntExact(executionGraphInfoFile.length());
- } else {
- LOG.debug(
- "Could not find execution graph information file for {}. Estimating the size instead.",
- jobId);
- final ArchivedExecutionGraph serializableExecutionGraph =
- serializableExecutionGraphInfo.getArchivedExecutionGraph();
- return serializableExecutionGraph.getAllVertices().size() * 1000
- + serializableExecutionGraph.getAccumulatorsSerialized().size() * 1000;
- }
- }
-
- private ExecutionGraphInfo loadExecutionGraph(JobID jobId)
- throws IOException, ClassNotFoundException {
- final File executionGraphInfoFile = getExecutionGraphFile(jobId);
-
- if (executionGraphInfoFile.exists()) {
- try (FileInputStream fileInputStream = new FileInputStream(executionGraphInfoFile)) {
- return InstantiationUtil.deserializeObject(
- fileInputStream, getClass().getClassLoader());
- }
- } else {
- throw new FileNotFoundException(
- "Could not find file for archived execution graph "
- + jobId
- + ". This indicates that the file either has been deleted or never written.");
- }
- }
-
- private void storeExecutionGraphInfo(ExecutionGraphInfo executionGraphInfo) throws IOException {
- final File archivedExecutionGraphFile =
- getExecutionGraphFile(executionGraphInfo.getJobId());
-
- try (FileOutputStream fileOutputStream = new FileOutputStream(archivedExecutionGraphFile)) {
- InstantiationUtil.serializeObject(fileOutputStream, executionGraphInfo);
- }
- }
-
- private File getExecutionGraphFile(JobID jobId) {
- return new File(storageDir, jobId.toString());
- }
-
- private void deleteExecutionGraphFile(JobID jobId) {
- Preconditions.checkNotNull(jobId);
-
- final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);
-
- try {
- FileUtils.deleteFileOrDirectory(archivedExecutionGraphFile);
- } catch (IOException e) {
- LOG.debug("Could not delete file {}.", archivedExecutionGraphFile, e);
- }
-
- executionGraphInfoCache.invalidate(jobId);
- jobDetailsCache.invalidate(jobId);
- }
-
- private static File initExecutionGraphStorageDirectory(File tmpDir) throws IOException {
- final int maxAttempts = 10;
-
- for (int attempt = 0; attempt < maxAttempts; attempt++) {
- final File storageDirectory =
- new File(tmpDir, "executionGraphStore-" + UUID.randomUUID());
-
- if (storageDirectory.mkdir()) {
- return storageDirectory;
- }
- }
-
- throw new IOException(
- "Could not create executionGraphStorage directory in " + tmpDir + '.');
- }
-
- // --------------------------------------------------------------
- // Testing methods
- // --------------------------------------------------------------
-
- @VisibleForTesting
- File getStorageDir() {
- return storageDir;
- }
-
- @VisibleForTesting
- LoadingCache getExecutionGraphInfoCache() {
- return executionGraphInfoCache;
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedApplicationStore.java
similarity index 55%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedApplicationStore.java
index e008e0401342d..3dec94c60d6ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedApplicationStore.java
@@ -18,9 +18,12 @@
package org.apache.flink.runtime.dispatcher;
+import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.application.ArchivedApplication;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
@@ -30,35 +33,35 @@
import org.apache.flink.shaded.guava33.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava33.com.google.common.cache.CacheBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import javax.annotation.Nullable;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
- * {@link ExecutionGraphInfoStore} implementation which stores the {@link ArchivedExecutionGraph} in
- * memory. The memory store support to keep maximum job graphs and remove the timeout ones.
+ * {@link ArchivedApplicationStore} implementation which stores the {@link ArchivedApplication} in
+ * memory. The memory store support to keep maximum applications and remove the timeout ones.
*/
-public class MemoryExecutionGraphInfoStore implements ExecutionGraphInfoStore {
+public class MemoryArchivedApplicationStore implements ArchivedApplicationStore {
- private static final Logger LOG = LoggerFactory.getLogger(MemoryExecutionGraphInfoStore.class);
+ private final Cache archivedApplicationCache;
- private final Cache serializableExecutionGraphInfos;
+ private final Map jobIdToApplicationId = new HashMap<>();
@Nullable private final ScheduledFuture> cleanupFuture;
- public MemoryExecutionGraphInfoStore() {
+ public MemoryArchivedApplicationStore() {
this(Duration.ofMillis(0), 0, null, null);
}
- public MemoryExecutionGraphInfoStore(
+ public MemoryArchivedApplicationStore(
Duration expirationTime,
int maximumCapacity,
@Nullable ScheduledExecutor scheduledExecutor,
@@ -75,11 +78,11 @@ public MemoryExecutionGraphInfoStore(
cacheBuilder.ticker(ticker);
}
- this.serializableExecutionGraphInfos = cacheBuilder.build();
+ this.archivedApplicationCache = cacheBuilder.build();
if (scheduledExecutor != null) {
this.cleanupFuture =
scheduledExecutor.scheduleWithFixedDelay(
- serializableExecutionGraphInfos::cleanUp,
+ archivedApplicationCache::cleanUp,
expirationTime.toMillis(),
expirationTime.toMillis(),
TimeUnit.MILLISECONDS);
@@ -90,52 +93,65 @@ public MemoryExecutionGraphInfoStore(
@Override
public int size() {
- return Math.toIntExact(serializableExecutionGraphInfos.size());
+ return Math.toIntExact(archivedApplicationCache.size());
}
- @Nullable
@Override
- public ExecutionGraphInfo get(JobID jobId) {
- return serializableExecutionGraphInfos.getIfPresent(jobId);
+ public Optional get(ApplicationID applicationId) {
+ return Optional.ofNullable(archivedApplicationCache.getIfPresent(applicationId));
}
@Override
- public void put(ExecutionGraphInfo serializableExecutionGraphInfo) throws IOException {
- serializableExecutionGraphInfos.put(
- serializableExecutionGraphInfo.getJobId(), serializableExecutionGraphInfo);
+ public void put(ArchivedApplication archivedApplication) throws IOException {
+ final ApplicationID applicationId = archivedApplication.getApplicationId();
+ archivedApplication
+ .getJobs()
+ .keySet()
+ .forEach(jobId -> jobIdToApplicationId.put(jobId, applicationId));
+
+ archivedApplicationCache.put(archivedApplication.getApplicationId(), archivedApplication);
}
@Override
- public JobsOverview getStoredJobsOverview() {
- Collection allJobStatus =
- serializableExecutionGraphInfos.asMap().values().stream()
- .map(ExecutionGraphInfo::getArchivedExecutionGraph)
- .map(ArchivedExecutionGraph::getState)
- .collect(Collectors.toList());
+ public Collection getApplicationDetails() {
+ return archivedApplicationCache.asMap().values().stream()
+ .map(ApplicationDetails::fromArchivedApplication)
+ .collect(Collectors.toList());
+ }
- return JobsOverview.create(allJobStatus);
+ @Override
+ public Optional getExecutionGraphInfo(JobID jobId) {
+ final ApplicationID applicationId = jobIdToApplicationId.get(jobId);
+ if (applicationId == null) {
+ return Optional.empty();
+ }
+ final ArchivedApplication archivedApplication =
+ archivedApplicationCache.getIfPresent(applicationId);
+ return Optional.ofNullable(archivedApplication)
+ .map(application -> application.getJobs().get(jobId));
}
@Override
- public Collection getAvailableJobDetails() {
- return serializableExecutionGraphInfos.asMap().values().stream()
+ public Collection getJobDetails() {
+ return archivedApplicationCache.asMap().values().stream()
+ .flatMap(archivedApplication -> archivedApplication.getJobs().values().stream())
.map(ExecutionGraphInfo::getArchivedExecutionGraph)
.map(JobDetails::createDetailsForJob)
.collect(Collectors.toList());
}
- @Nullable
@Override
- public JobDetails getAvailableJobDetails(JobID jobId) {
- final ExecutionGraphInfo archivedExecutionGraphInfo =
- serializableExecutionGraphInfos.getIfPresent(jobId);
+ public JobsOverview getJobsOverview() {
+ Collection allJobStatus =
+ archivedApplicationCache.asMap().values().stream()
+ .flatMap(
+ archivedApplication ->
+ archivedApplication.getJobs().values().stream())
+ .map(ExecutionGraphInfo::getArchivedExecutionGraph)
+ .map(ArchivedExecutionGraph::getState)
+ .collect(Collectors.toList());
- if (archivedExecutionGraphInfo != null) {
- return JobDetails.createDetailsForJob(
- archivedExecutionGraphInfo.getArchivedExecutionGraph());
- } else {
- return null;
- }
+ return JobsOverview.create(allJobStatus);
}
@Override
@@ -144,6 +160,6 @@ public void close() throws IOException {
cleanupFuture.cancel(false);
}
- serializableExecutionGraphInfos.invalidateAll();
+ archivedApplicationCache.invalidateAll();
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java
index 9ee850f6ec660..8a10f5f547aea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java
@@ -51,7 +51,7 @@ public class PartialDispatcherServices {
@Nonnull private final JobManagerMetricGroupFactory jobManagerMetricGroupFactory;
- @Nonnull private final ExecutionGraphInfoStore executionGraphInfoStore;
+ @Nonnull private final ArchivedApplicationStore archivedApplicationStore;
@Nonnull private final FatalErrorHandler fatalErrorHandler;
@@ -72,7 +72,7 @@ public PartialDispatcherServices(
@Nonnull BlobServer blobServer,
@Nonnull HeartbeatServices heartbeatServices,
@Nonnull JobManagerMetricGroupFactory jobManagerMetricGroupFactory,
- @Nonnull ExecutionGraphInfoStore executionGraphInfoStore,
+ @Nonnull ArchivedApplicationStore archivedApplicationStore,
@Nonnull FatalErrorHandler fatalErrorHandler,
@Nonnull HistoryServerArchivist historyServerArchivist,
@Nullable String metricQueryServiceAddress,
@@ -85,7 +85,7 @@ public PartialDispatcherServices(
this.blobServer = blobServer;
this.heartbeatServices = heartbeatServices;
this.jobManagerMetricGroupFactory = jobManagerMetricGroupFactory;
- this.executionGraphInfoStore = executionGraphInfoStore;
+ this.archivedApplicationStore = archivedApplicationStore;
this.fatalErrorHandler = fatalErrorHandler;
this.historyServerArchivist = historyServerArchivist;
this.metricQueryServiceAddress = metricQueryServiceAddress;
@@ -125,8 +125,8 @@ public JobManagerMetricGroupFactory getJobManagerMetricGroupFactory() {
}
@Nonnull
- public ExecutionGraphInfoStore getArchivedExecutionGraphStore() {
- return executionGraphInfoStore;
+ public ArchivedApplicationStore getArchivedApplicationStore() {
+ return archivedApplicationStore;
}
@Nonnull
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobPersistenceComponents.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobPersistenceComponents.java
index 2c967a526491d..4f701bbffd0cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobPersistenceComponents.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobPersistenceComponents.java
@@ -48,7 +48,7 @@ private PartialDispatcherServicesWithJobPersistenceComponents(
BlobServer blobServer,
HeartbeatServices heartbeatServices,
JobManagerMetricGroupFactory jobManagerMetricGroupFactory,
- ExecutionGraphInfoStore executionGraphInfoStore,
+ ArchivedApplicationStore archivedApplicationStore,
FatalErrorHandler fatalErrorHandler,
HistoryServerArchivist historyServerArchivist,
@Nullable String metricQueryServiceAddress,
@@ -64,7 +64,7 @@ private PartialDispatcherServicesWithJobPersistenceComponents(
blobServer,
heartbeatServices,
jobManagerMetricGroupFactory,
- executionGraphInfoStore,
+ archivedApplicationStore,
fatalErrorHandler,
historyServerArchivist,
metricQueryServiceAddress,
@@ -94,7 +94,7 @@ public static PartialDispatcherServicesWithJobPersistenceComponents from(
partialDispatcherServices.getBlobServer(),
partialDispatcherServices.getHeartbeatServices(),
partialDispatcherServices.getJobManagerMetricGroupFactory(),
- partialDispatcherServices.getArchivedExecutionGraphStore(),
+ partialDispatcherServices.getArchivedApplicationStore(),
partialDispatcherServices.getFatalErrorHandler(),
partialDispatcherServices.getHistoryServerArchivist(),
partialDispatcherServices.getMetricQueryServiceAddress(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index ced8edeb072b5..de80e508eeac3 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -42,7 +42,7 @@
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.ArchivedApplicationStore;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
@@ -169,7 +169,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
@GuardedBy("lock")
private DeterminismEnvelope workingDirectory;
- private ExecutionGraphInfoStore executionGraphInfoStore;
+ private ArchivedApplicationStore archivedApplicationStore;
private final Thread shutDownHook;
private RpcSystem rpcSystem;
@@ -301,7 +301,7 @@ private void runCluster(Configuration configuration, PluginManager pluginManager
heartbeatServices,
delegationTokenManager,
metricRegistry,
- executionGraphInfoStore,
+ archivedApplicationStore,
new RpcMetricQueryServiceRetriever(
metricRegistry.getMetricQueryServiceRpcService()),
failureEnrichers,
@@ -419,8 +419,8 @@ protected void initializeServices(Configuration configuration, PluginManager plu
ConfigurationUtils.getSystemResourceMetricsProbingInterval(
configuration));
- executionGraphInfoStore =
- createSerializableExecutionGraphStore(
+ archivedApplicationStore =
+ createArchivedApplicationStore(
configuration, commonRpcService.getScheduledExecutor());
}
}
@@ -512,10 +512,10 @@ protected CompletableFuture stopClusterServices(boolean cleanupHaData) {
}
}
- if (executionGraphInfoStore != null) {
+ if (archivedApplicationStore != null) {
try {
- executionGraphInfoStore.close();
- executionGraphInfoStore = null;
+ archivedApplicationStore.close();
+ archivedApplicationStore = null;
} catch (Throwable t) {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}
@@ -693,7 +693,7 @@ protected void cleanupDirectories(ShutdownBehaviour shutdownBehaviour) throws IO
createDispatcherResourceManagerComponentFactory(Configuration configuration)
throws IOException;
- protected abstract ExecutionGraphInfoStore createSerializableExecutionGraphStore(
+ protected abstract ArchivedApplicationStore createArchivedApplicationStore(
Configuration configuration, ScheduledExecutor scheduledExecutor) throws IOException;
public static EntrypointClusterConfiguration parseArguments(String[] args)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 193eef0937b2a..5e603807cd0e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -21,9 +21,10 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
-import org.apache.flink.runtime.dispatcher.FileExecutionGraphInfoStore;
-import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.ArchivedApplicationStore;
+import org.apache.flink.runtime.dispatcher.FileArchivedApplicationStore;
+import org.apache.flink.runtime.dispatcher.MemoryArchivedApplicationStore;
+import org.apache.flink.runtime.util.ArchivedApplicationStoreUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.shaded.guava33.com.google.common.base.Ticker;
@@ -40,23 +41,25 @@ public SessionClusterEntrypoint(Configuration configuration) {
}
@Override
- protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(
+ protected ArchivedApplicationStore createArchivedApplicationStore(
Configuration configuration, ScheduledExecutor scheduledExecutor) throws IOException {
- final JobManagerOptions.JobStoreType jobStoreType =
- configuration.get(JobManagerOptions.JOB_STORE_TYPE);
+ final JobManagerOptions.ArchivedApplicationStoreType archivedApplicationStoreType =
+ configuration.get(JobManagerOptions.COMPLETED_APPLICATION_STORE_TYPE);
final Duration expirationTime =
- Duration.ofSeconds(configuration.get(JobManagerOptions.JOB_STORE_EXPIRATION_TIME));
- final int maximumCapacity = configuration.get(JobManagerOptions.JOB_STORE_MAX_CAPACITY);
+ ArchivedApplicationStoreUtils.getExpirationTime(configuration);
+ final int maximumCapacity =
+ configuration.get(JobManagerOptions.COMPLETED_APPLICATION_STORE_MAX_CAPACITY);
- switch (jobStoreType) {
+ switch (archivedApplicationStoreType) {
case File:
{
final File tmpDir =
new File(ConfigurationUtils.parseTempDirectories(configuration)[0]);
final long maximumCacheSizeBytes =
- configuration.get(JobManagerOptions.JOB_STORE_CACHE_SIZE);
+ configuration.get(
+ JobManagerOptions.COMPLETED_APPLICATION_STORE_CACHE_SIZE);
- return new FileExecutionGraphInfoStore(
+ return new FileArchivedApplicationStore(
tmpDir,
expirationTime,
maximumCapacity,
@@ -66,7 +69,7 @@ protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(
}
case Memory:
{
- return new MemoryExecutionGraphInfoStore(
+ return new MemoryArchivedApplicationStore(
expirationTime,
maximumCapacity,
scheduledExecutor,
@@ -75,7 +78,8 @@ protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(
default:
{
throw new IllegalArgumentException(
- "Unsupported job store type " + jobStoreType);
+ "Unsupported archived application store type "
+ + archivedApplicationStoreType);
}
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
index c1178130c9f87..cf5b39a0322dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
@@ -24,10 +24,10 @@
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.dispatcher.ArchivedApplicationStore;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.DispatcherOperationCaches;
-import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
@@ -111,7 +111,7 @@ public DispatcherResourceManagerComponent create(
HeartbeatServices heartbeatServices,
DelegationTokenManager delegationTokenManager,
MetricRegistry metricRegistry,
- ExecutionGraphInfoStore executionGraphInfoStore,
+ ArchivedApplicationStore archivedApplicationStore,
MetricQueryServiceRetriever metricQueryServiceRetriever,
Collection failureEnrichers,
FatalErrorHandler fatalErrorHandler)
@@ -213,7 +213,7 @@ public DispatcherResourceManagerComponent create(
() ->
JobManagerMetricGroup.createJobManagerMetricGroup(
metricRegistry, hostname),
- executionGraphInfoStore,
+ archivedApplicationStore,
fatalErrorHandler,
historyServerArchivist,
metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
index 52ee8d3bc7e96..49232ce2f0b21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
@@ -22,7 +22,7 @@
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.ArchivedApplicationStore;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
@@ -47,7 +47,7 @@ DispatcherResourceManagerComponent create(
HeartbeatServices heartbeatServices,
DelegationTokenManager delegationTokenManager,
MetricRegistry metricRegistry,
- ExecutionGraphInfoStore executionGraphInfoStore,
+ ArchivedApplicationStore archivedApplicationStore,
MetricQueryServiceRetriever metricQueryServiceRetriever,
Collection failureEnrichers,
FatalErrorHandler fatalErrorHandler)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetails.java
index d567173ee4d25..91155099c85fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetails.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetails.java
@@ -181,8 +181,10 @@ public static ApplicationDetails fromArchivedApplication(
: -1L;
long duration = (endTime >= 0L ? endTime : System.currentTimeMillis()) - startTime;
Map jobInfo = new HashMap<>();
- archivedApplication.getJobs().stream()
- .map(archivedExecutionGraph -> archivedExecutionGraph.getState().name())
+ archivedApplication.getJobs().values().stream()
+ .map(
+ executionGraphInfo ->
+ executionGraphInfo.getArchivedExecutionGraph().getState().name())
.forEach(
status ->
jobInfo.compute(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetailsInfo.java
index 5669e5d0f00d0..abea8264b2fbb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetailsInfo.java
@@ -24,6 +24,7 @@
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.json.ApplicationIDDeserializer;
import org.apache.flink.runtime.rest.messages.json.ApplicationIDSerializer;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;
@@ -212,7 +213,8 @@ public static ApplicationDetailsInfo fromArchivedApplication(
timestamps.put(status.toString(), archivedApplication.getStatusTimestamp(status));
}
final Collection jobs =
- archivedApplication.getJobs().stream()
+ archivedApplication.getJobs().values().stream()
+ .map(ExecutionGraphInfo::getArchivedExecutionGraph)
.map(JobDetails::createDetailsForJob)
.collect(Collectors.toList());
return new ApplicationDetailsInfo(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 4bd331407cc85..a0327245bd10f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -34,6 +34,7 @@
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.application.AbstractApplication;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobServer;
@@ -44,7 +45,7 @@
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
-import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryArchivedApplicationStore;
import org.apache.flink.runtime.dispatcher.TriggerSavepointMode;
import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
@@ -561,7 +562,7 @@ private void setupDispatcherResourceManagerComponents(
heartbeatServices,
delegationTokenManager,
metricRegistry,
- new MemoryExecutionGraphInfoStore(),
+ new MemoryArchivedApplicationStore(),
metricQueryServiceRetriever,
Collections.emptySet(),
fatalErrorHandler);
@@ -1063,6 +1064,13 @@ public JobExecutionResult executeJobBlocking(JobGraph job)
}
}
+ public CompletableFuture submitApplication(AbstractApplication application) {
+ final CompletableFuture dispatcherGatewayFuture =
+ getDispatcherGatewayFuture();
+ return dispatcherGatewayFuture.thenCompose(
+ dispatcherGateway -> dispatcherGateway.submitApplication(application, rpcTimeout));
+ }
+
public CompletableFuture submitJob(ExecutionPlan executionPlan) {
if (executionPlan instanceof StreamGraph) {
try {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ArchivedApplicationStoreUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ArchivedApplicationStoreUtils.java
new file mode 100644
index 0000000000000..433dc857b8b38
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ArchivedApplicationStoreUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.ArchivedApplicationStore;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/** Utilities for configuring of {@link ArchivedApplicationStore}. */
+public final class ArchivedApplicationStoreUtils {
+
+ public static Duration getExpirationTime(Configuration configuration) {
+ Optional optionalJobStoreExpirationTime =
+ configuration.getOptional(JobManagerOptions.JOB_STORE_EXPIRATION_TIME);
+ Optional optionalCompletedApplicationStoreExpirationTime =
+ configuration.getOptional(
+ JobManagerOptions.COMPLETED_APPLICATION_STORE_EXPIRATION_TIME);
+ if (optionalJobStoreExpirationTime.isPresent()
+ && optionalCompletedApplicationStoreExpirationTime.isEmpty()) {
+ return Duration.ofSeconds(optionalJobStoreExpirationTime.get());
+ }
+
+ return configuration.get(JobManagerOptions.COMPLETED_APPLICATION_STORE_EXPIRATION_TIME);
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java
index ef609e41c3ead..a68b5453a3c95 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java
@@ -30,6 +30,9 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -212,6 +215,20 @@ void testTransitionFromFailed(ApplicationState targetState) {
assertThrows(IllegalStateException.class, () -> application.transitionState(targetState));
}
+ @Test
+ void testApplicationStatusChange() {
+ AbstractApplication application = new MockApplication(new ApplicationID());
+ MockApplicationStatusListener listener = new MockApplicationStatusListener();
+ application.registerStatusListener(listener);
+
+ application.transitionToRunning();
+ application.transitionToFinished();
+
+ assertEquals(
+ Arrays.asList(ApplicationState.RUNNING, ApplicationState.FINISHED),
+ listener.getTargetStates());
+ }
+
private static class MockApplication extends AbstractApplication {
public MockApplication(ApplicationID applicationId) {
super(applicationId);
@@ -237,4 +254,18 @@ public String getName() {
return "Mock Application";
}
}
+
+ private static class MockApplicationStatusListener implements ApplicationStatusListener {
+ private final List targetStates = new ArrayList<>();
+
+ @Override
+ public void notifyApplicationStatusChange(
+ ApplicationID applicationId, ApplicationState newStatus) {
+ targetStates.add(newStatus);
+ }
+
+ public List getTargetStates() {
+ return targetStates;
+ }
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreITCase.java
new file mode 100644
index 0000000000000..f2c1337d8947c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreITCase.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.application.SingleJobApplication;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
+
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for the {@link ArchivedApplicationStore}. */
+class ArchivedApplicationStoreITCase {
+
+ @RegisterExtension
+ static final TestExecutorExtension EXECUTOR_RESOURCE =
+ TestingUtils.defaultExecutorExtension();
+
+ @TempDir File temporaryFolder;
+
+ @ParameterizedTest
+ @EnumSource(value = JobManagerOptions.ArchivedApplicationStoreType.class)
+ void testArchivedApplicationStore(
+ JobManagerOptions.ArchivedApplicationStoreType archivedApplicationStoreType)
+ throws Exception {
+ Configuration configuration = new Configuration();
+ configuration.set(
+ JobManagerOptions.COMPLETED_APPLICATION_STORE_TYPE, archivedApplicationStoreType);
+ try (final MiniCluster miniCluster =
+ new ArchivedApplicationStoreTestUtils.PersistingMiniCluster(
+ new MiniClusterConfiguration.Builder()
+ .withRandomPorts()
+ .setConfiguration(configuration)
+ .build(),
+ temporaryFolder,
+ new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) {
+ miniCluster.start();
+
+ final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
+ miniCluster.submitApplication(new SingleJobApplication(jobGraph)).get();
+
+ JobResult jobResult = miniCluster.requestJobResult(jobGraph.getJobID()).get();
+ assertEquals(JobStatus.FINISHED, jobResult.getJobStatus().orElse(null));
+
+ Collection jobs = miniCluster.listJobs().get();
+ assertEquals(1, jobs.size());
+ assertEquals(jobGraph.getJobID(), jobs.iterator().next().getJobId());
+ }
+ }
+
+ /** Tests that a session cluster can terminate gracefully when jobs are still running. */
+ @ParameterizedTest
+ @EnumSource(value = JobManagerOptions.ArchivedApplicationStoreType.class)
+ void testSuspendedJobOnClusterShutdown(
+ JobManagerOptions.ArchivedApplicationStoreType archivedApplicationStoreType)
+ throws Exception {
+ Configuration configuration = new Configuration();
+ configuration.set(
+ JobManagerOptions.COMPLETED_APPLICATION_STORE_TYPE, archivedApplicationStoreType);
+ try (final MiniCluster miniCluster =
+ new ArchivedApplicationStoreTestUtils.PersistingMiniCluster(
+ new MiniClusterConfiguration.Builder()
+ .withRandomPorts()
+ .setConfiguration(configuration)
+ .build(),
+ temporaryFolder,
+ new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) {
+ miniCluster.start();
+ final JobVertex vertex = new JobVertex("blockingVertex");
+ // The adaptive scheduler expects that every vertex has a configured parallelism
+ vertex.setParallelism(1);
+ vertex.setInvokableClass(
+ ArchivedApplicationStoreTestUtils.SignallingBlockingNoOpInvokable.class);
+ final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex);
+ miniCluster.submitJob(jobGraph);
+ ArchivedApplicationStoreTestUtils.SignallingBlockingNoOpInvokable.LATCH.await();
+ }
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStoreTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreTestUtils.java
similarity index 66%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStoreTestUtils.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreTestUtils.java
index a143827f75004..61e027f343ae9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStoreTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreTestUtils.java
@@ -18,10 +18,14 @@
package org.apache.flink.runtime.dispatcher;
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.ApplicationState;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.application.ArchivedApplication;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
@@ -31,6 +35,7 @@
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -59,18 +64,24 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
-/** Test utils class for {@link FileExecutionGraphInfoStore}. */
-public class ExecutionGraphInfoStoreTestUtils {
+/** Test utils class for {@link ArchivedApplicationStore}. */
+public class ArchivedApplicationStoreTestUtils {
static final List GLOBALLY_TERMINAL_JOB_STATUS =
Arrays.stream(JobStatus.values())
.filter(JobStatus::isGloballyTerminalState)
.collect(Collectors.toList());
+ static final List TERMINAL_APPLICATION_STATUS =
+ Arrays.stream(ApplicationState.values())
+ .filter(ApplicationState::isTerminalState)
+ .collect(Collectors.toList());
+
/**
* Generate a specified of ExecutionGraphInfo.
*
@@ -93,6 +104,38 @@ static Collection generateTerminalExecutionGraphInfos(int nu
return executionGraphInfos;
}
+ /**
+ * Generate a specified of ArchivedApplication.
+ *
+ * @param number the given number
+ * @return the result ArchivedApplication collection
+ */
+ static Collection generateTerminalArchivedApplications(int number) {
+ final Collection archivedApplications = new ArrayList<>(number);
+
+ for (int i = 0; i < number; i++) {
+ final ApplicationState state =
+ TERMINAL_APPLICATION_STATUS.get(
+ ThreadLocalRandom.current()
+ .nextInt(TERMINAL_APPLICATION_STATUS.size()));
+ final Map jobs =
+ generateTerminalExecutionGraphInfos(1).stream()
+ .collect(
+ Collectors.toMap(
+ ExecutionGraphInfo::getJobId,
+ executionGraphInfo -> executionGraphInfo));
+ archivedApplications.add(
+ new ArchivedApplication(
+ ApplicationID.generate(),
+ "test-application-" + i,
+ state,
+ new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L},
+ jobs));
+ }
+
+ return archivedApplications;
+ }
+
/** Compare whether two ExecutionGraphInfo instances are equivalent. */
static final class PartialExecutionGraphInfoMatcher extends BaseMatcher {
@@ -142,18 +185,64 @@ public void describeTo(Description description) {
}
}
+ /** Compare whether two ArchivedApplication instances are equivalent. */
+ static final class PartialArchivedApplicationMatcher extends BaseMatcher {
+
+ private final ArchivedApplication expectedArchivedApplication;
+
+ PartialArchivedApplicationMatcher(ArchivedApplication expectedArchivedApplication) {
+ this.expectedArchivedApplication =
+ Preconditions.checkNotNull(expectedArchivedApplication);
+ }
+
+ @Override
+ public boolean matches(Object o) {
+ if (expectedArchivedApplication == o) {
+ return true;
+ }
+ if (o == null || expectedArchivedApplication.getClass() != o.getClass()) {
+ return false;
+ }
+ ArchivedApplication that = (ArchivedApplication) o;
+
+ return Objects.equals(
+ expectedArchivedApplication.getApplicationId(), that.getApplicationId())
+ && Objects.equals(
+ expectedArchivedApplication.getApplicationName(),
+ that.getApplicationName())
+ && expectedArchivedApplication.getApplicationStatus()
+ == that.getApplicationStatus()
+ && Objects.equals(
+ expectedArchivedApplication.getJobs().size(), that.getJobs().size());
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText(
+ "Matches against " + ArchivedApplication.class.getSimpleName() + '.');
+ }
+ }
+
static Collection generateJobDetails(
- Collection executionGraphInfos) {
- return executionGraphInfos.stream()
+ Collection archivedApplications) {
+ return archivedApplications.stream()
+ .flatMap(archivedApplication -> archivedApplication.getJobs().values().stream())
.map(ExecutionGraphInfo::getArchivedExecutionGraph)
.map(JobDetails::createDetailsForJob)
.collect(Collectors.toList());
}
+ static Collection generateApplicationDetails(
+ Collection archivedApplications) {
+ return archivedApplications.stream()
+ .map(ApplicationDetails::fromArchivedApplication)
+ .collect(Collectors.toList());
+ }
+
/**
* Invokable which signals with {@link
- * ExecutionGraphInfoStoreTestUtils.SignallingBlockingNoOpInvokable#LATCH} when it is invoked
- * and blocks forever afterwards.
+ * ArchivedApplicationStoreTestUtils.SignallingBlockingNoOpInvokable#LATCH} when it is invoked
+ * and blocks forever afterward.
*/
public static class SignallingBlockingNoOpInvokable extends AbstractInvokable {
@@ -171,7 +260,7 @@ public void invoke() throws Exception {
}
}
- /** MiniCluster with specified {@link ExecutionGraphInfoStore}. */
+ /** MiniCluster with specified {@link ArchivedApplicationStore}. */
static class PersistingMiniCluster extends MiniCluster {
@Nullable private final File rootDir;
private final ScheduledExecutor scheduledExecutor;
@@ -209,25 +298,26 @@ static class PersistingMiniCluster extends MiniCluster {
.createSessionComponentFactory(
StandaloneResourceManagerFactory.getInstance());
- JobManagerOptions.JobStoreType jobStoreType =
- configuration.get(JobManagerOptions.JOB_STORE_TYPE);
- final ExecutionGraphInfoStore executionGraphInfoStore;
- switch (jobStoreType) {
+ JobManagerOptions.ArchivedApplicationStoreType archivedApplicationStoreType =
+ configuration.get(JobManagerOptions.COMPLETED_APPLICATION_STORE_TYPE);
+ final ArchivedApplicationStore archivedApplicationStore;
+ switch (archivedApplicationStoreType) {
case File:
{
- executionGraphInfoStore =
- createDefaultExecutionGraphInfoStore(rootDir, scheduledExecutor);
+ archivedApplicationStore =
+ createFileArchivedApplicationStore(rootDir, scheduledExecutor);
break;
}
case Memory:
{
- executionGraphInfoStore = new MemoryExecutionGraphInfoStore();
+ archivedApplicationStore = new MemoryArchivedApplicationStore();
break;
}
default:
{
throw new UnsupportedOperationException(
- "Unsupported job store type " + jobStoreType);
+ "Unsupported archived application store type "
+ + archivedApplicationStoreType);
}
}
@@ -242,16 +332,16 @@ static class PersistingMiniCluster extends MiniCluster {
heartbeatServices,
delegationTokenManager,
metricRegistry,
- executionGraphInfoStore,
+ archivedApplicationStore,
metricQueryServiceRetriever,
Collections.emptySet(),
fatalErrorHandler));
}
}
- static FileExecutionGraphInfoStore createDefaultExecutionGraphInfoStore(
+ static FileArchivedApplicationStore createFileArchivedApplicationStore(
File storageDirectory, ScheduledExecutor scheduledExecutor) throws IOException {
- return new FileExecutionGraphInfoStore(
+ return new FileArchivedApplicationStore(
storageDirectory,
Duration.ofHours(1L),
Integer.MAX_VALUE,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedApplicationStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedApplicationStoreTest.java
new file mode 100644
index 0000000000000..d810a8ee77b4d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedApplicationStoreTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.application.ArchivedApplication;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.util.ManualTicker;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
+
+import org.apache.flink.shaded.guava33.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava33.com.google.common.cache.LoadingCache;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.runtime.dispatcher.ArchivedApplicationStoreTestUtils.createFileArchivedApplicationStore;
+import static org.apache.flink.runtime.dispatcher.ArchivedApplicationStoreTestUtils.generateTerminalArchivedApplications;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/** Tests for the {@link FileArchivedApplicationStore}. */
+class FileArchivedApplicationStoreTest extends MemoryArchivedApplicationStoreTest {
+
+ @RegisterExtension
+ static final TestExecutorExtension EXECUTOR_RESOURCE =
+ TestingUtils.defaultExecutorExtension();
+
+ @TempDir File temporaryFolder;
+
+ /** Tests that an expired application is removed from the store. */
+ @Test
+ void testArchivedApplicationExpiration() throws Exception {
+ final ArchivedApplication archivedApplication =
+ generateTerminalArchivedApplications(1).iterator().next();
+ final ExecutionGraphInfo executionGraphInfo =
+ archivedApplication.getJobs().values().iterator().next();
+ final ApplicationID applicationId = archivedApplication.getApplicationId();
+ final JobID jobId = executionGraphInfo.getJobId();
+
+ final Duration expirationTime = Duration.ofMillis(1L);
+
+ final ManuallyTriggeredScheduledExecutor scheduledExecutor =
+ new ManuallyTriggeredScheduledExecutor();
+
+ final ManualTicker manualTicker = new ManualTicker();
+
+ try (final FileArchivedApplicationStore archivedApplicationStore =
+ new FileArchivedApplicationStore(
+ temporaryFolder,
+ expirationTime,
+ Integer.MAX_VALUE,
+ 10000L,
+ scheduledExecutor,
+ manualTicker)) {
+
+ archivedApplicationStore.put(archivedApplication);
+
+ // there should one application
+ assertEquals(1, archivedApplicationStore.size());
+
+ manualTicker.advanceTime(expirationTime.toMillis(), TimeUnit.MILLISECONDS);
+
+ // this should trigger the cleanup after expiration
+ scheduledExecutor.triggerScheduledTasks();
+
+ // check that the store is empty
+ assertEquals(0, archivedApplicationStore.size());
+
+ assertNull(archivedApplicationStore.get(applicationId).orElse(null));
+
+ assertNull(archivedApplicationStore.getExecutionGraphInfo(jobId).orElse(null));
+
+ final File storageDirectory = archivedApplicationStore.getStorageDir();
+
+ // check that the persisted file has been deleted
+ assertEquals(0, storageDirectory.listFiles().length);
+ }
+ }
+
+ /** Tests that all applications are cleaned up after closing the store. */
+ @Test
+ void testCloseCleansUp() throws IOException {
+ assertEquals(0, temporaryFolder.listFiles().length);
+
+ final ArchivedApplication archivedApplication =
+ generateTerminalArchivedApplications(1).iterator().next();
+
+ try (final FileArchivedApplicationStore archivedApplicationStore =
+ createFileArchivedApplicationStore(
+ temporaryFolder,
+ new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) {
+
+ assertEquals(1, temporaryFolder.listFiles().length);
+
+ final File storageDirectory = archivedApplicationStore.getStorageDir();
+
+ assertEquals(0, storageDirectory.listFiles().length);
+
+ archivedApplicationStore.put(archivedApplication);
+
+ assertEquals(1, storageDirectory.listFiles().length);
+
+ archivedApplicationStore.close();
+
+ assertEquals(0, temporaryFolder.listFiles().length);
+ }
+ }
+
+ /** Tests that evicted {@link ArchivedApplication} are loaded from disk again. */
+ @Test
+ void testCacheLoading() throws IOException {
+ try (final FileArchivedApplicationStore archivedApplicationStore =
+ new FileArchivedApplicationStore(
+ temporaryFolder,
+ Duration.ofHours(1L),
+ Integer.MAX_VALUE,
+ 100L << 10,
+ new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()),
+ Ticker.systemTicker())) {
+
+ final LoadingCache archivedApplicationCache =
+ archivedApplicationStore.getArchivedApplicationCache();
+
+ Collection archivedApplications = new ArrayList<>(64);
+
+ boolean continueInserting = true;
+
+ // insert applications until the first one got evicted
+ while (continueInserting) {
+ // has roughly a size of 3.1 KB
+ final ArchivedApplication archivedApplication =
+ generateTerminalArchivedApplications(1).iterator().next();
+
+ archivedApplicationStore.put(archivedApplication);
+
+ archivedApplications.add(archivedApplication);
+
+ continueInserting = archivedApplicationCache.size() == archivedApplications.size();
+ }
+
+ final File storageDirectory = archivedApplicationStore.getStorageDir();
+
+ assertEquals(archivedApplications.size(), storageDirectory.listFiles().length);
+
+ for (ArchivedApplication archivedApplication : archivedApplications) {
+ final ExecutionGraphInfo executionGraphInfo =
+ archivedApplication.getJobs().values().iterator().next();
+ final ApplicationID applicationId = archivedApplication.getApplicationId();
+ final JobID jobId = executionGraphInfo.getJobId();
+
+ assertThat(
+ archivedApplicationStore.get(applicationId).orElse(null),
+ new ArchivedApplicationStoreTestUtils.PartialArchivedApplicationMatcher(
+ archivedApplication));
+
+ assertThat(
+ archivedApplicationStore.getExecutionGraphInfo(jobId).orElse(null),
+ new ArchivedApplicationStoreTestUtils.PartialExecutionGraphInfoMatcher(
+ executionGraphInfo));
+ }
+ }
+ }
+
+ @Override
+ ArchivedApplicationStore createDefaultArchivedApplicationStore() throws IOException {
+ return createFileArchivedApplicationStore(
+ temporaryFolder,
+ new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()));
+ }
+
+ @Override
+ ArchivedApplicationStore createArchivedApplicationStore(int maximumCapacity)
+ throws IOException {
+ return new FileArchivedApplicationStore(
+ temporaryFolder,
+ Duration.ofHours(1L),
+ maximumCapacity,
+ 10000L,
+ new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()),
+ Ticker.systemTicker());
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStoreTest.java
deleted file mode 100644
index ed16309be77e9..0000000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStoreTest.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.dispatcher;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
-import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
-import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
-import org.apache.flink.runtime.util.ManualTicker;
-import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
-import org.apache.flink.util.TestLogger;
-import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
-import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
-
-import org.apache.flink.shaded.guava33.com.google.common.base.Ticker;
-import org.apache.flink.shaded.guava33.com.google.common.cache.LoadingCache;
-
-import org.hamcrest.Matcher;
-import org.hamcrest.Matchers;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStoreTestUtils.createDefaultExecutionGraphInfoStore;
-import static org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStoreTestUtils.generateJobDetails;
-import static org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStoreTestUtils.generateTerminalExecutionGraphInfos;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-/** Tests for the {@link FileExecutionGraphInfoStore}. */
-public class FileExecutionGraphInfoStoreTest extends TestLogger {
-
- @ClassRule
- public static final TestExecutorResource EXECUTOR_RESOURCE =
- TestingUtils.defaultExecutorResource();
-
- @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
-
- /**
- * Tests that we can put {@link ExecutionGraphInfo} into the {@link FileExecutionGraphInfoStore}
- * and that the graph is persisted.
- */
- @Test
- public void testPut() throws IOException {
- assertPutJobGraphWithStatus(JobStatus.FINISHED);
- }
-
- /** Tests that a SUSPENDED job can be persisted. */
- @Test
- public void testPutSuspendedJob() throws IOException {
- assertPutJobGraphWithStatus(JobStatus.SUSPENDED);
- }
-
- /** Tests that null is returned if we request an unknown JobID. */
- @Test
- public void testUnknownGet() throws IOException {
- final File rootDir = temporaryFolder.newFolder();
-
- try (final FileExecutionGraphInfoStore executionGraphStore =
- createDefaultExecutionGraphInfoStore(
- rootDir,
- new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) {
- assertThat(executionGraphStore.get(new JobID()), Matchers.nullValue());
- }
- }
-
- /** Tests that we obtain the correct jobs overview. */
- @Test
- public void testStoredJobsOverview() throws IOException {
- final int numberExecutionGraphs = 10;
- final Collection executionGraphInfos =
- generateTerminalExecutionGraphInfos(numberExecutionGraphs);
-
- final List jobStatuses =
- executionGraphInfos.stream()
- .map(ExecutionGraphInfo::getArchivedExecutionGraph)
- .map(ArchivedExecutionGraph::getState)
- .collect(Collectors.toList());
-
- final JobsOverview expectedJobsOverview = JobsOverview.create(jobStatuses);
-
- final File rootDir = temporaryFolder.newFolder();
-
- try (final FileExecutionGraphInfoStore executionGraphInfoStore =
- createDefaultExecutionGraphInfoStore(
- rootDir,
- new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) {
- for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) {
- executionGraphInfoStore.put(executionGraphInfo);
- }
-
- assertThat(
- executionGraphInfoStore.getStoredJobsOverview(),
- Matchers.equalTo(expectedJobsOverview));
- }
- }
-
- /** Tests that we obtain the correct collection of available job details. */
- @Test
- public void testAvailableJobDetails() throws IOException {
- final int numberExecutionGraphs = 10;
- final Collection executionGraphInfos =
- generateTerminalExecutionGraphInfos(numberExecutionGraphs);
-
- final Collection jobDetails = generateJobDetails(executionGraphInfos);
-
- final File rootDir = temporaryFolder.newFolder();
-
- try (final FileExecutionGraphInfoStore executionGraphInfoStore =
- createDefaultExecutionGraphInfoStore(
- rootDir,
- new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) {
- for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) {
- executionGraphInfoStore.put(executionGraphInfo);
- }
-
- assertThat(
- executionGraphInfoStore.getAvailableJobDetails(),
- Matchers.containsInAnyOrder(jobDetails.toArray()));
- }
- }
-
- /** Tests that an expired execution graph is removed from the execution graph store. */
- @Test
- public void testExecutionGraphExpiration() throws Exception {
- final File rootDir = temporaryFolder.newFolder();
-
- final Duration expirationTime = Duration.ofMillis(1L);
-
- final ManuallyTriggeredScheduledExecutor scheduledExecutor =
- new ManuallyTriggeredScheduledExecutor();
-
- final ManualTicker manualTicker = new ManualTicker();
-
- try (final FileExecutionGraphInfoStore executionGraphInfoStore =
- new FileExecutionGraphInfoStore(
- rootDir,
- expirationTime,
- Integer.MAX_VALUE,
- 10000L,
- scheduledExecutor,
- manualTicker)) {
-
- final ExecutionGraphInfo executionGraphInfo =
- new ExecutionGraphInfo(
- new ArchivedExecutionGraphBuilder()
- .setState(JobStatus.FINISHED)
- .build());
-
- executionGraphInfoStore.put(executionGraphInfo);
-
- // there should one execution graph
- assertThat(executionGraphInfoStore.size(), Matchers.equalTo(1));
-
- manualTicker.advanceTime(expirationTime.toMillis(), TimeUnit.MILLISECONDS);
-
- // this should trigger the cleanup after expiration
- scheduledExecutor.triggerScheduledTasks();
-
- assertThat(executionGraphInfoStore.size(), Matchers.equalTo(0));
-
- assertThat(
- executionGraphInfoStore.get(executionGraphInfo.getJobId()),
- Matchers.nullValue());
-
- final File storageDirectory = executionGraphInfoStore.getStorageDir();
-
- // check that the persisted file has been deleted
- assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));
- }
- }
-
- /** Tests that all persisted files are cleaned up after closing the store. */
- @Test
- public void testCloseCleansUp() throws IOException {
- final File rootDir = temporaryFolder.newFolder();
-
- assertThat(rootDir.listFiles().length, Matchers.equalTo(0));
-
- try (final FileExecutionGraphInfoStore executionGraphInfoStore =
- createDefaultExecutionGraphInfoStore(
- rootDir,
- new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) {
-
- assertThat(rootDir.listFiles().length, Matchers.equalTo(1));
-
- final File storageDirectory = executionGraphInfoStore.getStorageDir();
-
- assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));
-
- executionGraphInfoStore.put(
- new ExecutionGraphInfo(
- new ArchivedExecutionGraphBuilder()
- .setState(JobStatus.FINISHED)
- .build()));
-
- assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1));
- }
-
- assertThat(rootDir.listFiles().length, Matchers.equalTo(0));
- }
-
- /** Tests that evicted {@link ExecutionGraphInfo} are loaded from disk again. */
- @Test
- public void testCacheLoading() throws IOException {
- final File rootDir = temporaryFolder.newFolder();
-
- try (final FileExecutionGraphInfoStore executionGraphInfoStore =
- new FileExecutionGraphInfoStore(
- rootDir,
- Duration.ofHours(1L),
- Integer.MAX_VALUE,
- 100L << 10,
- new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()),
- Ticker.systemTicker())) {
-
- final LoadingCache executionGraphInfoCache =
- executionGraphInfoStore.getExecutionGraphInfoCache();
-
- Collection executionGraphInfos = new ArrayList<>(64);
-
- boolean continueInserting = true;
-
- // insert execution graphs until the first one got evicted
- while (continueInserting) {
- // has roughly a size of 1.4 KB
- final ExecutionGraphInfo executionGraphInfo =
- new ExecutionGraphInfo(
- new ArchivedExecutionGraphBuilder()
- .setState(JobStatus.FINISHED)
- .build());
-
- executionGraphInfoStore.put(executionGraphInfo);
-
- executionGraphInfos.add(executionGraphInfo);
-
- continueInserting = executionGraphInfoCache.size() == executionGraphInfos.size();
- }
-
- final File storageDirectory = executionGraphInfoStore.getStorageDir();
-
- assertThat(
- storageDirectory.listFiles().length,
- Matchers.equalTo(executionGraphInfos.size()));
-
- for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) {
- assertThat(
- executionGraphInfoStore.get(executionGraphInfo.getJobId()),
- matchesPartiallyWith(executionGraphInfo));
- }
- }
- }
-
- /**
- * Tests that the size of {@link FileExecutionGraphInfoStore} is no more than the configured max
- * capacity and the old execution graphs will be purged if the total added number exceeds the
- * max capacity.
- */
- @Test
- public void testMaximumCapacity() throws IOException {
- final File rootDir = temporaryFolder.newFolder();
-
- final int maxCapacity = 10;
- final int numberExecutionGraphs = 10;
-
- final Collection oldExecutionGraphInfos =
- generateTerminalExecutionGraphInfos(numberExecutionGraphs);
- final Collection newExecutionGraphInfos =
- generateTerminalExecutionGraphInfos(numberExecutionGraphs);
-
- final Collection jobDetails = generateJobDetails(newExecutionGraphInfos);
-
- try (final FileExecutionGraphInfoStore executionGraphInfoStore =
- new FileExecutionGraphInfoStore(
- rootDir,
- Duration.ofHours(1L),
- maxCapacity,
- 10000L,
- new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()),
- Ticker.systemTicker())) {
-
- for (ExecutionGraphInfo executionGraphInfo : oldExecutionGraphInfos) {
- executionGraphInfoStore.put(executionGraphInfo);
- // no more than the configured maximum capacity
- assertTrue(executionGraphInfoStore.size() <= maxCapacity);
- }
-
- for (ExecutionGraphInfo executionGraphInfo : newExecutionGraphInfos) {
- executionGraphInfoStore.put(executionGraphInfo);
- // equals to the configured maximum capacity
- assertEquals(maxCapacity, executionGraphInfoStore.size());
- }
-
- // the older execution graphs are purged
- assertThat(
- executionGraphInfoStore.getAvailableJobDetails(),
- Matchers.containsInAnyOrder(jobDetails.toArray()));
- }
- }
-
- /** Tests that a session cluster can terminate gracefully when jobs are still running. */
- @Test
- public void testPutSuspendedJobOnClusterShutdown() throws Exception {
- File rootDir = temporaryFolder.newFolder();
- try (final MiniCluster miniCluster =
- new ExecutionGraphInfoStoreTestUtils.PersistingMiniCluster(
- new MiniClusterConfiguration.Builder().withRandomPorts().build(),
- rootDir,
- new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) {
- miniCluster.start();
- final JobVertex vertex = new JobVertex("blockingVertex");
- // The adaptive scheduler expects that every vertex has a configured parallelism
- vertex.setParallelism(1);
- vertex.setInvokableClass(
- ExecutionGraphInfoStoreTestUtils.SignallingBlockingNoOpInvokable.class);
- final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex);
- miniCluster.submitJob(jobGraph);
- ExecutionGraphInfoStoreTestUtils.SignallingBlockingNoOpInvokable.LATCH.await();
- }
- }
-
- private void assertPutJobGraphWithStatus(JobStatus jobStatus) throws IOException {
- final ExecutionGraphInfo dummyExecutionGraphInfo =
- new ExecutionGraphInfo(
- new ArchivedExecutionGraphBuilder().setState(jobStatus).build());
- final File rootDir = temporaryFolder.newFolder();
-
- try (final FileExecutionGraphInfoStore executionGraphStore =
- createDefaultExecutionGraphInfoStore(
- rootDir,
- new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) {
-
- final File storageDirectory = executionGraphStore.getStorageDir();
-
- // check that the storage directory is empty
- assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0));
-
- executionGraphStore.put(dummyExecutionGraphInfo);
-
- // check that we have persisted the given execution graph
- assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1));
-
- assertThat(
- executionGraphStore.get(dummyExecutionGraphInfo.getJobId()),
- new ExecutionGraphInfoStoreTestUtils.PartialExecutionGraphInfoMatcher(
- dummyExecutionGraphInfo));
- }
- }
-
- private static Matcher matchesPartiallyWith(
- ExecutionGraphInfo executionGraphInfo) {
- return new ExecutionGraphInfoStoreTestUtils.PartialExecutionGraphInfoMatcher(
- executionGraphInfo);
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedApplicationStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedApplicationStoreTest.java
new file mode 100644
index 0000000000000..a79b2776e95c9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedApplicationStoreTest.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.application.ArchivedApplication;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.util.ManualTicker;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
+
+import org.apache.flink.shaded.guava33.com.google.common.base.Ticker;
+
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.dispatcher.ArchivedApplicationStoreTestUtils.generateApplicationDetails;
+import static org.apache.flink.runtime.dispatcher.ArchivedApplicationStoreTestUtils.generateJobDetails;
+import static org.apache.flink.runtime.dispatcher.ArchivedApplicationStoreTestUtils.generateTerminalArchivedApplications;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for the {@link MemoryArchivedApplicationStore}. */
+class MemoryArchivedApplicationStoreTest extends TestLogger {
+
+ @RegisterExtension
+ static final TestExecutorExtension EXECUTOR_RESOURCE =
+ TestingUtils.defaultExecutorExtension();
+
+ /**
+ * Tests that we can put {@link ArchivedApplication} into the {@link ArchivedApplicationStore}
+ * and that the application is persisted.
+ */
+ @Test
+ void testPut() throws IOException {
+ final ArchivedApplication expectedArchivedApplication =
+ generateTerminalArchivedApplications(1).iterator().next();
+ final ExecutionGraphInfo expectedExecutionGraphInfo =
+ expectedArchivedApplication.getJobs().values().iterator().next();
+ final ApplicationID applicationId = expectedArchivedApplication.getApplicationId();
+ final JobID jobId = expectedExecutionGraphInfo.getJobId();
+
+ try (final ArchivedApplicationStore archivedApplicationStore =
+ createDefaultArchivedApplicationStore()) {
+
+ // check that the store is empty
+ assertEquals(0, archivedApplicationStore.size());
+
+ archivedApplicationStore.put(expectedArchivedApplication);
+
+ // check that we have persisted the given application
+ assertEquals(1, archivedApplicationStore.size());
+
+ assertThat(
+ archivedApplicationStore.get(applicationId).orElse(null),
+ new ArchivedApplicationStoreTestUtils.PartialArchivedApplicationMatcher(
+ expectedArchivedApplication));
+
+ assertThat(
+ archivedApplicationStore.getExecutionGraphInfo(jobId).orElse(null),
+ new ArchivedApplicationStoreTestUtils.PartialExecutionGraphInfoMatcher(
+ expectedExecutionGraphInfo));
+ }
+ }
+
+ /** Tests that empty is returned if we request an unknown ApplicationID. */
+ @Test
+ void testUnknownGet() throws IOException {
+ try (final ArchivedApplicationStore archivedApplicationStore =
+ createDefaultArchivedApplicationStore()) {
+ assertNull(archivedApplicationStore.get(new ApplicationID()).orElse(null));
+ }
+ }
+
+ /** Tests that we obtain the correct jobs overview. */
+ @Test
+ void testJobsOverview() throws IOException {
+ final int numberArchivedApplications = 10;
+ final Collection archivedApplications =
+ generateTerminalArchivedApplications(numberArchivedApplications);
+
+ final List jobStatuses =
+ archivedApplications.stream()
+ .flatMap(
+ archivedApplication ->
+ archivedApplication.getJobs().values().stream())
+ .map(ExecutionGraphInfo::getArchivedExecutionGraph)
+ .map(ArchivedExecutionGraph::getState)
+ .collect(Collectors.toList());
+
+ final JobsOverview expectedJobsOverview = JobsOverview.create(jobStatuses);
+
+ try (final ArchivedApplicationStore archivedApplicationStore =
+ createDefaultArchivedApplicationStore()) {
+ for (ArchivedApplication executionGraphInfo : archivedApplications) {
+ archivedApplicationStore.put(executionGraphInfo);
+ }
+
+ assertEquals(expectedJobsOverview, archivedApplicationStore.getJobsOverview());
+ }
+ }
+
+ /** Tests that we obtain the correct collection of available jobs. */
+ @Test
+ void testGetJobDetails() throws IOException {
+ final int numberArchivedApplications = 10;
+ final Collection archivedApplications =
+ generateTerminalArchivedApplications(numberArchivedApplications);
+
+ final Collection jobDetails = generateJobDetails(archivedApplications);
+
+ try (final ArchivedApplicationStore archivedApplicationStore =
+ createDefaultArchivedApplicationStore()) {
+ for (ArchivedApplication archivedApplication : archivedApplications) {
+ archivedApplicationStore.put(archivedApplication);
+ }
+
+ assertThat(
+ archivedApplicationStore.getJobDetails(),
+ Matchers.containsInAnyOrder(jobDetails.toArray()));
+ }
+ }
+
+ /** Tests that we obtain the correct collection of available application details. */
+ @Test
+ void testGetApplicationDetails() throws IOException {
+ final int numberArchivedApplications = 10;
+ final Collection archivedApplications =
+ generateTerminalArchivedApplications(numberArchivedApplications);
+
+ final Collection applicationDetails =
+ generateApplicationDetails(archivedApplications);
+
+ try (final ArchivedApplicationStore archivedApplicationStore =
+ createDefaultArchivedApplicationStore()) {
+ for (ArchivedApplication archivedApplication : archivedApplications) {
+ archivedApplicationStore.put(archivedApplication);
+ }
+
+ assertThat(
+ archivedApplicationStore.getApplicationDetails(),
+ Matchers.containsInAnyOrder(applicationDetails.toArray()));
+ }
+ }
+
+ /** Tests that an expired application is removed from the store. */
+ @Test
+ void testArchivedApplicationExpiration() throws Exception {
+ final ArchivedApplication archivedApplication =
+ generateTerminalArchivedApplications(1).iterator().next();
+ final ExecutionGraphInfo executionGraphInfo =
+ archivedApplication.getJobs().values().iterator().next();
+ final ApplicationID applicationId = archivedApplication.getApplicationId();
+ final JobID jobId = executionGraphInfo.getJobId();
+
+ final Duration expirationTime = Duration.ofMillis(1L);
+
+ final ManuallyTriggeredScheduledExecutor scheduledExecutor =
+ new ManuallyTriggeredScheduledExecutor();
+
+ final ManualTicker manualTicker = new ManualTicker();
+
+ try (final MemoryArchivedApplicationStore archivedApplicationStore =
+ new MemoryArchivedApplicationStore(
+ expirationTime, Integer.MAX_VALUE, scheduledExecutor, manualTicker)) {
+
+ archivedApplicationStore.put(archivedApplication);
+
+ // there should one application
+ assertEquals(1, archivedApplicationStore.size());
+
+ manualTicker.advanceTime(expirationTime.toMillis(), TimeUnit.MILLISECONDS);
+
+ // this should trigger the cleanup after expiration
+ scheduledExecutor.triggerScheduledTasks();
+
+ // check that the store is empty
+ assertEquals(0, archivedApplicationStore.size());
+
+ assertNull(archivedApplicationStore.get(applicationId).orElse(null));
+
+ assertNull(archivedApplicationStore.getExecutionGraphInfo(jobId).orElse(null));
+ }
+ }
+
+ /** Tests that all applications are cleaned up after closing the store. */
+ @Test
+ void testCloseCleansUp() throws IOException {
+ final ArchivedApplication archivedApplication =
+ generateTerminalArchivedApplications(1).iterator().next();
+
+ try (final ArchivedApplicationStore archivedApplicationStore =
+ createDefaultArchivedApplicationStore()) {
+
+ assertEquals(0, archivedApplicationStore.size());
+
+ archivedApplicationStore.put(archivedApplication);
+
+ assertEquals(1, archivedApplicationStore.size());
+
+ archivedApplicationStore.close();
+ assertEquals(0, archivedApplicationStore.size());
+ }
+ }
+
+ /**
+ * Tests that the size of {@link ArchivedApplicationStore} is no more than the configured max
+ * capacity and the old applications will be purged if the total added number exceeds the max
+ * capacity.
+ */
+ @Test
+ void testMaximumCapacity() throws IOException {
+ final int maxCapacity = 10;
+ final int numberArchivedApplications = 10;
+
+ final Collection oldArchivedApplications =
+ generateTerminalArchivedApplications(numberArchivedApplications);
+ final Collection newArchivedApplications =
+ generateTerminalArchivedApplications(numberArchivedApplications);
+
+ final Collection applicationDetails =
+ generateApplicationDetails(newArchivedApplications);
+
+ try (final ArchivedApplicationStore archivedApplicationStore =
+ createArchivedApplicationStore(maxCapacity)) {
+
+ for (ArchivedApplication archivedApplication : oldArchivedApplications) {
+ archivedApplicationStore.put(archivedApplication);
+ // no more than the configured maximum capacity
+ assertTrue(archivedApplicationStore.size() <= maxCapacity);
+ }
+
+ for (ArchivedApplication archivedApplication : newArchivedApplications) {
+ archivedApplicationStore.put(archivedApplication);
+ // equals to the configured maximum capacity
+ assertEquals(maxCapacity, archivedApplicationStore.size());
+ }
+
+ // the older applications are purged
+ assertThat(
+ archivedApplicationStore.getApplicationDetails(),
+ Matchers.containsInAnyOrder(applicationDetails.toArray()));
+ }
+ }
+
+ ArchivedApplicationStore createDefaultArchivedApplicationStore() throws IOException {
+ return new MemoryArchivedApplicationStore();
+ }
+
+ ArchivedApplicationStore createArchivedApplicationStore(int maximumCapacity)
+ throws IOException {
+ return new MemoryArchivedApplicationStore(
+ Duration.ofHours(1),
+ maximumCapacity,
+ new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()),
+ Ticker.systemTicker());
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreITCase.java
deleted file mode 100644
index 668d8d6781512..0000000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreITCase.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.dispatcher;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
-import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
-import org.apache.flink.util.TestLogger;
-import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
-
-import org.junit.ClassRule;
-import org.junit.Test;
-
-import java.util.concurrent.ScheduledExecutorService;
-
-/** Tests for the {@link MemoryExecutionGraphInfoStore}. */
-public class MemoryExecutionGraphInfoStoreITCase extends TestLogger {
-
- @ClassRule
- public static final TestExecutorResource EXECUTOR_RESOURCE =
- TestingUtils.defaultExecutorResource();
-
- /** Tests that a session cluster can terminate gracefully when jobs are still running. */
- @Test
- public void testPutSuspendedJobOnClusterShutdown() throws Exception {
- Configuration configuration = new Configuration();
- configuration.set(JobManagerOptions.JOB_STORE_TYPE, JobManagerOptions.JobStoreType.Memory);
- try (final MiniCluster miniCluster =
- new ExecutionGraphInfoStoreTestUtils.PersistingMiniCluster(
- new MiniClusterConfiguration.Builder()
- .withRandomPorts()
- .setConfiguration(configuration)
- .build(),
- new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) {
- miniCluster.start();
- final JobVertex vertex = new JobVertex("blockingVertex");
- // The adaptive scheduler expects that every vertex has a configured parallelism
- vertex.setParallelism(1);
- vertex.setInvokableClass(
- ExecutionGraphInfoStoreTestUtils.SignallingBlockingNoOpInvokable.class);
- final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex);
- miniCluster.submitJob(jobGraph);
- ExecutionGraphInfoStoreTestUtils.SignallingBlockingNoOpInvokable.LATCH.await();
- }
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreTest.java
deleted file mode 100644
index cc6cd97734489..0000000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreTest.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.dispatcher;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
-import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
-import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
-import org.apache.flink.runtime.util.ManualTicker;
-import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
-import org.apache.flink.util.TestLogger;
-import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
-import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
-
-import org.apache.flink.shaded.guava33.com.google.common.base.Ticker;
-
-import org.hamcrest.Matchers;
-import org.junit.ClassRule;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStoreTestUtils.generateJobDetails;
-import static org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStoreTestUtils.generateTerminalExecutionGraphInfos;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-/** Tests for the {@link MemoryExecutionGraphInfoStore}. */
-public class MemoryExecutionGraphInfoStoreTest extends TestLogger {
-
- @ClassRule
- public static final TestExecutorResource EXECUTOR_RESOURCE =
- TestingUtils.defaultExecutorResource();
-
- /**
- * Tests that we can put {@link ExecutionGraphInfo} into the {@link
- * MemoryExecutionGraphInfoStore} and that the graph is persisted.
- */
- @Test
- public void testPut() throws IOException {
- assertPutJobGraphWithStatus(JobStatus.FINISHED);
- }
-
- /** Tests that a SUSPENDED job can be persisted. */
- @Test
- public void testPutSuspendedJob() throws IOException {
- assertPutJobGraphWithStatus(JobStatus.SUSPENDED);
- }
-
- /** Tests that null is returned if we request an unknown JobID. */
- @Test
- public void testUnknownGet() throws IOException {
-
- try (final MemoryExecutionGraphInfoStore executionGraphStore =
- createMemoryExecutionGraphInfoStore()) {
- assertThat(executionGraphStore.get(new JobID()), Matchers.nullValue());
- }
- }
-
- /** Tests that we obtain the correct jobs overview. */
- @Test
- public void testStoredJobsOverview() throws IOException {
- final int numberExecutionGraphs = 10;
- final Collection executionGraphInfos =
- generateTerminalExecutionGraphInfos(numberExecutionGraphs);
-
- final List jobStatuses =
- executionGraphInfos.stream()
- .map(ExecutionGraphInfo::getArchivedExecutionGraph)
- .map(ArchivedExecutionGraph::getState)
- .collect(Collectors.toList());
-
- final JobsOverview expectedJobsOverview = JobsOverview.create(jobStatuses);
-
- try (final MemoryExecutionGraphInfoStore executionGraphInfoStore =
- createMemoryExecutionGraphInfoStore()) {
- for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) {
- executionGraphInfoStore.put(executionGraphInfo);
- }
-
- assertThat(
- executionGraphInfoStore.getStoredJobsOverview(),
- Matchers.equalTo(expectedJobsOverview));
- }
- }
-
- /** Tests that we obtain the correct collection of available job details. */
- @Test
- public void testAvailableJobDetails() throws IOException {
- final int numberExecutionGraphs = 10;
- final Collection executionGraphInfos =
- generateTerminalExecutionGraphInfos(numberExecutionGraphs);
-
- final Collection jobDetails = generateJobDetails(executionGraphInfos);
-
- try (final MemoryExecutionGraphInfoStore executionGraphInfoStore =
- createMemoryExecutionGraphInfoStore()) {
- for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) {
- executionGraphInfoStore.put(executionGraphInfo);
- }
-
- assertThat(
- executionGraphInfoStore.getAvailableJobDetails(),
- Matchers.containsInAnyOrder(jobDetails.toArray()));
- }
- }
-
- /** Tests that an expired execution graph is removed from the execution graph store. */
- @Test
- public void testExecutionGraphExpiration() throws Exception {
- final Duration expirationTime = Duration.ofMillis(1L);
-
- final ManuallyTriggeredScheduledExecutor scheduledExecutor =
- new ManuallyTriggeredScheduledExecutor();
-
- final ManualTicker manualTicker = new ManualTicker();
-
- try (final MemoryExecutionGraphInfoStore executionGraphInfoStore =
- new MemoryExecutionGraphInfoStore(
- expirationTime, Integer.MAX_VALUE, scheduledExecutor, manualTicker)) {
-
- final ExecutionGraphInfo executionGraphInfo =
- new ExecutionGraphInfo(
- new ArchivedExecutionGraphBuilder()
- .setState(JobStatus.FINISHED)
- .build());
-
- executionGraphInfoStore.put(executionGraphInfo);
-
- // there should one execution graph
- assertThat(executionGraphInfoStore.size(), Matchers.equalTo(1));
-
- manualTicker.advanceTime(expirationTime.toMillis(), TimeUnit.MILLISECONDS);
-
- // this should trigger the cleanup after expiration
- scheduledExecutor.triggerScheduledTasks();
-
- assertThat(executionGraphInfoStore.size(), Matchers.equalTo(0));
-
- assertThat(
- executionGraphInfoStore.get(executionGraphInfo.getJobId()),
- Matchers.nullValue());
-
- // check that the store is empty
- assertThat(executionGraphInfoStore.size(), Matchers.equalTo(0));
- }
- }
-
- /** Tests that all job graphs are cleaned up after closing the store. */
- @Test
- public void testCloseCleansUp() throws IOException {
- try (final MemoryExecutionGraphInfoStore executionGraphInfoStore =
- createMemoryExecutionGraphInfoStore()) {
-
- assertThat(executionGraphInfoStore.size(), Matchers.equalTo(0));
-
- executionGraphInfoStore.put(
- new ExecutionGraphInfo(
- new ArchivedExecutionGraphBuilder()
- .setState(JobStatus.FINISHED)
- .build()));
-
- assertThat(executionGraphInfoStore.size(), Matchers.equalTo(1));
-
- executionGraphInfoStore.close();
- assertThat(executionGraphInfoStore.size(), Matchers.equalTo(0));
- }
- }
-
- /**
- * Tests that the size of {@link MemoryExecutionGraphInfoStore} is no more than the configured
- * max capacity and the old execution graphs will be purged if the total added number exceeds
- * the max capacity.
- */
- @Test
- public void testMaximumCapacity() throws IOException {
- final int maxCapacity = 10;
- final int numberExecutionGraphs = 10;
-
- final Collection oldExecutionGraphInfos =
- generateTerminalExecutionGraphInfos(numberExecutionGraphs);
- final Collection newExecutionGraphInfos =
- generateTerminalExecutionGraphInfos(numberExecutionGraphs);
-
- final Collection jobDetails = generateJobDetails(newExecutionGraphInfos);
-
- try (final MemoryExecutionGraphInfoStore executionGraphInfoStore =
- createMemoryExecutionGraphInfoStore(Duration.ofHours(1L), maxCapacity)) {
-
- for (ExecutionGraphInfo executionGraphInfo : oldExecutionGraphInfos) {
- executionGraphInfoStore.put(executionGraphInfo);
- // no more than the configured maximum capacity
- assertTrue(executionGraphInfoStore.size() <= maxCapacity);
- }
-
- for (ExecutionGraphInfo executionGraphInfo : newExecutionGraphInfos) {
- executionGraphInfoStore.put(executionGraphInfo);
- // equals to the configured maximum capacity
- assertEquals(maxCapacity, executionGraphInfoStore.size());
- }
-
- // the older execution graphs are purged
- assertThat(
- executionGraphInfoStore.getAvailableJobDetails(),
- Matchers.containsInAnyOrder(jobDetails.toArray()));
- }
- }
-
- private void assertPutJobGraphWithStatus(JobStatus jobStatus) throws IOException {
- final ExecutionGraphInfo dummyExecutionGraphInfo =
- new ExecutionGraphInfo(
- new ArchivedExecutionGraphBuilder().setState(jobStatus).build());
- try (final MemoryExecutionGraphInfoStore executionGraphStore =
- createMemoryExecutionGraphInfoStore()) {
-
- // check that the graph store is empty
- assertThat(executionGraphStore.size(), Matchers.equalTo(0));
-
- executionGraphStore.put(dummyExecutionGraphInfo);
-
- // check that we have persisted the given execution graph
- assertThat(executionGraphStore.size(), Matchers.equalTo(1));
-
- assertThat(
- executionGraphStore.get(dummyExecutionGraphInfo.getJobId()),
- new ExecutionGraphInfoStoreTestUtils.PartialExecutionGraphInfoMatcher(
- dummyExecutionGraphInfo));
- }
- }
-
- private MemoryExecutionGraphInfoStore createMemoryExecutionGraphInfoStore() {
- return new MemoryExecutionGraphInfoStore();
- }
-
- private MemoryExecutionGraphInfoStore createMemoryExecutionGraphInfoStore(
- Duration expirationTime, int maximumCapacity) {
- return new MemoryExecutionGraphInfoStore(
- expirationTime,
- maximumCapacity,
- new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()),
- Ticker.systemTicker());
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index 81241e1ed2ca5..36efcf9de9531 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -82,7 +82,7 @@ private TestingDispatcher(
@Nullable String metricServiceQueryAddress,
Executor ioExecutor,
HistoryServerArchivist historyServerArchivist,
- ExecutionGraphInfoStore executionGraphInfoStore,
+ ArchivedApplicationStore archivedApplicationStore,
JobManagerRunnerFactory jobManagerRunnerFactory,
CleanupRunnerFactory cleanupRunnerFactory,
DispatcherBootstrapFactory dispatcherBootstrapFactory,
@@ -102,7 +102,7 @@ private TestingDispatcher(
resourceManagerGatewayRetriever,
blobServer,
heartbeatServices,
- executionGraphInfoStore,
+ archivedApplicationStore,
fatalErrorHandler,
historyServerArchivist,
metricServiceQueryAddress,
@@ -192,8 +192,8 @@ public static class Builder {
@Nullable private String metricServiceQueryAddress = null;
private Executor ioExecutor = ForkJoinPool.commonPool();
private HistoryServerArchivist historyServerArchivist = VoidHistoryServerArchivist.INSTANCE;
- private ExecutionGraphInfoStore executionGraphInfoStore =
- new MemoryExecutionGraphInfoStore();
+ private ArchivedApplicationStore archivedApplicationStore =
+ new MemoryArchivedApplicationStore();
private JobManagerRunnerFactory jobManagerRunnerFactory =
new TestingJobMasterServiceLeadershipRunnerFactory();
private CleanupRunnerFactory cleanupRunnerFactory = new TestingCleanupRunnerFactory();
@@ -288,8 +288,9 @@ public Builder setHistoryServerArchivist(HistoryServerArchivist historyServerArc
return this;
}
- public Builder setExecutionGraphInfoStore(ExecutionGraphInfoStore executionGraphInfoStore) {
- this.executionGraphInfoStore = executionGraphInfoStore;
+ public Builder setArchivedApplicationStore(
+ ArchivedApplicationStore archivedApplicationStore) {
+ this.archivedApplicationStore = archivedApplicationStore;
return this;
}
@@ -359,7 +360,7 @@ public TestingDispatcher build(RpcService rpcService) throws Exception {
metricServiceQueryAddress,
ioExecutor,
historyServerArchivist,
- executionGraphInfoStore,
+ archivedApplicationStore,
jobManagerRunnerFactory,
cleanupRunnerFactory,
dispatcherBootstrapFactory,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingPartialDispatcherServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingPartialDispatcherServices.java
index c8004311eb118..fa11efafe3ce7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingPartialDispatcherServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingPartialDispatcherServices.java
@@ -55,7 +55,7 @@ public TestingPartialDispatcherServices(
BlobServer blobServer,
HeartbeatServices heartbeatServices,
JobManagerMetricGroupFactory jobManagerMetricGroupFactory,
- ExecutionGraphInfoStore executionGraphInfoStore,
+ ArchivedApplicationStore archivedApplicationStore,
FatalErrorHandler fatalErrorHandler,
HistoryServerArchivist historyServerArchivist,
@Nullable String metricQueryServiceAddress,
@@ -69,7 +69,7 @@ public TestingPartialDispatcherServices(
blobServer,
heartbeatServices,
jobManagerMetricGroupFactory,
- executionGraphInfoStore,
+ archivedApplicationStore,
fatalErrorHandler,
historyServerArchivist,
metricQueryServiceAddress,
@@ -93,8 +93,8 @@ public static class Builder {
private HeartbeatServices heartbeatServices = new TestingHeartbeatServices();
private JobManagerMetricGroupFactory jobManagerMetricGroupFactory =
UnregisteredMetricGroups::createUnregisteredJobManagerMetricGroup;
- private ExecutionGraphInfoStore executionGraphInfoStore =
- new MemoryExecutionGraphInfoStore();
+ private ArchivedApplicationStore archivedApplicationStore =
+ new MemoryArchivedApplicationStore();
private FatalErrorHandler fatalErrorHandler = NoOpFatalErrorHandler.INSTANCE;
private HistoryServerArchivist historyServerArchivist = VoidHistoryServerArchivist.INSTANCE;
@Nullable private String metricQueryServiceAddress = null;
@@ -131,9 +131,9 @@ public Builder withJobManagerMetricGroupFactory(
return this;
}
- public Builder withExecutionGraphInfoStore(
- ExecutionGraphInfoStore executionGraphInfoStore) {
- this.executionGraphInfoStore = executionGraphInfoStore;
+ public Builder withArchivedApplicationStore(
+ ArchivedApplicationStore archivedApplicationStore) {
+ this.archivedApplicationStore = archivedApplicationStore;
return this;
}
@@ -178,7 +178,7 @@ public TestingPartialDispatcherServices build(
blobServer,
heartbeatServices,
jobManagerMetricGroupFactory,
- executionGraphInfoStore,
+ archivedApplicationStore,
fatalErrorHandler,
historyServerArchivist,
metricQueryServiceAddress,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
index 7271011a49aad..0e4f14d168991 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
@@ -28,7 +28,7 @@
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.DispatcherOperationCaches;
-import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryArchivedApplicationStore;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist;
@@ -174,7 +174,7 @@ void testResourceCleanupUnderLeadershipChange() throws Exception {
blobServer,
new TestingHeartbeatServices(),
UnregisteredMetricGroups::createUnregisteredJobManagerMetricGroup,
- new MemoryExecutionGraphInfoStore(),
+ new MemoryArchivedApplicationStore(),
fatalErrorHandler,
VoidHistoryServerArchivist.INSTANCE,
null,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
index 86796fdeba595..80bff9e750069 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
@@ -25,8 +25,8 @@
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
-import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.ArchivedApplicationStore;
+import org.apache.flink.runtime.dispatcher.MemoryArchivedApplicationStore;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
import org.apache.flink.runtime.dispatcher.runner.DispatcherRunner;
import org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerFactory;
@@ -389,9 +389,9 @@ private TestingEntryPoint(
}
@Override
- protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(
+ protected ArchivedApplicationStore createArchivedApplicationStore(
Configuration configuration, ScheduledExecutor scheduledExecutor) {
- return new MemoryExecutionGraphInfoStore();
+ return new MemoryArchivedApplicationStore();
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterUncaughtExceptionHandlerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterUncaughtExceptionHandlerITCase.java
index ae5178310b242..d6d5887d41770 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterUncaughtExceptionHandlerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterUncaughtExceptionHandlerITCase.java
@@ -20,7 +20,7 @@
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.ArchivedApplicationStore;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
@@ -97,7 +97,7 @@ protected ClusterTestingEntrypoint(Configuration configuration) {
}
@Override
- protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(
+ protected ArchivedApplicationStore createArchivedApplicationStore(
Configuration configuration, ScheduledExecutor scheduledExecutor)
throws IOException {
return null;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
index fe02100f73d06..f09807690f86e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
@@ -22,7 +22,7 @@
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryArchivedApplicationStore;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -175,7 +175,7 @@ protected HighAvailabilityServices createHighAvailabilityServices(
heartbeatServices,
delegationTokenManager,
metricRegistry,
- new MemoryExecutionGraphInfoStore(),
+ new MemoryArchivedApplicationStore(),
metricQueryServiceRetriever,
Collections.emptySet(),
fatalErrorHandler);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationDetailsHandlerTest.java
index 5f99ff4b04425..055cc412c026c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationDetailsHandlerTest.java
@@ -80,7 +80,7 @@ void setUp() throws HandlerRequestException {
"Test Application",
ApplicationState.FINISHED,
new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L},
- Collections.emptyList());
+ Collections.emptyMap());
handlerRequest = createRequest(archivedApplication.getApplicationId());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationsOverviewHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationsOverviewHandlerTest.java
index de8e41dede7e6..be699f8842513 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationsOverviewHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationsOverviewHandlerTest.java
@@ -77,7 +77,7 @@ void setUp() throws HandlerRequestException {
"Test Application",
ApplicationState.FINISHED,
new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L},
- Collections.emptyList());
+ Collections.emptyMap());
testingRestfulGateway =
new TestingRestfulGateway.Builder()
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingClusterEntrypointProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingClusterEntrypointProcess.java
index eadbfd2cee52d..5d11e5772c228 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingClusterEntrypointProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingClusterEntrypointProcess.java
@@ -21,8 +21,8 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
-import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.ArchivedApplicationStore;
+import org.apache.flink.runtime.dispatcher.MemoryArchivedApplicationStore;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
@@ -117,10 +117,10 @@ protected TestingClusterEntrypoint(Configuration configuration, File markerFile)
}
@Override
- protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(
+ protected ArchivedApplicationStore createArchivedApplicationStore(
Configuration configuration, ScheduledExecutor scheduledExecutor)
throws IOException {
- return new MemoryExecutionGraphInfoStore();
+ return new MemoryArchivedApplicationStore();
}
@Override
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index fb109741f9454..1cf9590f14e5a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -32,7 +32,7 @@
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryArchivedApplicationStore;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
@@ -159,7 +159,7 @@ void testCancelingOnProcessFailure() throws Throwable {
new HeartbeatServicesImpl(100L, 10000L, 2),
new NoOpDelegationTokenManager(),
NoOpMetricRegistry.INSTANCE,
- new MemoryExecutionGraphInfoStore(),
+ new MemoryArchivedApplicationStore(),
VoidMetricQueryServiceRetriever.INSTANCE,
Collections.emptySet(),
fatalErrorHandler);