Current state: Accepted
Discussion thread: https://lists.apache.org/thread.html/a10cae910f92936783e59505d7b9fe71c5f66ceea4c1c287c87164ae@%3Cdev.flink.apache.org%3E
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
HeapKeyedStateBackend is one of the two KeyedStateBackends in Flink, since state lives as Java objects on the heap in HeapKeyedStateBackend and the de/serialization only happens during state snapshot and restore, it outperforms RocksDBKeyeStateBackend when all data could reside in memory.
However, along with the advantage, HeapKeyedStateBackend also has its shortcomings, and the most painful one is the difficulty to estimate the maximum heap size (Xmx) to set, and we will suffer from GC impact once the heap memory is not enough to hold all state data. There’re several (inevitable) causes for such scenario, including (but not limited to):
To resolve this problem, we proposed a solution to support spilling state data to disk before heap memory is exhausted. We will monitor the heap usage and choose the coldest data to spill, and reload them when heap memory is regained after data removing or TTL expiration, automatically.
Similar to the idea of Anti-Caching approach  proposed for database, the main difference between supporting data spilling in HeapKeyedStateBackend and adding a big cache for RocksDBKeyedStateBackend is now memory is the primary storage device than disk. Data is either in memory or on disk instead of in both places at the same time thus saving the cost to prevent inconsistency, and rather than starting with data on disk and reading hot data into cache, data starts in memory and cold data is evicted to disk.
We will describe the overall design in more details in section 2, including the modules required to implement the solution and key technicals used. Then we will describe more details of the design of each module in section 3.
Performance of the newly proposed solution will be depicted in section 4, and limitations observed from online usage and future work will be discussed in section 5 and 6.
Note: to prevent causing unexpected regression to existing usage of HeapKeyedStateBackend, we plan to introduce a new SpillableHeapKeyedStateBackend and change it to default in future if proven to be stable.
Picture 1. Architecture
As illustrated in picture-1, we will monitor the heap status, once we found the heap usage or gc pause time is above some pre-defined threshold, we will choose the proper KG (key group) to spill. On the contrast, if the heap usage is below threshold, we will choose the proper KG to load. Below modules are introduced to support this in heap keyed backend:
There’re two kinds of key group data formats, on-heap and off-heap (on-disk), and we use different accountings for them:
HeapStatusMonitor is responsible to watch the heap status and decide whether we need to perform a spill or load operation. There’re two main factors to watch:
SpillLoadManager is responsible to choose the proper KG and do spill/load when asked by HeapStatusMonitor
MMapManager is responsible for all mmap related operations and management, including:
SpaceAllocator is responsible for allocating space on off-heap/disk to store spilled KG data.
We implemented a compacted SkipList which supports copy-on-write. A delta-chain structure is used for copy-on-write support referring to Nitro , and more details please refer to this sub page
We will introduce a new flink-statebackend-heap-spillable sub-module under the flink-state-backends module and add the new backend there, following the way RocksDB does.
Code wise, we will mainly introduce below classes according to the above design details:
And we will reuse the existing classes for heap backend whenever the code logics remain the same, such as HeapSnapshotStrategy, HeapRestoreOperation, etc.
We performed comparison test between the new heap backend and RocksDB backend with word-count job which has 566MB state size (after serialization) in PCIe-SSD environment, and below are the results:
Heap (all in memory)
RocksDB (all in memory)
Heap (memory + disk)
-Xmx=3GB (spill ratio=57%)
RocksDB (all in memory)
The solution is already deployed in production in Alibaba and used on Singles’ Day in 2018. From our online observation the solution could work well for most cases but also has several limitations, including:
We could see the major issues are related to spilled data, so please keep in mind that although we support data spilling with the solution, the premise is that the major part of your state data should be able to reside in memory, and cold data eviction should be regarded as a protection mechanism instead of a regular operation.
If only the major proportion of data resides in memory, the anti-caching  theory (performance advantage) stands. The more data could be spilled, the bigger state could be supported in HeapKeyedStateBackend. Relative to the limitations mentioned in section 5, we could improve below parts in future work:
 J. DeBrabant, A. Pavlo, S. Tu, M. Stonebraker, and S. Zdonik, “Anti-caching: A new approach to database management system architecture,” Proc. VLDB Endowment, vol. 6, pp. 1942–1953, 2013.
 https://github.com/apache/logging-log4j2, MemoryMappedFileManager
 Lakshman S, Melkote S, Liang J, Mayuram R. Nitro: a fast, scalable in-memory storage engine for NoSQL global secondary index. Proceedings of the VLDB Endowment, 2016, 9(13): 1413–1424