Status
Current state: "Accepted"
| Discussion thread: | https://lists.apache.org/thread/z3th724l6vnylgv601gvcbdy4oy2wy7r |
|---|---|
| Vote thread: | https://lists.apache.org/thread/q1rksclrjllpqjvkklfqgp5dx3j2t38w |
| JIRA: | FLINK-37874 |
| Released: |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Users have requested that async I/O support sequential processing of records with the same key while still processing different keys concurrently when performing a lookup join [1]. Similarly, when processing an update stream in Flink SQL, records need to be processed in order per upsertKey (the unique key of the stream, as deduced by the table planner), while different upsertKeys are processed concurrently to achieve better throughput. Currently, Flink supports only record-level ordered output and unordered output for asynchronous lookup joins, neither of which guarantees the processing order of records with the same key. This FLIP proposes the Async Key Ordered Lookup Join feature to address this need.
Public Interfaces
Introduce a new option called `table.exec.async-lookup.key-ordered-enabled` that controls whether Async Key Ordered Lookup Join is enabled.
public static final ConfigOption<Boolean> TABLE_EXEC_ASYNC_LOOKUP_KEY_ORDERED =
        key("table.exec.async-lookup.key-ordered-enabled")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "When true, async lookup joins follow the upsert key order in an update stream (if there is no upsert key, "
                                + "the whole record is considered the upsert key). This feature does not work for append-only streams.");
This optimization is enabled only when the input stream is not append-only and both `table.exec.async-lookup.output-mode = "ALLOW_UNORDERED"` and `table.exec.async-lookup.key-ordered-enabled = "true"` are set.
| table.exec.async-lookup.key-ordered-enabled | table.exec.async-lookup.output-mode | Lookup join ordered within the same key, unordered across different keys |
|---|---|---|
| true | ALLOW_UNORDERED | yes |
| false | ALLOW_UNORDERED | no |
| true | ORDERED | no |
| false | ORDERED | no |
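The enabling condition described above can be written as a single predicate. The following is a minimal, self-contained sketch for illustration only: the class `KeyOrderedLookupCondition`, the method `useKeyOrderedLookupJoin`, and the local `OutputMode` enum are hypothetical names, not part of Flink or of this FLIP's implementation.

```java
public class KeyOrderedLookupCondition {

    /** Local stand-in for the two values of table.exec.async-lookup.output-mode (hypothetical type). */
    enum OutputMode { ORDERED, ALLOW_UNORDERED }

    /**
     * Returns true only when all three conditions hold: the option is enabled,
     * the output mode is ALLOW_UNORDERED, and the input is not append-only.
     */
    static boolean useKeyOrderedLookupJoin(
            boolean keyOrderedEnabled, OutputMode outputMode, boolean inputIsAppendOnly) {
        return keyOrderedEnabled
                && outputMode == OutputMode.ALLOW_UNORDERED
                && !inputIsAppendOnly;
    }

    public static void main(String[] args) {
        // Print the decision for every option combination on an update (non-append-only) stream.
        for (OutputMode mode : OutputMode.values()) {
            for (boolean enabled : new boolean[] {true, false}) {
                System.out.println(enabled + " | " + mode + " | "
                        + useKeyOrderedLookupJoin(enabled, mode, false));
            }
        }
    }
}
```

For an append-only input the predicate is false regardless of the two options, which matches the restriction stated in the option description.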
Proposed Changes
Overview
The main goal is to implement a lookup join that processes records in upsertKey order while processing different keys concurrently. The proposed solution introduces a new operator in the DataStream module that supports key-ordered processing. The table module then reuses this operator to perform lookup joins in upsert key order.
Introduce the KeyedAsyncWaitOperator
The KeyedAsyncWaitOperator is an operator that processes records in the order of user-defined keys. It uses a framework similar to the Asynchronous Execution Model in FLIP-425 to guarantee this ordering. AbstractKeyedAsyncStreamOperator abstracts the framework at a high level, and KeyedAsyncWaitOperator extends it to complete the implementation.
@Internal
public class KeyedAsyncWaitOperator<IN, OUT>
        extends AbstractKeyedAsyncStreamOperator<IN, OUT>
        implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
}
KeyedAsyncWaitOperator vs. AsyncWaitOperator
Data processing difference
The KeyedAsyncWaitOperator processes records and emits results in order within each key. In contrast, the AsyncWaitOperator processes all records asynchronously and emits results either in record order or unordered. We use unordered output as an example.
State difference
KeyedAsyncWaitOperator uses keyed state while AsyncWaitOperator uses operator state.
Processing in key order
As in the Asynchronous Execution Model framework introduced in FLIP-425, we use a similar structure to process inputs with the same key in order while processing inputs with different keys in parallel.
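The idea of "ordered within a key, concurrent across keys" can be illustrated with plain `CompletableFuture` chaining. This is a minimal sketch under stated assumptions, not the KeyedAsyncWaitOperator implementation: the class `KeyOrderedAsyncSketch`, its `submit` and `runDemo` methods, and the simulated `lookup` call are all hypothetical, and a blocking `Thread.sleep` stands in for a real asynchronous lookup.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class KeyOrderedAsyncSketch {

    /** Last submitted (possibly still running) call for each key. Entries are never evicted in this sketch. */
    private final Map<String, CompletableFuture<?>> tailByKey = new HashMap<>();
    private final ExecutorService pool;

    KeyOrderedAsyncSketch(ExecutorService pool) {
        this.pool = pool;
    }

    /** Runs the call after the previous call for the same key has finished; other keys are not blocked. */
    synchronized <T> CompletableFuture<T> submit(String key, Supplier<T> call) {
        CompletableFuture<?> previous =
                tailByKey.getOrDefault(key, CompletableFuture.completedFuture(null));
        // handleAsync fires whether the previous call succeeded or failed,
        // so one failed record does not stall later records of the same key.
        CompletableFuture<T> result = previous.handleAsync((value, error) -> call.get(), pool);
        tailByKey.put(key, result);
        return result;
    }

    /** Simulated lookup: waits, then records its own completion. */
    private static String lookup(String record, long delayMillis, List<String> completionOrder) {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        completionOrder.add(record);
        return record;
    }

    /** Submits a-1 (slow), b-1 (fast), a-2 (instant) and returns the order in which they completed. */
    static List<String> runDemo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            KeyOrderedAsyncSketch sketch = new KeyOrderedAsyncSketch(pool);
            List<String> completionOrder = Collections.synchronizedList(new ArrayList<>());
            CompletableFuture<?> a1 = sketch.submit("a", () -> lookup("a-1", 50, completionOrder));
            CompletableFuture<?> b1 = sketch.submit("b", () -> lookup("b-1", 10, completionOrder));
            CompletableFuture<?> a2 = sketch.submit("a", () -> lookup("a-2", 0, completionOrder));
            CompletableFuture.allOf(a1, b1, a2).join();
            return new ArrayList<>(completionOrder);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

In this sketch `a-2` always completes after `a-1` even though its own lookup is faster, because it is chained behind the earlier record of key `a`. The position of `b-1` relative to the `a` records depends on timing, since key `b` is processed independently.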
Below is the sequence diagram:
Compatibility, Deprecation, and Migration Plan
The parameter `table.exec.async-lookup.key-ordered-enabled` is disabled by default, ensuring that this feature remains backward compatible.
Test Plan
New UT/ITs will be added to verify this feature.
Rejected Alternatives
Adaptation in AsyncWaitOperator
The core idea is to add the logic for outputting records in key order in the AsyncWaitOperator. This primarily involves introducing a new unordered queue implementation of the StreamElementQueue interface which groups and stores input records by key to ensure output is ordered by key.
The problem is that this could lead to out-of-order processing of records with the same key when the parallelism changes. The root cause is that AsyncWaitOperator currently uses operator list state, so after a parallelism change, data with the same key may be processed on different subtasks.
If the AsyncWaitOperator were instead switched to keyed state, using the entire row as the key in the existing ordered and unordered modes, records could be reordered when the operator restores, which could disrupt the order of records in the stream.
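The rescaling problem with operator list state can be seen in a toy model. The sketch below is deliberately simplified and is not Flink's redistribution code: it assumes list-state elements are dealt out to subtasks by position without regard to key, and it models keyed state as `hash(key) % parallelism` rather than Flink's key groups. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class RescaleRoutingSketch {

    /** Records are written as "<key>-<sequence>", e.g. "a-1". */
    static String keyOf(String record) {
        return record.substring(0, record.indexOf('-'));
    }

    private static List<List<String>> emptySubtasks(int parallelism) {
        List<List<String>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        return subtasks;
    }

    /** Simplified list-state model: buffered records are split by position, ignoring their keys. */
    static List<List<String>> redistributeListState(List<String> buffered, int parallelism) {
        List<List<String>> subtasks = emptySubtasks(parallelism);
        for (int i = 0; i < buffered.size(); i++) {
            subtasks.get(i % parallelism).add(buffered.get(i));
        }
        return subtasks;
    }

    /** Simplified keyed-state model: the target subtask depends only on the record's key. */
    static List<List<String>> redistributeKeyedState(List<String> buffered, int parallelism) {
        List<List<String>> subtasks = emptySubtasks(parallelism);
        for (String record : buffered) {
            int subtask = Math.floorMod(keyOf(record).hashCode(), parallelism);
            subtasks.get(subtask).add(record);
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<String> buffered = List.of("a-1", "a-2", "b-1", "b-2");
        System.out.println("list state:  " + redistributeListState(buffered, 2));
        System.out.println("keyed state: " + redistributeKeyedState(buffered, 2));
    }
}
```

With the list-state model, `a-1` and `a-2` land on different subtasks after restoring with parallelism 2, so nothing guarantees that `a-1` is processed first. With the keyed model, all records of key `a` stay on one subtask in their original relative order.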
Adaptation in Async lookup join runner
The core idea is to add key-ordered logic to the async lookup join runner, which is located in the table module. This adaptation does not need to alter the existing implementation logic of async lookup join in any operator.
The problem is that it would encounter the same challenge as the adaptation in AsyncWaitOperator.
Adaptation in table module
The core idea is to introduce KeyedAsyncWaitOperator in the Table module.
The problem is that the capability of preserving key order would then be limited to the Table module, whereas it could be generalized by extending it to operators in the DataStream module, for example by adding a keyOrderedWait function to AsyncDataStream.
Appendix
[1] https://lists.apache.org/thread/wczzjhw8g0jcbs8lw2jhtrkw858cmx5n

