Status

Motivation

With the current state registration interface, managed state (both operator and keyed state) can be lazily declared any time during processing. This poses problems in several dimensions for Flink’s state management capabilities:

  • State migration / schema evolution: Flink can only know whether state migration or schema evolution is required for a state at the time of the state’s declaration (i.e., when the state descriptor is provided). Consequently, state migration / schema evolution would not work with lazy state declaration, because we would then need a global stop during processing to perform full migrations.
  • Queryable state availability: Currently, a state is queryable only when it is registered at the key-value state registry, which happens at the time of the state’s declaration. This is also the case when restoring from a checkpoint; before the state is declared again, the state is not queryable.

  • JobManager lacks information about state: none of the information provided through state declaration (e.g. serializers, the operator state handle redistribution scheme) is visible to the JobManager. For example, for operator state redistribution, the JobManager should be able to redistribute state partition handles with whatever mode is currently specified (SPLIT_REDISTRIBUTE or BROADCAST). However, with lazy state declaration, the only way to carry this information to the JobManager on restore is to encode it within the written state, which means it cannot be changed at restore time.

Proposed Changes & Public Interfaces

The problems mentioned above limit our ability to build more powerful state management capabilities. To address this, we propose to restrict state declaration to a more limited, eager form.

(1) Pre-flight, eager state declaration

All operators, as well as rich UDFs, can be stateful. Currently, state can be lazily declared with a state descriptor at any time during processing by calling getRuntimeContext().getState(descriptor). Both concerns, state access and state registration, are addressed through the same call.

We propose to decompose this into 2 steps: (a) pre-flight (i.e. before the job graph is submitted) state descriptor registration, and (b) runtime access to registered state using the state ID only.

(1.a) New state descriptor registration methods

For (a), new state descriptor registration methods will be added to the StreamOperator and RichFunction interfaces. These methods will only be functional when invoked pre-flight. Invocation after that would fail with an exception.

StreamOperator interface new eager state declaration methods:

public interface StreamOperator<OUT> {
    ...

    void registerKeyedState(StateDescriptor<?,?> descriptor);
    void registerOperatorState(OperatorStateDescriptor<?,?> descriptor);

    Collection<StateDescriptor<?,?>> getRegisteredKeyedStateDescriptors();
    Collection<OperatorStateDescriptor<?,?>> getRegisteredOperatorStateDescriptors();

    ...
}

RichFunction interface new eager state declaration methods:

public interface RichFunction {
    ...

    void registerKeyedState(StateDescriptor<?,?> descriptor);
    void registerOperatorState(OperatorStateDescriptor<?,?> descriptor);

    Collection<StateDescriptor<?,?>> getRegisteredKeyedStateDescriptors();
    Collection<OperatorStateDescriptor<?,?>> getRegisteredOperatorStateDescriptors();

    ...
}

Operators carry the registered state descriptors as they are shipped to JM / TMs. This essentially also exposes information about registered states of each JobVertex in the JobGraph to the JobManager, opening up opportunities for future state management capabilities.
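The "functional only pre-flight" behaviour described in (1.a) can be sketched as a small registry that is sealed once the job graph is submitted. This is only an illustration under stated assumptions: SketchDescriptor, EagerStateRegistry and seal() are hypothetical names that are not part of this proposal or of Flink, and rejecting duplicate state IDs is an assumption rather than something this proposal specifies.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for StateDescriptor; only the state ID is modeled.
final class SketchDescriptor {
    final String stateId;

    SketchDescriptor(String stateId) {
        this.stateId = stateId;
    }
}

// Hypothetical helper that an operator or rich function could delegate to.
final class EagerStateRegistry {
    private final Map<String, SketchDescriptor> keyedDescriptors = new LinkedHashMap<>();
    private boolean sealed = false;

    // Mirrors registerKeyedState(...): only functional before the registry is sealed.
    void registerKeyedState(SketchDescriptor descriptor) {
        if (sealed) {
            throw new IllegalStateException(
                "State '" + descriptor.stateId + "' must be registered before job submission");
        }
        // Assumption: a state ID may be registered only once per operator.
        if (keyedDescriptors.putIfAbsent(descriptor.stateId, descriptor) != null) {
            throw new IllegalArgumentException(
                "State ID '" + descriptor.stateId + "' is already registered");
        }
    }

    // Assumed to be called once, when the job graph is submitted.
    void seal() {
        sealed = true;
    }

    // Mirrors getRegisteredKeyedStateDescriptors(): a read-only snapshot.
    Collection<SketchDescriptor> getRegisteredKeyedStateDescriptors() {
        return Collections.unmodifiableCollection(new ArrayList<>(keyedDescriptors.values()));
    }
}
```

An operator state variant would follow the same pattern with a second map. The read-only snapshot is what would travel with the operator to the JM / TMs in this sketch.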

When creating keyed / operator state backends for operators (i.e. StreamTask#createKeyedStateBackend() and StreamTask#createOperatorStateBackend()), the registered descriptors must be provided; this will be the only opportunity to do so. All other registration methods (e.g. KeyedStateBackend#getOrCreateKeyedState) will be deprecated and eventually removed.

Note that OperatorStateDescriptor would be a new class extending StateDescriptor, which should wrap operator-state specific information such as the redistribution scheme. Previously, this information was not carried by operator state descriptors.

(1.b) Runtime access to registered states via state ID only

State access for both operator and keyed state will be provided through the RuntimeContext by exposing the KeyedStateStore and OperatorStateStore:

public interface RuntimeContext {
    ...

    KeyedStateStore getKeyedStateStore();
    OperatorStateStore getOperatorStateStore();

    ...
}

The state access methods on the state stores would be:

State access methods for KeyedStateStore:

public interface KeyedStateStore {
    <T> ValueState<T> getValueState(String stateId);
    <T> ListState<T> getListState(String stateId);
    <T> ReducingState<T> getReducingState(String stateId);
    <K,V> MapState<K,V> getMapState(String stateId);
}

State access methods for OperatorStateStore:

public interface OperatorStateStore {
    <T> ListState<T> getPartitionableListState(String stateId);
}

The new methods take only the target state's ID as an argument, to make it explicit that the state is only accessible if it has been registered with the API described in (1.a).
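As a rough illustration of ID-only access, the sketch below shows a store that hands out state solely for IDs it was constructed with and fails with a descriptive message otherwise. SketchValueState and IdOnlyKeyedStateStore are hypothetical stand-ins, not the proposed interfaces, and the sketch ignores per-key scoping and state backends entirely.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for ValueState<T>: a single mutable cell.
final class SketchValueState<T> {
    private T value;

    T value() {
        return value;
    }

    void update(T newValue) {
        this.value = newValue;
    }
}

// Hypothetical store that only serves state IDs registered up front.
final class IdOnlyKeyedStateStore {
    private final Set<String> registeredIds;
    private final Map<String, SketchValueState<?>> states = new HashMap<>();

    IdOnlyKeyedStateStore(Set<String> registeredIds) {
        this.registeredIds = new HashSet<>(registeredIds);
    }

    // Same shape as <T> ValueState<T> getValueState(String stateId).
    @SuppressWarnings("unchecked")
    <T> SketchValueState<T> getValueState(String stateId) {
        if (!registeredIds.contains(stateId)) {
            throw new IllegalArgumentException(
                "State '" + stateId + "' was not registered before job submission");
        }
        // The same handle is returned for repeated lookups of one ID.
        return (SketchValueState<T>) states.computeIfAbsent(
            stateId, id -> new SketchValueState<Object>());
    }
}
```

Because no descriptor is passed at access time, the element type in this sketch is taken on trust from the caller. In the proposal the registered descriptor would be the source of type and serializer information.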

All other state access / registration methods in RuntimeContext will be deprecated and eventually removed.

(2) Syntactic sugar via annotations

Besides the programmatic state declaration API described in (1), we also propose to provide a higher level, easy-to-use syntactic sugar for eager state declaration via Java annotations.

While the programmatic API is required for layered frameworks on top of Flink (such as Apache Beam) to register state, this annotation-based API is the targeted interface that Flink users should program against, and is intended to keep boilerplate state descriptor instantiation / state registration / state access code to the minimum for cleaner and more intuitive user application code.

Annotation for keyed state:

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface KeyedState {
    String stateId();
    String queryableStateName() default "";
    Class<? extends TypeInfoFactory> typeInfoFactory() default NullTypeInfoFactory.class;
    Class<? extends TypeSerializerFactory> typeSerializerFactory() default NullTypeSerializerFactory.class;
    Class<? extends Function> function() default NullFunction.class;
}

Internally, a StateDescriptor will be built using the fields. Apart from the stateId, all other fields are optional:

  • queryableStateName: queryable state handle. If provided, the state would be queryable.
  • typeInfoFactory: corresponds to providing a custom TypeInformation in StateDescriptors.
  • typeSerializerFactory: corresponds to providing a custom TypeSerializer in StateDescriptors.
  • If neither typeInfoFactory nor typeSerializerFactory is provided, the state type will be extracted using Java reflection and analyzed using Flink's type stack.
  • function: any function that needs to be associated with the state, e.g. a ReduceFunction for ReducingState.

Annotation for operator state:

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface OperatorState {
    String stateId();
    OperatorStateHandle.Mode redistributeMode() default OperatorStateHandle.Mode.SPLIT_REDISTRIBUTE;
    Class<? extends TypeInfoFactory> typeInfoFactory() default NullTypeInfoFactory.class;
    Class<? extends TypeSerializerFactory> typeSerializerFactory() default NullTypeSerializerFactory.class;
}

Internally, an OperatorStateDescriptor will be built using the fields. Apart from the stateId, all other fields are optional:

  • redistributeMode: the redistribution scheme of this operator list state. For the initial version, we would not allow changing the mode used for previously written state. Ideally, this can be changed freely depending on what redistribution scheme the user wishes to use for the current restore.
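To make the two redistribution modes concrete, here is a minimal sketch of how a list of state partitions could be assigned to subtasks on restore. It assumes that SPLIT_REDISTRIBUTE spreads partitions across subtasks and that BROADCAST gives every subtask all partitions; the round-robin assignment, SketchMode and SketchRedistributor are illustrative assumptions, not Flink's actual assignment logic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of the two modes named in this proposal.
enum SketchMode { SPLIT_REDISTRIBUTE, BROADCAST }

final class SketchRedistributor {

    // Returns, for each new subtask index, the partitions it would restore.
    static <T> List<List<T>> redistribute(List<T> partitions, int newParallelism, SketchMode mode) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        List<List<T>> perSubtask = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions.size(); p++) {
            T partition = partitions.get(p);
            if (mode == SketchMode.BROADCAST) {
                // Assumption: every subtask receives every partition.
                for (List<T> subtask : perSubtask) {
                    subtask.add(partition);
                }
            } else {
                // Assumption: simple round-robin split across subtasks.
                perSubtask.get(p % newParallelism).add(partition);
            }
        }
        return perSubtask;
    }
}
```

With five partitions and a new parallelism of two, the split mode in this sketch yields three and two partitions per subtask, while the broadcast mode yields five for each.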


These annotations can be used at the operator-level or in rich UDFs. Briefly speaking, users' custom operators / UDFs should declare fields typed with the user-facing State types, and annotate them as either KeyedState or OperatorState.

Information provided through annotation fields subsumes the need to provide a state descriptor, making the use of StateDescriptors specific to the programmatic API only.

Example usage:

Declaring managed keyed state:

@KeyedState(
    stateId = "myPojoListStateId",
    queryableStateName = "myPojoListStateQueryHandle", // optional
    typeSerializerFactory = MyPojoSerializerFactory.class // optional
)
private ListState<MyPojo> pojoListState;

Declaring managed operator state:

@OperatorState(
    stateId = "myPojoListStateId",
    redistributeMode = OperatorStateHandle.Mode.BROADCAST // optional
)
private ListState<MyPojo> pojoOperatorState;

When a transformation is added to the stream graph, a StateDeclarationExtractor is used to extract the declared keyed / operator states using state annotations, returning a Tuple2<Collection<StateDescriptor<?,?>>, Collection<OperatorStateDescriptor<?,?>>> (tuple of all keyed state descriptors and all operator state descriptors). The extracted state descriptors would then be registered at the operator / rich UDF using the programmatic APIs, and from there follows the same code paths as described in (1).
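The extraction step can be sketched with plain Java reflection. The example below is a simplified illustration, not the proposed StateDeclarationExtractor: it handles only a reduced keyed-state annotation, returns a flat list instead of a Tuple2 of descriptor collections, and all type names (SketchKeyedState, ExtractedDeclaration, SketchStateDeclarationExtractor, SketchUdf) are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Reduced, hypothetical version of the proposed @KeyedState annotation.
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@interface SketchKeyedState {
    String stateId();
    String queryableStateName() default "";
}

// Hypothetical stand-in for the StateDescriptor built from one annotated field.
final class ExtractedDeclaration {
    final String stateId;
    final String queryableStateName;
    final String declaredFieldType;

    ExtractedDeclaration(String stateId, String queryableStateName, String declaredFieldType) {
        this.stateId = stateId;
        this.queryableStateName = queryableStateName;
        this.declaredFieldType = declaredFieldType;
    }
}

final class SketchStateDeclarationExtractor {

    // Walks the class hierarchy and collects every field carrying the annotation.
    static List<ExtractedDeclaration> extractKeyed(Class<?> udfClass) {
        List<ExtractedDeclaration> declarations = new ArrayList<>();
        for (Class<?> c = udfClass; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field field : c.getDeclaredFields()) {
                SketchKeyedState annotation = field.getAnnotation(SketchKeyedState.class);
                if (annotation != null) {
                    declarations.add(new ExtractedDeclaration(
                        annotation.stateId(),
                        annotation.queryableStateName(),
                        field.getGenericType().getTypeName()));
                }
            }
        }
        return declarations;
    }
}

// Example user function; java.util.List stands in for ListState here.
final class SketchUdf {
    @SketchKeyedState(
        stateId = "myPojoListStateId",
        queryableStateName = "myPojoListStateQueryHandle")
    private List<String> pojoListState;

    private int notState; // not annotated, so it is ignored by the extractor
}
```

The generic field type is captured because, when no type information or serializer factory is given, it is what the state type would be derived from.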

Annotated state fields will also be automatically initialized during the StreamOperator#initializeState phase using Java reflection. The user is not required to access state via the RuntimeContext if the state was declared using state annotations. This should also enhance the perceived user experience, such that programming against Flink managed state is similar to manipulating a local variable.
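A minimal sketch of the reflective field initialization is shown below. It assumes the runtime already holds one state object per registered state ID; InjectedState, SketchStateInjector and CountingUdf are hypothetical names, and a java.util.List stands in for a ListState handle.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;

// Reduced, hypothetical state annotation carrying only the state ID.
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@interface InjectedState {
    String stateId();
}

final class SketchStateInjector {

    // Assigns each annotated field the state object registered under its ID.
    static void initializeState(Object udf, Map<String, Object> stateById) {
        for (Field field : udf.getClass().getDeclaredFields()) {
            InjectedState annotation = field.getAnnotation(InjectedState.class);
            if (annotation == null) {
                continue;
            }
            Object state = stateById.get(annotation.stateId());
            if (state == null) {
                throw new IllegalStateException(
                    "No state registered for ID '" + annotation.stateId() + "'");
            }
            try {
                field.setAccessible(true); // state fields are typically private
                field.set(udf, state);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(
                    "Cannot initialize field '" + field.getName() + "'", e);
            }
        }
    }
}

// Example user function: it never touches a RuntimeContext.
final class CountingUdf {
    @InjectedState(stateId = "seen")
    private List<String> seen;

    void process(String element) {
        seen.add(element);
    }

    int seenCount() {
        return seen.size();
    }
}
```

After initialization the user code reads and writes the field directly, which is the "local variable" feel described above.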

(3) Deprecate existing lazy state declaration interfaces

All existing lazy state declaration interfaces will be deprecated, and eventually removed (Flink version for removal TBD). This includes:

  • get*State methods on RuntimeContext.
  • All existing get*State methods on KeyedStateStore and OperatorStateStore.
  • getKeyedStateStore and getOperatorStateStore on FunctionInitializationContext, as this will be subsumed by exposing them through the RuntimeContext.

New state management functionalities (e.g. state migration / schema evolution) will only be available for eagerly registered state.

Compatibility, Deprecation, and Migration Plan

  • Existing API is not broken or removed, only deprecated and discouraged.
  • Users are strongly encouraged to migrate to using eager state declaration. State will not be lost as long as the same state IDs are used.
  • New state management functionalities (e.g. state migration / schema evolution) will only be available for eagerly registered state using the new API.
  • Legacy dynamic state declaration APIs are planned to be removed in future Flink versions (TBD).

Test Plan

All code in Flink that uses managed state should be switched to the new API. This should ensure that using the new API does not result in different state behaviours.

Apart from this, at a minimum new integration tests should include:

  • registering states after job submission should fail, with meaningful messages
  • accessing non-registered state IDs should fail, with meaningful messages
  • state backends are instantiated with eagerly declared states' descriptors
  • state declarations are visible and correct at the JobManager

Rejected Alternatives

None so far.