Current state: Accepted
Discussion thread: https://lists.apache.org/thread/7jv3cflsqnp114r86kqvhp8xs00gx8zr
JIRA: FLINK-38755
Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As a unified framework for streaming and batch data processing, Flink has seen rapid development and widespread adoption in recent years. However, the current framework for executing user logic presents issues that impact its flexibility and user experience:

  1. User main execution status is not observable: In the current framework, users have no direct visibility into the execution status of the main method of a Flink application. It's unclear whether the main has started, completed or whether any remnants remain. This lack of observability complicates debugging and monitoring, especially when execution errors occur, making it difficult to quickly identify and resolve issues.

  2. Limited support for user main logic: The current execution framework imposes restrictions on Java logic, introducing unnecessary assumptions and limitations.

    1. The user main method is required to submit exactly one job in some modes, and submitting multiple jobs or not submitting any job at all will result in an error. This limitation affects Flink’s applicability in complex scenarios — for example, cases where no job needs execution, or where multiple jobs are executed sequentially (a common pattern in batch processing).

    2. The success of the user logic is directly tied to the success of its jobs, making it difficult to accommodate diverse scenarios. For instance, users may want to implement retry logic for job execution within the main method and consider the overall logic successful as long as the job eventually succeeds. However, under the current implementation, the presence of any failed job causes the user logic to be considered failed.

  3. Inconsistent behavior across different modes increases user cognitive load: Flink supports multiple deployment modes, but the behavior across these modes is inconsistent, as shown in the table below.

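| Deployment Mode | Submission Method | High Availability Mode | Default Blocking Behavior of StreamExecutionEnvironment#execute | Multi-job Support |
|---|---|---|---|---|
| Application Mode | - | Non-HA | Non-blocking | |
| Application Mode | - | HA | Non-blocking | |
| Session Mode | CLI | Non-HA | Blocking | |
| Session Mode | CLI | HA | Blocking | |
| Session Mode | Web via webUI or REST API (/jars/:jarid/run) | Non-HA | Non-blocking | |
| Session Mode | Web via webUI or REST API (/jars/:jarid/run) | HA | Non-blocking | |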

Table 1. Flink Behavior Inconsistency Across Different Modes


The blocking behavior of StreamExecutionEnvironment#execute directly impacts the execution of user logic. For example, in the following case, under application mode or session mode with Web submission, the cleanup() method may be invoked before the job completes, whereas in session mode with CLI submission, cleanup() will be invoked only after the job finishes.

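public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  ...

  // Whether this call blocks depends on the deployment mode and submission method.
  env.execute("Job");

  // With a non-blocking execute, this may run before the job completes.
  cleanup();
}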

It is worth noting that the blocking behavior of StreamExecutionEnvironment#execute is currently affected by the execution.attached configuration, which adds conceptual ambiguity and makes the behavior harder for users to learn and reason about.


Addressing the above issues requires systematic, multi-faceted changes. As the initial FLIP, we confine the discussion to introducing the concept and entity of an application, as well as providing query and management interfaces for applications. These changes can improve the observability and manageability of user-logic execution, and lay the foundation for future improvements. Application capability enhancements—such as high availability for applications and support for multiple-job execution—will be addressed in a subsequent FLIP.

Proposed Design

This section presents the overall design, which consists of three parts:

  1. Introducing the concept and entity of an application, with support for application status queries and manual cancellation;

  2. Unifying various cluster modes and submission modes based on the Cluster-Application-Job tiered structure;

  3. Introducing an application archiving mechanism to provide historical information about applications.

Application Management

Application Definition

An application refers to a piece of user-submitted logic. We plan to introduce a concrete application object to standardize user code execution and provide unified status management capabilities. Applications are managed by the Dispatcher and have two kinds of implementations:

  • PackagedProgramApplication: Represents an application created by wrapping a user-submitted JAR (PackagedProgram). Its core execution logic involves invoking the main() method within the user JAR. This is suitable for scenarios where complete user-defined logic is submitted to the cluster, such as in application mode or session mode via web submission. Note that SQL queries can also run in this mode via SqlDriver introduced in FLIP-480.

  • SingleJobApplication: Represents an application created by wrapping a single job. It acts as a lightweight main method that submits one job and waits for its completion. This type is appropriate for scenarios where an individual job is submitted to the cluster, such as submitting a single JobGraph in session mode.

Application Status

An application can have the following statuses:

| Status | Description |
|---|---|
| CREATED | The default state after the application is created. |
| RUNNING | The application transitions to this state once execution begins. |
| CANCELING | The application has received a user-initiated cancellation request, has started canceling all of its non-terminal jobs, and is waiting for them to reach a terminal state. |
| FAILING | An exception occurred during the application's execution; the application has started canceling all of its non-terminal jobs and is waiting for them to reach a terminal state. |
| FINISHED | The application execution has completed normally, and all jobs have reached a terminal state (regardless of whether they succeeded). Note: Currently, any canceled or failed job causes the user main to terminate immediately. We plan to introduce a configuration option execution.terminate-application-on-any-job-terminated-exceptionally, with a default value of true, which preserves the existing behavior: when any job is canceled (or failed), the application immediately transitions to a canceled (or failed) status. Users can set this configuration option to false to enable the new behavior. |
| FAILED | The application exited due to an exception. For PackagedProgramApplication, this means the user's main method exited with an error. For SingleJobApplication, this indicates a failure during job submission or execution. |
| CANCELED | The application was canceled by the user. For PackagedProgramApplication, this means the execution of the main method has stopped, and all associated jobs have reached their terminal states. |

Table 2. Application Statuses


Note: A fatal error will be triggered if an application in the CANCELING or FAILING state fails to properly terminate its jobs. In session mode, this may affect other applications in the cluster. However, this action is necessary because a job's failure to respond to a cancellation request usually indicates a severe underlying problem, and a fatal error is required to avoid leaving the system in an inconsistent state.

The application status transition is shown below.

Figure 1. Application Status Transition
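The statuses in Table 2 can be read as a small state machine. The following is a minimal sketch of that reading, not the actual Flink implementation: the enum name ApplicationStatusSketch and its methods are hypothetical, and the set of allowed transitions is an assumption inferred from the descriptions in Table 2. Figure 1 remains the authoritative source for the transitions.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical sketch of the application statuses from Table 2; not a Flink class. */
enum ApplicationStatusSketch {
    CREATED, RUNNING, CANCELING, FAILING, FINISHED, FAILED, CANCELED;

    /** Terminal statuses: the application does not change status again. */
    boolean isTerminal() {
        return this == FINISHED || this == FAILED || this == CANCELED;
    }

    /** Assumed successor statuses, inferred from the descriptions in Table 2. */
    Set<ApplicationStatusSketch> successors() {
        switch (this) {
            case CREATED:
                return EnumSet.of(RUNNING);
            case RUNNING:
                return EnumSet.of(CANCELING, FAILING, FINISHED);
            case CANCELING:
                return EnumSet.of(CANCELED);
            case FAILING:
                return EnumSet.of(FAILED);
            default:
                // FINISHED, FAILED and CANCELED have no successors.
                return EnumSet.noneOf(ApplicationStatusSketch.class);
        }
    }

    /** Returns true if moving from this status to next is allowed in this sketch. */
    boolean canTransitionTo(ApplicationStatusSketch next) {
        return successors().contains(next);
    }
}
```

In this sketch an application can only reach CANCELED through CANCELING and FAILED through FAILING, which mirrors the intermediate states in which outstanding jobs are canceled first.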


Manage Applications

To support application management and querying, new interfaces for application submission, querying, and cancellation are introduced. For details, refer to the REST API and Web UI sections under Public Interfaces.

It is worth noting that when canceling an application, all jobs it has already submitted must be terminated, and any further job submissions should be prevented to ensure the application exits as quickly as possible.
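The cancellation rule above (cancel everything already submitted, refuse anything new) can be illustrated with a small guard around job submission. This is a minimal sketch under stated assumptions: the class CancellableApplicationSketch, its methods, and the use of plain strings as job IDs are all hypothetical and do not correspond to Flink classes.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical sketch; names are illustrative and not part of Flink. */
class CancellableApplicationSketch {
    private final Set<String> submittedJobs = new LinkedHashSet<>();
    private boolean canceling = false;

    /** Registers a job unless cancellation has started; returns whether it was accepted. */
    synchronized boolean submitJob(String jobId) {
        if (canceling) {
            // Reject new jobs so that the application can exit as quickly as possible.
            return false;
        }
        submittedJobs.add(jobId);
        return true;
    }

    /** Starts cancellation and returns a snapshot of the jobs that must be canceled. */
    synchronized Set<String> cancel() {
        canceling = true;
        return new LinkedHashSet<>(submittedJobs);
    }

    synchronized boolean isCanceling() {
        return canceling;
    }
}
```

Guarding both methods with the same lock means a job is either included in the snapshot returned by cancel() or rejected by submitJob(); it cannot slip in between.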

Unified Cluster-Application-Job Structure

Currently, the Flink cluster's Dispatcher does not have application objects and the overall structure is based on cluster-job. This design makes it difficult to observe the execution status of user logic, and different job submission paths across various deployment modes have led to inconsistent behavior. We plan to introduce a cluster-application-job structure to support unified management based on applications and to lay the foundation for future optimizations. Figure 2 illustrates a comparison of the cluster architecture before and after the proposed changes.

Figure 2. Flink Cluster Structure Changes


Cluster & Application

A Flink cluster can operate in two modes: application mode and session mode.

In application mode, a cluster is launched for a specific application and can be terminated upon application completion, with a one-to-one mapping between the cluster and the application.

In session mode, the cluster is pre-launched and runs independently of any single application. A session cluster can execute multiple applications.

Application & Job

An application can contain 0 to N jobs. Each job is associated with a single application, and the Dispatcher is responsible for managing the mapping between applications and their respective jobs.
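One way to picture the mapping described above is a two-way index kept by the Dispatcher. The sketch below is only an illustration: ApplicationJobIndexSketch and its methods are hypothetical names, and IDs are modeled as plain strings rather than Flink ID types.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of an application-to-job index; not a Flink class. */
class ApplicationJobIndexSketch {
    private final Map<String, Set<String>> jobsByApplication = new HashMap<>();
    private final Map<String, String> applicationByJob = new HashMap<>();

    /** An application may exist without any job (the "0 to N" case). */
    void registerApplication(String applicationId) {
        jobsByApplication.putIfAbsent(applicationId, new LinkedHashSet<>());
    }

    /** Associates a job with exactly one application. */
    void registerJob(String applicationId, String jobId) {
        String owner = applicationByJob.putIfAbsent(jobId, applicationId);
        if (owner != null && !owner.equals(applicationId)) {
            throw new IllegalStateException(
                "Job " + jobId + " already belongs to application " + owner);
        }
        jobsByApplication.computeIfAbsent(applicationId, id -> new LinkedHashSet<>()).add(jobId);
    }

    /** Returns a read-only view of the jobs of an application. */
    Set<String> jobsOf(String applicationId) {
        return Collections.unmodifiableSet(
            jobsByApplication.getOrDefault(applicationId, Collections.emptySet()));
    }

    /** Returns the owning application of a job, or null if the job is unknown. */
    String applicationOf(String jobId) {
        return applicationByJob.get(jobId);
    }
}
```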

Application Submission

Based on the Cluster–Application–Job structure, applications are submitted to the cluster in a unified way across different modes.

  • Application mode: A PackagedProgramApplication is created from the user JAR during cluster creation and submitted once the cluster starts running.

  • Session mode (Web submission): The /jars/:jarid/run-application REST API is used. The handler creates a PackagedProgramApplication from the user JAR and submits it to the cluster.

  • Session mode (CLI submission): Users can configure either attached (default) or detached mode. In attached mode, the user's main method is executed on the client side, and the job is submitted via the JobSubmit REST API, whose handler wraps the single job as a SingleJobApplication and submits it to the cluster. In detached mode, the /jars/:jarid/run-application REST API can be used, similar to Web submission.

Archive Applications

Flink currently provides an archiving mechanism only for job-related information. To support historical querying and visualization of applications that have reached a terminal status, we plan to introduce an application archiving mechanism.

Archiving Directory Structure

Currently, the job archive files are written to the directory specified by the configuration option jobmanager.archive.fs.dir, one file per job. The original directory structure is as follows.

<jobmanager.archive.fs.dir>/
├── <job-id-1>
├── <job-id-2>
└── ...

Based on the unified cluster-application-job structure, we plan to modify the directory structure as shown below. The application-summary is the application archive file.

<jobmanager.archive.fs.dir>/
├── <cluster-id-1>/
│   └── applications/
│       ├── <application-id-1>/
│       │   ├── application-summary
│       │   └── jobs/
│       │       ├── <job-id-1>
│       │       └── ...
│       └── ...
├── <cluster-id-2>/
│   └── applications/
│       └── ...
└── ...

Note that a new configuration option cluster.id is introduced to specify a unique identifier for the Flink cluster. This helps make the archive directory structure clearer and facilitates the addition of cluster-level archive information such as resource consumption in the future. Unlike existing options high-availability.cluster-id and kubernetes.cluster-id, cluster.id does not need to be tied to high availability mode or a specific resource provider (e.g., Kubernetes). In the future, for ease of configuration, we may consider using cluster.id as the default value for high-availability.cluster-id and kubernetes.cluster-id.
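The directory layout above, together with the cluster.id fallback described in the Configuration section, can be sketched as follows. This is an illustration only: the class ApplicationArchiveLayoutSketch and its methods are hypothetical, the configuration is modeled as a plain Map, and java.nio.file.Path is used for simplicity even though an archive directory may live on a remote file system.

```java
import java.nio.file.Path;
import java.util.Map;

/** Hypothetical sketch of the proposed archive layout; not a Flink class. */
class ApplicationArchiveLayoutSketch {
    /** Default value of cluster.id as listed in the Configuration section. */
    static final String DEFAULT_CLUSTER_ID = "00000000000000000000000000000000";

    /** Uses cluster.id if set, else high-availability.cluster-id, else the default. */
    static String resolveClusterId(Map<String, String> config) {
        String clusterId = config.get("cluster.id");
        if (clusterId != null) {
            return clusterId;
        }
        return config.getOrDefault("high-availability.cluster-id", DEFAULT_CLUSTER_ID);
    }

    /** <archive-dir>/<cluster-id>/applications/<application-id> */
    static Path applicationDir(Path archiveDir, String clusterId, String applicationId) {
        return archiveDir.resolve(clusterId).resolve("applications").resolve(applicationId);
    }

    /** <archive-dir>/<cluster-id>/applications/<application-id>/application-summary */
    static Path applicationSummary(Path archiveDir, String clusterId, String applicationId) {
        return applicationDir(archiveDir, clusterId, applicationId).resolve("application-summary");
    }

    /** <archive-dir>/<cluster-id>/applications/<application-id>/jobs/<job-id> */
    static Path jobArchive(Path archiveDir, String clusterId, String applicationId, String jobId) {
        return applicationDir(archiveDir, clusterId, applicationId).resolve("jobs").resolve(jobId);
    }
}
```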

Application Archiving Operation

The content archived for an application is consistent with the response returned by the REST queries. Archiving is triggered by the Dispatcher, which listens to application status changes. When the application reaches a terminal status (FINISHED, CANCELED, or FAILED), the Dispatcher performs the archiving operation in the callback, persisting application-related information to the file system for future access.

History Server

The HistoryServer should be able to provide information about archived applications and display it in the Web UI. Similar to job archiving, new configuration options are introduced to specify the maximum number of application archives to retain and whether to clean up expired archived applications. For more details, refer to the Configuration and Web UI sections under Public Interfaces.

FLIP-505 introduces a fetchArchiveByJobId(String jobId) method for the History Server to fetch job archives from remote storage. However, with the new hierarchical directory structure (cluster -> application -> job), the full path to the job archive file cannot be derived from the jobId alone; instead, it requires searching the jobs directories across all clusters and applications, which may impact efficiency. To address this, we plan to provide a fetchArchiveByApplicationId method to retrieve archive information from remote storage at the application level (including all job archives within that application).
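The cost difference between the two lookups can be illustrated on a local directory tree. This is a minimal sketch, assuming the hierarchical layout proposed in this document: ArchiveLookupSketch and its method signatures are hypothetical (the document does not specify the signature of fetchArchiveByApplicationId), and java.nio.file is used in place of a remote file system.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch comparing job-level and application-level lookups. */
class ArchiveLookupSketch {

    /** Job-level lookup: has to scan the jobs directory of every cluster and application. */
    static Optional<Path> findJobArchive(Path archiveDir, String jobId) throws IOException {
        // A job archive sits 5 levels down:
        // <cluster-id>/applications/<application-id>/jobs/<job-id>
        try (Stream<Path> paths = Files.walk(archiveDir, 5)) {
            return paths
                .filter(p -> archiveDir.relativize(p).getNameCount() == 5)
                .filter(p -> p.getFileName().toString().equals(jobId))
                .filter(p -> p.getParent().getFileName().toString().equals("jobs"))
                .findFirst();
        }
    }

    /** Application-level lookup: the jobs directory is derived directly, no search needed. */
    static List<Path> listJobArchives(Path archiveDir, String clusterId, String applicationId)
            throws IOException {
        Path jobsDir = archiveDir.resolve(clusterId)
            .resolve("applications").resolve(applicationId).resolve("jobs");
        if (!Files.isDirectory(jobsDir)) {
            return Collections.emptyList();
        }
        try (Stream<Path> jobs = Files.list(jobsDir)) {
            return jobs.sorted().collect(Collectors.toList());
        }
    }
}
```

The first method touches every directory under the archive root, while the second lists a single directory, which is the efficiency argument made above for an application-level fetch.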

Public Interfaces

REST API

We plan to add the following new APIs:

  • /applications/overview

  • /applications/:applicationid

  • /applications/:applicationid/cancel

  • /jars/:jarid/run-application

Additionally, the existing /jobs/:jobid and /jars/:jarid/run APIs will be modified. Note that all existing job-related APIs for querying or cancellation will retain their original functionality.


/applications/overview
Verb: GET
Response code: 200 OK
Returns an overview of all applications.

Request

{}

Response

{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:MultipleApplicationsDetails",
  "properties" : {
    "applications" : {
      "type" : "array",
      "items" : {
        "type" : "object",
        "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:ApplicationDetails",
        "properties" : {
          "duration" : {
            "type" : "integer"
          },
          "end-time" : {
            "type" : "integer"
          },
          "id" : {
            "type" : "any"
          },
          "name" : {
            "type" : "string"
          },
          "start-time" : {
            "type" : "integer"
          },
          "status" : {
            "type" : "string",
            "enum" : [ "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELING", "CANCELED", "FINISHED" ]
          },
          "jobs" : {
            "type" : "object",
            "additionalProperties" : {
              "type" : "integer"
            }
          }
        }
      }
    }
  }
}

Response example

{
  "applications" : [
    {
      "id" : "fd72014d4c864993a2e5a9287b4a9c5d",
      "name" : "WordCount",
      "status": "RUNNING",
      "start-time" : 1744890639819,
      "end-time" : -1,
      "duration" : 2000,
      "jobs" : {
        "RUNNING" : 2,
        "FINISHED" : 1
      }
    }
  ]
}


/applications/:applicationid
Verb: GET
Response code: 200 OK
Returns details of an application.

Request

{}

Response

{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:ApplicationDetailsInfo",
  "properties" : {
    "id" : {
      "type" : "any"
    },
    "name" : {
      "type" : "string"
    },
    "status" : {
      "type" : "string",
      "enum" : [ "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELING", "CANCELED", "FINISHED" ]
    },
    "timestamps" : {
      "type" : "object",
      "additionalProperties" : {
        "type" : "integer"
      }
    },
    "start-time" : {
      "type" : "integer"
    },
    "end-time" : {
      "type" : "integer"
    },
    "duration" : {
      "type" : "integer"
    },
    "jobs" : {
      "type" : "array",
      "items" : {
        "type" : "object",
        "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobDetails",
        "properties" : {
          "duration" : {
            "type" : "integer"
          },
          "end-time" : {
            "type" : "integer"
          },
          "jid" : {
            "type" : "any"
          },
          "last-modification" : {
            "type" : "integer"
          },
          "name" : {
            "type" : "string"
          },
          "pending-operators" : {
            "type" : "integer"
          },
          "start-time" : {
            "type" : "integer"
          },
          "state" : {
            "type" : "string",
            "enum" : [ "INITIALIZING", "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING" ]
          },
          "tasks" : {
            "type" : "object",
            "additionalProperties" : {
              "type" : "integer"
            }
          }
        }
      }
    }
  }
}

Response example

{
  "id" : "fd72014d4c864993a2e5a9287b4a9c5d",
  "name" : "WordCount",
  "status": "RUNNING",
  "timestamps" : {
    "CREATED" : 1744890639818,
    "RUNNING" : 1744890639819,
    ...
  },
  "start-time" : 1744890639818,
  "end-time" : -1,
  "duration" : 2000,
  "jobs" : [
    {
      "jid" : "1",
      "name" : "WordCount",
      "state" : "RUNNING",
      "start-time" : 1744890639823,
      "end-time" : -1,
      "duration" : 32449,
      "last-modification" : 1744890641397,
      "pending-operators" : 0,
      "tasks" : {
        "running" : 2,
        "canceling" : 0,
        ...
      }
    }
  ]
}

/applications/:applicationid/cancel
Verb: POST
Response code: 202 Accepted
Terminates an application, including all of its jobs.

Request

{}

Response

{}

We plan to add an applicationId field to the job query API response to clarify which application the job belongs to.

/jobs/:jobid
Verb: GET
Response code: 200 OK
Returns details of a job.

Request

{}

Response

{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobDetailsInfo",
  "properties" : {
    "applicationId" : {
      "type" : "any"
    },
    "duration" : {
      "type" : "integer"
    },
    "end-time" : {
      "type" : "integer"
    },
    "isStoppable" : {
      "type" : "boolean"
    },
    "jid" : {
      "type" : "any"
    },
    ...
  }
}

Response example

{
  "applicationId" : "fd72014d4c864993a2e5a9287b4a9c5d",
  "duration" : 2000,
  "end-time" : -1,
  "isStoppable" : true,
  "jid" : "",
  ...
}


We plan to modify the /jars/:jarid/run REST API by adding an applicationId field in the response body, allowing users to manage the corresponding application afterward. Currently, the API runs the user’s main method and returns a single job ID. To ensure full backward compatibility, this behavior will remain unchanged, and the newly added applicationId will simply mirror the job ID in the response.

/jars/:jarid/run
Verb: POST
Response code: 200 OK
Submits an application by running a jar previously uploaded via '/jars/upload'.

Request

{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunRequestBody",
  "properties" : {
    "entryClass" : {
      "type" : "string"
    },
    "flinkConfiguration" : {
      "type" : "object",
      "additionalProperties" : {
        "type" : "string"
      }
    },
    "jobId" : {
      "type" : "any"
    },
    "parallelism" : {
      "type" : "integer"
    },
    "programArgsList" : {
      "type" : "array",
      "items" : {
        "type" : "string"
      }
    },
    "allowNonRestoredState" : {
      "type" : "boolean"
    },
    "claimMode" : {
      "type" : "string",
      "enum" : [ "CLAIM", "NO_CLAIM", "LEGACY" ]
    },
    "savepointPath" : {
      "type" : "string"
    }
  }
}

Response

{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunResponseBody",
  "properties" : {
    "jobid" : {
      "type" : "any"
    },
    "applicationId" : {
      "type" : "any"
    }
  }
}

The current /jars/:jarid/run API operates synchronously, waiting for the user's main method to complete before returning a job ID. This can cause significant response latency, but the behavior must be preserved for backward compatibility.

Additionally, we plan to introduce a new asynchronous /jars/:jarid/run-application API. It will immediately return an application ID upon submission, which can be used for subsequent status polling. This approach avoids waiting for the main method to execute, thereby improving response speed and user experience.

/jars/:jarid/run-application
Verb: POST
Response code: 202 Accepted

Submits an application by running a jar previously uploaded via '/jars/upload'. Unlike /jars/:jarid/run, it returns immediately after the application is submitted and does not wait for the main method to complete.

Request

{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunApplicationRequestBody",
  "properties" : {
    "entryClass" : {
      "type" : "string"
    },
    "flinkConfiguration" : {
      "type" : "object",
      "additionalProperties" : {
        "type" : "string"
      }
    },
    "applicationId" : {
      "type" : "any"
    },
    "parallelism" : {
      "type" : "integer"
    },
    "programArgsList" : {
      "type" : "array",
      "items" : {
        "type" : "string"
      }
    },
    "allowNonRestoredState" : {
      "type" : "boolean"
    },
    "claimMode" : {
      "type" : "string",
      "enum" : [ "CLAIM", "NO_CLAIM", "LEGACY" ]
    },
    "savepointPath" : {
      "type" : "string"
    }
  }
}

Response

{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunApplicationResponseBody",
  "properties" : {
    "applicationId" : {
      "type" : "any"
    }
  }
}
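A client might combine the endpoints above as follows: submit through /jars/:jarid/run-application, then poll /applications/:applicationid until the status is terminal, or cancel through /applications/:applicationid/cancel. The sketch below only builds the requests and shows the polling loop; it sends nothing over the network. ApplicationRestSketch, its method names, and the Supplier-based status source are hypothetical, and the base URL passed in by the caller is an assumption about the deployment. It requires Java 11 or later for java.net.http.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical client-side sketch for the proposed application endpoints. */
class ApplicationRestSketch {
    private static final Set<String> TERMINAL = Set.of("FINISHED", "FAILED", "CANCELED");

    /** POST /jars/:jarid/run-application with a JSON request body. */
    static HttpRequest runApplication(String baseUrl, String jarId, String jsonBody) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/jars/" + jarId + "/run-application"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
            .build();
    }

    /** GET /applications/:applicationid */
    static HttpRequest applicationDetails(String baseUrl, String applicationId) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/applications/" + applicationId))
            .GET()
            .build();
    }

    /** POST /applications/:applicationid/cancel */
    static HttpRequest cancelApplication(String baseUrl, String applicationId) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/applications/" + applicationId + "/cancel"))
            .POST(HttpRequest.BodyPublishers.noBody())
            .build();
    }

    static boolean isTerminal(String status) {
        return TERMINAL.contains(status);
    }

    /**
     * Polls a status source until it reports a terminal status or maxAttempts is reached.
     * The source would typically send the applicationDetails request and extract "status".
     */
    static String pollUntilTerminal(Supplier<String> statusSource, int maxAttempts, long sleepMillis)
            throws InterruptedException {
        String status = statusSource.get();
        for (int attempt = 1; attempt < maxAttempts && !isTerminal(status); attempt++) {
            Thread.sleep(sleepMillis);
            status = statusSource.get();
        }
        return status;
    }
}
```

The requests can be sent with java.net.http.HttpClient; parsing the JSON response to extract the applicationId and status fields is left to the caller's JSON library of choice.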

Configuration

We plan to add the following configuration options.

| Key | Default | Description | Comment |
|---|---|---|---|
| execution.terminate-application-on-any-job-terminated-exceptionally | true | When set to true, the application will immediately terminate as canceled (or failed) if any job is canceled (or failed). When set to false, the application waits for all jobs to reach a terminal state before transitioning to finished. | Used to maintain compatibility with the current behavior where the application's terminal status is tied to its jobs. |
| cluster.id | "00000000000000000000000000000000" | The unique identifier of the Flink cluster, which must conform to the format of AbstractID. It is primarily used to determine the application archive directory. If the user does not configure cluster.id but has configured high-availability.cluster-id, the system will use the value of high-availability.cluster-id as the cluster.id so that existing jobs are not affected. | It can be used as the default value for high-availability.cluster-id and kubernetes.cluster-id. Since this changes the current defaults and may introduce compatibility issues, the change should be made in the next major Flink release (3.x). |
| historyserver.archive.clean-expired-applications | false | When set to true, the HistoryServer will remove application archive entries from its cache that no longer exist in the archive directory. | |
| historyserver.archive.retained-applications | -1 | Specifies the maximum number of application archives to retain. A value of -1 means there is no limit on the number of archived applications. Note that when an application archive is deleted, its associated job archives will be deleted as well. The existing historyserver.archive.retained-jobs configuration only applies to jobs that do not belong to any application. | |


Table 3. Configuration Options Changes
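The effect of execution.terminate-application-on-any-job-terminated-exceptionally on the final application status can be sketched as a small decision function. This is an illustration, not the Flink implementation: ApplicationTerminalStatusSketch and its nested enums are hypothetical, jobs are listed in the order in which they terminated, and the sketch assumes the user's main method itself neither throws nor is canceled by the user.

```java
import java.util.List;

/** Hypothetical sketch of how the option affects the final application status. */
class ApplicationTerminalStatusSketch {
    enum JobResult { FINISHED, FAILED, CANCELED }

    enum AppStatus { FINISHED, FAILED, CANCELED }

    /**
     * Resolves the application's terminal status from its jobs' terminal states.
     * Assumes main() completed normally and no user cancellation was requested.
     */
    static AppStatus resolve(boolean terminateOnJobTerminatedExceptionally, List<JobResult> jobsInTerminationOrder) {
        if (terminateOnJobTerminatedExceptionally) {
            // Existing behavior: the first canceled or failed job decides the outcome.
            for (JobResult job : jobsInTerminationOrder) {
                if (job == JobResult.FAILED) {
                    return AppStatus.FAILED;
                }
                if (job == JobResult.CANCELED) {
                    return AppStatus.CANCELED;
                }
            }
        }
        // New behavior (option set to false), or no job terminated exceptionally.
        return AppStatus.FINISHED;
    }
}
```

With the option set to false, an application whose main method retries a failed job and eventually succeeds ends as FINISHED, which is the scenario described in the Motivation section.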


Web UI

Homepage

  • Add an Applications tab to the sidebar.

  • Change the Submit New Job tab to Submit New Application in the sidebar.

  • Change the existing Running/Completed Job List section on the homepage to Running/Completed Application List, with columns including Application Name, Start Time, Duration, End Time, Jobs, and Status.

  • Clicking on an application navigates to the Application page.

Figure 3. Homepage Design


Application Page

The Application page (displayed after clicking on a specific application) shows information about the selected application along with all associated jobs, divided into Running/Completed Job List sections. Each job entry includes the following columns: Job Name, Start Time, Duration, End Time, Tasks, and Status.

Figure 4. Application Page Design


Job Page

On the Job page, add a < Return to Application link to navigate back to the parent Application page.

Figure 5. Job Page Design


History Server Homepage

  • Add an Applications tab to the sidebar.

  • Change the existing Running/Completed Job List section on the homepage to Running/Completed Application List, with columns including Application Name, Start Time, Duration, End Time, Jobs, and Status.

  • Clicking on an application navigates to the Application page (see the above Application Page section).

Figure 6. History Server Homepage Design


Compatibility, Deprecation, and Migration Plan

No compatibility issues will be introduced.

Test Plan

Unit tests, integration tests, and manual tests will be used to verify this change.

Rejected Alternatives

No rejected alternatives.

Follow-up Tasks

Application High Availability

High availability for an application means that after a JobManager failover, the user's main method can be re-executed and the job can be correctly restored. Correct restoration of the job involves the following three aspects:

  1. Jobs before and after the restart must correctly match.

  2. Avoid restoring jobs that do not need to be recovered.

  3. Avoid resubmitting jobs that have already completed.

The first two points can be illustrated with the following scenario. Suppose that before the restart the condition is true, so Job 1 has already been submitted and is running. After a failover, the condition becomes false. At this point, Job 2 should be submitted, instead of restoring and running Job 1.

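public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  // The condition may evaluate differently before and after a failover.
  if (condition) {
    ...
    env.execute("Job 1");
  } else {
    ...
    env.execute("Job 2");
  }
}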

Although the main method can be re-executed in the current Application mode, it does not meet the above requirements for correct job recovery; further changes are needed to address these issues and achieve application-level high availability.

In Session mode, the user's main method is currently not re-executed after HA recovery. In the future, we plan to store application-related information (such as the JAR location and execution parameters) in a highly available manner, and re-execute the user's main method after HA recovery to achieve consistent behavior with Application mode.

Support Executing Zero or Multiple Jobs

Currently, Flink requires the user's main method to execute at least one job. We plan to remove this restriction to support cases where no jobs need to be submitted; for instance, the application may terminate early without executing any job when certain conditions are met.

Currently, executing multiple jobs within the user's main method is not supported in certain scenarios. However, sequentially running several jobs is a common requirement in batch processing. Therefore, we plan to enhance application support for multi-job execution in the future.

Application Exceptions

At present, Flink exposes job-level exceptions via the /jobs/:jobid/exceptions REST API, covering exceptions thrown during job execution. It does not, however, report exceptions that are not part of job execution, such as failures originating from user logic inside main. In these cases, users typically need to refer to logs to determine the causes of failures. After introducing the application object, we plan to provide application-level exception information and support archiving it, so users can access exceptions thrown during main execution and troubleshoot more effectively.

Unify the behavior of execute

In the current implementation, when the configuration option execution.attached = false is set, the StreamExecutionEnvironment#execute method does not block during execution. This behavior is inconsistent with typical expectations—where blocking and non-blocking behavior should be clearly distinguished by using execute (blocking) and executeAsync (non-blocking), respectively.

Therefore, we plan to modify this method's behavior: StreamExecutionEnvironment#execute will always run in blocking mode, regardless of the execution.attached configuration. This makes the method semantics consistent. Non-blocking execution should be invoked explicitly via executeAsync. Given potential compatibility impacts, this change should be made in the next major Flink release (3.x).


