Status

Current state: Approved

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-41-Unify-Keyed-State-Snapshot-Binary-Format-for-Savepoints-td29197.html

...

Released: targeted for Flink 1.9.0

Motivation

Currently, Flink managed user keyed state is serialized with different binary formats across different state backends.

...

  • Unify the savepoint format for keyed state across all state backends, so that it is more future-proof and applicable to potential new state backends. Checkpoint formats, by definition, are still allowed to be backend specific.
  • Rework abstractions related to snapshots and restoring, to reduce the overhead and code duplication when attempting to implement a new state backend. 

Current Status

...

NOTE - the remainder of this document uses the following abbreviations. 

...

  1. the binary format of the key and values in each state entry, and
  2. metadata or markers written along with the contiguous state entries to facilitate reading them from state handle streams on restore.

State entries key value binary layout

The binary layout of keys and values in state entries differs across state backends because each layout is tailored to how the respective backend maintains its working state.

...


ValueState

  • RocksDBKeyedStateBackend: [CompositeKey(KG, K, NS), SV]
  • HeapKeyedStateBackend: [NS, K, SV]
  • Note: CompositeKey(...) represents the composite key built via RocksDBSerializedCompositeKeyBuilder#buildCompositeKeyNamespace.
  • Note: Moreover, the MSB bit of the composite key may also be flipped if it is the last entry of a state for a given key group (see RocksFullSnapshotStrategy#setMetaDataFollowsFlagInKey). This only applies to snapshotted state; keys in working state should never have the MSB bit flipped.

ListState

  • RocksDBKeyedStateBackend: [CompositeKey(KG, K, NS), SV]
  • HeapKeyedStateBackend: [NS, K, SV]
  • Note: The format of SV for ListState is different between heap and RocksDB backend.
  • Note: For the heap backend, the format is defined by the serializer obtained from the state descriptor, which is a ListSerializer.
  • Note: RocksDB backend defines its own format in RocksDBListState#serializeValueList.

MapState

  • RocksDBKeyedStateBackend: [CompositeKey(KG, K, NS) :: UK, UV]
  • HeapKeyedStateBackend: [NS, K, SV]
  • Note: For the heap backend, each serialized state data is the complete user MapState, written using the serializer obtained from the state descriptor, which is a MapSerializer.
  • Note: For RocksDB backend, each serialized state data is a single entry in the user MapState. User map key and value serializers are obtained from the MapSerializer provided by the state descriptor.

AggregatingState

  • RocksDBKeyedStateBackend: [CompositeKey(KG, K, NS), SV]
  • HeapKeyedStateBackend: [NS, K, SV]

ReducingState

  • RocksDBKeyedStateBackend: [CompositeKey(KG, K, NS), SV]
  • HeapKeyedStateBackend: [NS, K, SV]

FoldingState

  • RocksDBKeyedStateBackend: [CompositeKey(KG, K, NS), SV]
  • HeapKeyedStateBackend: [NS, K, SV]

Timers

  • RocksDBKeyedStateBackend: [KG :: TS :: K :: NS, (empty)]
  • HeapKeyedStateBackend: TS :: K :: NS
  • Note: The timestamp is a long value with MSB sign bit flipped.
  • Note: Value is empty in RocksDB
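
The timer entry above notes that the timestamp is a long value with its MSB sign bit flipped. A likely reason is that flipping the sign bit makes the unsigned, byte-wise order of the big-endian encoding agree with the signed numeric order of the timestamps. The following is a minimal sketch of that idea only; the class and method names are hypothetical and are not taken from the Flink code base.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration; not Flink's actual timer serializer.
class TimerTimestampEncoding {

    // Encodes the timestamp as 8 big-endian bytes with the sign bit flipped.
    static byte[] encode(long timestamp) {
        return ByteBuffer.allocate(Long.BYTES)
                .putLong(timestamp ^ Long.MIN_VALUE)
                .array();
    }

    // Reverses encode(): reads 8 big-endian bytes and flips the sign bit back.
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        byte[] negative = encode(-5L);
        byte[] positive = encode(3L);
        // Arrays.compareUnsigned compares lexicographically, treating bytes as unsigned.
        // With the sign bit flipped, -5 sorts before 3, as it does numerically.
        System.out.println(Arrays.compareUnsigned(negative, positive) < 0);
        System.out.println(decode(negative));
    }
}
```

Without the flip, the two's-complement encoding of a negative timestamp starts with a set high bit and would sort after every non-negative timestamp in a byte-wise comparison.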

Metadata or markers for state iteration

The heap and RocksDB backends take different approaches to writing metadata or markers in the checkpoint stream to facilitate iterating through the contiguous state values of key groups at restore time.
The difference mainly stems from the fact that, for the heap backend, the number of state values is known upfront when taking a snapshot. The following subsections describe this in detail.
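
To illustrate why knowing the number of entries upfront matters, here is a simplified sketch that contrasts two generic framing styles: a count written before the entries, and a marker written alongside each entry so the reader knows when to stop. This is not the actual layout used by either backend (those are defined in the restore operations referenced in the subsections below); the class name, method names, and the use of strings as entries are all assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration of two framing styles; not Flink's binary format.
class EntryFramingSketch {

    // Count-prefixed: only possible when the number of entries is known upfront.
    static byte[] writeCountPrefixed(List<String> entries) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(entries.size());
            for (String entry : entries) {
                out.writeUTF(entry);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static List<String> readCountPrefixed(byte[] data) {
        List<String> result = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                result.add(in.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    // Marker-based: works for an iterator of unknown length, because each
    // entry is preceded by a "has next" flag and a final false ends the run.
    static byte[] writeMarkerTerminated(Iterator<String> entries) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            while (entries.hasNext()) {
                out.writeBoolean(true);
                out.writeUTF(entries.next());
            }
            out.writeBoolean(false);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static List<String> readMarkerTerminated(byte[] data) {
        List<String> result = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            while (in.readBoolean()) {
                result.add(in.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> entries = Arrays.asList("a", "b", "c");
        System.out.println(readCountPrefixed(writeCountPrefixed(entries)));
        System.out.println(readMarkerTerminated(writeMarkerTerminated(entries.iterator())));
    }
}
```

The marker-based variant never needs the total size, which is the property that matters when entries are streamed from an iterator rather than held in memory.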

HeapKeyedStateBackend

For the HeapKeyedStateBackend, the binary layout of the contiguous state values of a single key group is as follows:

...

Please see HeapRestoreOperation#readKeyGroupStateData() for the implementation.

RocksDBKeyedStateBackend

For the RocksDBKeyedStateBackend, the binary layout of the contiguous state values of a single key group is as follows:

...

Please see RocksDBFullRestoreOperation#readKvStateData() for the implementation.

Proposal

This proposal covers 2 goals:

  • Define the unified binary format for keyed state backends.
  • Extend / adjust internal abstractions around SnapshotStrategy and RestoreOperation so that new backends added in the future write keyed state in savepoints in the unified format.

Unified binary format for keyed state

We propose to unify the binary layout for all currently supported keyed state backends to match the layout currently adopted by RocksDB. This applies at all levels, including the layout of the contiguous state values of a key group as well as the layout of individual state primitives.

...

To conclude, the format that RocksDB currently uses is the more general approach for arbitrarily large keyed state. It is feasible to allow the heap backend to work with the RocksDB backend's current format, but not the other way around.

Implementation

The goal for the proposed implementation is the following:

...

The following sections go over the main design choices.

Rework SnapshotStrategy class hierarchy to differentiate between savepoint and checkpoint strategies

Currently, the binary layout for keyed state in savepoints for the heap and RocksDB state backends is defined by HeapSnapshotStrategy and RocksFullSnapshotStrategy, respectively. Those classes are used for both savepoints and full checkpoints. To be able to define the layout for savepoints and checkpoints of keyed state independently, the hierarchy for SnapshotStrategy needs to be extended to differentiate between a KeyedBackendSavepointStrategyBase and a KeyedBackendCheckpointStrategyBase. All existing strategies, including HeapSnapshotStrategy, RocksFullSnapshotStrategy and RocksDBIncrementalSnapshotStrategy, should be rebased onto the checkpoint strategy base (and potentially renamed for better clarity).

...

public interface SnapshotResources {
    void release();
    List<StateMetaInfoSnapshot> getStateMetaInfoSnapshots();
}

...

Unifying the format for keyed state via KeyedBackendSavepointStrategyBase

KeyedBackendSavepointStrategyBase is responsible for defining the unified binary layout for keyed state in savepoints. Subclasses would only be responsible for providing a state backend specific KeyedStateSavepointSnapshotResource. Apart from the state snapshot and meta info snapshots, the KeyedStateSavepointSnapshotResource additionally provides iterators for snapshotted keyed state -

...

Within the asynchronous part of the snapshot, keyed state snapshot iterators are obtained from the KeyedStateSavepointSnapshotResource. Please refer to the next section for more details regarding the iterators.

Iterating state entries for keyed state snapshots: KeyedStateSnapshotIterator and KeyedStateSnapshotPerKeyGroupMergeIterator

KeyedStateSnapshotIterator is an iterator that iterates over the entries of a single registered state in key group order. The asynchronous part of the snapshot operation then uses a KeyedStateSnapshotPerKeyGroupMergeIterator to combine multiple KeyedStateSnapshotIterators and iterate across all registered states, key group by key group.

...

The iterator returns a (byte[], byte[]) key value pair for each state entry. Subclass implementations are responsible for serializing state objects in the case of state backends that only lazily serialize state on snapshots (e.g. the heap backend). This should also provide enough flexibility for state backends that may have a mix of serialized and non-serialized working state, such as the disk-spilling heap backend that is currently being discussed [1].
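
The merge described in this section can be sketched as a k-way merge over per-state iterators, each of which is assumed to already yield its entries in key group order. The sketch below is an assumption-laden illustration and not the proposed implementation: the Entry, Tagged, and Head classes and the merge method are hypothetical stand-ins, and ordering states by their index within a key group is an arbitrary choice made here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of a per-key-group merge; not the actual Flink classes.
class KeyGroupMergeSketch {

    // One serialized state entry, as a per-state iterator might return it.
    static final class Entry {
        final int keyGroup;
        final byte[] key;
        final byte[] value;

        Entry(int keyGroup, byte[] key, byte[] value) {
            this.keyGroup = keyGroup;
            this.key = key;
            this.value = value;
        }
    }

    // An entry tagged with the index of the registered state it belongs to.
    static final class Tagged {
        final int stateId;
        final Entry entry;

        Tagged(int stateId, Entry entry) {
            this.stateId = stateId;
            this.entry = entry;
        }
    }

    // The current head entry of one per-state iterator.
    private static final class Head {
        final int stateId;
        final Iterator<Entry> source;
        Entry current;

        Head(int stateId, Iterator<Entry> source, Entry current) {
            this.stateId = stateId;
            this.source = source;
            this.current = current;
        }
    }

    // Merges iterators that are each sorted by key group into one sequence
    // ordered by key group first and by state index second.
    static List<Tagged> merge(List<Iterator<Entry>> perStateIterators) {
        PriorityQueue<Head> heads = new PriorityQueue<>(
                Comparator.comparingInt((Head h) -> h.current.keyGroup)
                        .thenComparingInt(h -> h.stateId));
        for (int i = 0; i < perStateIterators.size(); i++) {
            Iterator<Entry> it = perStateIterators.get(i);
            if (it.hasNext()) {
                heads.add(new Head(i, it, it.next()));
            }
        }
        List<Tagged> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            Head head = heads.poll();
            merged.add(new Tagged(head.stateId, head.current));
            if (head.source.hasNext()) {
                // Safe to mutate: the head is outside the queue until re-added.
                head.current = head.source.next();
                heads.add(head);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Entry> state0 = Arrays.asList(
                new Entry(0, new byte[] {1}, new byte[] {10}),
                new Entry(2, new byte[] {2}, new byte[] {20}));
        List<Entry> state1 = Arrays.asList(
                new Entry(0, new byte[] {3}, new byte[] {30}),
                new Entry(1, new byte[] {4}, new byte[] {40}));
        for (Tagged t : merge(Arrays.asList(state0.iterator(), state1.iterator()))) {
            System.out.println("keyGroup=" + t.entry.keyGroup + " state=" + t.stateId);
        }
    }
}
```

A real implementation would stream entries to the output instead of collecting them into a list; the list is used here only to keep the sketch easy to inspect.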

Restore procedures and migrating from older versions

Similar to the rework of the SnapshotStrategy hierarchy to differentiate between savepoints and checkpoints, the RestoreOperation hierarchy should also be changed correspondingly to include KeyedBackendSavepointRestoreOperation and KeyedBackendCheckpointRestoreOperation. All existing implementations of keyed backend restore procedures, namely HeapRestoreOperation, RocksDBFullSnapshotRestoreOperation, and RocksDBIncrementalSnapshotRestoreOperation, should be rebased onto the checkpoint restore operation and possibly renamed for clarity.

...

  • If it is a SavepointKeyGroupsStateHandle, then the restored state is assured to be a savepoint of the new unified format. It is safe to restore state from the state handles using KeyedBackendSavepointRestoreOperation.
  • If it is a KeyGroupsStateHandle, then we have either restored from a checkpoint after the rework, or a savepoint before the rework. Either way, it will be safe to continue using the existing keyed state restore operations (HeapRestoreOperation, RocksDBFullSnapshotRestoreOperation, and RocksDBIncrementalSnapshotRestoreOperation) to read state, since these were previously used for savepoints as well.
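
The two cases above amount to a dispatch on the type of the restored state handle. The following sketch shows that decision in isolation. The nested handle classes are empty stand-ins that only borrow the names used in this document, and the KeyedStateHandle marker interface, the RestorePath enum, and the choose method are assumptions introduced for illustration. The savepoint handle is checked first so that the logic would remain correct even if it were modelled as a subclass of KeyGroupsStateHandle.

```java
// Hypothetical sketch of the restore-path decision; not the Flink implementation.
class RestoreDispatchSketch {

    // Stand-in marker interface for a keyed state handle (assumption).
    interface KeyedStateHandle {}

    // Empty stand-ins named after the handle types mentioned in the text.
    static final class KeyGroupsStateHandle implements KeyedStateHandle {}

    static final class SavepointKeyGroupsStateHandle implements KeyedStateHandle {}

    enum RestorePath {
        // Read with the unified savepoint restore operation.
        UNIFIED_SAVEPOINT,
        // Read with the existing backend-specific restore operations.
        EXISTING_BACKEND_SPECIFIC
    }

    static RestorePath choose(KeyedStateHandle handle) {
        if (handle instanceof SavepointKeyGroupsStateHandle) {
            // Savepoint written after the rework, in the unified format.
            return RestorePath.UNIFIED_SAVEPOINT;
        }
        if (handle instanceof KeyGroupsStateHandle) {
            // Checkpoint after the rework, or savepoint from before it.
            return RestorePath.EXISTING_BACKEND_SPECIFIC;
        }
        throw new IllegalArgumentException(
                "Unexpected state handle type: " + handle.getClass().getName());
    }

    public static void main(String[] args) {
        System.out.println(choose(new SavepointKeyGroupsStateHandle()));
        System.out.println(choose(new KeyGroupsStateHandle()));
    }
}
```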

Migrating from Previous Savepoints

With the proposed implementation, users will be able to seamlessly migrate from previous savepoints of older Flink versions. When reading from older versions, old read paths will be used. From then on, new savepoints of keyed state will be written in the new unified format.

References

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Proposal-to-support-disk-spilling-in-HeapKeyedStateBackend-td29109.html

...