Status
Current state: Accepted
Discussion thread: https://lists.apache.org/thread/k6x03x7vjtn3gl1vrknkx8zvyn319bk9
JIRA: FLINK-36157
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
FLIP-424[1] introduced an asynchronous state architecture that mitigates I/O bottlenecks in Flink by offloading state access from each task's main thread to separate threads, thereby improving the utilization of I/O and computational resources.
Building on this infrastructure, this FLIP proposes new SQL operators based on these asynchronous state APIs (referred to as AsyncStateOperators, as opposed to the existing operators, which we call SyncStateOperators). Our goal is to increase throughput and reduce latency of stateful operations in Flink SQL jobs by leveraging the new asynchronous state architecture.
Public Interfaces
Configuration Options
To enable these new AsyncStateOperators, a new configuration option will be introduced.
```
// default is "false"
table.exec.async-state.enabled
```
When the configuration is set to `true`, all available AsyncStateOperators will replace the corresponding SyncStateOperators.
We plan to gradually migrate each existing SyncStateOperator to an AsyncStateOperator over multiple Flink versions. This single option toggles all of them uniformly.
Note: When table.exec.async-state.enabled is set to `true`, the following existing option must also be set to a state backend that supports async state.
```
# set a state backend that supports async state,
# like `org.apache.flink.state.forst.ForStStateBackendFactory`
state.backend: ..
```
User-defined Aggregate Functions and Table Aggregate Functions
Currently, users can directly use subclasses of the DataView interface, such as MapView and ListView, in user-defined aggregate functions and table aggregate functions without being aware of StateDataView. However, the way StateDataView accesses state has changed, so the APIs for user-defined aggregate functions and table aggregate functions will be adjusted accordingly. These details will be addressed in a separate FLIP later.
Proposed Changes
This FLIP does not go into detailed implementation specifics; it only offers a brief overview of the implementation approach.
Phase of adopting AsyncStateOperator
The choice between a SyncStateOperator and an AsyncStateOperator is made when the ExecNode is translated into a Transformation.
Take the regular join operator as an example: the only difference between its two variants is how they access state, while their semantics and context (such as the join spec, the left and right upsert keys, etc.) are identical. Therefore, the ExecNodeGraph optimized and translated from the RelNode tree can be reused for both.
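A minimal sketch of this translation-time choice follows. It is a self-contained model, not planner code: `JoinOperator`, `SyncStateJoinOperator`, `AsyncStateJoinOperator`, and `JoinExecNodeSketch.createJoinOperator` are hypothetical names, and a plain `Map<String, String>` stands in for the table configuration. It only illustrates that one node, with one join spec, yields either operator depending solely on `table.exec.async-state.enabled`.

```java
import java.util.Map;

/** Hypothetical, heavily simplified operator abstraction (not Flink's StreamOperator). */
interface JoinOperator {
    String stateAccessMode();
}

/** Hypothetical sync-state join: same join spec, blocking state access. */
final class SyncStateJoinOperator implements JoinOperator {
    @Override
    public String stateAccessMode() {
        return "sync";
    }
}

/** Hypothetical async-state join: same join spec, StateFuture-based state access. */
final class AsyncStateJoinOperator implements JoinOperator {
    @Override
    public String stateAccessMode() {
        return "async";
    }
}

/** Hypothetical stand-in for the translation step of the regular join ExecNode. */
final class JoinExecNodeSketch {
    static final String ASYNC_STATE_ENABLED = "table.exec.async-state.enabled";

    // The node and its context (join spec, upsert keys, ...) are shared;
    // only the operator instantiated here depends on the option.
    static JoinOperator createJoinOperator(Map<String, String> tableConfig) {
        boolean asyncEnabled =
                Boolean.parseBoolean(tableConfig.getOrDefault(ASYNC_STATE_ENABLED, "false"));
        return asyncEnabled ? new AsyncStateJoinOperator() : new SyncStateJoinOperator();
    }
}
```

Keeping the switch inside operator creation, rather than in separate ExecNodes, is what allows the same ExecNodeGraph or compiled plan to serve both operator kinds.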
Quick tip: In theory, AsyncStateOperator and SyncStateOperator implementations differ only in how they handle state; their state schemas, business logic, and other aspects are the same. Therefore, within the same Flink version, when the SQL and other Flink configurations remain unchanged, or when the same compiled plan is used, users can freely switch between AsyncStateOperator and SyncStateOperator by toggling table.exec.async-state.enabled, as the two are fully compatible. One known exception: once the SQL AsyncStateOperator supports checkpointing in-flight requests (FLIP-455[2]), switching back and forth between aligned and unaligned checkpoints is not compatible, which is expected.
Changes relying on more internal state APIs
In addition to the async state APIs proposed by FLIP-424[1], some operators also rely on other internal async state APIs. For example:
- Operators with bundles that use the setCurrentKey interface, such as StreamExecGlobalGroupAggregate when minibatch is enabled.
- Window operators that use the setCurrentNamespace interface, such as StreamExecWindowAggregate.
The corresponding internal async state APIs have been or will soon be supported.
New Async State Operators Developing Guidelines
Basic Guidelines
Given the significant differences between asynchronous and synchronous programming, and to improve code performance, readability, and maintainability, we have established some basic guidelines to help developers write SQL AsyncStateOperators:
- If multiple states are accessed without dependencies between them, access them in parallel as much as possible, e.g., when initializing variables from them or updating them.
- If subsequent code does not depend on the result of a state access, do not block the main thread waiting for it.
- Do not use CompletableFuture independently within an operator; use the StateFuture provided by the state instead.
- If multiple programming approaches with the same semantics show no significant performance difference, choose the one with better readability and maintainability.
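The first two guidelines can be illustrated with a small standalone model. It assumes a hypothetical `PendingValueState` whose reads stay queued until `drain()` serves them, and it uses `java.util.concurrent.CompletableFuture` only because the sketch runs outside Flink; inside an operator, the third guideline still applies and `StateFuture` should be used. `sumInParallel` issues both independent reads before combining them, while `sumSequentially` needlessly waits for the first read before issuing the second.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical stand-in for an async ValueState<Long>: each read is queued and only
 * served when drain() runs, mimicking state I/O that completes later.
 */
final class PendingValueState {
    private final Deque<Runnable> pending;
    private final long value;

    PendingValueState(Deque<Runnable> pending, long value) {
        this.pending = pending;
        this.value = value;
    }

    CompletableFuture<Long> asyncValue() {
        CompletableFuture<Long> future = new CompletableFuture<>();
        pending.add(() -> future.complete(value));
        return future;
    }
}

final class AccessGuidelines {
    /** Preferred: both independent reads are issued immediately, so both are in flight. */
    static CompletableFuture<Long> sumInParallel(PendingValueState a, PendingValueState b) {
        return a.asyncValue().thenCombine(b.asyncValue(), (va, vb) -> va + vb);
    }

    /** Avoid: the second read is only issued after the first one completes. */
    static CompletableFuture<Long> sumSequentially(PendingValueState a, PendingValueState b) {
        return a.asyncValue().thenCompose(va -> b.asyncValue().thenApply(vb -> va + vb));
    }

    /** Serves queued requests, including ones enqueued by callbacks, until none remain. */
    static void drain(Deque<Runnable> pending) {
        while (!pending.isEmpty()) {
            pending.poll().run();
        }
    }
}
```

Right after the call, `sumInParallel` has two requests queued and `sumSequentially` only one. In this single-threaded model, calling `join()` on either future before `drain()` would never return, a rough analogue of why blocking the task thread on a state request stalls processing.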
Best practices for different state accessing patterns
We have analyzed over 20 stateful operators and summarized their state access into 9 common patterns.
For details on these patterns and their corresponding paradigms, see the Appendix.
Compatibility, Deprecation, and Migration Plan
These APIs are newly introduced and do not involve compatibility issues.
Test Plan
Harness tests and IT tests will be added to validate the changes.
Additionally, tests will be introduced to verify state compatibility when switching between sync and async state operators.
Furthermore, benchmark testing for AsyncStateOperators will be introduced later to prevent significant regressions from logic changes in each version, thereby improving maintainability.
For specific details regarding the testing plan, please refer to the "Roadmap" section.
Roadmap
The roadmap for this FLIP will align with that in FLIP-423[3].
Flink 2.0:
- Aim to complete the AsyncStateOperators for Join, Agg, and WindowAgg, with harness tests and IT tests to verify their data correctness. These streaming operators are widely used and play a significant role in the majority of stateful Nexmark queries.
- Aim to include compatibility tests for state transitions between the async and sync versions of these operators.
Flink 2.1 and later:
- Complete the AsyncStateOperators for Over, Window (such as WindowAgg, WindowRank, etc.), and other stateful operators, with harness tests and IT tests to verify their data correctness.
- Refactor the sync and async state operators to share the computation logic while abstracting away the state access details.
- Optimize each operator based on the Nexmark results.
- Establish benchmark tests for AsyncStateOperators to prevent significant performance regressions.
Some tasks remain to be planned, such as integrating the SQL AsyncStateOperators with FLIP-455[2]. Since these involve only internal changes and do not affect the public API, no separate follow-up FLIP is required.
Rejected Alternatives
Introduce Configuration Options for Each AsyncStateOperator
Introduce separate configurations for each new operator like below.
```
table.exec.async-state.join.enabled
table.exec.async-state.agg.enabled
table.exec.async-state.rank.enabled
...
```
However, given that there are currently more than 20 stateful operators, these options would be difficult to maintain.
Introducing AsyncStateExecNode and SyncStateExecNode
Treat AsyncStateOperator and SyncStateOperator as two entirely different operators with different execution methods. The choice between using AsyncStateExecNode or SyncStateExecNode is determined during the translation to ExecGraph.
However, AsyncStateExecNode and SyncStateExecNode are almost identical. There are no semantic or contextual differences between them. They only differ slightly in the creation of their respective operators. This is similar to the existing SyncLookupJoin and AsyncLookupJoin implementations in the current code.
Reference
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-424%3A+Asynchronous+State+APIs
Appendix
Patterns and Corresponding Paradigms
1. Sequentially Access a Single State
Sequentially access a single state, using operations such as get, update, and clear in order.
```java
// Sync state API
Long cnt = valueState.value();
if (cnt == null) {
    cnt = 0L;
} else if (cnt % 2 == 0) {
    cnt = cnt / 2 + record;
} else {
    cnt = cnt + 1 + record;
}
valueState.update(cnt);
```
```java
// Async state API
valueState
    .asyncValue()
    .thenApply(
        cnt -> {
            if (cnt == null) {
                return 0L;
            } else if (cnt % 2 == 0) {
                return cnt / 2 + record;
            } else {
                return cnt + 1 + record;
            }
        })
    .thenAccept(updateCnt -> valueState.asyncUpdate(updateCnt));
```
Rejected Programming Paradigms
```java
// Rejected: asyncUpdate is repeated in every branch.
// From a maintainability perspective, reuse code as much as possible.
valueState
    .asyncValue()
    .thenApply(
        cnt -> {
            if (cnt == null) {
                return valueState.asyncUpdate(0L);
            } else if (cnt % 2 == 0) {
                return valueState.asyncUpdate(cnt / 2 + record);
            } else {
                return valueState.asyncUpdate(cnt + 1 + record);
            }
        });
```
2. State Access Located within Conditional Branches
Access one or more states within the branches of conditional statements.
```java
// Sync state API
Integer value = mapState.get(key);
if (value != null) {
    if (value > 1) {
        mapState.put(key, value - 1);
    } else {
        mapState.remove(key);
    }
} else {
    mapState.put(key, 100);
}
```
```java
// Async state API
mapState
    .asyncGet(key)
    .thenAccept(
        value -> {
            if (value != null) {
                if (value > 1) {
                    mapState.asyncPut(key, value - 1);
                } else {
                    mapState.asyncRemove(key);
                }
            } else {
                mapState.asyncPut(key, 100);
            }
        });
```
3. Simultaneous Access to Multiple Independent States
```java
// Sync state API
public void clear() {
    mapState1.clear();
    mapState2.clear();
    valueState1.clear();
    valueState2.clear();
}

// or
public void clear(State... states) {
    for (State state : states) {
        state.clear();
    }
}
```
```java
// Async state API
public void clear() {
    mapState1.asyncClear();
    mapState2.asyncClear();
    valueState1.asyncClear();
    valueState2.asyncClear();
}

// or use `StateFutureUtils#combineAll`
// if subsequent logic depends on this block
public StateFuture<Collection<Void>> clear() {
    return StateFutureUtils.combineAll(
        Arrays.asList(
            mapState1.asyncClear(),
            mapState2.asyncClear(),
            valueState1.asyncClear(),
            valueState2.asyncClear()));
}
```
Rejected Programming Paradigms
```java
// From a maintainability perspective,
// it is somewhat redundant
public void clear() {
    StateFutureUtils.completedVoidFuture()
        .thenAccept(VOID -> mapState1.asyncClear())
        .thenAccept(VOID -> mapState2.asyncClear())
        .thenAccept(VOID -> valueState1.asyncClear())
        .thenAccept(VOID -> valueState2.asyncClear())
        .thenAccept(VOID -> finish());
}
```
4. Sequentially Invoke Multiple States with Subsequent Calculations Depending on Their Results
```java
// Sync state API
Long var1 = valueState1.value() + record.f0;
Long var2 = valueState2.value() + record.f1;
if (Objects.equals(var1, var2)) {
    print("equals");
} else {
    print("not equals");
}
valueState1.update(var1);
valueState2.update(var2);
```
```java
// Async state API: use `thenCombine`
AtomicLong var1 = new AtomicLong();
AtomicLong var2 = new AtomicLong();
valueState1
    .asyncValue()
    .thenCombine(
        valueState2.asyncValue(),
        (v1, v2) -> {
            var1.set(v1 + record.f0);
            var2.set(v2 + record.f1);
            return null;
        })
    .thenAccept(
        VOID -> {
            if (Objects.equals(var1.get(), var2.get())) {
                print("equals");
            } else {
                print("not equals");
            }
        })
    .thenAccept(
        VOID ->
            StateFutureUtils.combineAll(
                Arrays.asList(
                    valueState1.asyncUpdate(var1.get()),
                    valueState2.asyncUpdate(var2.get()))));

// or use `combineAll`.
// The performance difference between `thenCombine` and `combineAll` is minimal;
// see the performance comparison at the end of the Appendix.
AtomicLong var1 = new AtomicLong();
AtomicLong var2 = new AtomicLong();
StateFutureUtils.combineAll(
        Arrays.asList(
            valueState1.asyncValue().thenAccept(v1 -> var1.set(v1 + record.f0)),
            valueState2.asyncValue().thenAccept(v2 -> var2.set(v2 + record.f1))))
    .thenAccept(
        VOID -> {
            if (Objects.equals(var1.get(), var2.get())) {
                print("equals");
            } else {
                print("not equals");
            }
        })
    .thenAccept(
        VOID ->
            StateFutureUtils.combineAll(
                Arrays.asList(
                    valueState1.asyncUpdate(var1.get()),
                    valueState2.asyncUpdate(var2.get()))));
```
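Both pattern 4 variants compute the same values. The following standalone sketch checks this with `java.util.concurrent.CompletableFuture`, where `thenCombine` plays the role of `StateFuture#thenCombine` and `CompletableFuture.allOf` loosely plays the role of `StateFutureUtils#combineAll` (the latter also collects the results; `allOf` does not). The class name `CombineSketch` and the use of already-completed futures in place of `asyncValue()` are assumptions of this sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model of pattern 4 outside Flink; returns {var1, var2}. */
final class CombineSketch {
    /** `thenCombine` style: the two reads are combined pairwise. */
    static CompletableFuture<long[]> viaThenCombine(
            CompletableFuture<Long> value1, CompletableFuture<Long> value2, long f0, long f1) {
        return value1.thenCombine(value2, (v1, v2) -> new long[] {v1 + f0, v2 + f1});
    }

    /** `combineAll` style: each read fills its own holder, then all reads are awaited. */
    static CompletableFuture<long[]> viaAllOf(
            CompletableFuture<Long> value1, CompletableFuture<Long> value2, long f0, long f1) {
        AtomicLong var1 = new AtomicLong();
        AtomicLong var2 = new AtomicLong();
        return CompletableFuture.allOf(
                        value1.thenAccept(v1 -> var1.set(v1 + f0)),
                        value2.thenAccept(v2 -> var2.set(v2 + f1)))
                .thenApply(ignored -> new long[] {var1.get(), var2.get()});
    }
}
```

With stored values 10 and 12 and a record of (3, 1), both variants yield {13, 13}, so the `equals` branch of the pattern would be taken.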
5. Iterating Through Map/List State without Mutual Dependencies During Traversal
```java
// Sync state API (mapState is a MapState<Long, Long>)
long sum = 0L;
for (Map.Entry<Long, Long> entry : mapState.entries()) {
    sum += entry.getKey();
}
print(sum);
mapState.put(rowData.getLong(0), rowData.getLong(1));

// or
long sum = 0L;
Iterator<Map.Entry<Long, Long>> it = mapState.entries().iterator();
while (it.hasNext()) {
    Map.Entry<Long, Long> entry = it.next();
    sum += entry.getKey();
}
print(sum);
mapState.put(rowData.getLong(0), rowData.getLong(1));
```
```java
// Async state API
AtomicLong sum = new AtomicLong(0L);
mapState
    .asyncEntries()
    .thenCompose(
        entryStateIterator ->
            entryStateIterator.onNext(
                entry -> {
                    sum.addAndGet(entry.getKey());
                }))
    .thenAccept(
        VOID -> {
            // issue the state update first so that it can proceed
            // in parallel with the CPU computation "print"
            mapState.asyncPut(rowData.getLong(0), rowData.getLong(1));
            print(sum.get());
        });
```
6. Iterating Through List State with Mutual Dependencies During Traversal
```java
// Sync state API
long var = 0L;
for (Long element : listState.get()) {
    if (var % 2 == 0) {
        var = var / 2 + element;
    } else {
        var = var * 2 + element;
    }
}
print(var);
listState.add(var);
```
```java
// Async state API
AtomicLong var = new AtomicLong(0L);
listState
    .asyncGet()
    .thenCompose(
        listStateIterator ->
            listStateIterator.onNext(
                element -> {
                    if (var.get() % 2 == 0) {
                        var.set(var.get() / 2 + element);
                    } else {
                        var.set(var.get() * 2 + element);
                    }
                }))
    .thenAccept(
        VOID -> {
            // issue the state update first so that it can proceed
            // in parallel with the CPU computation "print"
            listState.asyncAdd(var.get());
            print(var.get());
        });
```
7. Non-List State Iteration with State Access and Mutual Dependencies During Traversal
```java
// Sync state API
long var = 0L;
for (Long data : list) {
    long curVar;
    if (var % 2 == 0) {
        curVar = var + data / 2;
    } else {
        curVar = var + data;
    }
    mapState.put(data, curVar);
    var = curVar;
}
print(var);
```
```java
// Async state API
StateFuture<Long> varFuture = StateFutureUtils.completedFuture(0L);
for (Long data : list) {
    varFuture =
        varFuture.thenCompose(
            preVar -> {
                final long curVar;
                if (preVar % 2 == 0) {
                    curVar = preVar + data / 2;
                } else {
                    curVar = preVar + data;
                }
                // The list may contain the same key multiple times,
                // so the order dependency needs to be considered
                return mapState
                    .asyncPut(data, curVar)
                    .thenCompose(VOID -> StateFutureUtils.completedFuture(curVar));
            });
}
varFuture.thenAccept(var -> print(var));
```
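To see why the futures in pattern 7 are chained rather than issued independently, here is a standalone model under stated assumptions: `PendingMapState` and `ChainedIteration` are hypothetical names, `PendingMapState.asyncPut` is only applied when `drain()` runs, and `CompletableFuture` stands in for `StateFuture`. With the input `[2, 3, 2]`, each step reads the value produced by the previous step, and the repeated key `2` must end up with the last value written.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical async MapState<Long, Long>: puts are queued and applied on drain(). */
final class PendingMapState {
    final Map<Long, Long> backing = new HashMap<>();
    final Deque<Runnable> pending = new ArrayDeque<>();

    CompletableFuture<Void> asyncPut(long key, long value) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        pending.add(
                () -> {
                    backing.put(key, value);
                    done.complete(null);
                });
        return done;
    }

    /** Applies queued puts, including ones enqueued by callbacks, until none remain. */
    void drain() {
        while (!pending.isEmpty()) {
            pending.poll().run();
        }
    }
}

final class ChainedIteration {
    /** Pattern 7: each step waits for the previous put before computing and writing. */
    static CompletableFuture<Long> process(List<Long> list, PendingMapState mapState) {
        CompletableFuture<Long> varFuture = CompletableFuture.completedFuture(0L);
        for (Long data : list) {
            varFuture =
                    varFuture.thenCompose(
                            preVar -> {
                                long curVar = (preVar % 2 == 0) ? preVar + data / 2 : preVar + data;
                                // The same key may repeat, so the next put must follow this one.
                                return mapState.asyncPut(data, curVar).thenApply(ignored -> curVar);
                            });
        }
        return varFuture;
    }
}
```

Building the chain returns immediately with only the first `asyncPut` queued; after `drain()`, the result is 5 and the map holds `{2=5, 3=4}`.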
8. Non-Map/List State Iteration with State Access and No Mutual Dependencies During Traversal
```java
// Sync state API
for (Map.Entry<Long, Long> entry : cacheMap.entrySet()) {
    print(mapState.get(entry.getKey()));
    mapState.put(entry.getKey(), entry.getValue());
}
```
```java
// Async state API
for (Map.Entry<Long, Long> entry : cacheMap.entrySet()) {
    mapState
        .asyncGet(entry.getKey())
        .thenAccept(
            value -> {
                // issue the state update first so that it can proceed
                // in parallel with the CPU computation "print"
                mapState.asyncPut(entry.getKey(), entry.getValue());
                print(value);
            });
}
```
9. Iterating Through Map/List State Using Iterator to Update/Remove Data
```java
// Sync state API
mapState.put(rowData.getLong(0), rowData.getLong(1));
Iterator<Map.Entry<Long, Long>> it = mapState.entries().iterator();
boolean isEmpty = true;
while (it.hasNext()) {
    isEmpty = false;
    Map.Entry<Long, Long> entry = it.next();
    if (entry.getValue() % 2 == 0) {
        // remove it in state
        it.remove();
    } else {
        // update back to state
        entry.setValue(entry.getValue() + rowData.getLong(2));
    }
}
```
```java
// Async state API
mapState
    .asyncPut(rowData.getLong(0), rowData.getLong(1))
    .thenCompose(VOID -> mapState.asyncEntries())
    .thenAccept(
        entryStateIterator ->
            entryStateIterator.onNext(
                entry -> {
                    if (entry.getValue() % 2 == 0) {
                        mapState.asyncRemove(entry.getKey());
                    } else {
                        mapState.asyncPut(
                            entry.getKey(), entry.getValue() + rowData.getLong(2));
                    }
                }));
```
State Access Patterns in Various Stateful Operators
No. | Operator Name | State Access Patterns Used |
1 | Join | 1, 2, 3, 5, 8 |
2 | Temporal Join | 1, 2, 3, 5, 8, 9 |
3 | LookupJoin | 1, 2 |
4 | IntervalJoin | 1, 2, 3, 5, 9 |
5 | Rank | 1, 2, 5, 7, 8 |
6 | Limit | 1, 2, 5, 7, 8 |
7 | SortLimit | 1, 2, 5, 7, 8 |
8 | Deduplicate | 1, 2, 8 |
9 | OverAggregate | 1, 2, 3, 4, 5, 7, 8 |
10 | GlobalGroupAggregate | 1, 2 |
11 | GroupAggregate | 1, 2 |
12 | GroupTableAggregate | 1, 2 |
13 | GroupWindowAggregate | 1, 2, 3, 5, 8 |
14 | IncrementalGroupAggregate | none |
15 | WindowAggregate | 1, 2, 3, 5, 8 |
16 | GlobalWindowAggregate | 1, 2, 3, 5, 8 |
17 | WindowDeduplicate | 1, 2 |
18 | WindowJoin | 1, 2, 3, 4, 6 |
19 | WindowRank | 1, 2, 5, 8 |
20 | WindowTableFunction | 1, 2, 3, 4, 5 |
21 | Sort | — |
22 | TemporalSort | 1, 2, 3, 5 |
23 | ChangelogNormalize | 1, 2, 8 |
24 | Sink | — |
25 | Source | — |
26 | Match | 1, 2, 5, 7, 9 |
27 | PythonGroupAggregate | — |
28 | PythonGroupTableAggregate | — |
29 | PythonGroupWindowAggregate | — |
30 | PythonOverAggregate | — |
POC for join operator
https://github.com/apache/flink/commit/bccace0f4b233e10279c7d95e009ae6aadad5ae8
Performance comparison between `thenCombine` and `combineAll`
Test environment:
- Intel Core i7 2.6 GHz, 16 GB
- ForSt (an async state backend), using a local directory as its remote directory
- MiniCluster
- 10,000 records
- Warm-up rounds: 1
- Test rounds: 10
Result:
Method | Time spent (ms) |
thenCombine | 6718.214 |
combineAll | 6722.227 |
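As a worked check, the relative difference between the two measured times is:

```latex
\frac{6722.227 - 6718.214}{6718.214} = \frac{4.013}{6718.214} \approx 0.06\%
```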
This indicates that the performance gap between thenCombine and combineAll is negligible, so developers can choose either option without any major impact.
Testing branch: https://github.com/xuyangzhong/flink/tree/async_state_api_benchmark