Status

Discussion thread
Vote thread
JIRA

FLINK-4319 - Getting issue details... STATUS

Release1.5


This Flink Improvement Proposal outlines improvements in how Flink is used on top of various cluster managers (YARN, Mesos) and deployment technologies (Docker, Kubernetes, etc).

For simplicity, we refer to all those cluster- and deployment frameworks as cluster managers.



Original design draft document: https://docs.google.com/document/d/1zwBP3LKnJ0LI1R4-9hLXBVjOXAkQS5jKrGYYoPz-xRk

(may still be used for copying and adapting figures)



Motivation

This FLIP addresses a series of shortcomings in the current integration with YARN and container managers (Docker/Kubernetes) and future operations with Mesos.

Shortcomings to address

  • Resource allocation is currently static. A job needs to grab all required resources up front and maintain that static set of resources throughout the execution. The improvement will allow for dynamic resource acquisition and dynamic release of resources on YARN and Mesos for better resource utilization and dynamic scaling

  • On YARN, all allocated containers are currently of the same size. The improvement will allow to allocate different container sizes (memory / CPU cores) for different operators.

  • Interaction with container management infrastructures like Docker/Kubernetes is clumsy, because Flink Jobs are deployed in two steps: (1) Start the Framework (2) submit the job. With these improvement, jobs can be dockerized and deployed in a natural way as part of the container deployment, step 2 will no longer be necessary

  • Currently, the JobManager accepts multiple concurrent jobs, but has no notion of “fair-”, “capacity-based-”, or “priority-based-” scheduling. In this proposal, we delegate all notion of cross-job resource prioritization to frameworks like YARN and Mesos, which already implement that.

  • The web dashboard disappears after the job finished in Flink on YARN

  • The concept of “sessions” (clusters that accept multiple jobs) with “per job clusters” is intermingled and hard to understand. This proposal separates them more clearly


Core Changes


Single Job JobManager

The most important change is that the JobManager handles only a single job. The JobManager will be created with a JobGraph and will be destroyed after the job execution is finished. This model more naturally maps what happens with most jobs anyways.

Cross-job functionality is handled by other components that wrap and create JobManagers. This leads to a better separation of concerns, and a more modular composability for various cluster managers.

The JobManager constructor will also optionally take a Savepoint or Checkpoint to initialize the job from.

ResourceManager

The ResourceManager (introduced in Flink 1.1) is the cluster-manager-specific component. There is a generic base class, and specific implementations for:

  • YARN

  • Mesos

  • Standalone-multi-job (Standalone mode)

  • Self-contained-single-job (Docker/Kubernetes)


The main tasks of the ResourceManager are

  • Acquire new TaskManager (or slots) by starting containers, or allocating them to a job

  • Giving failure notifications to JobManagers and TaskManagers

  • Caching TaskManagers (containers) to be reused, releasing TaskManagers (containers) that are unused for a certain period.


The ResourceManager may or may not be task slot aware (probably will be). The difference in implementation of the ResourceManager is minimal - A slot aware ResourceManager maintains a map of available TaskManager slots, rather than a map of available TaskManagers.

For the sake of simplicity, the following talks about “slots”, but one can think simply of “TaskManager” instead, for the common case of a one-slot TaskManager.


The following are core aspects of the ResourceManager design:

  • The ResourceManager no longer has a resource pool size, but receives individual requests for slots. That way, jobs can request TaskManagers of different resources (Memory/CPU).

  • The ResourceManager lives across jobs and JobManagers. That enables the use of sessions and the Standalone Mode.

    • Consequently, the ResourceManager is the first point of contact for TaskManagers and handle the TaskManager registration

    • To have a unified way of handling container-caching across Standalone Mode, for batch jobs with varying resource requirements, and for sessions, the ResourceManager keeps the pool of available TaskManagers and their slots.

  • The ResourceManager must be able to fail without interfering with the execution of current jobs. Running jobs can still continue executing and use the slots they have allocated across task failures and retries. They will only not be able to acquire new slots while the ResourceManager is down.

    • The ResourceManager may actually go down in Yarn / Mesos during maintenance. This should not stop the streaming programs.

    • While JobManagers and TaskManagers may be redundant and have a seamless failover, the ResourceManager may always be unable to provide containers or slots for certain times, as a result of the Yarn / Mesos architecture.

  • ResourceManager fault tolerance should work without persistent state in general

    • All that the ResourceManager does is negotiate between the cluster-manager, the JobManager, and the TaskManagers. Its state can hence be reconstructed from re-acquiring containers and re-registration from JobManagers and TaskManagers

    • Note that certain specialization (for example for Mesos or Yarn) may still persist cluster-manager-specific state, if that is required.

  • JobManager may register at the ResourceManager. A registered JobManager gets notifications about TaskManager failures for the slots it allocated from that TaskManager.





In the case where the ResourceManager already has slots available from registered TaskManagers, steps (2) and (3) are skipped.


TaskManager

TaskManagers are both in contact with the ResourceManager and JobManager:

It talks to the ResourceManager to advertise their slot resources, and to the JobManager to execute tasks in the slots allocated to that job. It needs to heartbeat both managers to monitor their liveliness and detect failures.


ResourceManager interaction

  • The TaskManager initially registers at the ResourceManager. A disconnect from the ResourceManager simply results in re-tying to register and advertise the currently available slots.

  • With each heartbeat, the TaskManager also transmits its slot availability. That way, the ResourceManager will always know about available slots. In addition, direct notifications about slots becoming available updates the ResourceManager’s view faster.

  • The TaskManager’s view about which slot is taken (and by which JobManager) is the ground truth - the ResourceManager derives its view from the TaskManager notifications and heartbeats.

  • The ResourceManager may tell the TaskManager to give a slot to a specific JobManager, and the TaskManager will offer that slot to the JobManager. If not accepted, the TaskManager notifies the ResourceManager that the slot is in fact available.

  • The ResouceManager may tell the TaskManager to shut down (exit the process)


JobManager interaction

  • The TaskManager offers a slot to a JobManager at the ResourceManager’s behest. That slot is then tied to that JobManager until the JobManager releases the slot.

  • The TaskManager watches all JobManagers to which it has offered slots. Loss of connection to the JobManager results in triggering master-failure recovery (currently: cancel all tasks form that master)

  • JobManagers can deploy tasks only into slots they allocated.

  • Upon loss of connection to the JobManager, the TaskManager will try to re-register the slots at the new JobManager for that job (retrieved via the HA leader lookup). After a moderate timeout period, it releases the slots and makes them available again. If a backup JobManager does not take over within that period, it will have to re-request the slots from the ResourceManager.


JobManager Slot Pool

The JobManager has a SlotPool which holds the slots that were offered to it and accepted. The JobManager’s scheduler grabs slots from the SlotPool and can thus access all currently registered slots even if the ResourceManager is down.

The SlotPool is a modification of what is currently the InstanceManager.

The SlotPool will attempt to acquire new slots from the ResourceManager when it cannot serve a slot request. If no ResourceManager is currently available, or it gets a decline from the ResourceManager, or a request times out, it fails the slot request.

The SlotPool releases slots that are unused to the ResourceManager. Slots count as unused if they are not used when the job is fully running (fully recovered).

Dispatcher

The new design includes the concept of a Dispatcher. The dispatcher accepts job submissions from clients and starts the jobs on their behalf on a cluster manager.

The dispatcher is introduced because:

  • Some cluster managers need a central job spawning and monitoring instance

  • It subsumes the role of the standalone JobManager, waiting for jobs to be submitted

In some setups, the dispatcher is optional (YARN) or not applicable (Kubernetes).




In the future run, the dispatcher will also help with the following aspects:

  • The dispatcher is a cross-job service that can run a long-lived web dashboard

  • Future versions of the dispatcher should receive only HTTP calls and thus can act as a bridge in firewalled clusters

  • The dispatcher never executes code and can thus be viewed as a trusted process. It can run with higher privileges (superuser credentials) and spawn jobs on behalf of other users (acquiring their authentication tokens). Building on that, the dispatcher can manage user authentications

Fault Tolerance

The core recovery mechanism is still task restart and state restore from checkpoint.

The following aspects of fault tolerance are specific to the individual cluster managers and described in each section:

  • Detection and restart of the process(es) that execute the JobManager and ResourceManager

  • Recovery of the job’s JobGraph and libraries



Architecture with Cluster Managers

 YARN

Compared to the state in Flink 1.1, the new Flink-on-YARN architecture offers the following benefits:

  • The client directly starts the Job in YARN, rather than bootstrapping a cluster and after that submitting the job to that cluster. The client can hence disconnect immediately after the job was submitted

  • All user code libraries and config files are directly in the Application Classpath, rather than in the dynamic user code class loader

  • Containers are requested as needed and will be released when not used any more

  • The “as needed” allocation of containers allows for different profiles of containers (CPU / memory) to be used for different operators


Without Dispatcher


With Dispatcher


Yarn-specific Fault Tolerance Aspects

ResourceManager and JobManager run inside the ApplicationMaster process. Failure detection and restart of that process is done by YARN.

JobGraph and libraries are always part of the working directory from which the ApplicationMaster processes is spawned. Internally, YARN stores them in a private HDFS directory.

Mesos

Mesos based setups are similar to YARN with a dispatcher. A dispatcher is strictly required for Mesos, because it is the only way to have the Mesos-specific ResourceManager run inside the Mesos cluster.


Mesos-specific Fault Tolerance Aspects

ResourceManager and JobManager run inside a regular Mesos container. The Dispatcher is responsible for monitoring and restarting those containers in case they fail. The Dispatcher itself must be made highly available by a Mesos service like Marathon, as in this picture:








JobGraph and libraries need to be stored by the dispatcher in a persistent storage, typically the same storage where the checkpoints are stored.

 

Standalone

 

The Standalone Setup is should keep compatibility with current Standalone Setups.

The role of the long running JobManager is now  a “local dispatcher” process that spawns JobManagers with Jobs internally. The ResourceManager lives across jobs and handles TaskManager registration.

For highly-available setups, there are multiple dispatcher processes, competing for being leader, similar as the currently the JobManagers do.




Standalone-specific Fault Tolerance Aspects

By default, there is no mechanism to restart failed processes. This has to be solved by external tools, or by the availability of sufficient standby machines (TaskManagers and Dispatchers).

For high-availability, dispatchers must store the JobGraphs and libraries of submitted jobs in a persistent storage, like the checkpoint storage.

Standalone Setup v2.0

A future version of the Standalone Setup could be thought of to implement something like a “lightweight Yarn” architecture:

  • All nodes run a simple “NodeManager” process that spawns processes for the TaskManagers and JobManagers, that way offering proper isolation of jobs against each other.

  • The LocalDispatcher will not spawn the JobManager internally but on a lightweight “node manager”


 

Docker / Kubernetes

Users define Flink clusters purely based on Dockerized Jobs and TaskManagers. There exists no client and no job submission step is involved.

There are two types of Docker images:

  1. A job-specific image with the job libraries, configured to launch the job (with a JobManager)

  2. A TaskManager image, simply launching a TaskManager. The TaskManager image could be generic (does not contain job-specific libraries), but could also be job-specific (containing the job libraries, saves pulling the libraries when launching the job).


To start a Flink job, one configures a service to start one container of the Job/JobManager image, and N containers of the TaskManager image.













This case uses a ResourceManager that tells each registering TaskManager to give its slots to the JobManager immediately. That way, the JobManager always has all available slots in the cluster at its disposal (without checking and requesting) and can trigger scaleout to the full set of resources.

Docker/Kubernetes-specific Fault Tolerance Aspects

 

ResourceManager and JobManager run inside the master process. Failure detection and restart of that process is done by container orchestrator (Kubernetes).

Libraries are always part of the master container. The JobGraph can be recovered by re-executing the program (may yield problems on non deterministic programs) or by storing it in the checkpoint storage.

 

 


Sessions

The design here  should cover all existing functionality, except the “Yarn Session” mode. The Yarn Session mode currently behaves like a standalone cluster bootstrapped on YARN.

The core functionality of a Yarn Session is to have a set of machines that are already allocated and thus can accept a series of short jobs.


 



 

Component Design and Details

 

Resource Allocation Details (JobManager, ResourceManager, TaskManager)

There are various identifiers used to describe what component and status a message is associated with:

  • ResourceID - The unique identifier of a TaskManager (container)

  • RmLeaderID - Fencing token for the specific instance of granted leadership to a ResourceManager. Disambiguates between messages over the course of loss and re-gain of leadership.

  • JobID - Unique ID of a job over its lifetime

  • JmLeaderID - Unique ID of a JobManager, a fencing token like the current LeaderSessionID. Changes each time a new JobManager takes a job over. Disambiguates between messages sent by different JobManagers over time, to prevent that old stale JobManagers (who still think they are in charge) send interfering messages.

  • AllocationID - Slot allocation identifier, created by the JobManager when requesting a slot, constant across re-tries. Used to identify responses by the ResourceManager and to identify deployment calls towards the TaskManager that was allocated from.

  • Profile - The resource profile of the desired slot (CPU cores, Memory, Disk, …)

 

Slot Allocation with Reequesting a New TaskManager

 


Message loss handling:

 

  • Loss of messages (4) or (6) will be handled by re-try of message (4). ResourceManager will not request a duplicate instance, as there is already a slot allocated to that AllocationID, or a pending request for that AllocationID. If the slot was already released before the retry, a redundant container may be brought up and will be eventually released as unused.

  • Loss of message (10) will be handled by TaskManager registration retries. ResourceID helps deduplicate repeated registrations.

  • Loss of message (12) by timeout and retry. Duplication through retry leads to refused registration at JobManager (AllocationID collision), or to registration and release as unused.

  • Loss of message (13) compensated by heartbeat slot reports

  • Loss of message (14) by registration retry loop, disambiguated by combination of (AllocationID, ResourceID)


Slot Allocation from cached TaskManager

 


Loss of messages handled analog to above.


The slot allocation logic above achieves the following

  • TaskManager and JobManager have the important slot availability and reservation information. ResourceManager has only transient information, acts only as a broker.

  • The ResourceManager should always mark a slot as occupied before it is actually occupied at the TaskManager, thereby never trying to allocate an already used slot. The exception is an incorrect timeout and release on the ResourceManager side, which will be caught by the TaskManager rejecting a slot allocation request for an already occupied slot.

 

Failure Handling

 

TaskManager Failure

ResourceManager: detection and reaction

 

  • Detects TaskManager failure via timeout of heartbeats

  • In Yarn/Mesos, additionally detects failures via notifications by cluster framework

  • Clears TaskManager from live list

  • Sends notification to JobManagers that allocated slots from that TaskManager

  • Starts new container with same configuration as replacement

JobManager: detection and reaction

 

  • Detects TaskManager failure via timeout of heartbeats

  • May additionally receive earlier failure notifications by ResourceManager

  • Removes slots from its slot pool

  • Marks tasks in all slots from the TaskManager as failed, regular task recovery ensues

  • If not enough slots are available for configured parallelism, downscale job

Lost data on TaskManager

 

  • State of executing operators: Recovered by task restart from checkpoint

Recovery Actions

 

  • Restarted TaskManager process will lookup (leader) ResourceManager and register and offer its slots

ResourceManager Failure

TaskManager: detection and reaction

 

  • Detects ResourceManager failure by timeout of heartbeats

  • High-Availability: Notification from ZooKeeper that ResourceManager loses leader status

  • TaskManager re-enters registration loop, but does not need to cancel tasks

  • Upon registration with a new ResourceManager, transmits currently available slots and which slots are allocated to what job

JobManager: detection and reaction

 

  • Detects ResourceManager failure by timeout of heartbeats

  • High-Availability: Notification from ZooKeeper that ResourceManager loses leader status

  • JobManager waits for new ResourceManager to become available (notification via leader election service) and re-requests all slots where the allocation request is pending (see note below about redundant starts)

Lost data

 

  • Active Containers: Recovered from cluster manager master

  • Available Slots: Recovered from TaskManager registration

  • Slots allocated to which job: Recovered from TaskManager registration

  • Pending slot allocations: Lost, JobManager re-requests slots for all outstanding requests upon pending


It may happen that the ResourceManager launches a container for a slot allocation request, fails, and after recovery (the JobManager re-requests the outstanding slots), starts another container for that slot request. The second container will be released as “unused” after the idle time.

JobManager Failure

TaskManager: detection and reaction

 

  • Detects JobManager failure by timeout of heartbeats

  • High-Availability: Notification from ZooKeeper that JobManager loses leader status

  • TaskManager triggers master failure recovery (currently release all tasks)

  • TaskManager will try to re-register slots at the new JobManager for certain time

  • If registration at new JobManager is not successful after a certain time, slots are released and announced as available to ResourceManager.

ResourceManager: detection and reaction

 

  • Detects JobManager failure by timeout of heartbeats

  • High-Availability: Notification from ZooKeeper that JobManager loses leader status

  • (Optional: Notifies TaskManagers about JobManager failure)

Lost data

 

  • JobGraph and Libraries: Recovered from persistent storage or as container startup artifacts

  • Completed checkpoints: Retrieved from HA checkpoint store (ZooKeeper)

  • Execution status of tasks: All tasks are reset to latest completed checkpoint

  • Registered TaskManagers: Combination of receiving re-registrations from TaskManagers and allocation of slots from ResourceManager

Recovery Actions

 

  • Acquired leader status

  • Registers at ResourceManager (to receive notifications about TaskManager failures)

  • Triggers execution of its job (execution graph) from latest completed checkpoint

Concurrent JobManager & ResourceManager Failure

  • TaskManagers handle regular JobManager failure

  • TaskManagers will try to offer slots to a new JobManager for a certain period of time

  • TaskManagers will be in registration loop for ResourceManager

Concurrent TaskManager & ResourceManager Failure

  • JobManager misses failure notifications for TaskManager container from the cluster manager, but detects TaskManager failure via its own heartbeats

  • Pending allocation request from the JobManager time out (or get canceled upon noticing the loss of the ResourceManager), new allocation requests will happen when ResourceManager is coming back

  • JobManager may need to scale down to compensate for lost slots


 


Public Interfaces

The changes here do not affect any program API

The changes do affect Cluster / Client API, which lets people interact with Flink clusters. That API is currently not considered stable or frozen. These changes may actually be implement regardless of this FLIP.

The biggest user-facing changes are the YARN CLI experience. While this FLIP needs not necessarily change the parameters of the CLI, it will change the responses. For example, jobs will be accepted without first waiting for all TaskManagers to be allocated and available.



Compatibility, Deprecation, and Migration Plan

The standalone mode is still emulated/reproduced in a similar way as it exists prior to this FLIP.

The YARN per-job mode should behave strictly better with these changes (based on user feedback from mailing lists, issues, external conversations), so we should let this supersede the current mode.

The YARN session mode can still work as a "standalone mode bootstrapped on YARN".



Test Plan

  • The ResourceManager, JobManager, TaskManager, and SlotPool need strong unit tests for the protocol.
  • The YARN and Mesos integration have their regular tests, as they have prior to this proposal



Rejected Alternatives

(none yet)

2 Comments

  1. Till Rohrmann Based on FLINK-4319 this FLIP is not in progress anymore. It seems to be old as well. Is it save to update the FLIP's status in the head of this page? ...just to make it explicit here as well?

    1. Good point. This FLIP should be moved to completed. I will do it right away. Thanks for the pointer Matthias Pohl.