Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 24 additions & 24 deletions docs/layouts/shortcodes/generated/all_jobmanager_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,30 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>completed-application-store.cache-size</h5></td>
<td style="word-wrap: break-word;">52428800</td>
<td>Long</td>
<td>The cache size in bytes which is used to keep completed applications in memory.</td>
</tr>
<tr>
<td><h5>completed-application-store.expiration-time</h5></td>
<td style="word-wrap: break-word;">1 h</td>
<td>Duration</td>
<td>The time after which a completed application expires and is purged from the store.</td>
</tr>
<tr>
<td><h5>completed-application-store.max-capacity</h5></td>
<td style="word-wrap: break-word;">infinite</td>
<td>Integer</td>
<td>The max number of completed applications that can be kept in the store. NOTICE: if memory store keeps too many applications in session cluster, it may cause FullGC or OOM in jm.</td>
</tr>
<tr>
<td><h5>completed-application-store.type</h5></td>
<td style="word-wrap: break-word;">File</td>
<td><p>Enum</p></td>
<td>Determines which store implementation is used in session cluster. Accepted values are:<ul><li>'File': the file store keeps the completed applications in files</li><li>'Memory': the memory store keeps the completed applications in memory. You may need to limit the <code class="highlighter-rouge">completed-application-store.max-capacity</code> to mitigate FullGC or OOM when there are too many applications</li></ul><br /><br />Possible values:<ul><li>"File"</li><li>"Memory"</li></ul></td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.executing.cooldown-after-rescaling</h5></td>
<td style="word-wrap: break-word;">30 s</td>
Expand Down Expand Up @@ -134,30 +158,6 @@
<td><p>Enum</p></td>
<td>Determines which scheduler implementation is used to schedule tasks. If this option is not explicitly set, batch jobs will use the 'AdaptiveBatch' scheduler as the default, while streaming jobs will default to the 'Default' scheduler. <br /><br />Possible values:<ul><li>"Default": Default scheduler</li><li>"Adaptive": Adaptive scheduler. More details can be found <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/deployment/elastic_scaling#adaptive-scheduler">here</a>.</li><li>"AdaptiveBatch": Adaptive batch scheduler. More details can be found <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/deployment/elastic_scaling#adaptive-batch-scheduler">here</a>.</li></ul></td>
</tr>
<tr>
<td><h5>jobstore.cache-size</h5></td>
<td style="word-wrap: break-word;">52428800</td>
<td>Long</td>
<td>The job store cache size in bytes which is used to keep completed jobs in memory.</td>
</tr>
<tr>
<td><h5>jobstore.expiration-time</h5></td>
<td style="word-wrap: break-word;">3600</td>
<td>Long</td>
<td>The time in seconds after which a completed job expires and is purged from the job store.</td>
</tr>
<tr>
<td><h5>jobstore.max-capacity</h5></td>
<td style="word-wrap: break-word;">infinite</td>
<td>Integer</td>
<td>The max number of completed jobs that can be kept in the job store. NOTICE: if memory store keeps too many jobs in session cluster, it may cause FullGC or OOM in jm.</td>
</tr>
<tr>
<td><h5>jobstore.type</h5></td>
<td style="word-wrap: break-word;">File</td>
<td><p>Enum</p></td>
<td>Determines which job store implementation is used in session cluster. Accepted values are:<ul><li>'File': the file job store keeps the archived execution graphs in files</li><li>'Memory': the memory job store keeps the archived execution graphs in memory. You may need to limit the <code class="highlighter-rouge">jobstore.max-capacity</code> to mitigate FullGC or OOM when there are too many graphs</li></ul><br /><br />Possible values:<ul><li>"File"</li><li>"Memory"</li></ul></td>
</tr>
<tr>
<td><h5>web.exception-history-size</h5></td>
<td style="word-wrap: break-word;">16</td>
Expand Down
48 changes: 24 additions & 24 deletions docs/layouts/shortcodes/generated/job_manager_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,30 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>completed-application-store.cache-size</h5></td>
<td style="word-wrap: break-word;">52428800</td>
<td>Long</td>
<td>The cache size in bytes which is used to keep completed applications in memory.</td>
</tr>
<tr>
<td><h5>completed-application-store.expiration-time</h5></td>
<td style="word-wrap: break-word;">1 h</td>
<td>Duration</td>
<td>The time after which a completed application expires and is purged from the store.</td>
</tr>
<tr>
<td><h5>completed-application-store.max-capacity</h5></td>
<td style="word-wrap: break-word;">infinite</td>
<td>Integer</td>
<td>The max number of completed applications that can be kept in the store. NOTICE: if memory store keeps too many applications in session cluster, it may cause FullGC or OOM in jm.</td>
</tr>
<tr>
<td><h5>completed-application-store.type</h5></td>
<td style="word-wrap: break-word;">File</td>
<td><p>Enum</p></td>
<td>Determines which store implementation is used in session cluster. Accepted values are:<ul><li>'File': the file store keeps the completed applications in files</li><li>'Memory': the memory store keeps the completed applications in memory. You may need to limit the <code class="highlighter-rouge">completed-application-store.max-capacity</code> to mitigate FullGC or OOM when there are too many applications</li></ul><br /><br />Possible values:<ul><li>"File"</li><li>"Memory"</li></ul></td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.executing.cooldown-after-rescaling</h5></td>
<td style="word-wrap: break-word;">30 s</td>
Expand Down Expand Up @@ -188,30 +212,6 @@
<td><p>Enum</p></td>
<td>Determines which scheduler implementation is used to schedule tasks. If this option is not explicitly set, batch jobs will use the 'AdaptiveBatch' scheduler as the default, while streaming jobs will default to the 'Default' scheduler. <br /><br />Possible values:<ul><li>"Default": Default scheduler</li><li>"Adaptive": Adaptive scheduler. More details can be found <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/deployment/elastic_scaling#adaptive-scheduler">here</a>.</li><li>"AdaptiveBatch": Adaptive batch scheduler. More details can be found <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/deployment/elastic_scaling#adaptive-batch-scheduler">here</a>.</li></ul></td>
</tr>
<tr>
<td><h5>jobstore.cache-size</h5></td>
<td style="word-wrap: break-word;">52428800</td>
<td>Long</td>
<td>The job store cache size in bytes which is used to keep completed jobs in memory.</td>
</tr>
<tr>
<td><h5>jobstore.expiration-time</h5></td>
<td style="word-wrap: break-word;">3600</td>
<td>Long</td>
<td>The time in seconds after which a completed job expires and is purged from the job store.</td>
</tr>
<tr>
<td><h5>jobstore.max-capacity</h5></td>
<td style="word-wrap: break-word;">infinite</td>
<td>Integer</td>
<td>The max number of completed jobs that can be kept in the job store. NOTICE: if memory store keeps too many jobs in session cluster, it may cause FullGC or OOM in jm.</td>
</tr>
<tr>
<td><h5>jobstore.type</h5></td>
<td style="word-wrap: break-word;">File</td>
<td><p>Enum</p></td>
<td>Determines which job store implementation is used in session cluster. Accepted values are:<ul><li>'File': the file job store keeps the archived execution graphs in files</li><li>'Memory': the memory job store keeps the archived execution graphs in memory. You may need to limit the <code class="highlighter-rouge">jobstore.max-capacity</code> to mitigate FullGC or OOM when there are too many graphs</li></ul><br /><br />Possible values:<ul><li>"File"</li><li>"Memory"</li></ul></td>
</tr>
<tr>
<td><h5>scheduler-mode</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.ArchivedApplicationStore;
import org.apache.flink.runtime.dispatcher.MemoryArchivedApplicationStore;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
Expand Down Expand Up @@ -78,9 +78,9 @@ protected ApplicationClusterEntryPoint(
}

@Override
protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(
protected ArchivedApplicationStore createArchivedApplicationStore(
final Configuration configuration, final ScheduledExecutor scheduledExecutor) {
return new MemoryExecutionGraphInfoStore();
return new MemoryArchivedApplicationStore();
}

protected static void configureExecution(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,16 +308,23 @@ public class JobManagerOptions {
.withDescription(
"Directory for JobManager to store the archives of completed jobs.");

/** The job store cache size in bytes which is used to keep completed jobs in memory. */
@Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
/**
* @deprecated Use {@link JobManagerOptions#COMPLETED_APPLICATION_STORE_CACHE_SIZE}
*/
@Deprecated
@Documentation.ExcludeFromDocumentation("Hidden for deprecated")
public static final ConfigOption<Long> JOB_STORE_CACHE_SIZE =
key("jobstore.cache-size")
.longType()
.defaultValue(50L * 1024L * 1024L)
.withDescription(
"The job store cache size in bytes which is used to keep completed jobs in memory.");

/** The time in seconds after which a completed job expires and is purged from the job store. */
/**
* @deprecated Use {@link JobManagerOptions#COMPLETED_APPLICATION_STORE_EXPIRATION_TIME}
*/
@Deprecated
@Documentation.ExcludeFromDocumentation("Hidden for deprecated")
@Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
public static final ConfigOption<Long> JOB_STORE_EXPIRATION_TIME =
key("jobstore.expiration-time")
Expand All @@ -326,9 +333,11 @@ public class JobManagerOptions {
.withDescription(
"The time in seconds after which a completed job expires and is purged from the job store.");

/** The max number of completed jobs that can be kept in the job store. */
@Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
@Documentation.OverrideDefault("infinite")
/**
* @deprecated Use {@link JobManagerOptions#COMPLETED_APPLICATION_STORE_MAX_CAPACITY}
*/
@Deprecated
@Documentation.ExcludeFromDocumentation("Hidden for deprecated")
public static final ConfigOption<Integer> JOB_STORE_MAX_CAPACITY =
key("jobstore.max-capacity")
.intType()
Expand All @@ -337,8 +346,11 @@ public class JobManagerOptions {
"The max number of completed jobs that can be kept in the job store. "
+ "NOTICE: if memory store keeps too many jobs in session cluster, it may cause FullGC or OOM in jm.");

/** Config parameter determining the job store implementation in session cluster. */
@Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
/**
* @deprecated Use {@link JobManagerOptions#COMPLETED_APPLICATION_STORE_TYPE}
*/
@Deprecated
@Documentation.ExcludeFromDocumentation("Hidden for deprecated")
public static final ConfigOption<JobStoreType> JOB_STORE_TYPE =
key("jobstore.type")
.enumType(JobStoreType.class)
Expand All @@ -362,6 +374,74 @@ public enum JobStoreType {
Memory
}

/** The cache size in bytes which is used to keep completed applications in memory. */
@Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
public static final ConfigOption<Long> COMPLETED_APPLICATION_STORE_CACHE_SIZE =
key("completed-application-store.cache-size")
.longType()
.defaultValue(50L * 1024L * 1024L)
.withDeprecatedKeys("jobstore.cache-size")
.withDescription(
"The cache size in bytes which is used to keep completed applications in memory.");

/** The time after which a completed application expires and is purged from the store. */
@Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
public static final ConfigOption<Duration> COMPLETED_APPLICATION_STORE_EXPIRATION_TIME =
key("completed-application-store.expiration-time")
.durationType()
.defaultValue(Duration.ofHours(1))
.withDescription(
"The time after which a completed application expires and is purged from the store.");

/** The max number of completed applications that can be kept in the store. */
@Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
@Documentation.OverrideDefault("infinite")
public static final ConfigOption<Integer> COMPLETED_APPLICATION_STORE_MAX_CAPACITY =
key("completed-application-store.max-capacity")
.intType()
.defaultValue(Integer.MAX_VALUE)
.withDeprecatedKeys("jobstore.max-capacity")
.withDescription(
"The max number of completed applications that can be kept in the store. "
+ "NOTICE: if memory store keeps too many applications in session cluster, it may cause FullGC or OOM in jm.");

/**
* Config parameter determining the store implementation in session cluster.
*
* <p>FLINK-38845 replaces job store with application store because every job is now associated
* with an application. The legacy job store expires jobs individually, risking partial loss of
* an application's job history and breaking application-level consistency. The application
* store manages and expires applications (with all its jobs) as a whole, ensuring complete,
* consistent, and queryable application state until explicitly discarded.
*/
@Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
public static final ConfigOption<ArchivedApplicationStoreType>
COMPLETED_APPLICATION_STORE_TYPE =
key("completed-application-store.type")
.enumType(ArchivedApplicationStoreType.class)
.defaultValue(ArchivedApplicationStoreType.File)
.withDeprecatedKeys("jobstore.type")
.withDescription(
Description.builder()
.text(
"Determines which store implementation is used in session cluster. Accepted values are:")
.list(
text(
"'File': the file store keeps the completed applications in files"),
text(
"'Memory': the memory store keeps the completed applications in memory. You"
+ " may need to limit the %s to mitigate FullGC or OOM when there are too many applications",
code(
COMPLETED_APPLICATION_STORE_MAX_CAPACITY
.key())))
.build());

/** Type of archived application store implementation. */
public enum ArchivedApplicationStoreType {
File,
Memory
}

/**
* Flag indicating whether JobManager would retrieve canonical host name of TaskManager during
* registration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -63,6 +65,14 @@ public abstract class AbstractApplication implements Serializable {

private final Set<JobID> jobs = new HashSet<>();

/**
* List of registered application status listeners that will be notified via {@link
* ApplicationStatusListener#notifyApplicationStatusChange} when the application state changes.
* For example, the Dispatcher registers itself as a listener to perform operations such as
* archiving when the application reaches a terminal state.
*/
private final transient List<ApplicationStatusListener> statusListeners = new ArrayList<>();

public AbstractApplication(ApplicationID applicationId) {
this.applicationId = checkNotNull(applicationId);
this.statusTimestamps = new long[ApplicationState.values().length];
Expand Down Expand Up @@ -130,6 +140,15 @@ public ApplicationState getApplicationStatus() {
return applicationState;
}

/**
* Registers a status listener.
*
* <p>This method is not thread-safe and should not be called concurrently.
*/
public void registerStatusListener(ApplicationStatusListener listener) {
statusListeners.add(listener);
}

// ------------------------------------------------------------------------
// State Transitions
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -196,6 +215,8 @@ void transitionState(ApplicationState targetState) {
targetState);
this.statusTimestamps[targetState.ordinal()] = System.currentTimeMillis();
this.applicationState = targetState;
statusListeners.forEach(
listener -> listener.notifyApplicationStatusChange(applicationId, targetState));
}

private void validateTransition(ApplicationState targetState) {
Expand Down
Loading