diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html b/docs/layouts/shortcodes/generated/all_jobmanager_section.html index 33a16edd6482e..d16e2058cc6ec 100644 --- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html +++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html @@ -8,6 +8,30 @@ + +
completed-application-store.cache-size
+ 52428800 + Long + The cache size in bytes which is used to keep completed applications in memory. + + +
completed-application-store.expiration-time
+ 1 h + Duration + The time after which a completed application expires and is purged from the store. + + +
completed-application-store.max-capacity
+ infinite + Integer + The max number of completed applications that can be kept in the store. NOTICE: if memory store keeps too many applications in session cluster, it may cause FullGC or OOM in jm. + + +
completed-application-store.type
+ File +

Enum

+ Determines which store implementation is used in session cluster. Accepted values are:

Possible values: +
jobmanager.adaptive-scheduler.executing.cooldown-after-rescaling
30 s @@ -134,30 +158,6 @@

Enum

Determines which scheduler implementation is used to schedule tasks. If this option is not explicitly set, batch jobs will use the 'AdaptiveBatch' scheduler as the default, while streaming jobs will default to the 'Default' scheduler.

Possible values: - -
jobstore.cache-size
- 52428800 - Long - The job store cache size in bytes which is used to keep completed jobs in memory. - - -
jobstore.expiration-time
- 3600 - Long - The time in seconds after which a completed job expires and is purged from the job store. - - -
jobstore.max-capacity
- infinite - Integer - The max number of completed jobs that can be kept in the job store. NOTICE: if memory store keeps too many jobs in session cluster, it may cause FullGC or OOM in jm. - - -
jobstore.type
- File -

Enum

- Determines which job store implementation is used in session cluster. Accepted values are:

Possible values: -
web.exception-history-size
16 diff --git a/docs/layouts/shortcodes/generated/job_manager_configuration.html b/docs/layouts/shortcodes/generated/job_manager_configuration.html index c7669226f1786..d197b268780c9 100644 --- a/docs/layouts/shortcodes/generated/job_manager_configuration.html +++ b/docs/layouts/shortcodes/generated/job_manager_configuration.html @@ -8,6 +8,30 @@ + +
completed-application-store.cache-size
+ 52428800 + Long + The cache size in bytes which is used to keep completed applications in memory. + + +
completed-application-store.expiration-time
+ 1 h + Duration + The time after which a completed application expires and is purged from the store. + + +
completed-application-store.max-capacity
+ infinite + Integer + The max number of completed applications that can be kept in the store. NOTICE: if memory store keeps too many applications in session cluster, it may cause FullGC or OOM in jm. + + +
completed-application-store.type
+ File +

Enum

+ Determines which store implementation is used in session cluster. Accepted values are:

Possible values: +
jobmanager.adaptive-scheduler.executing.cooldown-after-rescaling
30 s @@ -188,30 +212,6 @@

Enum

Determines which scheduler implementation is used to schedule tasks. If this option is not explicitly set, batch jobs will use the 'AdaptiveBatch' scheduler as the default, while streaming jobs will default to the 'Default' scheduler.

Possible values: - -
jobstore.cache-size
- 52428800 - Long - The job store cache size in bytes which is used to keep completed jobs in memory. - - -
jobstore.expiration-time
- 3600 - Long - The time in seconds after which a completed job expires and is purged from the job store. - - -
jobstore.max-capacity
- infinite - Integer - The max number of completed jobs that can be kept in the job store. NOTICE: if memory store keeps too many jobs in session cluster, it may cause FullGC or OOM in jm. - - -
jobstore.type
- File -

Enum

- Determines which job store implementation is used in session cluster. Accepted values are:

Possible values: -
scheduler-mode
(none) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationClusterEntryPoint.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationClusterEntryPoint.java index 745e1421790be..c6953d95f51ed 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationClusterEntryPoint.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationClusterEntryPoint.java @@ -26,8 +26,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.PipelineOptions; -import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore; -import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore; +import org.apache.flink.runtime.dispatcher.ArchivedApplicationStore; +import org.apache.flink.runtime.dispatcher.MemoryArchivedApplicationStore; import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory; import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; @@ -78,9 +78,9 @@ protected ApplicationClusterEntryPoint( } @Override - protected ExecutionGraphInfoStore createSerializableExecutionGraphStore( + protected ArchivedApplicationStore createArchivedApplicationStore( final Configuration configuration, final ScheduledExecutor scheduledExecutor) { - return new MemoryExecutionGraphInfoStore(); + return new MemoryArchivedApplicationStore(); } protected static void configureExecution( diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index 85f048edd32ea..065de7e63d320 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -308,8 +308,11 @@ public class JobManagerOptions { .withDescription( "Directory for JobManager to store the archives of completed jobs."); - /** The job store cache size in bytes which is used to keep completed jobs in memory. */ - @Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER) + /** + * @deprecated Use {@link JobManagerOptions#COMPLETED_APPLICATION_STORE_CACHE_SIZE} + */ + @Deprecated + @Documentation.ExcludeFromDocumentation("Hidden for deprecated") public static final ConfigOption JOB_STORE_CACHE_SIZE = key("jobstore.cache-size") .longType() @@ -317,7 +320,11 @@ public class JobManagerOptions { .withDescription( "The job store cache size in bytes which is used to keep completed jobs in memory."); - /** The time in seconds after which a completed job expires and is purged from the job store. */ + /** + * @deprecated Use {@link JobManagerOptions#COMPLETED_APPLICATION_STORE_EXPIRATION_TIME} + */ + @Deprecated + @Documentation.ExcludeFromDocumentation("Hidden for deprecated") @Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER) public static final ConfigOption JOB_STORE_EXPIRATION_TIME = key("jobstore.expiration-time") @@ -326,9 +333,11 @@ public class JobManagerOptions { .withDescription( "The time in seconds after which a completed job expires and is purged from the job store."); - /** The max number of completed jobs that can be kept in the job store. */ - @Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER) - @Documentation.OverrideDefault("infinite") + /** + * @deprecated Use {@link JobManagerOptions#COMPLETED_APPLICATION_STORE_MAX_CAPACITY} + */ + @Deprecated + @Documentation.ExcludeFromDocumentation("Hidden for deprecated") public static final ConfigOption JOB_STORE_MAX_CAPACITY = key("jobstore.max-capacity") .intType() @@ -337,8 +346,11 @@ public class JobManagerOptions { "The max number of completed jobs that can be kept in the job store. " + "NOTICE: if memory store keeps too many jobs in session cluster, it may cause FullGC or OOM in jm."); - /** Config parameter determining the job store implementation in session cluster. */ - @Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER) + /** + * @deprecated Use {@link JobManagerOptions#COMPLETED_APPLICATION_STORE_TYPE} + */ + @Deprecated + @Documentation.ExcludeFromDocumentation("Hidden for deprecated") public static final ConfigOption JOB_STORE_TYPE = key("jobstore.type") .enumType(JobStoreType.class) @@ -362,6 +374,74 @@ public enum JobStoreType { Memory } + /** The cache size in bytes which is used to keep completed applications in memory. */ + @Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER) + public static final ConfigOption COMPLETED_APPLICATION_STORE_CACHE_SIZE = + key("completed-application-store.cache-size") + .longType() + .defaultValue(50L * 1024L * 1024L) + .withDeprecatedKeys("jobstore.cache-size") + .withDescription( + "The cache size in bytes which is used to keep completed applications in memory."); + + /** The time after which a completed application expires and is purged from the store. */ + @Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER) + public static final ConfigOption COMPLETED_APPLICATION_STORE_EXPIRATION_TIME = + key("completed-application-store.expiration-time") + .durationType() + .defaultValue(Duration.ofHours(1)) + .withDescription( + "The time after which a completed application expires and is purged from the store."); + + /** The max number of completed applications that can be kept in the store. */ + @Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER) + @Documentation.OverrideDefault("infinite") + public static final ConfigOption COMPLETED_APPLICATION_STORE_MAX_CAPACITY = + key("completed-application-store.max-capacity") + .intType() + .defaultValue(Integer.MAX_VALUE) + .withDeprecatedKeys("jobstore.max-capacity") + .withDescription( + "The max number of completed applications that can be kept in the store. " + + "NOTICE: if memory store keeps too many applications in session cluster, it may cause FullGC or OOM in jm."); + + /** + * Config parameter determining the store implementation in session cluster. + * + *

FLINK-38845 replaces job store with application store because every job is now associated + * with an application. The legacy job store expires jobs individually, risking partial loss of + * an application's job history and breaking application-level consistency. The application + * store manages and expires applications (with all its jobs) as a whole, ensuring complete, + * consistent, and queryable application state until explicitly discarded. + */ + @Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER) + public static final ConfigOption + COMPLETED_APPLICATION_STORE_TYPE = + key("completed-application-store.type") + .enumType(ArchivedApplicationStoreType.class) + .defaultValue(ArchivedApplicationStoreType.File) + .withDeprecatedKeys("jobstore.type") + .withDescription( + Description.builder() + .text( + "Determines which store implementation is used in session cluster. Accepted values are:") + .list( + text( + "'File': the file store keeps the completed applications in files"), + text( + "'Memory': the memory store keeps the completed applications in memory. You" + + " may need to limit the %s to mitigate FullGC or OOM when there are too many applications", + code( + COMPLETED_APPLICATION_STORE_MAX_CAPACITY + .key()))) + .build()); + + /** Type of archived application store implementation. */ + public enum ArchivedApplicationStoreType { + File, + Memory + } + /** * Flag indicating whether JobManager would retrieve canonical host name of TaskManager during * registration. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java b/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java index 4acadee9a2d20..aef3f17b893d9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java @@ -31,10 +31,12 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.EnumMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -63,6 +65,14 @@ public abstract class AbstractApplication implements Serializable { private final Set jobs = new HashSet<>(); + /** + * List of registered application status listeners that will be notified via {@link + * ApplicationStatusListener#notifyApplicationStatusChange} when the application state changes. + * For example, the Dispatcher registers itself as a listener to perform operations such as + * archiving when the application reaches a terminal state. + */ + private final transient List statusListeners = new ArrayList<>(); + public AbstractApplication(ApplicationID applicationId) { this.applicationId = checkNotNull(applicationId); this.statusTimestamps = new long[ApplicationState.values().length]; @@ -130,6 +140,15 @@ public ApplicationState getApplicationStatus() { return applicationState; } + /** + * Registers a status listener. + * + *

This method is not thread-safe and should not be called concurrently. + */ + public void registerStatusListener(ApplicationStatusListener listener) { + statusListeners.add(listener); + } + // ------------------------------------------------------------------------ // State Transitions // ------------------------------------------------------------------------ @@ -196,6 +215,8 @@ void transitionState(ApplicationState targetState) { targetState); this.statusTimestamps[targetState.ordinal()] = System.currentTimeMillis(); this.applicationState = targetState; + statusListeners.forEach( + listener -> listener.notifyApplicationStatusChange(applicationId, targetState)); } private void validateTransition(ApplicationState targetState) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/application/ApplicationStatusListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/application/ApplicationStatusListener.java new file mode 100644 index 0000000000000..b4c9996af8085 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/application/ApplicationStatusListener.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.application; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.ApplicationState; + +/** Interface for listeners that monitor the status of an application. */ +public interface ApplicationStatusListener { + + /** + * This method is called whenever the status of the application changes. + * + * @param applicationId The ID of the job. + * @param newStatus The status the application switched to. + */ + void notifyApplicationStatusChange(ApplicationID applicationId, ApplicationState newStatus); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/application/ArchivedApplication.java b/flink-runtime/src/main/java/org/apache/flink/runtime/application/ArchivedApplication.java index f9c2fa8b85871..771366bc22aef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/application/ArchivedApplication.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/application/ArchivedApplication.java @@ -20,11 +20,12 @@ import org.apache.flink.api.common.ApplicationID; import org.apache.flink.api.common.ApplicationState; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import java.io.Serializable; import java.util.Arrays; -import java.util.Collection; +import java.util.Map; /** Read-only information about an {@link AbstractApplication}. */ public class ArchivedApplication implements Serializable { @@ -39,14 +40,14 @@ public class ArchivedApplication implements Serializable { private final long[] statusTimestamps; - private final Collection jobs; + private final Map jobs; public ArchivedApplication( ApplicationID applicationId, String applicationName, ApplicationState applicationState, long[] statusTimestamps, - Collection jobs) { + Map jobs) { this.applicationId = applicationId; this.applicationName = applicationName; this.applicationState = applicationState; @@ -70,7 +71,7 @@ public long getStatusTimestamp(ApplicationState status) { return this.statusTimestamps[status.ordinal()]; } - public Collection getJobs() { + public Map getJobs() { return jobs; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStore.java similarity index 52% rename from flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStore.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStore.java index 1d6356773a485..a5b6315466303 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStore.java @@ -18,64 +18,71 @@ package org.apache.flink.runtime.dispatcher; +import org.apache.flink.api.common.ApplicationID; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.application.ArchivedApplication; +import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.JobsOverview; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; -import javax.annotation.Nullable; - import java.io.Closeable; import java.io.IOException; import java.util.Collection; +import java.util.Optional; -/** Interface for a {@link ExecutionGraphInfo} store. */ -public interface ExecutionGraphInfoStore extends Closeable { +/** Interface for a {@link ArchivedApplication} store. */ +public interface ArchivedApplicationStore extends Closeable { /** - * Returns the current number of stored {@link ExecutionGraphInfo} instances. + * Returns the current number of stored {@link ArchivedApplication} instances. * - * @return Current number of stored {@link ExecutionGraphInfo} instances + * @return Current number of stored {@link ArchivedApplication} instances */ int size(); /** - * Get the {@link ExecutionGraphInfo} for the given job id. Null if it isn't stored. + * Get the {@link ArchivedApplication} for the given application id. Empty if it isn't stored. * - * @param jobId identifying the serializable execution graph to retrieve - * @return The stored serializable execution graph or null + * @param applicationId identifying the serializable application to retrieve + * @return The stored serializable application or empty */ - @Nullable - ExecutionGraphInfo get(JobID jobId); + Optional get(ApplicationID applicationId); /** - * Store the given {@link ExecutionGraphInfo} in the store. + * Store the given {@link ArchivedApplication} in the store. * - * @param executionGraphInfo to store - * @throws IOException if the serializable execution graph could not be stored in the store + * @param archivedApplication to store + * @throws IOException if the serializable archived application could not be stored in the store */ - void put(ExecutionGraphInfo executionGraphInfo) throws IOException; + void put(ArchivedApplication archivedApplication) throws IOException; /** - * Return the {@link JobsOverview} for all stored/past jobs. + * Return the collection of {@link ApplicationDetails} of all currently stored applications. * - * @return Jobs overview for all stored/past jobs + * @return Collection of application details of all currently stored applications */ - JobsOverview getStoredJobsOverview(); + Collection getApplicationDetails(); + + /** + * Get the {@link ExecutionGraphInfo} for the given job id. Empty if it isn't stored. + * + * @param jobId identifying the serializable execution graph to retrieve + * @return The stored serializable execution graph or empty + */ + Optional getExecutionGraphInfo(JobID jobId); /** * Return the collection of {@link JobDetails} of all currently stored jobs. * * @return Collection of job details of all currently stored jobs */ - Collection getAvailableJobDetails(); + Collection getJobDetails(); /** - * Return the {@link JobDetails}} for the given job. + * Return the {@link JobsOverview} for all stored/past jobs. * - * @param jobId identifying the job for which to retrieve the {@link JobDetails} - * @return {@link JobDetails} of the requested job or null if the job is not available + * @return Jobs overview for all stored/past jobs */ - @Nullable - JobDetails getAvailableJobDetails(JobID jobId); + JobsOverview getJobsOverview(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index b31a743793ffe..05e74a1f20e79 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -39,6 +39,7 @@ import org.apache.flink.core.failure.FailureEnricher; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.application.AbstractApplication; +import org.apache.flink.runtime.application.ApplicationStatusListener; import org.apache.flink.runtime.application.ArchivedApplication; import org.apache.flink.runtime.application.SingleJobApplication; import org.apache.flink.runtime.blob.BlobServer; @@ -143,6 +144,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -153,7 +155,7 @@ * case of a master failure. Furthermore, it knows about the state of the Flink session cluster. */ public abstract class Dispatcher extends FencedRpcEndpoint - implements DispatcherGateway { + implements DispatcherGateway, ApplicationStatusListener { @VisibleForTesting @Internal public static final ConfigOption CLIENT_ALIVENESS_CHECK_DURATION = @@ -187,7 +189,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint private final DispatcherBootstrapFactory dispatcherBootstrapFactory; - private final ExecutionGraphInfoStore executionGraphInfoStore; + private final ArchivedApplicationStore archivedApplicationStore; private final JobManagerRunnerFactory jobManagerRunnerFactory; private final CleanupRunnerFactory cleanupRunnerFactory; @@ -229,6 +231,10 @@ public abstract class Dispatcher extends FencedRpcEndpoint private final Map> recoveredApplicationJobIds = new HashMap<>(); + /** ExecutionGraphInfo for the terminated job whose application is not terminated yet. */ + private final Map> partialExecutionGraphInfoStore = + new HashMap<>(); + /** Enum to distinguish between initial job submission and re-submission for recovery. */ protected enum ExecutionType { SUBMISSION, @@ -311,7 +317,7 @@ protected Dispatcher( this.historyServerArchivist = dispatcherServices.getHistoryServerArchivist(); - this.executionGraphInfoStore = dispatcherServices.getArchivedExecutionGraphStore(); + this.archivedApplicationStore = dispatcherServices.getArchivedApplicationStore(); this.jobManagerRunnerFactory = dispatcherServices.getJobManagerRunnerFactory(); this.cleanupRunnerFactory = dispatcherServices.getCleanupRunnerFactory(); @@ -659,6 +665,7 @@ private CompletableFuture internalSubmitApplication( if (jobs != null) { jobs.forEach(application::addJob); } + application.registerStatusListener(this); return application.execute( getSelfGateway(DispatcherGateway.class), getRpcService().getScheduledExecutor(), @@ -666,6 +673,72 @@ private CompletableFuture internalSubmitApplication( this::onFatalError); } + @Override + public void notifyApplicationStatusChange( + ApplicationID applicationId, ApplicationState newStatus) { + if (newStatus.isTerminalState()) { + checkState( + applications.containsKey(applicationId), + "Application %s does not exist.", + applicationId); + + AbstractApplication application = applications.get(applicationId); + long[] stateTimestamps = new long[ApplicationState.values().length]; + for (ApplicationState applicationState : ApplicationState.values()) { + final int ordinal = applicationState.ordinal(); + stateTimestamps[ordinal] = application.getStatusTimestamp(applicationState); + } + + List> jobFutures = + application.getJobs().stream() + .map( + jobId -> { + checkState( + partialExecutionGraphInfoStore.containsKey(jobId), + "ExecutionGraphInfo for job %s does not exist.", + jobId); + return partialExecutionGraphInfoStore.get(jobId); + }) + .collect(Collectors.toList()); + + // wait for all jobs to be stored + FutureUtils.combineAll(jobFutures) + .thenAcceptAsync( + combinedJobs -> { + Map jobs = new HashMap<>(); + for (ExecutionGraphInfo executionGraphInfo : combinedJobs) { + jobs.put(executionGraphInfo.getJobId(), executionGraphInfo); + partialExecutionGraphInfoStore.remove( + executionGraphInfo.getJobId()); + } + + ArchivedApplication archivedApplication = + new ArchivedApplication( + application.getApplicationId(), + application.getName(), + application.getApplicationStatus(), + stateTimestamps, + jobs); + + applications.remove(applicationId); + writeToArchivedApplicationStore(archivedApplication); + }, + getMainThreadExecutor()); + } + } + + private void writeToArchivedApplicationStore(ArchivedApplication archivedApplication) { + try { + archivedApplicationStore.put(archivedApplication); + } catch (IOException e) { + log.info( + "Could not store completed application {}({}).", + archivedApplication.getApplicationName(), + archivedApplication.getApplicationId(), + e); + } + } + @VisibleForTesting Map getApplications() { return applications; @@ -811,6 +884,8 @@ private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionTy final JobID jobId = jobManagerRunner.getJobID(); + partialExecutionGraphInfoStore.put(jobId, new CompletableFuture<>()); + final CompletableFuture cleanupJobStateFuture = jobManagerRunner .getResultFuture() @@ -947,8 +1022,10 @@ public CompletableFuture cancelJob(JobID jobId, Duration timeout) { return maybeJob.get().cancel(timeout); } - final ExecutionGraphInfo executionGraphInfo = executionGraphInfoStore.get(jobId); - if (executionGraphInfo != null) { + final Optional optionalExecutionGraphInfo = + getExecutionGraphInfoFromStore(jobId); + if (optionalExecutionGraphInfo.isPresent()) { + final ExecutionGraphInfo executionGraphInfo = optionalExecutionGraphInfo.get(); final JobStatus jobStatus = executionGraphInfo.getArchivedExecutionGraph().getState(); if (jobStatus == JobStatus.CANCELED) { return CompletableFuture.completedFuture(Acknowledge.get()); @@ -1001,7 +1078,7 @@ public CompletableFuture requestClusterOverview(Duration timeou CompletableFuture> allJobsFuture = allOptionalJobsFuture.thenApply(this::flattenOptionalCollection); - final JobsOverview completedJobsOverview = executionGraphInfoStore.getStoredJobsOverview(); + final JobsOverview completedJobsOverview = getCompletedJobsOverview(); return allJobsFuture.thenCombine( taskManagerOverviewFuture, @@ -1012,6 +1089,16 @@ public CompletableFuture requestClusterOverview(Duration timeou }); } + private JobsOverview getCompletedJobsOverview() { + Collection allJobStatus = + getPartialExecutionGraphInfo() + .map(ExecutionGraphInfo::getArchivedExecutionGraph) + .map(ArchivedExecutionGraph::getState) + .collect(Collectors.toList()); + return JobsOverview.create(allJobStatus) + .combine(archivedApplicationStore.getJobsOverview()); + } + @Override public CompletableFuture requestMultipleJobDetails(Duration timeout) { List>> individualOptionalJobDetails = @@ -1024,8 +1111,7 @@ public CompletableFuture requestMultipleJobDetails(Duration CompletableFuture> combinedJobDetails = optionalCombinedJobDetails.thenApply(this::flattenOptionalCollection); - final Collection completedJobDetails = - executionGraphInfoStore.getAvailableJobDetails(); + final Collection completedJobDetails = getCompletedJobDetails(); return combinedJobDetails.thenApply( (Collection runningJobDetails) -> { @@ -1049,6 +1135,34 @@ public CompletableFuture requestMultipleJobDetails(Duration }); } + private Collection getCompletedJobDetails() { + return Stream.concat( + getPartialExecutionGraphInfo() + .map(ExecutionGraphInfo::getArchivedExecutionGraph) + .map(JobDetails::createDetailsForJob), + archivedApplicationStore.getJobDetails().stream()) + .collect(Collectors.toList()); + } + + private Stream getPartialExecutionGraphInfo() { + return partialExecutionGraphInfoStore.values().stream() + .filter(CompletableFuture::isDone) + .map( + executionGraphInfoFuture -> { + try { + ExecutionGraphInfo executionGraphInfo = + executionGraphInfoFuture.getNow(null); + if (executionGraphInfo != null) { + return executionGraphInfo; + } + } catch (Exception e) { + log.error("ExecutionGraphInfo future completed with exception", e); + } + return null; + }) + .filter(Objects::nonNull); + } + @Override public CompletableFuture requestMultipleApplicationDetails( Duration timeout) { @@ -1061,23 +1175,38 @@ public CompletableFuture requestMultipleApplication ApplicationDetails ::fromArchivedApplication)) .collect(Collectors.toList()); + + final Collection completedApplicationDetails = + archivedApplicationStore.getApplicationDetails(); + return FutureUtils.combineAll(applicationDetailsFutures) .thenCompose( - combinedApplicationDetails -> - CompletableFuture.completedFuture( - new MultipleApplicationsDetails( - combinedApplicationDetails))); + runningApplicationDetails -> { + Collection combinedApplicationDetails = + Stream.concat( + runningApplicationDetails.stream(), + completedApplicationDetails.stream()) + .collect(Collectors.toList()); + return CompletableFuture.completedFuture( + new MultipleApplicationsDetails(combinedApplicationDetails)); + }); } @Override public CompletableFuture requestApplication( ApplicationID applicationId, Duration timeout) { - if (!applications.containsKey(applicationId)) { - return FutureUtils.completedExceptionally( - new FlinkApplicationNotFoundException(applicationId)); + if (applications.containsKey(applicationId)) { + return requestApplication(applications.get(applicationId), timeout); } - return requestApplication(applications.get(applicationId), timeout); + // is it a completed application? + return archivedApplicationStore + .get(applicationId) + .map(CompletableFuture::completedFuture) + .orElseGet( + () -> + FutureUtils.completedExceptionally( + new FlinkApplicationNotFoundException(applicationId))); } private CompletableFuture requestApplication( @@ -1088,28 +1217,9 @@ private CompletableFuture requestApplication( stateTimestamps[ordinal] = application.getStatusTimestamp(applicationState); } - List> jobFutures = + List> jobFutures = application.getJobs().stream() - .map( - jobId -> - requestExecutionGraphInfo(jobId, timeout) - .thenApply( - ExecutionGraphInfo - ::getArchivedExecutionGraph) - .exceptionally( - t -> { - if (t - instanceof - FlinkJobNotFoundException) { - log.warn( - "Ignore job {} which may be expired when requesting application {}", - jobId, - application - .getApplicationId()); - return null; - } - throw new CompletionException(t); - })) + .map(jobId -> requestExecutionGraphInfo(jobId, timeout)) .collect(Collectors.toList()); return FutureUtils.combineAll(jobFutures) @@ -1122,25 +1232,12 @@ private CompletableFuture requestApplication( application.getApplicationStatus(), stateTimestamps, combinedJobs.stream() - .filter(Objects::nonNull) - .collect(Collectors.toSet())))); - } - - private CompletableFuture requestJobDetails(JobID jobId, Duration timeout) { - Optional maybeJob = getJobManagerRunner(jobId); - return maybeJob.map(job -> job.requestJobDetails(timeout)) - .orElseGet( - () -> { - // is it a completed job? - final JobDetails jobDetails = - executionGraphInfoStore.getAvailableJobDetails(jobId); - if (jobDetails == null) { - return FutureUtils.completedExceptionally( - new FlinkJobNotFoundException(jobId)); - } else { - return CompletableFuture.completedFuture(jobDetails); - } - }); + .collect( + Collectors.toMap( + ExecutionGraphInfo + ::getJobId, + executionGraphInfo -> + executionGraphInfo))))); } @Override @@ -1150,14 +1247,19 @@ public CompletableFuture requestJobStatus(JobID jobId, Duration timeo .orElseGet( () -> { // is it a completed job? - final JobDetails jobDetails = - executionGraphInfoStore.getAvailableJobDetails(jobId); - if (jobDetails == null) { - return FutureUtils.completedExceptionally( - new FlinkJobNotFoundException(jobId)); - } else { - return CompletableFuture.completedFuture(jobDetails.getStatus()); - } + final Optional optionalJobDetails = + getExecutionGraphInfoFromStore(jobId) + .map(ExecutionGraphInfo::getArchivedExecutionGraph) + .map(JobDetails::createDetailsForJob); + return optionalJobDetails + .map( + jobDetails -> + CompletableFuture.completedFuture( + jobDetails.getStatus())) + .orElseGet( + () -> + FutureUtils.completedExceptionally( + new FlinkJobNotFoundException(jobId))); }); } @@ -1172,12 +1274,31 @@ public CompletableFuture requestExecutionGraphInfo( private ExecutionGraphInfo getExecutionGraphInfoFromStore(Throwable t, JobID jobId) { // check whether it is a completed job - final ExecutionGraphInfo executionGraphInfo = executionGraphInfoStore.get(jobId); - if (executionGraphInfo == null) { - throw new CompletionException(ExceptionUtils.stripCompletionException(t)); - } else { - return executionGraphInfo; + Optional optionalExecutionGraphInfo = + getExecutionGraphInfoFromStore(jobId); + if (optionalExecutionGraphInfo.isPresent()) { + return optionalExecutionGraphInfo.get(); } + + throw new CompletionException(ExceptionUtils.stripCompletionException(t)); + } + + private Optional getExecutionGraphInfoFromStore(JobID jobId) { + final CompletableFuture executionGraphInfoFuture = + partialExecutionGraphInfoStore.get(jobId); + if (executionGraphInfoFuture != null && executionGraphInfoFuture.isDone()) { + try { + ExecutionGraphInfo executionGraphInfo = executionGraphInfoFuture.getNow(null); + if (executionGraphInfo != null) { + return Optional.of(executionGraphInfo); + } + } catch (Exception e) { + log.error("Error while retrieving ExecutionGraphInfo for job {}", jobId, e); + } + } + + // check whether the job belongs to a completed application + return archivedApplicationStore.getExecutionGraphInfo(jobId); } @Override @@ -1195,14 +1316,20 @@ public CompletableFuture requestCheckpointStats( @Override public CompletableFuture requestJobResult(JobID jobId, Duration timeout) { if (!jobManagerRunnerRegistry.isRegistered(jobId)) { - final ExecutionGraphInfo executionGraphInfo = executionGraphInfoStore.get(jobId); - - if (executionGraphInfo == null) { - return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); - } else { - return CompletableFuture.completedFuture( - JobResult.createFrom(executionGraphInfo.getArchivedExecutionGraph())); - } + final Optional optionalExecutionGraphInfo = + getExecutionGraphInfoFromStore(jobId); + + return optionalExecutionGraphInfo + .map( + executionGraphInfo -> + CompletableFuture.completedFuture( + JobResult.createFrom( + executionGraphInfo + .getArchivedExecutionGraph()))) + .orElseGet( + () -> + FutureUtils.completedExceptionally( + new FlinkJobNotFoundException(jobId))); } final JobManagerRunner jobManagerRunner = jobManagerRunnerRegistry.get(jobId); @@ -1732,16 +1859,28 @@ private CompletableFuture createDirtyJobResultEntryAsync( new JobResultEntry(JobResult.createFrom(executionGraph))); } + /** + * Writes the ExecutionGraphInfo to the temporary store, which is for jobs whose associated + * application has not yet reached a terminal state. The ExecutionGraphInfo will be transferred + * to the ArchivedApplicationStore together with the ArchivedApplication when the application + * reaches a terminal state. + * + * @param executionGraphInfo the execution graph information to be stored temporarily + */ private void writeToExecutionGraphInfoStore(ExecutionGraphInfo executionGraphInfo) { - try { - executionGraphInfoStore.put(executionGraphInfo); - } catch (IOException e) { - log.info( - "Could not store completed job {}({}).", - executionGraphInfo.getArchivedExecutionGraph().getJobName(), - executionGraphInfo.getArchivedExecutionGraph().getJobID(), - e); + final JobID jobId = executionGraphInfo.getJobId(); + if (!partialExecutionGraphInfoStore.containsKey(jobId)) { + // This can only occur in tests for job termination without submitting the job + final CompletableFuture executionGraphInfoFuture = + new CompletableFuture<>(); + executionGraphInfoFuture.complete(executionGraphInfo); + partialExecutionGraphInfoStore.put(jobId, executionGraphInfoFuture); + return; } + + partialExecutionGraphInfoStore + .get(executionGraphInfo.getJobId()) + .complete(executionGraphInfo); } private CompletableFuture archiveExecutionGraphToHistoryServer( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java index 600573b52dbe6..a4faf1300fe16 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java @@ -52,7 +52,7 @@ public class DispatcherServices { private final JobManagerMetricGroup jobManagerMetricGroup; - private final ExecutionGraphInfoStore executionGraphInfoStore; + private final ArchivedApplicationStore archivedApplicationStroe; private final FatalErrorHandler fatalErrorHandler; @@ -80,7 +80,7 @@ public class DispatcherServices { GatewayRetriever resourceManagerGatewayRetriever, BlobServer blobServer, HeartbeatServices heartbeatServices, - ExecutionGraphInfoStore executionGraphInfoStore, + ArchivedApplicationStore archivedApplicationStroe, FatalErrorHandler fatalErrorHandler, HistoryServerArchivist historyServerArchivist, @Nullable String metricQueryServiceAddress, @@ -100,8 +100,8 @@ public class DispatcherServices { resourceManagerGatewayRetriever, "ResourceManagerGatewayRetriever"); this.blobServer = Preconditions.checkNotNull(blobServer, "BlobServer"); this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices, "HeartBeatServices"); - this.executionGraphInfoStore = - Preconditions.checkNotNull(executionGraphInfoStore, "ExecutionGraphInfoStore"); + this.archivedApplicationStroe = + Preconditions.checkNotNull(archivedApplicationStroe, "ArchivedApplicationStore"); this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler, "FatalErrorHandler"); this.historyServerArchivist = Preconditions.checkNotNull(historyServerArchivist, "HistoryServerArchivist"); @@ -143,8 +143,8 @@ public JobManagerMetricGroup getJobManagerMetricGroup() { return jobManagerMetricGroup; } - public ExecutionGraphInfoStore getArchivedExecutionGraphStore() { - return executionGraphInfoStore; + public ArchivedApplicationStore getArchivedApplicationStore() { + return archivedApplicationStroe; } public FatalErrorHandler getFatalErrorHandler() { @@ -200,8 +200,7 @@ public static DispatcherServices from( .getResourceManagerGatewayRetriever(), partialDispatcherServicesWithJobPersistenceComponents.getBlobServer(), partialDispatcherServicesWithJobPersistenceComponents.getHeartbeatServices(), - partialDispatcherServicesWithJobPersistenceComponents - .getArchivedExecutionGraphStore(), + partialDispatcherServicesWithJobPersistenceComponents.getArchivedApplicationStore(), partialDispatcherServicesWithJobPersistenceComponents.getFatalErrorHandler(), partialDispatcherServicesWithJobPersistenceComponents.getHistoryServerArchivist(), partialDispatcherServicesWithJobPersistenceComponents diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedApplicationStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedApplicationStore.java new file mode 100644 index 0000000000000..002c816b88057 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedApplicationStore.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.ApplicationState; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.application.ArchivedApplication; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.ScheduledExecutor; + +import org.apache.flink.shaded.guava33.com.google.common.base.Ticker; +import org.apache.flink.shaded.guava33.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava33.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.guava33.com.google.common.cache.CacheLoader; +import org.apache.flink.shaded.guava33.com.google.common.cache.LoadingCache; +import org.apache.flink.shaded.guava33.com.google.common.cache.RemovalListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Store for {@link ArchivedApplication} instances. The store writes the archived application to + * disk and keeps the most recently used applications in a memory cache for faster serving. + * Moreover, the stored applications are periodically cleaned up. + */ +public class FileArchivedApplicationStore implements ArchivedApplicationStore { + + private static final Logger LOG = LoggerFactory.getLogger(FileArchivedApplicationStore.class); + + private final File storageDir; + + private final Cache applicationDetailsCache; + + private final Cache jobDetailsCache; + + private final LoadingCache archivedApplicationCache; + + private final Map jobIdToApplicationId = new HashMap<>(); + + private final Map> applicationIdToJobIds = new HashMap<>(); + + private final ScheduledFuture cleanupFuture; + + private int numFinishedJobs; + + private int numFailedJobs; + + private int numCanceledJobs; + + public FileArchivedApplicationStore( + File rootDir, + Duration expirationTime, + int maximumCapacity, + long maximumCacheSizeBytes, + ScheduledExecutor scheduledExecutor, + Ticker ticker) + throws IOException { + + final File storageDirectory = initArchivedApplicationStorageDirectory(rootDir); + + LOG.info( + "Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.", + FileArchivedApplicationStore.class.getSimpleName(), + storageDirectory, + expirationTime.toMillis(), + maximumCacheSizeBytes); + + this.storageDir = Preconditions.checkNotNull(storageDirectory); + Preconditions.checkArgument( + storageDirectory.exists() && storageDirectory.isDirectory(), + "The storage directory must exist and be a directory."); + this.applicationDetailsCache = + CacheBuilder.newBuilder() + .expireAfterWrite(expirationTime.toMillis(), TimeUnit.MILLISECONDS) + .maximumSize(maximumCapacity) + .removalListener( + (RemovalListener) + notification -> + deleteArchivedApplication(notification.getKey())) + .ticker(ticker) + .build(); + this.jobDetailsCache = CacheBuilder.newBuilder().build(); + + this.archivedApplicationCache = + CacheBuilder.newBuilder() + .maximumWeight(maximumCacheSizeBytes) + .weigher(this::calculateSize) + .build( + new CacheLoader() { + @Override + public ArchivedApplication load(ApplicationID applicationId) + throws Exception { + return loadArchivedApplication(applicationId); + } + }); + + this.cleanupFuture = + scheduledExecutor.scheduleWithFixedDelay( + () -> { + applicationDetailsCache.cleanUp(); + jobDetailsCache.cleanUp(); + }, + expirationTime.toMillis(), + expirationTime.toMillis(), + TimeUnit.MILLISECONDS); + + this.numFinishedJobs = 0; + this.numFailedJobs = 0; + this.numCanceledJobs = 0; + } + + @Override + public int size() { + return Math.toIntExact(applicationDetailsCache.size()); + } + + @Override + public Optional get(ApplicationID applicationId) { + try { + return Optional.of(archivedApplicationCache.get(applicationId)); + } catch (ExecutionException e) { + LOG.debug( + "Could not load archived application for application id {}.", applicationId, e); + return Optional.empty(); + } + } + + @Override + public void put(ArchivedApplication archivedApplication) throws IOException { + final ApplicationID applicationId = archivedApplication.getApplicationId(); + + final ApplicationState status = archivedApplication.getApplicationStatus(); + final String name = archivedApplication.getApplicationName(); + + Preconditions.checkArgument( + status.isTerminalState(), + "The application " + + name + + '(' + + applicationId + + ") is not in a terminal state. Instead it is in state " + + status + + '.'); + + for (ExecutionGraphInfo executionGraphInfo : archivedApplication.getJobs().values()) { + final JobID jobId = executionGraphInfo.getArchivedExecutionGraph().getJobID(); + + final JobStatus jobStatus = executionGraphInfo.getArchivedExecutionGraph().getState(); + switch (jobStatus) { + case FINISHED: + numFinishedJobs++; + break; + case CANCELED: + numCanceledJobs++; + break; + case FAILED: + numFailedJobs++; + break; + case SUSPENDED: + break; + default: + throw new IllegalStateException( + "The job " + + executionGraphInfo.getArchivedExecutionGraph().getJobName() + + '(' + + jobId + + ") should have been in a known terminal state. " + + "Instead it was in state " + + jobStatus + + '.'); + } + + jobIdToApplicationId.put(jobId, applicationId); + applicationIdToJobIds + .computeIfAbsent(applicationId, ignored -> new ArrayList<>()) + .add(jobId); + jobDetailsCache.put( + jobId, + JobDetails.createDetailsForJob(executionGraphInfo.getArchivedExecutionGraph())); + } + + storeArchivedApplication(archivedApplication); + + final ApplicationDetails detailsForApplication = + ApplicationDetails.fromArchivedApplication(archivedApplication); + + applicationDetailsCache.put(applicationId, detailsForApplication); + archivedApplicationCache.put(applicationId, archivedApplication); + } + + @Override + public Collection getApplicationDetails() { + return applicationDetailsCache.asMap().values(); + } + + @Override + public Optional getExecutionGraphInfo(JobID jobId) { + try { + final ApplicationID applicationId = jobIdToApplicationId.get(jobId); + if (applicationId == null) { + return Optional.empty(); + } + return Optional.of(archivedApplicationCache.get(applicationId)) + .map(archivedApplication -> archivedApplication.getJobs().get(jobId)); + } catch (ExecutionException e) { + LOG.debug( + "Could not load archived execution graph information for job id {}.", jobId, e); + return Optional.empty(); + } + } + + @Override + public Collection getJobDetails() { + return jobDetailsCache.asMap().values(); + } + + @Override + public JobsOverview getJobsOverview() { + return new JobsOverview(0, numFinishedJobs, numCanceledJobs, numFailedJobs); + } + + @Override + public void close() throws IOException { + cleanupFuture.cancel(false); + + applicationDetailsCache.invalidateAll(); + + // clean up the storage directory + FileUtils.deleteFileOrDirectory(storageDir); + } + + // -------------------------------------------------------------- + // Internal methods + // -------------------------------------------------------------- + + private int calculateSize( + ApplicationID applicationId, ArchivedApplication archivedApplication) { + final File archivedApplicationFile = getArchivedApplicationFile(applicationId); + + if (archivedApplicationFile.exists()) { + return Math.toIntExact(archivedApplicationFile.length()); + } else { + LOG.debug( + "Could not find archived application file for {}. Estimating the size instead.", + applicationId); + int estimatedSize = 0; + for (ExecutionGraphInfo executionGraphInfo : archivedApplication.getJobs().values()) { + final ArchivedExecutionGraph archivedExecutionGraph = + executionGraphInfo.getArchivedExecutionGraph(); + estimatedSize += + archivedExecutionGraph.getAllVertices().size() * 1000 + + archivedExecutionGraph.getAccumulatorsSerialized().size() * 1000; + } + return estimatedSize; + } + } + + private ArchivedApplication loadArchivedApplication(ApplicationID applicationId) + throws IOException, ClassNotFoundException { + final File archivedApplicationFile = getArchivedApplicationFile(applicationId); + + if (archivedApplicationFile.exists()) { + try (FileInputStream fileInputStream = new FileInputStream(archivedApplicationFile)) { + return InstantiationUtil.deserializeObject( + fileInputStream, getClass().getClassLoader()); + } + } else { + throw new FileNotFoundException( + "Could not find file for archived application " + + applicationId + + ". This indicates that the file either has been deleted or never written."); + } + } + + private void storeArchivedApplication(ArchivedApplication archivedApplication) + throws IOException { + final File archivedApplicationFile = + getArchivedApplicationFile(archivedApplication.getApplicationId()); + + try (FileOutputStream fileOutputStream = new FileOutputStream(archivedApplicationFile)) { + InstantiationUtil.serializeObject(fileOutputStream, archivedApplication); + } + } + + private File getArchivedApplicationFile(ApplicationID applicationId) { + return new File(storageDir, applicationId.toString()); + } + + private void deleteArchivedApplication(ApplicationID applicationId) { + Preconditions.checkNotNull(applicationId); + + final File archivedApplicationFile = getArchivedApplicationFile(applicationId); + + try { + FileUtils.deleteFileOrDirectory(archivedApplicationFile); + } catch (IOException e) { + LOG.debug("Could not delete file {}.", archivedApplicationFile, e); + } + + archivedApplicationCache.invalidate(applicationId); + applicationDetailsCache.invalidate(applicationId); + List jobIds = applicationIdToJobIds.remove(applicationId); + if (jobIds != null) { + jobIds.forEach( + jobId -> { + jobIdToApplicationId.remove(jobId); + jobDetailsCache.invalidate(jobId); + }); + } + } + + private static File initArchivedApplicationStorageDirectory(File tmpDir) throws IOException { + final int maxAttempts = 10; + + for (int attempt = 0; attempt < maxAttempts; attempt++) { + final File storageDirectory = + new File(tmpDir, "archivedApplicationStore-" + UUID.randomUUID()); + + if (storageDirectory.mkdir()) { + return storageDirectory; + } + } + + throw new IOException( + "Could not create archivedApplicationStorage directory in " + tmpDir + '.'); + } + + // -------------------------------------------------------------- + // Testing methods + // -------------------------------------------------------------- + + @VisibleForTesting + File getStorageDir() { + return storageDir; + } + + @VisibleForTesting + LoadingCache getArchivedApplicationCache() { + return archivedApplicationCache; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStore.java deleted file mode 100644 index 518a151ed2553..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStore.java +++ /dev/null @@ -1,326 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.dispatcher; - -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.JobStatus; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; -import org.apache.flink.runtime.messages.webmonitor.JobsOverview; -import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; -import org.apache.flink.util.FileUtils; -import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.concurrent.ScheduledExecutor; - -import org.apache.flink.shaded.guava33.com.google.common.base.Ticker; -import org.apache.flink.shaded.guava33.com.google.common.cache.Cache; -import org.apache.flink.shaded.guava33.com.google.common.cache.CacheBuilder; -import org.apache.flink.shaded.guava33.com.google.common.cache.CacheLoader; -import org.apache.flink.shaded.guava33.com.google.common.cache.LoadingCache; -import org.apache.flink.shaded.guava33.com.google.common.cache.RemovalListener; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.time.Duration; -import java.util.Collection; -import java.util.UUID; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -/** - * Store for {@link ExecutionGraphInfo} instances. The store writes the archived execution graph - * information to disk and keeps the most recently used execution graphs in a memory cache for - * faster serving. Moreover, the stored execution graphs are periodically cleaned up. - */ -public class FileExecutionGraphInfoStore implements ExecutionGraphInfoStore { - - private static final Logger LOG = LoggerFactory.getLogger(FileExecutionGraphInfoStore.class); - - private final File storageDir; - - private final Cache jobDetailsCache; - - private final LoadingCache executionGraphInfoCache; - - private final ScheduledFuture cleanupFuture; - - private int numFinishedJobs; - - private int numFailedJobs; - - private int numCanceledJobs; - - public FileExecutionGraphInfoStore( - File rootDir, - Duration expirationTime, - int maximumCapacity, - long maximumCacheSizeBytes, - ScheduledExecutor scheduledExecutor, - Ticker ticker) - throws IOException { - - final File storageDirectory = initExecutionGraphStorageDirectory(rootDir); - - LOG.info( - "Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.", - FileExecutionGraphInfoStore.class.getSimpleName(), - storageDirectory, - expirationTime.toMillis(), - maximumCacheSizeBytes); - - this.storageDir = Preconditions.checkNotNull(storageDirectory); - Preconditions.checkArgument( - storageDirectory.exists() && storageDirectory.isDirectory(), - "The storage directory must exist and be a directory."); - this.jobDetailsCache = - CacheBuilder.newBuilder() - .expireAfterWrite(expirationTime.toMillis(), TimeUnit.MILLISECONDS) - .maximumSize(maximumCapacity) - .removalListener( - (RemovalListener) - notification -> - deleteExecutionGraphFile(notification.getKey())) - .ticker(ticker) - .build(); - - this.executionGraphInfoCache = - CacheBuilder.newBuilder() - .maximumWeight(maximumCacheSizeBytes) - .weigher(this::calculateSize) - .build( - new CacheLoader() { - @Override - public ExecutionGraphInfo load(JobID jobId) throws Exception { - return loadExecutionGraph(jobId); - } - }); - - this.cleanupFuture = - scheduledExecutor.scheduleWithFixedDelay( - jobDetailsCache::cleanUp, - expirationTime.toMillis(), - expirationTime.toMillis(), - TimeUnit.MILLISECONDS); - - this.numFinishedJobs = 0; - this.numFailedJobs = 0; - this.numCanceledJobs = 0; - } - - @Override - public int size() { - return Math.toIntExact(jobDetailsCache.size()); - } - - @Override - @Nullable - public ExecutionGraphInfo get(JobID jobId) { - try { - return executionGraphInfoCache.get(jobId); - } catch (ExecutionException e) { - LOG.debug( - "Could not load archived execution graph information for job id {}.", jobId, e); - return null; - } - } - - @Override - public void put(ExecutionGraphInfo executionGraphInfo) throws IOException { - final JobID jobId = executionGraphInfo.getJobId(); - - final ArchivedExecutionGraph archivedExecutionGraph = - executionGraphInfo.getArchivedExecutionGraph(); - final JobStatus jobStatus = archivedExecutionGraph.getState(); - final String jobName = archivedExecutionGraph.getJobName(); - - Preconditions.checkArgument( - jobStatus.isTerminalState(), - "The job " - + jobName - + '(' - + jobId - + ") is not in a terminal state. Instead it is in state " - + jobStatus - + '.'); - - switch (jobStatus) { - case FINISHED: - numFinishedJobs++; - break; - case CANCELED: - numCanceledJobs++; - break; - case FAILED: - numFailedJobs++; - break; - case SUSPENDED: - break; - default: - throw new IllegalStateException( - "The job " - + jobName - + '(' - + jobId - + ") should have been in a known terminal state. " - + "Instead it was in state " - + jobStatus - + '.'); - } - - // write the ArchivedExecutionGraph to disk - storeExecutionGraphInfo(executionGraphInfo); - - final JobDetails detailsForJob = JobDetails.createDetailsForJob(archivedExecutionGraph); - - jobDetailsCache.put(jobId, detailsForJob); - executionGraphInfoCache.put(jobId, executionGraphInfo); - } - - @Override - public JobsOverview getStoredJobsOverview() { - return new JobsOverview(0, numFinishedJobs, numCanceledJobs, numFailedJobs); - } - - @Override - public Collection getAvailableJobDetails() { - return jobDetailsCache.asMap().values(); - } - - @Nullable - @Override - public JobDetails getAvailableJobDetails(JobID jobId) { - return jobDetailsCache.getIfPresent(jobId); - } - - @Override - public void close() throws IOException { - cleanupFuture.cancel(false); - - jobDetailsCache.invalidateAll(); - - // clean up the storage directory - FileUtils.deleteFileOrDirectory(storageDir); - } - - // -------------------------------------------------------------- - // Internal methods - // -------------------------------------------------------------- - - private int calculateSize(JobID jobId, ExecutionGraphInfo serializableExecutionGraphInfo) { - final File executionGraphInfoFile = getExecutionGraphFile(jobId); - - if (executionGraphInfoFile.exists()) { - return Math.toIntExact(executionGraphInfoFile.length()); - } else { - LOG.debug( - "Could not find execution graph information file for {}. Estimating the size instead.", - jobId); - final ArchivedExecutionGraph serializableExecutionGraph = - serializableExecutionGraphInfo.getArchivedExecutionGraph(); - return serializableExecutionGraph.getAllVertices().size() * 1000 - + serializableExecutionGraph.getAccumulatorsSerialized().size() * 1000; - } - } - - private ExecutionGraphInfo loadExecutionGraph(JobID jobId) - throws IOException, ClassNotFoundException { - final File executionGraphInfoFile = getExecutionGraphFile(jobId); - - if (executionGraphInfoFile.exists()) { - try (FileInputStream fileInputStream = new FileInputStream(executionGraphInfoFile)) { - return InstantiationUtil.deserializeObject( - fileInputStream, getClass().getClassLoader()); - } - } else { - throw new FileNotFoundException( - "Could not find file for archived execution graph " - + jobId - + ". This indicates that the file either has been deleted or never written."); - } - } - - private void storeExecutionGraphInfo(ExecutionGraphInfo executionGraphInfo) throws IOException { - final File archivedExecutionGraphFile = - getExecutionGraphFile(executionGraphInfo.getJobId()); - - try (FileOutputStream fileOutputStream = new FileOutputStream(archivedExecutionGraphFile)) { - InstantiationUtil.serializeObject(fileOutputStream, executionGraphInfo); - } - } - - private File getExecutionGraphFile(JobID jobId) { - return new File(storageDir, jobId.toString()); - } - - private void deleteExecutionGraphFile(JobID jobId) { - Preconditions.checkNotNull(jobId); - - final File archivedExecutionGraphFile = getExecutionGraphFile(jobId); - - try { - FileUtils.deleteFileOrDirectory(archivedExecutionGraphFile); - } catch (IOException e) { - LOG.debug("Could not delete file {}.", archivedExecutionGraphFile, e); - } - - executionGraphInfoCache.invalidate(jobId); - jobDetailsCache.invalidate(jobId); - } - - private static File initExecutionGraphStorageDirectory(File tmpDir) throws IOException { - final int maxAttempts = 10; - - for (int attempt = 0; attempt < maxAttempts; attempt++) { - final File storageDirectory = - new File(tmpDir, "executionGraphStore-" + UUID.randomUUID()); - - if (storageDirectory.mkdir()) { - return storageDirectory; - } - } - - throw new IOException( - "Could not create executionGraphStorage directory in " + tmpDir + '.'); - } - - // -------------------------------------------------------------- - // Testing methods - // -------------------------------------------------------------- - - @VisibleForTesting - File getStorageDir() { - return storageDir; - } - - @VisibleForTesting - LoadingCache getExecutionGraphInfoCache() { - return executionGraphInfoCache; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedApplicationStore.java similarity index 55% rename from flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedApplicationStore.java index e008e0401342d..3dec94c60d6ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryArchivedApplicationStore.java @@ -18,9 +18,12 @@ package org.apache.flink.runtime.dispatcher; +import org.apache.flink.api.common.ApplicationID; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.application.ArchivedApplication; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.JobsOverview; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; @@ -30,35 +33,35 @@ import org.apache.flink.shaded.guava33.com.google.common.cache.Cache; import org.apache.flink.shaded.guava33.com.google.common.cache.CacheBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.annotation.Nullable; import java.io.IOException; import java.time.Duration; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** - * {@link ExecutionGraphInfoStore} implementation which stores the {@link ArchivedExecutionGraph} in - * memory. The memory store support to keep maximum job graphs and remove the timeout ones. + * {@link ArchivedApplicationStore} implementation which stores the {@link ArchivedApplication} in + * memory. The memory store support to keep maximum applications and remove the timeout ones. */ -public class MemoryExecutionGraphInfoStore implements ExecutionGraphInfoStore { +public class MemoryArchivedApplicationStore implements ArchivedApplicationStore { - private static final Logger LOG = LoggerFactory.getLogger(MemoryExecutionGraphInfoStore.class); + private final Cache archivedApplicationCache; - private final Cache serializableExecutionGraphInfos; + private final Map jobIdToApplicationId = new HashMap<>(); @Nullable private final ScheduledFuture cleanupFuture; - public MemoryExecutionGraphInfoStore() { + public MemoryArchivedApplicationStore() { this(Duration.ofMillis(0), 0, null, null); } - public MemoryExecutionGraphInfoStore( + public MemoryArchivedApplicationStore( Duration expirationTime, int maximumCapacity, @Nullable ScheduledExecutor scheduledExecutor, @@ -75,11 +78,11 @@ public MemoryExecutionGraphInfoStore( cacheBuilder.ticker(ticker); } - this.serializableExecutionGraphInfos = cacheBuilder.build(); + this.archivedApplicationCache = cacheBuilder.build(); if (scheduledExecutor != null) { this.cleanupFuture = scheduledExecutor.scheduleWithFixedDelay( - serializableExecutionGraphInfos::cleanUp, + archivedApplicationCache::cleanUp, expirationTime.toMillis(), expirationTime.toMillis(), TimeUnit.MILLISECONDS); @@ -90,52 +93,65 @@ public MemoryExecutionGraphInfoStore( @Override public int size() { - return Math.toIntExact(serializableExecutionGraphInfos.size()); + return Math.toIntExact(archivedApplicationCache.size()); } - @Nullable @Override - public ExecutionGraphInfo get(JobID jobId) { - return serializableExecutionGraphInfos.getIfPresent(jobId); + public Optional get(ApplicationID applicationId) { + return Optional.ofNullable(archivedApplicationCache.getIfPresent(applicationId)); } @Override - public void put(ExecutionGraphInfo serializableExecutionGraphInfo) throws IOException { - serializableExecutionGraphInfos.put( - serializableExecutionGraphInfo.getJobId(), serializableExecutionGraphInfo); + public void put(ArchivedApplication archivedApplication) throws IOException { + final ApplicationID applicationId = archivedApplication.getApplicationId(); + archivedApplication + .getJobs() + .keySet() + .forEach(jobId -> jobIdToApplicationId.put(jobId, applicationId)); + + archivedApplicationCache.put(archivedApplication.getApplicationId(), archivedApplication); } @Override - public JobsOverview getStoredJobsOverview() { - Collection allJobStatus = - serializableExecutionGraphInfos.asMap().values().stream() - .map(ExecutionGraphInfo::getArchivedExecutionGraph) - .map(ArchivedExecutionGraph::getState) - .collect(Collectors.toList()); + public Collection getApplicationDetails() { + return archivedApplicationCache.asMap().values().stream() + .map(ApplicationDetails::fromArchivedApplication) + .collect(Collectors.toList()); + } - return JobsOverview.create(allJobStatus); + @Override + public Optional getExecutionGraphInfo(JobID jobId) { + final ApplicationID applicationId = jobIdToApplicationId.get(jobId); + if (applicationId == null) { + return Optional.empty(); + } + final ArchivedApplication archivedApplication = + archivedApplicationCache.getIfPresent(applicationId); + return Optional.ofNullable(archivedApplication) + .map(application -> application.getJobs().get(jobId)); } @Override - public Collection getAvailableJobDetails() { - return serializableExecutionGraphInfos.asMap().values().stream() + public Collection getJobDetails() { + return archivedApplicationCache.asMap().values().stream() + .flatMap(archivedApplication -> archivedApplication.getJobs().values().stream()) .map(ExecutionGraphInfo::getArchivedExecutionGraph) .map(JobDetails::createDetailsForJob) .collect(Collectors.toList()); } - @Nullable @Override - public JobDetails getAvailableJobDetails(JobID jobId) { - final ExecutionGraphInfo archivedExecutionGraphInfo = - serializableExecutionGraphInfos.getIfPresent(jobId); + public JobsOverview getJobsOverview() { + Collection allJobStatus = + archivedApplicationCache.asMap().values().stream() + .flatMap( + archivedApplication -> + archivedApplication.getJobs().values().stream()) + .map(ExecutionGraphInfo::getArchivedExecutionGraph) + .map(ArchivedExecutionGraph::getState) + .collect(Collectors.toList()); - if (archivedExecutionGraphInfo != null) { - return JobDetails.createDetailsForJob( - archivedExecutionGraphInfo.getArchivedExecutionGraph()); - } else { - return null; - } + return JobsOverview.create(allJobStatus); } @Override @@ -144,6 +160,6 @@ public void close() throws IOException { cleanupFuture.cancel(false); } - serializableExecutionGraphInfos.invalidateAll(); + archivedApplicationCache.invalidateAll(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java index 9ee850f6ec660..8a10f5f547aea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java @@ -51,7 +51,7 @@ public class PartialDispatcherServices { @Nonnull private final JobManagerMetricGroupFactory jobManagerMetricGroupFactory; - @Nonnull private final ExecutionGraphInfoStore executionGraphInfoStore; + @Nonnull private final ArchivedApplicationStore archivedApplicationStore; @Nonnull private final FatalErrorHandler fatalErrorHandler; @@ -72,7 +72,7 @@ public PartialDispatcherServices( @Nonnull BlobServer blobServer, @Nonnull HeartbeatServices heartbeatServices, @Nonnull JobManagerMetricGroupFactory jobManagerMetricGroupFactory, - @Nonnull ExecutionGraphInfoStore executionGraphInfoStore, + @Nonnull ArchivedApplicationStore archivedApplicationStore, @Nonnull FatalErrorHandler fatalErrorHandler, @Nonnull HistoryServerArchivist historyServerArchivist, @Nullable String metricQueryServiceAddress, @@ -85,7 +85,7 @@ public PartialDispatcherServices( this.blobServer = blobServer; this.heartbeatServices = heartbeatServices; this.jobManagerMetricGroupFactory = jobManagerMetricGroupFactory; - this.executionGraphInfoStore = executionGraphInfoStore; + this.archivedApplicationStore = archivedApplicationStore; this.fatalErrorHandler = fatalErrorHandler; this.historyServerArchivist = historyServerArchivist; this.metricQueryServiceAddress = metricQueryServiceAddress; @@ -125,8 +125,8 @@ public JobManagerMetricGroupFactory getJobManagerMetricGroupFactory() { } @Nonnull - public ExecutionGraphInfoStore getArchivedExecutionGraphStore() { - return executionGraphInfoStore; + public ArchivedApplicationStore getArchivedApplicationStore() { + return archivedApplicationStore; } @Nonnull diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobPersistenceComponents.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobPersistenceComponents.java index 2c967a526491d..4f701bbffd0cc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobPersistenceComponents.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobPersistenceComponents.java @@ -48,7 +48,7 @@ private PartialDispatcherServicesWithJobPersistenceComponents( BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroupFactory jobManagerMetricGroupFactory, - ExecutionGraphInfoStore executionGraphInfoStore, + ArchivedApplicationStore archivedApplicationStore, FatalErrorHandler fatalErrorHandler, HistoryServerArchivist historyServerArchivist, @Nullable String metricQueryServiceAddress, @@ -64,7 +64,7 @@ private PartialDispatcherServicesWithJobPersistenceComponents( blobServer, heartbeatServices, jobManagerMetricGroupFactory, - executionGraphInfoStore, + archivedApplicationStore, fatalErrorHandler, historyServerArchivist, metricQueryServiceAddress, @@ -94,7 +94,7 @@ public static PartialDispatcherServicesWithJobPersistenceComponents from( partialDispatcherServices.getBlobServer(), partialDispatcherServices.getHeartbeatServices(), partialDispatcherServices.getJobManagerMetricGroupFactory(), - partialDispatcherServices.getArchivedExecutionGraphStore(), + partialDispatcherServices.getArchivedApplicationStore(), partialDispatcherServices.getFatalErrorHandler(), partialDispatcherServices.getHistoryServerArchivist(), partialDispatcherServices.getMetricQueryServiceAddress(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index ced8edeb072b5..de80e508eeac3 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -42,7 +42,7 @@ import org.apache.flink.runtime.blob.BlobUtils; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore; +import org.apache.flink.runtime.dispatcher.ArchivedApplicationStore; import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent; import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; @@ -169,7 +169,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro @GuardedBy("lock") private DeterminismEnvelope workingDirectory; - private ExecutionGraphInfoStore executionGraphInfoStore; + private ArchivedApplicationStore archivedApplicationStore; private final Thread shutDownHook; private RpcSystem rpcSystem; @@ -301,7 +301,7 @@ private void runCluster(Configuration configuration, PluginManager pluginManager heartbeatServices, delegationTokenManager, metricRegistry, - executionGraphInfoStore, + archivedApplicationStore, new RpcMetricQueryServiceRetriever( metricRegistry.getMetricQueryServiceRpcService()), failureEnrichers, @@ -419,8 +419,8 @@ protected void initializeServices(Configuration configuration, PluginManager plu ConfigurationUtils.getSystemResourceMetricsProbingInterval( configuration)); - executionGraphInfoStore = - createSerializableExecutionGraphStore( + archivedApplicationStore = + createArchivedApplicationStore( configuration, commonRpcService.getScheduledExecutor()); } } @@ -512,10 +512,10 @@ protected CompletableFuture stopClusterServices(boolean cleanupHaData) { } } - if (executionGraphInfoStore != null) { + if (archivedApplicationStore != null) { try { - executionGraphInfoStore.close(); - executionGraphInfoStore = null; + archivedApplicationStore.close(); + archivedApplicationStore = null; } catch (Throwable t) { exception = ExceptionUtils.firstOrSuppressed(t, exception); } @@ -693,7 +693,7 @@ protected void cleanupDirectories(ShutdownBehaviour shutdownBehaviour) throws IO createDispatcherResourceManagerComponentFactory(Configuration configuration) throws IOException; - protected abstract ExecutionGraphInfoStore createSerializableExecutionGraphStore( + protected abstract ArchivedApplicationStore createArchivedApplicationStore( Configuration configuration, ScheduledExecutor scheduledExecutor) throws IOException; public static EntrypointClusterConfiguration parseArguments(String[] args) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java index 193eef0937b2a..5e603807cd0e1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java @@ -21,9 +21,10 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore; -import org.apache.flink.runtime.dispatcher.FileExecutionGraphInfoStore; -import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore; +import org.apache.flink.runtime.dispatcher.ArchivedApplicationStore; +import org.apache.flink.runtime.dispatcher.FileArchivedApplicationStore; +import org.apache.flink.runtime.dispatcher.MemoryArchivedApplicationStore; +import org.apache.flink.runtime.util.ArchivedApplicationStoreUtils; import org.apache.flink.util.concurrent.ScheduledExecutor; import org.apache.flink.shaded.guava33.com.google.common.base.Ticker; @@ -40,23 +41,25 @@ public SessionClusterEntrypoint(Configuration configuration) { } @Override - protected ExecutionGraphInfoStore createSerializableExecutionGraphStore( + protected ArchivedApplicationStore createArchivedApplicationStore( Configuration configuration, ScheduledExecutor scheduledExecutor) throws IOException { - final JobManagerOptions.JobStoreType jobStoreType = - configuration.get(JobManagerOptions.JOB_STORE_TYPE); + final JobManagerOptions.ArchivedApplicationStoreType archivedApplicationStoreType = + configuration.get(JobManagerOptions.COMPLETED_APPLICATION_STORE_TYPE); final Duration expirationTime = - Duration.ofSeconds(configuration.get(JobManagerOptions.JOB_STORE_EXPIRATION_TIME)); - final int maximumCapacity = configuration.get(JobManagerOptions.JOB_STORE_MAX_CAPACITY); + ArchivedApplicationStoreUtils.getExpirationTime(configuration); + final int maximumCapacity = + configuration.get(JobManagerOptions.COMPLETED_APPLICATION_STORE_MAX_CAPACITY); - switch (jobStoreType) { + switch (archivedApplicationStoreType) { case File: { final File tmpDir = new File(ConfigurationUtils.parseTempDirectories(configuration)[0]); final long maximumCacheSizeBytes = - configuration.get(JobManagerOptions.JOB_STORE_CACHE_SIZE); + configuration.get( + JobManagerOptions.COMPLETED_APPLICATION_STORE_CACHE_SIZE); - return new FileExecutionGraphInfoStore( + return new FileArchivedApplicationStore( tmpDir, expirationTime, maximumCapacity, @@ -66,7 +69,7 @@ protected ExecutionGraphInfoStore createSerializableExecutionGraphStore( } case Memory: { - return new MemoryExecutionGraphInfoStore( + return new MemoryArchivedApplicationStore( expirationTime, maximumCapacity, scheduledExecutor, @@ -75,7 +78,8 @@ protected ExecutionGraphInfoStore createSerializableExecutionGraphStore( default: { throw new IllegalArgumentException( - "Unsupported job store type " + jobStoreType); + "Unsupported archived application store type " + + archivedApplicationStoreType); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java index c1178130c9f87..cf5b39a0322dd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java @@ -24,10 +24,10 @@ import org.apache.flink.core.failure.FailureEnricher; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.dispatcher.ArchivedApplicationStore; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.dispatcher.DispatcherId; import org.apache.flink.runtime.dispatcher.DispatcherOperationCaches; -import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore; import org.apache.flink.runtime.dispatcher.HistoryServerArchivist; import org.apache.flink.runtime.dispatcher.PartialDispatcherServices; import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory; @@ -111,7 +111,7 @@ public DispatcherResourceManagerComponent create( HeartbeatServices heartbeatServices, DelegationTokenManager delegationTokenManager, MetricRegistry metricRegistry, - ExecutionGraphInfoStore executionGraphInfoStore, + ArchivedApplicationStore archivedApplicationStore, MetricQueryServiceRetriever metricQueryServiceRetriever, Collection failureEnrichers, FatalErrorHandler fatalErrorHandler) @@ -213,7 +213,7 @@ public DispatcherResourceManagerComponent create( () -> JobManagerMetricGroup.createJobManagerMetricGroup( metricRegistry, hostname), - executionGraphInfoStore, + archivedApplicationStore, fatalErrorHandler, historyServerArchivist, metricRegistry.getMetricQueryServiceGatewayRpcAddress(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java index 52ee8d3bc7e96..49232ce2f0b21 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java @@ -22,7 +22,7 @@ import org.apache.flink.core.failure.FailureEnricher; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore; +import org.apache.flink.runtime.dispatcher.ArchivedApplicationStore; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.metrics.MetricRegistry; @@ -47,7 +47,7 @@ DispatcherResourceManagerComponent create( HeartbeatServices heartbeatServices, DelegationTokenManager delegationTokenManager, MetricRegistry metricRegistry, - ExecutionGraphInfoStore executionGraphInfoStore, + ArchivedApplicationStore archivedApplicationStore, MetricQueryServiceRetriever metricQueryServiceRetriever, Collection failureEnrichers, FatalErrorHandler fatalErrorHandler) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetails.java index d567173ee4d25..91155099c85fe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetails.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetails.java @@ -181,8 +181,10 @@ public static ApplicationDetails fromArchivedApplication( : -1L; long duration = (endTime >= 0L ? endTime : System.currentTimeMillis()) - startTime; Map jobInfo = new HashMap<>(); - archivedApplication.getJobs().stream() - .map(archivedExecutionGraph -> archivedExecutionGraph.getState().name()) + archivedApplication.getJobs().values().stream() + .map( + executionGraphInfo -> + executionGraphInfo.getArchivedExecutionGraph().getState().name()) .forEach( status -> jobInfo.compute( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetailsInfo.java index 5669e5d0f00d0..abea8264b2fbb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetailsInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ApplicationDetailsInfo.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.rest.messages.json.ApplicationIDDeserializer; import org.apache.flink.runtime.rest.messages.json.ApplicationIDSerializer; +import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.Preconditions; @@ -212,7 +213,8 @@ public static ApplicationDetailsInfo fromArchivedApplication( timestamps.put(status.toString(), archivedApplication.getStatusTimestamp(status)); } final Collection jobs = - archivedApplication.getJobs().stream() + archivedApplication.getJobs().values().stream() + .map(ExecutionGraphInfo::getArchivedExecutionGraph) .map(JobDetails::createDetailsForJob) .collect(Collectors.toList()); return new ApplicationDetailsInfo( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 4bd331407cc85..a0327245bd10f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -34,6 +34,7 @@ import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.RecoveryClaimMode; import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.application.AbstractApplication; import org.apache.flink.runtime.blob.BlobCacheService; import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.BlobServer; @@ -44,7 +45,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.dispatcher.DispatcherId; -import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore; +import org.apache.flink.runtime.dispatcher.MemoryArchivedApplicationStore; import org.apache.flink.runtime.dispatcher.TriggerSavepointMode; import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils; import org.apache.flink.runtime.entrypoint.ClusterInformation; @@ -561,7 +562,7 @@ private void setupDispatcherResourceManagerComponents( heartbeatServices, delegationTokenManager, metricRegistry, - new MemoryExecutionGraphInfoStore(), + new MemoryArchivedApplicationStore(), metricQueryServiceRetriever, Collections.emptySet(), fatalErrorHandler); @@ -1063,6 +1064,13 @@ public JobExecutionResult executeJobBlocking(JobGraph job) } } + public CompletableFuture submitApplication(AbstractApplication application) { + final CompletableFuture dispatcherGatewayFuture = + getDispatcherGatewayFuture(); + return dispatcherGatewayFuture.thenCompose( + dispatcherGateway -> dispatcherGateway.submitApplication(application, rpcTimeout)); + } + public CompletableFuture submitJob(ExecutionPlan executionPlan) { if (executionPlan instanceof StreamGraph) { try { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ArchivedApplicationStoreUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ArchivedApplicationStoreUtils.java new file mode 100644 index 0000000000000..433dc857b8b38 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ArchivedApplicationStoreUtils.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.dispatcher.ArchivedApplicationStore; + +import java.time.Duration; +import java.util.Optional; + +/** Utilities for configuring of {@link ArchivedApplicationStore}. */ +public final class ArchivedApplicationStoreUtils { + + public static Duration getExpirationTime(Configuration configuration) { + Optional optionalJobStoreExpirationTime = + configuration.getOptional(JobManagerOptions.JOB_STORE_EXPIRATION_TIME); + Optional optionalCompletedApplicationStoreExpirationTime = + configuration.getOptional( + JobManagerOptions.COMPLETED_APPLICATION_STORE_EXPIRATION_TIME); + if (optionalJobStoreExpirationTime.isPresent() + && optionalCompletedApplicationStoreExpirationTime.isEmpty()) { + return Duration.ofSeconds(optionalJobStoreExpirationTime.get()); + } + + return configuration.get(JobManagerOptions.COMPLETED_APPLICATION_STORE_EXPIRATION_TIME); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java index ef609e41c3ead..a68b5453a3c95 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/application/AbstractApplicationTest.java @@ -30,6 +30,9 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -212,6 +215,20 @@ void testTransitionFromFailed(ApplicationState targetState) { assertThrows(IllegalStateException.class, () -> application.transitionState(targetState)); } + @Test + void testApplicationStatusChange() { + AbstractApplication application = new MockApplication(new ApplicationID()); + MockApplicationStatusListener listener = new MockApplicationStatusListener(); + application.registerStatusListener(listener); + + application.transitionToRunning(); + application.transitionToFinished(); + + assertEquals( + Arrays.asList(ApplicationState.RUNNING, ApplicationState.FINISHED), + listener.getTargetStates()); + } + private static class MockApplication extends AbstractApplication { public MockApplication(ApplicationID applicationId) { super(applicationId); @@ -237,4 +254,18 @@ public String getName() { return "Mock Application"; } } + + private static class MockApplicationStatusListener implements ApplicationStatusListener { + private final List targetStates = new ArrayList<>(); + + @Override + public void notifyApplicationStatusChange( + ApplicationID applicationId, ApplicationState newStatus) { + targetStates.add(newStatus); + } + + public List getTargetStates() { + return targetStates; + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreITCase.java new file mode 100644 index 0000000000000..f2c1337d8947c --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreITCase.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.application.SingleJobApplication; +import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.testutils.executor.TestExecutorExtension; +import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter; + +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.io.File; +import java.util.Collection; +import java.util.concurrent.ScheduledExecutorService; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** Tests for the {@link ArchivedApplicationStore}. */ +class ArchivedApplicationStoreITCase { + + @RegisterExtension + static final TestExecutorExtension EXECUTOR_RESOURCE = + TestingUtils.defaultExecutorExtension(); + + @TempDir File temporaryFolder; + + @ParameterizedTest + @EnumSource(value = JobManagerOptions.ArchivedApplicationStoreType.class) + void testArchivedApplicationStore( + JobManagerOptions.ArchivedApplicationStoreType archivedApplicationStoreType) + throws Exception { + Configuration configuration = new Configuration(); + configuration.set( + JobManagerOptions.COMPLETED_APPLICATION_STORE_TYPE, archivedApplicationStoreType); + try (final MiniCluster miniCluster = + new ArchivedApplicationStoreTestUtils.PersistingMiniCluster( + new MiniClusterConfiguration.Builder() + .withRandomPorts() + .setConfiguration(configuration) + .build(), + temporaryFolder, + new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) { + miniCluster.start(); + + final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph(); + miniCluster.submitApplication(new SingleJobApplication(jobGraph)).get(); + + JobResult jobResult = miniCluster.requestJobResult(jobGraph.getJobID()).get(); + assertEquals(JobStatus.FINISHED, jobResult.getJobStatus().orElse(null)); + + Collection jobs = miniCluster.listJobs().get(); + assertEquals(1, jobs.size()); + assertEquals(jobGraph.getJobID(), jobs.iterator().next().getJobId()); + } + } + + /** Tests that a session cluster can terminate gracefully when jobs are still running. */ + @ParameterizedTest + @EnumSource(value = JobManagerOptions.ArchivedApplicationStoreType.class) + void testSuspendedJobOnClusterShutdown( + JobManagerOptions.ArchivedApplicationStoreType archivedApplicationStoreType) + throws Exception { + Configuration configuration = new Configuration(); + configuration.set( + JobManagerOptions.COMPLETED_APPLICATION_STORE_TYPE, archivedApplicationStoreType); + try (final MiniCluster miniCluster = + new ArchivedApplicationStoreTestUtils.PersistingMiniCluster( + new MiniClusterConfiguration.Builder() + .withRandomPorts() + .setConfiguration(configuration) + .build(), + temporaryFolder, + new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) { + miniCluster.start(); + final JobVertex vertex = new JobVertex("blockingVertex"); + // The adaptive scheduler expects that every vertex has a configured parallelism + vertex.setParallelism(1); + vertex.setInvokableClass( + ArchivedApplicationStoreTestUtils.SignallingBlockingNoOpInvokable.class); + final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex); + miniCluster.submitJob(jobGraph); + ArchivedApplicationStoreTestUtils.SignallingBlockingNoOpInvokable.LATCH.await(); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStoreTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreTestUtils.java similarity index 66% rename from flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStoreTestUtils.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreTestUtils.java index a143827f75004..61e027f343ae9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStoreTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ArchivedApplicationStoreTestUtils.java @@ -18,10 +18,14 @@ package org.apache.flink.runtime.dispatcher; +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.ApplicationState; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.application.ArchivedApplication; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; @@ -31,6 +35,7 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.minicluster.MiniCluster; @@ -59,18 +64,24 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; -/** Test utils class for {@link FileExecutionGraphInfoStore}. */ -public class ExecutionGraphInfoStoreTestUtils { +/** Test utils class for {@link ArchivedApplicationStore}. */ +public class ArchivedApplicationStoreTestUtils { static final List GLOBALLY_TERMINAL_JOB_STATUS = Arrays.stream(JobStatus.values()) .filter(JobStatus::isGloballyTerminalState) .collect(Collectors.toList()); + static final List TERMINAL_APPLICATION_STATUS = + Arrays.stream(ApplicationState.values()) + .filter(ApplicationState::isTerminalState) + .collect(Collectors.toList()); + /** * Generate a specified of ExecutionGraphInfo. * @@ -93,6 +104,38 @@ static Collection generateTerminalExecutionGraphInfos(int nu return executionGraphInfos; } + /** + * Generate a specified of ArchivedApplication. + * + * @param number the given number + * @return the result ArchivedApplication collection + */ + static Collection generateTerminalArchivedApplications(int number) { + final Collection archivedApplications = new ArrayList<>(number); + + for (int i = 0; i < number; i++) { + final ApplicationState state = + TERMINAL_APPLICATION_STATUS.get( + ThreadLocalRandom.current() + .nextInt(TERMINAL_APPLICATION_STATUS.size())); + final Map jobs = + generateTerminalExecutionGraphInfos(1).stream() + .collect( + Collectors.toMap( + ExecutionGraphInfo::getJobId, + executionGraphInfo -> executionGraphInfo)); + archivedApplications.add( + new ArchivedApplication( + ApplicationID.generate(), + "test-application-" + i, + state, + new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L}, + jobs)); + } + + return archivedApplications; + } + /** Compare whether two ExecutionGraphInfo instances are equivalent. */ static final class PartialExecutionGraphInfoMatcher extends BaseMatcher { @@ -142,18 +185,64 @@ public void describeTo(Description description) { } } + /** Compare whether two ArchivedApplication instances are equivalent. */ + static final class PartialArchivedApplicationMatcher extends BaseMatcher { + + private final ArchivedApplication expectedArchivedApplication; + + PartialArchivedApplicationMatcher(ArchivedApplication expectedArchivedApplication) { + this.expectedArchivedApplication = + Preconditions.checkNotNull(expectedArchivedApplication); + } + + @Override + public boolean matches(Object o) { + if (expectedArchivedApplication == o) { + return true; + } + if (o == null || expectedArchivedApplication.getClass() != o.getClass()) { + return false; + } + ArchivedApplication that = (ArchivedApplication) o; + + return Objects.equals( + expectedArchivedApplication.getApplicationId(), that.getApplicationId()) + && Objects.equals( + expectedArchivedApplication.getApplicationName(), + that.getApplicationName()) + && expectedArchivedApplication.getApplicationStatus() + == that.getApplicationStatus() + && Objects.equals( + expectedArchivedApplication.getJobs().size(), that.getJobs().size()); + } + + @Override + public void describeTo(Description description) { + description.appendText( + "Matches against " + ArchivedApplication.class.getSimpleName() + '.'); + } + } + static Collection generateJobDetails( - Collection executionGraphInfos) { - return executionGraphInfos.stream() + Collection archivedApplications) { + return archivedApplications.stream() + .flatMap(archivedApplication -> archivedApplication.getJobs().values().stream()) .map(ExecutionGraphInfo::getArchivedExecutionGraph) .map(JobDetails::createDetailsForJob) .collect(Collectors.toList()); } + static Collection generateApplicationDetails( + Collection archivedApplications) { + return archivedApplications.stream() + .map(ApplicationDetails::fromArchivedApplication) + .collect(Collectors.toList()); + } + /** * Invokable which signals with {@link - * ExecutionGraphInfoStoreTestUtils.SignallingBlockingNoOpInvokable#LATCH} when it is invoked - * and blocks forever afterwards. + * ArchivedApplicationStoreTestUtils.SignallingBlockingNoOpInvokable#LATCH} when it is invoked + * and blocks forever afterward. */ public static class SignallingBlockingNoOpInvokable extends AbstractInvokable { @@ -171,7 +260,7 @@ public void invoke() throws Exception { } } - /** MiniCluster with specified {@link ExecutionGraphInfoStore}. */ + /** MiniCluster with specified {@link ArchivedApplicationStore}. */ static class PersistingMiniCluster extends MiniCluster { @Nullable private final File rootDir; private final ScheduledExecutor scheduledExecutor; @@ -209,25 +298,26 @@ static class PersistingMiniCluster extends MiniCluster { .createSessionComponentFactory( StandaloneResourceManagerFactory.getInstance()); - JobManagerOptions.JobStoreType jobStoreType = - configuration.get(JobManagerOptions.JOB_STORE_TYPE); - final ExecutionGraphInfoStore executionGraphInfoStore; - switch (jobStoreType) { + JobManagerOptions.ArchivedApplicationStoreType archivedApplicationStoreType = + configuration.get(JobManagerOptions.COMPLETED_APPLICATION_STORE_TYPE); + final ArchivedApplicationStore archivedApplicationStore; + switch (archivedApplicationStoreType) { case File: { - executionGraphInfoStore = - createDefaultExecutionGraphInfoStore(rootDir, scheduledExecutor); + archivedApplicationStore = + createFileArchivedApplicationStore(rootDir, scheduledExecutor); break; } case Memory: { - executionGraphInfoStore = new MemoryExecutionGraphInfoStore(); + archivedApplicationStore = new MemoryArchivedApplicationStore(); break; } default: { throw new UnsupportedOperationException( - "Unsupported job store type " + jobStoreType); + "Unsupported archived application store type " + + archivedApplicationStoreType); } } @@ -242,16 +332,16 @@ static class PersistingMiniCluster extends MiniCluster { heartbeatServices, delegationTokenManager, metricRegistry, - executionGraphInfoStore, + archivedApplicationStore, metricQueryServiceRetriever, Collections.emptySet(), fatalErrorHandler)); } } - static FileExecutionGraphInfoStore createDefaultExecutionGraphInfoStore( + static FileArchivedApplicationStore createFileArchivedApplicationStore( File storageDirectory, ScheduledExecutor scheduledExecutor) throws IOException { - return new FileExecutionGraphInfoStore( + return new FileArchivedApplicationStore( storageDirectory, Duration.ofHours(1L), Integer.MAX_VALUE, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedApplicationStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedApplicationStoreTest.java new file mode 100644 index 0000000000000..d810a8ee77b4d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedApplicationStoreTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.application.ArchivedApplication; +import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; +import org.apache.flink.runtime.util.ManualTicker; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.testutils.executor.TestExecutorExtension; +import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter; + +import org.apache.flink.shaded.guava33.com.google.common.base.Ticker; +import org.apache.flink.shaded.guava33.com.google.common.cache.LoadingCache; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.runtime.dispatcher.ArchivedApplicationStoreTestUtils.createFileArchivedApplicationStore; +import static org.apache.flink.runtime.dispatcher.ArchivedApplicationStoreTestUtils.generateTerminalArchivedApplications; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +/** Tests for the {@link FileArchivedApplicationStore}. */ +class FileArchivedApplicationStoreTest extends MemoryArchivedApplicationStoreTest { + + @RegisterExtension + static final TestExecutorExtension EXECUTOR_RESOURCE = + TestingUtils.defaultExecutorExtension(); + + @TempDir File temporaryFolder; + + /** Tests that an expired application is removed from the store. */ + @Test + void testArchivedApplicationExpiration() throws Exception { + final ArchivedApplication archivedApplication = + generateTerminalArchivedApplications(1).iterator().next(); + final ExecutionGraphInfo executionGraphInfo = + archivedApplication.getJobs().values().iterator().next(); + final ApplicationID applicationId = archivedApplication.getApplicationId(); + final JobID jobId = executionGraphInfo.getJobId(); + + final Duration expirationTime = Duration.ofMillis(1L); + + final ManuallyTriggeredScheduledExecutor scheduledExecutor = + new ManuallyTriggeredScheduledExecutor(); + + final ManualTicker manualTicker = new ManualTicker(); + + try (final FileArchivedApplicationStore archivedApplicationStore = + new FileArchivedApplicationStore( + temporaryFolder, + expirationTime, + Integer.MAX_VALUE, + 10000L, + scheduledExecutor, + manualTicker)) { + + archivedApplicationStore.put(archivedApplication); + + // there should one application + assertEquals(1, archivedApplicationStore.size()); + + manualTicker.advanceTime(expirationTime.toMillis(), TimeUnit.MILLISECONDS); + + // this should trigger the cleanup after expiration + scheduledExecutor.triggerScheduledTasks(); + + // check that the store is empty + assertEquals(0, archivedApplicationStore.size()); + + assertNull(archivedApplicationStore.get(applicationId).orElse(null)); + + assertNull(archivedApplicationStore.getExecutionGraphInfo(jobId).orElse(null)); + + final File storageDirectory = archivedApplicationStore.getStorageDir(); + + // check that the persisted file has been deleted + assertEquals(0, storageDirectory.listFiles().length); + } + } + + /** Tests that all applications are cleaned up after closing the store. */ + @Test + void testCloseCleansUp() throws IOException { + assertEquals(0, temporaryFolder.listFiles().length); + + final ArchivedApplication archivedApplication = + generateTerminalArchivedApplications(1).iterator().next(); + + try (final FileArchivedApplicationStore archivedApplicationStore = + createFileArchivedApplicationStore( + temporaryFolder, + new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) { + + assertEquals(1, temporaryFolder.listFiles().length); + + final File storageDirectory = archivedApplicationStore.getStorageDir(); + + assertEquals(0, storageDirectory.listFiles().length); + + archivedApplicationStore.put(archivedApplication); + + assertEquals(1, storageDirectory.listFiles().length); + + archivedApplicationStore.close(); + + assertEquals(0, temporaryFolder.listFiles().length); + } + } + + /** Tests that evicted {@link ArchivedApplication} are loaded from disk again. */ + @Test + void testCacheLoading() throws IOException { + try (final FileArchivedApplicationStore archivedApplicationStore = + new FileArchivedApplicationStore( + temporaryFolder, + Duration.ofHours(1L), + Integer.MAX_VALUE, + 100L << 10, + new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()), + Ticker.systemTicker())) { + + final LoadingCache archivedApplicationCache = + archivedApplicationStore.getArchivedApplicationCache(); + + Collection archivedApplications = new ArrayList<>(64); + + boolean continueInserting = true; + + // insert applications until the first one got evicted + while (continueInserting) { + // has roughly a size of 3.1 KB + final ArchivedApplication archivedApplication = + generateTerminalArchivedApplications(1).iterator().next(); + + archivedApplicationStore.put(archivedApplication); + + archivedApplications.add(archivedApplication); + + continueInserting = archivedApplicationCache.size() == archivedApplications.size(); + } + + final File storageDirectory = archivedApplicationStore.getStorageDir(); + + assertEquals(archivedApplications.size(), storageDirectory.listFiles().length); + + for (ArchivedApplication archivedApplication : archivedApplications) { + final ExecutionGraphInfo executionGraphInfo = + archivedApplication.getJobs().values().iterator().next(); + final ApplicationID applicationId = archivedApplication.getApplicationId(); + final JobID jobId = executionGraphInfo.getJobId(); + + assertThat( + archivedApplicationStore.get(applicationId).orElse(null), + new ArchivedApplicationStoreTestUtils.PartialArchivedApplicationMatcher( + archivedApplication)); + + assertThat( + archivedApplicationStore.getExecutionGraphInfo(jobId).orElse(null), + new ArchivedApplicationStoreTestUtils.PartialExecutionGraphInfoMatcher( + executionGraphInfo)); + } + } + } + + @Override + ArchivedApplicationStore createDefaultArchivedApplicationStore() throws IOException { + return createFileArchivedApplicationStore( + temporaryFolder, + new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor())); + } + + @Override + ArchivedApplicationStore createArchivedApplicationStore(int maximumCapacity) + throws IOException { + return new FileArchivedApplicationStore( + temporaryFolder, + Duration.ofHours(1L), + maximumCapacity, + 10000L, + new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()), + Ticker.systemTicker()); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStoreTest.java deleted file mode 100644 index ed16309be77e9..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileExecutionGraphInfoStoreTest.java +++ /dev/null @@ -1,391 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.dispatcher; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.JobStatus; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; -import org.apache.flink.runtime.messages.webmonitor.JobsOverview; -import org.apache.flink.runtime.minicluster.MiniCluster; -import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; -import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; -import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; -import org.apache.flink.runtime.util.ManualTicker; -import org.apache.flink.testutils.TestingUtils; -import org.apache.flink.testutils.executor.TestExecutorResource; -import org.apache.flink.util.TestLogger; -import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; -import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter; - -import org.apache.flink.shaded.guava33.com.google.common.base.Ticker; -import org.apache.flink.shaded.guava33.com.google.common.cache.LoadingCache; - -import org.hamcrest.Matcher; -import org.hamcrest.Matchers; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.File; -import java.io.IOException; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import static org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStoreTestUtils.createDefaultExecutionGraphInfoStore; -import static org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStoreTestUtils.generateJobDetails; -import static org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStoreTestUtils.generateTerminalExecutionGraphInfos; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -/** Tests for the {@link FileExecutionGraphInfoStore}. */ -public class FileExecutionGraphInfoStoreTest extends TestLogger { - - @ClassRule - public static final TestExecutorResource EXECUTOR_RESOURCE = - TestingUtils.defaultExecutorResource(); - - @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); - - /** - * Tests that we can put {@link ExecutionGraphInfo} into the {@link FileExecutionGraphInfoStore} - * and that the graph is persisted. - */ - @Test - public void testPut() throws IOException { - assertPutJobGraphWithStatus(JobStatus.FINISHED); - } - - /** Tests that a SUSPENDED job can be persisted. */ - @Test - public void testPutSuspendedJob() throws IOException { - assertPutJobGraphWithStatus(JobStatus.SUSPENDED); - } - - /** Tests that null is returned if we request an unknown JobID. */ - @Test - public void testUnknownGet() throws IOException { - final File rootDir = temporaryFolder.newFolder(); - - try (final FileExecutionGraphInfoStore executionGraphStore = - createDefaultExecutionGraphInfoStore( - rootDir, - new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) { - assertThat(executionGraphStore.get(new JobID()), Matchers.nullValue()); - } - } - - /** Tests that we obtain the correct jobs overview. */ - @Test - public void testStoredJobsOverview() throws IOException { - final int numberExecutionGraphs = 10; - final Collection executionGraphInfos = - generateTerminalExecutionGraphInfos(numberExecutionGraphs); - - final List jobStatuses = - executionGraphInfos.stream() - .map(ExecutionGraphInfo::getArchivedExecutionGraph) - .map(ArchivedExecutionGraph::getState) - .collect(Collectors.toList()); - - final JobsOverview expectedJobsOverview = JobsOverview.create(jobStatuses); - - final File rootDir = temporaryFolder.newFolder(); - - try (final FileExecutionGraphInfoStore executionGraphInfoStore = - createDefaultExecutionGraphInfoStore( - rootDir, - new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) { - for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) { - executionGraphInfoStore.put(executionGraphInfo); - } - - assertThat( - executionGraphInfoStore.getStoredJobsOverview(), - Matchers.equalTo(expectedJobsOverview)); - } - } - - /** Tests that we obtain the correct collection of available job details. */ - @Test - public void testAvailableJobDetails() throws IOException { - final int numberExecutionGraphs = 10; - final Collection executionGraphInfos = - generateTerminalExecutionGraphInfos(numberExecutionGraphs); - - final Collection jobDetails = generateJobDetails(executionGraphInfos); - - final File rootDir = temporaryFolder.newFolder(); - - try (final FileExecutionGraphInfoStore executionGraphInfoStore = - createDefaultExecutionGraphInfoStore( - rootDir, - new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) { - for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) { - executionGraphInfoStore.put(executionGraphInfo); - } - - assertThat( - executionGraphInfoStore.getAvailableJobDetails(), - Matchers.containsInAnyOrder(jobDetails.toArray())); - } - } - - /** Tests that an expired execution graph is removed from the execution graph store. */ - @Test - public void testExecutionGraphExpiration() throws Exception { - final File rootDir = temporaryFolder.newFolder(); - - final Duration expirationTime = Duration.ofMillis(1L); - - final ManuallyTriggeredScheduledExecutor scheduledExecutor = - new ManuallyTriggeredScheduledExecutor(); - - final ManualTicker manualTicker = new ManualTicker(); - - try (final FileExecutionGraphInfoStore executionGraphInfoStore = - new FileExecutionGraphInfoStore( - rootDir, - expirationTime, - Integer.MAX_VALUE, - 10000L, - scheduledExecutor, - manualTicker)) { - - final ExecutionGraphInfo executionGraphInfo = - new ExecutionGraphInfo( - new ArchivedExecutionGraphBuilder() - .setState(JobStatus.FINISHED) - .build()); - - executionGraphInfoStore.put(executionGraphInfo); - - // there should one execution graph - assertThat(executionGraphInfoStore.size(), Matchers.equalTo(1)); - - manualTicker.advanceTime(expirationTime.toMillis(), TimeUnit.MILLISECONDS); - - // this should trigger the cleanup after expiration - scheduledExecutor.triggerScheduledTasks(); - - assertThat(executionGraphInfoStore.size(), Matchers.equalTo(0)); - - assertThat( - executionGraphInfoStore.get(executionGraphInfo.getJobId()), - Matchers.nullValue()); - - final File storageDirectory = executionGraphInfoStore.getStorageDir(); - - // check that the persisted file has been deleted - assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0)); - } - } - - /** Tests that all persisted files are cleaned up after closing the store. */ - @Test - public void testCloseCleansUp() throws IOException { - final File rootDir = temporaryFolder.newFolder(); - - assertThat(rootDir.listFiles().length, Matchers.equalTo(0)); - - try (final FileExecutionGraphInfoStore executionGraphInfoStore = - createDefaultExecutionGraphInfoStore( - rootDir, - new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) { - - assertThat(rootDir.listFiles().length, Matchers.equalTo(1)); - - final File storageDirectory = executionGraphInfoStore.getStorageDir(); - - assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0)); - - executionGraphInfoStore.put( - new ExecutionGraphInfo( - new ArchivedExecutionGraphBuilder() - .setState(JobStatus.FINISHED) - .build())); - - assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1)); - } - - assertThat(rootDir.listFiles().length, Matchers.equalTo(0)); - } - - /** Tests that evicted {@link ExecutionGraphInfo} are loaded from disk again. */ - @Test - public void testCacheLoading() throws IOException { - final File rootDir = temporaryFolder.newFolder(); - - try (final FileExecutionGraphInfoStore executionGraphInfoStore = - new FileExecutionGraphInfoStore( - rootDir, - Duration.ofHours(1L), - Integer.MAX_VALUE, - 100L << 10, - new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()), - Ticker.systemTicker())) { - - final LoadingCache executionGraphInfoCache = - executionGraphInfoStore.getExecutionGraphInfoCache(); - - Collection executionGraphInfos = new ArrayList<>(64); - - boolean continueInserting = true; - - // insert execution graphs until the first one got evicted - while (continueInserting) { - // has roughly a size of 1.4 KB - final ExecutionGraphInfo executionGraphInfo = - new ExecutionGraphInfo( - new ArchivedExecutionGraphBuilder() - .setState(JobStatus.FINISHED) - .build()); - - executionGraphInfoStore.put(executionGraphInfo); - - executionGraphInfos.add(executionGraphInfo); - - continueInserting = executionGraphInfoCache.size() == executionGraphInfos.size(); - } - - final File storageDirectory = executionGraphInfoStore.getStorageDir(); - - assertThat( - storageDirectory.listFiles().length, - Matchers.equalTo(executionGraphInfos.size())); - - for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) { - assertThat( - executionGraphInfoStore.get(executionGraphInfo.getJobId()), - matchesPartiallyWith(executionGraphInfo)); - } - } - } - - /** - * Tests that the size of {@link FileExecutionGraphInfoStore} is no more than the configured max - * capacity and the old execution graphs will be purged if the total added number exceeds the - * max capacity. - */ - @Test - public void testMaximumCapacity() throws IOException { - final File rootDir = temporaryFolder.newFolder(); - - final int maxCapacity = 10; - final int numberExecutionGraphs = 10; - - final Collection oldExecutionGraphInfos = - generateTerminalExecutionGraphInfos(numberExecutionGraphs); - final Collection newExecutionGraphInfos = - generateTerminalExecutionGraphInfos(numberExecutionGraphs); - - final Collection jobDetails = generateJobDetails(newExecutionGraphInfos); - - try (final FileExecutionGraphInfoStore executionGraphInfoStore = - new FileExecutionGraphInfoStore( - rootDir, - Duration.ofHours(1L), - maxCapacity, - 10000L, - new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()), - Ticker.systemTicker())) { - - for (ExecutionGraphInfo executionGraphInfo : oldExecutionGraphInfos) { - executionGraphInfoStore.put(executionGraphInfo); - // no more than the configured maximum capacity - assertTrue(executionGraphInfoStore.size() <= maxCapacity); - } - - for (ExecutionGraphInfo executionGraphInfo : newExecutionGraphInfos) { - executionGraphInfoStore.put(executionGraphInfo); - // equals to the configured maximum capacity - assertEquals(maxCapacity, executionGraphInfoStore.size()); - } - - // the older execution graphs are purged - assertThat( - executionGraphInfoStore.getAvailableJobDetails(), - Matchers.containsInAnyOrder(jobDetails.toArray())); - } - } - - /** Tests that a session cluster can terminate gracefully when jobs are still running. */ - @Test - public void testPutSuspendedJobOnClusterShutdown() throws Exception { - File rootDir = temporaryFolder.newFolder(); - try (final MiniCluster miniCluster = - new ExecutionGraphInfoStoreTestUtils.PersistingMiniCluster( - new MiniClusterConfiguration.Builder().withRandomPorts().build(), - rootDir, - new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) { - miniCluster.start(); - final JobVertex vertex = new JobVertex("blockingVertex"); - // The adaptive scheduler expects that every vertex has a configured parallelism - vertex.setParallelism(1); - vertex.setInvokableClass( - ExecutionGraphInfoStoreTestUtils.SignallingBlockingNoOpInvokable.class); - final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex); - miniCluster.submitJob(jobGraph); - ExecutionGraphInfoStoreTestUtils.SignallingBlockingNoOpInvokable.LATCH.await(); - } - } - - private void assertPutJobGraphWithStatus(JobStatus jobStatus) throws IOException { - final ExecutionGraphInfo dummyExecutionGraphInfo = - new ExecutionGraphInfo( - new ArchivedExecutionGraphBuilder().setState(jobStatus).build()); - final File rootDir = temporaryFolder.newFolder(); - - try (final FileExecutionGraphInfoStore executionGraphStore = - createDefaultExecutionGraphInfoStore( - rootDir, - new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) { - - final File storageDirectory = executionGraphStore.getStorageDir(); - - // check that the storage directory is empty - assertThat(storageDirectory.listFiles().length, Matchers.equalTo(0)); - - executionGraphStore.put(dummyExecutionGraphInfo); - - // check that we have persisted the given execution graph - assertThat(storageDirectory.listFiles().length, Matchers.equalTo(1)); - - assertThat( - executionGraphStore.get(dummyExecutionGraphInfo.getJobId()), - new ExecutionGraphInfoStoreTestUtils.PartialExecutionGraphInfoMatcher( - dummyExecutionGraphInfo)); - } - } - - private static Matcher matchesPartiallyWith( - ExecutionGraphInfo executionGraphInfo) { - return new ExecutionGraphInfoStoreTestUtils.PartialExecutionGraphInfoMatcher( - executionGraphInfo); - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedApplicationStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedApplicationStoreTest.java new file mode 100644 index 0000000000000..a79b2776e95c9 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryArchivedApplicationStoreTest.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.application.ArchivedApplication; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobsOverview; +import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; +import org.apache.flink.runtime.util.ManualTicker; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.testutils.executor.TestExecutorExtension; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter; + +import org.apache.flink.shaded.guava33.com.google.common.base.Ticker; + +import org.hamcrest.Matchers; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.flink.runtime.dispatcher.ArchivedApplicationStoreTestUtils.generateApplicationDetails; +import static org.apache.flink.runtime.dispatcher.ArchivedApplicationStoreTestUtils.generateJobDetails; +import static org.apache.flink.runtime.dispatcher.ArchivedApplicationStoreTestUtils.generateTerminalArchivedApplications; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Tests for the {@link MemoryArchivedApplicationStore}. */ +class MemoryArchivedApplicationStoreTest extends TestLogger { + + @RegisterExtension + static final TestExecutorExtension EXECUTOR_RESOURCE = + TestingUtils.defaultExecutorExtension(); + + /** + * Tests that we can put {@link ArchivedApplication} into the {@link ArchivedApplicationStore} + * and that the application is persisted. + */ + @Test + void testPut() throws IOException { + final ArchivedApplication expectedArchivedApplication = + generateTerminalArchivedApplications(1).iterator().next(); + final ExecutionGraphInfo expectedExecutionGraphInfo = + expectedArchivedApplication.getJobs().values().iterator().next(); + final ApplicationID applicationId = expectedArchivedApplication.getApplicationId(); + final JobID jobId = expectedExecutionGraphInfo.getJobId(); + + try (final ArchivedApplicationStore archivedApplicationStore = + createDefaultArchivedApplicationStore()) { + + // check that the store is empty + assertEquals(0, archivedApplicationStore.size()); + + archivedApplicationStore.put(expectedArchivedApplication); + + // check that we have persisted the given application + assertEquals(1, archivedApplicationStore.size()); + + assertThat( + archivedApplicationStore.get(applicationId).orElse(null), + new ArchivedApplicationStoreTestUtils.PartialArchivedApplicationMatcher( + expectedArchivedApplication)); + + assertThat( + archivedApplicationStore.getExecutionGraphInfo(jobId).orElse(null), + new ArchivedApplicationStoreTestUtils.PartialExecutionGraphInfoMatcher( + expectedExecutionGraphInfo)); + } + } + + /** Tests that empty is returned if we request an unknown ApplicationID. */ + @Test + void testUnknownGet() throws IOException { + try (final ArchivedApplicationStore archivedApplicationStore = + createDefaultArchivedApplicationStore()) { + assertNull(archivedApplicationStore.get(new ApplicationID()).orElse(null)); + } + } + + /** Tests that we obtain the correct jobs overview. */ + @Test + void testJobsOverview() throws IOException { + final int numberArchivedApplications = 10; + final Collection archivedApplications = + generateTerminalArchivedApplications(numberArchivedApplications); + + final List jobStatuses = + archivedApplications.stream() + .flatMap( + archivedApplication -> + archivedApplication.getJobs().values().stream()) + .map(ExecutionGraphInfo::getArchivedExecutionGraph) + .map(ArchivedExecutionGraph::getState) + .collect(Collectors.toList()); + + final JobsOverview expectedJobsOverview = JobsOverview.create(jobStatuses); + + try (final ArchivedApplicationStore archivedApplicationStore = + createDefaultArchivedApplicationStore()) { + for (ArchivedApplication executionGraphInfo : archivedApplications) { + archivedApplicationStore.put(executionGraphInfo); + } + + assertEquals(expectedJobsOverview, archivedApplicationStore.getJobsOverview()); + } + } + + /** Tests that we obtain the correct collection of available jobs. */ + @Test + void testGetJobDetails() throws IOException { + final int numberArchivedApplications = 10; + final Collection archivedApplications = + generateTerminalArchivedApplications(numberArchivedApplications); + + final Collection jobDetails = generateJobDetails(archivedApplications); + + try (final ArchivedApplicationStore archivedApplicationStore = + createDefaultArchivedApplicationStore()) { + for (ArchivedApplication archivedApplication : archivedApplications) { + archivedApplicationStore.put(archivedApplication); + } + + assertThat( + archivedApplicationStore.getJobDetails(), + Matchers.containsInAnyOrder(jobDetails.toArray())); + } + } + + /** Tests that we obtain the correct collection of available application details. */ + @Test + void testGetApplicationDetails() throws IOException { + final int numberArchivedApplications = 10; + final Collection archivedApplications = + generateTerminalArchivedApplications(numberArchivedApplications); + + final Collection applicationDetails = + generateApplicationDetails(archivedApplications); + + try (final ArchivedApplicationStore archivedApplicationStore = + createDefaultArchivedApplicationStore()) { + for (ArchivedApplication archivedApplication : archivedApplications) { + archivedApplicationStore.put(archivedApplication); + } + + assertThat( + archivedApplicationStore.getApplicationDetails(), + Matchers.containsInAnyOrder(applicationDetails.toArray())); + } + } + + /** Tests that an expired application is removed from the store. */ + @Test + void testArchivedApplicationExpiration() throws Exception { + final ArchivedApplication archivedApplication = + generateTerminalArchivedApplications(1).iterator().next(); + final ExecutionGraphInfo executionGraphInfo = + archivedApplication.getJobs().values().iterator().next(); + final ApplicationID applicationId = archivedApplication.getApplicationId(); + final JobID jobId = executionGraphInfo.getJobId(); + + final Duration expirationTime = Duration.ofMillis(1L); + + final ManuallyTriggeredScheduledExecutor scheduledExecutor = + new ManuallyTriggeredScheduledExecutor(); + + final ManualTicker manualTicker = new ManualTicker(); + + try (final MemoryArchivedApplicationStore archivedApplicationStore = + new MemoryArchivedApplicationStore( + expirationTime, Integer.MAX_VALUE, scheduledExecutor, manualTicker)) { + + archivedApplicationStore.put(archivedApplication); + + // there should one application + assertEquals(1, archivedApplicationStore.size()); + + manualTicker.advanceTime(expirationTime.toMillis(), TimeUnit.MILLISECONDS); + + // this should trigger the cleanup after expiration + scheduledExecutor.triggerScheduledTasks(); + + // check that the store is empty + assertEquals(0, archivedApplicationStore.size()); + + assertNull(archivedApplicationStore.get(applicationId).orElse(null)); + + assertNull(archivedApplicationStore.getExecutionGraphInfo(jobId).orElse(null)); + } + } + + /** Tests that all applications are cleaned up after closing the store. */ + @Test + void testCloseCleansUp() throws IOException { + final ArchivedApplication archivedApplication = + generateTerminalArchivedApplications(1).iterator().next(); + + try (final ArchivedApplicationStore archivedApplicationStore = + createDefaultArchivedApplicationStore()) { + + assertEquals(0, archivedApplicationStore.size()); + + archivedApplicationStore.put(archivedApplication); + + assertEquals(1, archivedApplicationStore.size()); + + archivedApplicationStore.close(); + assertEquals(0, archivedApplicationStore.size()); + } + } + + /** + * Tests that the size of {@link ArchivedApplicationStore} is no more than the configured max + * capacity and the old applications will be purged if the total added number exceeds the max + * capacity. + */ + @Test + void testMaximumCapacity() throws IOException { + final int maxCapacity = 10; + final int numberArchivedApplications = 10; + + final Collection oldArchivedApplications = + generateTerminalArchivedApplications(numberArchivedApplications); + final Collection newArchivedApplications = + generateTerminalArchivedApplications(numberArchivedApplications); + + final Collection applicationDetails = + generateApplicationDetails(newArchivedApplications); + + try (final ArchivedApplicationStore archivedApplicationStore = + createArchivedApplicationStore(maxCapacity)) { + + for (ArchivedApplication archivedApplication : oldArchivedApplications) { + archivedApplicationStore.put(archivedApplication); + // no more than the configured maximum capacity + assertTrue(archivedApplicationStore.size() <= maxCapacity); + } + + for (ArchivedApplication archivedApplication : newArchivedApplications) { + archivedApplicationStore.put(archivedApplication); + // equals to the configured maximum capacity + assertEquals(maxCapacity, archivedApplicationStore.size()); + } + + // the older applications are purged + assertThat( + archivedApplicationStore.getApplicationDetails(), + Matchers.containsInAnyOrder(applicationDetails.toArray())); + } + } + + ArchivedApplicationStore createDefaultArchivedApplicationStore() throws IOException { + return new MemoryArchivedApplicationStore(); + } + + ArchivedApplicationStore createArchivedApplicationStore(int maximumCapacity) + throws IOException { + return new MemoryArchivedApplicationStore( + Duration.ofHours(1), + maximumCapacity, + new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()), + Ticker.systemTicker()); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreITCase.java deleted file mode 100644 index 668d8d6781512..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreITCase.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.dispatcher; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.minicluster.MiniCluster; -import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; -import org.apache.flink.testutils.TestingUtils; -import org.apache.flink.testutils.executor.TestExecutorResource; -import org.apache.flink.util.TestLogger; -import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter; - -import org.junit.ClassRule; -import org.junit.Test; - -import java.util.concurrent.ScheduledExecutorService; - -/** Tests for the {@link MemoryExecutionGraphInfoStore}. */ -public class MemoryExecutionGraphInfoStoreITCase extends TestLogger { - - @ClassRule - public static final TestExecutorResource EXECUTOR_RESOURCE = - TestingUtils.defaultExecutorResource(); - - /** Tests that a session cluster can terminate gracefully when jobs are still running. */ - @Test - public void testPutSuspendedJobOnClusterShutdown() throws Exception { - Configuration configuration = new Configuration(); - configuration.set(JobManagerOptions.JOB_STORE_TYPE, JobManagerOptions.JobStoreType.Memory); - try (final MiniCluster miniCluster = - new ExecutionGraphInfoStoreTestUtils.PersistingMiniCluster( - new MiniClusterConfiguration.Builder() - .withRandomPorts() - .setConfiguration(configuration) - .build(), - new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) { - miniCluster.start(); - final JobVertex vertex = new JobVertex("blockingVertex"); - // The adaptive scheduler expects that every vertex has a configured parallelism - vertex.setParallelism(1); - vertex.setInvokableClass( - ExecutionGraphInfoStoreTestUtils.SignallingBlockingNoOpInvokable.class); - final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex); - miniCluster.submitJob(jobGraph); - ExecutionGraphInfoStoreTestUtils.SignallingBlockingNoOpInvokable.LATCH.await(); - } - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreTest.java deleted file mode 100644 index cc6cd97734489..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreTest.java +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.dispatcher; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.JobStatus; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; -import org.apache.flink.runtime.messages.webmonitor.JobsOverview; -import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; -import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; -import org.apache.flink.runtime.util.ManualTicker; -import org.apache.flink.testutils.TestingUtils; -import org.apache.flink.testutils.executor.TestExecutorResource; -import org.apache.flink.util.TestLogger; -import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; -import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter; - -import org.apache.flink.shaded.guava33.com.google.common.base.Ticker; - -import org.hamcrest.Matchers; -import org.junit.ClassRule; -import org.junit.Test; - -import java.io.IOException; -import java.time.Duration; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import static org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStoreTestUtils.generateJobDetails; -import static org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStoreTestUtils.generateTerminalExecutionGraphInfos; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -/** Tests for the {@link MemoryExecutionGraphInfoStore}. */ -public class MemoryExecutionGraphInfoStoreTest extends TestLogger { - - @ClassRule - public static final TestExecutorResource EXECUTOR_RESOURCE = - TestingUtils.defaultExecutorResource(); - - /** - * Tests that we can put {@link ExecutionGraphInfo} into the {@link - * MemoryExecutionGraphInfoStore} and that the graph is persisted. - */ - @Test - public void testPut() throws IOException { - assertPutJobGraphWithStatus(JobStatus.FINISHED); - } - - /** Tests that a SUSPENDED job can be persisted. */ - @Test - public void testPutSuspendedJob() throws IOException { - assertPutJobGraphWithStatus(JobStatus.SUSPENDED); - } - - /** Tests that null is returned if we request an unknown JobID. */ - @Test - public void testUnknownGet() throws IOException { - - try (final MemoryExecutionGraphInfoStore executionGraphStore = - createMemoryExecutionGraphInfoStore()) { - assertThat(executionGraphStore.get(new JobID()), Matchers.nullValue()); - } - } - - /** Tests that we obtain the correct jobs overview. */ - @Test - public void testStoredJobsOverview() throws IOException { - final int numberExecutionGraphs = 10; - final Collection executionGraphInfos = - generateTerminalExecutionGraphInfos(numberExecutionGraphs); - - final List jobStatuses = - executionGraphInfos.stream() - .map(ExecutionGraphInfo::getArchivedExecutionGraph) - .map(ArchivedExecutionGraph::getState) - .collect(Collectors.toList()); - - final JobsOverview expectedJobsOverview = JobsOverview.create(jobStatuses); - - try (final MemoryExecutionGraphInfoStore executionGraphInfoStore = - createMemoryExecutionGraphInfoStore()) { - for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) { - executionGraphInfoStore.put(executionGraphInfo); - } - - assertThat( - executionGraphInfoStore.getStoredJobsOverview(), - Matchers.equalTo(expectedJobsOverview)); - } - } - - /** Tests that we obtain the correct collection of available job details. */ - @Test - public void testAvailableJobDetails() throws IOException { - final int numberExecutionGraphs = 10; - final Collection executionGraphInfos = - generateTerminalExecutionGraphInfos(numberExecutionGraphs); - - final Collection jobDetails = generateJobDetails(executionGraphInfos); - - try (final MemoryExecutionGraphInfoStore executionGraphInfoStore = - createMemoryExecutionGraphInfoStore()) { - for (ExecutionGraphInfo executionGraphInfo : executionGraphInfos) { - executionGraphInfoStore.put(executionGraphInfo); - } - - assertThat( - executionGraphInfoStore.getAvailableJobDetails(), - Matchers.containsInAnyOrder(jobDetails.toArray())); - } - } - - /** Tests that an expired execution graph is removed from the execution graph store. */ - @Test - public void testExecutionGraphExpiration() throws Exception { - final Duration expirationTime = Duration.ofMillis(1L); - - final ManuallyTriggeredScheduledExecutor scheduledExecutor = - new ManuallyTriggeredScheduledExecutor(); - - final ManualTicker manualTicker = new ManualTicker(); - - try (final MemoryExecutionGraphInfoStore executionGraphInfoStore = - new MemoryExecutionGraphInfoStore( - expirationTime, Integer.MAX_VALUE, scheduledExecutor, manualTicker)) { - - final ExecutionGraphInfo executionGraphInfo = - new ExecutionGraphInfo( - new ArchivedExecutionGraphBuilder() - .setState(JobStatus.FINISHED) - .build()); - - executionGraphInfoStore.put(executionGraphInfo); - - // there should one execution graph - assertThat(executionGraphInfoStore.size(), Matchers.equalTo(1)); - - manualTicker.advanceTime(expirationTime.toMillis(), TimeUnit.MILLISECONDS); - - // this should trigger the cleanup after expiration - scheduledExecutor.triggerScheduledTasks(); - - assertThat(executionGraphInfoStore.size(), Matchers.equalTo(0)); - - assertThat( - executionGraphInfoStore.get(executionGraphInfo.getJobId()), - Matchers.nullValue()); - - // check that the store is empty - assertThat(executionGraphInfoStore.size(), Matchers.equalTo(0)); - } - } - - /** Tests that all job graphs are cleaned up after closing the store. */ - @Test - public void testCloseCleansUp() throws IOException { - try (final MemoryExecutionGraphInfoStore executionGraphInfoStore = - createMemoryExecutionGraphInfoStore()) { - - assertThat(executionGraphInfoStore.size(), Matchers.equalTo(0)); - - executionGraphInfoStore.put( - new ExecutionGraphInfo( - new ArchivedExecutionGraphBuilder() - .setState(JobStatus.FINISHED) - .build())); - - assertThat(executionGraphInfoStore.size(), Matchers.equalTo(1)); - - executionGraphInfoStore.close(); - assertThat(executionGraphInfoStore.size(), Matchers.equalTo(0)); - } - } - - /** - * Tests that the size of {@link MemoryExecutionGraphInfoStore} is no more than the configured - * max capacity and the old execution graphs will be purged if the total added number exceeds - * the max capacity. - */ - @Test - public void testMaximumCapacity() throws IOException { - final int maxCapacity = 10; - final int numberExecutionGraphs = 10; - - final Collection oldExecutionGraphInfos = - generateTerminalExecutionGraphInfos(numberExecutionGraphs); - final Collection newExecutionGraphInfos = - generateTerminalExecutionGraphInfos(numberExecutionGraphs); - - final Collection jobDetails = generateJobDetails(newExecutionGraphInfos); - - try (final MemoryExecutionGraphInfoStore executionGraphInfoStore = - createMemoryExecutionGraphInfoStore(Duration.ofHours(1L), maxCapacity)) { - - for (ExecutionGraphInfo executionGraphInfo : oldExecutionGraphInfos) { - executionGraphInfoStore.put(executionGraphInfo); - // no more than the configured maximum capacity - assertTrue(executionGraphInfoStore.size() <= maxCapacity); - } - - for (ExecutionGraphInfo executionGraphInfo : newExecutionGraphInfos) { - executionGraphInfoStore.put(executionGraphInfo); - // equals to the configured maximum capacity - assertEquals(maxCapacity, executionGraphInfoStore.size()); - } - - // the older execution graphs are purged - assertThat( - executionGraphInfoStore.getAvailableJobDetails(), - Matchers.containsInAnyOrder(jobDetails.toArray())); - } - } - - private void assertPutJobGraphWithStatus(JobStatus jobStatus) throws IOException { - final ExecutionGraphInfo dummyExecutionGraphInfo = - new ExecutionGraphInfo( - new ArchivedExecutionGraphBuilder().setState(jobStatus).build()); - try (final MemoryExecutionGraphInfoStore executionGraphStore = - createMemoryExecutionGraphInfoStore()) { - - // check that the graph store is empty - assertThat(executionGraphStore.size(), Matchers.equalTo(0)); - - executionGraphStore.put(dummyExecutionGraphInfo); - - // check that we have persisted the given execution graph - assertThat(executionGraphStore.size(), Matchers.equalTo(1)); - - assertThat( - executionGraphStore.get(dummyExecutionGraphInfo.getJobId()), - new ExecutionGraphInfoStoreTestUtils.PartialExecutionGraphInfoMatcher( - dummyExecutionGraphInfo)); - } - } - - private MemoryExecutionGraphInfoStore createMemoryExecutionGraphInfoStore() { - return new MemoryExecutionGraphInfoStore(); - } - - private MemoryExecutionGraphInfoStore createMemoryExecutionGraphInfoStore( - Duration expirationTime, int maximumCapacity) { - return new MemoryExecutionGraphInfoStore( - expirationTime, - maximumCapacity, - new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()), - Ticker.systemTicker()); - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java index 81241e1ed2ca5..36efcf9de9531 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java @@ -82,7 +82,7 @@ private TestingDispatcher( @Nullable String metricServiceQueryAddress, Executor ioExecutor, HistoryServerArchivist historyServerArchivist, - ExecutionGraphInfoStore executionGraphInfoStore, + ArchivedApplicationStore archivedApplicationStore, JobManagerRunnerFactory jobManagerRunnerFactory, CleanupRunnerFactory cleanupRunnerFactory, DispatcherBootstrapFactory dispatcherBootstrapFactory, @@ -102,7 +102,7 @@ private TestingDispatcher( resourceManagerGatewayRetriever, blobServer, heartbeatServices, - executionGraphInfoStore, + archivedApplicationStore, fatalErrorHandler, historyServerArchivist, metricServiceQueryAddress, @@ -192,8 +192,8 @@ public static class Builder { @Nullable private String metricServiceQueryAddress = null; private Executor ioExecutor = ForkJoinPool.commonPool(); private HistoryServerArchivist historyServerArchivist = VoidHistoryServerArchivist.INSTANCE; - private ExecutionGraphInfoStore executionGraphInfoStore = - new MemoryExecutionGraphInfoStore(); + private ArchivedApplicationStore archivedApplicationStore = + new MemoryArchivedApplicationStore(); private JobManagerRunnerFactory jobManagerRunnerFactory = new TestingJobMasterServiceLeadershipRunnerFactory(); private CleanupRunnerFactory cleanupRunnerFactory = new TestingCleanupRunnerFactory(); @@ -288,8 +288,9 @@ public Builder setHistoryServerArchivist(HistoryServerArchivist historyServerArc return this; } - public Builder setExecutionGraphInfoStore(ExecutionGraphInfoStore executionGraphInfoStore) { - this.executionGraphInfoStore = executionGraphInfoStore; + public Builder setArchivedApplicationStore( + ArchivedApplicationStore archivedApplicationStore) { + this.archivedApplicationStore = archivedApplicationStore; return this; } @@ -359,7 +360,7 @@ public TestingDispatcher build(RpcService rpcService) throws Exception { metricServiceQueryAddress, ioExecutor, historyServerArchivist, - executionGraphInfoStore, + archivedApplicationStore, jobManagerRunnerFactory, cleanupRunnerFactory, dispatcherBootstrapFactory, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingPartialDispatcherServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingPartialDispatcherServices.java index c8004311eb118..fa11efafe3ce7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingPartialDispatcherServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingPartialDispatcherServices.java @@ -55,7 +55,7 @@ public TestingPartialDispatcherServices( BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroupFactory jobManagerMetricGroupFactory, - ExecutionGraphInfoStore executionGraphInfoStore, + ArchivedApplicationStore archivedApplicationStore, FatalErrorHandler fatalErrorHandler, HistoryServerArchivist historyServerArchivist, @Nullable String metricQueryServiceAddress, @@ -69,7 +69,7 @@ public TestingPartialDispatcherServices( blobServer, heartbeatServices, jobManagerMetricGroupFactory, - executionGraphInfoStore, + archivedApplicationStore, fatalErrorHandler, historyServerArchivist, metricQueryServiceAddress, @@ -93,8 +93,8 @@ public static class Builder { private HeartbeatServices heartbeatServices = new TestingHeartbeatServices(); private JobManagerMetricGroupFactory jobManagerMetricGroupFactory = UnregisteredMetricGroups::createUnregisteredJobManagerMetricGroup; - private ExecutionGraphInfoStore executionGraphInfoStore = - new MemoryExecutionGraphInfoStore(); + private ArchivedApplicationStore archivedApplicationStore = + new MemoryArchivedApplicationStore(); private FatalErrorHandler fatalErrorHandler = NoOpFatalErrorHandler.INSTANCE; private HistoryServerArchivist historyServerArchivist = VoidHistoryServerArchivist.INSTANCE; @Nullable private String metricQueryServiceAddress = null; @@ -131,9 +131,9 @@ public Builder withJobManagerMetricGroupFactory( return this; } - public Builder withExecutionGraphInfoStore( - ExecutionGraphInfoStore executionGraphInfoStore) { - this.executionGraphInfoStore = executionGraphInfoStore; + public Builder withArchivedApplicationStore( + ArchivedApplicationStore archivedApplicationStore) { + this.archivedApplicationStore = archivedApplicationStore; return this; } @@ -178,7 +178,7 @@ public TestingPartialDispatcherServices build( blobServer, heartbeatServices, jobManagerMetricGroupFactory, - executionGraphInfoStore, + archivedApplicationStore, fatalErrorHandler, historyServerArchivist, metricQueryServiceAddress, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java index 7271011a49aad..0e4f14d168991 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java @@ -28,7 +28,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.dispatcher.DispatcherId; import org.apache.flink.runtime.dispatcher.DispatcherOperationCaches; -import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore; +import org.apache.flink.runtime.dispatcher.MemoryArchivedApplicationStore; import org.apache.flink.runtime.dispatcher.PartialDispatcherServices; import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory; import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist; @@ -174,7 +174,7 @@ void testResourceCleanupUnderLeadershipChange() throws Exception { blobServer, new TestingHeartbeatServices(), UnregisteredMetricGroups::createUnregisteredJobManagerMetricGroup, - new MemoryExecutionGraphInfoStore(), + new MemoryArchivedApplicationStore(), fatalErrorHandler, VoidHistoryServerArchivist.INSTANCE, null, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java index 86796fdeba595..80bff9e750069 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java @@ -25,8 +25,8 @@ import org.apache.flink.configuration.SchedulerExecutionMode; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore; -import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore; +import org.apache.flink.runtime.dispatcher.ArchivedApplicationStore; +import org.apache.flink.runtime.dispatcher.MemoryArchivedApplicationStore; import org.apache.flink.runtime.dispatcher.PartialDispatcherServices; import org.apache.flink.runtime.dispatcher.runner.DispatcherRunner; import org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerFactory; @@ -389,9 +389,9 @@ private TestingEntryPoint( } @Override - protected ExecutionGraphInfoStore createSerializableExecutionGraphStore( + protected ArchivedApplicationStore createArchivedApplicationStore( Configuration configuration, ScheduledExecutor scheduledExecutor) { - return new MemoryExecutionGraphInfoStore(); + return new MemoryArchivedApplicationStore(); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterUncaughtExceptionHandlerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterUncaughtExceptionHandlerITCase.java index ae5178310b242..d6d5887d41770 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterUncaughtExceptionHandlerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterUncaughtExceptionHandlerITCase.java @@ -20,7 +20,7 @@ import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore; +import org.apache.flink.runtime.dispatcher.ArchivedApplicationStore; import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; @@ -97,7 +97,7 @@ protected ClusterTestingEntrypoint(Configuration configuration) { } @Override - protected ExecutionGraphInfoStore createSerializableExecutionGraphStore( + protected ArchivedApplicationStore createArchivedApplicationStore( Configuration configuration, ScheduledExecutor scheduledExecutor) throws IOException { return null; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java index fe02100f73d06..f09807690f86e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java @@ -22,7 +22,7 @@ import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.dispatcher.DispatcherGateway; -import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore; +import org.apache.flink.runtime.dispatcher.MemoryArchivedApplicationStore; import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent; import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.heartbeat.HeartbeatServices; @@ -175,7 +175,7 @@ protected HighAvailabilityServices createHighAvailabilityServices( heartbeatServices, delegationTokenManager, metricRegistry, - new MemoryExecutionGraphInfoStore(), + new MemoryArchivedApplicationStore(), metricQueryServiceRetriever, Collections.emptySet(), fatalErrorHandler); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationDetailsHandlerTest.java index 5f99ff4b04425..055cc412c026c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationDetailsHandlerTest.java @@ -80,7 +80,7 @@ void setUp() throws HandlerRequestException { "Test Application", ApplicationState.FINISHED, new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L}, - Collections.emptyList()); + Collections.emptyMap()); handlerRequest = createRequest(archivedApplication.getApplicationId()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationsOverviewHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationsOverviewHandlerTest.java index de8e41dede7e6..be699f8842513 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationsOverviewHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/application/ApplicationsOverviewHandlerTest.java @@ -77,7 +77,7 @@ void setUp() throws HandlerRequestException { "Test Application", ApplicationState.FINISHED, new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L}, - Collections.emptyList()); + Collections.emptyMap()); testingRestfulGateway = new TestingRestfulGateway.Builder() diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingClusterEntrypointProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingClusterEntrypointProcess.java index eadbfd2cee52d..5d11e5772c228 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingClusterEntrypointProcess.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingClusterEntrypointProcess.java @@ -21,8 +21,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; -import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore; -import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore; +import org.apache.flink.runtime.dispatcher.ArchivedApplicationStore; +import org.apache.flink.runtime.dispatcher.MemoryArchivedApplicationStore; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; @@ -117,10 +117,10 @@ protected TestingClusterEntrypoint(Configuration configuration, File markerFile) } @Override - protected ExecutionGraphInfoStore createSerializableExecutionGraphStore( + protected ArchivedApplicationStore createArchivedApplicationStore( Configuration configuration, ScheduledExecutor scheduledExecutor) throws IOException { - return new MemoryExecutionGraphInfoStore(); + return new MemoryArchivedApplicationStore(); } @Override diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index fb109741f9454..1cf9590f14e5a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -32,7 +32,7 @@ import org.apache.flink.core.testutils.EachCallbackWrapper; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore; +import org.apache.flink.runtime.dispatcher.MemoryArchivedApplicationStore; import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent; import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; @@ -159,7 +159,7 @@ void testCancelingOnProcessFailure() throws Throwable { new HeartbeatServicesImpl(100L, 10000L, 2), new NoOpDelegationTokenManager(), NoOpMetricRegistry.INSTANCE, - new MemoryExecutionGraphInfoStore(), + new MemoryArchivedApplicationStore(), VoidMetricQueryServiceRetriever.INSTANCE, Collections.emptySet(), fatalErrorHandler);