This Confluence has been LDAP enabled, if you are an ASF Committer, please use your LDAP Credentials to login. Any problems file an INFRA jira ticket please.

Page tree
Skip to end of metadata
Go to start of metadata


Current stateReleased

Discussion thread

JIRA FLINK-4379 - Getting issue details... STATUS

Released: Flink 1.2

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


This FLIP proposes the addition a non-partitioned state abstraction that can be rescaled.

Currently (as of cd94aad), stream task state consists of non-partitioned operator and function state as well as partitioned key-value state.

Dynamic scaling for partitioned state has been discussed in the Dynamic Scaling: Key Groups design document (no associated FLIP as the design doc predates the introduction of the FLIP process) and the initial implementation can be found in pull request #1988.

The main question for rescalable state is how to split or merge state when scaling up or down, respectively. For partitioned state the main observation is that splitting and merging can be based on the state key. This key is exposed to the runtime (see KvState) and it is possible to automatically redistribute state based on the key. For non-partitioned state on the other hand this is not possible as the state abstraction is agnostic to the contained state and the state is exposed as a single unit for each sub task.

Example: Kafka Source

Let's look at Flink's Kafka source as an example of non-partitioned state usage. Consider the following initial setup with 4 Kafka partitions (KP1-4) and 2 Flink Kafka source tasks (S1-2). Each sub task will have state for two Kafka partitions. When snapshotting, the state for two partitions (partition index, current offset, etc.) will be stored as one unit by each sub task, (KP1, KP2) and (KP2, KP3).

Scaling Down

When we now resume from this state with a scaled down job, only state (KP1, KP2) of  S1 will be restored. The remaining state (KP3, KP4) cannot be matched to any sub task. This means that only partitions 1 and 2 will be consumed again by Flink, skipping partitions 3 and 4, which were previously mapped to sub task 2 (figure left). What we want instead is that after scaling down all state is merged and restored by S1 (figure right):

Scaling Up

When we scale our job up, the Kafka partitions will continue to be only mapped to subtasks 1 and 2, leaving sub tasks 3 and 4 idling (figure left). What we want instead is that after scaling up all state is split and distributed over all four sub tasks:

Public Interfaces

On the API level, the main objective is to allow the user to expose multiple units of state that can be redistributed. With respect to Kafka source example above, this would require the ability to express that KP1, KP2, KP3, KP4 are separate units of state. This sections describes one option to accomplish this.

Add Checkpointed variants 

We add a CheckpointedBase inteface and add new sub types that are repartitionable.


Checkpointed is the non-partitioned variant we currently have, the List variants snapshot and restore a List<T>.

  • CheckpointedList: automatically reassigns state when rescaling. The assignment of state to subtasks can be fixed if no rescaling happens.
  • CheckpointedUnionedList: We collect the state from each function as a list and on recovery we send each operator the union of all collected lists. In restoreState, the operator can then select the parts of the unioned list which are relevant for the current run. This can be used if the user needs low-level control of how to assign state.
public interface CheckpointedList<T extends Serializable> {

List<T> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;

void restoreState(List<T> state) throws Exception;

Proposed Changes

We gather the state handles as we currently do for all sub tasks and marked them to be merged. On restore, the checkpoint coordinator merges the state handles and sends them to the tasks. Each task then creates a lazily deserializing list over the state elements. This is independent of the API options outlined above.

Compatibility, Deprecation, and Migration Plan

Existing users should not be affected by the proposed changes. All existing non-partitioned state variants should be kept. If the rescalabe APIs are accepted it should (in theory) be possible for users to easily replace all existing non-partitioned state variants with the redistributable variants. Given that the Checkpointed interface is marked as @PublicEvolving it should be possible to actually enforce such a change.

Test Plan

The implementation should be tested by running a job, triggering a savepoint and then resuming from this savepoint with both lower and higher parallelism. If support for rescaling of keyed state is already available, this should ideally be tested both separately and together with keyed state.

Rejected Alternatives

User-defined Split/Merge Methods

Instead of exposing the redistributable state units via a list and unioning them on restore, we could keep the current non-partitioned state interfaces and require user-defined split and merge methods.

This would have the advantage that users can keep on using the current APIs. On the other hand, state would likely need to be deserialized completely on the JobManager and then split or merged via the user-specified function, then serialized again, and send to the tasks. This would mean that the JobManager would not be able continue treating user state as black boxes.




  • No labels