...
Furthermore, benchmark tests for AsyncStateOperators will be introduced later to catch significant performance regressions caused by logic changes in each release, thereby improving maintainability.
For details of the testing plan, see the "Roadmap" section.
Roadmap
The roadmap for this FLIP will align with that in FLIP-423[3].
Flink 2.0:
- Aim to complete the AsyncState operators for Join, Agg, and WindowAgg, and add harness tests and IT tests to verify their data correctness. These streaming operators are widely used and play a significant role in most stateful Nexmark queries.
- Aim to include compatibility tests for state transitions between the sync and async versions of these AsyncState operators.
Flink 2.1 and later:
- Complete the AsyncState operators for Over, Window (such as WindowAgg, WindowRank, etc.), and other stateful operators, and add harness tests and IT tests to verify their data correctness.
- Refactor the sync and async state operators so that they share the same computation logic while abstracting away the details of state access.
- Optimize each operator based on the Nexmark results.
- Establish benchmark testing for AsyncStateOperators to prevent significant performance regressions.
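The refactoring item above (sharing computation logic while abstracting state access) could take many forms. One minimal sketch, assuming a future-based access interface, writes the computation once and plugs in either a synchronous or an asynchronous state accessor. It uses plain `java.util.concurrent.CompletableFuture`, and every name in it (`CountStateAccessor`, `SyncAccessor`, `AsyncAccessor`, `accumulate`, `processElement`) is hypothetical; it does not reflect Flink's actual operator classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SharedLogicSketch {

    // Hypothetical abstraction over state access (not a Flink API).
    interface CountStateAccessor {
        CompletableFuture<Long> get();

        CompletableFuture<Void> put(long value);
    }

    // Shared computation logic, independent of how the state is accessed.
    static long accumulate(Long current, long delta) {
        return (current == null ? 0L : current) + delta;
    }

    // Operator body written once against the accessor interface.
    static CompletableFuture<Long> processElement(CountStateAccessor state, long delta) {
        return state.get()
                .thenCompose(
                        current -> {
                            long next = accumulate(current, delta);
                            return state.put(next).thenApply(ignored -> next);
                        });
    }

    // Stand-in for synchronous state access: futures are already complete.
    static final class SyncAccessor implements CountStateAccessor {
        private Long value;

        public CompletableFuture<Long> get() {
            return CompletableFuture.completedFuture(value);
        }

        public CompletableFuture<Void> put(long v) {
            value = v;
            return CompletableFuture.completedFuture(null);
        }
    }

    // Stand-in for asynchronous state access: reads and writes run on another thread.
    static final class AsyncAccessor implements CountStateAccessor {
        private final ExecutorService io;
        private volatile Long value;

        AsyncAccessor(ExecutorService io) {
            this.io = io;
        }

        public CompletableFuture<Long> get() {
            return CompletableFuture.supplyAsync(() -> value, io);
        }

        public CompletableFuture<Void> put(long v) {
            return CompletableFuture.runAsync(() -> value = v, io);
        }
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newSingleThreadExecutor();
        try {
            CountStateAccessor sync = new SyncAccessor();
            CountStateAccessor async = new AsyncAccessor(io);
            for (long delta : new long[] {1, 2, 3}) {
                processElement(sync, delta).join();
                processElement(async, delta).join();
            }
            // Both accessors produce the same result from the same shared logic.
            System.out.println(sync.get().join() + " " + async.get().join()); // prints "6 6"
        } finally {
            io.shutdown();
        }
    }
}
```

In this sketch the sync path still pays for wrapping values in futures even though it never waits; an actual refactoring may choose a different boundary between shared logic and state access.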
Some tasks remain to be planned, such as integrating the SQL AsyncStateOperators with FLIP-455[2]. Since this involves only internal changes and does not affect the public API, a separate follow-up FLIP is not required.
...
```java
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
// StateFutureUtils is provided by Flink's async state API; its import is omitted here.
// Assumes valueState1/valueState2 hold Long values and record.f0/record.f1 are longs.

// Option 1: use `thenCombine`.
AtomicLong var1 = new AtomicLong();
AtomicLong var2 = new AtomicLong();
valueState1
        .asyncValue()
        .thenCombine(
                valueState2.asyncValue(),
                (v1, v2) -> {
                    // Derive the new values from both state reads.
                    var1.set(v1 + record.f0);
                    var2.set(v2 + record.f1);
                    return null;
                })
        .thenAccept(
                VOID -> {
                    if (Objects.equals(var1.get(), var2.get())) {
                        print("equals");
                    } else {
                        print("not equals");
                    }
                })
        .thenAccept(
                VOID ->
                        // Write both results back; each state receives its own value.
                        StateFutureUtils.combineAll(
                                Arrays.asList(
                                        valueState1.asyncUpdate(var1.get()),
                                        valueState2.asyncUpdate(var2.get()))));

// Option 2 (an alternative to Option 1, not meant for the same scope): use `combineAll`.
// The performance difference between `thenCombine` and `combineAll` is minimal.
// See the benchmark results below.
AtomicLong var1 = new AtomicLong();
AtomicLong var2 = new AtomicLong();
StateFutureUtils.combineAll(
                Arrays.asList(
                        // Each read stores its derived value; combineAll waits for both.
                        valueState1.asyncValue().thenAccept(v1 -> var1.set(v1 + record.f0)),
                        valueState2.asyncValue().thenAccept(v2 -> var2.set(v2 + record.f1))))
        .thenAccept(
                VOID -> {
                    if (Objects.equals(var1.get(), var2.get())) {
                        print("equals");
                    } else {
                        print("not equals");
                    }
                })
        .thenAccept(
                VOID ->
                        StateFutureUtils.combineAll(
                                Arrays.asList(
                                        valueState1.asyncUpdate(var1.get()),
                                        valueState2.asyncUpdate(var2.get()))));
```
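The snippets above depend on Flink's `StateFuture` API and need a running operator to execute. As a self-contained analogue, the sketch below reproduces the same two composition patterns with `java.util.concurrent.CompletableFuture`, where `CompletableFuture.allOf` stands in for `StateFutureUtils.combineAll`. The class and method names are hypothetical; this is not the Flink API and does not reproduce the benchmark.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

public class CombinePatternsSketch {

    // Analogue of `thenCombine`: the BiFunction receives both read results directly.
    static boolean equalsViaThenCombine(
            CompletableFuture<Long> read1, CompletableFuture<Long> read2, long f0, long f1) {
        return read1.thenCombine(read2, (v1, v2) -> v1 + f0 == v2 + f1).join();
    }

    // Analogue of `combineAll`: each read stores its derived value in a holder,
    // and the continuation runs only after all derived futures have completed.
    static boolean equalsViaAllOf(
            CompletableFuture<Long> read1, CompletableFuture<Long> read2, long f0, long f1) {
        AtomicLong var1 = new AtomicLong();
        AtomicLong var2 = new AtomicLong();
        return CompletableFuture.allOf(
                        read1.thenAccept(v1 -> var1.set(v1 + f0)),
                        read2.thenAccept(v2 -> var2.set(v2 + f1)))
                .thenApply(ignored -> var1.get() == var2.get())
                .join();
    }

    public static void main(String[] args) {
        // Simulated state reads; supplyAsync stands in for an asynchronous state backend.
        CompletableFuture<Long> state1 = CompletableFuture.supplyAsync(() -> 10L);
        CompletableFuture<Long> state2 = CompletableFuture.supplyAsync(() -> 7L);
        long f0 = 2; // hypothetical record field f0
        long f1 = 5; // hypothetical record field f1
        System.out.println(equalsViaThenCombine(state1, state2, f0, f1)); // prints "true"
        System.out.println(equalsViaAllOf(state1, state2, f0, f1)); // prints "true"
    }
}
```

`thenCombine` merges exactly two futures and hands both values to one function, whereas the `allOf`/`combineAll` style scales to any number of reads at the cost of mutable holders such as `AtomicLong`.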
...
| Approach | Time spent (ms) |
|---|---|
| thenCombine | 6718.214 |
| combineAll | 6722.227 |
This indicates that the performance gap between `thenCombine` and `combineAll` is negligible, so developers can choose either option without any noticeable performance impact.
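Quantifying the gap from the two measurements in the table, `combineAll` took about 4 ms longer in total, which is roughly 0.06% of the `thenCombine` time:

$$\frac{6722.227 - 6718.214}{6718.214} = \frac{4.013}{6718.214} \approx 0.06\%$$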
Testing branch: https://github.com/xuyangzhong/flink/tree/async_state_api_benchmark
...