| Current state | Accepted |
|---|---|
| Discussion thread | https://lists.apache.org/thread/7jv3cflsqnp114r86kqvhp8xs00gx8zr |
| JIRA | FLINK-38755 |
| Released | <Flink Version> |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
As a unified framework for streaming and batch data processing, Flink has seen rapid development and widespread adoption in recent years. However, the current framework for executing user logic presents issues that impact its flexibility and user experience:
User main execution status is not observable: In the current framework, users have no direct visibility into the execution status of the main method of a Flink application. It is unclear whether main has started or completed, or whether any remnants remain. This lack of observability complicates debugging and monitoring, especially when execution errors occur, making it difficult to quickly identify and resolve issues.
Limited support for user main logic: The current execution framework imposes restrictions on Java logic, introducing unnecessary assumptions and limitations:
The user main method is required to submit exactly one job in some modes; submitting multiple jobs or no job at all results in an error. This limitation affects Flink's applicability in complex scenarios, for example, cases where no job needs to be executed, or where multiple jobs are executed sequentially (a common pattern in batch processing).
The success of the user logic is directly tied to the success of its jobs, making it difficult to accommodate diverse scenarios. For instance, users may want to implement retry logic for job execution within the main method and consider the overall logic successful as long as the job eventually succeeds. However, under the current implementation, the presence of any failed job causes the user logic to be considered failed.
Inconsistent behavior across different modes increases user cognitive load: Flink supports multiple deployment modes, but the behavior across these modes is inconsistent, as shown in the table below.
| Deployment Mode | Submission Method | High Availability Mode | Default Blocking Behavior of StreamExecutionEnvironment#execute | Multi-job Support |
|---|---|---|---|---|
| Application Mode | - | Non-HA | Non-blocking | ✅ |
| Application Mode | - | HA | Non-blocking | ❌ |
| Session Mode | CLI | Non-HA | Blocking | ✅ |
| Session Mode | CLI | HA | Blocking | ✅ |
| Session Mode | Web (Web UI or REST API /jars/:jarid/run) | Non-HA | Non-blocking | ❌ |
| Session Mode | Web (Web UI or REST API /jars/:jarid/run) | HA | Non-blocking | ❌ |
Table 1. Flink Behavior Inconsistency Across Different Modes
The blocking behavior of StreamExecutionEnvironment#execute directly impacts the execution of user logic. For example, in the following case, under application mode or session mode with Web submission, the cleanup() method may be invoked before the job completes, whereas in session mode with CLI submission, cleanup() will be invoked only after the job finishes.
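import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CleanupExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // ... define the pipeline ...
        env.execute("Job");
        cleanup(); // depending on the mode (Table 1), this may run before the job has finished
    }

    private static void cleanup() { /* user-defined cleanup logic */ }
}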
It is worth noting that the blocking behavior of StreamExecutionEnvironment#execute is currently also affected by the execution.attached configuration, which introduces conceptual ambiguity and further increases the learning burden for users.
Addressing the above issues requires systematic, multi-faceted changes. As the initial FLIP, we confine the discussion to introducing the concept and entity of an application, as well as providing query and management interfaces for applications. These changes can improve the observability and manageability of user-logic execution, and lay the foundation for future improvements. Application capability enhancements—such as high availability for applications and support for multiple-job execution—will be addressed in a subsequent FLIP.
Proposed Design
This section presents the overall design, which consists of three parts:
Introducing the concept and entity of an application, with support for application status queries and manual cancellation.
Unifying various cluster modes and submission modes based on the Cluster-Application-Job tiered structure.
Introducing an application archiving mechanism to provide historical information about applications.
Application Management
Application Definition
An application refers to a piece of user-submitted logic. We plan to introduce a concrete application object to standardize user code execution and provide unified status management capabilities. Applications are managed by the Dispatcher and have two kinds of implementations:
PackagedProgramApplication: Represents an application created by wrapping a user-submitted JAR (PackagedProgram). Its core execution logic involves invoking the main() method within the user JAR. This is suitable for scenarios where complete user-defined logic is submitted to the cluster, such as in application mode or session mode via Web submission. Note that SQL queries can also run in this mode via the SqlDriver introduced in FLIP-480.
SingleJobApplication: Represents an application created by wrapping a single job. It acts as a lightweight main method that submits one job and waits for its completion. This type is appropriate for scenarios where an individual job is submitted to the cluster, such as submitting a single JobGraph in session mode.
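The two kinds of applications can be pictured with the following minimal, self-contained Java sketch. It is not Flink code: Application, JobSubmitter, and the Sketch* classes are hypothetical names chosen for illustration, and a job is reduced to a name plus a success flag. The point is that a PackagedProgramApplication delegates to arbitrary user logic, whereas a SingleJobApplication is a fixed, one-job main method.

```java
import java.util.function.Consumer;

// Hypothetical callback through which an application hands a job to the cluster.
interface JobSubmitter {
    // Submits a job, blocks until it reaches a terminal state, and reports success.
    boolean submitAndWait(String jobName);
}

// Hypothetical common abstraction for both kinds of applications.
interface Application {
    // Runs the application logic; returns normally on success and throws on failure.
    void execute(JobSubmitter submitter) throws Exception;
}

// Stand-in for PackagedProgramApplication: wraps arbitrary user logic (the "main"
// method), which alone decides how many jobs to submit.
class SketchPackagedProgramApplication implements Application {
    private final Consumer<JobSubmitter> userMain;

    SketchPackagedProgramApplication(Consumer<JobSubmitter> userMain) {
        this.userMain = userMain;
    }

    @Override
    public void execute(JobSubmitter submitter) {
        userMain.accept(submitter);
    }
}

// Stand-in for SingleJobApplication: a lightweight "main" that submits exactly
// one job and waits for its completion.
class SketchSingleJobApplication implements Application {
    private final String jobName;

    SketchSingleJobApplication(String jobName) {
        this.jobName = jobName;
    }

    @Override
    public void execute(JobSubmitter submitter) throws Exception {
        if (!submitter.submitAndWait(jobName)) {
            throw new Exception("Job " + jobName + " did not succeed");
        }
    }
}
```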
Application Status
An application can have the following statuses:
Status | Description |
|---|---|
CREATED | The default state after the application is created. |
RUNNING | The application transitions to this state once execution begins. |
CANCELING | The application has received a user-initiated cancellation request and has started to cancel all of its non-terminal jobs, waiting for them to reach a terminal state. |
FAILING | An exception occurred during the application's execution, and the application has started to cancel all of its non-terminal jobs, waiting for them to reach a terminal state. |
FINISHED | The application execution has completed normally, and all jobs have reached a terminal state (regardless of whether they succeeded). Note: Currently, any job being canceled or failed will cause the user |
FAILED | The application exited due to an exception. For |
CANCELED | The application was canceled by the user. For |
Table 2. Application Statuses
Note: A fatal error will be triggered if an application in the CANCELING or FAILING state fails to properly terminate its jobs. In session mode, this may affect other applications in the cluster. However, this action is necessary because a job's failure to respond to a cancellation request usually indicates a severe underlying problem, and a fatal error is required to avoid leaving the system in an inconsistent state.
The application status transition is shown below.
Figure 1. Application Status Transition
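The statuses in Table 2 can be modelled as a small state machine. The sketch below is an illustration only: the enum and tracker names are hypothetical, and the transition table is inferred from the descriptions in Table 2 rather than copied from Figure 1, which remains authoritative (for example, it may also allow cancelling an application that is still CREATED). Terminal statuses are those with no outgoing transitions, which matches FINISHED, CANCELED, and FAILED.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Application statuses from Table 2 (the enum name is hypothetical).
enum ApplicationState {
    CREATED, RUNNING, CANCELING, FAILING, FINISHED, FAILED, CANCELED;

    // Assumed transitions, read off the descriptions in Table 2; Figure 1 is authoritative.
    private static final Map<ApplicationState, Set<ApplicationState>> NEXT = Map.of(
            CREATED, EnumSet.of(RUNNING),
            RUNNING, EnumSet.of(FINISHED, CANCELING, FAILING),
            CANCELING, EnumSet.of(CANCELED),
            FAILING, EnumSet.of(FAILED),
            FINISHED, EnumSet.noneOf(ApplicationState.class),
            FAILED, EnumSet.noneOf(ApplicationState.class),
            CANCELED, EnumSet.noneOf(ApplicationState.class));

    // Terminal states have no outgoing transitions.
    boolean isTerminal() {
        return NEXT.get(this).isEmpty();
    }

    boolean canTransitionTo(ApplicationState target) {
        return NEXT.get(this).contains(target);
    }
}

// Tracks one application's status and rejects transitions the table does not allow.
class ApplicationStateTracker {
    private ApplicationState state = ApplicationState.CREATED;

    synchronized ApplicationState current() {
        return state;
    }

    synchronized void transitionTo(ApplicationState target) {
        if (!state.canTransitionTo(target)) {
            throw new IllegalStateException(state + " -> " + target + " is not allowed");
        }
        state = target;
    }
}
```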
Manage Applications
To support application management and querying, new interfaces for application submission, querying, and cancellation are introduced. For details, refer to the REST API and Web UI sections under Public Interfaces.
It is worth noting that when canceling an application, all jobs it has already submitted must be terminated, and any further job submissions should be prevented to ensure the application exits as quickly as possible.
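The two cancellation rules above (terminate every job the application has already submitted, and refuse any further submissions) can be sketched as follows. CancellableApplication and the cancelJob callback are hypothetical stand-ins for the Dispatcher-side bookkeeping; the actual implementation may differ.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical per-application job bookkeeping illustrating the cancellation rules.
class CancellableApplication {
    private final Set<String> runningJobs = new LinkedHashSet<>();
    private final Consumer<String> cancelJob; // stand-in for cancelling a job via the Dispatcher
    private boolean cancelRequested = false;

    CancellableApplication(Consumer<String> cancelJob) {
        this.cancelJob = cancelJob;
    }

    synchronized void submitJob(String jobId) {
        if (cancelRequested) {
            // No new jobs once cancellation has started, so the application can exit quickly.
            throw new IllegalStateException("Application is being cancelled; rejecting job " + jobId);
        }
        runningJobs.add(jobId);
    }

    synchronized void jobReachedTerminalState(String jobId) {
        runningJobs.remove(jobId);
    }

    // Marks the application as cancelling and cancels every job that is not yet terminal.
    synchronized List<String> cancel() {
        cancelRequested = true;
        List<String> toCancel = new ArrayList<>(runningJobs);
        toCancel.forEach(cancelJob);
        return toCancel;
    }
}
```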
Unified Cluster-Application-Job Structure
Currently, the Flink cluster's Dispatcher has no application objects, and the overall structure is based on cluster-job. This design makes it difficult to observe the execution status of user logic, and the different job submission paths across deployment modes have led to inconsistent behavior. We plan to introduce a cluster-application-job structure to support unified, application-based management and to lay the foundation for future optimizations. Figure 2 illustrates a comparison of the cluster architecture before and after the proposed changes.
Figure 2. Flink Cluster Structure Changes
Cluster & Application
A Flink cluster can operate in two modes: application mode and session mode.
In application mode, a cluster is launched for a specific application and can be terminated upon application completion, with a one-to-one mapping between the cluster and the application.
In session mode, the cluster is pre-launched and runs independently of any single application. A session cluster can execute multiple applications.
Application & Job
An application can contain 0 to N jobs. Each job is associated with a single application, and the Dispatcher is responsible for managing the mapping between applications and their respective jobs.
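A minimal sketch of the application-to-job bookkeeping described above, assuming string IDs and in-memory maps (ApplicationJobRegistry is a hypothetical name, not a Flink class). It enforces the two invariants from the text: an application may own zero jobs, and a job can never be registered under a second application.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical registry mirroring the Dispatcher's application/job mapping.
class ApplicationJobRegistry {
    private final Map<String, Set<String>> jobsByApplication = new HashMap<>();
    private final Map<String, String> applicationByJob = new HashMap<>();

    // An application starts with zero jobs.
    void registerApplication(String applicationId) {
        jobsByApplication.putIfAbsent(applicationId, new LinkedHashSet<>());
    }

    void registerJob(String applicationId, String jobId) {
        Set<String> jobs = jobsByApplication.get(applicationId);
        if (jobs == null) {
            throw new IllegalArgumentException("Unknown application " + applicationId);
        }
        // Each job belongs to exactly one application.
        String owner = applicationByJob.putIfAbsent(jobId, applicationId);
        if (owner != null) {
            throw new IllegalStateException("Job " + jobId + " already belongs to " + owner);
        }
        jobs.add(jobId);
    }

    Set<String> jobsOf(String applicationId) {
        return Collections.unmodifiableSet(jobsByApplication.getOrDefault(applicationId, Set.of()));
    }

    String applicationOf(String jobId) {
        return applicationByJob.get(jobId);
    }
}
```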
Application Submission
Based on the Cluster–Application–Job structure, applications are submitted to the cluster in a unified way across different modes.
Application mode: A PackagedProgramApplication is created from the user JAR during cluster creation and submitted once the cluster starts running.
Session mode (Web submission): The /jars/:jarid/run-application REST API is used. The handler creates a PackagedProgramApplication from the user JAR and submits it to the cluster.
Session mode (CLI submission): Users can configure either attached (default) or detached mode. In attached mode, the user's main method is executed on the client side, and the job is submitted via the JobSubmit REST API, whose handler wraps the single job as a SingleJobApplication and submits it to the cluster. In detached mode, the /jars/:jarid/run-application REST API can be used, similar to Web submission.
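The submission paths above can be summarised as a lookup from the submission path to the kind of application the cluster runs and the entry point it arrives through. This is a descriptive sketch with hypothetical names (SubmissionPath, SubmissionRouting); it encodes only what the list above states.

```java
// Hypothetical enumeration of the submission paths listed above.
enum SubmissionPath {
    APPLICATION_MODE, SESSION_WEB, SESSION_CLI_ATTACHED, SESSION_CLI_DETACHED
}

final class SubmissionRouting {
    private SubmissionRouting() {}

    // Kind of application the cluster ends up running for a given path.
    static String applicationKind(SubmissionPath path) {
        switch (path) {
            case SESSION_CLI_ATTACHED:
                // main() runs on the client; the JobSubmit handler wraps each job.
                return "SingleJobApplication";
            case APPLICATION_MODE:
            case SESSION_WEB:
            case SESSION_CLI_DETACHED:
                // The user JAR is wrapped and main() runs inside the cluster.
                return "PackagedProgramApplication";
            default:
                throw new IllegalArgumentException("Unknown path " + path);
        }
    }

    // How the application reaches the cluster.
    static String entryPoint(SubmissionPath path) {
        switch (path) {
            case APPLICATION_MODE:
                return "created during cluster creation";
            case SESSION_WEB:
            case SESSION_CLI_DETACHED:
                return "/jars/:jarid/run-application";
            case SESSION_CLI_ATTACHED:
                return "JobSubmit REST API";
            default:
                throw new IllegalArgumentException("Unknown path " + path);
        }
    }
}
```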
Archive Applications
Currently, Flink only provides an archiving mechanism for job-related information. To support historical querying and visualization of applications that have reached a terminal status, we plan to introduce an application archiving mechanism.
Archiving Directory Structure
Currently, the job archive files are written to the directory specified by the configuration option jobmanager.archive.fs.dir, one file per job. The original directory structure is as follows.
<jobmanager.archive.fs.dir>/
├── <job-id-1>
├── <job-id-2>
└── ...
Based on the unified cluster-application-job structure, we plan to modify the directory structure as shown below. The application-summary is the application archive file.
<jobmanager.archive.fs.dir>/
├── <cluster-id-1>/
│   └── applications/
│       ├── <application-id-1>/
│       │   ├── application-summary
│       │   └── jobs/
│       │       ├── <job-id-1>
│       │       └── ...
│       └── ...
├── <cluster-id-2>/
│   └── applications/
│       └── ...
└── ...
Note that a new configuration option cluster.id is introduced to specify a unique identifier for the Flink cluster. This helps make the archive directory structure clearer and facilitates the addition of cluster-level archive information such as resource consumption in the future. Unlike existing options high-availability.cluster-id and kubernetes.cluster-id, cluster.id does not need to be tied to high availability mode or a specific resource provider (e.g., Kubernetes). In the future, for ease of configuration, we may consider using cluster.id as the default value for high-availability.cluster-id and kubernetes.cluster-id.
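The proposed archive layout can be expressed as a small path builder. ArchiveLayout is a hypothetical helper, and the sketch assumes plain '/'-separated paths under the configured jobmanager.archive.fs.dir. It also makes the History Server concern discussed below concrete: a job archive path needs the cluster ID and the application ID, not just the job ID.

```java
// Hypothetical helper for the <archive-dir>/<cluster-id>/applications/<application-id>/... layout.
final class ArchiveLayout {
    private final String archiveDir; // value of jobmanager.archive.fs.dir

    ArchiveLayout(String archiveDir) {
        // Drop a single trailing slash so joined paths never contain "//".
        this.archiveDir = archiveDir.endsWith("/")
                ? archiveDir.substring(0, archiveDir.length() - 1)
                : archiveDir;
    }

    String applicationDir(String clusterId, String applicationId) {
        return archiveDir + "/" + clusterId + "/applications/" + applicationId;
    }

    // The application archive file.
    String applicationSummary(String clusterId, String applicationId) {
        return applicationDir(clusterId, applicationId) + "/application-summary";
    }

    // A job archive can only be located once its cluster and application are known.
    String jobArchive(String clusterId, String applicationId, String jobId) {
        return applicationDir(clusterId, applicationId) + "/jobs/" + jobId;
    }
}
```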
Application Archiving Operation
The content archived for an application is consistent with the response returned by the REST queries. Archiving is triggered by the Dispatcher, which listens to application status changes. When the application reaches a terminal status (FINISHED, CANCELED, or FAILED), the Dispatcher performs the archiving operation in the callback, persisting application-related information to the file system for future access.
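The archiving trigger can be sketched as a status-change callback that persists an application only when it reaches FINISHED, CANCELED, or FAILED. ArchivingStatusListener, AppState, and the archiver callback are hypothetical names; the callback stands in for writing the REST-equivalent application summary to the file system.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.function.BiConsumer;

// Application statuses from Table 2 (hypothetical enum name).
enum AppState { CREATED, RUNNING, CANCELING, FAILING, FINISHED, FAILED, CANCELED }

// Hypothetical listener registered by the Dispatcher for application status changes.
class ArchivingStatusListener {
    private static final Set<AppState> TERMINAL =
            EnumSet.of(AppState.FINISHED, AppState.CANCELED, AppState.FAILED);

    // Stand-in for persisting the application summary to the archive file system.
    private final BiConsumer<String, AppState> archiver;

    ArchivingStatusListener(BiConsumer<String, AppState> archiver) {
        this.archiver = archiver;
    }

    // Archives the application when it reaches a terminal status; ignores other changes.
    void onStatusChanged(String applicationId, AppState newStatus) {
        if (TERMINAL.contains(newStatus)) {
            archiver.accept(applicationId, newStatus);
        }
    }
}
```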
History Server
The HistoryServer should be able to provide information about archived applications and display it in the Web UI. Similar to job archiving, new configuration options are introduced to specify the maximum number of application archives to retain and whether to clean up expired archived applications. For more details, refer to the Configuration and Web UI sections under Public Interfaces.
FLIP-505 introduces a fetchArchiveByJobId(String jobId) method for the History Server to fetch job archives from remote storage. However, with the new hierarchical directory structure (cluster -> application -> job), the full path to a job archive file cannot be derived from the jobId alone; instead, it requires searching the jobs directories across all clusters and applications, which may hurt efficiency. To address this, we plan to provide a fetchArchiveByApplicationId method to retrieve archive information from remote storage at the application level (including all job archives within that application).
Public Interfaces
REST API
We plan to add the following new APIs:
/applications/overview
/applications/:applicationid
/applications/:applicationid/cancel
/jars/:jarid/run-application
Additionally, the existing /jobs/:jobid and /jars/:jarid/run APIs will be modified. Note that all existing job-related APIs for querying or cancellation will retain their original functionality.
/applications/overview | |
|---|---|
| Verb: GET | Response code: 200 OK |
| Returns an overview over all applications. | |
Request {} | |
Response { Response example { | |
/applications/:applicationid | |
|---|---|
| Verb: GET | Response code: 200 OK |
| Returns details of an application. | |
Request {} | |
Response { Response example { | |
/applications/:applicationid/cancel | |
|---|---|
| Verb: POST | Response code: 202 Accepted |
| Cancels an application, including all of its jobs. | |
Request {} | |
Response {} | |
We plan to add an applicationId field to the job query API response to clarify which application the job belongs to.
/jobs/:jobId | |
|---|---|
| Verb: GET | Response code: 200 OK |
| Returns details of a job. | |
Request {} | |
Response { Response example { | |
We plan to modify the /jars/:jarid/run REST API by adding an applicationId field in the response body, allowing users to manage the corresponding application afterward. Currently, the API runs the user’s main method and returns a single job ID. To ensure full backward compatibility, this behavior will remain unchanged, and the newly added applicationId will simply mirror the job ID in the response.
/jars/:jarid/run | |
|---|---|
| Verb: POST | Response code: 200 OK |
| Submits an application by running a jar previously uploaded via '/jars/upload'. | |
Request { | |
Response { | |
The current /jars/:jarid/run API operates synchronously: it waits for the user's main method to complete before returning a job ID. This can cause significant response latency, but the behavior must be preserved for backward compatibility.
Additionally, we plan to introduce a new asynchronous /jars/:jarid/run-application API. It will immediately return an application ID upon submission, which can be used for subsequent status polling. This approach avoids waiting for the main method to execute, thereby improving response speed and user experience.
/jars/:jarid/run-application | |
|---|---|
| Verb: POST | Response code: 202 Accepted |
Submits an application by running a jar previously uploaded via '/jars/upload'. Unlike | |
Request { | |
Response { | |
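Because /jars/:jarid/run-application returns the application ID right away, a client is expected to poll /applications/:applicationid until the application is terminal. The sketch below shows that loop under stated assumptions: the response schema is not reproduced on this page, so fetchStatus is a hypothetical function that performs the GET request and extracts the status string, and the poll interval and limit are arbitrary choices.

```java
import java.time.Duration;
import java.util.Set;
import java.util.function.Function;

// Hypothetical client-side poller for the asynchronous submission flow.
final class ApplicationPoller {
    private static final Set<String> TERMINAL = Set.of("FINISHED", "FAILED", "CANCELED");

    private ApplicationPoller() {}

    // Path of the status endpoint for one application.
    static String statusPath(String applicationId) {
        return "/applications/" + applicationId;
    }

    // fetchStatus stands in for GET /applications/:applicationid returning the status field.
    static String awaitTerminal(String applicationId, Function<String, String> fetchStatus,
                                Duration interval, int maxPolls) throws InterruptedException {
        for (int i = 0; i < maxPolls; i++) {
            String status = fetchStatus.apply(applicationId);
            if (TERMINAL.contains(status)) {
                return status;
            }
            Thread.sleep(interval.toMillis());
        }
        throw new IllegalStateException(
                "Application " + applicationId + " not terminal after " + maxPolls + " polls");
    }
}
```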
Configuration
We plan to add the following configuration options.
key | default | description | comment |
|---|---|---|---|
execution.terminate-application-on-any-job-terminated-exceptionally | true | When set to When set to | Used to maintain compatibility with the current behavior where the application’s terminal status is tied to its jobs. |
cluster.id | "00000000000000000000000000000000" | The unique identifier of the Flink cluster, which must conform to the format of | It can be used as the default value for |
historyserver.archive.clean-expired-applications | false | When set to | |
historyserver.archive.retained-applications | -1 | Specifies the maximum number of application archives to retain. When set to Note that when an application archive is deleted, its associated job archives will be deleted as well. The existing |
Table 3. Configuration Options Changes
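The retention rule for historyserver.archive.retained-applications, including the cascade to job archives, can be sketched as follows. This assumes, by analogy with the existing historyserver.archive.retained-jobs option, that -1 disables the limit, and it further assumes that the newest archives (by end time) are the ones kept; ArchivedApplication and ApplicationRetention are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical archived application together with the IDs of its job archives.
class ArchivedApplication {
    final String applicationId;
    final long endTime;
    final List<String> jobIds;

    ArchivedApplication(String applicationId, long endTime, List<String> jobIds) {
        this.applicationId = applicationId;
        this.endTime = endTime;
        this.jobIds = List.copyOf(jobIds);
    }
}

final class ApplicationRetention {
    private ApplicationRetention() {}

    // Returns the application archives to delete so that at most maxRetained remain.
    static List<ArchivedApplication> toDelete(List<ArchivedApplication> archives, int maxRetained) {
        if (maxRetained == -1) {
            return List.of(); // assumed: -1 means no limit
        }
        if (maxRetained < 0) {
            throw new IllegalArgumentException("Invalid limit " + maxRetained);
        }
        if (archives.size() <= maxRetained) {
            return List.of();
        }
        List<ArchivedApplication> newestFirst = new ArrayList<>(archives);
        newestFirst.sort(Comparator.comparingLong((ArchivedApplication a) -> a.endTime).reversed());
        return new ArrayList<>(newestFirst.subList(maxRetained, newestFirst.size()));
    }

    // Deleting an application archive also deletes its associated job archives.
    static List<String> jobsDeletedWith(List<ArchivedApplication> deleted) {
        List<String> jobs = new ArrayList<>();
        for (ArchivedApplication app : deleted) {
            jobs.addAll(app.jobIds);
        }
        return jobs;
    }
}
```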
Web UI
Homepage
Add an Applications tab to the sidebar.
Change the Submit New Job tab to Submit New Application in the sidebar.
Change the existing Running/Completed Job List section on the homepage to Running/Completed Application List, with columns including Application Name, Start Time, Duration, End Time, Jobs, and Status.
Clicking on an application navigates to the Application page.
Figure 3. Homepage Design
Application Page
The Application page (displayed after clicking on a specific application) shows information about the selected application along with all associated jobs, divided into Running and Completed Job List sections. Each job entry includes the following columns: Job Name, Start Time, Duration, End Time, Tasks, and Status.
Figure 4. Application Page Design
Job Page
On the Job page, add a < Return to Application link to navigate back to the parent Application page.
Figure 5. Job Page Design
History Server Homepage
Add an Applications tab to the sidebar.
Change the existing Running/Completed Job List section on the homepage to Running/Completed Application List, with columns including Application Name, Start Time, Duration, End Time, Jobs, and Status.
Clicking on an application navigates to the Application page (see the above Application Page section).
Figure 6. History Server Homepage Design
Compatibility, Deprecation, and Migration Plan
No compatibility issues will be introduced.
Test Plan
Unit tests, integration tests, and manual tests will be used to verify this change.
Rejected Alternatives
No rejected alternatives.
Follow-up Tasks
Application High Availability
High availability for an application means that after a JobManager failover, the user's main method can be re-executed and the job can be correctly restored. Correct restoration of the job involves the following three aspects:
Jobs before and after the restart must correctly match.
Avoid restoring jobs that do not need to be recovered.
Avoid resubmitting jobs that have already completed.
The following scenario illustrates the first two points. Suppose that before the restart, the condition is true, so Job 1 has already been submitted and is running. After a failover, the condition becomes false. At this point, Job 2 should be submitted, instead of restoring and running Job 1.
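import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConditionalJobsExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        if (condition()) {
            // ... define the pipeline of Job 1 ...
            env.execute("Job 1");
        } else {
            // ... define the pipeline of Job 2 ...
            env.execute("Job 2");
        }
    }

    // User-defined condition whose result may differ before and after a failover.
    private static boolean condition() { /* e.g. depends on external state */ return true; }
}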
Although the main method can be re-executed in the current Application mode, it does not meet the above requirements for correct job recovery; further changes are needed to address these issues and achieve application-level high availability.
In Session mode, the user's main method is currently not re-executed after HA recovery. In the future, we plan to store application-related information (such as the JAR location and execution parameters) in a highly available manner, and re-execute the user's main method after HA recovery to achieve behavior consistent with Application mode.
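The three recovery requirements can be illustrated with a deliberately simplified decision function. This FLIP leaves the matching scheme to follow-up work, so the sketch assumes a hypothetical job key that identifies the same logical job across restarts (in the example above, the job name); JobRecoveryPlanner and RecoveryAction are illustrative names only. In the scenario above, Job 2 is submitted as new, and the still-running Job 1 is reported as a job that must not be restored.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical outcome for a job requested by the re-executed main() after failover.
enum RecoveryAction { SKIP_ALREADY_COMPLETED, RESTORE_RUNNING, SUBMIT_NEW }

final class JobRecoveryPlanner {
    private JobRecoveryPlanner() {}

    // previousJobs maps a (hypothetical) job key to whether that job had completed before failover.
    static RecoveryAction decide(String jobKey, Map<String, Boolean> previousJobs) {
        Boolean completed = previousJobs.get(jobKey);
        if (completed == null) {
            return RecoveryAction.SUBMIT_NEW; // no matching job existed before the failover
        }
        // Completed jobs are not resubmitted; unfinished matching jobs are restored.
        return completed ? RecoveryAction.SKIP_ALREADY_COMPLETED : RecoveryAction.RESTORE_RUNNING;
    }

    // Unfinished jobs from before the failover that the re-executed main() did not request.
    static Set<String> jobsNotToRestore(Map<String, Boolean> previousJobs, Set<String> requestedKeys) {
        Set<String> result = new TreeSet<>();
        previousJobs.forEach((key, completed) -> {
            if (!completed && !requestedKeys.contains(key)) {
                result.add(key);
            }
        });
        return result;
    }
}
```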
Support Executing Zero or Multiple Jobs
Currently, Flink requires the user's main method to execute at least one job. We plan to remove this restriction to support cases where no jobs need to be submitted; for instance, the application may terminate early without executing any job when certain conditions are met.
Currently, executing multiple jobs within the user's main method is not supported in certain scenarios. However, sequentially running several jobs is a common requirement in batch processing. Therefore, we plan to enhance application support for multi-job execution in the future.
Application Exceptions
At present, Flink exposes job-level exceptions via the /jobs/:jobid/exceptions REST API, covering exceptions thrown during job execution. It does not, however, report exceptions that occur outside job execution, such as failures originating from user logic inside main. In these cases, users typically need to consult logs to determine the cause of a failure. After introducing the application object, we plan to provide application-level exception information and support archiving it, so users can access exceptions thrown during main execution and troubleshoot more effectively.
Unify the behavior of execute
In the current implementation, when the configuration option execution.attached = false is set, the StreamExecutionEnvironment#execute method does not block during execution. This behavior is inconsistent with typical expectations—where blocking and non-blocking behavior should be clearly distinguished by using execute (blocking) and executeAsync (non-blocking), respectively.
Therefore, we plan to change this method's behavior: StreamExecutionEnvironment#execute will always run in blocking mode, regardless of the execution.attached configuration. This aligns the method with its expected semantics. Non-blocking execution should be requested explicitly via executeAsync. Given the potential compatibility impact, this change should be made in the next major Flink release (3.x).
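The intended semantics can be illustrated with a toy model (not Flink code): execute is simply executeAsync followed by a wait on the result, so it blocks regardless of any attached or detached setting. ToyEnvironment is a hypothetical class, and the job is simulated by a Supplier. In Flink itself, executeAsync returns a JobClient, whose getJobExecutionResult() future plays the role of the CompletableFuture here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

// Toy model of the proposed execute/executeAsync contract.
final class ToyEnvironment {
    // Non-blocking: starts the simulated job and immediately returns a handle to its result.
    CompletableFuture<String> executeAsync(Supplier<String> jobBody) {
        return CompletableFuture.supplyAsync(jobBody);
    }

    // Blocking: always waits for the job to finish, with no configuration switch involved.
    String execute(Supplier<String> jobBody) throws InterruptedException, ExecutionException {
        return executeAsync(jobBody).get();
    }
}
```

With this contract, code placed after execute (such as the cleanup() call in the Motivation example) always runs after the job has finished.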





