...

Furthermore, benchmark tests for AsyncStateOperators will be introduced later to catch significant performance regressions caused by logic changes in each version, thereby improving maintainability.

For specific details regarding the testing plan, please refer to the "Roadmap" section.
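A benchmark gate of this kind could decide what counts as a "significant" regression by comparing each new measurement with a stored baseline. The Java below is a minimal sketch of one possible rule. The class `RegressionCheck`, the relative-slowdown rule, and the 5% threshold are hypothetical. They are not part of Flink or of this FLIP's benchmark setup.

```java
/**
 * Hypothetical regression gate for operator benchmarks: compares a new measurement
 * with a stored baseline and reports a regression when the relative slowdown
 * exceeds a threshold. Names and thresholds are illustrative assumptions.
 */
final class RegressionCheck {

    private RegressionCheck() {}

    /**
     * Returns true if currentMs is slower than baselineMs by more than maxSlowdown,
     * expressed as a fraction of the baseline (0.05 means 5%).
     */
    static boolean isRegression(double baselineMs, double currentMs, double maxSlowdown) {
        if (baselineMs <= 0) {
            throw new IllegalArgumentException("baseline must be positive: " + baselineMs);
        }
        double slowdown = (currentMs - baselineMs) / baselineMs;
        return slowdown > maxSlowdown;
    }

    public static void main(String[] args) {
        // Illustrative inputs only, not measured results.
        System.out.println(isRegression(1000.0, 1060.0, 0.05)); // true: 6% slower
        System.out.println(isRegression(1000.0, 1040.0, 0.05)); // false: 4% slower
    }
}
```

A relative threshold is used here so that one rule can cover operators with very different absolute run times. A real gate would also need to tolerate run-to-run noise, for example by averaging several iterations.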

Roadmap

The roadmap for this FLIP will align with that in FLIP-423[3].

  1. Flink 2.0:

    1. Aim to complete the AsyncState operators for Join, Agg, and WindowAgg, and add harness tests and IT tests to verify their data correctness.

      1. These streaming operators are widely used and play a significant role in most stateful Nexmark queries.

    2. Aim to add compatibility tests for these AsyncState operators covering state transitions between their async and sync versions.
  2. Flink 2.1 and later:

    1. Complete the AsyncState operators for Over, Window (such as WindowAgg, WindowRank, etc.), and other stateful operators, and add harness tests and IT tests to verify their data correctness.

    2. Refactor the sync and async state operators so that they share common computation logic while abstracting away the state access details.

    3. Optimize each operator based on the Nexmark results.

    4. Establish benchmark testing for AsyncStateOperators to prevent significant performance regressions.
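The refactoring item above aims to share computation logic between the sync and async operators while abstracting state access. One possible shape for that is shown in the minimal, self-contained sketch below. It rests on several assumptions. Every name in it (`SharedLogicSketch`, `CounterState`, `CountOperator`, and so on) is hypothetical. JDK `CompletionStage` stands in for Flink's `StateFuture`, and plain maps stand in for keyed state. A single operator body runs against both a synchronous and an asynchronous state implementation, and only the `CounterState` passed in differs.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical illustration: shared computation, pluggable (sync or async) state access. */
final class SharedLogicSketch {

    /** Shared computation, independent of how state is read or written. */
    static long accumulate(Long current, long delta) {
        return (current == null ? 0L : current) + delta;
    }

    /** State-access abstraction: the sync and async variants differ only here. */
    interface CounterState {
        CompletionStage<Long> get(String key);

        CompletionStage<Void> put(String key, long value);
    }

    /** Sync variant: reads and writes complete immediately on the caller thread. */
    static final class SyncCounterState implements CounterState {
        private final Map<String, Long> store = new HashMap<>();

        public CompletionStage<Long> get(String key) {
            return CompletableFuture.completedFuture(store.get(key));
        }

        public CompletionStage<Void> put(String key, long value) {
            store.put(key, value);
            return CompletableFuture.completedFuture(null);
        }
    }

    /** Async variant: reads and writes run on a separate executor. */
    static final class AsyncCounterState implements CounterState {
        private final Map<String, Long> store = new ConcurrentHashMap<>();
        private final Executor executor;

        AsyncCounterState(Executor executor) {
            this.executor = executor;
        }

        public CompletionStage<Long> get(String key) {
            return CompletableFuture.supplyAsync(() -> store.get(key), executor);
        }

        public CompletionStage<Void> put(String key, long value) {
            return CompletableFuture.runAsync(() -> store.put(key, value), executor);
        }
    }

    /** One operator body reused by both variants. */
    static final class CountOperator {
        private final CounterState state;

        CountOperator(CounterState state) {
            this.state = state;
        }

        /** Reads the current count, applies the shared logic, writes it back, returns it. */
        CompletionStage<Long> processElement(String key, long delta) {
            return state.get(key).thenCompose(current -> {
                long updated = accumulate(current, delta);
                return state.put(key, updated).thenApply(ignored -> updated);
            });
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            for (CounterState s : List.of(new SyncCounterState(), new AsyncCounterState(pool))) {
                CountOperator op = new CountOperator(s);
                // Wait for each element so same-key updates never interleave in this sketch.
                op.processElement("a", 2).toCompletableFuture().join();
                System.out.println(op.processElement("a", 3).toCompletableFuture().join()); // 5
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

The sketch blocks on each element so that updates to the same key stay ordered. A real async operator would not block like this. It would instead rely on the async state runtime to keep the processing of records with the same key in order.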

Some tasks remain to be planned, such as integrating the SQL AsyncStateOperators with FLIP-455[2]. Since this involves only internal changes and does not affect the public API, no separate follow-up FLIP is required.

...

Async State API
// use `thenCombine`
// `record` is the current input record; `print` stands in for any side effect.
AtomicLong var1 = new AtomicLong();
AtomicLong var2 = new AtomicLong();
valueState1
  .asyncValue()
  // wait for both reads, then compute the new values
  .thenCombine(
    valueState2.asyncValue(),
    (v1, v2) -> {
      var1.set(v1 + record.f0);
      var2.set(v2 + record.f1);
      return null;
    })
  .thenAccept(
    VOID -> {
      if (Objects.equals(var1.get(), var2.get())) {
        print("equals");
      } else {
        print("not equals");
      }
    })
  // chain both writes so the resulting future completes after both updates
  .thenCompose(
    VOID ->
      StateFutureUtils.combineAll(
        Arrays.asList(
          valueState1.asyncUpdate(var1.get()),
          valueState2.asyncUpdate(var2.get()))));

// or use `combineAll`
// The performance difference between `thenCombine` and `combineAll` is minimal;
// see the following section for details.
AtomicLong var1 = new AtomicLong();
AtomicLong var2 = new AtomicLong();
// issue both reads, and continue once both have completed
StateFutureUtils.combineAll(
  Arrays.asList(
    valueState1
      .asyncValue()
      .thenAccept(v1 -> var1.set(v1 + record.f0)),
    valueState2
      .asyncValue()
      .thenAccept(v2 -> var2.set(v2 + record.f1))))
  .thenAccept(
    VOID -> {
      if (Objects.equals(var1.get(), var2.get())) {
        print("equals");
      } else {
        print("not equals");
      }
    })
  // chain both writes so the resulting future completes after both updates
  .thenCompose(
    VOID ->
      StateFutureUtils.combineAll(
        Arrays.asList(
          valueState1.asyncUpdate(var1.get()),
          valueState2.asyncUpdate(var2.get()))));
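`StateFuture` only runs inside Flink's async state runtime. To try the two patterns above in isolation, they can be mirrored roughly with a JDK analog. Here `CompletableFuture.thenCombine` plays the role of `thenCombine`, and `CompletableFuture.allOf` loosely plays the role of `StateFutureUtils.combineAll`. Both variants capture the intermediate values in `AtomicLong`s, as the original snippets do. This is an illustrative sketch, not Flink code. The class `CombineDemo` and its input values are hypothetical, and the sketch does not reproduce the benchmark.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical JDK analog of the thenCombine / combineAll patterns (not Flink's API). */
final class CombineDemo {

    /** Pattern 1: merge exactly two futures with a BiFunction. */
    static String viaThenCombine(CompletableFuture<Long> state1, CompletableFuture<Long> state2,
                                 long f0, long f1) {
        AtomicLong var1 = new AtomicLong();
        AtomicLong var2 = new AtomicLong();
        return state1
                .thenCombine(state2, (v1, v2) -> {
                    var1.set(v1 + f0);
                    var2.set(v2 + f1);
                    return null;
                })
                .thenApply(ignored -> var1.get() == var2.get() ? "equals" : "not equals")
                .join();
    }

    /** Pattern 2: wait for a group of futures, each of which records its own result. */
    static String viaAllOf(CompletableFuture<Long> state1, CompletableFuture<Long> state2,
                           long f0, long f1) {
        AtomicLong var1 = new AtomicLong();
        AtomicLong var2 = new AtomicLong();
        return CompletableFuture.allOf(
                        state1.thenAccept(v1 -> var1.set(v1 + f0)),
                        state2.thenAccept(v2 -> var2.set(v2 + f1)))
                .thenApply(ignored -> var1.get() == var2.get() ? "equals" : "not equals")
                .join();
    }

    public static void main(String[] args) {
        // Simulated asynchronous state reads with illustrative values.
        CompletableFuture<Long> state1 = CompletableFuture.supplyAsync(() -> 10L);
        CompletableFuture<Long> state2 = CompletableFuture.supplyAsync(() -> 12L);
        System.out.println(viaThenCombine(state1, state2, 3, 1)); // equals (13 == 13)
        System.out.println(viaAllOf(state1, state2, 3, 1));       // equals (13 == 13)
    }
}
```

`thenCombine` is limited to two inputs. The `allOf`/`combineAll` style scales to any number of state reads, which makes it convenient when an operator touches more than two states per record.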

...


API            Time spent (ms)
thenCombine    6718.214
combineAll     6722.227

The two results differ by about 4 ms (roughly 0.06%). The performance gap between thenCombine and combineAll is therefore negligible, and developers can choose either API without a noticeable performance impact.

Testing branch: https://github.com/xuyangzhong/flink/tree/async_state_api_benchmark

...