The problem: The JobManager (JM) is a single point of failure. When it crashes, TaskManagers (TM) fail all running jobs and try to reconnect to the same JM. A failed JM loses all state and cannot resume the running jobs, even if it recovers and the TMs reconnect.

Solution: implement JM fault tolerance/high availability by having multiple JM instances running with one as leader and the other(s) in standby. The exact coordination and state update protocol between JM, TM, and clients is the topic of this document.

JIRA: FLINK-2287

Having standby JM instances requires distributed coordination between JM, TM, and clients. For this, we will use ZooKeeper (ZK).

Pros:

  • Proven solution (other projects use it for this as well)
  • Apache TLP with a large community, docs, and a library with the required "recipes" like leader election (see below)

ZK dependency

  • There is no separate ZK dependency for the client and the server
  • Makes it easy to start ZK in "managed" mode (see below)
  • Will we shade ZK away?

JM/TM/Client configuration

  • Add HA mode for JMs (allowing non-HA and HA JM operation)
  • TM, clients need to:
    • connect directly to configured JM (non-HA)
    • connect to JM leader (HA)
  • Note: as of current stable ZK version (3.4.6) the ZK quorum needs to be statically configured (see ZK-107)
  • ZK client configuration in Flink only requires the address of the ZK quorum to connect to

ZK server configuration

  1. ZK managed by Flink (default): Flink startup scripts start ZK servers for HA mode

    zookeeper.mode: "managed" (default)
    zookeeper.dir: "/flink" (default)
    zookeeper.quorum: "host:port,[...,]host:port"
    zookeeper.property.<zk-property>: ... (set single ZK properties)
    zookeeper.conf: path-to-zk-conf (similar to current Hadoop conf; has precedence over Flink conf)
  2. Dedicated ZK cluster (user sets up ZK)
    zookeeper.mode: "dedicated"
    zookeeper.quorum: "host:port,[...,]host:port"
    zookeeper.dir: "/flink" (default)
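
To make the intended defaults concrete, here is a minimal sketch of how the proposed keys could be read. It is not Flink code: the function names and the returned dictionary layout are hypothetical, and only the key names, the two modes, and the defaults are taken from the list above.

```python
from typing import Dict, List, Tuple

# The two modes proposed above.
VALID_MODES = {"managed", "dedicated"}


def parse_quorum(quorum: str) -> List[Tuple[str, int]]:
    """Split a 'host:port,host:port' string into (host, port) pairs."""
    servers = []
    for entry in quorum.split(","):
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"invalid quorum entry: {entry!r}")
        servers.append((host, int(port)))
    return servers


def load_zk_settings(conf: Dict[str, str]) -> dict:
    """Hypothetical helper: apply the proposed defaults to a flat config map."""
    mode = conf.get("zookeeper.mode", "managed")  # "managed" is the default
    if mode not in VALID_MODES:
        raise ValueError(f"unknown zookeeper.mode: {mode!r}")
    if "zookeeper.quorum" not in conf:
        raise ValueError("zookeeper.quorum is required")
    # Collect single ZK properties passed as zookeeper.property.<zk-property>.
    prefix = "zookeeper.property."
    properties = {k[len(prefix):]: v for k, v in conf.items() if k.startswith(prefix)}
    return {
        "mode": mode,
        "dir": conf.get("zookeeper.dir", "/flink"),  # "/flink" is the default
        "quorum": parse_quorum(conf["zookeeper.quorum"]),
        "properties": properties,
    }
```

Calling `load_zk_settings({"zookeeper.quorum": "zk1:2181,zk2:2181"})` would fall back to managed mode and the `/flink` directory. Handling of `zookeeper.conf` precedence is left out of the sketch.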

Startup scripts

  • Startup scripts need to provide a unique ID for the zookeeper.dir znode used by the JM and TM instances, so that multiple Flink clusters can run against the same ZK installation, e.g. coordination for each started cluster happens in zookeeper.dir/ID/
  • Add an optional file `backup-masters` or `masters`
  • `bin/start-cluster-streaming.sh` takes an extra argument for the backup masters to start
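
The per-cluster namespace described above can be sketched as follows. This is only an illustration: `cluster_root` and `new_cluster_id` are hypothetical names, and using a random UUID as the ID is an assumption, since the proposal does not say how the ID is generated.

```python
import posixpath
import uuid


def new_cluster_id() -> str:
    """Hypothetical ID scheme: a random hex string per started cluster."""
    return uuid.uuid4().hex


def cluster_root(zk_dir: str, cluster_id: str) -> str:
    """Return the znode under which one cluster coordinates: zookeeper.dir/ID."""
    if not cluster_id or "/" in cluster_id:
        raise ValueError(f"invalid cluster ID: {cluster_id!r}")
    # Normalise leading/trailing slashes so "/flink", "flink/" and "flink" agree.
    return posixpath.join("/", zk_dir.strip("/"), cluster_id)
```

Two clusters started with different IDs get disjoint subtrees, for example `/flink/cluster-a` and `/flink/cluster-b`, so they can share one ZK installation without interfering with each other.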

Leader Election

  • TaskManagers and clients retrieve the leading JobManager from ZooKeeper

  • Every time they lose the connection to the current JobManager, they look up the current leader in ZooKeeper and reconnect to it
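
The retrieval and reconnect behaviour can be sketched with an in-memory stand-in for ZooKeeper. Everything here is hypothetical and for illustration only: `FakeLeaderStore` replaces the real ZK node that would hold the leader address, the class and method names are invented, and the addresses in the usage note are made up. No actual election or ZK client API is shown.

```python
from typing import Optional


class FakeLeaderStore:
    """In-memory stand-in for the ZooKeeper node that holds the leader address."""

    def __init__(self) -> None:
        self._leader: Optional[str] = None

    def publish(self, address: str) -> None:
        # In the real design the elected JM would write its address to ZK.
        self._leader = address

    def current_leader(self) -> Optional[str]:
        return self._leader


class LeaderFollower:
    """Models a TaskManager or client that tracks the leading JobManager."""

    def __init__(self, store: FakeLeaderStore) -> None:
        self._store = store
        self.connected_to: Optional[str] = None

    def connect(self) -> str:
        # Always ask the store for the leader instead of using a fixed address.
        leader = self._store.current_leader()
        if leader is None:
            raise RuntimeError("no leader elected yet")
        self.connected_to = leader
        return leader

    def on_connection_lost(self) -> str:
        # Drop the stale connection and look up whoever leads now.
        self.connected_to = None
        return self.connect()
```

For example, if the store first publishes `jm1:6123` and a follower connects, then a standby takes over and publishes `jm2:6123`, a call to `on_connection_lost()` moves the follower to the new leader rather than retrying the failed one.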

 
