...

One major advantage of Flink is its efficient and easy-to-use state management mechanism. However, this mechanism has not evolved much since its introduction and has not kept pace with the demands of the cloud-native era. In this FLIP, we revisit the current architecture of Flink's state management model and propose an augmented, disaggregated alternative.

Motivation

The past decade has witnessed a dramatic shift in Flink's deployment modes, workload patterns, and underlying hardware. We have moved from the map-reduce era, where workers were nodes with tightly coupled computation and storage, to a cloud-native world where containerized deployment on Kubernetes has become standard. At the same time, hardware improvements, such as the more than tenfold increase in network bandwidth (from mere hundreds of Mbps to 25 Gbps, comparable to local SSD throughput), unlock compelling options for state storage. Fast intranet transmission and low-cost object storage make cloud storage a feasible and cost-effective choice. The scale of Flink workloads has also changed dramatically: a state size of several hundred megabytes, once considered large, pales in comparison to the multi-terabyte states commonly encountered in today's applications.

Given these significant changes in deployment, hardware, and workloads, the current state access model, which relies heavily on local disks, no longer aligns with Flink's future in the cloud-native era.

Challenges

In this section, we discuss the key challenges facing Flink state management today, including those newly introduced by cloud-native demands and those that are longstanding due to the limitations of the local storage model. We aim to resolve these challenges with the new design.

Local Disk Constraints

While containerization technology provides resource isolation and secure job operations, it comes at a cost: resources are pre-allocated. As a result, local disk space is fixed and limited, and it becomes a bottleneck for data-intensive jobs. In the current model, when a job runs out of disk space there is no good solution other than scaling up the job, which is very costly.

Spiky Resource Usage

The current state model triggers periodic CPU and network I/O bursts during checkpointing, as shown in Figure 1. This is because checkpointing triggers RocksDB compaction (CPU spike) and state file uploads to DFS (network I/O spike). For large-state jobs, these spikes occur at almost the same time across all tasks, destabilizing the entire cluster. The spikes also reduce data processing TPS during checkpoints.

...

Figure 1: CPU usage of a Flink job; periodic spikes occur during checkpointing

Elasticity and Fast Rescaling

Achieving zero-downtime rescaling remains a challenge for Flink, particularly for jobs with large state. The current model involves state redistribution, download, and rebuild during rescaling, which puts even near-zero downtime out of reach. While FLINK-31238 reduces rebuild time by merging and clipping SST files directly, downloading large state files to local disks is still a bottleneck. This significantly extends rescaling durations and can even block downscaling attempts if the aggregate state size exceeds local disk capacity.

Light and Fast Checkpoints

Checkpointing is a routine operation for the Flink Engine to ensure fault tolerance and enable rescaling. In addition, end-to-end exactly-once semantics hinge on committing data at the granularity of each checkpoint, a fundamental requirement for a streaming DB. Hence, making checkpoints light and fast is crucial. While generic log-based checkpointing tackles the problem, it introduces an additional log layer that double-writes state changes and incurs extra CPU and network overhead.

High-Level Overview and Design

Figure 2: The Disaggregated Model of State Management

...

The disaggregated model also facilitates a clean architecture for state sharing and querying. Checkpoint files can be shared between operators to eliminate redundant storage. For state querying, file-level APIs are provided to enable straightforward querying that resembles querying normal tables, without relying on backend details or state specifics. In this FLIP, we focus on the disaggregated model and how it integrates with the current Flink Engine. Sharing and querying will be explored in future FLIPs.

Hurdles for the Disaggregated State Model

Our initial Proof of Concept (PoC) results showed a significant performance drawback for the disaggregated model (state store using DFS as primary storage) compared to the current model (local disk storage). Without local disk caches, the disaggregated model achieved only 2-5% of the maximum TPS observed with local disks across various setups. Analysis excluding page cache effects reveals that the main bottleneck lies in the blocking of the main task thread during state access as shown in Figure 3, making Flink engine performance highly sensitive to state access latency.
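To see why a blocking task thread makes throughput so sensitive to state access latency, a back-of-the-envelope sketch may help. The latencies below are hypothetical placeholders chosen for illustration, not the measurements behind the PoC or Table 1, and the function names are ours.

```python
def max_tps_blocking(latency_s: float) -> float:
    """Upper bound on records/s when one task thread blocks on every state access."""
    return 1.0 / latency_s


def max_tps_overlapped(latency_s: float, in_flight: int) -> float:
    """Upper bound when up to `in_flight` state accesses overlap in time."""
    return in_flight / latency_s


# Hypothetical latencies (assumptions, NOT values from Table 1).
LOCAL_LATENCY_S = 10e-6   # assumed 10 microseconds per local disk access
REMOTE_LATENCY_S = 1e-3   # assumed 1 millisecond per remote access

local_tps = max_tps_blocking(LOCAL_LATENCY_S)
remote_tps = max_tps_blocking(REMOTE_LATENCY_S)

# With blocking access, the throughput ratio equals the inverse latency ratio.
ratio = remote_tps / local_tps
```

Under these assumed numbers a blocking thread would reach only about 1% of the local throughput, while keeping roughly 100 remote accesses in flight would, in this idealized model, recover the local bound. That is the intuition behind overlapping I/O rather than waiting on it.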

...

Table 1: Access latency across different storage media

Proposed Changes

Unlocking the disaggregated model requires tackling three key engine challenges:

  1. Implementing a non-blocking execution model: This unleashes full CPU potential while waiting for I/O or network operations.
  2. Developing a disaggregated data store: This store must seamlessly handle both writing and reading from DFSs.
  3. Integrating with Flink Engine: The data store needs to be integrated with the Flink Engine and existing state management mechanism (checkpoint/recover/rescale).

Non-blocking Asynchronous Execution Model: Parallel I/O (FLIP-425)

In FLIP-425, we propose a non-blocking execution model enabling asynchronous state access. In this new model, asynchronous State APIs (FLIP-424) are introduced. These APIs register callbacks with the task thread mailbox executor and allow for efficient chained execution. However, asynchronous access presents several new challenges:

...

FLIP-425 introduces a new component named "Asynchronous Execution Controller" (AEC) to address these challenges and orchestrate the asynchronous execution. A new set of asynchronous state APIs is also introduced accordingly within this asynchronous paradigm. Please refer to FLIP-425 for a deeper dive.
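The general idea of registering callbacks that run on the task thread can be sketched as follows. This is a minimal illustration only: `mailbox`, `async_get`, and the other names are hypothetical and do not correspond to the FLIP-424/425 APIs or to the AEC, and a plain queue stands in for the mailbox executor.

```python
import queue
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins; none of these names come from FLIP-424/425.
mailbox = queue.Queue()                      # callbacks waiting to run on the task thread
io_pool = ThreadPoolExecutor(max_workers=4)  # threads that perform the state reads
remote_state = {"a": 1, "b": 2, "c": 3}      # dict standing in for remote state
results = {}


def async_get(key, callback):
    """Start a state read on an I/O thread; enqueue the callback when it completes."""
    future = io_pool.submit(remote_state.get, key)
    future.add_done_callback(lambda f: mailbox.put(lambda: callback(f.result())))


def process_record(key):
    # The chained step runs later on the task thread, once the value has arrived.
    async_get(key, lambda value: results.__setitem__(key, value * 10))


def run_task_thread(records):
    for key in records:
        process_record(key)   # returns immediately; the thread is not blocked on I/O
    for _ in records:
        mailbox.get()()       # execute each completed callback on this thread


run_task_thread(["a", "b", "c"])
io_pool.shutdown()
```

Because every callback is executed by the single thread draining the mailbox, the user logic never runs concurrently with itself even though the reads overlap. The sketch deliberately ignores the ordering and consistency questions that FLIP-425 addresses.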

Batching for Network I/O: Beyond Parallel I/O (FLIP-426)

The asynchronous model introduced in FLIP-425 boosts overall throughput through parallel I/O operations. However, simply expanding I/O parallelism is not enough for remote storage. For network I/O, the round-trip time of an RPC call far outweighs the cost attributable to the size of an individual request. Hence, merging I/O requests into a single network operation, for example fetching multiple keys within a single call, plays an important role in reducing per-interaction overhead and using the available bandwidth more effectively. This becomes critically important for large-state jobs, where frequent state fetching can become a bottleneck.

...

  1. Categorizing state operations: This allows for fine-grained management and optimization of different types of data access.
  2. Grouping I/Os together: Batching adjacent requests reduces individual RPC overhead and leverages network efficiency.
  3. Integrating with the "Asynchronous Execution Controller": Integrating with FLIP-425's orchestration mechanism for asynchronous execution.
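The effect of grouping I/Os can be illustrated with a small sketch. It assumes a remote store that offers a multi-key read; `FakeRemoteStore`, `GetBatcher`, and `max_batch_size` are hypothetical names for illustration and are not part of FLIP-426 or ForSt.

```python
class FakeRemoteStore:
    """Hypothetical remote store that counts round trips."""

    def __init__(self, data):
        self.data = dict(data)
        self.rpc_count = 0

    def multi_get(self, keys):
        self.rpc_count += 1   # one round trip, however many keys are requested
        return {k: self.data.get(k) for k in keys}


class GetBatcher:
    """Collects point reads and issues them as one multi-key request."""

    def __init__(self, store, max_batch_size):
        self.store = store
        self.max_batch_size = max_batch_size
        self.pending = []     # (key, callback) pairs not yet sent

    def get(self, key, callback):
        self.pending.append((key, callback))
        if len(self.pending) >= self.max_batch_size:
            self.flush()

    def flush(self):
        if not self.pending:
            return
        batch, self.pending = self.pending, []
        values = self.store.multi_get([k for k, _ in batch])
        for key, callback in batch:
            callback(key, values[key])


store = FakeRemoteStore({f"k{i}": i for i in range(10)})
batcher = GetBatcher(store, max_batch_size=4)
fetched = {}
for i in range(10):
    batcher.get(f"k{i}", lambda k, v: fetched.__setitem__(k, v))
batcher.flush()   # send the final partial batch
```

In this toy run, ten reads are served with three round trips instead of ten. A real implementation would also need a time-based flush so that a partially filled batch does not wait indefinitely.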

Disaggregated State Store: ForSt (FLIP-427)

Figure 2 illustrates our adopted approach of using an embedded key-value (k-v) store to complement remote storage. We introduce ForSt, a disaggregated version of frocksdb (the default local disk state store for Flink) to achieve this purpose.

...

Detailed design can be found in FLIP-427 and PoC implementation can be found at https://github.com/ververica/ForSt/tree/disagg-poc.

Faster Checkpoint/Restore/Rescale: Leverage Shared DFS (FLIP-428)

By moving primary state storage to the remote filesystem (DFS) and using local disks optionally for caching, ForSt offers a tiered storage system for Flink. This approach provides significant improvements for checkpointing, restoring, and rescaling by leveraging the fact that working state and checkpoint files share the same underlying file system.

...

For more detailed information on sharing the underlying file system between checkpoints and working states, along with changes to checkpointing, restoring, and rescaling procedures, please refer to FLIP-428.

Tiered Storage: Local Disk as a Secondary Cache (FLIP-429)

While the asynchronous execution model (FLIP-425) and network I/O grouping (FLIP-426) have significantly enhanced performance, the access latencies in Table 1 reveal that direct access to remote storage (DFS) remains 95% slower than local disk on average. Therefore, efficiently utilizing available local disks is crucial to maximize overall performance. In this context, local disks act as an optional secondary cache.
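As a rough illustration of local disk acting as a cache in front of remote storage, the sketch below uses a plain read-through LRU cache. LRU is our simplification and not the adaptive policy proposed in FLIP-429; `TieredReader` and its fields are hypothetical names, and in-memory dictionaries stand in for both the DFS and the local disk.

```python
from collections import OrderedDict


class TieredReader:
    """Read-through cache: serve from the local tier, fall back to the remote tier."""

    def __init__(self, remote, cache_capacity):
        self.remote = remote         # dict standing in for DFS (primary storage)
        self.cache = OrderedDict()   # standing in for the local disk cache
        self.capacity = cache_capacity
        self.remote_reads = 0

    def get(self, key):
        if key in self.cache:
            self.cache.move_to_end(key)      # mark as most recently used
            return self.cache[key]
        self.remote_reads += 1               # cache miss: pay the remote latency
        value = self.remote[key]
        self.cache[key] = value
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)   # evict the least recently used entry
        return value


reader = TieredReader({"a": 1, "b": 2, "c": 3}, cache_capacity=2)
values = [reader.get(k) for k in ["a", "b", "a", "c", "a"]]
```

Since the remote tier stays the source of truth, losing or shrinking the cache affects only latency and never correctness, which is what makes the local disk optional.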

...

Figure 5: Adaptive Local Cache

Remote Compaction (FLIP-430)

Compaction is at the heart of LSM-based storage to enable competitive read performance and reduce space amplification, but it introduces bursts of high I/O and CPU usage. This poses a challenge in pre-allocated resource scenarios, potentially leading to underutilization or performance hiccups.
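For readers less familiar with LSM trees, the following generic sketch shows what a compaction does logically: it merges several sorted runs into one, keeping only the newest version of each key. This is textbook LSM behavior under simplifying assumptions (full compaction, `None` as the delete marker), not the ForSt or RocksDB implementation.

```python
TOMBSTONE = None   # assumed marker for a deleted key


def compact(runs):
    """Merge sorted runs (ordered newest first) into a single sorted run."""
    merged = {}
    for run in reversed(runs):        # apply oldest first so newer runs overwrite
        for key, value in run:
            merged[key] = value
    # A full compaction can drop deleted keys entirely.
    return sorted((k, v) for k, v in merged.items() if v is not TOMBSTONE)


newest = [("a", 3), ("c", TOMBSTONE)]
oldest = [("a", 1), ("b", 2), ("c", 5)]
compacted = compact([newest, oldest])

entries_before = len(newest) + len(oldest)
entries_after = len(compacted)
```

Five stored entries shrink to two, and a subsequent read has one run to consult instead of two. The merge itself is the work that produces the CPU and I/O bursts described above.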

...

The specific design details of the Remote Compaction architecture are still under development and will be finalized in future design documents.

Public Interface Changes

This section details the steps involved in enabling disaggregated state management from a user perspective and related public interface changes.

StateBackend: ForSt

We introduce a new state backend "ForSt" (named "For Stream") to manage disaggregated states. Existing state backends, "rocksdb" utilizing local disk and "heap" residing in memory, remain unaltered. Users can use them as before. To leverage the "ForSt" state backend, users need to configure the backend and additionally specify a working directory for states.

Configure to use ForSt:
state.backend.type: forst
# Optional: store working state in a location other than the checkpoint storage.
state.backend.forst.working-dir: hdfs:///path-to-your-storage


Asynchronous State APIs (FLIP-424) 

As mentioned above, a new suite of asynchronous state APIs (FLIP-424) has been introduced to seamlessly integrate with the Asynchronous Execution Model (FLIP-425). Existing synchronous state APIs remain unaltered, ensuring a smooth transition for users. Importantly, as demonstrated in the PoC results within FLIP-425, mimicking synchronous behavior using the asynchronous model incurs negligible overhead (~3%).

...


DataStream API users transitioning to asynchronous state APIs will find the code structure largely unchanged, with the primary differences lying in state initialization and the use of future-based asynchronous APIs for processing. For comprehensive details please refer to FLIP-424.

PoC Results


Environment Setup:

  • Version: based on Flink 1.19
  • Deployment mode: YARN per-job
  • Flink YARN cluster:
    • 1 master 2 workers
    • specifications: ecs.g7.2xlarge 8 vCPU 32 GiB (Alibaba Cloud)
  • HDFS cluster:
    • 1 master 2 workers
    • specifications: ecs.g7.2xlarge 8 vCPU 32 GiB (Alibaba Cloud)
    • YARN cluster and HDFS cluster are on the same LAN
  • State backend: ForSt (Based on RocksDB 8.5.3)

...

The PoC running instructions are provided in the Appendix for those interested in exploring the setup further.

Road Map + Launching Plan

As discussed, the disaggregated state evolution requires adjustments across the entire Flink Engine stack. To ensure a smooth and successful transition, we've divided the work into several stages with well-defined milestones:

...

DataStream users need to rewrite their stateful operators in a callback style, as demonstrated in the example. We will try to provide tools later to facilitate migration.

Rejected Alternatives

Tiered Storage: DFS as a complement when the local disk runs out of space. While initially appealing due to its intuitive nature and potential performance benefits within local capacity, the tiered storage solution with local disk overflow to remote storage ultimately proves unsuitable for disaggregated state management in Flink.

...

  1. Heavy Checkpointing Procedure: A considerable number of files needs to be uploaded during checkpointing.
  2. Limited Data Structure Flexibility: Confining local disk data to the SST format restricts potential performance gains from alternative caching structures.
  3. Inaccurate Warm/Cold Distinction: File-level classification of data as warm or cold inaccurately reflects actual access patterns, leading to suboptimal resource allocation.
  4. More Complicated File Management: In this architecture, both the local disk and DFS form part of the primary storage, so file management must be unified across the two, which becomes complicated in extreme cases such as error handling.

Appendix: How to run the PoC

ForSt PoC Branch: https://github.com/ververica/ForSt/tree/disagg-poc

...