Background

State is the foundation of stateful computation. But at the API level, the main concern for users is how to create and use a state in process function. From this point, the current DataStream V1's support for state mainly faces the following issues:

  1. The bridge for user applications to access state is StateDescriptor, but StateDescriptor is not an interface. It and its subclasses are filled with a large amount of implementation related codes.
  2. With the current state registration interface, managed state (both operator and keyed state) can be lazily declared any time during processing. This poses problems in several dimensions for Flink’s state management capabilities. For example: JobManager lacks information about state. More details are discussed in FLIP-22[1].

We will discuss how to better declare and access state on DataStream API V2 and solve the above problems in this FLIP.

Proposed Changes

DataStream API V2 supports declaring state in Process Function and accessing and updating state during data processing. In general, we follow the principle of "declare first, use later." First of all, we define a StateDeclaration interface to represent the declaration of state. Next, you'll see how to declare and use state in Process Function. Finally, we'll discuss the legality of state usage. In addition, there are some state-related classes that need to be moved to the flink-core-api  module.

Before giving specific changes, let's look at what a stateful process function looks like:

StatefulFunction.java
private static class StatefulFunction implements OneInputStreamProcessFunction<Long, Long> {
    static final StateDeclaration.ListStateDeclaration<Long> LIST_STATE_DECLARATION =
            StateDeclarations.listStateBuilder("example-list-state", Types.LONG).redistributeBy(RedistributionStrategy.SPLIT).build();
    
	@Override
    public Set<StateDeclaration> usesStates() {
        return Collections.singleton(LIST_STATE_DECLARATION);
    } 

    @Override
    public void processRecord(Long record, Collector<Long> output, RuntimeContext ctx)
            throws Exception {
        Optional<ListState<Long>> stateOptional =
                ctx.getStateManager().getState(LIST_STATE_DECLARATION);
        ListState<Long> state = stateOptional.get();
        // do something with this state. For example, update the state by this record.
        state.update(Collections.singletonList(record));
    }
}

Introduce StateDeclaration to Declare States

In response to the first issue raised in the background, we extracted an interface called StateDeclaration  to declare a specific state. 

We believe that two types of information need to be provided for a state declaration:

  1. Name: Used as the unique identifier of the state, which also corresponds to the name attribute of the underlying StateDescriptor .
  2. RedistributionMode: Defines how the state is redistributed between different partitions. For the keyed partition stream, since the state is bounded within the partition, no redistribution is required. But for the non-keyed partition stream, the partition will change with the parallelism, so the state must define how to redistribute.

The overall interface of StateDeclaration  are as follows. For each type of state, a corresponding sub-interface is provided.

StateDeclaration.java
/** {@link StateDeclaration} represents a declaration of the specific state used. */
public interface StateDeclaration extends Serializable {

    /** Get the name of this state. */
    String getName();

    /**
     * Get the {@link RedistributionMode} of this state. More details see the doc of {@link
     * RedistributionMode}.
     */
    RedistributionMode getRedistributionMode();

    /**
     * {@link RedistributionMode} is used to indicate whether this state supports redistribution
     * between partitions and how to redistribute this state during rescaling.
     */
    enum RedistributionMode {
        /**
         * Not supports redistribution.
         *
         * <p>For example : KeyedState is bind to a specific keyGroup, so it is can't support
         * redistribution between partitions.
         */
        NONE,

        /**
         * This state can be safely redistributed between different partitions, and the specific
         * redistribution strategy is determined by the state itself.
         *
         * <p>For example: ListState's redistribution algorithm is determined by {@link
         * ListStateDeclaration.RedistributionStrategy}.
         */
        REDISTRIBUTABLE,

        /** States are guranteed to be identical in different partitions, thus redistribution is not a problem. */
        IDENTICAL
    }
}

/** This represents a declaration of the value state. */
public interface ValueStateDeclaration<T> extends StateDeclaration {
    /** Get type descriptor of this value state. */
    TypeDescriptor<T> getTypeDescriptor();
}

/** This represents a declaration of the list state. */
public interface ListStateDeclaration<T> extends StateDeclaration {
    /**
     * Get the {@link RedistributionStrategy} of this list state.
     *
     * @return the redistribution strategy of this list state.
     */
    RedistributionStrategy getRedistributionStrategy();

    /**
     * {@link RedistributionStrategy} is used to guide the assignment of states during rescaling.
     */
    enum RedistributionStrategy {
        /**
         * The whole state is logically a concatenation of all lists. On restore/redistribution, the
         * list is evenly divided into as many sub-lists as there are parallel operators. Each
         * operator gets a sub-list, which can be empty, or contain one or more elements.
         */
        SPLIT,
        /**
         * The whole state is logically a concatenation of all lists. On restore/redistribution,
         * each operator gets the complete list of state elements.
         */
        UNION
    }

    /** Get type descriptor of this list state's element. */
    TypeDescriptor<T> getTypeDescriptor();
}

/** This represents a declaration of the map state. */
public interface MapStateDeclaration<K, V> extends StateDeclaration {
    /** Get type descriptor of this map state's key. */
    TypeDescriptor<K> getKeyTypeDescriptor();

    /** Get type descriptor of this map state's value. */
    TypeDescriptor<V> getValueTypeDescriptor();
}

/** This represents a declaration of the reducing state. */
public interface ReducingStateDeclaration<T> extends StateDeclaration {
    /** Get type descriptor of this reducing state. */
    TypeDescriptor<T> getTypeDescriptor();
}

/** This represents a declaration of the aggregating state. */
public interface AggregatingStateDeclaration<IN, ACC, OUT> extends StateDeclaration {
    /** Get type descriptor of this aggregating state's input. */
    TypeDescriptor<IN> getInTypeDescriptor();

    /** Get type descriptor of this aggregating state's output. */
    TypeDescriptor<OUT> getOutTypeDescriptor();

    /** Get the aggregate function of this state. */
    AggregateFunction<IN, ACC, OUT> getAggregateFunction();
}

/** This represents a declaration of the broadcast state. */
public interface BroadcastStateDeclaration<K, V> extends StateDeclaration {
    /** Get type descriptor of this broadcast state's key. */
    TypeDescriptor<K> getKeyTypeDescriptor();

    /** Get type descriptor of this broadcast state's value. */
    TypeDescriptor<V> getValueTypeDescriptor();
}

For ListState, it has two redistribution strategies: SPLIT and Union, which respectively indicate whether to redistribute state evenly according to the parallelism or all state can be broadcast to each parallelism when rescaling.

For ease of use, we will provide an auxiliary class called StateDeclarations, which encapsulates a series of methods for creating various StateDeclaration  instances. We adopt the builder pattern to build StateDeclaration. For the list states, the redistribution strategy can be set via redistributeBy. For other types of state, this also makes it easy to extend some options like withStateTTL in the future. But for convenience, we also provide some common shortcuts to avoid lengthy chains of calls.

Note: The implementation of these methods is omitted, only their signatures are shown.

StateDeclarations.java
/** This is a helper class for declaring various states. */
public class StateDeclarations {
    /** Get the builder of {@link ListStateDeclaration}. */
    public static <T> ListStateDeclarationBuilder<T> listStateBuilder(
            String name, TypeDescriptor<T> elementTypeDescriptor)

    /** Get the builder of {@link MapStateDeclaration}. */
    public static <K, V> MapStateDeclarationBuilder<K, V> mapStateBuilder(
            String name,
            TypeDescriptor<K> keyTypeDescriptor,
            TypeDescriptor<V> valueTypeDescriptor)

    /** Get the builder of {@link ValueStateDeclaration}. */
    public <T> ValueStateDeclarationBuilder<T> valueStateBuilder(
            String name, TypeDescriptor<T> valueType)

    /** Get the builder of {@link ReducingStateDeclaration}. */
    public <T> ReducingStateDeclarationBuilder<T> reducingStateBuilder(
            String name, TypeDescriptor<T> typeDescriptor)

    /** Get the builder of {@link AggregatingStateDeclaration}. */
    public <IN, OUT, ACC> AggregatingStateDeclarationBuilder<IN, OUT, ACC> aggregatingStateBuilder(
            String name, AggregateFunction<IN, ACC, OUT> aggregateFunction)

    /**
     * Get the {@link ListStateDeclaration} of list state with {@link RedistributionMode#NONE}. If
     * you want to configure it more elaborately, use {@link #listStateBuilder(String,
     * TypeDescriptor)}.
     */
    public static <T> ListStateDeclaration<T> listState(
            String name, TypeDescriptor<T> elementTypeDescriptor)

    /**
     * Get the {@link MapStateDeclaration} of map state with {@link RedistributionMode#NONE}. If you
     * want to configure it more elaborately, use {@link #mapStateBuilder(String, TypeDescriptor,
     * TypeDescriptor)}.
     */
    public static <K, V> MapStateDeclaration<K, V> mapState(
            String name,
            TypeDescriptor<K> keyTypeDescriptor,
            TypeDescriptor<V> valueTypeDescriptor)      
    
    /**
     * Get the {@link ValueStateDeclaration} of value state. If you want to configure it more
     * elaborately, use {@link #valueStateBuilder(String, TypeDescriptor)}.
     */
    public static <T> ValueStateDeclaration<T> valueState(String name, TypeDescriptor<T> valueType)

    /**
     * Get the {@link ReducingStateDeclaration} of list state. If you want to configure it more
     * elaborately, use {@link #reducingStateBuilder(String, TypeDescriptor)}.
     */
    public static <T> ReducingStateDeclaration<T> reducingState(
            String name, TypeDescriptor<T> typeDescriptor)

    /**
     * Get the {@link AggregatingStateDeclaration} of aggregating state. If you want to configure it
     * more elaborately, use {@link #aggregatingStateBuilder(String, AggregateFunction)}.
     */
    public static <IN, OUT, ACC> AggregatingStateDeclaration<IN, OUT, ACC> aggregatingState(
            String name, AggregateFunction<IN, ACC, OUT> aggregateFunction) 

    /** Builder for {@link ListStateDeclaration}. */
    public static class ListStateDeclarationBuilder<T> {

        ListStateDeclarationBuilder<T> redistributeBy(RedistributionStrategy strategy)

        ListStateDeclaration<T> build()
    }

    /** Builder for {@link MapStateDeclaration}. */
    public static class MapStateDeclarationBuilder<K, V> {
        MapStateDeclarationBuilder<K, V> broadcast()

        MapStateDeclaration<K, V> build()
    }

    /** Builder for {@link ValueStateDeclaration}. */
    public static class ValueStateDeclarationBuilder<T> {
        ValueStateDeclaration<T> build()
    }

    /** Builder for {@link ReducingStateDeclaration}. */
    public static class ReducingStateDeclarationBuilder<T> {
        ReducingStateDeclaration<T> build()
    }

    /** Builder for {@link AggregatingStateDeclaration}. */
    public static class AggregatingStateDeclarationBuilder<IN, OUT, ACC> {
        AggregatingStateDeclaration<IN, OUT, ACC> build()
    }
}

Declare State in Process Function

We will introduce the following method to the Process Function interface to declare all the states it can use:

usesStates method
/**
 * Explicitly declares states upfront.Each specific state must be declared in this method before
 * it can be used.
 *
 * @return all declared states used by this process function.
 */
default Set<StateDeclaration> usesStates() {
    return Collections.emptySet();
}

Regarding the second point in background, ProcessFunction follows the principle of declaring before using state. We will check during the compile time to organize some illegal state accesses in advance. In addition, we can passing the declaration of states to JobMaster. Therefore, it has more opportunities for further optimization.

Using State in Process Function

In order to use state in Process Function, we extend the StateManager interface introduced in FLIP-410[2] and add the following six methods. The return value of each getState method is an Optional. If you want to get a state that is not declared or has no access, an exception is returned.

extends methods to StateManager
    /**
     * Get the specific list state.
     *
     * @param stateDeclaration of this state.
     * @return the list state corresponds to the state declaration.
     */
    <T> Optional<ListState<T>> getState(ListStateDeclaration<T> stateDeclaration) throws Exception;

    /**
     * Get the specific value state.
     *
     * @param stateDeclaration of this state.
     * @return the value state corresponds to the state declaration.
     */
    <T> Optional<ValueState<T>> getState(ValueStateDeclaration<T> stateDeclaration)
            throws Exception;

    /**
     * Get the specific map state.
     *
     * @param stateDeclaration of this state.
     * @return the map state corresponds to the state declaration.
     */
    <K, V> Optional<MapState<K, V>> getState(MapStateDeclaration<K, V> stateDeclaration)
            throws Exception;

    /**
     * Get the specific reducing state.
     *
     * @param stateDeclaration of this state.
     * @return the reducing state corresponds to the state declaration.
     */
    <T> Optional<ReducingState<T>> getState(ReducingStateDeclaration<T> stateDeclaration)
            throws Exception;

    /**
     * Get the specific aggregating state.
     *
     * @param stateDeclaration of this state.
     * @return the aggregating state corresponds to the state declaration.
     */
    <IN, ACC, OUT> Optional<AggregatingState<IN, OUT>> getState(
            AggregatingStateDeclaration<IN, ACC, OUT> stateDeclaration) throws Exception;

    /**
     * Get the specific broadcast state.
     *
     * @param stateDeclaration of this state.
     * @return the broadcast state corresponds to the state declaration.
     */
    <K, V> Optional<BroadcastState<K, V>> getState(BroadcastStateDeclaration<K, V> stateDeclaration)
            throws Exception;

The Legitimacy of State Declaration and Access

In Process Function, not all state declarations and accesses are legal, depending on the specific type of input streams.

For OneInputStreamProcessFunction, the legality of state declaration and access are listed in the following table:


Stream Type

State RedistributionMode

NONE

REDISTRIBUTABLE

IDENTICAL

Global

(tick)

(tick)

Non-Keyed

(tick)

Keyed

(tick)

(tick)

Broadcast

  1. The state of None RedistributionMode can only be accessed over keyed stream or global stream.
  2. The reason why last line and column are not supported is that BroadcastStream can only be used as a side-input, and IDENTICAL state cannot be accessed inside OneInputStreamProcessFunction.

We'll check the legality of state declaration in advance. If an illegal state is declared in usesStates method, exception will be thrown when the job is compiled.
For the two inputs case, things get a little more complicated. Since different inputs may come from different type of streams, the legality of state access is also different for the data processing functions of each input edge.
The specific rules are shown in the following table:


(K, NK, G, B denote the shorthand for Keyed, Non-Keyed, Global, Broadcast Stream, respectively)

Inputs

processRecordFromFirstInput

processRecordFromSecondInput

NONE

REDISTRIBUTABLE

IDENTICAL

NONE

REDISTRIBUTABLE

IDENTICAL

K + K

(tick)

(tick)

(tick)

(tick)

NK + NK

(tick)

(tick)

G + G

(tick)

(tick)

B + K

(tick)

(tick)

(tick)

(tick)

(tick)

(tick)

Read-Only

B + NK

(tick)

(tick)

(tick)

(tick)

Read-Only

Note: For B + K / B + NK, the first input is assumed to be the broadcast input. The only way to access non-broadcast state on the broadcast side is through the applyToAllPartitions  method of NonPartitionedContext/TwoOutputNonPartitionedContext.

Similarly, the state declaration can be checked at compile time. That is, an exception is thrown early for states that are not supported by either stream. For things that can only be determined at runtime, such as if one of the inputs illegally accesses state that only the other input has access to, an exception will be returned.

We will move several state-related interfaces/classes that the V2 API depends on to the flink-core-api module.

The dependencies between them are shown in the figure below:


Since state is often bound to a specific type, we also propose to move TypeInformation related classes to flink-core-api. This includes:

Classification

Class Name

TypeInformation Related

TypeInfo

TypeInformation

TypeInfoFactory

TypeSerializer Related


TypeSerializer

TypeSerializerSchemaCompatibility

TypeSerializerSnapshot

Serialization Related

DataInputView

DataOutputView

Compatibility, Deprecation, and Migration Plan

The contents described in this FLIP are all new APIs and do not involve compatibility issues.

Test Plan

Unit tests and integration test(A simple stateful job) will be added to guard these changes.



[1]  FLIP-22: Eager State Declaration

[2] FLIP-410: Config, Context and Processing Timer Service of DataStream API V2



  • No labels