Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

This is a sub-FLIP for disaggregated state management and its related work. Please read FLIP-423 first for the full context.

Motivation

FLIP-423[1] introduced disaggregated state management, and FLIP-425[2] introduced a new, event-driven execution model for asynchronous state access. This model can significantly boost performance by issuing I/O operations in parallel. However, it also increases draining time during checkpoints, so there is a trade-off between system throughput and checkpoint synchronization delay. This balance can be tuned by adjusting the buffer size.

As a follow-up to FLIP-425, this FLIP proposes a faster way of checkpointing: state requests that are waiting in the buffer of the "Asynchronous Execution Controller (AEC)" are snapshotted as part of the checkpoint. With this approach, we expect a substantial reduction of the draining overhead compared with the original plan in FLIP-425, especially under high back-pressure. To snapshot state requests, the user's callbacks must be preserved across job attempts. This FLIP introduces a new way of declaring element processing, in which all callbacks are re-declared and bound to the corresponding previous state requests during the operator's initialization phase. This ensures that the entire pipeline can be accurately restored and that processing resumes smoothly after a job restart.


For ease of understanding, the FLIP covers three key aspects:
1. (Public API) Additional APIs for conditional branching on `StateFuture`: see the "Public Interfaces" section
2. (Internal API) A new API set for declaring and defining the record processing: see the first half of "Proposed Changes"
3. (Implementation) A method to snapshot the in-flight state requests: see the second half of "Proposed Changes"

Public Interfaces

Enhanced Condition declaration

During our PoC testing of async state, we found that APIs for conditional branching would be helpful, especially when writing jobs in chain style. We therefore plan to add the following methods to the original API set in FLIP-424:

    /**
     * Apply a condition test on the result of this StateFuture, and try to perform one action out
     * of two based on the result. Gather the results of the condition test and the selected action
     * into a StateFuture of tuple. The relationship between the action result and the returned new
     * StateFuture is the same as in {@link #thenCompose(FunctionWithException)}.
     *
     * @param condition the condition test.
     * @param actionIfTrue the function to apply if the condition returns true.
     * @param actionIfFalse the function to apply if the condition returns false.
     * @param <U> the type of the output from actionIfTrue.
     * @param <V> the type of the output from actionIfFalse.
     * @return the new StateFuture with the result of condition test, and result of action.
     */
    <U, V> StateFuture<Tuple2<Boolean, Object>> thenConditionallyCompose(
            FunctionWithException<? super T, Boolean, ? extends Exception> condition,
            FunctionWithException<? super T, ? extends StateFuture<U>, ? extends Exception>
                    actionIfTrue,
            FunctionWithException<? super T, ? extends StateFuture<V>, ? extends Exception>
                    actionIfFalse);

    /**
     * Apply a condition test on the result of this StateFuture, and try to perform the action if
     * test result is true. Gather the results of the condition test and the action (if applied)
     * into a StateFuture of tuple. The relationship between the action result and the returned new
     * StateFuture is the same as in {@link #thenCompose(FunctionWithException)}.
     *
     * @param condition the condition test.
     * @param actionIfTrue the function to apply if the condition returns true.
     * @param <U> the type of the output from actionIfTrue.
     * @return the new StateFuture with the result of condition test, and result of action.
     */
    <U> StateFuture<Tuple2<Boolean, U>> thenConditionallyCompose(
            FunctionWithException<? super T, Boolean, ? extends Exception> condition,
            FunctionWithException<? super T, ? extends StateFuture<U>, ? extends Exception>
                    actionIfTrue);

The corresponding thenConditionallyAccept and thenConditionallyApply methods will be added as well.
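
To make the intended semantics concrete, here is a minimal sketch that mimics the two thenConditionallyCompose variants. It is only an analogy: java.util.concurrent.CompletableFuture stands in for StateFuture, Map.Entry stands in for Tuple2, and the class name ConditionalComposeSketch is hypothetical. That a skipped action yields a null result in the one-branch variant is an assumption, not something this FLIP specifies.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;

/** Stand-in sketch: CompletableFuture plays StateFuture, Map.Entry plays Tuple2. */
final class ConditionalComposeSketch {

    /** Two-branch variant: exactly one action runs; the entry key is the test result. */
    static <T, U, V> CompletableFuture<Map.Entry<Boolean, Object>> thenConditionallyCompose(
            CompletableFuture<T> source,
            Predicate<? super T> condition,
            Function<? super T, CompletableFuture<U>> actionIfTrue,
            Function<? super T, CompletableFuture<V>> actionIfFalse) {
        return source.thenCompose(value -> {
            boolean test = condition.test(value);
            CompletableFuture<?> chosen;
            if (test) {
                chosen = actionIfTrue.apply(value);
            } else {
                chosen = actionIfFalse.apply(value);
            }
            return chosen.<Map.Entry<Boolean, Object>>thenApply(
                    result -> new SimpleImmutableEntry<Boolean, Object>(test, result));
        });
    }

    /** One-branch variant. Assumption: a skipped action yields a null result. */
    static <T, U> CompletableFuture<Map.Entry<Boolean, U>> thenConditionallyCompose(
            CompletableFuture<T> source,
            Predicate<? super T> condition,
            Function<? super T, CompletableFuture<U>> actionIfTrue) {
        return source.thenCompose(value -> {
            if (!condition.test(value)) {
                return CompletableFuture.<Map.Entry<Boolean, U>>completedFuture(
                        new SimpleImmutableEntry<Boolean, U>(false, null));
            }
            return actionIfTrue.apply(value).<Map.Entry<Boolean, U>>thenApply(
                    result -> new SimpleImmutableEntry<Boolean, U>(true, result));
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> count = CompletableFuture.completedFuture(5);
        Map.Entry<Boolean, Object> both = thenConditionallyCompose(
                count,
                v -> v > 3,
                v -> CompletableFuture.completedFuture("big:" + v),
                v -> CompletableFuture.completedFuture(v * 2)).join();
        System.out.println(both.getKey() + " -> " + both.getValue()); // prints true -> big:5
    }
}
```

The two-branch variant returns an untyped result because the two actions may produce different types, which matches the Tuple2<Boolean, Object> in the signature above.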


Apart from the above, there are no further changes to the public APIs. The next section adds some new interfaces for declaring async state processing on operators; these are for internal use only (e.g. developing SQL operators).

Proposed Changes

The core change lies in how state requests are checkpointed and restored. Requests that are already in progress cannot be checkpointed, otherwise the state would become inconsistent. Requests that are still waiting in the AEC buffer can be safely snapshotted. A state request consists of the following five parts:

  • The target state
  • Request description: the request type and input
  • The stream record and corresponding key group
  • The callback of this request
  • Local variables for processing each record

Serializing the request description and the stream record is straightforward. The state itself can easily be retrieved in the new job by its name, so only the name needs to be serialized and stored in the checkpoint. For the user's callback, we do NOT plan to serialize the anonymous callback class or the lambda itself, as this would cause further problems (as described in a previous discussion[3]):

  • Not compatible across different JREs
  • Problems when updating dependencies of the code inside the lambdas, if the updated dependencies are not binary compatible.
  • No way to fix bugs inside the lambdas - users might get stuck in an unrecoverable state

Consequently, we propose that users redeclare their lambda expressions upon each job restart and re-associate them with the requests from previous checkpoints. When redeclaring a lambda, users would provide a unique name or identifier for it; the checkpoint snapshot would record this identifier, which would then be used to match with the newly declared lambda in the new job.
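
The re-association idea can be illustrated with a small sketch. Everything in it (CallbackRebindingSketch, PendingRequest, declare, resume) is hypothetical and heavily simplified; it only shows that a checkpoint needs to carry a callback name plus plain data, while the lambda itself comes from the new job attempt.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical sketch: callbacks are re-bound by name after a restart, never serialized. */
final class CallbackRebindingSketch {

    /** What a checkpoint would keep for one waiting request: a callback name plus plain data. */
    static final class PendingRequest {
        final String callbackName;
        final int payload;

        PendingRequest(String callbackName, int payload) {
            this.callbackName = callbackName;
            this.payload = payload;
        }
    }

    private final Map<String, Consumer<Integer>> declared = new HashMap<>();

    /** Called on every job start; the name is the only link to requests in older checkpoints. */
    void declare(String name, Consumer<Integer> callback) {
        declared.put(name, callback);
    }

    /** Re-associates a restored request with the callback declared under the same name. */
    void resume(PendingRequest request) {
        Consumer<Integer> callback = declared.get(request.callbackName);
        if (callback == null) {
            throw new IllegalStateException("No callback declared for " + request.callbackName);
        }
        callback.accept(request.payload);
    }

    public static void main(String[] args) {
        // Restored from a checkpoint of the previous attempt: no lambda bytes, only a name.
        PendingRequest restored = new PendingRequest("output", 41);

        // New attempt: the lambda is declared again and may even contain a bug fix.
        CallbackRebindingSketch registry = new CallbackRebindingSketch();
        List<Integer> emitted = new ArrayList<>();
        registry.declare("output", v -> emitted.add(v + 1));
        registry.resume(restored);

        System.out.println(emitted); // prints [42]
    }
}
```

Because only the name crosses the restart boundary, the problems listed above (JRE compatibility, dependency upgrades, unfixable bugs inside lambdas) do not arise in this scheme.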

The local variables of each record also need a serializer for checkpointing, so we propose a way to declare them as well.
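
Putting the five parts together, one snapshotted request could be laid out roughly as follows. This is a sketch under assumptions: the class StateRequestSnapshotSketch, its field names, and the byte layout are all hypothetical, and the byte arrays stand for data that the respective serializers would already have produced.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical layout of one snapshotted state request; none of these names come from Flink. */
final class StateRequestSnapshotSketch {
    final String stateName;     // the target state, looked up again by name on restore
    final String requestType;   // request description: the type ...
    final byte[] requestInput;  // ... and the input, already serialized
    final byte[] record;        // the stream record, already serialized
    final int keyGroup;         // key group of the record's key
    final String callbackName;  // the callback, referenced by its declared name only
    final byte[] variables;     // declared per-record variables, already serialized

    StateRequestSnapshotSketch(String stateName, String requestType, byte[] requestInput,
            byte[] record, int keyGroup, String callbackName, byte[] variables) {
        this.stateName = stateName;
        this.requestType = requestType;
        this.requestInput = requestInput;
        this.record = record;
        this.keyGroup = keyGroup;
        this.callbackName = callbackName;
        this.variables = variables;
    }

    byte[] toBytes() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(stateName);
            out.writeUTF(requestType);
            writeBlob(out, requestInput);
            writeBlob(out, record);
            out.writeInt(keyGroup);
            out.writeUTF(callbackName);
            writeBlob(out, variables);
        }
        return bytes.toByteArray();
    }

    static StateRequestSnapshotSketch fromBytes(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            // Java evaluates arguments left to right, so reads happen in write order.
            return new StateRequestSnapshotSketch(in.readUTF(), in.readUTF(), readBlob(in),
                    readBlob(in), in.readInt(), in.readUTF(), readBlob(in));
        }
    }

    private static void writeBlob(DataOutputStream out, byte[] blob) throws IOException {
        out.writeInt(blob.length);
        out.write(blob);
    }

    private static byte[] readBlob(DataInputStream in) throws IOException {
        byte[] blob = new byte[in.readInt()];
        in.readFully(blob);
        return blob;
    }

    public static void main(String[] args) throws IOException {
        StateRequestSnapshotSketch original = new StateRequestSnapshotSketch(
                "counter-state", "UPDATE", new byte[] {1}, new byte[] {2, 3}, 17, "adder",
                new byte[0]);
        StateRequestSnapshotSketch copy = fromBytes(original.toBytes());
        System.out.println(copy.stateName + " " + copy.callbackName + " " + copy.keyGroup);
        // prints counter-state adder 17
    }
}
```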

Internal Interfaces for Process Declaration

To let users declare the lambdas before the task runs, this FLIP proposes to introduce declareProcess alongside the original processElement. Any Input or Operator that wants to declare its callbacks upfront should implement the corresponding interface. In that case, processElement does not take effect and is replaced by the result of declareProcess. The interface definitions:

Interface for one input
/** Input that can declare the processing by a predefined function. */
@Internal
public interface DeclarativeProcessingInput<IN> extends Input<IN> {

    /**
     * A hook for declaring the process in {@code processElement}. If a subclass wants to define
     * its {@code processElement} in a declarative way, it should implement this interface. If so,
     * the {@code processElement} will not take effect and instead, the return value of this
     * method will become the processing logic. This method will be called after {@code open()}
     * of the operator.
     *
     * @param context the context that provides useful methods to define named callbacks.
     * @return the whole processing logic just like {@code processElement}.
     */
    ThrowingConsumer<StreamRecord<IN>, Exception> declareProcess(DeclarationContext context)
            throws DeclarationException;
}
Interface for operator with two inputs
/**
 * Operator with two inputs that can declare the processing of each input by a predefined function.
 */
@Internal
public interface DeclarativeProcessingTwoInputOperator<IN1, IN2, OUT>
        extends TwoInputStreamOperator<IN1, IN2, OUT> {

    /**
     * A hook for declaring the process in {@code processElement1}. If a subclass wants to define
     * its {@code processElement1} in a declarative way, it should implement this interface. If
     * so, the {@code processElement1} will not take effect and instead, the return value of this
     * method will become the processing logic. This method will be called after {@code open()}
     * of the operator.
     *
     * @param context the context that provides useful methods to define named callbacks.
     * @return the whole processing logic just like {@code processElement1}.
     */
    ThrowingConsumer<StreamRecord<IN1>, Exception> declareProcess1(DeclarationContext context)
            throws DeclarationException;

    /**
     * A hook for declaring the process in {@code processElement2}. If a subclass wants to define
     * its {@code processElement2} in a declarative way, it should implement this interface. If
     * so, the {@code processElement2} will not take effect and instead, the return value of this
     * method will become the processing logic. This method will be called after {@code open()}
     * of the operator.
     *
     * @param context the context that provides useful methods to define named callbacks.
     * @return the whole processing logic just like {@code processElement2}.
     */
    ThrowingConsumer<StreamRecord<IN2>, Exception> declareProcess2(DeclarationContext context)
            throws DeclarationException;
}

The return type of each of these methods matches the signature of the corresponding processElement method.
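
How a runtime might choose between the two styles can be sketched as follows. The interfaces below are simplified local stand-ins, not the real Flink types (no StreamRecord, no DeclarationContext), and the helper name resolveProcessor is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-ins for Input and DeclarativeProcessingInput; not the real Flink types. */
final class DeclareProcessDispatchSketch {

    interface ThrowingConsumer<T> {
        void accept(T value) throws Exception;
    }

    interface Input<IN> {
        void processElement(IN element) throws Exception;
    }

    interface DeclarativeInput<IN> extends Input<IN> {
        ThrowingConsumer<IN> declareProcess() throws Exception;
    }

    /** Resolved once after open(): a declared process, if any, replaces processElement. */
    static <IN> ThrowingConsumer<IN> resolveProcessor(Input<IN> input) throws Exception {
        if (input instanceof DeclarativeInput) {
            return ((DeclarativeInput<IN>) input).declareProcess();
        }
        return input::processElement;
    }

    public static void main(String[] args) throws Exception {
        List<String> log = new ArrayList<>();

        Input<String> classic = element -> log.add("processElement:" + element);
        Input<String> declarative = new DeclarativeInput<String>() {
            @Override
            public void processElement(String element) {
                throw new UnsupportedOperationException("replaced by declareProcess");
            }

            @Override
            public ThrowingConsumer<String> declareProcess() {
                return element -> log.add("declared:" + element);
            }
        };

        resolveProcessor(classic).accept("a");
        resolveProcessor(declarative).accept("b");
        System.out.println(log); // prints [processElement:a, declared:b]
    }
}
```

Resolving the consumer once, rather than per record, is what allows all callbacks to be declared before any record (or any restored request) is processed.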

Triggerable Declaration

There is an existing interface, Triggerable, that defines the behavior when a timer fires. We provide corresponding methods to declare this processing as well:

@Internal
public interface DeclarativeTriggerable<K, N> {

    /** Declare a process which will be invoked when an event-time timer fires. */
    ThrowingConsumer<InternalTimer<K, N>, Exception> declareOnEventTime(DeclarationContext context) throws DeclarationException;

    /** Declare a process which will be invoked when a processing-time timer fires. */
    ThrowingConsumer<InternalTimer<K, N>, Exception> declareOnProcessingTime(DeclarationContext context) throws DeclarationException;

}

DeclarationContext

The newly introduced DeclarationContext is the entry point for users to declare processing. It provides the following methods (implementation omitted):

DeclarationContext
public class DeclarationContext {

    // ------------- Declaring Callback part ----------------

    /** Declare a callback with a name. */
    public <T> NamedConsumer<T> declare(
            String name, ThrowingConsumer<T, ? extends Exception> callback)
            throws DeclarationException {
    }

    /** Declare a callback with a name. */
    public <T, V> NamedFunction<T, V> declare(
            String name, FunctionWithException<T, V, ? extends Exception> callback)
            throws DeclarationException {
    }

    /** Declare a callback with a name. */
    public <T, U, V> NamedBiFunction<T, U, V> declare(
            String name, BiFunctionWithException<T, U, V, ? extends Exception> callback)
            throws DeclarationException {
    }

    /** Declare a callback with an automatically assigned name. */
    public <T> NamedConsumer<T> declare(ThrowingConsumer<T, ? extends Exception> callback)
            throws DeclarationException {
    }

    /** Declare a callback with an automatically assigned name. */
    public <T, V> NamedFunction<T, V> declare(
            FunctionWithException<T, V, ? extends Exception> callback) throws DeclarationException {
    }

    /** Declare a callback with an automatically assigned name. */
    public <T, U, V> NamedBiFunction<T, U, V> declare(
            BiFunctionWithException<T, U, V, ? extends Exception> callback)
            throws DeclarationException {
    }

    // ----------- End of Declaring Callback part ------------

    // ----------- Declare callback in chain-style part ------------
    /**
     * Declaring a processing chain.
     * @param first the first code block
     * @return the chain itself.
     * @param <IN> the in type of the first block
     * @param <T> the out type of the state future given by the first block
     */
    public <IN, T> DeclarationChain<IN>.DeclarationStage<T> declareChain(
            FunctionWithException<IN, StateFuture<T>, Exception> first)
            throws DeclarationException {
    }

    /**
     * Declaring a processing chain starting from a given record.
     *
     * @return the chain itself.
     * @param <IN> the in type of starting record
     */
    public <IN> DeclarationChain<IN>.DeclarationStage<IN> withRecord() throws DeclarationException {
        return new DeclarationChain<IN>(this, StateFutureUtils::completedFuture).firstStage();
    }  

    // ----------- End of Declare callback in chain-style part ------------

    // ----------- Declare variables part ------------ 

    /**
     * Declare a variable used across the callbacks.
     * @param type the type information of the variable
     * @param name the unique name of this variable
     * @param initialValue the initial value when the variable is created.
     * @return the variable itself, which can be used by lambdas.
     * @param <T> the variable type.
     */
    public <T> DeclaredVariable<T> declareVariable(TypeInformation<T> type, String name, Supplier<T> initialValue) throws DeclarationException {
    }

    // ----------- End of Declare variables part ------------  
}
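
The following sketch illustrates one way the naming half of such a context could behave. It is not the PoC implementation: the class CallbackNamingSketch, the "auto-N" naming scheme, and the rejection of duplicate names are all assumptions made for illustration. In particular, deriving auto-assigned names from declaration order is an assumption, and under it the names stay stable across restarts only if the declaration order does not change.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of callback naming; the real DeclarationContext may differ. */
final class CallbackNamingSketch {

    /** Local stand-in for the FLIP's DeclarationException. */
    static final class DeclarationException extends Exception {
        DeclarationException(String message) {
            super(message);
        }
    }

    private final Map<String, Function<Integer, Integer>> callbacks = new LinkedHashMap<>();
    private int nextAutoId = 0;

    /** Declares a callback under an explicit name; names must be unique to be matchable. */
    String declare(String name, Function<Integer, Integer> callback)
            throws DeclarationException {
        if (callbacks.containsKey(name)) {
            throw new DeclarationException("Duplicate callback name: " + name);
        }
        callbacks.put(name, callback);
        return name;
    }

    /** Assumption: auto-assigned names follow declaration order ("auto-0", "auto-1", ...). */
    String declare(Function<Integer, Integer> callback) throws DeclarationException {
        return declare("auto-" + nextAutoId++, callback);
    }

    public static void main(String[] args) throws DeclarationException {
        CallbackNamingSketch context = new CallbackNamingSketch();
        System.out.println(context.declare("counter", v -> v + 1)); // prints counter
        System.out.println(context.declare(v -> v * 2));            // prints auto-0
        System.out.println(context.declare(v -> v - 1));            // prints auto-1
        try {
            context.declare("counter", v -> v);
        } catch (DeclarationException e) {
            System.out.println(e.getMessage()); // prints Duplicate callback name: counter
        }
    }
}
```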

Declare Callbacks

An example usage, where an operator implements declareProcess:

Example operator
// Inside a customized operator class, where INPUT = Tuple2<Integer, String>
@Override
public ThrowingConsumer<StreamRecord<Tuple2<Integer, String>>, Exception> declareProcess(DeclarationContext context) throws DeclarationException {

    NamedFunction<Integer, StateFuture<Integer>> adder =
            context.declare(
                    "counter",    // can be omitted with an auto-assigned name
                    (v) -> {
                        int updated = v == null ? 1 : (v + 1);
                        return FutureUtils.wrapWithAnotherResult(state.asyncUpdate(updated), updated);
                    });
    NamedConsumer<Integer> output =
            context.declare(
                    "output",     // can be omitted with an auto-assigned name
                    (v) -> {
                        context.getCollector().collect(v);
                    });
    return (e) -> {
        state.asyncGet().thenCompose(adder).thenAccept(output);
    };
}

Chain-style declaration

The newly introduced DeclarationContext also provides a chain-style way to declare a series of chained `thenCompose`, `thenAccept` and `thenApply` calls, which is the most common use case for async processing. With this, processing can be declared much as it was written before in `processElement`.

Chain-style declaration
@Override
public ThrowingConsumer<StreamRecord<Tuple2<Integer, String>>, Exception> declareProcess(DeclarationContext context) throws DeclarationException {

    return context.<StreamRecord<Tuple2<Integer, String>>, Integer>declareChain(e -> state.asyncGet())
            .thenCompose(v -> {
                int updated = v == null ? 1 : (v + 1);
                return FutureUtils.wrapWithAnotherResult(state.asyncUpdate(updated), updated);
            })
            .withName("counter")   // can be omitted with an auto-assigned name
            .thenAccept(v -> context.getCollector().collect(v))
            .withName("output")   // can be omitted with an auto-assigned name
            .finish();
}     

The declared chains can also be used to define parallel processing by assembling them in the final consumer:

Chain-style declaration with parallel processing
@Override
public ThrowingConsumer<StreamRecord<Tuple2<Integer, String>>, Exception> declareProcess(DeclarationContext context) throws DeclarationException {
    ThrowingConsumer<Integer, Exception> chain1 = context.<Integer, Integer>declareChain(c -> {
                return state1.asyncGet();
            }).thenCompose(v -> {
                int updated = v == null ? 1 : (v + 1);
                return FutureUtils.wrapWithAnotherResult(state1.asyncUpdate(updated), updated);
            })
            .withName("adder1")
            .thenAccept(v -> context.getCollector().collect(v))
            .withName("collector")
            .finish();

    ThrowingConsumer<Integer, Exception> chain2 = context.<Integer, Integer>declareChain(c -> {
                return state2.asyncGet();
            }).thenCompose(v -> {
                int updated = v == null ? 1 : (v + 1);
                return FutureUtils.wrapWithAnotherResult(state2.asyncUpdate(updated), updated);
            })
            .withName("adder2")
            .finish();

    return (e) -> {
        chain1.accept(e.getValue().f0);
        chain2.accept(e.getValue().f0);
    };
}

Declare Variables

There may be local variables in processElement that are accessed throughout the processing of a record. In async state processing, such a variable may be accessed by multiple callbacks. Since the state requests can be materialized in a checkpoint, these variables must be persisted as well. The DeclarationContext provides a method named declareVariable, which declares a variable as part of the RecordContext. Declared variables can be safely accessed in callback lambdas and are persisted in the checkpoint for each in-flight record.

An example usage:

Declare Variables Example
@Override
public ThrowingConsumer<StreamRecord<Tuple2<Integer, String>>, Exception> declareProcess(DeclarationContext context) throws DeclarationException {
    DeclaredVariable<Integer> updated = context.declareVariable(Types.INT, "updated", () -> 0);
    return context.<StreamRecord<Tuple2<Integer, String>>, Integer>declareChain(
                    e -> {
                        return state.asyncGet();
                    })
            .thenCompose(v -> {
                updated.set(v == null ? 1 : (v + 1));
                return state.asyncUpdate(updated.get());
            })
            .withName("adder")
            .thenAccept(v -> context.getCollector().collect(updated.get()))
            .withName("collector")
            .finish();
}

Note: Only variables accessed across callbacks need to be declared. Local variables inside a single callback lambda need not be declared. Global variables that hold no per-record state need not be declared either.
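
Why a declared variable behaves differently from a captured local can be shown with a small sketch. It assumes, hypothetically, that each in-flight record owns a context holding its variable values and that the runtime switches the current context before running a callback. The names DeclaredVariableSketch and switchTo are illustrative and the TypeInformation parameter is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical sketch: a declared variable is a handle into per-record storage. */
final class DeclaredVariableSketch {

    /** Per-record storage; in the FLIP this role is played by the RecordContext. */
    static final class RecordContext {
        final Map<String, Object> variables = new HashMap<>();
    }

    private RecordContext current;

    /** Assumed to be called by the runtime before a callback of the given record runs. */
    void switchTo(RecordContext context) {
        current = context;
    }

    /** The handle holds no value itself; it only knows its name and initial value. */
    final class DeclaredVariable<T> {
        private final String name;
        private final Supplier<T> initialValue;

        DeclaredVariable(String name, Supplier<T> initialValue) {
            this.name = name;
            this.initialValue = initialValue;
        }

        @SuppressWarnings("unchecked")
        T get() {
            return (T) current.variables.computeIfAbsent(name, key -> initialValue.get());
        }

        void set(T value) {
            current.variables.put(name, value);
        }
    }

    <T> DeclaredVariable<T> declareVariable(String name, Supplier<T> initialValue) {
        return new DeclaredVariable<>(name, initialValue);
    }

    public static void main(String[] args) {
        DeclaredVariableSketch context = new DeclaredVariableSketch();
        DeclaredVariable<Integer> updated = context.declareVariable("updated", () -> 0);

        RecordContext recordA = new RecordContext();
        RecordContext recordB = new RecordContext();

        context.switchTo(recordA);
        updated.set(7);
        context.switchTo(recordB);
        int seenByB = updated.get(); // record B never set it, so it sees the initial value
        context.switchTo(recordA);
        int seenByA = updated.get();

        System.out.println(seenByA + " " + seenByB); // prints 7 0
    }
}
```

Since the values live in the per-record map rather than in the lambda's closure, they can be written out with each in-flight record, which a captured local variable cannot.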

Complex Example in Chain-style

A more complex example of job processing (compare it with the example in FLIP-424):

private final ValueState<RowData> aState;
 
private final MapState<RowData, Tuple2<RowData, Integer>> bState;
 
private final KeySelector<RowData, RowData> uniqueKeySelector;        

@Override
public ThrowingConsumer<StreamRecord<RowData>, Exception> declareProcess(
        DeclarationContext context) throws DeclarationException {
    return context.<StreamRecord<RowData>>withRecord().thenConditionallyAccept(
            (element) -> {
                return RowDataUtil.isAccumulateMsg(element.getValue());
            },
            // True branch
            context.<Boolean>withRecord()
                    .thenApply(e -> bState.asyncValues())
                    .thenCompose((iterator) -> {
                        return iterator.onHasNext((value) -> {
                            // value is Tuple2<RowData, Integer>
                            if (joinCondition.apply(getRecord().getValue(), value.f0)) {
                                output(getRecord().getValue(), value.f0);
                                RowData key = uniqueKeySelector.getKey(value.f0);
                                return bState.asyncPut(
                                        key,
                                        Tuple2.of(value.f0, value.f1 + 1));
                            } else {
                                return FutureUtils.emptyFuture();
                            }
                        });
                    }).thenAccept((e) -> {
                        aState.asyncUpdate(getRecord().getValue());
                    })
            // Omit false branch
    );
}

PoC

A simple PoC (https://github.com/apache/flink/pull/24719) has been implemented for DeclarationContext as well as the APIs.

Limitations

To make checkpointing of in-flight requests possible, users (SQL developers) should:

  • Try to use named callbacks everywhere. If there are anonymous callbacks, the checkpoint will not proceed until all anonymous callbacks and their corresponding requests have finished. See the next section for more details.
  • Try to use declared variables. If any other local variables are captured in named callbacks, they cannot be properly serialized into the checkpoint.

The framework performs automatic detection: if there are any undeclared callbacks or variables, this enhanced in-flight checkpointing is disabled for the corresponding lambda (with its StateFuture) or for the whole pipeline.

Checkpoint the in-flight state requests

Not every state request within the AEC is suitable for serialization and inclusion in a checkpoint. A request must fulfill the following criteria to be eligible for checkpoint inclusion:

  1. The request should not be in progress when the checkpoint is triggered.
  2. The request's callback must be a named one, and the whole pipeline must not reference (capture) any undeclared local variables for record processing.
  3. The request's callback should depend on a single request rather than being associated with multiple requests (for example, not designed for operations like thenCombine() or StateFutureUtils.combineAll() or any iteration).

The rationale for condition 3 arises from the complexity involved in tracking dependencies, orchestrating the linkage of several requests to a single callback instance during recovery, and managing partially completed results of multiple requests. Upon triggering a checkpoint, the AEC will stop firing any requests that satisfy conditions 2 and 3. All existing requests that do not meet these conditions will continue to be processed and drained before the checkpoint is executed. Figure 1 illustrates the overview of the AEC and state requests at the moment of checkpoint arrival.

Fig. 1: Overview of the AEC and state requests at the time of checkpoint arrival

Compared with the original plan of draining all existing state requests in the AEC, the current plan keeps most requests in the checkpoint when all of the user's callbacks are named, which greatly shortens the draining process.
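
The three eligibility conditions can be sketched as a simple predicate over the buffered requests. The Request class and its flags are hypothetical; a real AEC would derive this information from its internal bookkeeping rather than from explicit booleans.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical sketch: split requests into "snapshot" and "drain" at checkpoint time. */
final class CheckpointEligibilitySketch {

    static final class Request {
        final String id;
        final boolean inProgress;
        final boolean namedCallback;
        final boolean capturesUndeclaredVariable;
        final int requestsFeedingCallback; // 1 for thenCompose-like, >1 for thenCombine-like

        Request(String id, boolean inProgress, boolean namedCallback,
                boolean capturesUndeclaredVariable, int requestsFeedingCallback) {
            this.id = id;
            this.inProgress = inProgress;
            this.namedCallback = namedCallback;
            this.capturesUndeclaredVariable = capturesUndeclaredVariable;
            this.requestsFeedingCallback = requestsFeedingCallback;
        }
    }

    static boolean canSnapshot(Request r) {
        return !r.inProgress                                        // condition 1
                && r.namedCallback && !r.capturesUndeclaredVariable // condition 2
                && r.requestsFeedingCallback == 1;                  // condition 3
    }

    /** Key true: ids that go into the checkpoint. Key false: ids drained before it. */
    static Map<Boolean, List<String>> partition(List<Request> buffered) {
        return buffered.stream().collect(Collectors.partitioningBy(
                CheckpointEligibilitySketch::canSnapshot,
                Collectors.mapping(r -> r.id, Collectors.toList())));
    }

    public static void main(String[] args) {
        List<Request> buffered = Arrays.asList(
                new Request("r1", false, true, false, 1),  // waiting, named, single request
                new Request("r2", true, true, false, 1),   // already in progress
                new Request("r3", false, false, false, 1), // anonymous callback
                new Request("r4", false, true, false, 2),  // callback combines two requests
                new Request("r5", false, true, true, 1));  // captures an undeclared variable
        Map<Boolean, List<String>> parts = partition(buffered);
        System.out.println("snapshot=" + parts.get(true) + " drain=" + parts.get(false));
        // prints snapshot=[r1] drain=[r2, r3, r4, r5]
    }
}
```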

Configuration

We plan to make state request checkpointing a part of unaligned checkpoints, as both have similar semantics of checkpointing in-flight data. When unaligned checkpoints are enabled, the in-flight state requests can be included in the checkpoint; otherwise, the checkpoint proceeds in the original way (drain before snapshot). Ideally, most SQL operators with async state processing will be built on top of the declarative APIs, which makes it possible to include their state requests in unaligned checkpoints.

Whether to introduce a separate option apart from unaligned checkpoints, or to enable this by default, can be discussed later if concrete user needs arise.

Trade-off

As the price of faster checkpointing, the checkpoint size grows. How much depends on the number of in-flight requests, the average size of the user's records, and the registered variables. We expect the size to be similar to that of an unaligned checkpoint, which is acceptable in most production environments.

Recovery & Rescaling

The checkpoint of state requests is organized by key group to make rescaling convenient. During recovery, all related state requests are restored into the AEC. Once the named callbacks have been declared, they are re-associated with the requests in the AEC. The whole pipeline then proceeds.
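
The benefit of organizing the snapshot by key group can be sketched as follows. The index formula mirrors the one used by Flink's KeyGroupRangeAssignment for mapping a key group to a subtask; everything else (the class KeyGroupRestoreSketch, the request ids, the map-based snapshot) is a hypothetical simplification.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: redistribute snapshotted requests by key group after rescaling. */
final class KeyGroupRestoreSketch {

    /** Maps a key group to the subtask that owns it for the given parallelism. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Groups request ids (request id -> key group) by the subtask that restores them. */
    static Map<Integer, List<String>> assign(
            Map<String, Integer> keyGroupOfRequest, int maxParallelism, int parallelism) {
        Map<Integer, List<String>> bySubtask = new TreeMap<>();
        for (Map.Entry<String, Integer> entry : keyGroupOfRequest.entrySet()) {
            int subtask =
                    operatorIndexForKeyGroup(maxParallelism, parallelism, entry.getValue());
            bySubtask.computeIfAbsent(subtask, key -> new ArrayList<>()).add(entry.getKey());
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        Map<String, Integer> snapshot = new LinkedHashMap<>();
        snapshot.put("r1", 10);
        snapshot.put("r2", 40);
        snapshot.put("r3", 100);

        System.out.println(assign(snapshot, 128, 2)); // prints {0=[r1, r2], 1=[r3]}
        System.out.println(assign(snapshot, 128, 4)); // prints {0=[r1], 1=[r2], 3=[r3]}
    }
}
```

Because a request is always restored on the subtask that owns its key group, it lands next to the keyed state it refers to, regardless of the new parallelism.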

Compatibility, Deprecation, and Migration Plan

Since the async execution model (FLIP-425) is not released yet, no compatibility issues need to be considered.

processElement still takes effect when the user does not implement declareProcess. Users can restore a job written with declareProcess from a checkpoint of the processElement version, but not the other way around.

Rejected Alternatives

  • Serialize the anonymous callback class or lambda itself in checkpoint.


[1] https://cwiki.apache.org/confluence/x/R4p3EQ

[2] https://cwiki.apache.org/confluence/x/S4p3EQ

[3] https://lists.apache.org/thread/4gxj5wb7tpjd8bhfsx8q8t6k09x25fgn