The JobClient is the user-facing component of the distributed system. It communicates with the JobManager and is therefore responsible for submitting Flink jobs, querying the state of submitted jobs and receiving status messages of the currently running job.

The JobClient is also an actor with which you communicate via messages. There are two messages related to job submission: SubmitJobDetached and SubmitJobWait. The first message submits a job and de-registers from receiving any status messages and the final job result. The detached mode is useful if you want to submit your job to a Flink cluster in a fire-and-forget manner.

The SubmitJobWait message submits a job to the JobManager and registers to receive status messages for this job. Internally this is done by spawning a helper actor which is used as the receiver of the status messages. Once the job has terminated, a JobResultSuccess with the duration and the accumulator results is sent to the spawned helper actor by the JobManager. Upon receiving this message, the helper actor forwards the message to the client which issued the SubmitJobWait message initially and then terminates.
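
The forwarding step described above can be sketched in plain Java. This is only an illustration, not Flink's actual actor code: `SubmissionHelper` and its methods are hypothetical names, a `CompletableFuture` stands in for the client that sent SubmitJobWait, and the fields of `JobResultSuccess` are assumed from the description (duration and accumulator results).

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Final job result as described above: duration plus accumulator results (field names assumed). */
final class JobResultSuccess {
    final long durationMillis;
    final Map<String, Object> accumulatorResults;

    JobResultSuccess(long durationMillis, Map<String, Object> accumulatorResults) {
        this.durationMillis = durationMillis;
        this.accumulatorResults = accumulatorResults;
    }
}

/** Hypothetical stand-in for the helper actor spawned for SubmitJobWait. */
final class SubmissionHelper {
    // Stands in for the client that issued SubmitJobWait.
    private final CompletableFuture<JobResultSuccess> client;
    private boolean terminated = false;

    SubmissionHelper(CompletableFuture<JobResultSuccess> client) {
        this.client = client;
    }

    /** Handles one incoming message; returns false if the helper has already stopped. */
    boolean receive(Object message) {
        if (terminated) {
            return false; // a stopped helper no longer processes messages
        }
        if (message instanceof JobResultSuccess) {
            client.complete((JobResultSuccess) message); // forward the result to the client
            terminated = true;                            // then terminate
        }
        // Anything else is treated as a status message and simply accepted here.
        return true;
    }

    boolean isTerminated() {
        return terminated;
    }
}
```

In this sketch, status messages leave the client future pending; only the final `JobResultSuccess` completes it, after which the helper ignores further messages.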

Asynchronous vs. Synchronous Messages

Wherever possible, Flink tries to use asynchronous messages and to handle responses as futures. Futures and the few existing blocking calls have a timeout after which the operation is considered failed. This prevents the system from getting deadlocked in case a message gets lost or a distributed component crashes. However, if you happen to have a really large cluster or a slow network, timeouts might be triggered wrongly. Therefore, the timeout for these operations can be specified via "akka.ask.timeout" in the configuration.
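
The idea of a response future that fails after a timeout, instead of waiting forever for a lost message, can be illustrated with the JDK alone. The sketch below is an assumption-laden stand-in: it uses `CompletableFuture` (Java 9 or later) rather than Akka's ask pattern, and `AskWithTimeout.ask` is a hypothetical helper, not a Flink or Akka API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class AskWithTimeout {
    /**
     * Returns a future that yields the reply, or the marker "FAILED"
     * if no reply arrives within timeoutMillis.
     */
    static CompletableFuture<String> ask(CompletableFuture<String> reply, long timeoutMillis) {
        return reply
                // Completes the future exceptionally if it is still pending after the timeout.
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                // Map any failure (including the timeout) to a marker value.
                .handle((value, error) -> error == null ? value : "FAILED");
    }
}
```

Calling `ask` with a future that is never completed models a lost message: the result becomes "FAILED" once the timeout elapses, so the caller cannot block indefinitely.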

Before an actor can talk to another actor, it has to retrieve an ActorRef for it. This lookup also requires a timeout. In order to make the system fail fast if an actor is not started, the lookup timeout is set to a smaller value than the regular timeout. If you experience lookup timeouts, you can increase the lookup timeout via "akka.lookup.timeout" in the configuration.

Another peculiarity of Akka is that it sets a limit for the maximum message size it can send. The reason for this is that it reserves a serialization buffer of the same size and does not want to waste memory. If you should ever encounter a transmission error because the message exceeded the maximum size, you can increase the framesize via "akka.framesize" in the configuration.

Failure Detection

Failure detection in a distributed system is crucial for its robustness. When running on a commodity cluster, it can always happen that some of the components fail or are no longer reachable. The reasons for such failures are manifold and range from hardware breakdowns to network outages. A robust distributed system should be able to detect failed components and recover from these failures.

Flink detects failed components by using Akka's DeathWatch mechanism. DeathWatch allows actors to watch other actors even if they are not supervised by the watching actor or live in a different actor system. Once a watched actor dies or is no longer reachable, a Terminated message is sent to the watching actor. Upon receiving such a message, the system can react to the failure. Internally, DeathWatch is realized as a heartbeat and a failure detector which, based on the heartbeat-interval, heartbeat-pause and failure threshold, estimates when an actor is likely to be dead. The heartbeat-interval can be controlled by setting the "akka.watch.heartbeat.interval" value in the configuration. The acceptable heartbeat-pause can be specified via "akka.watch.heartbeat.pause". The heartbeat-pause should be a multiple of the heartbeat-interval, otherwise a single lost heartbeat directly triggers the DeathWatch. The failure threshold can be specified via "akka.watch.threshold" and effectively controls the sensitivity of the failure detector. More details about the DeathWatch mechanism and the failure detector can be found in the failure detector section of Akka's remoting documentation.
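
To make the interplay of heartbeat-interval, heartbeat-pause and threshold more concrete, here is a simplified sketch of a phi-accrual-style suspicion level. It assumes the detector behaves roughly like the phi accrual approach described in the Akka documentation; `PhiSketch`, its parameters, and the constants of the logistic approximation of the normal distribution are assumptions for illustration, not Akka's or Flink's implementation.

```java
final class PhiSketch {
    /**
     * Suspicion level for a watched actor whose last heartbeat arrived elapsedMillis ago.
     * meanMillis and stdDevMillis summarize the observed heartbeat inter-arrival times;
     * pauseMillis is the acceptable heartbeat pause added on top of the mean.
     */
    static double phi(double elapsedMillis, double meanMillis, double stdDevMillis, double pauseMillis) {
        double expected = meanMillis + pauseMillis;
        double y = (elapsedMillis - expected) / stdDevMillis;
        // Logistic approximation of the normal CDF (constants are an assumption of this sketch).
        double e = Math.exp(-y * (1.5976 + 0.070566 * y * y));
        if (elapsedMillis > expected) {
            return -Math.log10(e / (1.0 + e));
        }
        return -Math.log10(1.0 - 1.0 / (1.0 + e));
    }

    /** The actor is considered reachable while the suspicion level stays below the threshold. */
    static boolean isAvailable(double elapsedMillis, double meanMillis, double stdDevMillis,
                               double pauseMillis, double threshold) {
        return phi(elapsedMillis, meanMillis, stdDevMillis, pauseMillis) < threshold;
    }
}
```

With heartbeats arriving every 1000 ms (standard deviation 100 ms) and no acceptable pause, a single missed heartbeat (2000 ms of silence) already pushes the suspicion level far above a threshold of 12. With an acceptable pause of 10000 ms, the same silence keeps the level near zero, which is why the pause should span several heartbeat intervals.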

In Flink, the JobManager watches all registered TaskManagers and the TaskManagers watch the JobManager. This way, both components know when the other component is no longer reachable. The JobManager reacts by marking the respective TaskManager as dead, which prevents future tasks from being deployed to it. Moreover, it fails all tasks which are currently running on this TaskManager and reschedules their execution on a different TaskManager. If the TaskManager was only marked dead because of a temporary connection loss, it can simply re-register itself at the JobManager once the connection has been re-established.
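
The JobManager's reaction to a Terminated message can be sketched as simple bookkeeping. `TaskManagerRegistry` and all of its methods are hypothetical names chosen for this illustration; the actual JobManager logic is more involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical bookkeeping for the JobManager's reaction to a Terminated message. */
final class TaskManagerRegistry {
    // TaskManager id -> tasks currently running on it; absent ids are considered dead.
    private final Map<String, List<String>> runningTasks = new HashMap<>();

    /** Registers a TaskManager; also used for re-registration after a connection loss. */
    void register(String taskManagerId) {
        runningTasks.putIfAbsent(taskManagerId, new ArrayList<>());
    }

    boolean isAlive(String taskManagerId) {
        return runningTasks.containsKey(taskManagerId);
    }

    /** Deploys a task; refuses TaskManagers that are marked dead. */
    void deploy(String taskManagerId, String task) {
        List<String> tasks = runningTasks.get(taskManagerId);
        if (tasks == null) {
            throw new IllegalStateException("TaskManager is not registered: " + taskManagerId);
        }
        tasks.add(task);
    }

    /** Marks the TaskManager as dead and returns the failed tasks that need rescheduling. */
    List<String> onTerminated(String taskManagerId) {
        List<String> failed = runningTasks.remove(taskManagerId);
        return failed == null ? new ArrayList<>() : failed;
    }
}
```

A caller would pass the list returned by `onTerminated` to `deploy` on another live TaskManager, and a TaskManager that was only temporarily unreachable would call `register` again.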

The TaskManager also watches the JobManager. This monitoring allows the TaskManager to enter a clean state by failing all currently running tasks when it detects a failed JobManager. Additionally, the TaskManager will try to reconnect to the JobManager in case the detected failure was only caused by network congestion or a connection loss.

Future Development

At the moment, only three components, the JobClient, JobManager and TaskManager, are implemented as actors. In order to better exploit concurrency while improving scalability, it is conceivable to realize more components as actors. A promising candidate could be the ExecutionGraph, whose individual ExecutionVertices or even the associated Execution objects could be implemented as actors. Such a fine-grained actor model would have the advantage that state updates could be sent directly to the respective Execution object. This way, the JobManager would be noticeably relieved from being a single point of communication.

Configuration

- `akka.ask.timeout`: Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you should try to increase this value. Timeouts can be caused by slow machines or a congested network. The timeout value requires a time-unit specifier (ms/s/min/h/d) (DEFAULT: **100 s**).
- `akka.lookup.timeout`: Timeout used for the lookup of the JobManager. The timeout value has to contain a time-unit specifier (ms/s/min/h/d) (DEFAULT: **10 s**).
- `akka.framesize`: Maximum size of messages which are sent between the JobManager and the TaskManagers. If Flink fails because messages exceed this limit, then you should increase it. The message size requires a size-unit specifier (DEFAULT: **10485760b**).
- `akka.watch.heartbeat.interval`: Heartbeat interval for Akka's DeathWatch mechanism to detect dead TaskManagers. If TaskManagers are wrongly marked dead because of lost or delayed heartbeat messages, then you should increase this value. A thorough description of Akka's DeathWatch can be found [here](http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector) (DEFAULT: **akka.ask.timeout/10**).
- `akka.watch.heartbeat.pause`: Acceptable heartbeat pause for Akka's DeathWatch mechanism. A low value does not allow an irregular heartbeat. A thorough description of Akka's DeathWatch can be found [here](http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector) (DEFAULT: **akka.ask.timeout**).
- `akka.watch.threshold`: Threshold for the DeathWatch failure detector. A low value is prone to false positives whereas a high value increases the time to detect a dead TaskManager. A thorough description of Akka's DeathWatch can be found [here](http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector) (DEFAULT: **12**).
- `akka.transport.heartbeat.interval`: Heartbeat interval for Akka's transport failure detector. Since Flink uses TCP, the detector is not necessary. Therefore, the detector is disabled by setting the interval to a very high value. In case you should need the transport failure detector, set the interval to some reasonable value. The interval value requires a time-unit specifier (ms/s/min/h/d) (DEFAULT: **1000 s**).
- `akka.transport.heartbeat.pause`: Acceptable heartbeat pause for Akka's transport failure detector. Since Flink uses TCP, the detector is not necessary. Therefore, the detector is disabled by setting the pause to a very high value. In case you should need the transport failure detector, set the pause to some reasonable value. The pause value requires a time-unit specifier (ms/s/min/h/d) (DEFAULT: **6000 s**).
- `akka.transport.threshold`: Threshold for the transport failure detector. Since Flink uses TCP, the detector is not necessary and, thus, the threshold is set to a high value (DEFAULT: **300**).
- `akka.tcp.timeout`: Timeout for all outbound connections. If you should experience problems with connecting to a TaskManager due to a slow network, you should increase this value (DEFAULT: **akka.ask.timeout**).
- `akka.throughput`: Number of messages that are processed in a batch before returning the thread to the pool. Low values denote a fair scheduling whereas high values can increase the performance at the cost of unfairness (DEFAULT: **15**).
- `akka.log.lifecycle.events`: Turns on Akka's remote logging of events. Set this value to 'on' for debugging (DEFAULT: **off**).
- `akka.startup-timeout`: Timeout after which the startup of a remote component is considered failed (DEFAULT: **akka.ask.timeout**).
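
Several defaults in the list above are derived from `akka.ask.timeout`. The arithmetic can be sketched as follows; `WatchDefaults` and its methods are hypothetical helpers for illustration and not part of Flink's configuration code.

```java
import java.time.Duration;

final class WatchDefaults {
    /** Default of akka.watch.heartbeat.interval: akka.ask.timeout / 10. */
    static Duration heartbeatInterval(Duration askTimeout) {
        return askTimeout.dividedBy(10);
    }

    /** Default of akka.watch.heartbeat.pause: akka.ask.timeout. */
    static Duration heartbeatPause(Duration askTimeout) {
        return askTimeout;
    }

    /** Checks the recommendation that the pause is a multiple of the interval. */
    static boolean pauseIsMultipleOfInterval(Duration pause, Duration interval) {
        long intervalMillis = interval.toMillis();
        return intervalMillis > 0 && pause.toMillis() % intervalMillis == 0;
    }
}
```

With the default ask timeout of 100 s, this yields a heartbeat interval of 10 s and an acceptable pause of 100 s, so the pause covers ten heartbeat intervals.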