
by Yuan Mei, Zakelly Lan, Jinzhong Li, Hangxiang Yu, Yanfei Lei 


Discussion thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
Vote thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Release: 2.0, 2.1

...

The current state model triggers periodic CPU and network I/O bursts during checkpointing, as shown in Figure 1. This is because checkpointing triggers RocksDB compaction (CPU spikes) and state file uploads to DFS (network I/O spikes). For large-state jobs, these spikes occur at almost the same time across all tasks, destabilizing the entire cluster. It also means data processing TPS is hampered by performance-crippling spikes during checkpoints.


Figure 1: CPU usage of a Flink job; periodic spikes occur during checkpointing

Elasticity and Fast Rescaling

...

High-Level Overview and Design

Figure 2: The Disaggregated Model of State Management

In the current model, Flink operators read and write state through a component named the State Backend, as illustrated on the left-hand side of Figure 2. State that does not fit in memory is spilled to files on local disks. As mentioned above, to recover from failures or to enable rescaling, state is flushed to disk and the state files are periodically uploaded to durable storage to form checkpoints, then read back after recovery/rescaling. This model relies heavily on the capability of local disks, which leads to the challenges listed above.
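For reference, the snippet below is a minimal sketch of how a job is typically configured under the current model: working state is kept in a local RocksDB instance while state files are periodically uploaded to DFS as checkpoints. The checkpoint path, interval, and pipeline are illustrative only.

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CurrentModelExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Working state is kept in RocksDB on the TaskManager's local disk.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(/* incremental checkpoints */ true));

        // Periodically, state files are snapshotted and uploaded to durable storage (DFS),
        // and must be downloaded back to local disks on recovery or rescaling.
        env.enableCheckpointing(5 * 60 * 1000);
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints"); // illustrative path

        // Minimal keyed pipeline so the job is runnable; the real job graph is elided here.
        env.fromElements(1, 2, 3)
           .keyBy(v -> v % 2)
           .reduce(Integer::sum)
           .print();

        env.execute("current-state-model-example");
    }
}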

...

Our initial Proof of Concept (PoC) results showed a significant performance drawback for the disaggregated model (a state store using DFS as primary storage) compared to the current model (local disk as primary storage). Without local disk caching, the disaggregated model achieved only 2-5% of the maximum TPS observed with local disks across various setups. An analysis that excludes page-cache effects reveals that the main bottleneck is the blocking of the main task thread during state access, as shown in Figure 3, which makes Flink engine performance highly sensitive to state access latency.

Figure 3: Current state access model in the task thread

State access operates at per-record granularity and is executed by the Flink task thread; accesses are queued and processed sequentially within the mailbox executor. State writes are non-blocking, as they are appended to a write buffer and asynchronously flushed to disk when the buffer is full. State reads, however, must fetch data from the storage medium on a cache miss, which blocks the task thread and halts further processing until the data is retrieved. As illustrated in Table 1, HDFS has 20+ times the latency of a local disk (1.5 ms vs. 68 µs), which explains why TPS drops by 95% when switching from local disk to HDFS.

Table 1: Access latency across different storage media
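To make the blocking point concrete, the following is a minimal sketch of per-record state access in the current model, using the standard KeyedProcessFunction / ValueState APIs (the counting logic is purely illustrative). The call to state.value() on the task thread is where a cache miss turns into a synchronous read from local disk, or from DFS in the disaggregated setup.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Per-record state access in the current model: the task thread calls state.value()
// synchronously, so a cache miss that goes to disk (or, worse, to DFS) blocks all
// further record processing in the mailbox until the read returns.
public class CountPerKey extends KeyedProcessFunction<String, String, Long> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
        Long current = count.value();   // blocking point: may read from RocksDB / DFS on a cache miss
        long updated = (current == null ? 0L : current) + 1;
        count.update(updated);          // non-blocking: appended to the write buffer
        out.collect(updated);
    }
}

With the latencies in Table 1, every such cache miss stalls the task thread for roughly 68 µs on a local disk but about 1.5 ms on HDFS, which accounts for the 95% TPS drop described above.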

Proposed Changes

Unlocking the disaggregated model requires tackling three key engine challenges:

...

  • Fast Checkpoint: Since most state files already reside in DFS, only small incremental updates need to be uploaded during checkpointing, drastically reducing network transfer time. In addition, both the working state and checkpoints can reference the same underlying physical files, eliminating duplication; this saves storage space and further accelerates checkpoints.
  • Restore: With DFS as the primary storage, downloading large state files to local disks is avoided, significantly reducing restore time. Local disks (acting as a cache) can be warmed up gradually after the job starts, further improving performance.
  • Rescale: Rescaling leverages existing solutions such as ClipDB/IngestDB to rebuild the state store directly on DFS. Note that since file downloads are eliminated, local disk constraints for downscaling are no longer an issue.

Figure 4: Checkpoint/Restore/Rescale Mechanism (Current Model on the left; Disaggregated Model on the right)

For more detailed information on sharing the underlying file system between checkpoints and working states, along with changes to checkpointing, restoring, and rescaling procedures, please refer to FLIP-428.
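As a purely conceptual illustration of the file-sharing idea above (class and method names below are hypothetical and are not FLIP-428 APIs), a checkpoint can claim a physical DFS file that the working state already owns by bumping a reference count instead of re-uploading it, so only files created since the last checkpoint need to be transferred.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Conceptual sketch only (hypothetical names): if the working state already lives on DFS,
// a checkpoint can "claim" an existing physical file by incrementing a reference count
// instead of re-uploading it; the file is deleted only when neither the working state
// nor any retained checkpoint references it any longer.
public class SharedFileRegistry {

    private final Map<String, Integer> refCounts = new ConcurrentHashMap<>();

    /** Called when the working state or a checkpoint starts referencing a DFS file. */
    public void retain(String dfsPath) {
        refCounts.merge(dfsPath, 1, Integer::sum);
    }

    /** Called when a checkpoint is subsumed or the working state drops a file. */
    public void release(String dfsPath) {
        refCounts.compute(dfsPath, (path, count) -> {
            if (count == null || count <= 1) {
                deletePhysicalFile(path);   // no more owners: safe to delete from DFS
                return null;
            }
            return count - 1;
        });
    }

    private void deletePhysicalFile(String dfsPath) {
        // A real implementation would issue a delete against the DFS; elided here.
        System.out.println("deleting " + dfsPath);
    }
}

The actual file ownership and cleanup protocol is specified in FLIP-428.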

...

To optimize CPU efficiency across diverse scenarios, we are actively exploring an "Adaptive Local Cache" capable of intelligently transitioning between the aforementioned caching solutions based on workload characteristics. As depicted in Figure 5, this solution aims to achieve optimal performance regardless of the prevailing conditions. Initial testing shows that with an adaptive local cache, we can achieve at least the same performance as the current model when the state fits on the local disk. More details will be revealed in future FLIP(s).

Figure 5

Remote Compaction (FLIP-430)

...