
Discussion thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
Vote thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


One major advantage of Flink is its efficient and easy-to-use state management mechanism. However, this mechanism has not evolved much since its introduction and has not kept pace with the demands of the cloud-native era. In this FLIP, we revisit the current architecture of Flink's state management model and propose an augmented, disaggregated alternative.

Motivation

The past decade has witnessed dramatic shifts in Flink's deployment modes, workload patterns, and underlying hardware. We have moved from the map-reduce era, where workers were nodes with tightly coupled computation and storage, to a cloud-native world where containerized deployments on Kubernetes have become standard. At the same time, hardware improvements, such as the more than tenfold increase in network bandwidth (from mere hundreds of Mbps to 25 Gbps, comparable to local SSD throughput), unlock compelling options for state storage. Fast intranet transmission and low-cost object storage make cloud storage a feasible and cost-effective choice. Flink's workload scale has also changed dramatically. A state size of several hundred megabytes, once considered large, pales in comparison to the multi-terabyte states commonly encountered in today's applications.

Given these significant changes in deployment, hardware, and workloads, the current state access model, which relies heavily on local disks, no longer aligns with Flink's future in the cloud-native era.

Challenges

In this section, we discuss the key challenges that Flink state management faces today, including those newly introduced by cloud-native demands and those that are longstanding consequences of the local storage model. We aim to resolve these challenges with the new design.

Local Disk Constraints

While containerization technology provides resource isolation and secure job operations, it comes at a cost: resources are pre-allocated. As a result, local disk space is fixed and limited, and it becomes a bottleneck for data-intensive jobs. In the current model, the only remedy when a job runs out of disk space is to scale the job up, which is very costly.

Spiky Resource Usage

The current state model triggers periodic CPU and network I/O bursts during checkpointing, as shown in Figure 1. This is because checkpointing triggers RocksDB compaction (CPU spike) and state file uploads to DFS (network I/O spike). For large-state jobs, those spikes happen at almost the same time across all tasks, which destabilizes the entire cluster. The spikes also depress data-processing TPS during checkpoints.


Figure 1: CPU usage of a Flink job; periodic spikes occur during checkpointing

Elasticity and Fast Rescaling

Achieving zero-downtime rescaling remains a challenge for Flink, particularly for jobs with large state. The current model involves state redistribution, download, and rebuild during rescaling, which makes even near-zero downtime hard to achieve. While FLINK-31238 reduces rebuild time by merging and clipping SST files directly, downloading large state files to local disks is still a bottleneck. This significantly extends rescaling durations and can even block downscaling attempts if the aggregate state size exceeds local disk capacity.

Light and Fast Checkpoints

Checkpointing is a routine operation that the Flink engine uses to ensure fault tolerance and enable rescaling. In addition, end-to-end exactly-once processing hinges on committing data at the granularity of a checkpoint, a fundamental requirement for a streaming DB. Hence, making checkpoints light and fast is crucial. While generic log-based checkpointing tackles the problem, it introduces an additional log layer that double-writes state changes and incurs extra CPU and network overhead.

High-Level Overview and Design

In the current model, Flink operators read and write state through a component named State Backend, as illustrated on the left-hand side of Figure 2. State that does not fit in memory is stored in files on local disks. As mentioned above, to recover after a failure or to enable rescaling, state is flushed to disk and state files are periodically uploaded to durable storage to make checkpoints, then read back after recovery or rescaling. This model relies heavily on the capability of local disks, which leads to the challenges listed above.

The right-hand side of Figure 2 illustrates the proposed new model. In the new model, local disks are optional and act as a secondary cache, and the remote DFS (e.g., S3/OSS/HDFS/GFS...) serves as the primary storage for state. The remote DFS therefore becomes the source of truth. This is a fundamental change from relying on local disks to a model that fully disaggregates computation and storage. State is streamed to DFS continuously; memory and local disks act as a cache. This disaggregated architecture embraces a cloud-native approach and unlocks several advantages:

  1. Local disk is no longer a constraint because DFS is used as the primary storage.
  2. The checkpointing procedure is fast, completing within 10s regardless of state size. Continuously streaming state to DFS significantly reduces the amount of data that must be written during the checkpointing procedure, leading to rapid completion.
  3. DB compaction is decoupled from the checkpointing procedure and not bound to a specific Task Manager anymore. This allows for independent scheduling of compaction tasks, enabling better load-balancing and eliminating disruptive resource bursts cluster-wide.
  4. As for recovery and rescaling, state files are instantly accessible after the State Backend is up, eliminating the need for time-consuming local disk downloads.
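The read path implied by this layering can be sketched roughly as follows. This is a minimal illustration only, not the FLIP's implementation: the class `TieredStateReader`, its methods, and the use of in-memory maps as stand-ins for the local disk cache and the remote DFS are all hypothetical, and the write path is simplified to a synchronous write-through rather than the continuous streaming described above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: memory cache -> optional local disk cache -> remote DFS.
// Plain maps stand in for the real storage tiers.
final class TieredStateReader {
    private final Map<String, String> memoryCache = new HashMap<>();
    private final Map<String, String> localDiskCache; // null when no local disk is configured
    private final Map<String, String> remoteStore;    // source of truth (stand-in for DFS)
    private int remoteReads = 0;

    TieredStateReader(Map<String, String> remoteStore, Map<String, String> localDiskCacheOrNull) {
        this.remoteStore = remoteStore;
        this.localDiskCache = localDiskCacheOrNull;
    }

    // Look up a key, falling through the tiers and populating caches on the way back.
    String get(String key) {
        String value = memoryCache.get(key);
        if (value != null) {
            return value;
        }
        if (localDiskCache != null) {
            value = localDiskCache.get(key);
        }
        if (value == null) {
            remoteReads++;
            value = remoteStore.get(key);
            if (value != null && localDiskCache != null) {
                localDiskCache.put(key, value);
            }
        }
        if (value != null) {
            memoryCache.put(key, value);
        }
        return value;
    }

    // Simplified write-through: the remote store is always updated, caches are refreshed.
    void put(String key, String value) {
        remoteStore.put(key, value);
        if (localDiskCache != null) {
            localDiskCache.put(key, value);
        }
        memoryCache.put(key, value);
    }

    // Number of lookups that had to go to the remote store.
    int remoteReads() {
        return remoteReads;
    }
}
```

Passing `null` for the local disk cache models the "local disks are optional" case: reads still succeed, but every memory cache miss goes straight to the remote store.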

The disaggregated model also facilitates a clean architecture for state sharing and querying. Checkpoint files can be shared between operators to eliminate redundant storage. For state querying, file-level APIs are provided to enable straightforward querying that resembles querying normal tables, without relying on backend details or state specifics. In this FLIP, we focus on the disaggregated model and how it integrates with the current Flink engine. Sharing and querying will be explored in future FLIPs.


Hurdles for the Disaggregated State Model

Our initial Proof of Concept (PoC) results showed a significant performance drawback for the disaggregated model (state store using DFS as primary storage) compared to the current model (local disk storage). Without local disk caches, the disaggregated model achieved only 2-5% of the maximum TPS observed with local disks across various setups. Analysis excluding page cache effects reveals that the main bottleneck lies in the blocking of the main task thread during state access as shown in Figure 3, making Flink engine performance highly sensitive to state access latency.

State access operates at per-record granularity and is executed by the Flink task thread. These accesses are queued and processed sequentially within the mailbox executor. State writing is non-blocking, as writes are appended to a write buffer and asynchronously flushed to disk when the buffer is full. However, state reading involves fetching data from disk on cache misses, which causes the task thread to block and halt further processing until the data is retrieved. As illustrated below, HDFS has 20+ times higher latency than local disk (1.5 ms vs. 68 µs), which explains why TPS drops by 95% when switching from local disk to HDFS.
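As a back-of-the-envelope check of the latency argument, the sketch below computes the throughput ceiling of a single thread that blocks on one state read per record. It assumes, purely for illustration, that every record incurs exactly one cache-missing read and that no other work costs time; the class and method names are hypothetical, and only the two latency figures come from the text above.

```java
// Hypothetical sketch: throughput ceiling of one thread doing one blocking read per record.
final class BlockingReadThroughput {

    // Upper bound on records per second when a single thread performs
    // exactly one blocking state read per record and does nothing else.
    static double maxRecordsPerSecond(double readLatencyMicros) {
        return 1_000_000.0 / readLatencyMicros;
    }

    // Fraction of the baseline throughput that remains at the new latency.
    static double relativeThroughput(double baselineLatencyMicros, double newLatencyMicros) {
        return maxRecordsPerSecond(newLatencyMicros) / maxRecordsPerSecond(baselineLatencyMicros);
    }

    public static void main(String[] args) {
        double localDiskMicros = 68.0;  // local disk read latency quoted in the text
        double hdfsMicros = 1_500.0;    // HDFS read latency quoted in the text (1.5 ms)

        System.out.printf("local disk ceiling: %.0f records/s%n",
                maxRecordsPerSecond(localDiskMicros));
        System.out.printf("HDFS ceiling: %.0f records/s%n",
                maxRecordsPerSecond(hdfsMicros));
        System.out.printf("HDFS relative to local disk: %.1f%%%n",
                100.0 * relativeThroughput(localDiskMicros, hdfsMicros));
    }
}
```

Under these simplifying assumptions the ratio is 68 / 1500, or roughly 4.5% of the local disk ceiling, which is in line with the 2-5% range and the 95% drop reported for the PoC.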


Public Interfaces


Proposed Changes


Compatibility, Deprecation, and Migration Plan


Test Plan


Rejected Alternatives

