
Status

Current state: [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

FLIP-424[1] introduced an asynchronous state architecture to mitigate I/O bottlenecks in Flink by offloading state access to separate threads from the main thread in each task, thereby optimizing I/O and computational resource utilization.

Building upon this infrastructure, this FLIP proposes introducing new SQL operators based on these asynchronous state APIs (referred to as AsyncStateOperators, which correspond to the old operators called SyncStateOperators). Our goal is to enhance throughput and reduce latency in stateful operations by leveraging the new asynchronous state architecture in Flink SQL jobs.

Public Interfaces

Configuration Options

To enable these new AsyncStateOperators, a new configuration option will be introduced.

// default is "false"
table.exec.async-state.enabled

When the configuration is set to `true`, all available AsyncStateOperators will replace the corresponding SyncStateOperators.

We plan to gradually adapt each existing SyncStateOperator to an AsyncStateOperator over multiple Flink versions. This single option toggles all of them uniformly.

Note: When table.exec.async-state.enabled is set to `true`, the existing state backend option below must also be set to a state backend that supports async state.

// set a state backend that supports async state
// like `org.apache.flink.state.forst.ForStStateBackendFactory`
state.backend: ..

User-defined Aggregate Functions and Table Aggregate Functions

Currently, users can directly use the subclasses of the interface DataView, such as MapView and ListView, in User-defined Aggregate Functions and Table Aggregate Functions without being aware of StateDataView. However, the way state is accessed in StateDataView has changed, and therefore the APIs for User-defined Aggregate Functions and Table Aggregate Functions will be adjusted accordingly. These details will be addressed in a separate FLIP introduced later.

Proposed Changes

This FLIP will not dig into detailed implementation specifics, but it will offer a brief overview of the implementation approach.

Phase of adopting AsyncStateOperator

The choice between using a SyncStateOperator and an AsyncStateOperator is made when the ExecNode is translated into a Transformation. Take the regular join operator as an example.

The only difference between these two types of operators is in the way they access the state, and their semantics and context (such as join spec, left & right upsert keys, etc.) are identical. Therefore, it is feasible to reuse the ExecNodeGraph optimized and translated from the RelNode tree.
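The selection step described above can be sketched in plain Java. This is only an illustration under stated assumptions: `JoinOperator`, `JoinContext`, and `translate` are hypothetical stand-ins and not Flink classes, and a plain `Map` stands in for the Flink configuration. Only the option key table.exec.async-state.enabled is taken from this FLIP.

```java
import java.util.Collections;
import java.util.Map;

class OperatorSelectionSketch {
    /** Hypothetical stand-in for a stream operator; not a Flink interface. */
    interface JoinOperator {
        String describe();
    }

    /** Hypothetical context shared by both variants (join spec, upsert keys, ...). */
    static final class JoinContext {
        final String joinSpec;

        JoinContext(String joinSpec) {
            this.joinSpec = joinSpec;
        }
    }

    /** The same context feeds both variants; only the operator creation differs. */
    static JoinOperator translate(JoinContext ctx, Map<String, String> conf) {
        boolean asyncEnabled =
            Boolean.parseBoolean(
                conf.getOrDefault("table.exec.async-state.enabled", "false"));
        if (asyncEnabled) {
            return () -> "AsyncStateJoin[" + ctx.joinSpec + "]";
        }
        return () -> "SyncStateJoin[" + ctx.joinSpec + "]";
    }

    public static void main(String[] args) {
        JoinContext ctx = new JoinContext("a.id = b.id");
        Map<String, String> enabled =
            Collections.singletonMap("table.exec.async-state.enabled", "true");
        System.out.println(translate(ctx, enabled).describe());
        System.out.println(translate(ctx, Collections.emptyMap()).describe());
    }
}
```

Because the branch sits inside a single translation method, the plan and the shared context stay untouched when the option is toggled.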

Quick tip: In theory, the implementations of AsyncStateOperator and SyncStateOperator differ only in their state handling. Their state schemas, business logic, and everything else are the same. Therefore, within the same Flink version, when the SQL and other Flink configurations remain unchanged, or when using the same compiled plan, users can freely switch between AsyncStateOperator and SyncStateOperator by toggling the configuration table.exec.async-state.enabled, as they are fully compatible. One known exception is that after the SQL AsyncStateOperator supports in-flight checkpoints in FLIP-455[2], switching back and forth between aligned and unaligned checkpoints is not compatible, which is expected behavior.

Changes relying on more internal state APIs

In addition to some async state APIs proposed by FLIP-424[1], some operators also rely on other internal async state APIs. For example:

  1. Some operators with bundles that use the setCurrentKey interface, such as StreamExecGlobalGroupAggregate when enabling minibatch.

  2. Some window operators that use the setCurrentNamespace interface, such as StreamExecWindowAggregate.

The corresponding internal async state APIs have been or will soon be supported.

Guidelines for Developing New Async State Operators

Basic Guidelines

Given the significant differences between asynchronous and synchronous programming, and to enhance code performance, readability, and maintainability, we have established some basic guidelines to aid developers in writing SQL AsyncStateOperators.

  1. If multiple states are accessed without dependencies among them, issue the accesses in parallel as far as possible, for example when initializing variables from them or updating them.

  2. If subsequent code does not depend on the callback for state access, avoid blocking the main thread during state access.

  3. Do not use CompletableFuture independently within an operator; instead, use StateFuture provided by the state.

  4. If multiple programming approaches with the same semantics have no significant performance differences, choose the one with better readability and maintainability.

Best practices for different state accessing patterns

Currently, we have summarized over 20 stateful operators into 9 common patterns of accessing state.

For details on these patterns and their corresponding paradigms, please refer to the Appendix chapter.

Compatibility, Deprecation, and Migration Plan

These APIs are newly introduced and do not involve compatibility issues.

Test Plan

HarnessTests and IT tests will be added to validate the changes.

Additionally, tests that verify state compatibility during transitions between sync and async state operators will be introduced.

Furthermore, benchmark testing for AsyncStateOperators will be introduced later to prevent significant regressions from logic changes in each version, thereby improving maintainability.

For specific details regarding the testing plan, please refer to the "Roadmap" section.

Roadmap

The roadmap for this FLIP will align with that in FLIP-423[3].

  1. Flink 2.0:

    1. Aim to complete the AsyncState operators for Join, Agg, and WindowAgg, and add harness tests and IT tests that verify their data correctness.

      1. These streaming operators are widely used and play a significant role in the majority of stateful Nexmark queries.

    2. Aim to include compatibility tests for the state transitions between async and sync for these Async State operators.
  2. Flink 2.1 and later:

    1. Complete the AsyncState operators for Over, Window (such as WindowAgg, WindowRank, etc.) and other stateful operators, and add harness tests and IT tests that verify their data correctness.

    2. Refactor the sync and async state operators, leveraging shared logical calculations while abstracting the state access details.

    3. Optimize each operator based on the Nexmark results.

    4. Establish benchmark testing for AsyncStateOperators to prevent significant performance regressions.

There are still some tasks to be planned, such as integrating SQL AsyncStateOperator with FLIP-455[2]. Since this only involves internal changes and does not affect the public API, a separate FLIP follow-up is not required.

Rejected Alternatives

Introduce Configuration Options for Each AsyncStateOperator

Introduce separate configurations for each new operator like below.

table.exec.async-state.join.enabled
table.exec.async-state.agg.enabled
table.exec.async-state.rank.enabled
......

However, given that there are currently more than 20 stateful operators, these options would be difficult to maintain.

Introducing AsyncStateExecNode and SyncStateExecNode

Treat AsyncStateOperator and SyncStateOperator as two entirely different operators with different execution methods. The choice between using AsyncStateExecNode or SyncStateExecNode is determined during the translation to ExecGraph.

However, AsyncStateExecNode and SyncStateExecNode are almost identical. There are no semantic or contextual differences between them. They only differ slightly in the creation of their respective operators. This is similar to the existing SyncLookupJoin and AsyncLookupJoin implementations in the current code.

Reference

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-424%3A+Asynchronous+State+APIs

[2] https://cwiki.apache.org/confluence/display/FLINK/%5BWIP%5DFLIP-455%3A+Declare+async+state+processing+and+checkpoint+the+in-flight+requests

[3] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-RoadMap+LaunchingPlan

Appendix

Patterns and Corresponding Paradigms

1. Sequentially Access a Single State

Sequentially access a single state, using operations such as get, update, and clear in order.

Sync State API
Long cnt = valueState.value();
if (cnt == null) {
  cnt = 0L;
} else if (cnt % 2 == 0) {
  cnt = cnt / 2 + record;
} else {
  cnt = cnt + 1 + record;
}
valueState.update(cnt);

Async State API
valueState
  .asyncValue()
  .thenApply(
    cnt -> {
      if (cnt == null) {
        return 0L;
      } else if (cnt % 2 == 0) {
        return cnt / 2 + record;
      } else {
        return cnt + 1 + record;
      }
    })
  .thenAccept(updateCnt -> valueState.asyncUpdate(updateCnt));

Rejected Programming Paradigms

Async State API
// Rejected: `asyncUpdate` is repeated in every branch.
// From a maintainability perspective,
// reuse code as much as possible.
valueState
  .asyncValue()
  .thenApply(
    cnt -> {
      if (cnt == null) {
        return valueState.asyncUpdate(0L);
      } else if (cnt % 2 == 0) {
        return valueState.asyncUpdate(cnt / 2 + record);
      } else {
        return valueState.asyncUpdate(cnt + 1 + record);
      }
    });

2. State Access Located within Conditional Branches

Access one or more states within the branches of conditional statements.

Sync State API
Integer value = mapState.get(key);
if(value != null){
  if(value > 1){
    mapState.put(key, value - 1);
  } else {
    mapState.remove(key);
  }
} else {
  mapState.put(key, 100);
}

Async State API
mapState.asyncGet(key).thenAccept(value -> {
  if (value != null) {
    if (value > 1) {
      mapState.asyncPut(key, value - 1);
    } else {
      mapState.asyncRemove(key);
    }
  } else {
    mapState.asyncPut(key, 100);
  }
});

3. Simultaneous Access to Multiple Independent States

Sync State API
public void clear(){
  mapState1.clear();
  mapState2.clear();
  valueState1.clear();
  valueState2.clear();
}

// or
public void clear(State... states) {
   for (State state : states) {
     state.clear();
   }
}

Async State API
public void clear(){
  mapState1.asyncClear();
  mapState2.asyncClear();
  valueState1.asyncClear();
  valueState2.asyncClear();
}

// or use `StateFutureUtils#combineAll` 
// if subsequent logic depends on this block
public StateFuture<Void> clear(){
  return StateFutureUtils.combineAll(
    Arrays.asList(
      mapState1.asyncClear(),
      mapState2.asyncClear(),
      valueState1.asyncClear(),
      valueState2.asyncClear()));
}

Rejected Programming Paradigms

Async State API
// From a maintainability perspective,
// it is somewhat redundant
public void clear(){
  StateFutureUtils
    .completedVoidFuture()
    .thenAccept(VOID -> mapState1.asyncClear())
    .thenAccept(VOID -> mapState2.asyncClear())
    .thenAccept(VOID -> valueState1.asyncClear())
    .thenAccept(VOID -> valueState2.asyncClear())
    .thenAccept(VOID -> finish());
}

4. Sequentially Invoke Multiple States with Subsequent Calculations Depending on Their Results

Sync State API
Long var1 = valueState1.value() + record.f0;
Long var2 = valueState2.value() + record.f1;

if (Objects.equals(var1, var2)) {
  print("equals");
} else {
  print("not equals");
}

valueState1.update(var1);
valueState2.update(var2);

Async State API
// use `thenCombine`
AtomicLong var1 = new AtomicLong();
AtomicLong var2 = new AtomicLong();
valueState1
  .asyncValue()
  .thenCombine(
    valueState2.asyncValue(),
    (v1, v2) -> {
      var1.set(v1 + record.f0);
      var2.set(v2 + record.f1);
      return null;
    })
  .thenAccept(
    VOID -> {
      if (Objects.equals(var1.get(), var2.get())) {
        print("equals");
      } else {
        print("not equals");
      }
    })
  .thenAccept(
    VOID ->
      StateFutureUtils.combineAll(
        Arrays.asList(
          valueState1.asyncUpdate(var1.get()),
          valueState2.asyncUpdate(var2.get()))));

// or use `combineAll`
// The performance differences between the `thenCombine` and `combineAll` are minimal.
// See more at the following section.
AtomicLong var1 = new AtomicLong();
AtomicLong var2 = new AtomicLong();
StateFutureUtils.combineAll(
  Arrays.asList(
    valueState1
      .asyncValue()
      .thenAccept(v1 -> var1.set(v1 + record.f0)),
    valueState2
      .asyncValue()
      .thenAccept(v2 -> var2.set(v2 + record.f1))))
  .thenAccept(
    VOID -> {
      if (Objects.equals(var1.get(), var2.get())) {
        print("equals");
      } else {
        print("not equals");
      }
    })
  .thenAccept(
    VOID ->
    StateFutureUtils.combineAll(
      Arrays.asList(
        valueState1.asyncUpdate(var1.get()),
        valueState2.asyncUpdate(var2.get()))));
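
For readers who want to run the two composition styles outside Flink, the following is a minimal sketch using the JDK's `CompletableFuture` purely as a stand-in. It is not the Flink API: inside an operator, `StateFuture` must be used as required by the basic guidelines, and `CompletableFuture.allOf` is only loosely analogous in role to `StateFutureUtils.combineAll`. The class and method names (`CombineSketch`, `asyncValue`, `viaThenCombine`, `viaAllOf`) are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class CombineSketch {
    /** Hypothetical stand-in for an async read of a value state. */
    static CompletableFuture<Long> asyncValue(long stored) {
        return CompletableFuture.supplyAsync(() -> stored);
    }

    /** Combine two independent reads pairwise. */
    static String viaThenCombine(long s1, long s2, long f0, long f1) {
        return asyncValue(s1)
            .thenCombine(
                asyncValue(s2),
                (v1, v2) -> (v1 + f0) == (v2 + f1) ? "equals" : "not equals")
            .join();
    }

    /** Start both reads, then continue once all of them have completed. */
    static String viaAllOf(long s1, long s2, long f0, long f1) {
        CompletableFuture<Long> a = asyncValue(s1).thenApply(v -> v + f0);
        CompletableFuture<Long> b = asyncValue(s2).thenApply(v -> v + f1);
        return CompletableFuture.allOf(a, b)
            .thenApply(ignored -> a.join().equals(b.join()) ? "equals" : "not equals")
            .join();
    }

    public static void main(String[] args) {
        System.out.println(viaThenCombine(1L, 2L, 3L, 2L));
        System.out.println(viaAllOf(1L, 2L, 3L, 2L));
    }
}
```

In both variants the two reads are issued before either result is awaited, which is the property this pattern relies on.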

5. Iterating Through Map/List State without Mutual Dependencies During Traversal

Sync State API
long sum = 0L;
for (Map.Entry<Long, Long> entry : mapState.entries()) {
  sum += entry.getKey();
}
print(sum);
mapState.put(rowData.getLong(0), rowData.getLong(1));

// or
long sum = 0L;
Iterator<Map.Entry<Long, Long>> it = mapState.entries().iterator();
while (it.hasNext()) {
  Map.Entry<Long, Long> entry = it.next();
  sum += entry.getKey();
}
print(sum);
mapState.put(rowData.getLong(0), rowData.getLong(1));

Async State API
AtomicLong sum = new AtomicLong(0L);
mapState
  .asyncEntries()
  // use `thenCompose` so that the next stage waits for the iteration to finish
  .thenCompose(
    entryStateIterator ->
      entryStateIterator.onNext(
        entry -> {
          sum.addAndGet(entry.getKey());
        }))
  .thenAccept(VOID -> {
    // issue the state update first so that it can proceed in parallel
    // with the cpu computation "print"
    mapState.asyncPut(rowData.getLong(0), rowData.getLong(1));
    print(sum.get());
  });

6. Iterating Through List State with Mutual Dependencies During Traversal

Sync State API
long var = 0L;
for (Long element : listState.get()) {
  if (var % 2 == 0) {
    var = var / 2 + element;
  } else {
    var = var * 2 + element;
  }
}

print(var);
listState.add(var);

Async State API
AtomicLong var = new AtomicLong(0L);
listState
  .asyncGet()
  .thenCompose(
    listStateIterator ->
      listStateIterator.onNext(
        element -> {
          if (var.get() % 2 == 0) {
            var.set(var.get() / 2 + element);
          } else {
            var.set(var.get() * 2 + element);
          }
        }))
  .thenAccept(
    VOID -> {
      // issue the state update first so that it can proceed in parallel
      // with the cpu computation "print"
      listState.asyncAdd(var.get());
      print(var.get());
    });

7. Non-List State Iteration with State Access and Mutual Dependencies During Traversal

Sync State API
long var = 0L;
for (Long data : list) {
  if (var % 2 == 0) {
    var = var + data / 2;
  } else {
    var = var + data;
  }
  mapState.put(data, var);
}
print(var);

Async State API
StateFuture<Long> varFuture = StateFutureUtils.completedFuture(0L);
for (Long data : list) {
  varFuture =
    varFuture.thenCompose(
      preVar -> {
        final long curVar;
        if (preVar % 2 == 0) {
          curVar = preVar + data / 2;
        } else {
          curVar = preVar + data;
        }
        // The list may contain the same key multiple times, 
        // so the order dependency needs to be considered
        return mapState
          .asyncPut(data, curVar)
          .thenCompose(
            VOID -> StateFutureUtils.completedFuture(curVar));
      });
}
varFuture.thenAccept(var -> print(var));
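
The "thread one future through the loop" technique used above can be tried outside Flink with the following sketch. It uses the JDK's `CompletableFuture` only as a stand-in for `StateFuture` (inside an operator, `StateFuture` must be used, as required by the basic guidelines), and a plain map as a stand-in for `mapState`. The names `SequentialChainSketch`, `asyncPut`, and `run` are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class SequentialChainSketch {
    /** Hypothetical stand-in for an async put into a map state. */
    static CompletableFuture<Void> asyncPut(Map<Long, Long> state, long key, long value) {
        return CompletableFuture.runAsync(() -> state.put(key, value));
    }

    /** Each iteration starts only after the previous put has completed. */
    static long run(List<Long> list, Map<Long, Long> state) {
        CompletableFuture<Long> varFuture = CompletableFuture.completedFuture(0L);
        for (Long data : list) {
            varFuture =
                varFuture.thenCompose(
                    preVar -> {
                        final long curVar =
                            (preVar % 2 == 0) ? preVar + data / 2 : preVar + data;
                        // Hand curVar to the next iteration once the put is done.
                        return asyncPut(state, data, curVar).thenApply(ignored -> curVar);
                    });
        }
        return varFuture.join();
    }

    public static void main(String[] args) {
        Map<Long, Long> state = new ConcurrentHashMap<>();
        // The key 4 appears twice, so the order of the two puts matters.
        System.out.println(run(Arrays.asList(4L, 3L, 4L), state));
        System.out.println(state);
    }
}
```

With the input `[4, 3, 4]`, the intermediate values are 2, 3, and 7, so the second put for key 4 overwrites the first only because the chain enforces ordering.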

8. Non-Map/List State Iteration with State Access and No Mutual Dependencies During Traversal

Sync State API
for (Map.Entry<Long, Long> entry : cacheMap.entrySet()) {
  print(mapState.get(entry.getKey()));
  mapState.put(entry.getKey(), entry.getValue());
}

Async State API
for (Map.Entry<Long, Long> entry : cacheMap.entrySet()) {
  mapState
    .asyncGet(entry.getKey())
    .thenAccept(value -> {
      // issue the state update first so that it can proceed in parallel
      // with the cpu computation "print"
      mapState.asyncPut(entry.getKey(), entry.getValue());
      print(value);
    });
}

9. Iterating Through Map/List State Using Iterator to Update/Remove Data

Sync State API
mapState.put(rowData.getLong(0), rowData.getLong(1));
Iterator<Map.Entry<Long, Long>> it = mapState.entries().iterator();
boolean isEmpty = true;
while (it.hasNext()) {
  isEmpty = false;
  Map.Entry<Long, Long> entry = it.next();
  if (entry.getValue() % 2 == 0) {
    // remove it in state
    it.remove();
  } else {
    // update back to state
    entry.setValue(entry.getValue() + rowData.getLong(2));
  }
}

Async State API
mapState
  .asyncPut(rowData.getLong(0), rowData.getLong(1))
  .thenCompose(VOID -> mapState.asyncEntries())
  .thenAccept(
    entryStateIterator ->
      entryStateIterator.onNext(
        entry -> {
          if (entry.getValue() % 2 == 0) {
            // remove it in state
            mapState.asyncRemove(entry.getKey());
          } else {
            // update back to state
            mapState.asyncPut(
              entry.getKey(),
              entry.getValue() + rowData.getLong(2));
          }
        }));

State Access Patterns in Various Stateful Operators

No.   Operator Name                 Pattern Nos.
----  ----------------------------  -------------------
1     Join                          1, 2, 3, 5, 8
2     Temporal Join                 1, 2, 3, 5, 8, 9
3     LookupJoin                    1, 2
4     IntervalJoin                  1, 2, 3, 5, 9
5     Rank                          1, 2, 5, 7, 8
6     Limit                         1, 2, 5, 7, 8
7     SortLimit                     1, 2, 5, 7, 8
8     Deduplicate                   1, 2, 8
9     OverAggregate                 1, 2, 3, 4, 5, 7, 8
10    GlobalGroupAggregate          1, 2
11    GroupAggregate                1, 2
12    GroupTableAggregate           1, 2
13    GroupWindowAggregate          1, 2, 3, 5, 8
14    IncrementalGroupAggregate     none
15    WindowAggregate               1, 2, 3, 5, 8
16    GlobalWindowAggregate         1, 2, 3, 5, 8
17    WindowDeduplicate             1, 2
18    WindowJoin                    1, 2, 3, 4, 6
19    WindowRank                    1, 2, 5, 8
20    WindowTableFunction           1, 2, 3, 4, 5
21    Sort
22    TemporalSort                  1, 2, 3, 5
23    ChangelogNormalize            1, 2, 8
24    Sink
25    Source
26    Match                         1, 2, 5, 7, 9
27    PythonGroupAggregate
28    PythonGroupTableAggregate
29    PythonGroupWindowAggregate
30    PythonOverAggregate

POC for join operator

https://github.com/apache/flink/commit/bccace0f4b233e10279c7d95e009ae6aadad5ae8

Performance comparison between `thenCombine` and `combineAll`

Test Environment:

  • Intel Core i7 2.6GHz + 16GB

  • Use a local directory as the remote directory of ForSt (an async state backend)

  • Use mini-cluster

  • 10000 Records

  • Warmup Round: 1

  • Test Round: 10

Result:


Approach      Time spent (ms)
-----------   ---------------
thenCombine   6718.214
combineAll    6722.227

This indicates that the performance gap between thenCombine and combineAll is negligible in this test, so developers can choose either option without a major performance impact.
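
To put the two measurements in proportion, the relative gap can be computed directly from the reported numbers. The sketch below is plain arithmetic on the two values from the result table. The class and method names are hypothetical, and it does not re-run the benchmark.

```java
class BenchmarkGapSketch {
    /** Relative difference between two timings, measured against the smaller one. */
    static double relativeGap(double a, double b) {
        return Math.abs(a - b) / Math.min(a, b);
    }

    public static void main(String[] args) {
        double thenCombineMs = 6718.214; // reported above
        double combineAllMs = 6722.227;  // reported above
        double gapPercent = relativeGap(thenCombineMs, combineAllMs) * 100;
        System.out.printf("relative gap: %.4f%%%n", gapPercent);
    }
}
```

The difference is about 4 ms on roughly 6.7 s, which is roughly 0.06% of the total time.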

Testing branch: https://github.com/xuyangzhong/flink/tree/async_state_api_benchmark
