Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
FLIP-424 introduced a new set of State APIs that enables asynchronous state queries, and FLIP-425 introduced async state processing. Both are highly anticipated features for Flink 2.0. For SQL users, FLIP-473 implements the SQL operators on top of the new State APIs, so they can easily benefit from async state processing as well as disaggregated state management. However, it is equally important that DataStream users can leverage async state queries and disaggregated state management. This FLIP proposes to expose and enable these capabilities in the DataStream (V1) APIs.
Public Interfaces
This FLIP plans to add a new method, `enableAsyncState()`, to `KeyedStream`, `SingleOutputStreamOperator`, `WindowedStream` and any other related DataStream API classes, allowing users to opt in to async state processing and use the State V2 APIs.
```java
// Excerpts: only the newly added methods are shown; method bodies are omitted.
@Public
public class KeyedStream<T, KEY> extends DataStream<T> {

    /**
     * Enables async state processing, which allows state to be accessed asynchronously via the
     * State API V2 under {@link org.apache.flink.api.common.state.v2}. If async state is enabled,
     * all state created from {@link org.apache.flink.api.common.functions.RuntimeContext} should
     * be State V2.
     */
    @Experimental
    public KeyedStream<T, KEY> enableAsyncState();
}

@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {

    /**
     * Enables async state processing for the preceding transformation, which allows state to be
     * accessed asynchronously via the State API V2 under {@link
     * org.apache.flink.api.common.state.v2}. If async state is enabled, all state created from
     * {@link org.apache.flink.api.common.functions.RuntimeContext} should be State V2.
     */
    @Experimental
    public SingleOutputStreamOperator<T> enableAsyncState();
}
```
The following examples show how users enable async state in their DataStream jobs:
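```java
// Example 1: text is the source stream; Tokenizer splits lines into (word, 1) tuples
text.flatMap(new Tokenizer())
        .name("tokenizer")
        .keyBy(value -> value.f0)
        .enableAsyncState()
        .sum(1)
        // alternatively, call .enableAsyncState() here, after sum(1)
        .name("counter");

// Example 2: someFunction is a user-defined function (e.g. a KeyedProcessFunction)
env.fromData(1, 2)
        .keyBy(Integer::intValue)
        .process(someFunction)
        .enableAsyncState();
```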
If `enableAsyncState()` is not invoked, the behavior is unchanged and users can only use the State V1 APIs. Note that `enableAsyncState()` can only be invoked in a keyed context (that is, after a `keyBy()`); otherwise, an error is thrown.
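To illustrate the keyed-context rule, here is a minimal, self-contained Java sketch that does not depend on Flink. `ToyStream`, `ToyOperator`, the argument-less `keyBy()` and the use of `IllegalStateException` are all hypothetical; this FLIP only states that an error is thrown, not its type. The sketch models a transformation that remembers whether its input was keyed and rejects `enableAsyncState()` otherwise.

```java
/** Hypothetical stand-in for a stream transformation (not a Flink class). */
final class ToyOperator {
    private final String name;
    private final boolean keyedInput; // true if the input came from keyBy()
    private boolean asyncState;       // set by enableAsyncState()

    ToyOperator(String name, boolean keyedInput) {
        this.name = name;
        this.keyedInput = keyedInput;
    }

    /** Mirrors the rule above: async state is only allowed in a keyed context. */
    ToyOperator enableAsyncState() {
        if (!keyedInput) {
            throw new IllegalStateException(
                    "enableAsyncState() requires a keyed context, but '" + name + "' is not keyed");
        }
        this.asyncState = true;
        return this; // chain-style, like the proposed API
    }

    boolean isAsyncState() {
        return asyncState;
    }
}

/** Hypothetical stream that only tracks whether it has been keyed. */
final class ToyStream {
    private final boolean keyed;

    ToyStream(boolean keyed) {
        this.keyed = keyed;
    }

    ToyStream keyBy() {
        return new ToyStream(true);
    }

    ToyOperator process(String name) {
        return new ToyOperator(name, keyed);
    }
}

public class KeyedContextCheck {
    public static void main(String[] args) {
        ToyStream source = new ToyStream(false);
        // Allowed: process() after keyBy()
        ToyOperator counter = source.keyBy().process("counter").enableAsyncState();
        System.out.println("keyed: async=" + counter.isAsyncState());
        // Rejected: process() on a non-keyed stream
        try {
            source.process("mapper").enableAsyncState();
        } catch (IllegalStateException e) {
            System.out.println("non-keyed: " + e.getMessage());
        }
    }
}
```

Checking the flag at the point where `enableAsyncState()` is called, rather than when the job runs, lets the misuse surface while the pipeline is being built.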
In addition, `RuntimeContext` will provide a series of state creation methods that conform to the State V2 APIs:
```java
@Public
public interface RuntimeContext {
    // The existing State V1 methods are omitted; only the new State V2 overloads are shown.

    @Experimental
    <T> org.apache.flink.api.common.state.v2.ValueState<T> getState(
            org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties);

    @Experimental
    <T> org.apache.flink.api.common.state.v2.ListState<T> getListState(
            org.apache.flink.api.common.state.v2.ListStateDescriptor<T> stateProperties);

    @Experimental
    <T> org.apache.flink.api.common.state.v2.ReducingState<T> getReducingState(
            org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> stateProperties);

    @Experimental
    <IN, ACC, OUT> org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingState(
            org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT> stateProperties);

    @Experimental
    <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> getMapState(
            org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> stateProperties);
}
```
Note: All the new interfaces will be annotated as `@Experimental` until the new State APIs are promoted to stable.
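The following self-contained Java sketch illustrates what asynchronous state access looks like from user code, without depending on Flink. `AsyncValueState`, `InMemoryKeyedState` and the word-count logic are hypothetical. They use `java.util.concurrent.CompletableFuture`, whereas Flink's State V2 interfaces under `org.apache.flink.api.common.state.v2` return their own `StateFuture` type. The point is the callback style: a read returns a future, and the update is chained onto it instead of blocking the caller.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical async value state bound to one key (not Flink's interface). */
interface AsyncValueState<T> {
    CompletableFuture<T> asyncValue();

    CompletableFuture<Void> asyncUpdate(T value);
}

/** Hypothetical in-memory backend; futures complete on an I/O executor to mimic remote access. */
final class InMemoryKeyedState<K, T> {
    private final Map<K, T> store = new ConcurrentHashMap<>();
    private final Executor io;

    InMemoryKeyedState(Executor io) {
        this.io = io;
    }

    /** Returns a view of the state scoped to a single key. */
    AsyncValueState<T> forKey(K key) {
        return new AsyncValueState<T>() {
            @Override
            public CompletableFuture<T> asyncValue() {
                return CompletableFuture.supplyAsync(() -> store.get(key), io);
            }

            @Override
            public CompletableFuture<Void> asyncUpdate(T value) {
                return CompletableFuture.runAsync(() -> store.put(key, value), io);
            }
        };
    }

    Map<K, T> snapshot() {
        return new HashMap<>(store);
    }
}

public class AsyncStateSketch {
    /** Counts words; each record reads its count asynchronously and chains the update. */
    static Map<String, Integer> countWords(List<String> words) {
        ExecutorService io = Executors.newFixedThreadPool(4);
        try {
            InMemoryKeyedState<String, Integer> counts = new InMemoryKeyedState<>(io);
            // Last pending future per key. Records with the same key are chained so their
            // read-modify-write steps never overlap; different keys may interleave freely.
            Map<String, CompletableFuture<Void>> inFlight = new HashMap<>();
            for (String word : words) {
                AsyncValueState<Integer> state = counts.forKey(word);
                CompletableFuture<Void> previous =
                        inFlight.getOrDefault(word, CompletableFuture.completedFuture(null));
                CompletableFuture<Void> next =
                        previous.thenCompose(ignored -> state.asyncValue())
                                .thenCompose(count -> state.asyncUpdate(count == null ? 1 : count + 1));
                inFlight.put(word, next);
            }
            // Wait for all outstanding state accesses before reading the result.
            CompletableFuture.allOf(inFlight.values().toArray(new CompletableFuture<?>[0])).join();
            return counts.snapshot();
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(countWords(List.of("to", "be", "or", "not", "to", "be")));
    }
}
```

The per-key chaining in `countWords` is a simplified stand-in for the same-key ordering that the async runtime is expected to enforce; in a real job, user code would not manage this map itself.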
Proposed Changes
The async state processing path and the original code path differ at the operator level. Therefore, the implementations of `KeyedStream` and the other stream interfaces will contain a conditional branch that selects the newly introduced async operators when the user has invoked `enableAsyncState()`, and the original operators otherwise.
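A minimal sketch of that conditional branch, in plain Java with no Flink dependency. `SumOperator`, `SyncSumOperator`, `AsyncSumOperator` and `ToyKeyedStream` are hypothetical names. The sketch only shows the shape of the design: the stream records whether `enableAsyncState()` was called and picks the operator implementation when the transformation is built, so the user-facing `sum()` call is identical on both paths.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical operator contract shared by both code paths. */
interface SumOperator {
    void processElement(String key, int value);

    Map<String, Integer> result();
}

/** Original path: state is read and written synchronously. */
final class SyncSumOperator implements SumOperator {
    private final Map<String, Integer> state = new HashMap<>();

    @Override
    public void processElement(String key, int value) {
        state.merge(key, value, Integer::sum);
    }

    @Override
    public Map<String, Integer> result() {
        return state;
    }
}

/** New path: each state update runs asynchronously, chained in order per key. */
final class AsyncSumOperator implements SumOperator {
    private final Map<String, Integer> state = new ConcurrentHashMap<>();
    private final Map<String, CompletableFuture<Void>> pending = new HashMap<>();

    @Override
    public void processElement(String key, int value) {
        CompletableFuture<Void> prev = pending.getOrDefault(key, CompletableFuture.completedFuture(null));
        pending.put(key, prev.thenRunAsync(() -> state.merge(key, value, Integer::sum)));
    }

    @Override
    public Map<String, Integer> result() {
        CompletableFuture.allOf(pending.values().toArray(new CompletableFuture<?>[0])).join();
        return state;
    }
}

/** Hypothetical keyed stream that only records the user's choice. */
final class ToyKeyedStream {
    private boolean asyncState;

    ToyKeyedStream enableAsyncState() {
        this.asyncState = true;
        return this;
    }

    /** The conditional branch: same user-facing call, different operator underneath. */
    SumOperator sum() {
        return asyncState ? new AsyncSumOperator() : new SyncSumOperator();
    }
}

public class OperatorSelection {
    static Map<String, Integer> run(boolean async, List<Map.Entry<String, Integer>> input) {
        ToyKeyedStream stream = new ToyKeyedStream();
        if (async) {
            stream.enableAsyncState();
        }
        SumOperator op = stream.sum();
        for (Map.Entry<String, Integer> e : input) {
            op.processElement(e.getKey(), e.getValue());
        }
        return new HashMap<>(op.result());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input =
                List.of(Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3));
        System.out.println(run(false, input).equals(run(true, input))); // prints true
    }
}
```

Because the pipeline is written the same way on both paths, a test can run it with async state disabled and enabled and compare the results, which is the idea behind parameterizing existing IT cases in the Test Plan below.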
Compatibility, Deprecation, and Migration Plan
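No impact on existing users or code: async state processing is opt-in, and jobs that do not call `enableAsyncState()` keep their current behavior.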
Test Plan
New IT cases will be introduced. In addition, some existing IT cases for DataStream APIs and jobs will be parameterized so that they also run on the new async code path.
Rejected Alternatives
Introduce a new AsyncKeyedStream class...
...with all the new implementation in it. A `KeyedStream` could call `async()` to be transformed into an `AsyncKeyedStream`. That would mean writing the whole `AsyncKeyedStream` class and introducing all the necessary derived classes, such as `AsyncWindowedStream`. The effort is huge, and the result would be hard to maintain.
Introduce a series of new methods by adding 'async' to their names...
..., such as `asyncReduce`, `asyncSum` and `asyncProcess`. These methods would work, but their names suggest that the whole record processing runs in parallel, whereas what we want to convey to users is that state can be accessed asynchronously. A method named `enableAsyncState` expresses this more accurately.
Introduce a new `enableAsyncState` parameter in `sum`, or a new `enableAsyncState` method in `RichFunction`
We'd prefer a chain-style API, which is more consistent with the current streaming API style.