Current state: Implemented
Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-76-Unaligned-checkpoints-td33651.html
Released: MVP in Flink 1.11, fully implemented in Flink 1.13
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Currently, triggering a checkpoint will cause a checkpoint barrier to flow from the sources through the DAG towards the sinks. This checkpoint barrier guarantees a consistent snapshot of the DAG at a given point in time.
For each operator with multiple input channels, an alignment phase starts whenever the checkpoint barrier is received on one channel. Further input on that channel is then blocked until the checkpoint barriers from the other channels have been received.
This approach works fairly well for modest utilization but exhibits two main issues on a back-pressured pipeline:
This FLIP proposes a way to perform checkpoints with a non-blocking alignment of checkpoint barriers. It provides the following benefits.
The key idea is to allow checkpoint barriers to be forwarded to downstream tasks before the synchronous part of the checkpointing has been conducted (see Fig. 1). To that end, we need to store in-flight data as part of the checkpoint, as described in greater detail in this FLIP.
Figure 1: Checkpoint barriers overtake in-flight records. Colored buffers need to be persisted together with the operator state.
We first used POCs to evaluate two options for when to persist in-flight data:
In the following experiments, we had a simple topology (source → map → map → sleepy map → measure map), where each channel was a random shuffle. The sleepy map slept on average 0, 0.01, and 0.1 ms per record to induce backpressure. We compared a POC for adhoc spilling, a POC for continuous spilling, and the base commit in the master (1.11-SNAPSHOT). For each approach and sleep time, we measured 3 times and took the median while persisting on HDFS in an EMR cluster with 1 master and 5 slaves, each having 32 cores and 256 GB RAM, with a total parallelism of 160.
We can see that for the baseline, the checkpoint duration increases strongly with sleep time and thus with backpressure. For both POCs, however, the checkpoint duration remained stable and even decreased at higher backpressure because of the lower overall data volume. Surprisingly, the checkpointing times for continuous spilling are higher than for adhoc spilling at lower sleep times. We suspect that continuously writing large amounts of data causes additional backpressure. With increasing backpressure and thus decreasing data volume, continuous spilling reaches sub-second checkpointing times.
Nevertheless, continuous spilling seems to have a rather big impact on overall throughput. While the adhoc POC showed a 10% performance decrease, the continuous POC is clearly bottlenecked at higher volumes. Primarily for that reason, we decided to go with the adhoc approach in this FLIP. While the sub-second checkpointing times of continuous spilling would surely be a nice asset, the primary goal of decoupling checkpointing times from backpressure is also reached with adhoc spilling.
In this FLIP, we
Whenever we receive checkpoint barriers, we forward them immediately to the output. To achieve a consistent checkpoint, we need to persist all data in the input and output buffers between the received and forwarded checkpoint barrier. In contrast to the current checkpoints, we not only “freeze” the current operator state during the synchronous checkpointing phase, but also “freeze” the channels (see Fig. 1).
For operators with one input channel, the checkpoint barrier directly overtakes all input data, triggers the synchronous parts of the checkpointing algorithm in the operator and is forwarded to the end of the output buffer.
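The single-input case can be illustrated with a minimal sketch. It is not Flink's implementation: the class name, the string encoding of buffers, and the `BARRIER` marker are hypothetical, and both queues are modeled with their head being the element that would be consumed or sent next. The barrier overtakes the buffers queued ahead of it, those buffers are copied into the snapshot, and the barrier is placed at the downstream-facing end of the output queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class UnalignedBarrierSketch {
    /** Hypothetical marker that stands in for a checkpoint barrier inside a queue. */
    static final String BARRIER = "<barrier>";

    /**
     * Handles a barrier that sits somewhere in the input queue of a single-input operator.
     * Returns the in-flight buffers that must be persisted with the operator state:
     * all pending output buffers plus the input buffers queued ahead of the barrier.
     * The buffers themselves stay in the queues and are still processed normally.
     */
    static List<String> onBarrier(Deque<String> input, Deque<String> output) {
        // Output buffers were produced earlier, so they come first in stream order.
        List<String> inFlight = new ArrayList<>(output);
        // Input buffers ahead of the barrier are overtaken; later ones belong to the next checkpoint.
        for (String buffer : input) {
            if (BARRIER.equals(buffer)) {
                break;
            }
            inFlight.add(buffer);
        }
        // The barrier leaves the input queue and jumps ahead of all pending output.
        input.remove(BARRIER);
        output.addFirst(BARRIER);
        return inFlight;
    }

    public static void main(String[] args) {
        Deque<String> input = new ArrayDeque<>(List.of("a", "b", BARRIER, "c"));
        Deque<String> output = new ArrayDeque<>(List.of("x"));
        System.out.println("persist: " + onBarrier(input, output));
        System.out.println("input:   " + input);
        System.out.println("output:  " + output);
    }
}
```

In this model, the buffers returned by `onBarrier` correspond to the data "between the received and forwarded checkpoint barrier" that has to be frozen together with the operator state.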
If an operator has multiple input channels, there are two options:
Here, the tradeoff is between the storage size for in-flight data on one side and the checkpoint latency and the time during which an input channel is blocked on the other. The current behavior can even be incorporated by waiting for the checkpoint barriers to arrive at the operator and inserting the checkpoint barrier before the output.
We want to implement all 3 versions to allow a fine-grained evaluation. Depending on the outcome, we will choose a specific default option and expose a (hidden) configuration “checkpointing policy”:
Note that during downscaling, the size limit of “max inflight data” might be temporarily exceeded during recovery (see recovery section).
For persistence, existing state management primitives will be reused to colocate operator state and inflight data, which offers the following advantages.
However, some drawbacks may require alternative approaches in the future.
The checkpoint format is only implicitly extended by adding more (keyed) state with conventional naming.
In general, inflight data is stored in state handles per operator subtask that are ultimately managed by CheckpointCoordinator. We need to add or modify the following components.
TaskStateManagerImpl (existing class) - holds state assigned to task
From a high level perspective, inflight data is restored where it was previously spilled:
This approach facilitates an easy integration into the existing checkpointing mechanism. Restored buffers take precedence over all newly produced/received buffers. The following steps explain the assignment of state and mapping it to in-memory data structures (without rescaling) in more detail:
Since we hook into the existing checkpointing mechanism, most of the rescaling challenges are solved in the same way.
To assign state to a task, CheckpointCoordinator uses a modulo operation. For example, when scaling out:
Each subtask uses the same logic to determine the right InputChannel/SubPartition.
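A minimal sketch of modulo-based placement, assuming that the state of old subtask `i` is handed to new subtask `i % newParallelism`. The class and method names are hypothetical, and the exact rule applied by CheckpointCoordinator is not spelled out in this document, so treat the mapping as an illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ModuloAssignmentSketch {
    /**
     * Maps each new subtask index to the old subtask indices whose state it receives,
     * under the assumed rule: old index modulo the new parallelism.
     */
    static Map<Integer, List<Integer>> assign(int oldParallelism, int newParallelism) {
        Map<Integer, List<Integer>> assignment = new TreeMap<>();
        for (int newIndex = 0; newIndex < newParallelism; newIndex++) {
            assignment.put(newIndex, new ArrayList<>());
        }
        for (int oldIndex = 0; oldIndex < oldParallelism; oldIndex++) {
            assignment.get(oldIndex % newParallelism).add(oldIndex);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println("scale out 2 -> 3: " + assign(2, 3));
        System.out.println("scale in  3 -> 2: " + assign(3, 2));
    }
}
```

Under this assumed rule, scaling out leaves every old state at its original index and the added subtasks start without restored state, while scaling in folds several old states onto one subtask.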
To match multi-buffer records, both upstream and downstream must sort records in the same order.
For each key group, we need to place channel and operator state on the same subtask. For output buffers, we know (at least conceptually) the keys, so we can store state as KeyedStateHandle and then reuse operator state redistribution code. For input buffers, we only know a set of groups - the groups assigned to this task.
But, given that placement of key groups among the tasks is deterministic, we can: 1) compute old and new placements and 2) give each new task input buffers that could contain its key groups. The task then must filter out irrelevant records by keys (after deserialization). For regular (single-buffer) records it’s trivial.
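The two steps can be sketched as follows. This is an illustration under stated assumptions: the contiguous-range formula in `range` is chosen for the example and is not taken from this document, records are reduced to their key-group index, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class KeyGroupPlacementSketch {
    /** Inclusive key-group range {start, end} of a subtask (assumed contiguous split). */
    static int[] range(int maxParallelism, int parallelism, int subtask) {
        int start = (subtask * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtask + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** Step 1: old subtasks whose input buffers could contain key groups of the new subtask. */
    static List<Integer> oldSubtasksFor(int maxParallelism, int oldParallelism,
                                        int newParallelism, int newSubtask) {
        int[] mine = range(maxParallelism, newParallelism, newSubtask);
        List<Integer> sources = new ArrayList<>();
        for (int old = 0; old < oldParallelism; old++) {
            int[] theirs = range(maxParallelism, oldParallelism, old);
            // Two inclusive ranges overlap if each starts before the other ends.
            if (theirs[0] <= mine[1] && mine[0] <= theirs[1]) {
                sources.add(old);
            }
        }
        return sources;
    }

    /** Step 2: after deserialization, keep only records whose key group is in the range. */
    static List<Integer> filter(List<Integer> recordKeyGroups, int[] mine) {
        List<Integer> kept = new ArrayList<>();
        for (int keyGroup : recordKeyGroups) {
            if (keyGroup >= mine[0] && keyGroup <= mine[1]) {
                kept.add(keyGroup);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Rescale from 3 to 2 subtasks with 8 key groups.
        System.out.println("new subtask 0 reads from old: " + oldSubtasksFor(8, 3, 2, 0));
        System.out.println("new subtask 1 reads from old: " + oldSubtasksFor(8, 3, 2, 1));
    }
}
```

Because an old subtask's buffers may be handed to more than one new subtask, the filter in step 2 is what prevents a record from being processed twice.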
For a multi-buffer record that doesn’t belong to this subtask, the subtask will never receive the remaining buffers and therefore can’t deserialize the record and filter it out. Possible solutions:
To guarantee progress even during recovery, we need to implement three features:
Thus, in an environment with frequent crashes (under backpressure), progress can be made as long as the operators themselves are recovered fast enough.
Unaligned checkpoints will initially be an optional feature. After collecting experience and implementing all necessary extensions, unaligned checkpoints will probably be enabled by default for exactly-once.
For compatibility, the documentation should clearly state that users of the checkpoint API can only use unaligned checkpoints if they do not require a consistent state across all operators.
There are two primary goals. First, we want to provide a minimal viable product (MVP) that breaks up the vicious cycle of overload and instabilities of a cluster with slow checkpoints and more accumulated load resulting from recoveries from outdated checkpoints. Second, we aim for the full FLIP implementation that will among other things allow rescaling from unaligned checkpoints to directly counter overload.
In particular, the full release will achieve the following improvements over the MVP:
We aim to make unaligned checkpoints the default behavior after the full implementation.