| Current state | Accepted |
|---|---|
| Discussion thread | https://lists.apache.org/thread/7wtj8x3zr6fps4hrkk37lb1cyptpqvzd |
| JIRA | FLINK-38972 |
| Released | <Flink Version> |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
FLIP-549 introduced the Application concept to Flink, along with its entities and management interfaces, thereby improving the observability and manageability of user logic execution. Building on this foundation, this FLIP further optimizes the Application architecture to enhance system stability, flexibility, and user experience. Specifically, the optimizations focus on the following aspects.
1. Improved High Availability Guarantees
Currently, Flink handles JobManager failures through High Availability (HA), which can restore running jobs after a failure. However, this mechanism has several practical limitations:
In Session Mode, the execution of the user's main method cannot be recovered. If user logic needs to perform additional actions after a job completes (e.g., cleaning up external resources), this follow-up logic will be lost after an HA recovery, resulting in an incomplete workflow.
In Application Mode, although the user's main method execution can be recovered, it may lead to incorrect job recovery. Consider the following code example: the condition is true before a failure, and the system submits and runs Job 1. After the HA recovery, an environmental change may cause the condition to become false, in which case Job 2 should be submitted instead of recovering Job 1. However, the current implementation directly recovers Job 1. Moreover, when the execution reaches env.execute("Job 2"), it uses the same job ID as the previous Job 1 and does not submit a new job. This silently masks the logical switch that should have occurred, causing the job's behavior to diverge from expectations and making it difficult to troubleshoot.
public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    ...
    if (condition) {
        ...
        env.execute("Job 1");
    } else {
        ...
        env.execute("Job 2");
    }
}
We plan to provide more robust High Availability guarantees based on the new Application architecture. This will ensure that the user's main method execution can be recovered after a JobManager failure in both Session and Application modes, while correctly handling the associated job recovery and execution logic.
2. More Flexible Job Execution Logic
Currently, in typical production scenarios (such as Application Mode with High Availability enabled), Flink enforces that the user's main method must execute exactly one job; running multiple jobs or none at all results in an error. This restricts the use of Flink in complex scenarios, such as executing multiple jobs sequentially (a common pattern in batch processing) or skipping job execution under certain conditions. We plan to support running multiple jobs or no jobs within a single application.
3. More User-friendly Exposure of Exception Information
Currently, Flink provides job-level exception information, recording errors that occur during job execution and exposing them through the REST API /jobs/:jobid/exceptions. However, it cannot provide information for errors in the main method that are unrelated to job execution, such as exceptions thrown in the user logic. Users typically have to rely on logs to diagnose the causes of such errors.
Building on the application entity, we plan to expose application-level exception information via a new REST API and support archiving it. This will allow users to access exceptions that occur during main method execution, making it easier to diagnose issues.
Proposed Design
Application High Availability
Currently, Flink HA mode can automatically recover running jobs after a JobManager failure, but it has the following limitations:
In Application mode, after an HA recovery, the user's main method is executed again. However, this mode does not support submitting multiple jobs from the main method, and non-deterministic job-submission logic can even result in incorrect job recovery. The root cause is the lack of an effective job-matching mechanism: the current system relies on a fixed job ID (either user-specified or derived from the cluster ID) to associate jobs before and after a failure, so that it can reuse the ExecutionPlan, Checkpoint, or JobResult. This approach cannot handle scenarios where multiple jobs are submitted in main, or where the job-submission logic is non-deterministic.
In Session mode, HA can only recover the jobs that were running before the failure; it cannot re-run the user's main method. As a result, other logic in main is lost, which can compromise semantic consistency.
To address these issues, we propose an enhanced Application High Availability solution, which includes the following aspects.
Establish an effective job-matching mechanism that can correctly associate the jobs submitted by an application before and after a failure, even in scenarios involving multiple jobs or non-deterministic job-submission logic.
Based on accurate job matching, apply appropriate handling strategies for jobs in different statuses within a running application before the failure:
Running jobs: if the restored code path can still cover the original job, resume its execution; otherwise, skip recovery for that job and ensure that all residual states and resources are completely cleaned up.
Terminated jobs: do not re-run them after recovery, while ensuring that all related states and resources are eventually cleaned up.
Extend the HA capabilities in Session Mode to support re-running the user's main method after recovery, thereby providing HA guarantees aligned with those in Application Mode.
Finally, properly handle applications that have already reached a terminal status at the time of failure, ensuring that their jobs are not re-executed during HA recovery and that no resources are left behind.
Figure 1 illustrates the overall workflow of normal application execution and HA recovery. The newly added key components include:
Application-level Blob and ApplicationStore (see the Re-execute Applications in Session Mode section);
ApplicationResultStore (see the Application and Job Cleanup section).
Figure 1. Workflow of Normal Application Execution and HA Recovery
Match Jobs Correctly after Recovery
The goal of this section is to correctly match jobs before and after a JobManager failure so that their ExecutionPlan, Checkpoint, JobResult, and other information can be reused. This provides the foundation for Application High Availability.
Currently, the system relies on a fixed job ID (either specified by the user or generated from high-availability.cluster-id) to associate jobs before and after a failure. This approach cannot handle scenarios where multiple jobs are submitted in the user's main method or where the job-submission logic is non-deterministic. To address this, we propose a job-name-based ID assignment and matching mechanism. This mechanism assigns a globally unique ID to each job based on its name and uses the name to match jobs during recovery, ensuring the following properties:
Backward Compatibility: The first submitted job continues to use the system's existing fixed job ID, fully preserving backward compatibility.
Uniqueness: In multi-job scenarios, each job is assigned a globally unique ID.
Matching Correctness: Jobs are matched by name during recovery, preventing incorrect associations between jobs with different names. For jobs sharing the same name, association strictly follows their submission order.
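As an illustration of these three properties, a possible ID-derivation scheme might look like the following sketch. All class and method names here are hypothetical, not the actual Flink implementation: the first job keeps a fixed, cluster-derived seed, while later jobs hash the cluster ID together with the job name and a per-name submission index.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of a name-based job ID generator; this is an
 * illustration of the properties above, not Flink's actual code.
 */
public class NameBasedJobIdGenerator {

    private final String clusterId;
    private final Map<String, Integer> nameOccurrences = new HashMap<>();
    private boolean firstJob = true;

    public NameBasedJobIdGenerator(String clusterId) {
        this.clusterId = clusterId;
    }

    /** Returns a deterministic 32-hex-character job ID for the given job name. */
    public String generateJobId(String jobName) throws NoSuchAlgorithmException {
        // Track how many times this name has been submitted so far, so that
        // jobs sharing a name are matched strictly by submission order.
        int occurrence = nameOccurrences.merge(jobName, 1, Integer::sum);

        final String seed;
        if (firstJob) {
            // Backward compatibility: the first job keeps the fixed,
            // cluster-derived ID used by the existing implementation.
            seed = clusterId;
            firstJob = false;
        } else {
            // Uniqueness: later jobs hash cluster ID, name, and occurrence index.
            seed = clusterId + "/" + jobName + "/" + occurrence;
        }

        byte[] digest =
                MessageDigest.getInstance("MD5").digest(seed.getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder();
        for (byte b : digest) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString(); // Same 16-byte format as a Flink JobID.
    }
}
```

Because the seed depends only on the cluster ID, the job name, and the submission index, two runs of the same deterministic main method produce identical ID sequences, which is exactly what recovery-time matching relies on.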
Users can specify a job name via the existing execute interface in StreamExecutionEnvironment:
JobExecutionResult execute(String jobName);
Note: When multiple jobs with the same name exist within a single application, their matching result during recovery depends on the submission order. Therefore, correct matching is guaranteed only when the job submission order is deterministic.
To ensure accurate job association during failure recovery in all scenarios (especially when job submission logic is non-deterministic), we strongly recommend that users explicitly assign a unique name to each job, as shown in the example below.
public static void main(String[] args) {
    final boolean condition = Boolean.parseBoolean(System.getenv("ENV_FLAG"));
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    ...
    env.execute("Job 1");
    if (condition) {
        ...
        env.execute("Job 2");
    }
    ...
    env.executeAsync("Job 3");
}
Handle Jobs in Running Applications
For an application that is running before a JM failover, the jobs it has submitted can be in one of two statuses: running or terminated. When the application is re-executed after HA recovery, these jobs should be handled differently according to their status, as follows.
1. Running Jobs
Based on correct job matching, this section describes how to handle running jobs before the failure.
Currently, when a job is submitted, its ExecutionPlan is written to the ExecutionPlanStore and kept there for the entire duration of the job. In the existing implementation, after a JM failover, all jobs in the ExecutionPlanStore are immediately recovered. However, if the execution path of the user’s main method after recovery no longer reaches these jobs, this will lead to incorrect job recovery. Therefore, we plan to defer job recovery and only trigger it when the re-executed main method reaches the submission point of the corresponding job.
For jobs that were running before the failure but whose submission points are not reached when the main method is re-executed after recovery, the system should still display their basic information (such as job name) based on the ExecutionPlan, and ensure that any remaining job resources (including the ExecutionPlan and checkpoints) are eventually cleaned up.
2. Terminated Jobs
Based on correct job matching, this section describes how to handle terminated jobs before the failure.
In the current system, once a job reaches a terminal status, a dirty JobResult is created; after the related resources are cleaned up, it is marked as clean. By default, this JobResult is then deleted (the deletion can be disabled by setting job-result-store.delete-on-commit to false, in which case users must clean it up manually). However, if the application to which the job belongs has not yet terminated, and a JM failover occurs, the system will re-execute the application. Since the JobResult may have already been deleted, the system can no longer tell that the job has finished, which leads to resubmitting jobs that have already terminated. We plan to delay marking a JobResult as clean until after the entire application has terminated. This way, when the application is re-executed after a JM failover, it can still see the previously terminated jobs and thereby avoid resubmitting them.
After HA recovery, the application retrieves all associated dirty JobResults, performs the necessary resource cleanup, and restores basic job information (such as name, status, and start/end time) for display in the Web UI.
Note: In the current design, users cannot determine whether the recovered application execution path still covers these terminated jobs. This can be considered as a future enhancement if needed.
Re-execute Applications in Session Mode
In Application Mode, the JM loads and automatically executes the bootstrap application on startup, so after an HA recovery, it naturally re-runs that application. In Session Mode, however, the system cannot re-run the user-submitted application after an HA recovery, which may result in the loss of the user logic it contains. The root cause is that, in Session Mode, the application metadata is not persisted to HA storage, so the execution context cannot be reconstructed after a JM failover. To address this, we plan to introduce the following mechanisms.
1. Introduce ApplicationStore
Similar to how ExecutionPlanStore is used to record information about running jobs, we plan to add an ApplicationStore to persist information about running applications, so that applications can be restored after HA recovery in Session mode.
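To make the analogy with ExecutionPlanStore concrete, the ApplicationStore could take roughly the following shape. The interface, the metadata fields, and the in-memory implementation below are all illustrative assumptions, not the proposed API; a real store would persist entries to HA storage.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch of an ApplicationStore, mirroring how ExecutionPlanStore
 * records running jobs. Names and fields are illustrative only.
 */
public interface ApplicationStore {

    /** Persists the metadata needed to re-execute the application after a failover. */
    void putApplication(ApplicationMetadata metadata) throws Exception;

    /** Returns the stored metadata, or null if the application is unknown. */
    ApplicationMetadata getApplication(String applicationId) throws Exception;

    /** Lists all applications that were running and may need re-execution. */
    Collection<String> getApplicationIds() throws Exception;

    /** Removes the entry once the application has terminated and been cleaned up. */
    void removeApplication(String applicationId) throws Exception;

    /** Minimal metadata required to reconstruct the execution context. */
    record ApplicationMetadata(String applicationId, String jarBlobKey, String entryClass) {}

    /** Trivial in-memory implementation; a real store would write to HA storage. */
    class InMemoryApplicationStore implements ApplicationStore {
        private final Map<String, ApplicationMetadata> store = new ConcurrentHashMap<>();

        public void putApplication(ApplicationMetadata m) { store.put(m.applicationId(), m); }
        public ApplicationMetadata getApplication(String id) { return store.get(id); }
        public Collection<String> getApplicationIds() { return store.keySet(); }
        public void removeApplication(String id) { store.remove(id); }
    }
}
```

After a JM failover in Session mode, the dispatcher would iterate over getApplicationIds() and re-execute each stored application, using the jar Blob key to fetch the user code from application-level Blob storage.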
2. Support Application-Level Blob Storage
Currently, user jars are only uploaded via job-level Blob services and are not available at the application level, which means that after HA recovery in Session mode, the dependencies required to re-execute an application are missing. Therefore, we plan to introduce application-level Blob storage. When the /jars/:jarid/run-application API is called, the user jar will be uploaded to the application-level Blob store. At the same time, we will adjust the job submission logic so that jobs reuse the jar Blob associated with their application, avoiding performance regressions from repeatedly uploading the same jar.
Application and Job Cleanup
The previous two sections (Handle Jobs in Running Applications and Re-execute Applications in Session Mode) ensure that applications running at the time of a JM failover can be correctly recovered, and that their jobs can be properly recovered or cleaned up. This section focuses on another scenario: the application has already reached a terminal status when the JM failover occurs. Our goal is to ensure that, after HA recovery, such applications do not incorrectly re-run their jobs, while also avoiding resource leaks.
Currently, terminated applications exhibit the following issues during HA recovery:
In Application Mode, after an application reaches a terminal status, the system will by default clean up the JobResults of its jobs. If a JM failover occurs after this cleanup has completed, the system can no longer determine whether the jobs have already been executed, and may incorrectly trigger them to run again.
In Session Mode, after an application reaches a terminal status, in addition to JobResults, the ApplicationStore and application-level Blob resources must also be cleaned up. If a JM failover occurs before this cleanup is fully completed, the following abnormal situations may arise:
Errors when recovering the application, e.g., the ApplicationStore has not yet been cleaned up, but the user jar Blob has already been removed.
Incorrect re-execution of jobs, e.g., the ApplicationStore and Blobs have not yet been cleaned up, but the JobResults have already been deleted.
Resource leaks, e.g., the ApplicationStore has been cleaned up, but the Blobs have not.
To address these issues, terminated applications that are in the cleanup phase must be explicitly marked. This allows the system to recognize them after a JM failover, avoid re-running the application (thus avoiding incorrect job re-execution), and resume resource cleanup instead. We plan to implement this via ApplicationResults, designed as follows.
1. Introduce ApplicationResultStore
When an application reaches a terminal status, the system will create an ApplicationResult for it and store it in highly available storage via the ApplicationResultStore. After a JM failover, the system can use the ApplicationResult to determine that the application has already terminated, thereby skipping re-execution and resuming cleanup (including cleaning up JobResults and releasing application-level resources).
Once the application’s cleanup has finished, the ApplicationResult itself should also be removed. However, if a JM failover occurs after the ApplicationResult has been deleted, the system can no longer tell that the application has completed: in Application Mode, this leads to the application and its jobs being executed again; in Session Mode, it becomes impossible to distinguish re-submitted applications (and their jobs) from the originals. To address this, we plan to introduce a configuration option application-result-store.delete-on-commit, which controls whether the ApplicationResult is automatically deleted after cleanup. When it is set to false, the system retains the ApplicationResult, thus providing stronger consistency guarantees at the cost of requiring manual cleanup. To track cleanup progress and avoid redundant work, the ApplicationResult is marked as dirty before application cleanup is complete, and updated to clean afterwards. For details on this configuration, see the Configuration section under Public Interfaces.
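The dirty/clean lifecycle described above can be sketched as follows. This is a minimal illustration under stated assumptions: the class name, method names, and in-memory storage are hypothetical, and only the semantics of application-result-store.delete-on-commit are taken from this FLIP.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch of the ApplicationResult lifecycle: a result is created
 * dirty when the application terminates, and marked clean (or deleted,
 * depending on application-result-store.delete-on-commit) once cleanup is done.
 */
public class ApplicationResultStore {

    public enum State { DIRTY, CLEAN }

    private final Map<String, State> results = new ConcurrentHashMap<>();
    private final boolean deleteOnCommit; // mirrors application-result-store.delete-on-commit

    public ApplicationResultStore(boolean deleteOnCommit) {
        this.deleteOnCommit = deleteOnCommit;
    }

    /** Called when the application reaches a terminal status, before cleanup starts. */
    public void createDirtyResult(String applicationId) {
        results.put(applicationId, State.DIRTY);
    }

    /** Called once all application-level and job-level resources are cleaned up. */
    public void markResultAsClean(String applicationId) {
        if (deleteOnCommit) {
            results.remove(applicationId); // default: delete the result on commit
        } else {
            results.put(applicationId, State.CLEAN); // retain it for manual cleanup
        }
    }

    /** After a JM failover: present => skip re-execution; DIRTY => resume cleanup. */
    public Optional<State> getResult(String applicationId) {
        return Optional.ofNullable(results.get(applicationId));
    }
}
```

The trade-off is visible in markResultAsClean: with delete-on-commit enabled (the default), a failover after deletion can no longer distinguish a completed application from a new one, which is exactly why setting the option to false provides stronger consistency at the cost of manual cleanup.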
2. Application and Job Cleanup Workflow
After introducing ApplicationResultStore, the application and its jobs have their own independent termination and cleanup workflow: create a dirty result → perform cleanup → mark the result as clean. However, there is no coordination between them, which can lead to inconsistencies after HA recovery, causing resource leaks or repeated execution. Concretely, the issue appears in the following three scenarios:
Creating a dirty ApplicationResult too early: If a dirty ApplicationResult is created before some jobs have created their dirty JobResults, and a JM failover occurs at this point, the system will assume the application does not need to be re-executed and therefore will not re-run these jobs. At the same time, since their corresponding JobResults are missing, these jobs cannot be properly cleaned up, leading to resource leaks.
Marking a JobResult as clean too early: If a job marks its JobResult as clean (by default, this deletes the JobResult) before the dirty ApplicationResult is created, and a JM failover occurs, the system will re-execute the application. However, since the JobResult has already been deleted, this job will be executed again incorrectly.
Marking an ApplicationResult as clean too early: If the ApplicationResult is marked as clean before some jobs have finished cleanup, and a JM failover occurs, the system will no longer process this application, thereby skipping cleanup for those jobs and causing resource leaks.
To ensure consistency between the status of an application and its jobs after HA recovery, the termination and cleanup order must be strictly controlled, as illustrated in Figure 2 (the gray area shows the existing job termination process). The key constraints are as follows.
Prerequisite for creating a dirty ApplicationResult: All jobs within the application must have already created their dirty JobResults. Implication: The application can be marked as "no need to re-run" only when none of its jobs need to be re-run.
Prerequisite for marking a JobResult as clean: The corresponding application must already have a dirty ApplicationResult. Implication: A job’s "no need to re-run" marker may be cleared only under the premise that the application itself no longer needs to be re-run.
Prerequisite for marking an ApplicationResult as clean: All jobs in the application must have their JobResults marked as clean. Implication: The application can be considered fully cleaned up only after cleanup has completed for all its jobs.
Figure 2. Application and Job Termination Workflow
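The three ordering constraints above can be expressed as guarded state transitions, as in the following sketch. All names here are illustrative assumptions; in Flink the coordination would live inside the dispatcher-side cleanup logic rather than a standalone class.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the termination-ordering constraints: each transition
 * refuses to proceed until its prerequisite from the text above is satisfied.
 */
public class TerminationCoordinator {

    enum ResultState { NONE, DIRTY, CLEAN }

    private final Map<String, ResultState> jobResults = new HashMap<>();
    private ResultState applicationResult = ResultState.NONE;

    public void registerJob(String jobId) {
        jobResults.put(jobId, ResultState.NONE);
    }

    public void createDirtyJobResult(String jobId) {
        jobResults.put(jobId, ResultState.DIRTY);
    }

    /** Constraint 1: all jobs must have created their dirty JobResults first. */
    public void createDirtyApplicationResult() {
        if (jobResults.containsValue(ResultState.NONE)) {
            throw new IllegalStateException("Some jobs have no dirty JobResult yet");
        }
        applicationResult = ResultState.DIRTY;
    }

    /** Constraint 2: the dirty ApplicationResult must already exist. */
    public void markJobResultAsClean(String jobId) {
        if (applicationResult == ResultState.NONE) {
            throw new IllegalStateException("Dirty ApplicationResult not created yet");
        }
        jobResults.put(jobId, ResultState.CLEAN);
    }

    /** Constraint 3: all JobResults must already be marked as clean. */
    public void markApplicationResultAsClean() {
        if (!jobResults.values().stream().allMatch(s -> s == ResultState.CLEAN)) {
            throw new IllegalStateException("Some JobResults are not clean yet");
        }
        applicationResult = ResultState.CLEAN;
    }
}
```

Each guard corresponds to one row of Table 1: violating a guard is precisely the failover scenario (premature dirty ApplicationResult, premature clean JobResult, or premature clean ApplicationResult) that the ordering is designed to rule out.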
With this coordination mechanism, once an application enters the termination process, the statuses of the application and its jobs fall into three typical cases, as shown in Table 1. For each case, if a JM failover occurs, the system applies the corresponding handling strategy to ensure that, regardless of the failover point, the application and job statuses remain consistent, effectively preventing job re-execution and resource leaks.
| ApplicationResult | JobResult | Behavior after JM failover |
|---|---|---|
| Not created | Some jobs have created a dirty result; no jobs are marked as clean | Re-execute the application, but skip execution of jobs that already have a dirty result. |
| Dirty result created | All jobs have created a dirty result; some jobs are marked as clean | Clean up the application-level resources and the resources of jobs that still have a dirty result. |
| Marked as clean | All jobs are marked as clean | No action required. |
Table 1. Status Combinations of ApplicationResult/JobResult and Handling Strategies
Support Executing Zero or Multiple Jobs
Currently, Flink requires the user's main method to execute at least one job. We plan to remove this restriction to support cases where no jobs need to be submitted; for instance, the application may terminate early without executing any job when certain conditions are met.
Currently, Flink does not support executing multiple jobs within the main method in certain scenarios (including Application Mode with HA enabled, and submissions via the Web/REST interface in Session Mode). The primary reason is that, during HA recovery, the system cannot correctly match and restore multiple jobs. We have described a solution for this issue in the Application High Availability section. With that in place, we can safely remove the restriction on multiple jobs, thereby supporting user logic that sequentially executes multiple jobs within main.
Note the following limitations:
Streaming jobs: Running multiple streaming jobs may involve restoring savepoints for several jobs. The related APIs and behavior are not yet well defined, and current user demand for this capability is unclear. Therefore, multiple streaming jobs are not supported in the above scenarios.
REST API /jars/:jarid/run: This API runs the user's main method and returns a single job ID. To maintain backward compatibility, submissions via this API must still adhere to the restriction that the main method executes exactly one job.
Application Mode with execution.submit-failed-job-on-application-error = true: If the application terminates due to an exception in user logic before any job is submitted, the system will submit a failed job to facilitate diagnosis. To guarantee a stable job ID for this failed job, the mechanism supports only a single job in the main method. Consequently, when this configuration is enabled, executing multiple jobs in an application is not supported.
Application Exceptions
Flink currently exposes only exceptions related to job execution, and cannot capture or display exceptions thrown in the main method by non-job-execution logic (such as user-defined code). This limitation complicates troubleshooting. To address it, we plan to introduce a new REST API, /applications/:applicationid/exceptions, to provide comprehensive visibility into exceptions that occur during application execution.
The exception returned by this API is exactly the cause that led the application to enter the FAILED status. It may be a job-execution exception or an exception thrown by user logic in the main method, and it includes the following information:
exceptionName
stacktrace
timestamp
[optional] jobId
In addition, when an application reaches a terminal status and is archived, the information returned by this API will also be persisted, so that it can be exposed via the HistoryServer, facilitating later review and diagnosis. For the detailed API design, see the REST API section under Public Interfaces.
FLINK-25715 introduced the configuration option execution.submit-failed-job-on-application-error. In Application Mode, when this option is set to true, if the application terminates due to an exception in user logic before any job is submitted, the system will submit a failed job to facilitate diagnosis. This FLIP improves the visibility of application exceptions, enabling users to directly obtain information about exceptions in user logic for troubleshooting. Therefore, we plan to deprecate the execution.submit-failed-job-on-application-error configuration option and its associated functionality. The detailed plan is described in the Compatibility, Deprecation and Migration Plan section.
Public Interfaces
REST API
We plan to introduce an API for querying application-level exception information.
/applications/:applicationid/exceptions
Verb: GET | Response code: 200 OK
Returns the exception information of an application.
We plan to introduce a new application configuration query API that returns the corresponding JobManager configuration and supports archiving the relevant configuration options after the application terminates, ensuring that configuration information is persistently retained even when no jobs have been submitted within the application.
/applications/:applicationid/jobmanager/config
Verb: GET | Response code: 200 OK
Returns the JobManager configuration of a specific application.
Configuration
We plan to add the following configuration options.
| key | default | description | comment |
|---|---|---|---|
| application-result-store.storage-path | | Specifies the storage path for the ApplicationResultStore. If not configured, the default is | |
| application-result-store.delete-on-commit | true | Determines whether an application result should be removed from the underlying storage when it transitions into a clean state. If set to false, the result is retained and must be cleaned up manually. | Replaces the current job-result-store.delete-on-commit option, which we plan to deprecate. |
Table 2. New Configuration Options
Web UI
We plan to refine the design of the application page with the following changes:
Add an action link to cancel the application
Add an action link to navigate to the JobManager log
Display the application’s start time, end time, and duration
Add a subpage for exceptions
The updated page is shown in the figures below.
Figure 3. Application Overview
Figure 4. Application Exceptions
Compatibility, Deprecation, and Migration Plan
The changes introduced by this FLIP are fully compatible with existing jobs. Current interfaces (such as StreamExecutionEnvironment#execute) can continue to be used as before; their behavior remains consistent with previous versions, and the state of existing jobs can still be restored via the original mechanisms.
At the same time, this FLIP enhances high availability and error visibility. We plan to deprecate some existing interfaces, which will require users to migrate to the new interfaces, as described below.
Deprecate job-result-store.delete-on-commit
The job-result-store.delete-on-commit configuration option will be marked as deprecated, with a deprecation warning provided, and will be removed in the next major release (3.0). If your current jobs have job-result-store.delete-on-commit = false, you need to migrate to the new configuration option application-result-store.delete-on-commit and also set it to false. The new option provides stronger guarantees for consistency during high-availability recovery. For details, see the Configuration section under Public Interfaces.
Deprecate execution.submit-failed-job-on-application-error
The execution.submit-failed-job-on-application-error configuration option will be marked as deprecated, with a deprecation warning provided, and will be removed in the next major release (3.0). If your current jobs have execution.submit-failed-job-on-application-error = true and rely on the failed job submitted by the system to detect exceptions in user logic, you need to remove this configuration and instead obtain exception details via the /applications/:applicationid/exceptions API or from the UI.
Test Plan
Unit tests, integration tests, and manual tests will be used to verify this change.
Rejected Alternatives
No rejected alternatives.