Discussion thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
Vote thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


One major advantage of Flink is its efficient and easy-to-use state management mechanism. However, this mechanism has not evolved much since its introduction and has not kept pace with the demands of the cloud-native era. In this FLIP, we revisit the current architecture of Flink's state management model and propose an augmented, disaggregated alternative.

Motivation

The past decade has witnessed dramatic shifts in Flink's deployment mode, workload patterns, and underlying hardware. We have moved from the map-reduce era, where workers were nodes with tightly coupled computation and storage, to a cloud-native world where containerized deployment on Kubernetes has become standard. At the same time, hardware improvements, such as the more than tenfold increase in network bandwidth (from mere hundreds of Mbps to 25 Gbps, comparable to local SSD throughput), unlock compelling options for state storage. Fast intranet transmission and low-cost object storage make cloud storage a feasible and cost-effective choice. The scale of Flink's workloads has also changed dramatically. A state size of several hundred megabytes, once considered large, pales in comparison to the multi-terabyte states commonly encountered in today's applications.

Given these significant changes in deployment, hardware, and workloads, the current state access model, which relies heavily on local disks, no longer aligns with Flink's future in the cloud-native era.

Challenges

In this section, we discuss the key challenges facing Flink state management today, including those newly introduced by cloud-native demands and those that are longstanding due to the limitations of the local storage model. We aim to resolve these challenges with the new design.

Local Disk Constraints

While containerization technology provides resource isolation and secure job operations, it comes at a cost: resources are pre-allocated. As a result, local disk space is fixed and limited, and it becomes a bottleneck for data-intensive jobs. In the current model, when a job runs out of disk space there is no good solution other than scaling up the job, which is very costly.

Spiky Resource Usage

The current state model triggers periodic CPU and network I/O bursts during checkpointing, as shown in Figure 1. This is because checkpointing triggers RocksDB compaction (CPU spike) and state file uploads to DFS (network I/O spike). For large-state jobs, those spikes happen at almost the same time across all tasks, which can destabilize the entire cluster. The spikes also reduce data processing TPS while a checkpoint is in progress.


Figure 1: CPU usage of a Flink job; periodic spikes occur during checkpointing

Elasticity and Fast Rescaling

Achieving zero-downtime rescaling remains a challenge for Flink, particularly for jobs with large state. In the current model, rescaling involves redistributing, downloading, and rebuilding state, which puts even near-zero downtime out of reach. While FLINK-31238 reduces rebuild time by merging and clipping SST files directly, downloading large state files to local disks is still a bottleneck. This significantly extends rescaling durations and can even block downscaling attempts if the aggregate state size exceeds local disk capacity.
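To make the download bottleneck concrete, here is a rough back-of-envelope sketch. It is not part of the proposal: the function names, the even split of state across tasks, and the example sizes are all assumptions chosen for illustration, and the estimate ignores every overhead other than raw transfer.

```python
TB = 10**12    # bytes in a terabyte (decimal); illustrative unit
GBPS = 10**9   # bits per second in one Gbps


def transfer_seconds(state_bytes: int, bandwidth_bits_per_sec: float) -> float:
    """Lower bound on the time to download state, ignoring all overheads."""
    if bandwidth_bits_per_sec <= 0:
        raise ValueError("bandwidth must be positive")
    return state_bytes * 8 / bandwidth_bits_per_sec


def fits_on_local_disk(total_state_bytes: int, parallelism: int, disk_bytes: int) -> bool:
    """Assumes state is split evenly across tasks (a simplification)."""
    if parallelism <= 0:
        raise ValueError("parallelism must be positive")
    return total_state_bytes / parallelism <= disk_bytes


# Hypothetical job: 1 TB of state per task, 25 Gbps link fully available.
print(transfer_seconds(1 * TB, 25 * GBPS))  # 320.0 seconds at best

# Hypothetical downscale: 8 TB of total state, 1 TB of local disk per task.
print(fits_on_local_disk(8 * TB, 16, 1 * TB))  # True: 0.5 TB per task
print(fits_on_local_disk(8 * TB, 4, 1 * TB))   # False: 2 TB per task
```

Even under these optimistic assumptions the download alone takes minutes, and the second check shows how reducing parallelism can push per-task state past a fixed local disk.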

Light and Fast Checkpoints

Checkpointing is a routine operation that the Flink engine uses to ensure fault tolerance and enable rescaling. In addition, end-to-end exactly-once semantics hinge on committing data at the granularity of each checkpoint, a fundamental requirement for a streaming DB. Making checkpoints light and fast is therefore crucial. While generic log-based checkpointing tackles the problem, it introduces an additional log layer that double-writes state changes and incurs extra CPU and network overhead.

Public Interfaces


Proposed Changes


Compatibility, Deprecation, and Migration Plan


Test Plan


Rejected Alternatives

