Status
Current state: [One of "Under Discussion", "Accepted", "Rejected"]
Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In Flink streaming jobs, the large state of Join nodes has been a persistent concern for users. Because streaming jobs are long-running, join state generally grows over time. Users can set a state TTL to mitigate this, but TTL is not applicable to all scenarios and does not address the root cause. Oversized state can lead to a series of problems, including but not limited to:
Resource bottlenecks in individual tasks
Slow checkpointing, which affects job stability while checkpoints are in progress
Long recovery time from state
In addition, when analyzing join state, we found that in some scenarios the state inside the join largely duplicates data that is already stored in the source tables.
To address this, we propose DeltaJoin. Its core idea is a bidirectional lookup join: instead of buffering both inputs in join state, each side looks up the other side's source table directly, so the source table data is reused as a substitute for join state.
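To make the bidirectional lookup idea concrete, here is a minimal single-process Java sketch. It is not the FLIP's implementation: `SourceTable`, `DeltaJoinOperator` and `DeltaJoinDemo` are hypothetical names, an in-memory map stands in for external source storage that supports lookups by join key, and it assumes each change has already landed in its source table before the operator processes it. The operator itself holds only references to the two tables and keeps no join state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an external source table that supports lookups by join key.
final class SourceTable {
    private final Map<String, List<String>> rowsByKey = new HashMap<>();

    void append(String key, String row) {
        rowsByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
    }

    List<String> lookup(String key) {
        return rowsByKey.getOrDefault(key, List.of());
    }
}

// Hypothetical delta-join operator: it only reads the two source tables
// and keeps no join state of its own.
final class DeltaJoinOperator {
    private final SourceTable left;
    private final SourceTable right;

    DeltaJoinOperator(SourceTable left, SourceTable right) {
        this.left = left;
        this.right = right;
    }

    // A change from the left stream is joined by looking up the right source table.
    List<String> processLeft(String key, String leftRow) {
        List<String> out = new ArrayList<>();
        for (String rightRow : right.lookup(key)) {
            out.add(leftRow + "|" + rightRow);
        }
        return out;
    }

    // A change from the right stream is joined by looking up the left source table.
    List<String> processRight(String key, String rightRow) {
        List<String> out = new ArrayList<>();
        for (String leftRow : left.lookup(key)) {
            out.add(leftRow + "|" + rightRow);
        }
        return out;
    }
}

final class DeltaJoinDemo {
    public static void main(String[] args) {
        SourceTable orders = new SourceTable();
        SourceTable users = new SourceTable();
        DeltaJoinOperator join = new DeltaJoinOperator(orders, users);

        // Order o1 for user u1 lands in its table before user u1 exists: nothing to emit yet.
        orders.append("u1", "o1");
        System.out.println(join.processLeft("u1", "o1"));      // prints []

        // User u1 arrives; looking up the orders table finds o1.
        users.append("u1", "alice");
        System.out.println(join.processRight("u1", "alice"));  // prints [o1|alice]
    }
}
```

Under the stated assumption, each matching pair is emitted exactly once: by whichever side arrives second.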
Example
Let's take the following example of a stream join.
We can optimize it as follows:
Scoping and Limitations
For the first version, we define the scope in which DeltaJoin applies and its limitations.
Only scenarios that involve wide-table construction are supported, meaning the sink table has the “ignore delete” attribute. This allows the entire plan to avoid the complexities associated with UPDATE_BEFORE and DELETE.
The plan must not contain any stateful operators other than joins, such as aggregations or rankings.
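The two scope conditions can be expressed as a single check over the plan. The following Java sketch is illustrative only: `NodeKind`, `PlanNode` and `DeltaJoinScope` are hypothetical types standing in for the real planner nodes, and the set of "other stateful operators" is reduced to aggregation and ranking as named above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, heavily simplified plan model used only for illustration.
enum NodeKind { SOURCE, CALC, JOIN, AGGREGATE, RANK, SINK }

record PlanNode(NodeKind kind, boolean ignoreDelete, List<PlanNode> inputs) {

    static PlanNode of(NodeKind kind, PlanNode... inputs) {
        return new PlanNode(kind, false, Arrays.asList(inputs));
    }

    static PlanNode sink(boolean ignoreDelete, PlanNode input) {
        return new PlanNode(NodeKind.SINK, ignoreDelete, List.of(input));
    }
}

final class DeltaJoinScope {

    // Scope check for the first version:
    // 1. the sink declares the "ignore delete" attribute (wide-table construction);
    // 2. apart from joins, no stateful operator (aggregation, ranking) appears upstream.
    static boolean isEligible(PlanNode sink) {
        return sink.kind() == NodeKind.SINK && sink.ignoreDelete() && !hasOtherStatefulOp(sink);
    }

    private static boolean hasOtherStatefulOp(PlanNode node) {
        if (node.kind() == NodeKind.AGGREGATE || node.kind() == NodeKind.RANK) {
            return true;
        }
        for (PlanNode input : node.inputs()) {
            if (hasOtherStatefulOp(input)) {
                return true;
            }
        }
        return false;
    }
}
```

Because condition 1 is a property of the sink while condition 2 spans everything upstream of it, the check naturally needs a view of the whole plan.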
Public Interfaces
Configuration Options
The following new configuration options related to DeltaJoin will be introduced.
// Whether to enable delta join.
// AUTO: The optimizer tries to use delta join first. If that fails, it falls back to a regular join.
// FORCE: Delta join is enforced. If it cannot be applied, an exception is thrown.
// NONE: Delta join is never attempted.
// Default is 'AUTO'.
table.optimizer.delta-join.strategy: AUTO/FORCE/NONE

// Whether to enable the cache in delta join.
// Default is true.
table.exec.delta-join.cache-enable: true/false

// The cache size of the left side in delta join.
// Default is 10000L.
table.exec.delta-join.left-cache-size

// The cache size of the right side in delta join.
// Default is 10000L.
table.exec.delta-join.right-cache-size
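The semantics of `table.optimizer.delta-join.strategy` can be read as a small decision function. This Java sketch is illustrative only: `DeltaJoinStrategy` and `DeltaJoinStrategyResolver` are hypothetical names, `canConvert` stands for whatever precondition check the optimizer runs for a given join, and Flink's actual option definition (via its `ConfigOption` machinery) is not shown.

```java
import java.util.Locale;

// Hypothetical mirror of the values of table.optimizer.delta-join.strategy.
enum DeltaJoinStrategy { AUTO, FORCE, NONE }

final class DeltaJoinStrategyResolver {

    // Parses an option value such as "auto" or "FORCE".
    static DeltaJoinStrategy parse(String value) {
        return DeltaJoinStrategy.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    // Decides whether a join is planned as a delta join.
    // canConvert: result of the (hypothetical) delta-join precondition check for this join.
    static boolean useDeltaJoin(DeltaJoinStrategy strategy, boolean canConvert, String reason) {
        return switch (strategy) {
            case NONE -> false;          // never attempt delta join
            case AUTO -> canConvert;     // fall back to a regular join if conversion fails
            case FORCE -> {
                if (!canConvert) {
                    throw new IllegalStateException(
                            "Delta join is forced but cannot be applied: " + reason);
                }
                yield true;
            }
        };
    }
}
```

AUTO being the default means existing jobs that do not match the pattern keep their regular join plans without any configuration change.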
New Interfaces used for ScanTableSource
import java.util.List;

import org.apache.flink.annotation.PublicEvolving;

/**
 * ...
 */
@PublicEvolving
public interface SupportsDeltaJoin {

    /**
     * The source can select specific join keys, such as primary keys or indexes, to provide efficient lookup performance.
     */
    List<String> acceptedLookupKeys(List<String> allJoinKeys);
}
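A connector might implement `acceptedLookupKeys` by picking whichever of its efficient lookup paths is fully covered by the join keys. The sketch below is hypothetical: `IndexedKvTableSource` is an invented source, the interface is redefined locally without `@PublicEvolving` so the code compiles without Flink, and returning an empty list to mean "no efficient lookup possible" is an assumption, since the FLIP's Javadoc for this method is elided.

```java
import java.util.List;

// Local copy of the proposed interface so this sketch compiles on its own.
// The FLIP version additionally carries Flink's @PublicEvolving annotation.
interface SupportsDeltaJoin {
    List<String> acceptedLookupKeys(List<String> allJoinKeys);
}

// Hypothetical source backed by a store that can be looked up efficiently
// by its primary key or by any declared secondary index.
final class IndexedKvTableSource implements SupportsDeltaJoin {
    // Primary key first, then secondary indexes, in order of preference.
    private final List<List<String>> lookupIndexes;

    IndexedKvTableSource(List<List<String>> lookupIndexes) {
        this.lookupIndexes = lookupIndexes;
    }

    // Chooses the first index whose columns are all among the join keys;
    // returns an empty list when no efficient lookup is possible (assumption).
    @Override
    public List<String> acceptedLookupKeys(List<String> allJoinKeys) {
        for (List<String> index : lookupIndexes) {
            if (allJoinKeys.containsAll(index)) {
                return index;
            }
        }
        return List.of();
    }
}
```

For example, a table with primary key `order_id` and an index on `user_id`, joined on `(user_id, shop_id)`, would accept `[user_id]`.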
Proposed Changes
This FLIP does not go into detailed implementation specifics; it offers a brief overview of the implementation approach.
When to convert Join to DeltaJoin
A new resolver will be introduced in the `StreamCommonSubGraphBasedOptimizer#postOptimize` method. This resolver converts joins in the plan that match the specified pattern into DeltaJoins. The optimization is not implemented as a rule in the `PHYSICAL_REWRITE` phase because `PHYSICAL_REWRITE` only has visibility into partial plans (`RelNodeBlock`) and cannot access the entire global plan.
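The global pass can be pictured with the following Java sketch. It is hypothetical: `Node`, `DeltaJoinResolverSketch` and the string node kinds are illustrative stand-ins for the planner's `RelNode`s, and it assumes both join inputs are direct source scans (real plans typically have calculations and exchanges in between). It illustrates why the resolver works on the whole plan: whether a join may be converted depends on a property of the sink at the root, not only on nodes adjacent to the join.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical immutable plan node; "kind" is e.g. SINK, JOIN, SOURCE, DELTA_JOIN.
record Node(String kind, String name, List<Node> inputs) {
    static Node leaf(String kind, String name) {
        return new Node(kind, name, List.of());
    }
}

final class DeltaJoinResolverSketch {

    // Entry point invoked on the whole plan, rooted at the sink.
    static Node resolve(Node sinkRoot, boolean sinkIgnoresDelete, Predicate<Node> supportsDeltaJoin) {
        if (!sinkIgnoresDelete) {
            return sinkRoot; // outside the supported scope: leave the plan untouched
        }
        return rewrite(sinkRoot, supportsDeltaJoin);
    }

    // Bottom-up rewrite: a JOIN whose two inputs are both sources supporting
    // delta join becomes a DELTA_JOIN; every other node is copied unchanged.
    private static Node rewrite(Node node, Predicate<Node> supportsDeltaJoin) {
        List<Node> newInputs = new ArrayList<>();
        for (Node input : node.inputs()) {
            newInputs.add(rewrite(input, supportsDeltaJoin));
        }
        boolean convertible = node.kind().equals("JOIN")
                && newInputs.size() == 2
                && newInputs.stream().allMatch(
                        child -> child.kind().equals("SOURCE") && supportsDeltaJoin.test(child));
        String kind = convertible ? "DELTA_JOIN" : node.kind();
        return new Node(kind, node.name(), List.copyOf(newInputs));
    }
}
```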
Dealing with unexpected -U and -D at runtime
Since the initial version is not expected to support UPDATE_BEFORE (-U) and DELETE (-D), a special DropRetract node will be added after the source to discard these two kinds of records directly.
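The behavior of DropRetract amounts to a filter on the change kind of each record. The sketch below is hypothetical: `ChangeKind` mirrors the four values of Flink's `org.apache.flink.types.RowKind` (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) but is redefined locally so the code compiles without Flink, and `Change` and `DropRetract.apply` are illustrative names. In Flink itself, the equivalent check would read `RowData#getRowKind()`.

```java
import java.util.List;
import java.util.stream.Collectors;

// Mirrors the four change kinds of Flink's org.apache.flink.types.RowKind,
// redefined locally so the sketch compiles without Flink on the classpath.
enum ChangeKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

// Hypothetical changelog record: a change kind plus a row payload.
record Change(ChangeKind kind, String row) {}

// Hypothetical stand-in for the DropRetract node: forwards +I and +U, discards -U and -D.
final class DropRetract {

    static boolean keep(Change change) {
        return change.kind() == ChangeKind.INSERT || change.kind() == ChangeKind.UPDATE_AFTER;
    }

    static List<Change> apply(List<Change> changes) {
        return changes.stream().filter(DropRetract::keep).collect(Collectors.toList());
    }
}
```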
Benchmark
TBD
Compatibility, Deprecation, and Migration Plan
These APIs are newly introduced and do not involve compatibility issues.
Test Plan
HarnessTests and IT tests will be added to validate the changes.