
Discussion thread: https://lists.apache.org/thread/2plf17tcl7znhj1gxmp0mb47dx96g6tc

Vote thread: https://lists.apache.org/thread/hcwl4hsbtpsoyvxfbd1bm3w77264ns94

JIRA: FLINK-36333

Release

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In a join query, when certain keys occur frequently, data can be distributed unevenly across partitions. This can hurt the execution performance of Flink jobs, because a single partition with skewed data can severely degrade the performance of the entire job. To ensure that data is evenly distributed to downstream tasks, we can use the statistics of the input to split (and duplicate if needed) skewed and splittable partitions into balanced partitions at runtime. However, Flink currently cannot accurately determine at runtime which partitions are skewed and eligible for splitting, and it also lacks the capability to split the data of a specific join key.

To address this issue, we plan to introduce an Adaptive Skewed Join Optimization capability. This will allow the Join operator to dynamically split partitions that are skewed and splittable, based on the statistics of the input at runtime, reducing the long-tail problem caused by skewed data. This FLIP is based on FLIP-469 and also leverages capabilities introduced in FLIP-470.

Public Interfaces

New Configuration Options

Key: table.optimizer.skewed-join-optimization.strategy
Type: Enum
Default Value: AdaptiveSkewedJoinStrategy.AUTO
Description: Flink will handle skew in shuffled joins (sort-merge and hash) at runtime by splitting data corresponding to the skewed join key. The value of this configuration determines how Flink performs this optimization. "AUTO" means Flink will automatically apply this optimization, "FORCED" means Flink will enforce this optimization even if it introduces an extra hash shuffle, and "NONE" means this optimization will not be executed.

Key: table.optimizer.skewed-join-optimization.skewed-factor
Type: Double
Default Value: 4.0
Description: During the join phase, Flink will automatically reduce the ratio of the maximum to the median data volume processed by concurrent tasks to below the skewed-factor, achieving a more balanced data distribution, unless the maximum data volume is below the skewed-threshold. Note that this optimization has additional overhead, and after balancing there may still be a 2x difference in data volume. Users can adjust this parameter based on the 'Read Bytes' metric. We recommend setting this value to no less than 3 when the left and right tables have similar data volumes, and to no less than 2 in other cases.

Key: table.optimizer.skewed-join-optimization.skewed-threshold
Type: MemorySize
Default Value: MemorySize.ofMebiBytes(256)
Description: During the join phase, when the maximum data volume processed by a concurrent task is greater than the skewed-threshold, Flink can automatically reduce the ratio of the maximum to the median data volume processed by concurrent tasks to below the skewed-factor, achieving a more balanced data distribution. Note that this optimization has additional overhead, and setting this value below 256MB is not recommended. Users can adjust this parameter based on the 'Read Bytes' metric.
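As a rough illustration of how skewed-factor and skewed-threshold interact, the following self-contained sketch flags a subtask as skewed only when its input exceeds both the absolute threshold and the factor times the median. The class and method names are hypothetical, and the exact comparison is an assumption inferred from the option descriptions, not Flink's actual implementation.

```java
import java.util.Arrays;

/** Illustrative sketch only; names and the exact rule are assumptions, not Flink code. */
final class SkewCheckSketch {

    /** Median of the per-subtask input sizes (mean of the two middle values for even counts). */
    static long median(long[] bytesPerSubtask) {
        long[] sorted = bytesPerSubtask.clone();
        Arrays.sort(sorted);
        int mid = sorted.length / 2;
        return sorted.length % 2 == 1 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2;
    }

    /**
     * Assumed rule: an input is skewed when it exceeds the skewed-threshold
     * AND exceeds skewed-factor times the median input size.
     */
    static boolean isSkewed(
            long bytes, long medianBytes, double skewedFactor, long skewedThresholdBytes) {
        return bytes > skewedThresholdBytes && bytes > medianBytes * skewedFactor;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        long[] input = {100 * mib, 120 * mib, 110 * mib, 900 * mib};
        long med = median(input);
        for (long bytes : input) {
            // Uses the documented defaults: factor 4.0 and threshold 256 MiB.
            System.out.println(
                    bytes / mib + " MiB skewed? " + isSkewed(bytes, med, 4.0, 256 * mib));
        }
    }
}
```

With the default values, a subtask reading 200 MiB is never treated as skewed in this sketch, no matter how small the median is, because it stays below the 256 MiB threshold.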

Proposed Changes

Overview

The optimization process for Adaptive Skewed Join is as follows:

  1. Identify eligible join operators, replace them with AdaptiveJoin, and mark splittable input edges.

  2. At runtime, adjust the StreamGraph using the AdaptiveSkewedJoinOptimizationStrategy. If the input of the AdaptiveJoin operator has data skew and is eligible for splitting, mark the input edge in the StreamGraph as splittable.

  3. Based on the adjusted StreamGraph, distribute the data evenly.

Identify and replace the eligible Join operators

The purpose of this step is to give Flink the information required for adaptive skewed join optimization at runtime, such as the type of join and the optimizable input sides.

FLIP-470 introduces the capability to convert Join operators to AdaptiveJoin operators. Based on this capability, we will convert operators that meet the conditions for adaptive skewed join optimization. The conversion conditions are as follows:

  1. The execution mode is batch and AdaptiveBatchScheduler is enabled.

  2. The option 'table.optimizer.skewed-join-optimization.strategy' is not set to NONE.

Since Adaptive Skewed Join and Adaptive Broadcast Join both convert eligible operators to AdaptiveJoin operators, but have different conversion conditions, we need to divide the conversion process into two steps to avoid conflicts:

  1. Convert the Join operators that meet the conversion conditions for either Adaptive Skewed Join or Adaptive Broadcast Join to AdaptiveJoin operators.

  2. Populate the optimizable edge information for the AdaptiveJoin operators:

    1. If the operator meets the replacement conditions for Adaptive Skewed Join, record the information for edges that can be optimized for Skewed Join according to the rules; otherwise, set all optimizable edges to the default value (False).

    2. If the operator meets the replacement conditions for Adaptive Broadcast Join, record the information for edges that can be optimized for Broadcast according to the rules; otherwise, set all potential broadcast edges to the default value (False).
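The two-step conversion above can be sketched as follows. This is a minimal illustration under stated assumptions: AdaptiveJoinInfo, its fields, and both methods are hypothetical names invented for this example and do not correspond to planner classes.

```java
/** Illustrative sketch only; every name here is hypothetical. */
final class AdaptiveJoinConversionSketch {

    /** Hypothetical holder for the per-join edge flags; all flags default to false. */
    static final class AdaptiveJoinInfo {
        boolean leftSkewOptimizable;
        boolean rightSkewOptimizable;
        boolean leftBroadcastCandidate;
        boolean rightBroadcastCandidate;
    }

    /** Step 1: convert the join if either optimization's conditions are met. */
    static boolean shouldConvert(boolean meetsSkewedJoin, boolean meetsBroadcastJoin) {
        return meetsSkewedJoin || meetsBroadcastJoin;
    }

    /** Step 2: record edge flags only for the optimization whose conditions hold. */
    static AdaptiveJoinInfo populate(
            boolean meetsSkewedJoin,
            boolean meetsBroadcastJoin,
            boolean leftSplittable,
            boolean rightSplittable,
            boolean leftBroadcastable,
            boolean rightBroadcastable) {
        AdaptiveJoinInfo info = new AdaptiveJoinInfo();
        if (meetsSkewedJoin) {
            info.leftSkewOptimizable = leftSplittable;
            info.rightSkewOptimizable = rightSplittable;
        }
        if (meetsBroadcastJoin) {
            info.leftBroadcastCandidate = leftBroadcastable;
            info.rightBroadcastCandidate = rightBroadcastable;
        }
        return info; // flags of an optimization that does not apply stay false
    }

    public static void main(String[] args) {
        // A join that qualifies for skewed join optimization only.
        AdaptiveJoinInfo info = populate(true, false, true, false, true, true);
        System.out.println("left skew-optimizable: " + info.leftSkewOptimizable);
        System.out.println("left broadcast candidate: " + info.leftBroadcastCandidate);
    }
}
```

Keeping the conversion decision separate from the flag population means a join converted for one optimization never accidentally exposes edges to the other.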

Improvement of BatchExecAdaptiveJoin

BatchExecAdaptiveJoin is the implementation of the AdaptiveJoin operator at the ExecNodeGraph stage, and we need to extend it with the capability to record the edges that meet the conditions for Adaptive Skewed Join optimization.

For different types of Join operators, whether an input edge can undergo Adaptive Skewed Join optimization depends on whether the data corresponding to a specific join key can be split and sent to different concurrent instances of the downstream join. The specific rules are as below:

Join Type     Left input splittable   Right input splittable
Inner         Yes                     Yes
LeftSemi      Yes                     No
LeftAnti      Yes                     No
LeftOuter     Yes                     No
RightOuter    No                      Yes
FullOuter     No                      No

Improvement of AdaptiveJoinOperatorFactory

We plan to add the capability to query optimizable skewed sides to the AdaptiveJoinOperatorFactory, which was introduced in FLIP-470. This requires implementing the following method:

AdaptiveJoinOperatorFactory
class AdaptiveJoinOperatorFactory {
  ...
  
  /**
   * Returns the input ids of the join input that can be optimized for skewed join.
   *
   * @return the input ids.
   */
  List<Integer> getOptimizableSkewedSides();

  ...
}

Note that we only introduce the capability to query optimizable skewed sides for the AdaptiveJoinOperatorFactory and won't change its original behaviors.
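One way getOptimizableSkewedSides() could derive its result from the join type is sketched below. The Inner and Left Outer cases follow the reasoning given in this document; the remaining cases are assumptions made by analogy (a side is splittable only if the join never has to see all rows of a key on that side at once). The JoinType enum and the input ids 0 and 1 are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Illustrative sketch only; the enum, ids, and mapping are assumptions. */
final class OptimizableSidesSketch {

    enum JoinType { INNER, LEFT_SEMI, LEFT_ANTI, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER }

    static final int LEFT = 0;  // hypothetical input id of the left side
    static final int RIGHT = 1; // hypothetical input id of the right side

    /** Returns the input ids whose data for a single join key may be split. */
    static List<Integer> optimizableSkewedSides(JoinType type) {
        switch (type) {
            case INNER:
                return Arrays.asList(LEFT, RIGHT);
            case LEFT_SEMI:
            case LEFT_ANTI:
            case LEFT_OUTER:
                // Splitting the right side could produce wrong or duplicated left rows.
                return Collections.singletonList(LEFT);
            case RIGHT_OUTER:
                return Collections.singletonList(RIGHT);
            case FULL_OUTER:
            default:
                return Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        for (JoinType type : JoinType.values()) {
            System.out.println(type + " -> " + optimizableSkewedSides(type));
        }
    }
}
```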

AdaptiveSkewedJoinOptimizationStrategy

The AdaptiveSkewedJoinOptimizationStrategy is responsible for adjusting the StreamGraph at the runtime stage before the scheduling of the AdaptiveJoin operator, determining whether to optimize for skewed join based on the skewness of the actual input data from upstream and the type of input edge. The execution steps are as follows:

  1. When upstream nodes finish execution, an OperatorsFinished event is triggered.

  2. AdaptiveSkewedJoinOptimizationStrategy identifies the OperatorsFinished event and traverses the downstream connections of nodes. If an AdaptiveJoinOperatorFactory operator is present, it continues with the assessment; otherwise, it ends the traversal and returns.

  3. It retrieves potential edges that could be optimized for skewed join using the AdaptiveJoinOperatorFactory::getOptimizableSkewedSides() method.

  4. It obtains the size and distribution of the output data of the completed upstream nodes from the OperatorsFinished event.

  5. If there is data skew in the optimizable input edge, then:

    1. Determine whether the Partitioner of the output StreamEdge is a ForwardForConsecutiveHashPartitioner:

      1. If the Partitioner is a ForwardForConsecutiveHashPartitioner, we must change it to a HashPartitioner before marking the input StreamEdge as splittable in the StreamGraph, so that the data correctness of the downstream task is not affected. However, this introduces an additional hash shuffle, which may lead to a performance regression. Therefore, the partitioner is changed only if 'table.optimizer.skewed-join-optimization.strategy' is set to FORCED; otherwise, the strategy exits without optimizing this join.

      2. If the Partitioner is not a ForwardForConsecutiveHashPartitioner, then directly mark the input StreamEdge as splittable in the StreamGraph.

    2. Mark the operator as skewed in the StreamGraph.

  6. The Scheduler continues to schedule the AdaptiveJoinOperator. In the VertexParallelismAndInputInfosDecider, an appropriate algorithm will be selected for balanced data distribution based on the adjusted StreamGraph.

Note that the AdaptiveSkewedJoinOptimizationStrategy will execute after the AdaptiveBroadcastJoinOptimizationStrategy, as the AdaptiveBroadcastJoinOptimizationStrategy may modify the type of Partitioner, while the AdaptiveSkewedJoinOptimizationStrategy only applies to specific types of Partitioner.
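The per-edge decision described in the steps above can be condensed into a small sketch. All types here (Strategy, PartitionerKind, Decision) are hypothetical stand-ins for illustration; the real strategy operates on StreamGraph edges and partitioner instances rather than enums.

```java
/** Illustrative sketch only; the enums and method are hypothetical. */
final class SkewedJoinDecisionSketch {

    enum Strategy { AUTO, FORCED, NONE }

    enum PartitionerKind { HASH, FORWARD_FOR_CONSECUTIVE_HASH }

    enum Decision { SKIP, MARK_SPLITTABLE, REPLACE_WITH_HASH_AND_MARK_SPLITTABLE }

    static Decision decide(
            Strategy strategy,
            boolean sideOptimizable,
            boolean skewDetected,
            PartitionerKind partitioner) {
        // Nothing to do when disabled, when the side cannot be split, or without skew.
        if (strategy == Strategy.NONE || !sideOptimizable || !skewDetected) {
            return Decision.SKIP;
        }
        if (partitioner == PartitionerKind.FORWARD_FOR_CONSECUTIVE_HASH) {
            // Replacing the partitioner adds a hash shuffle, so only do it when forced.
            return strategy == Strategy.FORCED
                    ? Decision.REPLACE_WITH_HASH_AND_MARK_SPLITTABLE
                    : Decision.SKIP;
        }
        return Decision.MARK_SPLITTABLE;
    }

    public static void main(String[] args) {
        System.out.println(
                decide(Strategy.AUTO, true, true, PartitionerKind.FORWARD_FOR_CONSECUTIVE_HASH));
        System.out.println(
                decide(Strategy.FORCED, true, true, PartitionerKind.FORWARD_FOR_CONSECUTIVE_HASH));
        System.out.println(decide(Strategy.AUTO, true, true, PartitionerKind.HASH));
    }
}
```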

Balanced data distribution

The existing data distribution algorithms in Flink impose strict limitations on joins: all data corresponding to a specific join key must be sent to the same downstream subtask for processing. This restricts the flexibility of data distribution.

For some join queries, it is not always necessary to strictly follow this rule for data distribution. For example:

  • For an Inner Join, as long as we ensure that all data can be traversed and processed in the form of a Cartesian product, the data correctness of the operator can be guaranteed. Therefore, we can split the data on one side while duplicating the data with the same join key on the other side.

  • For a Left Outer Join, the right side cannot be split, as this would lead to data correctness issues. However, the left side can be split. Therefore, we can split the data on the left side while duplicating the data with the same join key on the right side.
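To make the split-and-duplicate idea concrete for an Inner Join, the sketch below joins the rows of a single join key in two ways: once in a single subtask, and once with the left rows split across several subtasks while the right rows are duplicated to each of them. The class, the round-robin split, and the string row format are all illustrative assumptions and not Flink's distribution algorithm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Illustrative sketch only; not Flink's data distribution algorithm. */
final class SplitAndDuplicateSketch {

    /** Inner join of rows sharing one join key: the Cartesian product of both sides. */
    static List<String> joinSameKey(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>();
        for (String l : left) {
            for (String r : right) {
                out.add(l + "|" + r);
            }
        }
        return out;
    }

    /** Splits rows round-robin into the given number of chunks (one per subtask). */
    static List<List<String>> split(List<String> rows, int parts) {
        List<List<String>> chunks = new ArrayList<>();
        for (int i = 0; i < parts; i++) {
            chunks.add(new ArrayList<>());
        }
        for (int i = 0; i < rows.size(); i++) {
            chunks.get(i % parts).add(rows.get(i));
        }
        return chunks;
    }

    /** Splits the left side across subtasks and duplicates the right side to each one. */
    static List<String> joinWithSplitLeft(List<String> left, List<String> right, int subtasks) {
        List<String> out = new ArrayList<>();
        for (List<String> chunk : split(left, subtasks)) {
            out.addAll(joinSameKey(chunk, right)); // every subtask sees all right rows
        }
        Collections.sort(out);
        return out;
    }

    public static void main(String[] args) {
        List<String> left = Arrays.asList("l1", "l2", "l3", "l4", "l5");
        List<String> right = Arrays.asList("r1", "r2");
        List<String> unsplit = new ArrayList<>(joinSameKey(left, right));
        Collections.sort(unsplit);
        System.out.println("same result: " + unsplit.equals(joinWithSplitLeft(left, right, 3)));
    }
}
```

Because each left row still meets every right row exactly once, the union of the per-subtask results equals the unsplit join result, while the left-side work is spread over several subtasks.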

To describe the behavior of split and duplication more clearly, we abstract it into two types of relationships:

  1. IntraInputKeyCorrelation: For this input, the data corresponding to a specific join key must be sent to the same downstream subtask.

  2. InterInputsKeyCorrelation: There is a relationship between multiple inputs: if the data corresponding to a specific join key from one input is split, the data with the same join key from the other inputs must be duplicated (meaning that it must be sent to every downstream subtask that receives part of the split data).

For join queries, there is always InterInputsKeyCorrelation among all input streams. However, the existence of IntraInputKeyCorrelation depends on the operator itself.

We will introduce a more balanced data distribution algorithm for Flink based on these two types of relationships. For joins, the data distribution results for different IntraInputKeyCorrelation are shown in the figure below:

Limitation

Adaptive Skewed Join Optimization is implemented based on FLIP-469 and FLIP-470, and therefore is also subject to the limitations outlined in FLIP-469 and FLIP-470:

  1. Effective only for batch jobs using the AdaptiveBatchScheduler.

  2. Join operators within MultiInput cannot participate in the optimization.

Future work

MultiInput optimization can be delayed until runtime, making adaptive skewed join effective in MultiInput scenarios.

Compatibility, Deprecation, and Migration Plan

Users can apply adaptive skewed join optimization without any additional configuration, because the default strategy is AUTO.

Test Plan

The proposed changes will be tested for correctness and stability in a real cluster.
