Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

Currently, depending on the desired cluster lifecycle and resource isolation guarantees, a Flink job can be executed either on a Session Cluster or on a Per-Job Cluster.

Session mode assumes an already running cluster and uses its resources to execute the submitted jobs. Jobs executed in the same cluster use, and consequently compete for, the same resources. In addition, if one of the jobs misbehaves or brings down a TaskManager, then all jobs running on that TaskManager have to be restarted. Apart from the negative impact on the jobs themselves, this implies a potentially massive recovery process, with all the restarting jobs accessing the filesystem concurrently and possibly making it unavailable to other services. Finally, on a running session cluster, metrics (e.g. CPU utilization) are often presented aggregated across all jobs, and all running jobs are visible to anyone with access to the cluster’s REST interface, which may not be desirable.

Given the aforementioned resource isolation concerns, users often opt for the per-job mode. In this mode, the available cluster manager (e.g. YARN, Kubernetes) is used to spin up a cluster for each submitted job, and this cluster is available to that job only. This is also the preferred mode for platform developers who manage tens or hundreds of streaming pipelines for a whole organization or company.

Per-Job Mode Details and Limitations

Given the better isolation guarantees of the per-job mode, we focus on it in more detail, presenting its current state and the main limitations that this document aims to alleviate.

The JobGraph is created at the client

Currently, with the exception of the StandaloneJobClusterEntryPoint, for all other cluster management frameworks the main() of the user’s program is executed on the client, and the generated JobGraph is shipped to the cluster for execution. In the StandaloneJobClusterEntryPoint, all the job dependencies and the job itself are expected to be in the image used to launch the cluster, so the cluster entrypoint is the one calling main() to extract the JobGraph.

Given the above, platform developers normally expose a centralised entity for job submission to their users (from now on called the Deployer). This can be seen as the client side of the platform. If the JobGraph is generated at the Deployer, then the Deployer needs to download the jar of each job in order to generate its JobGraph, and to launch the Flink CLI in a separate process to prevent dependency conflicts. In this setting, the centralised deployment service can be overwhelmed with deployment requests for, in the worst case, all production jobs within a short period of time. The reported issues with this approach include:

  • The network becomes a bottleneck when downloading thousands of job jars (100MB per job) into the service.
  • The separate process needed for the CLI has a large memory footprint and causes a burst of CPU usage over a short period, which reduces the throughput of a single service instance.
  • Over-provisioning the service for such disaster scenarios would waste a lot of resources, as the QPS of the service is probably below 1 most of the time.
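To make the first bullet more concrete, the following back-of-the-envelope estimate uses assumed numbers: 1,000 jobs (the low end of "thousands"), the 100MB jar size quoted above, and a 10 Gbit/s link into the Deployer. It counts only raw transfer time under ideal conditions and ignores the CLI processes from the second bullet. The result is therefore a lower bound, not a measurement.

```java
// Back-of-the-envelope estimate of the time a single Deployer spends just
// downloading job jars. All inputs in main() are illustrative assumptions.
public class JarTransferEstimate {

    /** Total bytes to download: one jar per job (decimal units, 1 MB = 10^6 bytes). */
    static long totalBytes(long jobs, long jarSizeMegabytes) {
        return jobs * jarSizeMegabytes * 1_000_000L;
    }

    /** Seconds needed at the given link speed, ignoring protocol overhead and contention. */
    static double transferSeconds(long bytes, double linkGigabitsPerSecond) {
        double bits = bytes * 8.0;
        return bits / (linkGigabitsPerSecond * 1_000_000_000.0);
    }

    public static void main(String[] args) {
        long jobs = 1_000;       // assumed: lower end of "thousands of job jars"
        long jarMb = 100;        // from the text: 100MB per job
        double linkGbps = 10.0;  // assumed: 10 Gbit/s network link on the Deployer

        long bytes = totalBytes(jobs, jarMb);
        System.out.printf("total download: %.0f GB%n", bytes / 1e9);
        System.out.printf("best-case transfer time: %.0f s%n", transferSeconds(bytes, linkGbps));
    }
}
```

Even in this idealised case, the Deployer moves 100 GB and its link is saturated for over a minute before any JobGraph is generated.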

IMPORTANT SIDE NOTE: There is also web submission for executing jars on the cluster. This avoids computing the JobGraph at the client, but it assumes an already running session cluster, which has the drawbacks described for session clusters in the Motivation section.

Discussion


Glossary: To establish a common terminology, the remainder of this document uses the following terms with the following meanings:

  • Job: the dataflow graph produced by a single execute() call in the user code. Equivalent to a JobGraph.
  • Application: the user’s main() method, which may contain multiple execute() calls and can therefore correspond to multiple jobs, i.e. multiple JobGraphs.
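The relation between the two terms can be illustrated with a toy model. ToyEnv and runApplication are hypothetical stand-ins written in plain Java and are not Flink’s StreamExecutionEnvironment. In this model, each execute() call freezes the operators declared so far into one job. An application whose main() calls execute() twice therefore yields two jobs.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Flink's API) of the glossary: each execute() call turns the
// operators declared since the previous call into one job (~ one JobGraph).
public class ApplicationVsJob {

    /** Hypothetical stand-in for an execution environment. */
    static final class ToyEnv {
        private final List<String> pendingOperators = new ArrayList<>();
        private final List<List<String>> jobs = new ArrayList<>();

        void addOperator(String name) {
            pendingOperators.add(name);
        }

        /** Freezes the pending operators into one job and starts a fresh one. */
        void execute() {
            jobs.add(List.copyOf(pendingOperators));
            pendingOperators.clear();
        }

        List<List<String>> jobs() {
            return jobs;
        }
    }

    /** An "application": a main()-like method that calls execute() twice. */
    static ToyEnv runApplication() {
        ToyEnv env = new ToyEnv();
        env.addOperator("source-a");
        env.addOperator("sink-a");
        env.execute();               // job #1
        env.addOperator("source-b");
        env.addOperator("sink-b");
        env.execute();               // job #2
        return env;
    }

    public static void main(String[] args) {
        ToyEnv env = runApplication();
        System.out.println("jobs in application: " + env.jobs().size());
        System.out.println(env.jobs());
    }
}
```

An application like runApplication, with more than one execute() call, is exactly the kind of application that the per-job mode currently does not allow.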

With the above in mind, to prevent the centralised Deployer from becoming a bottleneck, we should:

  1. give the user the option to run the main() method of their application on the cluster, and
  2. allow the user to specify the location from which the job jar (and possibly the dist jar) can be fetched, without requiring the client to provide the actual jar during job submission.

Together, these changes will allow the job submission load to be spread better (ideally evenly) across the available resources of the cluster.

Alternative Approaches

To address the first requirement above, we see two approaches:

  1. Have the execute() call of the user code spawn the cluster and execute the JobGraph.
  2. Create a new ClusterEntrypoint that fetches the job jar, executes its main(), and shuts down the cluster afterwards.


The first approach has the benefit of making the user’s main() a self-contained Flink Application. For example, an application could be run on YARN simply by running:


            yarn jar MY_FLINK_JOB.jar myMainClass args...


This “Flink-as-a-library” mode, although tempting, assumes that the client ships everything to the cluster, as there is no “wrapping entity” around main() to bootstrap the execution process.

In addition, at least when HA is activated, this approach leads to the same code being executed on all leader contenders until execution reaches the execute() call. In other words, all the parallel instances execute the main() method, and at the point where they call execute(), some of them “decide” not to continue. The ones that do not become leaders would then have to “exit” main() and execute the tasks assigned to them. This would require a workaround, such as throwing an exception, which is undesirable from a code-structure point of view. The issue would be even more pronounced in an “as-a-library” or “standalone-process” mode, where the launched containers do not know in advance whether they are TaskManagers or JobManagers and have to decide their role on the fly.

Finally, by creating and tearing down the cluster within execute(), we limit our control over the cluster to the time window of execute(). This seems to tie our hands unnecessarily. Independent lifecycles for the cluster and the user’s main() allow us, for example, to clean up resources without worrying that the user may still need them, or to keep the cluster alive for longer if needed.

Given the above, we choose to go with the second approach. 

Second Approach


For this mode, we propose the creation of a new ClusterEntrypoint, called ApplicationClusterEntryPoint. Conceptually, this entrypoint will be a session cluster whose lifecycle is bound to the main() method of a user’s application. This entrypoint will be responsible for:

  1. downloading the necessary jars / resources
  2. performing leader election to decide which entity executes main()
  3. terminating the cluster when the user’s main() exits
  4. guaranteeing HA and fault tolerance

This ensures that only a single instance of the user’s main() is executed, which also makes the system easier to reason about.
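The responsibilities listed for the entrypoint can be sketched as the following control flow. This sketch is hypothetical and heavily simplified. JarFetcher, LeaderElection, ClusterHandle and Entrypoint are illustrative names, not Flink classes. The toy election simply lets the first contender win, and responsibility 4 (HA and fault tolerance) is left out entirely. The sketch shows that non-leaders return before ever entering the user’s main(), so no exception-based workaround is needed to get them out of user code.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical control flow of the proposed entrypoint. The interfaces are
// illustrative stand-ins; HA/fault tolerance (responsibility 4) is omitted.
public class ApplicationEntrypointSketch {

    interface JarFetcher { void fetch(); }
    interface LeaderElection { boolean tryAcquireLeadership(String contenderId); }
    interface ClusterHandle { void shutDown(); }

    static final class Entrypoint {
        private final JarFetcher fetcher;
        private final LeaderElection election;
        private final ClusterHandle cluster;
        private final Runnable userMain;

        Entrypoint(JarFetcher fetcher, LeaderElection election,
                   ClusterHandle cluster, Runnable userMain) {
            this.fetcher = fetcher;
            this.election = election;
            this.cluster = cluster;
            this.userMain = userMain;
        }

        /** Returns true if this contender ended up running the user's main(). */
        boolean start(String contenderId) {
            fetcher.fetch();                                  // 1. download jars / resources
            if (!election.tryAcquireLeadership(contenderId)) {
                return false;                                 // 2. non-leaders never enter main()
            }
            try {
                userMain.run();                               // the single main() execution
            } finally {
                cluster.shutDown();                           // 3. tear down once main() exits
            }
            return true;
        }
    }

    public static void main(String[] args) {
        AtomicBoolean leaderTaken = new AtomicBoolean(false);
        AtomicInteger mainRuns = new AtomicInteger();
        AtomicInteger shutdowns = new AtomicInteger();

        // Toy election: the first contender to ask becomes the leader.
        LeaderElection firstWins = id -> leaderTaken.compareAndSet(false, true);
        Entrypoint entrypoint = new Entrypoint(
                () -> { /* pretend to download the job jar */ },
                firstWins,
                shutdowns::incrementAndGet,
                mainRuns::incrementAndGet);

        // Two contenders start one after the other; only the first runs main().
        boolean first = entrypoint.start("contender-1");
        boolean second = entrypoint.start("contender-2");
        System.out.println(first + " " + second + " mainRuns=" + mainRuns.get()
                + " shutdowns=" + shutdowns.get());  // true false mainRuns=1 shutdowns=1
    }
}
```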

DISCLAIMER: WHAT WE ARE NOT TRYING TO DO -- AND WHY

Currently, in the per-job mode, the executed application is not allowed to have more than one JobGraph, meaning it cannot have more than one execute() call. In this effort we are NOT trying to remove this restriction, for two main reasons:

  1. Every multi-execute application can be expressed by a series of multiple single-execute ones that communicate through persistent channels, e.g. Kafka or HDFS.
  2. Including such functionality introduces a lot of complexity that, given point 1 above, is not justified by the added functionality. The added complexity stems mainly from the fact that the different jobs within an application may at any point be in different stages of their execution, e.g. some may be running, some finished, and some may be waiting for others to finish in order to consume their output. Supporting savepoints and fault-tolerance/high-availability in these settings would require (partially) re-implementing a workflow management system like Airflow within Flink.

Fault Tolerance

Based on the discussion so far, we are targeting single-execute() applications (keeping the multi-execute() case as a potential future extension), and we are running their main() method on the cluster.

For high availability, in the current state of Flink, upon failover the application will be re-deployed and main() will be re-executed. This results in the JobGraph being recomputed and a new JobId being assigned to it. The latter would make the new JobGraph start fresh, with no state, as Flink uses the JobId to connect a previous instance of a JobGraph with a new one.

To address this, we need to assign static JobIds that are precomputed and survive re-executions of main(). When main() is re-executed, these JobIds allow us to fetch the JobGraph from the JobGraphStore instead of re-creating it.
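The following is a minimal sketch of the idea. It assumes the application is given a fixed identifier at deployment time. The applicationId parameter, jobIdFor and ToyJobGraphStore are hypothetical names, and java.util.UUID stands in for Flink’s JobID. The JobId is a name-based UUID, so it is a pure function of that identifier. A random id per run would instead disconnect the re-executed job from its previous state.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Supplier;

// Hypothetical sketch: a JobId derived deterministically from a fixed application
// identifier, plus a toy stand-in for the JobGraphStore. Neither is Flink's API.
public class StableJobIds {

    /** Same applicationId gives the same JobId on every re-execution of main(). */
    static UUID jobIdFor(String applicationId) {
        // Name-based (version 3) UUID: depends only on the input bytes.
        return UUID.nameUUIDFromBytes(applicationId.getBytes(StandardCharsets.UTF_8));
    }

    /** Toy JobGraphStore: graphs are keyed by JobId and survive re-executions. */
    static final class ToyJobGraphStore {
        private final Map<UUID, String> graphs = new HashMap<>();
        private int computations = 0;

        /** Returns the stored graph if present; builds and stores a new one otherwise. */
        String getOrCreate(UUID jobId, Supplier<String> buildGraph) {
            String stored = graphs.get(jobId);
            if (stored != null) {
                return stored;
            }
            computations++;
            String graph = buildGraph.get();
            graphs.put(jobId, graph);
            return graph;
        }

        int computations() {
            return computations;
        }
    }

    public static void main(String[] args) {
        ToyJobGraphStore store = new ToyJobGraphStore();

        // First run of main(): nothing is stored yet, so the graph is built.
        UUID firstId = jobIdFor("my-application");
        store.getOrCreate(firstId, () -> "graph built on attempt 1");

        // Failover: main() runs again, derives the same JobId, and the stored
        // graph is reused instead of a fresh, stateless one being created.
        UUID secondId = jobIdFor("my-application");
        String graph = store.getOrCreate(secondId, () -> "graph built on attempt 2");

        System.out.println(firstId.equals(secondId));  // true
        System.out.println(graph);                     // graph built on attempt 1
        System.out.println(store.computations());      // 1
    }
}
```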

Implications


  1. The already existing Per-Job Mode can be seen as a “run-on-client” alternative to what we are proposing here. Therefore, efforts aiming to introduce Per-Job execution for new cluster managers, like Kubernetes, should take this discussion into account.

First Version Deliverables

  1. New ApplicationEntrypoints that run main() on the cluster with HA and fault tolerance.
  2. Standalone (K8s) and Yarn ApplicationEntrypoints for “Application Mode” support in the respective environments.
  3. It should be possible to fetch the job jar to the ApplicationMaster in the following ways:
    1. Local Path
    2. DFS Path: this applies to Yarn, since in a containerised environment (like Kubernetes) we expect all dependencies/jars to be baked into the image. For Yarn, we plan to use LocalResources and let Yarn perform all the necessary transfers.
  4. A new CLI command with which the user can launch applications in “Application Mode”. In the first version, this command will be available for Yarn, and to launch an application in this mode, the user will be able to write:

       bin/flink run-application ....

    with the rest of the options being the same as before.
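The two fetch options in item 3 could be told apart by the scheme of the user-supplied location. This sketch is hypothetical and is not the planned implementation. The JarLocations class is invented for illustration, and two further choices are also assumptions: scheme-less and file: locations are treated as local, and hdfs is the only DFS scheme accepted.

```java
import java.net.URI;

// Hypothetical helper: classify a user-supplied jar location into the two kinds
// listed for the first version. The accepted schemes are assumptions.
public class JarLocations {

    enum Kind { LOCAL_PATH, DFS_PATH }

    static Kind classify(URI jar) {
        String scheme = jar.getScheme();
        if (scheme == null || scheme.equalsIgnoreCase("file")) {
            return Kind.LOCAL_PATH;  // jar already present locally, e.g. baked into an image
        }
        if (scheme.equalsIgnoreCase("hdfs")) {
            return Kind.DFS_PATH;    // on Yarn, to be shipped as a LocalResource
        }
        throw new IllegalArgumentException("unsupported jar location: " + jar);
    }

    public static void main(String[] args) {
        // Both paths are made-up examples.
        System.out.println(classify(URI.create("/opt/jobs/my-job.jar")));          // LOCAL_PATH
        System.out.println(classify(URI.create("hdfs:///flink/jobs/my-job.jar"))); // DFS_PATH
    }
}
```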

Potential Future Features


  1. Being able to launch a single image in Kubernetes, with the launched containers figuring out which one is the JM and which ones are the TMs.
  2. Support a “Library Mode”. The user will be able to write something like:


       ApplicationRunner.run((StreamExecutionEnvironment env) -> {
           … // a consumer of the env
       });


Flink would then be able to execute it in the corresponding environment. This setup does not play well with environments like YARN, where an ApplicationMaster requests the required resources, but it fits containerized environments very well.

In addition, the above library mode is compatible with the design decision of having an external ClusterEntrypoint performing all the necessary bootstrapping actions.

Migration Plan and Compatibility

This FLIP proposes a new deployment option for Flink users and has no impact on existing Flink users.
There are no breaking changes to existing functionality, so no migration is needed.