Authors: Yu Li, Pengfei Li

Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread.html/a10cae910f92936783e59505d7b9fe71c5f66ceea4c1c287c87164ae@%3Cdev.flink.apache.org%3E

JIRA:

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

HeapKeyedStateBackend is one of the two KeyedStateBackends in Flink. Since state lives as Java objects on the heap and de/serialization only happens during state snapshot and restore, it outperforms RocksDBKeyedStateBackend when all data can reside in memory.

However, HeapKeyedStateBackend also has its shortcomings, the most painful being how hard it is to estimate the maximum heap size (Xmx) to configure: once the heap cannot hold all state data, the job suffers from GC impact. There are several (sometimes inevitable) causes for such a scenario, including (but not limited to):

To resolve this problem, we propose supporting spilling state data to disk before heap memory is exhausted. We will monitor heap usage, choose the coldest data to spill, and automatically reload it once heap memory is regained after data removal or TTL expiration.

Similar to the anti-caching approach [1] proposed for databases, the main difference between supporting data spilling in HeapKeyedStateBackend and adding a big cache to RocksDBKeyedStateBackend is that memory, rather than disk, becomes the primary storage device. Data lives either in memory or on disk, never in both places at once, which saves the cost of keeping two copies consistent; and rather than starting with data on disk and reading hot data into a cache, data starts in memory and cold data is evicted to disk.

We will describe the overall design in more detail in section 2, including the modules required to implement the solution and the key techniques used. Section 3 then describes the design of each module in more detail.

Performance of the proposed solution is presented in section 4, and limitations observed from online usage and future work are discussed in sections 5 and 6.

Note: to avoid unexpected regressions for existing users of HeapKeyedStateBackend, we plan to introduce a new SpillableHeapKeyedStateBackend and make it the default in the future once it has proven stable.

Design Overview

Picture 1. Architecture

As illustrated in picture 1, we will monitor the heap status: once the heap usage or GC pause time exceeds a pre-defined threshold, we will choose the proper KG (key group) to spill; conversely, if the heap usage drops below the threshold, we will choose the proper KG to load. The following modules are introduced in the heap keyed backend to support this (each is described in the Design Details section, and a sketch of the resulting control loop follows the list):

  1. KeyGroupSizeAccountingManager
  2. HeapStatusMonitor
  3. SpillLoadManager
  4. MMapManager
  5. SpaceAllocator
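To make the control flow concrete, below is a minimal sketch of the spill/load decision loop described above. All class names, method names and threshold values in the sketch are hypothetical placeholders for illustration, not the actual classes or defaults proposed in this FLIP.

public class SpillLoadControlLoopSketch {

    interface HeapStatusMonitor {
        /** Fraction of the heap currently in use, in [0, 1]. */
        double heapUsageRatio();
        /** Last observed GC pause in milliseconds. */
        long lastGcPauseMillis();
    }

    interface SpillLoadManager {
        void spillColdestKeyGroup();
        void loadHottestSpilledKeyGroup();
    }

    private static final double SPILL_THRESHOLD = 0.85;     // assumed value
    private static final double LOAD_THRESHOLD = 0.60;      // assumed value
    private static final long GC_PAUSE_THRESHOLD_MS = 500;  // assumed value

    static void checkAndAct(HeapStatusMonitor monitor, SpillLoadManager manager) {
        double usage = monitor.heapUsageRatio();
        if (usage > SPILL_THRESHOLD || monitor.lastGcPauseMillis() > GC_PAUSE_THRESHOLD_MS) {
            // Heap is under pressure: evict the coldest key group to disk.
            manager.spillColdestKeyGroup();
        } else if (usage < LOAD_THRESHOLD) {
            // Heap has headroom again: bring a spilled key group back on heap.
            manager.loadHottestSpilledKeyGroup();
        }
    }
}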

Design Details

KeyGroupSizeAccountingManager

There are two kinds of key-group data formats, on-heap and off-heap (on-disk), and we account for their sizes differently.
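The following is a minimal sketch of per-key-group size accounting, assuming on-heap sizes are tracked as (possibly estimated) byte counts and off-heap sizes as the exact number of bytes allocated for the spilled data. The class and method names are illustrative, not the actual API proposed by this FLIP.

import java.util.HashMap;
import java.util.Map;

public class KeyGroupSizeAccountingSketch {

    private final Map<Integer, Long> onHeapSizes = new HashMap<>();
    private final Map<Integer, Long> offHeapSizes = new HashMap<>();

    public void recordOnHeapSize(int keyGroupIndex, long estimatedBytes) {
        onHeapSizes.put(keyGroupIndex, estimatedBytes);
    }

    /** Moves accounting for a key group from the on-heap to the off-heap side. */
    public void markSpilled(int keyGroupIndex, long allocatedBytes) {
        onHeapSizes.remove(keyGroupIndex);
        offHeapSizes.put(keyGroupIndex, allocatedBytes);
    }

    public long totalOnHeapBytes() {
        return onHeapSizes.values().stream().mapToLong(Long::longValue).sum();
    }

    public long totalOffHeapBytes() {
        return offHeapSizes.values().stream().mapToLong(Long::longValue).sum();
    }
}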

HeapStatusMonitor

HeapStatusMonitor is responsible for watching the heap status and deciding whether we need to perform a spill or load operation. There are two main factors to watch: the heap usage and the GC pause time.
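As an illustration of these two factors, here is a minimal sketch that probes them with the standard JMX management beans: the heap usage ratio and the GC time accumulated since the previous check. This shows the monitoring idea only, not the FLIP's actual monitor implementation.

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

public class HeapStatusProbe {

    private long lastGcTimeMillis;

    /** Returns the used/max ratio of the heap, in [0, 1]. */
    public double heapUsageRatio() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        return heap.getMax() > 0 ? (double) heap.getUsed() / heap.getMax() : 0.0;
    }

    /** Returns GC time (ms) accumulated across all collectors since the last call. */
    public long gcTimeSinceLastCheckMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long collectionTime = gc.getCollectionTime();
            if (collectionTime > 0) {
                total += collectionTime;
            }
        }
        long delta = total - lastGcTimeMillis;
        lastGcTimeMillis = total;
        return delta;
    }
}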

SpillLoadManager

SpillLoadManager is responsible for choosing the proper KG and performing the spill/load when requested by HeapStatusMonitor.
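Below is a minimal sketch of how the "proper" key group might be chosen. The heuristic shown (the least-accessed on-heap key group is the coldest, the most-accessed spilled key group is the hottest) and the method names are assumptions made for illustration; this section only states that a proper KG is chosen on request.

import java.util.Map;

public class KeyGroupSelectionSketch {

    /** Returns the index of the on-heap key group with the fewest recent accesses. */
    static int chooseKeyGroupToSpill(Map<Integer, Long> onHeapAccessCounts) {
        int coldest = -1;
        long minAccesses = Long.MAX_VALUE;
        for (Map.Entry<Integer, Long> e : onHeapAccessCounts.entrySet()) {
            if (e.getValue() < minAccesses) {
                minAccesses = e.getValue();
                coldest = e.getKey();
            }
        }
        return coldest;
    }

    /** Returns the index of the spilled key group with the most recent accesses. */
    static int chooseKeyGroupToLoad(Map<Integer, Long> spilledAccessCounts) {
        int hottest = -1;
        long maxAccesses = Long.MIN_VALUE;
        for (Map.Entry<Integer, Long> e : spilledAccessCounts.entrySet()) {
            if (e.getValue() > maxAccesses) {
                maxAccesses = e.getValue();
                hottest = e.getKey();
            }
        }
        return hottest;
    }
}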

MMapManager

MMapManager is responsible for all mmap-related operations and management, including creating and maintaining the memory-mapped files that back spilled key-group data.
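For reference, here is a minimal sketch of creating a memory-mapped region with the standard NIO API. The file naming and sizing are illustrative only; the real manager additionally has to track and clean up these regions (see [2] for prior art in log4j2 and [3] for known mmap pitfalls on the JVM).

import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MMapSketch {

    static MappedByteBuffer mapSpillFile(Path file, long sizeBytes) throws IOException {
        try (FileChannel channel = FileChannel.open(
                file,
                StandardOpenOption.CREATE,
                StandardOpenOption.READ,
                StandardOpenOption.WRITE)) {
            // The mapping stays valid even after the channel is closed.
            return channel.map(FileChannel.MapMode.READ_WRITE, 0, sizeBytes);
        }
    }
}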

SpaceAllocator

SpaceAllocator is responsible for allocating off-heap/on-disk space to store spilled KG data.
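Below is a minimal sketch of the allocation idea: handing out offsets inside a pre-allocated off-heap/on-disk chunk (for example, a memory-mapped region) for spilled key-group data. The bump-pointer strategy shown here is an assumption for illustration; the actual SpaceAllocator design is not prescribed by this section.

import java.nio.ByteBuffer;

public class SpaceAllocatorSketch {

    private final ByteBuffer chunk;
    private int nextFreeOffset;

    public SpaceAllocatorSketch(ByteBuffer chunk) {
        this.chunk = chunk;
    }

    /** Reserves sizeBytes in the chunk and returns its offset, or -1 if the chunk is full. */
    public synchronized int allocate(int sizeBytes) {
        if (nextFreeOffset + sizeBytes > chunk.capacity()) {
            return -1; // caller should fall back to a new chunk
        }
        int offset = nextFreeOffset;
        nextFreeOffset += sizeBytes;
        return offset;
    }
}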

Data Structure for off-heap/on-disk Data

We implemented a compacted SkipList that supports copy-on-write. A delta-chain structure, following Nitro [4], is used to support copy-on-write; for more details please refer to this sub-page.
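To illustrate the delta-chain idea only (not the compacted SkipList implementation described in the sub-page), here is a minimal sketch: instead of updating a node in place, a new delta carrying the value and its version is prepended, so an ongoing snapshot can keep reading the version that was current when it started. Names and visibility rules are simplified assumptions.

public class DeltaChainNodeSketch<V> {

    static final class Delta<V> {
        final long version;   // version at which this value was written
        final V value;        // null can be used to mark a deletion
        final Delta<V> next;  // older delta, or null

        Delta(long version, V value, Delta<V> next) {
            this.version = version;
            this.value = value;
            this.next = next;
        }
    }

    private volatile Delta<V> head;

    /** Writers prepend a new delta instead of mutating the node in place. */
    public void put(long version, V value) {
        head = new Delta<>(version, value, head);
    }

    /** Readers walk the chain until they find the newest delta visible to their snapshot. */
    public V get(long snapshotVersion) {
        for (Delta<V> d = head; d != null; d = d.next) {
            if (d.version <= snapshotVersion) {
                return d.value;
            }
        }
        return null;
    }
}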

Implementation

We will introduce a new flink-statebackend-heap-spillable sub-module under the flink-state-backends module and add the new backend there, following the same approach as the RocksDB backend.

Code-wise, we will mainly introduce classes corresponding to the modules described in the above design details (such as KeyGroupSizeAccountingManager, HeapStatusMonitor, SpillLoadManager, MMapManager and SpaceAllocator), together with the new SpillableHeapKeyedStateBackend itself.

We will reuse the existing heap backend classes wherever the code logic remains the same, such as HeapSnapshotStrategy, HeapRestoreOperation, etc.

Performance Evaluation (Preview)

We performed a comparison test between the new heap backend and the RocksDB backend using a word-count job with 566 MB of state (after serialization) in a PCIe-SSD environment; the results are shown below:

Backend                  | QPS (K/s) | Note
Heap (all in memory)     | 400       | -Xmx=10GB
RocksDB (all in memory)  | 100       | Cache=10GB
Heap (memory + disk)     | 160       | -Xmx=3GB (spill ratio=57%)
RocksDB (all in memory)  | 100       | Cache=3GB

Limitations

The solution has already been deployed in production at Alibaba and was used on Singles' Day 2018. From our online observations it works well for most cases, but it also has several limitations, including:

As can be seen, the major issues are related to spilled data. Please keep in mind that although the solution supports data spilling, the premise is that the major part of your state data can still reside in memory; cold-data eviction should be regarded as a protection mechanism rather than a regular operation.

Future Work

As long as the major proportion of the data resides in memory, the performance advantage predicted by anti-caching [1] holds. The more data can be spilled, the bigger the state that HeapKeyedStateBackend can support. To address the limitations mentioned in section 5, we could improve the following parts in future work:

  1. Fine-grained IO control on data spilling
  2. Reduce CPU cost during checkpoint
  3. Support incremental checkpoint

References

[1] J. DeBrabant, A. Pavlo, S. Tu, M. Stonebraker, and S. Zdonik, "Anti-caching: A new approach to database management system architecture," Proceedings of the VLDB Endowment, vol. 6, pp. 1942–1953, 2013.

[2] Apache Log4j 2, MemoryMappedFileManager, https://github.com/apache/logging-log4j2

[3] http://www.mapdb.org/blog/mmap_files_alloc_and_jvm_crash

[4] S. Lakshman, S. Melkote, J. Liang, and R. Mayuram, "Nitro: A fast, scalable in-memory storage engine for NoSQL global secondary index," Proceedings of the VLDB Endowment, vol. 9, no. 13, pp. 1413–1424, 2016.