| Discussion thread | https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0 https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf |
|---|---|
| Vote thread | https://lists.apache.org/thread/0yvspwhsf0vyvqkkd7snzg33k95v3jbc |
| JIRA | FLINK-34981 |
| Release | 2.0 |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
This is a sub-FLIP of the disaggregated state management effort and its related work. Please read FLIP-423 first for the whole story.
Motivation
I/O speed and latency are critical for overall data throughput, particularly in jobs that manage large states. Issuing multiple asynchronous I/O operations is a proven way to improve throughput by increasing the parallelism of I/O execution. However, simply increasing I/O parallelism soon hits a ceiling because I/O bandwidth is finite. Moreover, when accessing remote storage, the RPC round-trip time dominates the latency of an individual I/O and far outweighs the effect of the I/O size. A promising optimization is therefore to merge adjacent I/O requests into a single operation and fetch multiple keys with one I/O call. This requires a prepared batch of keys to query and a way to identify the I/O operations that can be combined. This FLIP focuses on the implementation details of batching state requests and processing them in batches.
This FLIP first describes how state requests are batched in the Asynchronous Execution Controller (AEC) introduced in FLIP-425. Then, for each batch of state accesses, it proposes executing smaller sub-batches in parallel while merging neighboring I/O requests. This minimizes the number of I/O operations and reduces overall latency. The approach is expected to significantly improve the efficiency of state I/O and, in turn, processing throughput.
Proposed Changes
As described in FLIP-425, the Async Execution Controller (AEC) keeps state access requests in two buffers (as shown in Fig-1). Besides preserving the order of inputs, these buffers also help reduce the overall latency.
Fig-1. Overview of Batch Execution for State
The two buffers hold requests of different types (PUT/GET/ITERATOR/DELETE) on different state tables. FLIP-425 describes how requests accumulate in the buffers while the order of incoming requests is preserved. When the Active Buffer reaches the configured size, all of its requests are grouped by request type and state table and then submitted for execution. The implementation details are described below.
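The following is a minimal sketch of the size-triggered hand-off from the Active Buffer. `ActiveBuffer` and `batchSize` are hypothetical names, and the key-ordering and blocking logic of the real AEC buffers (FLIP-425) is left out. The sketch shows only two steps: collecting requests in arrival order, and handing over the whole batch once the threshold is reached.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical, simplified Active Buffer (not the AEC implementation). */
final class ActiveBuffer<R> {
    private final int batchSize;            // assumed configurable threshold
    private List<R> pending = new ArrayList<>();

    ActiveBuffer(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    /** Appends a request and returns the full batch once the size threshold is reached. */
    Optional<List<R>> add(R request) {
        pending.add(request);
        if (pending.size() < batchSize) {
            return Optional.empty();
        }
        List<R> batch = pending;            // arrival order is preserved
        pending = new ArrayList<>();        // new requests go into a fresh buffer
        return Optional.of(batch);
    }

    /** Number of requests still waiting in the buffer. */
    int size() {
        return pending.size();
    }
}
```

With `batchSize = 3`, the first two `add` calls return an empty `Optional` and the third returns `[a, b, c]`. The returned batch is what would then be grouped and executed.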
Data Structure of Batch State Requests
We propose several classes that encapsulate individual user requests, together with an interface that an executor implements to support batched state execution. These classes and interfaces are internal and draw a clear boundary between the batching layer and the execution layer. This also makes it possible to integrate other StateBackends into the asynchronous State API execution framework, as long as they implement the batch execution interface.
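```java
import javax.annotation.Nullable;

import org.apache.flink.annotation.Internal;

@Internal
public class StateRequest<S, K, N, UK, V, F> {
    /** The state (table) that this request accesses. */
    S state;
    RequestType type;
    K key;
    N namespace;
    /** The user key; only set for map state requests. */
    @Nullable UK userKey;
    /** The value to write; only set for requests that write. */
    @Nullable V value;
    /** Completed with the result once the request has been executed. */
    InternalStateFuture<F> stateFuture;

    public StateRequest(
            S state,
            RequestType type,
            K key,
            N namespace,
            @Nullable UK userKey,
            @Nullable V value,
            InternalStateFuture<F> stateFuture) {
        this.state = state;
        this.type = type;
        this.key = key;
        this.namespace = namespace;
        this.userKey = userKey;
        this.value = value;
        this.stateFuture = stateFuture;
    }

    public K getKey() {
        return key;
    }

    public InternalStateFuture<F> getFuture() {
        return stateFuture;
    }

    public enum RequestType {
        VALUE_GET,
        VALUE_PUT,
        LIST_GET,
        LIST_PUT,
        LIST_ADD,
        LIST_ITER,
        MAP_GET,
        MAP_PUT,
        MAP_ITER,
        MAP_ITER_KEY,
        MAP_ITER_VALUE
    }
}
```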
The state executor interface is defined as follows:
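```java
import java.util.concurrent.CompletableFuture;

import org.apache.flink.annotation.Experimental;

@Experimental
public interface StateExecutor<K> {
    /** Executes a batch of state requests asynchronously. */
    CompletableFuture<Boolean> executeBatchRequests(
            Iterable<StateRequest<?, K, ?, ?, ?, ?>> stateRequests);
}
```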
Note that this is only a draft version that may change during implementation.
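The following is a minimal sketch of the executor contract, to show how a batch future relates to the per-request futures. It uses hypothetical simplified types instead of the draft classes above: `SimpleRequest` supports only value GET/PUT on string keys, and `InMemoryStateExecutor` is backed by a `HashMap`. It executes requests one by one, without the grouping and merging described in the next section.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical simplified request: value GET/PUT only. */
final class SimpleRequest {
    enum Type { VALUE_GET, VALUE_PUT }

    final Type type;
    final String key;
    final String value;                                        // only used by VALUE_PUT
    final CompletableFuture<String> future = new CompletableFuture<>();

    SimpleRequest(Type type, String key, String value) {
        this.type = type;
        this.key = key;
        this.value = value;
    }
}

/** Mirrors the shape of StateExecutor#executeBatchRequests for SimpleRequest. */
interface SimpleStateExecutor {
    CompletableFuture<Boolean> executeBatchRequests(Iterable<SimpleRequest> requests);
}

/** Hypothetical in-memory executor, for illustration only. */
final class InMemoryStateExecutor implements SimpleStateExecutor {
    private final Map<String, String> store = new HashMap<>();
    private final Executor ioExecutor;

    InMemoryStateExecutor(Executor ioExecutor) {
        this.ioExecutor = ioExecutor;
    }

    @Override
    public CompletableFuture<Boolean> executeBatchRequests(Iterable<SimpleRequest> requests) {
        // Copy the batch so the caller can reuse its buffer right away.
        List<SimpleRequest> batch = new ArrayList<>();
        requests.forEach(batch::add);
        return CompletableFuture.supplyAsync(() -> {
            // HashMap is not thread-safe, so concurrent batches are serialized here.
            synchronized (store) {
                for (SimpleRequest r : batch) {
                    switch (r.type) {
                        case VALUE_GET:
                            r.future.complete(store.get(r.key));   // null if absent
                            break;
                        case VALUE_PUT:
                            store.put(r.key, r.value);
                            r.future.complete(null);
                            break;
                    }
                }
            }
            // Every per-request future is completed before the batch future.
            return true;
        }, ioExecutor);
    }
}
```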
Categorize and Process
A batch of state access requests contains different state operations (PUT/GET/ITERATOR/DELETE) on multiple state tables. The AEC guarantees that the requests in a batch have no ordering conflicts, so they can be executed in parallel. The requests within a batch are divided into sub-groups by state table and request type, so that isomorphic I/O requests can be merged into a single operation. For example, as illustrated in Fig-2, suppose a batch contains PUT and GET operations on two tables. All GET operations on state-1 are grouped together and aggregated into one MultiGet request, which the StateExecutors execute against the Remote State Storage. Likewise, all PUT operations on state-1 are grouped together and aggregated into one WriteBatch request, which the StateExecutors also execute.
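The following is a minimal sketch of the categorization step, using the Fig-2 scenario. `Req`, `Op`, `GroupKey` and `Categorizer` are hypothetical names, and state tables are represented by plain strings. Each resulting sub-group corresponds to one merged I/O call: a MultiGet for GETs and a WriteBatch for PUTs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical request types, reduced to the two used in Fig-2. */
enum Op { GET, PUT }

/** Hypothetical request carrying only what grouping needs. */
final class Req {
    final String table;   // stands in for the state table of a StateRequest
    final Op op;
    final String key;

    Req(String table, Op op, String key) {
        this.table = table;
        this.op = op;
        this.key = key;
    }
}

/** Identifies one sub-group: same state table and same request type. */
final class GroupKey {
    final String table;
    final Op op;

    GroupKey(String table, Op op) {
        this.table = table;
        this.op = op;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof GroupKey)) return false;
        GroupKey other = (GroupKey) o;
        return table.equals(other.table) && op == other.op;
    }

    @Override
    public int hashCode() {
        return Objects.hash(table, op);
    }
}

final class Categorizer {
    /** Splits one batch into isomorphic sub-groups, keeping arrival order inside each group. */
    static Map<GroupKey, List<Req>> categorize(List<Req> batch) {
        Map<GroupKey, List<Req>> groups = new LinkedHashMap<>();
        for (Req r : batch) {
            groups.computeIfAbsent(new GroupKey(r.table, r.op), k -> new ArrayList<>()).add(r);
        }
        return groups;
    }

    /** The merged operation each sub-group would be turned into. */
    static String mergedCall(GroupKey group) {
        return group.op == Op.GET ? "MultiGet" : "WriteBatch";
    }
}
```

Consider a batch of six requests: GETs on `a` and `d` and PUTs on `b` and `e` for state-1, plus GETs on `c` and `f` for state-2. It yields three sub-groups, so three merged calls (two MultiGets and one WriteBatch) replace six individual I/Os.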
Fig-2. Categorize and Process State Requests
Leverage RocksDB MultiGet
RocksDB supports MultiGet, which can improve performance by more than 100% depending on the data pipeline. It mitigates the impact of I/O latency by:
- Reducing read requests: it reads only the necessary data blocks, as determined by the set of files that a given batch of keys overlaps.
- Reading asynchronously and in parallel as much as possible: it can read SST files asynchronously and in parallel, both within a single level and across multiple levels.
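The following simplified model illustrates the first point at file granularity. It assumes hypothetical `LevelModel` names and string keys. Within one level, files have non-overlapping key ranges, so a batch of keys maps to a small set of distinct files, and each file needs to be probed only once per batch. This is not RocksDB's implementation. The real MultiGet narrows each file down further to the data blocks that contain the keys.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical model of one level: files with non-overlapping [smallest, largest] key ranges. */
final class LevelModel {
    // Smallest key of each file -> file id (file ids follow insertion order).
    private final TreeMap<String, Integer> fileBySmallestKey = new TreeMap<>();
    private final List<String> largestKeys = new ArrayList<>();

    void addFile(String smallest, String largest) {
        fileBySmallestKey.put(smallest, largestKeys.size());
        largestKeys.add(largest);
    }

    /** Returns the distinct files that a batch of keys overlaps; each is probed once. */
    SortedSet<Integer> filesToProbe(List<String> keys) {
        SortedSet<Integer> files = new TreeSet<>();
        for (String key : keys) {
            // The only file that can contain the key is the one with the greatest smallest-key <= key.
            Map.Entry<String, Integer> candidate = fileBySmallestKey.floorEntry(key);
            if (candidate != null && key.compareTo(largestKeys.get(candidate.getValue())) <= 0) {
                files.add(candidate.getValue());
            }
        }
        return files;
    }
}
```

Suppose the level's files cover `[a, f]`, `[g, m]` and `[n, z]`. The keys `b`, `c`, `e` and `h` then touch only files 0 and 1, so four point lookups collapse into two file probes.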
When executing grouped isomorphic requests, RocksDB MultiGet is a natural choice. However, using this feature still poses some challenges:
- Currently, it is available only for PosixFileSystem. We could still support FlinkFileSystem in FLIP-427 by proxying all async interfaces (e.g. MultiRead, ReadAsync, poll, abort) to the corresponding methods of ForStFileSystem and communicating results through a callback result handle. ForStFileSystem could then serve different requests in parallel and notify RocksDB of their status.
- Building a jar with Async IO support: the RocksDB jar must be built with folly, which MultiGet relies on for async IO. This introduces a more complex build procedure and additional code paths.
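The following is a minimal plain-Java sketch of the proxying idea from the first challenge. `ReadRequest` and `ParallelMultiRead` are hypothetical names, and the code is neither the RocksDB `FileSystem` API nor ForStFileSystem. It shows only the pattern: fan out several range reads in parallel, then report all results through one completion handle, here a `CompletableFuture`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical read request: one byte range inside a file, as in a MultiRead call. */
final class ReadRequest {
    final long offset;
    final int length;

    ReadRequest(long offset, int length) {
        this.offset = offset;
        this.length = length;
    }
}

final class ParallelMultiRead {
    /** Runs each range read as its own task; the returned future completes when all are done. */
    static CompletableFuture<List<byte[]>> multiRead(
            FileChannel channel, List<ReadRequest> requests, Executor ioExecutor) {
        List<CompletableFuture<byte[]>> reads = new ArrayList<>(requests.size());
        for (ReadRequest req : requests) {
            reads.add(CompletableFuture.supplyAsync(() -> readRange(channel, req), ioExecutor));
        }
        return CompletableFuture.allOf(reads.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> {
                    List<byte[]> results = new ArrayList<>(reads.size());
                    for (CompletableFuture<byte[]> read : reads) {
                        results.add(read.join());   // already completed; keeps request order
                    }
                    return results;
                });
    }

    /** Positional reads do not move the channel position, so they can run concurrently. */
    private static byte[] readRange(FileChannel channel, ReadRequest req) {
        ByteBuffer buffer = ByteBuffer.allocate(req.length);
        try {
            long position = req.offset;
            while (buffer.hasRemaining()) {
                int n = channel.read(buffer, position);
                if (n < 0) {
                    break;   // end of file: return only the bytes that exist
                }
                position += n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);   // fails the corresponding future
        }
        byte[] out = new byte[buffer.position()];
        buffer.flip();
        buffer.get(out);
        return out;
    }
}
```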
With the batched state data structure in place, RocksDB MultiGet can handle batch requests and may greatly improve performance in some scenarios. Despite the challenges above, we would like to support it in the future.
Code Example: Accessing State in Batches
```java
private <N, V> CompletableFuture<Void> processMultipleValueGetRequests(
        ForStValueState<K, N, V> valueState,
        List<StateRequest<?, K, N, ?, ?, V>> batchValueGetRequests) {
    // Collect the keys of all VALUE_GET requests in this group, in request order.
    List<K> requestKeys =
            batchValueGetRequests.stream().map(StateRequest::getKey).collect(Collectors.toList());
    return CompletableFuture.runAsync(
            () -> {
                List<V> values;
                try {
                    // A single multiGet call serves the whole group of GET requests.
                    values = valueState.multiGet(requestKeys);
                } catch (IOException e) {
                    // Rethrown unchecked so that the returned future completes exceptionally.
                    throw new UncheckedIOException(e);
                }
                // Values come back in key order, so value i belongs to request i.
                for (int i = 0; i < values.size(); i++) {
                    batchValueGetRequests.get(i).getFuture().complete(values.get(i));
                }
            },
            stateIOExecutors);
}
```
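```java
@Internal
public class ForStValueState<K, N, V> {

    public List<V> multiGet(List<K> keys) throws IOException {
        // Serialize each key together with its key group and namespace.
        List<byte[]> serializedKeys = serializeKeysWithGroupAndNamespace(keys);
        List<byte[]> serializedValues;
        try {
            // One multiGetAsList call; values are returned in the same order as the keys.
            serializedValues = backend.db.multiGetAsList(serializedKeys);
        } catch (RocksDBException e) {
            throw new IOException("Failed to execute multiGet", e);
        }
        List<V> result = new ArrayList<>(serializedValues.size());
        for (byte[] serializedValue : serializedValues) {
            // A null entry means the key has no value. deserializeValue is a helper
            // (not shown) that feeds the bytes to valueSerializer via a DataInputView.
            result.add(serializedValue == null ? null : deserializeValue(serializedValue));
        }
        return result;
    }
}
```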
Public Interfaces
Some configuration options will be added to let users tune the batching behavior, such as the batch size.
Compatibility, Deprecation, and Migration Plan
This FLIP introduces an entirely new code path that is fully compatible with the existing one.
Test Plan
New unit tests and integration tests will be added for the implementation described above.
Rejected Alternatives
None.

