| Discussion thread | https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0 https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h |
|---|---|
| Vote thread | https://lists.apache.org/thread/drglfqv83z5n4hybdjn0lvcc3xjkrtk0 |
| JIRA | FLINK-34973 |
| Release | 2.0 |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
This is a sub-FLIP for disaggregated state management and its related work. Please read FLIP-423 first to get the whole story.
Motivation
FLIP-424 introduces asynchronous state APIs with callbacks allowing state access to be executed in threads separate from the task thread, making better usage of I/O bandwidth and enhancing throughput. This FLIP proposes an execution framework for asynchronous state APIs. The execution code path for the new API is completely independent from the original one, where many runtime components are redesigned. We intend to delve into the challenges associated with asynchronous execution and provide an in-depth design analysis for each module. Furthermore, we will conduct a performance analysis of the new framework relative to the current implementation and examine how it measures up against other potential alternatives.
Asynchronous state APIs can improve system responsiveness and concurrency capabilities, but the out-of-order execution also brings in a series of challenges:
Context maintenance and switch
Each input element may involve several state accesses. In asynchronous state execution, the whole process is split into several stages defined by chained futures and callbacks. The stages of one input are executed in the task thread in order, but code segments for different inputs are interleaved. Each callback execution therefore needs a context that records the necessary runtime information. This context also helps track whether an input element has eventually finished processing.
Necessary order preservation
A record may contain multiple state requests, and state requests for different records may be interspersed with each other under the asynchronous state APIs, which may introduce concurrency issues such as a 'dirty read'. Consider the case below:
Fig 1. "Dirty read"
The records with the same key manipulate the same state entry, thus the execution order of their updates and reads will affect the final state. The new runtime framework should ensure that the execution order of those same-key records is effectively equivalent to the synchronous execution.
Watermarks and timers
Since the inputs are processed and finished out-of-order, the watermark sent downstream and the triggering of timers should be reconsidered with more caution.
Proposed Changes
With all the above challenges, this FLIP proposes an asynchronous state execution framework. The whole picture of the framework is as follows:
Fig 2. A whole picture under asynchronous state API
The user's original code in processElement will be divided into several stages by futures and callbacks, with processElement serving as the entry point for input processing and individual callbacks dedicated to each state access. Consequently, the execution will become significantly more complex than before. Our design is founded on the following prerequisites:
- All user code segments (processElement, callbacks, onTimer etc.) will be executed in a single task thread as before.
- The order of elements with identical partitioned keys will be preserved, while allowing out-of-order execution for elements with different keys.
- Completed state access results, along with their callbacks, will be encapsulated in a mail that is inserted into the Mailbox. These mails will be processed with a higher priority than normal input processing.
- Control the total amount of data being processed and pause the processing of input if necessary.
The following chapters will introduce each part in detail.
Async Execution Controller
Fig 3. Async Execution Controller
As the most complicated component, the Async Execution Controller (AEC) receives state requests from the user and puts them into execution according to some strategies. It is responsible for:
- Strictly preserving the sequence of elements bearing the same key by delaying subsequent requests until the processing of preceding ones is finalized. This is achieved by the Key Accounting unit, which holds the current in-flight keys and tracks the corresponding ongoing records. For more details, please refer to 'Element Order'. The incoming state requests are split by this unit and pushed into different buffers:
  - Active buffer: the requests in this buffer can be executed when the buffer is full or the timeout is reached.
  - Blocking buffer: the requests in this buffer must wait until all preceding records with the same key finish their execution, after which the queued requests move into the active buffer.
- Tracking the in-flight data (records) and blocking the input if too much data is in flight (back-pressure). It invokes yield() to pause current operations, allowing for the execution of callbacks (mails in Mailbox).
Please note that the AEC utilizes 'records' (StreamRecord) to regulate the volume of in-flight data. Consequently, when a state request's callback triggers additional state requests, all these requests are associated with a single record that has already been authorized by the AEC to proceed. Therefore, they will not be impeded, and there will be no potential risk of stack overflow for recursive invocation of yield().
- To make full use of memory, the above two buffers also have the function of batching state accessing requests, which will be introduced in detail in FLIP-426 Access Remote State in Batch.
Pseudo-code for handling an incoming StateRequest that arrives at the AEC:
    // incoming state request
    if (!request.getRecord().canProceed()) {
        // apply for processing
        while (inFlightRecords >= limit) {
            // too many in-flight records, yield for callback execution
            yield();
        }
        inFlightRecords++;
        permitToProceed(request.getRecord());
    }
    if (keyAccountingUnit.availableKey(request.getKey())) {
        // the key is available: occupy it and activate the request
        keyAccountingUnit.occupyKey(request.getKey());
        put(activeBuffer, request);
    } else {
        // a preceding same-key record is still in flight: wait in the blocking buffer
        put(blockingBuffer, request);
    }
    // run if needed
Callback execution
The StateFuture allows for the registration of a callback through its then() method, which will be invoked once the result of the state access is ready. We propose executing these callbacks within the task thread. The primary benefits include:
- Prevention of concurrency issues when interacting with internal variables.
- Streamlined management of the execution order for callbacks and input processing.
- More precise semantics for asynchronous state access, concentrating solely on parallel state retrieval without involving other CPU-intensive parts of the user code.
The framework will execute a hook after the state request is finished. This hook encapsulates the callback function offered by then() method as a mail and delivers it to the mailbox executor, allowing the task thread to execute the callback function.
According to the existing implementation of the mailbox, the priority of callbacks is higher than that of records. If the mailbox contains both new records and callbacks of old records at the same time, the callbacks are processed first.
Fig 4. Callback execution
Here is a possible optimization: when the state request can return quickly, the callback can be executed immediately instead of being delivered to the mailbox, which is equivalent to synchronous execution.
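The hand-off described in this section can be illustrated with a minimal sketch. It is not the framework's implementation: `SketchMailbox` and `MailboxStateFuture` are hypothetical names, and the mailbox is reduced to a plain queue without priorities. The point it shows is that completing a request (possibly from another thread) only posts a mail, while the callback itself runs when the task thread drains the mailbox.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical stand-in for the task mailbox: mails are only ever run by the task thread. */
class SketchMailbox {
    private final Queue<Runnable> mails = new ArrayDeque<>();

    /** May be called from any thread, e.g. a state-access I/O thread. */
    synchronized void put(Runnable mail) {
        mails.add(mail);
    }

    private synchronized Runnable poll() {
        return mails.poll();
    }

    /** Called by the task thread: runs every queued mail and returns how many ran. */
    int drain() {
        int executed = 0;
        for (Runnable mail = poll(); mail != null; mail = poll()) {
            mail.run();
            executed++;
        }
        return executed;
    }
}

/** Hypothetical future whose callback is never run inline, only as a mail. */
class MailboxStateFuture<T> {
    private final SketchMailbox mailbox;
    private Consumer<T> callback;
    private T value;
    private boolean done;

    MailboxStateFuture(SketchMailbox mailbox) {
        this.mailbox = mailbox;
    }

    /** Registers the callback; if the result is already available, the mail is posted right away. */
    synchronized void then(Consumer<T> cb) {
        callback = cb;
        if (done) {
            postMail();
        }
    }

    /** The hook executed when the state request has finished. */
    synchronized void complete(T result) {
        value = result;
        done = true;
        if (callback != null) {
            postMail();
        }
    }

    /** Wraps the callback and its result into a mail for the task thread. */
    private void postMail() {
        final Consumer<T> cb = callback;
        final T v = value;
        mailbox.put(() -> cb.accept(v));
    }
}
```

In this sketch, calling `complete(42)` after `then(...)` leaves the callback untouched until `drain()` is invoked, which mirrors the rule that all user code segments run in the task thread.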
Element Order
As described above, the user code segments, as well as different records, will be executed out of order. To strictly ensure that the synchronous and asynchronous running modes produce consistent results, it is proposed that:
- For records with the same key, only one is allowed to be processed at the same time. The following record will be buffered and blocked until the preceding same-key record finishes all its chained callbacks.
- For records with different keys, out-of-order execution is allowed.
The Key Accounting Unit is used to preserve the ordering between two independent chained StateFuture statements. For the "Dirty read" case in Fig.1:
- For the synchronous interface, everything is executed in sequence, and no extra work needs to be done.
- For the asynchronous interface, Record A should run with Read, Update and Output, while Record B should stay in the Blocking buffer.
The core data structure within this unit remembers the ongoing records and their keys. A Map<Key, Record> is suitable in this scenario. The pseudo-code for classifying state accesses is:
    // key -> the record that currently occupies this key
    final Map<Key, Record> accounting;

    public void buffer(Record record, Key key, StateAccess access) {
        Record previous = accounting.putIfAbsent(key, record);
        if (previous == null || previous == record) {
            // the key is free, or it is already occupied by this very record
            collectToActive(key, access);
        } else {
            // another record with the same key is still in flight
            collectToBlocking(key, access);
        }
    }
The challenge arises in determining when all the processing logic associated with Record A is fully executed. To address this, we have adopted a reference counting mechanism that tracks ongoing operations (either processing input or executing a callback) related to a single record. A record is considered completely processed when there are no remaining references from these operations. To ensure clarity and proper tracking, we have made four rules:
- For all keyed stream operators, the count increases by 1 before Input#processElement, and the count decreases by 1 after Input#processElement.
- When calling then(), the count increases by 1.
- After the callback code block in then() is executed, the count decreases by 1.
- When the count is reduced to 0, it is considered that all operations on a record have finished.
Fig 5. The execution timeline of a record
The specific counting is as shown above when Record A is processing. ①②, ③④ and ⑤⑥ are corresponding pairs of increments and decrements, each of which tracks a single pending code block. When the count reaches 0 after ⑥, the operations of Record B in the Blocking buffer will be moved to the Active buffer.
To cover the statements without a callback, in addition to the reference count marked in Fig.5, each state request itself is also protected by a paired reference count.
The counting update is totally managed by the framework and transparent to users.
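As an illustration of the four rules, here is a minimal single-threaded sketch. `RecordRefCount` and `RefCountDemo` are hypothetical names, and the pending callbacks are modelled as a plain queue rather than mails, so this only shows the counting discipline, not the actual framework code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical per-record reference count; only touched by the task thread. */
class RecordRefCount {
    private int count;
    private final Runnable onFinished;

    RecordRefCount(Runnable onFinished) {
        this.onFinished = onFinished;
    }

    void retain() {
        count++;
    }

    /** Rule 4: when the count drops to 0, all operations on the record have finished. */
    void release() {
        if (--count == 0) {
            onFinished.run();
        }
    }

    int current() {
        return count;
    }
}

class RefCountDemo {
    /** Simulates one record whose processElement registers a callback that registers another. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Deque<Runnable> pendingCallbacks = new ArrayDeque<>();
        RecordRefCount ref = new RecordRefCount(() -> log.add("finished"));

        ref.retain();                            // rule 1: before processElement
        ref.retain();                            // rule 2: then() registers callback 1
        pendingCallbacks.add(() -> {
            ref.retain();                        // rule 2: callback 1 calls then() again
            pendingCallbacks.add(() -> log.add("callback-2"));
            log.add("callback-1");
        });
        ref.release();                           // rule 1: after processElement
        log.add("count=" + ref.current());       // one callback is still pending

        while (!pendingCallbacks.isEmpty()) {
            pendingCallbacks.poll().run();
            ref.release();                       // rule 3: after each callback body
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The record is reported as finished only after the second callback has run, even though `processElement` returned long before.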
Strict order of 'processElement'
According to the design above, the AEC takes control of the execution of state requests and the corresponding records. However, if a StreamRecord in a keyed stream does not touch state, the AEC loses control of it. This disrupts the order of records with the same key and, in some cases, results in correctness issues. To address this, we introduce a stricter mode that preserves the order of invoking 'processElement':
- Record-ordered (default): The records with same keys are strictly processed in order of arrival.
- State-ordered: For same-key records, state requests and subsequent callbacks are processed in the order in which each record makes its first state request. But the code before the first state request for each record can be processed out-of-order with requests from other records.
The state-ordered mode is only for internal usage. We won't expose this option to users in the first version.
The design above describes the state-ordered mode. For the stricter record-ordered mode, there is an additional step before invoking 'processElement'. To maximize reuse of the AEC logic, in record-ordered mode an 'empty' state request is sent to the AEC with 'processElement' as its callback. It does nothing but invoke the callback once the AEC decides it can be processed (i.e. it is put into the Active Buffer). Thus the 'processElement' encapsulated in the 'empty' state request follows the AEC logic above and starts processing only after all preceding same-key records finish their execution. Until then it stays in the Blocking Buffer, or yield() is called if there are too many in-flight records.
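The record-ordered behaviour can be sketched as a small gate that either runs an entry point immediately or parks it behind the in-flight record with the same key. `KeyOrderedGate`, `submit` and `finish` are hypothetical names, the entry point stands in for the 'empty' state request carrying 'processElement', and buffers, batching and back-pressure are left out.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Hypothetical gate: at most one record per key is in flight, others wait in arrival order. */
class KeyOrderedGate<K> {
    private final Set<K> occupied = new HashSet<>();
    private final Map<K, Queue<Runnable>> blocked = new HashMap<>();

    /** Runs the entry point now if the key is free, otherwise parks it (blocking buffer). */
    void submit(K key, Runnable entryPoint) {
        if (occupied.add(key)) {
            entryPoint.run();
        } else {
            blocked.computeIfAbsent(key, k -> new ArrayDeque<>()).add(entryPoint);
        }
    }

    /** To be called once the in-flight record of this key has finished all its callbacks. */
    void finish(K key) {
        Queue<Runnable> waiting = blocked.get(key);
        if (waiting == null || waiting.isEmpty()) {
            blocked.remove(key);
            occupied.remove(key);   // nobody is waiting: free the key
        } else {
            waiting.poll().run();   // the key stays occupied by the next same-key record
        }
    }
}
```

For example, submitting records A and B with key `k1` and record C with key `k2` runs A and C right away, while B starts only after `finish("k1")` is called for A.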
Checkpoint Trigger
As described in the "Async Execution Controller(AEC)" section, the in-flight state requests will be tracked in the Active Buffer and Blocking Buffer. When the Task starts to perform a checkpoint, all in-flight data must be drained before triggering the checkpoint of StateBackend. The incoming data in the AEC will be blocked until the "drain" operation is complete.
In more detail, the AEC uses an inFlightRecordNum variable to track the current number of records in progress. Every time the AEC receives a new record, inFlightRecordNum increases by 1; when all processing and callbacks for this record have completed, inFlightRecordNum decreases by 1. When processing a checkpoint mail, the current task thread gives up its time slice through the yield() method of the mailbox executor, so that the callbacks of ongoing state requests and the blocked state requests are drained first, until inFlightRecordNum reduces to 0.
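A minimal sketch of this drain loop follows, under simplifying assumptions: `CheckpointDrainSketch` is a hypothetical class, each record has exactly one pending callback mail, and `yieldOnce()` just runs the next queued mail, whereas a real yield would wait for a mail to arrive.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical task model: drains in-flight records before the state backend snapshot. */
class CheckpointDrainSketch {
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private int inFlightRecordNum;

    /** A record enters the AEC; its last callback is queued as a mail for later. */
    void accept(Runnable lastCallback) {
        inFlightRecordNum++;
        mailbox.add(() -> {
            lastCallback.run();
            inFlightRecordNum--; // all processing and callbacks of this record are done
        });
    }

    int inFlight() {
        return inFlightRecordNum;
    }

    /** Simplified yield(): run one pending mail on the task thread. */
    private void yieldOnce() {
        Runnable mail = mailbox.poll();
        if (mail == null) {
            throw new IllegalStateException("records in flight but no mail to run");
        }
        mail.run();
    }

    /** Handling of the checkpoint mail: drain first, then snapshot. */
    void onCheckpointMail(Runnable snapshotStateBackend) {
        while (inFlightRecordNum > 0) {
            yieldOnce();
        }
        snapshotStateBackend.run();
    }
}
```

With two accepted records, `onCheckpointMail` runs both callbacks before the snapshot action, so the snapshot always sees a state with no record in flight.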
1) Impact of draining on job performance
The checkpoint draining operation is not expected to have a significant impact on job performance, as it processes ongoing user data much like it does during runtime. In other words, from the user's perspective, there is almost no interruption in data processing and output during checkpoint draining.
2) Impact of draining on checkpoint duration
The AEC controls the maximum number of in-flight records, which means that the number of records that need to be drained during the checkpoint synchronous phase is controllable. When the collision rate of blocking keys among these in-flight records is low, the in-flight data can be expeditiously processed in parallel. Conversely, a high collision rate necessitates that the conflicting keys be processed sequentially. Fortunately, the block or file cache can help keep repeated accesses to the same key fast. Overall, although the draining operation may prolong the checkpoint synchronous duration, its impact is controllable.
How to build checkpoints for disaggregated state storage is beyond the scope of this FLIP, and FLIP-428 will discuss how to integrate checkpoint and optimize the checkpoint end-to-end duration by accelerating the checkpoint asynchronous phase.
Timers
Timers are used to schedule actions for later (event-time or processing-time), such as firing a window, or calling back a ProcessFunction.
In Flink, the timer is a special state, and the scheduling of timers will access and update the timer state. And the callback of timers (such as KeyedProcessFunction#onTimer()) may access other keyed states. Let's discuss these two parts separately.
1. Timer as a special state.
Currently, Flink provides two implementations for storing timers: on the JVM heap or in RocksDB. Asynchronous access to timers may complicate timer scheduling. For convenience, in the basic version of the framework we plan to store timers directly on the JVM heap, so the registration and access of timers are synchronous.
2. Other keyed state's operations in the callback of timers.
It should be emphasized that timers are registered on a KeyedStream. The callback of a timer is also encapsulated as a mail, which is executed synchronously in the task thread. Hence, the trigger of a timer can be regarded as a special record, and the Key Accounting Unit described in the section "Element Order" can also be used to ensure the order of state operations between timers and other records. The subtle difference is that the reference count of one record should also increase/decrease by 1 around WindowOperator#onEventTime()/onProcessingTime() and KeyedProcessOperator#onEventTime()/onProcessingTime().
Watermark
The mechanism in Flink to measure progress in event time is watermark. As the watermarks flow through the streaming program, they advance the event time at the operators where they arrive. Whenever an operator advances its event time, it generates a new watermark downstream for its successor operators.
There is an implicit order requirement between watermarks and records: a watermark should be emitted downstream only when all preceding records have finished. To achieve this under asynchronous state APIs, we propose a mechanism that segments inputs into distinct epochs, delimited by the arrival of watermarks. Each record is assigned to exactly one epoch based on its arrival. Once all the records belonging to an epoch have finished their execution, the watermark right behind those records should be processed (advance event time and send it downstream) immediately. There are three statuses for an epoch:
- Open: The following non-record input has not arrived yet, so arriving records are collected into this epoch.
- Closed: The records belonging to this epoch are settled since the following non-record input has arrived.
- Finished: The records of this epoch have finished execution after the epoch is closed.
A module named EpochManager is introduced to handle epochs using a queue, making sure that the watermark behind an epoch is triggered only after all records in or before this epoch have finished processing. Besides this, EpochManager offers two modes for the execution order of a watermark and subsequent records:
- Strictly-ordered: Subsequent records wait until the preceding watermark is triggered.
- Out-of-order (default): Subsequent records and the preceding watermark can be processed out-of-order.
The semantics of watermarks do not define the sequence between a watermark and subsequent records. For the most part, this is inconsequential, except that it may affect some current users who have relied on the implicit assumption of an ordered execution. In the first version, we support only out-of-order processing. We may consider exposing the 'Strictly-ordered' mode once we encounter a concrete use case that necessitates it.
Here we introduce the out-of-order execution mode:
- There is only one epoch open. Collect arriving records for that epoch.
- If a non-record input (watermark) arrives, close the current open epoch and register a callback for it. Open another epoch collecting the records.
- Count the pending records for each non-finished epoch, and finish an epoch once all of its records have finished execution.
- Trigger the callback when:
  - An epoch's status transitions to Finished.
  - AND it is the very first epoch in the queue.
- Remove the epoch after triggering.
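The out-of-order steps listed above can be sketched as follows. `EpochSketch` and `EpochManagerSketch` are hypothetical names and the callback is a plain `Runnable`, so this is only an illustration of the queue discipline, not the PoC implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical epoch: counts its unfinished records and carries the non-record action. */
class EpochSketch {
    int pendingRecords;
    boolean closed;
    Runnable action; // e.g. advance event time and emit the watermark

    boolean finished() {
        return closed && pendingRecords == 0;
    }
}

class EpochManagerSketch {
    private final Deque<EpochSketch> queue = new ArrayDeque<>();
    private EpochSketch open = new EpochSketch();

    EpochManagerSketch() {
        queue.add(open);
    }

    /** A record arrives: it joins the single open epoch. */
    EpochSketch onRecord() {
        open.pendingRecords++;
        return open;
    }

    /** A non-record input (e.g. a watermark) arrives: close the open epoch, open a new one. */
    void onNonRecord(Runnable action) {
        open.closed = true;
        open.action = action;
        open = new EpochSketch();
        queue.add(open);
        tryTrigger();
    }

    /** A record has finished all its processing and callbacks. */
    void onRecordFinished(EpochSketch epoch) {
        epoch.pendingRecords--;
        tryTrigger();
    }

    /** Triggers and removes finished epochs, but only from the head of the queue. */
    private void tryTrigger() {
        while (!queue.isEmpty() && queue.peekFirst().finished()) {
            queue.pollFirst().action.run();
        }
    }
}
```

If the records of a later epoch finish first, its action is held back until every earlier epoch has been triggered, so watermarks are still emitted in their original order.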
Fig 6. The order between watermarks and records in out-of-order mode
In out-of-order mode, records in subsequent epochs may be processed before the timer triggering brought by the watermark, which is different from the behavior under the synchronous state APIs. In strictly-ordered mode, all subsequent inputs will be blocked until the current epoch is finished, which may bring a small drop in throughput. Users may choose between the two modes according to their needs, to achieve either better performance or strictly ordered state access semantics.
For jobs that use event-time, the epoch mechanism introduces additional operations on the read/write paths. We provide a relatively efficient implementation in the PoC, and overall its overhead is relatively controllable.
Other Non-record Events
Non-record events other than watermarks and checkpoint barriers, such as RecordAttributes, EndOfDataMarker and so on, will leverage the EpochManager to handle their execution as well. By default, they keep strict order with subsequent and preceding records. Also, we won't expose the option for the execution order of each non-record event until we have identified concrete use cases that necessitate it.
Record Context
Within the task thread, operations of multiple records are executed in a time-sliced manner, thus context switching is performed. It is essential to maintain a context that preserves the necessary variables required by each operation. Thus the 'record context' is introduced. All operations for one record will share the same record context, which consists of at least 5 parts:
- the record itself
- the key group to which the record belongs
- the hashcode of this record
- the reference count of this record, as introduced in the section "Element Order"
- the epoch this record belongs to, as described in the section "Watermark"
An operation and a record context constitute an execution unit, which will be encapsulated into a mail and delivered to the mailbox for execution.
Faster Checkpoint Drain
Asynchronous execution improves the throughput of record processing, but it also enlarges the number of elements to drain during the sync phase of checkpointing. Thus we consider an optimization here. Since record processing has been broken down into state requests and their callbacks, we could drain all the running state requests and checkpoint the buffered ones. This would greatly shorten the drain time.
We will consider a set of declarative APIs and a `declareProcess()` function that users should implement in some newly introduced AbstractStreamOperator, built on top of the current execution model. Users could declare each processing code block (callback) with a unique id. During a checkpoint, the buffered state requests and the corresponding ids will be snapshotted, and when restoring, the restored state requests will find the corresponding code block by the restored id. The code block is declared in user code and will be loaded each time before the task runs. After all state requests and their callbacks are restored in the AEC, the whole pipeline can be resumed.
Note that we will provide the set of declarative APIs and `declareProcess()` in upcoming FLIPs. The raw usage of `processElement` is strictly for testing purposes.
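Since the declarative APIs are left to upcoming FLIPs, the following is only an illustrative sketch of the id-based lookup idea, not the proposed API. All names (`DeclaredBlocks`, `BufferedRequest`, `DeclarativeRestoreSketch`) are hypothetical, and a code block is simplified to a consumer of the request key instead of a state result.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical registry of code blocks declared by the user, keyed by a stable id. */
class DeclaredBlocks {
    private final Map<String, Consumer<String>> blocks = new HashMap<>();

    void declare(String id, Consumer<String> block) {
        if (blocks.putIfAbsent(id, block) != null) {
            throw new IllegalArgumentException("Duplicate block id: " + id);
        }
    }

    Consumer<String> resolve(String id) {
        Consumer<String> block = blocks.get(id);
        if (block == null) {
            throw new IllegalStateException("No block declared for restored id: " + id);
        }
        return block;
    }
}

/** A buffered state request reduced to plain data, so it can be written into a snapshot. */
class BufferedRequest {
    final String key;
    final String blockId;

    BufferedRequest(String key, String blockId) {
        this.key = key;
        this.blockId = blockId;
    }
}

class DeclarativeRestoreSketch {
    /** Stand-in for user code that is loaded each time before the task runs. */
    static DeclaredBlocks declareProcess(List<String> output) {
        DeclaredBlocks blocks = new DeclaredBlocks();
        blocks.declare("emit-key", key -> output.add("emitted:" + key));
        return blocks;
    }

    /** Re-attaches each restored request to its code block by id and runs it. */
    static void resume(List<BufferedRequest> restored, DeclaredBlocks blocks) {
        for (BufferedRequest request : restored) {
            blocks.resolve(request.blockId).accept(request.key);
        }
    }
}
```

The snapshot only needs to contain the request data and the block id. The code itself is re-declared after restart, which is why the ids have to be unique and stable across restarts.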
Error Handling
FLIP-368 proposed to reorganize the exceptions thrown by state interfaces. This FLIP also adopts the design from FLIP-368, ensuring that all state interfaces throw unchecked exceptions and, consequently, do not declare any exceptions in their signatures. In cases where an exception occurs while accessing the state, the job should fail. This relieves the user from the burden of error handling, which is particularly cumbersome in asynchronous interfaces.
Configurations
A new set of configuration options is introduced, listed here with their default values:
- execution.async-state.enabled: true
- execution.async-state.in-flight-records-limit: 6000 ---- The number of records that the AEC permits to proceed
- execution.async-state.buffer-size: 1000 ---- The size of 'Active Buffer', usually equal to or smaller than the 'execution.async-state.in-flight-records-limit'
- execution.async-state.buffer-timeout: 1s ---- The timeout for 'Active Buffer'
The following options may be added when we identify concrete use cases:
- execution.async-state.order-with-same-key: Record-ordered (or state-ordered)
- execution.async-state.event-order.watermark: Out-of-order (or Strictly-ordered)
- execution.async-state.event-order.*: Strictly-ordered (or Out-of-order) —— for non-records other than watermark and checkpoint barrier
- More types will be added if we need the out-of-order optimization for other non-record events.
Performance analysis
PoC Test
A PoC (repo: https://github.com/ververica/flink-poc/tree/disagg-poc-2) was developed to validate the performance. The PoC running instructions are provided in the appendix of FLIP-423 for those interested in exploring the setup further.
Environment Setup:
- Version: based on Flink 1.19
- Deployment mode: yarn per-Job
- Flink yarn cluster:
  - 1 master 2 workers
  - specifications:
    - master: ecs.g7.2xlarge 8 vCPU 32 GiB (Alibaba Cloud)
    - worker: ecs.i2g.2xlarge 8 vCPU 32 GiB (Alibaba Cloud)
- HDFS cluster:
  - 1 master 2 workers
  - specifications:
    - master: ecs.g7.2xlarge 8 vCPU 32 GiB (Alibaba Cloud)
    - worker: ecs.i2g.2xlarge 8 vCPU 32 GiB (Alibaba Cloud)
- yarn cluster and HDFS cluster are on the same LAN
- State backend: ForSt (Based on RocksDB 8.5.3)
- Job config:
  - Memory: Task Manager 3GB, Job Manager 1600MB
  - checkpoint: Disabled (Not fully supported in PoC yet)
- Stage 1: populate different data to make the state size reach a certain size.
- Stage 2: read/write the data from the key space of Stage 1, make the state size stable within a certain range.
- Max TPS results are measured in Stage 2.
- Job parameter && configuration
  - Read/Write ratio: 1
  - RocksDB/ForSt block cache: 512 MB
Max TPS Results (With Asynchronous Execution FLIP-425; Without IO Grouping FLIP-426)
| | | Local File Cache | TPS | State Size |
|---|---|---|---|---|
| 1 | Synchronous Execution + Local disk | - | 17.1 K | 5.52 G |
| 2 | Synchronous Execution + HDFS * | - | 0.85 K | 1 G * |
| 3 | Asynchronous Execution + HDFS | 0 GB (0%) | 7 K | 5.8 G |
| 4 | Asynchronous Execution + HDFS | ♾️ GB (100%) | 42.7 K | 5.8 G |
*: Performance of Synchronous API + HDFS is extremely poor, and it is hard to accumulate enough state size. The results are measured at smaller data size (1G instead of 5G)
Based on the PoC results, we can draw several key conclusions regarding the performance implications:
DFS as primary storage exhibits more than 95% TPS decrease: Utilizing DFS as the primary storage solution significantly hinders performance. Compared to the baseline (Synchronous API + Local Disk using RocksDB), disaggregated state storage without optimization exhibits a performance degradation exceeding 95% in terms of TPS.
Asynchronous Execution Model exhibits 2.5x TPS improvements: The introduction of asynchronous state APIs offers substantial performance improvements. In an extreme case with all data cached locally (line 4), this model achieves a 2.5x increase in TPS compared to the baseline.
HDFS with Asynchronous Execution achieves 40% of the baseline performance: As an initial proof-of-concept (PoC) result (line 3), asynchronous execution with HDFS without caching achieves 40% of the baseline performance. We anticipate further improvements through optimizations like I/O grouping (FLIP-426) and leveraging the local disk as a secondary cache (FLIP-429). These advancements aim to bring HDFS performance closer to that of the local disk setup.
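As a quick check, the three figures above follow directly from the TPS column of the table, with row 1 as the baseline. The values are rounded, and row 2 was measured at a smaller state size, so that comparison is only indicative:

```latex
\begin{aligned}
\text{row 2 vs. row 1 (TPS decrease):}\quad & 1 - \frac{0.85}{17.1} \approx 0.95 \\
\text{row 4 vs. row 1 (speed-up):}\quad & \frac{42.7}{17.1} \approx 2.5 \\
\text{row 3 vs. row 1 (fraction of baseline):}\quad & \frac{7}{17.1} \approx 0.41
\end{aligned}
```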
Per-record overhead
As mentioned above, to ensure the correctness of data under asynchronous state API, we have introduced some changes for operators. Compared with synchronous execution, these changes have some overhead on critical paths:
- Epoch Manager: In order to ensure the semantics of the watermark, the Epoch Manager numbers each input record to determine the association between the record and the watermark. Nonetheless, the Epoch Manager is only needed in asynchronous state access and event-time scenarios, and in theory, this overhead is relatively controllable.
- Key Accounting Unit: All state accesses under each record need to be checked by the Key Accounting Unit before execution, which will increase CPU overhead. When using Disaggregated State Storage, the bottleneck is generally in I/O rather than CPU, and the overhead caused by the Key Accounting Unit is less likely to become a bottleneck.
Strictly-ordered mode vs. Out-of-order mode for watermark processing
A performance test is also conducted to evaluate how much improvement we can achieve by enabling out-of-order watermark processing. The test job is WordCount with a simple WatermarkStrategy, whose code can be found here. The environment setup is basically identical to the PoC test above except for the state size.
| | State Size | Local File Cache | TPS for Strictly-ordered mode | TPS for out-of-order mode |
|---|---|---|---|---|
| 1 | ~140MB | 0% | 25.7 K | 43.8 K (+70%) |
| 3 | ~5.6 GB | 0% | 5.6 K | 6.5 K (+16%) |
As we can see, the out-of-order mode outperforms the strictly-ordered mode, and the effect is more obvious when the CPU becomes the bottleneck.
Synchronous execution with asynchronous APIs
We adapt the existing states of the Hashmap state backend and RocksDB state backend to the asynchronous state API to test the overhead of the asynchronous state API.
In detail, we wrap the original state and return a completed StateFuture. Taking "heap value state" as an example, the pseudo-code is as follows:
    public class AsyncHeapValueState<K, N, SV> implements AsyncValueState<SV> {

        // the original synchronous state that is being wrapped
        private final HeapValueState<K, N, SV> heapValueState;

        public AsyncHeapValueState(HeapValueState<K, N, SV> heapValueState) {
            this.heapValueState = heapValueState;
        }

        @Override
        public StateFuture<SV> value() {
            // access the state synchronously, then hand back an already completed future
            SV value = heapValueState.value();
            StateFuture<SV> future = new SyncExeStateFuture<>();
            future.complete(value);
            return future;
        }

        @Override
        public StateFuture<Void> update(SV value) {
            heapValueState.update(value);
            StateFuture<Void> future = new SyncExeStateFuture<>();
            future.complete(null);
            return future;
        }
    }
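The wrapper above relies on a future that is already complete when it is returned. The source does not show how SyncExeStateFuture is implemented, so the following is only a minimal sketch of such a future under a hypothetical name, `ImmediateStateFuture`, with a simplified `then` that takes a `Function`. It runs the callback inline and allocates one additional future per chained call.

```java
import java.util.function.Function;

/** Hypothetical future for synchronous execution: callbacks run inline on the caller. */
class ImmediateStateFuture<T> {
    private T value;
    private boolean done;

    void complete(T result) {
        if (done) {
            throw new IllegalStateException("Future is already completed");
        }
        value = result;
        done = true;
    }

    /** Applies the callback immediately, since the result is already available. */
    <R> ImmediateStateFuture<R> then(Function<? super T, ? extends R> callback) {
        if (!done) {
            throw new IllegalStateException("This sketch only supports completed futures");
        }
        ImmediateStateFuture<R> next = new ImmediateStateFuture<>();
        next.complete(callback.apply(value));
        return next;
    }

    T get() {
        if (!done) {
            throw new IllegalStateException("Future is not completed yet");
        }
        return value;
    }
}
```

Every state access and every chained callback creates a short-lived future object here, even though no asynchronous work takes place.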
This approach requires maintaining additional StateFuture objects on the JVM heap, which increases CPU and memory overhead. A 2-stage word count job is used for the test, with checkpointing disabled. The performance comparison is as follows:
| | key number | heap size | state size | TPS |
|---|---|---|---|---|
| Hashmap | 1 M | 1.35 GB | 39.3 MB | 974.5 K |
| Hashmap with async API | 1 M | 1.48 GB | 39.3 MB | 925.3 K (-5%) |
| RocksDB | 40 M | 1015 MB | 683 MB | 267 K |
| RocksDB with async API | 40 M | 1016 MB | 683 MB | 256.4 K (-3.9%) |
From the above table, the asynchronous APIs bring a TPS regression of up to ~5%, and the impact on the Hashmap state backend is more significant. More objects on the Java heap cause more JVM GC, and the Hashmap state backend requires more Java heap memory than the RocksDB state backend.
Profiling: For the test 'Hashmap with async API' we managed to capture a flame graph, which reveals that the framework overhead (including reference counting, future-related code and so on) consumes about 9% of the keyed operator CPU time. We consider it acceptable given that it is a pure memory state access and the calculation is quite simple (only plus 1 for each record). The overhead will be negligible when slow I/O access and asynchronous execution are involved.
Compared with Batch-style APIs
This FLIP proposes future-style APIs for single key-value access, while an alternative is to provide batch-style APIs, leaving the data gathering work to the user. The advantage of batch-style APIs is that users may have some prior knowledge that can help reduce state accesses when batching. But batch-style APIs lack flexibility and are difficult to use. With future-style APIs, users can still buffer inputs in advance and process them iteratively within a batch in their code. The potential upper limits of performance for batch-style APIs and asynchronous APIs do not differ much. Compared with batch-style APIs, users can write their code more easily with future-style APIs, since they deal with one record at a time.
Compatibility, Deprecation, and Migration Plan
The newly introduced API is designed to coexist alongside the existing API. The code path for the new API is completely independent from the original one. We strongly recommend that users stick to either the new API or the original API exclusively, rather than mixing their usage. Although using both the asynchronous and synchronous APIs simultaneously won't cause compatibility issues, it may lead to suboptimal performance. The synchronous API can block the task thread until its execution is complete, potentially resulting in a performance regression when compared to using the asynchronous state calls exclusively.
As a subsequent step following this FLIP, all SQL operators will be transitioned to access state asynchronously. Additional FLIP(s) will be proposed to facilitate this transition once the current FLIP has been fully integrated.
Test Plan
New UT/ITs will be introduced for each module described in the section "Proposed Changes". New E2E tests of jobs using asynchronous state APIs will also be delivered.
Rejected Alternatives
Same as the "Rejected Alternatives" section in FLIP-424.
Appendix:
How to run Synchronous execution with asynchronous APIs
The modifications for "Synchronous execution with asynchronous APIs" are mainly in this commit. It allows you to use asynchronous APIs to manipulate the state of an existing state backend.
- If you don't care about the performance testing, this test can directly run in the IDE.
- If you are curious about the performance, it is recommended to run a job on a cluster.
For performance testing:
Please refer to appendix in FLIP-423 for the previous steps about compiling.
Step 1: Update the state backend config
Modify the state backend in flink-conf.yaml
    state.backend: hashmap # For Hashmap State backend
    # state.backend: rocksdb # For RocksDB state backend
Step 2: Write a job that uses asynchronous APIs, or you can directly use the word count we provide.
Step 3: Run the job. This script (submit-job.sh) may help.
Follow up: Generalize the Async execution model to support all keyed-ordered asynchronous processing
We realized that, apart from state access, there is an actual need for other asynchronous processing that also requires records to be ordered by key. It would be beneficial to generalize and popularize this model for all similar operators. A good example is the one in FLIP-519, where asynchronous look-ups are performed in order of keys. So we will refactor the existing code, keeping the current design unchanged while serving more than asynchronous state access. This includes:
- Generalizing the "action", "callback" and "executor" types of the model, abstracting the core part and providing the capability for inheritance.
- Providing an inner operator with a higher abstraction for keyed ordered processing, and making the current operator for async state processing inherit from it.






