Status

Preview https://flink-packages.org/packages/heap-backend-incremental-checkpoints

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, the most widely used Flink state backends are RocksDB- and Heap-based.

Compared to RocksDB, Heap-based has the following advantages:

  1. Serialization once per checkpoint, not per state modification
    1. This allows “squashing” updates to the same keys
    2. (But this can also be a disadvantage, as serialization isn’t amortized across the checkpoint)
  2. Shorter synchronous phase (compared to RocksDB incremental)
  3. No sorting needed
  4. No compaction needed
  5. No IO amplification
  6. No JNI overhead

This can potentially give higher throughput and efficiency.

But the use of the heap-based backend is limited by:

  1. State must fit into memory
  2. Lack of incremental snapshots

This FLIP aims to address the latter.

As an example, there was a question on the user mailing list about a 250 GB state deployment that fits into memory (Jan 30, 2020).

Solution

The proposed solution consists of:

  1. Computing an increment, which includes:
    1. a set of changed keys (within the snapshotted key groups)
    2. for each such key, a state change (e.g. appended list values) 
  2. Writing a snapshot (iteration and serialization)
  3. Referring to the previous snapshots
  4. Recovery: iterate over snapshots and apply the diffs
  5. Clean up (compaction)

Computing changed keys

Updated keys

To keep track of the updated keys, a property of existing copy-on-write hash tables can be used, namely entry versions:

  1. For each hashtable, maintain its most recent version confirmed by JM.
  2. On snapshot, only include entries with versions higher than that (entry version is updated on each write operation)

For (1), potentially multiple unconfirmed versions have to be maintained (corresponding to multiple checkpoints, to avoid a full snapshot on failure; see unconfirmed checkpoints below).
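
A minimal sketch of this version-based filtering (a hypothetical VersionedMap in Java, not the actual CopyOnWriteStateMap internals): writes tag the entry with the current map version, and a snapshot only picks up entries modified after the last version confirmed by the JM.

    import java.util.AbstractMap;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class VersionedMap<K, V> {
        private static final class Entry<V> {
            V value;
            int entryVersion; // version of the map when the entry was last written
            Entry(V value, int entryVersion) { this.value = value; this.entryVersion = entryVersion; }
        }

        private final Map<K, Entry<V>> entries = new HashMap<>();
        private int mapVersion = 0;            // bumped on every snapshot
        private int lastConfirmedVersion = -1; // highest snapshot version confirmed by the JM

        void put(K key, V value) {
            entries.put(key, new Entry<>(value, mapVersion)); // each write tags the entry with the current version
        }

        /** Collects the increment (entries changed since the last confirmed snapshot); returns this snapshot's version. */
        int snapshotIncrement(List<Map.Entry<K, V>> diffOut) {
            for (Map.Entry<K, Entry<V>> e : entries.entrySet()) {
                if (e.getValue().entryVersion > lastConfirmedVersion) {
                    diffOut.add(new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().value));
                }
            }
            return mapVersion++; // later writes belong to the next increment
        }

        /** Called when the JM confirms the checkpoint whose snapshot returned {@code snapshotVersion}. */
        void confirm(int snapshotVersion) {
            lastConfirmedVersion = Math.max(lastConfirmedVersion, snapshotVersion);
        }
    }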

Removed keys

Currently, entries are simply “unlinked” on removal. Therefore, removals wouldn’t be reflected in an incremental snapshot, and the removed entries would still be present after recovery.

To solve this, we maintain a set of removed keys according to the following rules:

  1. An entry can be removed many times - keep (and write) it only once
  2. An entry can be re-added (and then re-removed)
  3. A snapshot with a removal can still be unconfirmed by JM by the time of the next checkpoint (see unconfirmed checkpoints)
  4. An entry key can be mutable, such as BinaryRowData (this doesn’t cause issues with non-incremental snapshots because removal takes effect immediately, without any tracking)

Removed keys are kept in a separate set, rather than as a deletion mark in each entry. Otherwise, it would be difficult to actually drop them because of (3).

Lookup can be optimized using Bloom filters in the future.
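
A sketch of such a set of removed keys (illustrative, not the final RemovalLog API; the ordering of removals vs. updates within one snapshot is out of scope here):

    import java.util.HashSet;
    import java.util.NavigableMap;
    import java.util.Set;
    import java.util.TreeMap;

    class RemovalLog<K> {
        // removals not yet included in any snapshot
        private final Set<K> activeRemovals = new HashSet<>();
        // removals already snapshotted, kept per checkpoint until the JM confirms it
        private final NavigableMap<Long, Set<K>> unconfirmed = new TreeMap<>();

        void onRemove(K key) {
            activeRemovals.add(key); // rule 1: a Set de-duplicates repeated removals of the same key
        }

        void onAdd(K key) {
            activeRemovals.remove(key); // rule 2: a re-added key no longer needs a pending removal record
        }

        /** Removals to write into the snapshot of the given checkpoint. */
        Set<K> snapshot(long checkpointId) {
            Set<K> toWrite = new HashSet<>(activeRemovals);
            // rule 3: this snapshot may be based on an older confirmed one,
            // so removals from still-unconfirmed checkpoints are re-included
            for (Set<K> pending : unconfirmed.values()) {
                toWrite.addAll(pending);
            }
            unconfirmed.put(checkpointId, new HashSet<>(activeRemovals));
            activeRemovals.clear();
            return toWrite;
        }

        /** Drop the pending removals once the JM confirms the checkpoint that carried them. */
        void confirmed(long checkpointId) {
            // everything up to the confirmed checkpoint is covered by it
            unconfirmed.headMap(checkpointId, true).clear();
        }
    }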

Computing individual state changes

Thanks to the copy-on-write semantics, state objects can safely be accessed in memory even during the asynchronous snapshot phase.

To track which parts of these objects need to be snapshotted, journals can be used. They can be maintained either at the level of HeapXxxState or of StateMap.Entry.

The former has the advantage of having more knowledge about the operations performed, so fewer full replacements have to be made.

On the other hand, placing journals in entries makes it possible to:

  1. Bind journal lifecycle to entry/version
  2. Avoid extra hash lookup
  3. Easily access state and track changes

These properties allow accessing the state without adding significant complexity and without storing the updates themselves, which would increase memory overhead. Therefore, placing journals in entries was chosen.
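
To illustrate, a hypothetical entry layout (names are illustrative, not the final classes): the entry holds the state object together with its journal, so a single hash lookup gives access to the state and to its change log, and the journal's lifecycle is naturally bound to the entry.

    /** Illustrative only: an entry that co-locates the state object with its journal. */
    class JournaledEntry<K, S> {
        interface Journal {
            boolean needsFullSnapshot(); // true -> write the whole state object, not a diff
            void reset();                // called after the entry was written into a snapshot
        }

        final K key;
        S state;                // the state object, e.g. a list or a map
        final Journal journal;  // created by a per-state-type JournalFactory
        int entryVersion;       // CoW version, bumped on each write (see "Updated keys" above)

        JournaledEntry(K key, S state, Journal journal) {
            this.key = key;
            this.state = state;
            this.journal = journal;
        }
    }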

List state

For a list, if only appends were made, it’s enough to remember its length right after the previous snapshot: the elements past that offset form the diff. Otherwise (e.g. if the list was truncated), the full list is snapshotted.
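
A sketch of this journaling for lists (illustrative JournaledList; handling of checkpoint confirmation and of the “wasCleared” flag written with the diff is omitted):

    import java.util.ArrayList;
    import java.util.List;

    class JournaledList<E> {
        private final List<E> elements = new ArrayList<>();
        private int lengthAtLastSnapshot = 0;
        private boolean appendOnly = true; // false after clear/remove/in-place updates

        void add(E element) {
            elements.add(element); // pure appends keep the diff small
        }

        void clear() {
            elements.clear();
            appendOnly = false; // truncation -> the next snapshot must contain the full list
        }

        /** Diff to write: either the tail appended since the last snapshot, or the full list. */
        List<E> snapshotDiff() {
            List<E> diff = appendOnly
                    ? new ArrayList<>(elements.subList(lengthAtLastSnapshot, elements.size()))
                    : new ArrayList<>(elements);
            lengthAtLastSnapshot = elements.size();
            appendOnly = true;
            return diff;
        }
    }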

Map state

For a map, sets of removed and updated keys need to be maintained. Again, if there were any clear() or “black-box” operations (such as entrySet() with potential replace/remove), then the full map is snapshotted.

As an optimization, map.clear() call can remove the entry completely.
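
A sketch of the corresponding tracking for maps (illustrative names; the actual per-state-type Diff/Journal classes are listed under Proposed Changes):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class JournaledMap<UK, UV> {
        private final Map<UK, UV> map = new HashMap<>();
        private final Set<UK> updatedKeys = new HashSet<>();
        private final Set<UK> removedKeys = new HashSet<>();
        private boolean needsFullSnapshot = false; // set by clear() or "black-box" operations

        void put(UK key, UV value) {
            map.put(key, value);
            updatedKeys.add(key);
            removedKeys.remove(key);
        }

        void remove(UK key) {
            map.remove(key);
            updatedKeys.remove(key);
            removedKeys.add(key);
        }

        /** E.g. entrySet() handed to user code: entries may be replaced or removed behind our back. */
        Iterable<Map.Entry<UK, UV>> entries() {
            needsFullSnapshot = true;
            return map.entrySet();
        }

        void clear() {
            map.clear();
            updatedKeys.clear();
            removedKeys.clear();
            needsFullSnapshot = true; // or, as an optimization, drop the whole entry as described above
        }

        /** On snapshot: write either the full map, or the updated values plus the removed keys. */
        boolean needsFullSnapshot() {
            return needsFullSnapshot;
        }
    }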

Value, Aggregating and Reducing states

Always take a full snapshot (unmodified values aren’t snapshotted thanks to CoW versions above).

Writing a snapshot

Entry iteration and filtering

To filter out entries by version, StateSnapshotTransformer can’t be used directly because it operates on a different type (state entries rather than state diffs). Instead, entries are filtered directly in the iterator. However, existing entry iterators can’t be reused either, because the number of entries to write is no longer known upfront. Instead, a boolean “hasNext” flag is written before each entry. This avoids size computation altogether, which in turn eliminates double iteration and pre-transformation (at the cost of a slightly larger snapshot).
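
A sketch of this framing using plain java.io streams (the real code would use Flink's DataOutputView and the registered key/diff serializers; Integer keys and Long diffs are just placeholders):

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.Map;

    final class HasNextFraming {
        static void write(DataOutputStream out, Iterator<Map.Entry<Integer, Long>> filteredEntries) throws IOException {
            while (filteredEntries.hasNext()) {
                Map.Entry<Integer, Long> e = filteredEntries.next();
                out.writeBoolean(true);   // "one more entry follows"
                out.writeInt(e.getKey()); // key and diff written with the existing serializers
                out.writeLong(e.getValue());
            }
            out.writeBoolean(false);      // end marker, so no entry count is needed upfront
        }

        static void read(DataInputStream in, Map<Integer, Long> target) throws IOException {
            while (in.readBoolean()) {    // read until the end marker
                target.put(in.readInt(), in.readLong());
            }
        }
    }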

State diff serialization

For each updated entry, its diff is serialized using the existing value serializers with some simple additions (e.g. a “wasCleared” flag). This means that neither the StateDiffSerializers themselves nor their state have to be serialized (this is already done for the value serializers).

Referring to the previous snapshots

Here, we reuse the existing IncrementalStateHandle and use the checkpoint ID as the shared StateHandleID (complemented with a backend ID and keyGroupRange).

Once a snapshot is sent to the JM, its corresponding checkpoint ID is stored. Upon receiving a confirmation, it becomes the base for the next one.

Unconfirmed checkpoints

If, at the time of a checkpoint N, the previous one N-1 is still unconfirmed:

  1. Base the new snapshot on some older confirmed snapshot P (and re-snapshot all the changes made since then, meaning that the last `N-P` unconfirmed snapshots can’t be released)
  2. If no such snapshot is available, perform a full snapshot

Tolerating more consecutive checkpoint failures in (1) reduces the number of full snapshots, but can significantly increase memory usage (as old snapshots are retained for much longer).
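
A sketch of this base selection (illustrative; `maxRetained` caps how many unconfirmed snapshots may pile up before falling back to a full snapshot):

    import java.util.NavigableSet;
    import java.util.OptionalLong;
    import java.util.TreeSet;

    final class BaseSnapshotSelector {
        private final NavigableSet<Long> confirmed = new TreeSet<>(); // checkpoint IDs confirmed by the JM
        private final int maxRetained; // how many unconfirmed snapshots we are willing to keep

        BaseSnapshotSelector(int maxRetained) {
            this.maxRetained = maxRetained;
        }

        void onConfirmed(long checkpointId) {
            confirmed.add(checkpointId);
        }

        /** Base for checkpoint N, or empty, meaning a full snapshot has to be taken. */
        OptionalLong baseFor(long newCheckpointId) {
            Long p = confirmed.lower(newCheckpointId);
            if (p == null || newCheckpointId - p > maxRetained) {
                return OptionalLong.empty(); // no usable confirmed base: take a full snapshot
            }
            return OptionalLong.of(p); // re-snapshot all changes made since P
        }
    }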

Recovery and rescaling

On recovery, we filter out irrelevant key groups, sort the state handles by snapshot ID, and apply them one after another.
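
A sketch of this restore path (illustrative handle type and a plain map standing in for the backend's state tables):

    import java.util.Comparator;
    import java.util.List;
    import java.util.Map;

    final class IncrementalRestore {
        /** Stand-in for an incremental keyed state handle. */
        interface DiffHandle {
            long snapshotId();
            boolean overlaps(int fromKeyGroup, int toKeyGroup);
            void applyTo(Map<String, String> state);
        }

        static void restore(List<DiffHandle> handles, int fromKeyGroup, int toKeyGroup, Map<String, String> state) {
            handles.stream()
                    .filter(h -> h.overlaps(fromKeyGroup, toKeyGroup))        // rescaling: skip irrelevant key groups
                    .sorted(Comparator.comparingLong(DiffHandle::snapshotId)) // oldest first
                    .forEach(h -> h.applyTo(state));                          // later diffs overwrite earlier state
        }
    }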

Clean up (compaction)

Periodically, a full snapshot is taken (every configurable number of checkpoints). All subtasks must perform it for the same checkpoint; otherwise, state changes keep accumulating in the subtasks that skip it. For this, a simple checkpoint_id % interval == 0 check is used.

Older checkpoints are removed by JM once they aren’t referenced anymore (existing functionality).

Alternatively, compaction can be done incrementally. For that, each snapshot should have (for each map):
- vStart: minimum version of the map needed to restore using this snapshot
- vTaken: version of the map at which it was taken
Upon finalization, all snapshots that are not needed for restore (i.e. oldSnapshot.vTaken < newSnapshot.vStart for each map) can be dropped. This tracking can be done in SharedStateRegistry by adding some (ordered) mapping between StateMaps and their versions, per key group.

vStart is calculated as the minimum version across all map entries (each entry maintains the version at which it was last fully snapshotted, to support journaling). Each time, some configurable fraction of entries is fully snapshotted (those with the lowest lastFullSnapshot versions are chosen).
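
A sketch of the drop condition for this incremental compaction (illustrative types; the per-key-group bookkeeping in SharedStateRegistry is omitted):

    import java.util.Map;

    final class SnapshotVersions {
        final Map<String, Integer> vStart; // per map: minimum version needed to restore using this snapshot
        final Map<String, Integer> vTaken; // per map: version of the map at which this snapshot was taken

        SnapshotVersions(Map<String, Integer> vStart, Map<String, Integer> vTaken) {
            this.vStart = vStart;
            this.vTaken = vTaken;
        }

        /** True if {@code old} is no longer needed to restore via {@code latest}. */
        static boolean canDrop(SnapshotVersions old, SnapshotVersions latest) {
            return old.vTaken.entrySet().stream()
                    .allMatch(e -> {
                        Integer needed = latest.vStart.get(e.getKey());
                        // unknown map in the latest snapshot -> be conservative and keep the old one
                        return needed != null && e.getValue() < needed;
                    });
        }
    }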

In the first version, the simplest solution seems preferable. 

Relationship to FLIP-158

FLIP-158 proposes a similar idea but in a more generic way.

However, this generality comes at a cost; with FLIP-151, Flink is able to:

1. "Squash" the changes made to the same key. For example, if some counter was changed 10 times, then FLIP-151 will send only the last value (this allows sending AND storing less data compared to FLIP-158).

2. Keep in memory only the changed keys and not the values (this reduces memory usage AND latency (caused by serialization and copying on every update) compared to FLIP-158).

3. Avoid synchronous serialization on each state access, which changelogging implies.

4. Produce even smaller increments for collection-typed state thanks to journalling (e.g. snapshotting only the appended ListState values rather than the whole list).


On the other hand, integration with FLIP-158 would allow "background" materialization, eliminating the need for periodic full checkpoints (or for a more complex way to achieve the same).

For such an integration:

  1. on startup, IncHeapKeyedBackend notifies ChangelogStateBackend that no changes should be sent (to the changelog)
  2. on checkpoint, ChangelogStateBackend asks IncHeapKeyedBackend to create an incremental checkpoint
  3. on materialization, ChangelogStateBackend asks for a full snapshot

This integration is out of scope of this FLIP.

Proposed Changes

  1. The following (~20) classes to be refactored to allow extension (extracting methods, adding hooks, etc.):
    1. (Heap-) KeyedStateBackend, SnapshotStrategy, RestoreOperation
    2. (CopyOnWrite-) Map, Entry, State, Snapshots
    3. RegisteredKeyValueStateBackendMetaInfo
  2. Incremental versions of the following (~10) classes to be added:
    1. HashMapStateBackend
    2. CoW StateMap, StateMapSnapshot, StateTable, StateTableSnapshot
    3. StateMapEntry
    4. KeyedStateBackend, RestoreOperation, SnapshotStrategy
    5. RegisteredKeyValueStateBackendMetaInfo
    6. StateSnapshotRestore
  3. The following new classes to be added (~10):
    1. RemovalLog
    2. For each state type: Diff, DiffSerializer, Journal, JournalFactory
    3. StateSnapshotKeyGroupReaderV7
  4. The following classes to be updated:
    1. StateTableByKeyGroupReaders: add new version
    2. Fs- and MemoryStateBackend will have additional settings to construct incremental backend versions

The backend/new classes will reside in a new module under flink/flink-state-backends.

The refactorings are mostly to allow extension and customization.

Public Interfaces

Code: see Proposed changes

Configuration:

  • Reuse existing state.backend.incremental
  • Add state.backend.incremental.max

UI: add checkpoint type (probably not in the first version)

Monitoring: expose per-subtask checkpoint durations/sizes, including sync-async phases (probably not in the first version)

Compatibility, Deprecation, and Migration Plan

Existing state can be read by the updated backend as a full snapshot (using the older V6 reader). After it is loaded, incremental snapshots can be taken.

Nothing is deprecated (at least for now). The feature is disabled by default.

TTL support: wrapping existing TTL transformers inside the new incremental StateMap iterator is enough to support TTL.

Limitations

In the first version:

  1. Single checkpoint failure (or no confirmation) leads to a non-incremental snapshot
  2. Only KV-states (sorted states, aka PQ, aka timers are snapshotted non-incrementally)
  3. Only states with immutable keys (e.g. not BinaryRowData)

Likely in subsequent versions:

  1. Single inflight checkpoint
  2. Only keyed state
  3. Only with async snapshots

Increase of:

  1. number of files comprising a checkpoint
  2. memory usage to store removed keys and state diffs
  3. recovery time (loading & applying diffs)

Test Plan

  1. Unit tests for the new code
  2. Add incremental mode: FileStateBackendTest, EventTimeWindowCheckpointingITCase, KeyedStateCheckpointingITCase
  3. Add increment-specific integration tests to FileStateBackendTest
  4. Parameterize the whole CI build (each build randomly or as a separate periodic build) using existing state.backend.incremental=true/false. This affects the following tests:
    1. About 10 IT cases in stream.sql (e.g. RankITCase, JoinITCase)
    2. Planner IT cases (e.g. GroupWindowITCase)
    3. E2E: test_streaming_sql (planner) (especially blink), test_resume_savepoint, test_ha_datastream (HA*)
    4. Others: CEPOperatorTest, WindowOperatorMigrationTest, StreamOperatorSnapshotRestoreTest, FileStateBackendTest, MemoryStateBackendTest, HeapAsyncSnapshotTtlStateTest, LocalRecoveryITCase, RegionFailoverITCase, UnalignedCheckpointITCase, 
  5. Important for checking removal correctness: WindowAggregateITCase, WindowCheckpointingITCase
  6. Load testing - similar to the evaluation below but more detailed:
    1. job runtime
    2. recovery time
    3. checkpointing times
    4. memory overhead

(in the future, side-by-side comparisons with permuted configurations can be added)

Evaluation (POC)

A prototype was built and benchmarked (not very thoroughly). The test program used a HashMap<Integer, Payload> state with random updates to it. The keyspace was constant (1M); the payload size varied. The source injected 10M keys without delay, after which it stopped.

Configuration:

Keyspace: 1M, updated keys: 10K, payload per key: 10KB

Checkpoint pause: 1s. A full heap snapshot was taken after every 15 incremental ones.

Hardware: 6x m5.xlarge: 1 JM + 1 source + 4 operators (holding map state). Each TM had 12 GB RAM.


                                    | File full | File incremental | Rocks Incremental
Job runtime                         | 12m22s    | 9m21s            | 18m
Checkpoint size, MB (75/95%)        | 9565      | 114/9565         | 1681/6142
Checkpoint duration, s (75/95%)     | 16        | 1/15             | 21/49
Sync duration, ms (75/95/99/100%)   | 0/1.65    | 0/1/6/199        | 488/1960

Note that the use case is specifically targeted at incremental backends:

  1. no full updates (can actually use delta)
  2. no removals (don’t have to keep/send removed stream keys)
  3. big payload (save more IO per key not sent)
  4. simple serialization (snapshot is faster)

Rejected Alternatives

Merge-on-read (LSMT)

(As proposed by Yu Li during offline discussion)

Algorithm

Let's consider a Map state:

  1. on barrier, create a new map to accept new writes; make old map read-only
  2. save the old map and include a reference to it into the snapshot along with references to previous read-only maps
  3. in the background, compact (merge) old maps; save the new merged version; use reference to it for the next snapshot instead of refs to merged maps
  4. serve reads from the new map first; if not found, check the old maps in order (using a "tombstone" marker for deletes; see the sketch below)
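
A sketch of the resulting read path (illustrative; a tombstone marker is used so that deletions stay visible across layers):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.Map;

    class LayeredMap<K, V> {
        private static final Object TOMBSTONE = new Object();

        private Map<K, Object> current = new HashMap<>();                // accepts new writes
        private final Deque<Map<K, Object>> frozen = new ArrayDeque<>(); // read-only layers, newest first

        void put(K key, V value) {
            current.put(key, value);
        }

        void remove(K key) {
            current.put(key, TOMBSTONE); // a marker instead of an unlink, so old layers don't "resurrect" the key
        }

        /** Called on a checkpoint barrier: freeze the current map and start a new one. */
        void roll() {
            frozen.addFirst(current);
            current = new HashMap<>();
        }

        @SuppressWarnings("unchecked")
        V get(K key) {
            Object v = current.get(key);
            if (v != null) {
                return v == TOMBSTONE ? null : (V) v;
            }
            for (Map<K, Object> layer : frozen) { // newest to oldest
                v = layer.get(key);
                if (v != null) {
                    return v == TOMBSTONE ? null : (V) v;
                }
            }
            return null;
        }
    }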

For non-map states, we need an O(n log n) merge algorithm or a suitable data structure.

Alternatively, we can use maps at the stream-key level (as is done for RocksDB).

So we have to choose between: 1) slower writes, 2) slower compaction, or 3) less incremental snapshots.

Memory considerations

The less frequent compactions are, the smaller the checkpoints are. On the other hand, this means keeping more old versions, and therefore higher memory consumption.

In the chosen solution we also keep old versions, but only for as long as the corresponding checkpoints are in flight (bounded by the number of concurrent checkpoints and their duration).

Comparison

  1. Advantages
    1. Faster checkpointing
    2. Simplicity of checkpointing (though other parts add complexity)
    3. May also be more adaptive to new state types (SetState/QueueState or user-customized state)
  2. Disadvantages
    1. Higher memory consumption (or less efficient checkpointing)
    2. Less predictable (because of more GC and compactions)
    3. Slower for non-map states (or less efficient checkpointing)
    4. State reads are slower (need to traverse several "layers")
    5. Requires more effort to implement

Summarizing, it could be a good fit for some users, as it provides a different set of trade-offs. So ideally we should have both.

But, given that it requires more effort and is less universal, we should implement the journalling solution first.

Binary diff at a key-group level

Gives a bigger gain (smaller checkpoint size) in exchange for additional overhead:

  1. to perform the diff (CPU)
  2. to carry/load the previous version

The latter can be achieved by:

  • carrying the previous version in binary form in RAM: memory overhead
  • carrying the previous version in RAM and serializing it in flight: memory overhead, plus CPU overhead for serialization
  • reading the previous checkpoint stored locally: IO latency + IO cost = no gain