...
Status
Current state: "Approved"
Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-41-Unify-Keyed-State-Snapshot-Binary-Format-for-Savepoints-td29197.html
...
Released: targeted for Flink 1.9.0
Motivation
Currently, Flink-managed user keyed state is serialized in different binary formats across the different state backends.
...
- Unify, across all state backends, a savepoint format for keyed state that is more future-proof and applicable to potential new state backends. Checkpoint formats, by definition, are still allowed to be backend specific.
- Rework the abstractions around snapshotting and restoring to reduce the overhead and code duplication involved in implementing a new state backend.
Current Status
...
NOTE - the remainder of this document uses the following abbreviations.
...
- the binary format of the keys and values in each state entry, and
- metadata or markers written along with the contiguous state entries to facilitate reading them from state handle streams on restore.
State entry key/value binary layout
The binary layout of keys and values in state entries differs across state backends because each layout is tailored to how the backend maintains its working state.
...
| State | RocksDBKeyedStateBackend | HeapKeyedStateBackend | Note |
|---|---|---|---|
| ValueState | [KG :: K :: NS, SV] | [NS, K, SV] | |
| ListState | [KG :: K :: NS, SV :: SV :: ...] | [NS, K, List<SV>] | RocksDB stores all list elements in a single entry, separated by a delimiter byte |
| MapState | [KG :: K :: NS :: UK, UV] | [NS, K, Map<UK, UV>] | RocksDB stores one entry per user map entry |
| AggregatingState | [KG :: K :: NS, SV] | [NS, K, SV] | |
| ReducingState | [KG :: K :: NS, SV] | [NS, K, SV] | |
| FoldingState | [KG :: K :: NS, SV] | [NS, K, SV] | |
| Timers | [KG :: TS :: K :: NS, (empty)] | | |
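To make the RocksDB-style layout concrete, below is a minimal sketch of how a composite entry key of the form KG :: K :: NS could be assembled. The class and method names, the 2-byte big-endian key-group prefix, and the plain concatenation are illustrative assumptions, not the exact Flink implementation.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch: composes a RocksDB-style entry key as KG :: K :: NS.
// The fixed-size key-group prefix and raw concatenation are simplifying
// assumptions for illustration.
public class CompositeKeySketch {

    public static byte[] compose(int keyGroup, byte[] serializedKey, byte[] serializedNamespace)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeShort(keyGroup);       // KG: big-endian key-group prefix
        out.write(serializedKey);       // K: serialized key
        out.write(serializedNamespace); // NS: serialized namespace
        out.flush();
        return bytes.toByteArray();
    }
}

Because the key group is the leading component, all entries of one key group are contiguous in RocksDB's lexicographic key order, which is what iterating contiguous state values per key group relies on.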
Metadata or markers for state iteration
The heap and RocksDB backends take different approaches to writing metadata or markers in the checkpoint stream to facilitate iterating through the contiguous state values of key groups at restore time.
The difference stems mainly from the fact that for the heap backend the number of state values is known upfront when taking a snapshot, while for RocksDB it is not. The following subsections describe this in detail.
HeapKeyedStateBackend
For the HeapKeyedStateBackend, the binary layout of the contiguous state values of a single key group is as follows:
...
Please see HeapRestoreOperation#readKeyGroupStateData() for the implementation.
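To make the count-based approach concrete, here is a hedged sketch of such a read loop: the entry count is read first, then exactly that many key/value pairs. The stream layout and helper names are illustrative assumptions, not the actual HeapRestoreOperation code.

import java.io.DataInputStream;
import java.io.IOException;

// Hypothetical sketch of a count-prefixed read loop: because the heap backend
// knows the number of state entries upfront when snapshotting, it can write
// that count and read back exactly as many entries, without per-entry markers.
class CountPrefixedReadSketch {

    static void readKeyGroup(DataInputStream in) throws IOException {
        int numEntries = in.readInt();          // count written at snapshot time
        for (int i = 0; i < numEntries; i++) {
            byte[] key = readByteArray(in);     // serialized key part of the entry
            byte[] value = readByteArray(in);   // serialized state value
            // ... insert (key, value) into the backend's in-memory state table
        }
    }

    private static byte[] readByteArray(DataInputStream in) throws IOException {
        byte[] data = new byte[in.readInt()];   // length-prefixed bytes (assumed)
        in.readFully(data);
        return data;
    }
}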
RocksDBKeyedStateBackend
For the RocksDBKeyedStateBackend, the binary layout of the contiguous state values of a single key group is as follows:
...
Please see RocksDBFullRestoreOperation#readKvStateData() for the implementation.
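For contrast, a sketch of marker-based iteration in the spirit of the RocksDB read path: since the number of entries is not known when the snapshot is written, entries are consumed until an end-of-key-group marker is found. The marker constant and flag encoding below are simplified assumptions.

import java.io.DataInputStream;
import java.io.IOException;

// Hypothetical sketch of marker-based iteration: entries are read one by one
// until an end-of-key-group sentinel appears, because the entry count is
// unknown at snapshot time. Marker value and flag encoding are assumptions.
class MarkerBasedReadSketch {

    private static final int END_OF_KEY_GROUP_MARK = 0xFFFF; // assumed sentinel

    static void readKeyGroup(DataInputStream in) throws IOException {
        while (true) {
            byte[] key = readByteArray(in);
            byte[] value = readByteArray(in);
            // ... insert (key, value) into RocksDB
            //     (a real implementation would clear the flag bit from the key first)
            if (hasMetaDataFollowsFlag(key)) {
                if (in.readShort() == (short) END_OF_KEY_GROUP_MARK) {
                    return; // key group fully read
                }
                // otherwise the short identifies the next k/v state; continue reading
            }
        }
    }

    private static boolean hasMetaDataFollowsFlag(byte[] key) {
        return (key[0] & 0x80) != 0; // highest bit of first key byte (assumed)
    }

    private static byte[] readByteArray(DataInputStream in) throws IOException {
        byte[] data = new byte[in.readInt()];
        in.readFully(data);
        return data;
    }
}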
Proposal
This proposal covers two goals:
- Define the unified binary format for keyed state backends.
- Extend / adjust the internal abstractions around SnapshotStrategy and RestoreOperation so that new backends added in the future write keyed state in savepoints in the unified format.
Unified binary format for keyed state
We propose to unify the binary layout of all currently supported keyed state backends to match the layout currently adopted by RocksDB, at all levels: the layout of the contiguous state values of a key group as well as the layout of individual state primitives.
...
To conclude: the format that RocksDB currently uses is the more general approach for arbitrarily large keyed state. It is feasible for the heap backend to work with the RocksDB backend's current format, but not the other way around.
Implementation
The goal for the proposed implementation is the following:
...
The following sections go over the main design choices.
Rework the SnapshotStrategy class hierarchy to differentiate between savepoint and checkpoint strategies
Currently, the binary layout for keyed state in savepoints is defined by HeapSnapshotStrategy and RocksFullSnapshotStrategy for the heap and RocksDB state backends, respectively. Those classes are used for both savepoints and full checkpoints. To be able to define the layout for savepoints and checkpoints of keyed state independently, the SnapshotStrategy hierarchy needs to be extended to differentiate a KeyedBackendSavepointStrategyBase and a KeyedBackendCheckpointStrategyBase. All existing strategies, including HeapSnapshotStrategy, RocksFullSnapshotStrategy and RocksDBIncrementalSnapshotStrategy, should be rebased onto the checkpoint strategy base (and potentially renamed for better clarity).
...
public interface SnapshotResources {

    /** Releases the resources held for the snapshot. */
    void release();

    /** Returns the snapshots of the meta info of all states captured by the snapshot. */
    List<StateMetaInfoSnapshot> getStateMetaInfoSnapshots();
}
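As a usage sketch (everything apart from SnapshotResources itself is hypothetical), the synchronous part of a snapshot would capture the resources, and the asynchronous part would write from them and release them when done:

import java.util.List;

// Hypothetical usage sketch of SnapshotResources: the synchronous snapshot
// part captures the resources, the asynchronous part writes from them and
// releases them afterwards. The surrounding types are stand-ins.
class SnapshotFlowSketch {

    interface StateMetaInfoSnapshot {} // stand-in for Flink's type

    interface SnapshotResources {
        void release();
        List<StateMetaInfoSnapshot> getStateMetaInfoSnapshots();
    }

    static void runAsyncPart(SnapshotResources resources) {
        try {
            // asynchronous part: write the state meta info, then the state entries
            for (StateMetaInfoSnapshot metaInfo : resources.getStateMetaInfoSnapshots()) {
                // ... write metaInfo to the checkpoint stream
            }
            // ... write the state entries
        } finally {
            resources.release(); // always free the snapshot resources
        }
    }
}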
...
Unifying the format for keyed state via KeyedBackendSavepointStrategyBase
KeyedBackendSavepointStrategyBase is responsible for defining the unified binary layout for keyed state in savepoints. Subclasses would only be responsible for providing a state-backend-specific KeyedStateSavepointSnapshotResource. Apart from the state snapshot and meta info snapshots, the KeyedStateSavepointSnapshotResource additionally provides iterators for the snapshotted keyed state -
...
Within the asynchronous part of the snapshot, keyed state snapshot iterators are obtained from the KeyedStateSavepointSnapshotResource. Please refer to the next section for more details regarding the iterators.
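A rough sketch of what such a resource could look like, assuming (per the text above) that it extends the plain snapshot resources with per-state snapshot iterators; the exact signatures are not fixed by this document:

import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (names and signatures are illustrative assumptions):
// a savepoint-specific snapshot resource that, besides releasing resources
// and exposing the state meta info snapshots, provides one iterator of
// serialized key/value state entries per registered state.
interface KeyedStateSavepointSnapshotResourceSketch {
    void release();
    List<?> getStateMetaInfoSnapshots();
    // one iterator per registered state, each yielding serialized entries
    List<Iterator<Map.Entry<byte[], byte[]>>> getKeyedStateSnapshotIterators();
}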
Iterating state entries for keyed state snapshots: KeyedStateSnapshotIterator and KeyedStateSnapshotPerKeyGroupMergeIterator
A KeyedStateSnapshotIterator is an iterator that iterates the state entries of a single registered state in key group order. The asynchronous part of the snapshot operation then uses a KeyedStateSnapshotPerKeyGroupMergeIterator to combine multiple KeyedStateSnapshotIterators and iterate across all registered states, key group by key group.
...
The iterator returns a (byte[], byte[]) key/value pair for each state entry. Subclass implementations are responsible for serializing state objects in the case of state backends that only lazily serialize state on snapshots (e.g. the heap backend). This should also provide enough flexibility for state backends that may have a mix of serialized and non-serialized working state, such as the disk-spilling heap backend that is currently being discussed [1].
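To illustrate the iteration contract, here is a sketch under assumed names and signatures; the actual interface may differ:

// Hypothetical sketch of the iteration contract. A KeyedStateSnapshotIterator
// walks the serialized entries of one registered state in key-group order, so
// that a merge iterator can combine several of them key group by key group.
interface KeyedStateSnapshotIteratorSketch {
    boolean hasNext();
    void next();        // advance to the next state entry
    int keyGroup();     // key group of the current entry
    byte[] key();       // serialized key bytes of the current entry
    byte[] value();     // serialized value bytes of the current entry
}

A KeyedStateSnapshotPerKeyGroupMergeIterator would then repeatedly pick, among all per-state iterators, the entries belonging to the current key group before advancing to the next key group.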
Restore procedures and migrating from older versions
In line with the rework of the SnapshotStrategy hierarchy to differentiate between savepoints and checkpoints, the RestoreOperation hierarchy should be changed correspondingly to include KeyedBackendSavepointRestoreOperation and KeyedBackendCheckpointRestoreOperation. All existing implementations of keyed backend restore procedures, namely HeapRestoreOperation, RocksDBFullSnapshotRestoreOperation, and RocksDBIncrementalSnapshotRestoreOperation, should be rebased onto the checkpoint restore operation and possibly renamed for clarity.
...
- If it is a SavepointKeyGroupsStateHandle, then the restored state is assured to be a savepoint in the new unified format. It is safe to restore state from the state handles using KeyedBackendSavepointRestoreOperation.
- If it is a KeyGroupsStateHandle, then we have either restored from a checkpoint taken after the rework, or a savepoint taken before the rework. Either way, it is safe to continue using the existing keyed state restore operations (HeapRestoreOperation, RocksDBFullSnapshotRestoreOperation, and RocksDBIncrementalSnapshotRestoreOperation) to read the state, since these were previously used for savepoints as well.
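A minimal sketch of this dispatch (the handle types come from this proposal; modeling SavepointKeyGroupsStateHandle as a subclass and the restore wiring are illustrative assumptions):

// Hypothetical sketch of the restore dispatch described above: the handle
// classes mirror this proposal, while the subclass relationship and the
// restore-operation construction are illustrative assumptions.
class RestoreDispatchSketch {

    static class KeyGroupsStateHandle {}
    static class SavepointKeyGroupsStateHandle extends KeyGroupsStateHandle {}

    static void restore(KeyGroupsStateHandle handle) {
        if (handle instanceof SavepointKeyGroupsStateHandle) {
            // savepoint in the new unified format
            // ... new KeyedBackendSavepointRestoreOperation(...).restore();
        } else {
            // checkpoint after the rework, or savepoint from before the rework
            // ... use the existing backend-specific restore operation
        }
    }
}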
Migrating from Previous Savepoints
With the proposed implementation, users will be able to seamlessly migrate from savepoints taken with older Flink versions. When restoring from an older savepoint, the old read paths will be used; from then on, new savepoints of keyed state will be written in the new unified format.
References
...