
Discussion thread: https://lists.apache.org/thread/cmkh3n6l86vx4v16chhrjr22g6w0zort
Vote thread: https://lists.apache.org/thread/6xfqt3m1tbornkqq1p7twrzybqwhwprh
JIRA:
Release:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In general, Broadcast Hash Join is currently the most efficient join strategy available in Flink. However, its prerequisite is that the input data on one side must be sufficiently small; otherwise, it may lead to memory overuse or other issues. Currently, due to the lack of precise statistics, it is difficult to make accurate estimations during the Flink SQL Planning phase. For example, when an upstream Filter operator is present, it is easy to overestimate the size of the table, whereas with an expansion operator, the table size tends to be underestimated. Moreover, once the join operator is determined, it cannot be modified at runtime.

To address this issue, we plan to introduce an Adaptive Broadcast Join capability based on the StreamGraph job submission and dynamic optimization framework outlined in FLIP-468 and FLIP-469. This will allow the join operator to be dynamically optimized to Broadcast Join based on the actual input data volume at runtime, and to fall back to the original join when the optimization conditions are not met.

Public Interfaces

New Configuration Option

TABLE_OPTIMIZER_ADAPTIVE_BROADCAST_JOIN_STRATEGY
public static final ConfigOption<AdaptiveBroadcastJoinStrategy> TABLE_OPTIMIZER_ADAPTIVE_BROADCAST_JOIN_STRATEGY =
            key("table.optimizer.adaptive-broadcast-join.strategy")
                    .enumType(AdaptiveBroadcastJoinStrategy.class)
                    .defaultValue(AdaptiveBroadcastJoinStrategy.AUTO)
                    .withDescription(
                            // The description is split into concatenated literals so that it compiles.
                            "Flink will perform broadcast hash join optimization when the runtime statistics on one side of a join operator "
                                    + "are less than the threshold `table.optimizer.join.broadcast-threshold`. The value of this configuration option "
                                    + "determines when Flink should perform this optimization. AUTO means Flink will automatically choose the timing "
                                    + "for optimization, RUNTIME_ONLY means broadcast hash join optimization is only performed at runtime, and NONE "
                                    + "means the optimization is only carried out at compile time.");
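The three values of the option can be read as a pair of switches, one per phase. The following is a minimal sketch of that reading, not the Flink class itself: the enum name AdaptiveBroadcastJoinStrategySketch and its two accessor methods are hypothetical, and treating AUTO as "both phases allowed" is an assumption based on the description above.

```java
import java.util.Locale;

// Hypothetical local stand-in for the proposed AdaptiveBroadcastJoinStrategy enum.
enum AdaptiveBroadcastJoinStrategySketch {
    AUTO(true, true),          // assumed: the planner may broadcast at compile time or at runtime
    RUNTIME_ONLY(false, true), // broadcast optimization happens only at runtime
    NONE(true, false);         // broadcast optimization happens only at compile time

    private final boolean compileTime;
    private final boolean runtime;

    AdaptiveBroadcastJoinStrategySketch(boolean compileTime, boolean runtime) {
        this.compileTime = compileTime;
        this.runtime = runtime;
    }

    boolean allowsCompileTimeBroadcast() {
        return compileTime;
    }

    boolean allowsRuntimeBroadcast() {
        return runtime;
    }

    // Parses a configured value case-insensitively, e.g. "runtime_only".
    static AdaptiveBroadcastJoinStrategySketch parse(String value) {
        return valueOf(value.trim().toUpperCase(Locale.ROOT));
    }
}
```

In a SQL client session the option itself would be set with a statement such as SET 'table.optimizer.adaptive-broadcast-join.strategy' = 'RUNTIME_ONLY';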

Proposed Changes

Overview

Adaptive Broadcast Join optimization involves the following components:

  1. Identifying and replacing eligible join operators.

  2. Dynamically generating the join operator.

  3. Optimizing the StreamGraph through the AdaptiveBroadcastJoinOptimizationStrategy.

Among these, (1) occurs during the Table Planning phase, while (2) and (3) take place at runtime.

Identifying and replacing eligible join operators

We plan to inject the adaptive join node into the ExecNode DAG, identifying and converting join operators that have not been statically optimized to broadcast join but meet optimization criteria into BatchExecAdaptiveJoin nodes. The conversion conditions are as follows:

  1. The execution mode is batch and AdaptiveBatchScheduler is enabled.

  2. Broadcast join is not disabled.

  3. No specific join method is specified by a hint.

  4. If one side of the HashJoin has already been optimized to forward (usually because the upstream has already grouped by the corresponding key), there is no need to change it to BroadcastHashJoin. However, if it is a SortMergeJoin, it will still be replaced to eliminate the overhead of sorting.

  5. Downstream may rely on the hashed inputs of the join operator for proper grouping (e.g., consecutive joins or aggregations based on the same key). In such cases, the upstream side cannot be replaced.
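The five conditions above can be sketched as a single predicate. This is an illustration only: the class AdaptiveJoinEligibilitySketch, its JoinInfo holder and all field names are hypothetical, and the real processor would derive these flags from the ExecNode graph and the table configuration rather than receive them as booleans.

```java
// Hypothetical model of the conversion conditions; not part of the Flink planner API.
final class AdaptiveJoinEligibilitySketch {
    enum JoinKind { HASH_JOIN, SORT_MERGE_JOIN }

    static final class JoinInfo {
        final JoinKind kind;
        final boolean batchWithAdaptiveScheduler;      // condition 1
        final boolean broadcastJoinDisabled;           // condition 2
        final boolean hasJoinHint;                     // condition 3
        final boolean hasForwardInput;                 // condition 4
        final boolean downstreamNeedsHashDistribution; // condition 5

        JoinInfo(JoinKind kind, boolean batchWithAdaptiveScheduler, boolean broadcastJoinDisabled,
                 boolean hasJoinHint, boolean hasForwardInput, boolean downstreamNeedsHashDistribution) {
            this.kind = kind;
            this.batchWithAdaptiveScheduler = batchWithAdaptiveScheduler;
            this.broadcastJoinDisabled = broadcastJoinDisabled;
            this.hasJoinHint = hasJoinHint;
            this.hasForwardInput = hasForwardInput;
            this.downstreamNeedsHashDistribution = downstreamNeedsHashDistribution;
        }
    }

    static boolean isEligible(JoinInfo join) {
        if (!join.batchWithAdaptiveScheduler) return false;
        if (join.broadcastJoinDisabled) return false;
        if (join.hasJoinHint) return false;
        // A hash join with a forward input gains nothing from broadcasting;
        // a sort-merge join is still converted to save the sorting cost.
        if (join.kind == JoinKind.HASH_JOIN && join.hasForwardInput) return false;
        // Downstream operators that depend on the hash distribution block the conversion.
        return !join.downstreamNeedsHashDistribution;
    }
}
```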

Introducing AdaptiveJoinNodeProcessor

The operation of identifying and replacing join operators is handled by the AdaptiveJoinNodeProcessor. We will add it to the end of the ExecNodeGraphProcessors processing chain. By traversing the entire ExecNodeGraph, all eligible join operators will be converted into BatchExecAdaptiveJoin nodes.

The purpose of placing the AdaptiveJoinNodeProcessor at the end of the Processors chain is as follows:

  1. To be able to identify MultiInputNode. Considering the complexity of the implementation and the combined benefits of optimization, we do not currently plan to optimize join operators within MultiInputNode to broadcast hash join. A more reasonable approach would be to delay the optimization of MultiInput until runtime. This way, once the broadcast hash join is determined at runtime, the MultiInput optimization can then take place. This will be part of a future roadmap.

  2. To take into account the changes made by the ForwardHashExchangeProcessor and avoid logical errors in downstream nodes. Specifically, in scenarios where consecutive hash exchanges are rewritten to forward, we do not want the initial, necessary hash exchange to be replaced by a forward, which could result in subsequent logical errors.


Introducing BatchExecAdaptiveJoin

As mentioned above, we will introduce BatchExecAdaptiveJoin as the implementation of the AdaptiveJoin operator at the ExecNodeGraph stage.

BatchExecAdaptiveJoin needs to meet the following requirements:

  1. Align parameters with the current mainstream Flink join operators to facilitate the flexible switching of join operators.

  2. Record the original join type to enable fallback if the broadcast conditions are not met.

  3. During the Transformation generation phase, an AdaptiveJoinOperatorFactory will be created. It is important to note that the JoinOperator will not be created during the table planning phase. Instead, it will be instantiated at runtime based on the actual size of the input data.

  4. Record the edges that can be converted to Broadcast, according to the specific rules below.

Join Type   | Left | Right
Inner       |      |
FullOuter   |      |
Semi        |      |
Anti        |      |
LeftOuter   |      |
RightOuter  |      |
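A per-join-type rule of this kind could be encoded as a small lookup. The mapping in the sketch below is not taken from this FLIP. It is an assumption derived from general join semantics: a side whose unmatched rows must be emitted exactly once (the preserved side of an outer join, or the left side of a semi or anti join) cannot be replicated to every parallel instance. The class and enum names are hypothetical.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical encoding of "which input may be broadcast" per join type.
final class BroadcastSideRulesSketch {
    enum JoinType { INNER, FULL_OUTER, SEMI, ANTI, LEFT_OUTER, RIGHT_OUTER }
    enum Side { LEFT, RIGHT }

    static Set<Side> potentialBroadcastSides(JoinType type) {
        switch (type) {
            case INNER:
                // Neither side is preserved, so either may be broadcast.
                return EnumSet.allOf(Side.class);
            case LEFT_OUTER:
            case SEMI:
            case ANTI:
                // Assumed: the left side drives the output, so only the right side may be broadcast.
                return EnumSet.of(Side.RIGHT);
            case RIGHT_OUTER:
                // Assumed: mirror image of LEFT_OUTER.
                return EnumSet.of(Side.LEFT);
            case FULL_OUTER:
            default:
                // Assumed: both sides are preserved, so neither may be broadcast.
                return EnumSet.noneOf(Side.class);
        }
    }
}
```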


Dynamically generating the join operator

AdaptiveJoinOperatorFactory

The AdaptiveJoinOperatorFactory is responsible for dynamically creating the appropriate JoinOperator based on the optimization made to the StreamGraph. It inherits from AbstractStreamOperatorFactory and needs to implement the following methods:

  1. getPotentialBroadcastSides(): Gets the sides that can potentially be optimized for broadcasting.

  2. markAsBroadcastJoin(int inputId): Marks the join as a broadcast join and sets the input side to be broadcasted.

It is important to clarify that we do not intend to introduce a new AdaptiveJoinOperator; instead, we will reuse the existing join operators. Accordingly, in terms of implementation, we will delay the codegen and creation of the join operators until runtime. This means that AdaptiveJoinOperatorFactory will not immediately create a JoinOperator during the Table planning phase, but will selectively create the optimal join operator at runtime based on the actual input data size.

Additionally, when the JoinType is InnerJoin, either the left or right side can potentially be adjusted to the build side. Therefore, during the runtime phase, the build side will be calibrated to ensure that the side with the smaller data volume becomes the build side.
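The deferred-creation idea can be illustrated with a small stand-in for the factory. This is a sketch under stated assumptions, not the proposed implementation: AdaptiveJoinFactorySketch does not extend AbstractStreamOperatorFactory, operators are represented by name only, the input numbering (1 = left, 2 = right) is assumed, and smallerSide is a hypothetical helper for the inner-join build side calibration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for AdaptiveJoinOperatorFactory; join operators are modeled as strings.
final class AdaptiveJoinFactorySketch {
    static final int LEFT_INPUT = 1;  // assumed input numbering
    static final int RIGHT_INPUT = 2;

    private final String originalJoin; // recorded at planning time to allow fallback
    private final List<Integer> potentialBroadcastSides;
    private int broadcastInput = -1;   // -1 means "not marked as a broadcast join"

    AdaptiveJoinFactorySketch(String originalJoin, Integer... potentialBroadcastSides) {
        this.originalJoin = originalJoin;
        this.potentialBroadcastSides =
                Collections.unmodifiableList(Arrays.asList(potentialBroadcastSides));
    }

    List<Integer> getPotentialBroadcastSides() {
        return potentialBroadcastSides;
    }

    void markAsBroadcastJoin(int inputId) {
        if (!potentialBroadcastSides.contains(inputId)) {
            throw new IllegalArgumentException("Input " + inputId + " cannot be broadcast");
        }
        broadcastInput = inputId;
    }

    // For an inner join, the side with the smaller data volume becomes the build side.
    static int smallerSide(long leftBytes, long rightBytes) {
        return leftBytes <= rightBytes ? LEFT_INPUT : RIGHT_INPUT;
    }

    // Invoked at runtime; only at this point is the concrete join chosen.
    String createStreamOperator() {
        if (broadcastInput == -1) {
            return originalJoin; // fall back to the join selected by the planner
        }
        return "BroadcastHashJoin(build=" + (broadcastInput == LEFT_INPUT ? "left" : "right") + ")";
    }
}
```

Keeping the original join type inside the factory is what makes the fallback cheap: if nothing marks the join as broadcast before scheduling, createStreamOperator simply yields the planner's choice.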

AdaptiveBroadcastJoinOptimizationStrategy

The AdaptiveBroadcastJoinOptimizationStrategy is responsible for adjusting the StreamGraph at the runtime stage before the scheduling of the AdaptiveJoin operator, determining whether to optimize for broadcast join based on the actual input data size from upstream. The execution steps are as follows:

  1. When upstream nodes finish execution, an OperatorsFinished event is triggered.

  2. AdaptiveBroadcastJoinOptimizationStrategy optimization is triggered and it will traverse the downstream connections of nodes. If an AdaptiveJoinOperatorFactory is present, it continues with the assessment; otherwise, it ends the traversal and returns.

  3. It retrieves potential edges that could be optimized for broadcast join using the AdaptiveJoinOperatorFactory::getPotentialBroadcastSides() method.

  4. It obtains the output data size of the completed upstream nodes from the OperatorsFinished event.

  5. If the input data size is smaller than the threshold and the StreamEdge meets the replacement criteria, it replaces the partitioner of that input edge with a broadcast partitioner and calls the AdaptiveJoin::markAsBroadcastJoin method to notify the AdaptiveJoinOperatorFactory of the broadcast optimization. The partitioner on the other side is replaced with a rescale partitioner. The rescale partitioner prevents the AdaptiveBatchScheduler from inferring the join operator's parallelism to be the same as the upstream operator's, which would lose the ability to infer parallelism adaptively. Additionally, in cases of data skew, a rescale partitioner allows for subsequent skewedJoin optimization.

  6. The Scheduler continues to schedule the operator node. When executing the AdaptiveJoinOperatorFactory::createStreamOperator method, it creates the appropriate JoinOperator based on the actual situation.
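The decision made in steps 3 to 5 can be sketched as a pure function over the observed input sizes. This is a simplified illustration: BroadcastDecisionSketch and its Partitioner enum are hypothetical, both input sizes are assumed to be known at decision time, and preferring the smaller side when both qualify is an assumption rather than something the steps above specify.

```java
// Hypothetical model of the runtime decision; it does not touch a real StreamGraph.
final class BroadcastDecisionSketch {
    enum Partitioner { HASH, BROADCAST, RESCALE }

    static final class Decision {
        final Partitioner left;
        final Partitioner right;

        Decision(Partitioner left, Partitioner right) {
            this.left = left;
            this.right = right;
        }
    }

    static Decision decide(long leftBytes, long rightBytes,
                           boolean leftMayBroadcast, boolean rightMayBroadcast,
                           long thresholdBytes) {
        // A side qualifies if the join type permits it and its size is below the threshold.
        boolean leftOk = leftMayBroadcast && leftBytes < thresholdBytes;
        boolean rightOk = rightMayBroadcast && rightBytes < thresholdBytes;

        if (leftOk && (!rightOk || leftBytes <= rightBytes)) {
            // Broadcast the left input; the other input is switched to rescale.
            return new Decision(Partitioner.BROADCAST, Partitioner.RESCALE);
        }
        if (rightOk) {
            return new Decision(Partitioner.RESCALE, Partitioner.BROADCAST);
        }
        // Conditions not met: keep the original hash partitioning and the original join.
        return new Decision(Partitioner.HASH, Partitioner.HASH);
    }
}
```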


Limitations

  1. Effective only for batch jobs using the AdaptiveBatchScheduler.

  2. Join operators within MultiInput cannot participate in the optimization.

Future work

  1. MultiInput optimization can be delayed until runtime, making adaptive broadcast join effective in MultiInput scenarios.

  2. Building on the first point, combine OpFusionCodegen with adaptive broadcast join.


Compatibility, Deprecation, and Migration Plan

This FLIP does not introduce any compatibility issues.

Because the current implementation does not support multiple input scenarios, adaptive broadcast join currently serves as a complement to static broadcast join. That is, join operators not statically optimized to broadcast join may still be optimized to broadcast join at runtime. However, if a join operator is mistakenly optimized into a broadcast hash join during the compilation phase, it will not be de-optimized at runtime.

Users can apply adaptive broadcast join optimization without additional configuration. 

Test Plan

The proposed changes will be tested for correctness and stability in a real cluster.
