
```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Field declared in the Hive connector's options class.
// InferMode is the enum of inference modes (at least STATIC and DYNAMIC).
public static final ConfigOption<InferMode> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE =
        ConfigOptions.key("table.exec.hive.infer-source-parallelism.mode")
                .enumType(InferMode.class)
                .defaultValue(InferMode.DYNAMIC)
                .withDescription(
                        "An option for selecting the Hive source parallelism inference mode: "
                                + "'static' denotes static inference, where Flink determines "
                                + "the source parallelism at plan generation time; "
                                + "'dynamic' denotes dynamic inference, where the parallelism "
                                + "is determined at runtime. "
                                + "In terms of functionality, the static mode is a subset of "
                                + "the dynamic mode. "
                                + "Note: it only takes effect when the value of option "
                                + "'table.exec.hive.infer-source-parallelism.enabled' is true.");
```


In addition, the current configuration option `table.exec.hive.infer-source-parallelism` acts not only as a switch to enable or disable Hive source parallelism inference but also as a prefix for other configuration options (e.g. `table.exec.hive.infer-source-parallelism.max`). This practice is incompatible with standard YAML: in nested form, a single key cannot hold both a scalar value and child keys. Therefore, this FLIP introduces a new configuration option, `table.exec.hive.infer-source-parallelism.enabled`, as a replacement. The old option `table.exec.hive.infer-source-parallelism` will remain effective but will be marked as deprecated.
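Keeping the old key effective while deprecating it implies a precedence rule: the new key wins when present, and the deprecated key is consulted only otherwise. The minimal sketch below illustrates that rule over a plain `Map` of raw configuration strings. `InferSwitchResolver` is a hypothetical name, and the default of `true` is an assumption carried over from the existing switch. In Flink itself, this precedence would normally be declared on the `ConfigOption` (for example with `withDeprecatedKeys`) rather than hand-coded.

```java
import java.util.Map;

// Hypothetical sketch: resolves the parallelism-inference switch, preferring the
// new key and falling back to the deprecated one.
final class InferSwitchResolver {
    static final String NEW_KEY = "table.exec.hive.infer-source-parallelism.enabled";
    static final String DEPRECATED_KEY = "table.exec.hive.infer-source-parallelism";
    // Assumption: the switch keeps the existing default value of true.
    static final boolean DEFAULT_ENABLED = true;

    private InferSwitchResolver() {}

    static boolean isInferenceEnabled(Map<String, String> conf) {
        String value = conf.get(NEW_KEY);
        if (value == null) {
            // Only consult the deprecated key when the new key is absent.
            value = conf.get(DEPRECATED_KEY);
        }
        // Boolean.parseBoolean is true only for "true", ignoring case.
        return value == null ? DEFAULT_ENABLED : Boolean.parseBoolean(value.trim());
    }
}
```

For example, a job that still sets only `table.exec.hive.infer-source-parallelism: false` keeps inference disabled, while adding `table.exec.hive.infer-source-parallelism.enabled: true` overrides it.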


Behavior Change

After introducing this FLIP, the behavior changes are as follows:

After introducing this FLIP, the behavior changes are as follows:

  1. The default inference mode will change to dynamic inference.
  2. Upon enabling dynamic inference mode, the logic for applying the upper bound of parallelism inference will change. In static inference mode, the Hive source uses `table.exec.hive.infer-source-parallelism.max` as the upper bound for inference; if it is not set, the option's default value of 1024 is used. In dynamic inference mode, if the user explicitly configures `table.exec.hive.infer-source-parallelism.max`, the user-configured value is still used as the upper bound. Otherwise, `execution.batch.adaptive.auto-parallelism.default-source-parallelism` is used as the upper bound. If neither option is set, the upper bound follows the existing adaptive batch scheduler logic, falling back to `execution.batch.adaptive.auto-parallelism.max-parallelism` or `parallelism.default`.

As a result, jobs running under dynamic inference mode may end up with a different Hive source parallelism than before.
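The upper-bound rules above can be summarized in a small sketch. `InferenceUpperBound` and its nested `Mode` are hypothetical names; the sketch reads raw string values from a plain `Map` and checks the fallback options in the order the text lists them. As a simplification, it ignores the default values that a real Flink `Configuration` would supply for the adaptive scheduler options and `parallelism.default`, and returns an empty result when none of them is set.

```java
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical sketch of the upper-bound selection; not part of Flink.
final class InferenceUpperBound {
    enum Mode { STATIC, DYNAMIC }

    static final String HIVE_MAX = "table.exec.hive.infer-source-parallelism.max";
    static final String DEFAULT_SOURCE_PARALLELISM =
            "execution.batch.adaptive.auto-parallelism.default-source-parallelism";
    static final String ADAPTIVE_MAX = "execution.batch.adaptive.auto-parallelism.max-parallelism";
    static final String PARALLELISM_DEFAULT = "parallelism.default";
    static final int HIVE_MAX_DEFAULT = 1024;

    private InferenceUpperBound() {}

    // Returns the upper bound, or empty when no option in the dynamic-mode chain is set
    // (simplification: option defaults of the adaptive scheduler are not modeled).
    static OptionalInt resolve(Mode mode, Map<String, String> conf) {
        OptionalInt hiveMax = read(conf, HIVE_MAX);
        if (mode == Mode.STATIC) {
            // Static mode: the Hive option, or its default value of 1024.
            return OptionalInt.of(hiveMax.orElse(HIVE_MAX_DEFAULT));
        }
        // Dynamic mode: an explicitly configured Hive maximum still wins.
        if (hiveMax.isPresent()) {
            return hiveMax;
        }
        // Otherwise walk the fallback chain in the documented order.
        for (String key : new String[] {DEFAULT_SOURCE_PARALLELISM, ADAPTIVE_MAX, PARALLELISM_DEFAULT}) {
            OptionalInt value = read(conf, key);
            if (value.isPresent()) {
                return value;
            }
        }
        return OptionalInt.empty();
    }

    private static OptionalInt read(Map<String, String> conf, String key) {
        String raw = conf.get(key);
        return raw == null ? OptionalInt.empty() : OptionalInt.of(Integer.parseInt(raw.trim()));
    }
}
```

The key design point is the asymmetry between the modes: static mode always has a concrete bound (1024 by default), while dynamic mode only uses the Hive option when the user set it explicitly and otherwise defers to the batch scheduler's settings.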

Proposed Changes

General Idea

...

HiveSource needs to implement the DynamicParallelismInference interface to support dynamic parallelism inference. Functionally, it must meet the following two requirements:

  1. Include the optimizations currently available in static parallelism inference (e.g. PartitionPushDown, LimitPushDown, etc.).
  2. When dynamic partition pruning is present, calibrate the parallelism based on the dynamically filtered partition information.
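The two requirements above can be illustrated with a minimal sketch. It is not the actual HiveSource implementation and does not use Flink's real `DynamicParallelismInference` context, so it compiles without Flink: `HivePartition`, `InferenceContext`, and `HiveDynamicInferenceSketch` are hypothetical stand-ins. It assumes the partition list has already been reduced by static optimizations such as PartitionPushDown, that each partition reports a split count, and that one subtask per split is the target; LimitPushDown handling is omitted.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for a Hive partition with a precomputed split count.
final class HivePartition {
    final String spec;
    final int splitCount;

    HivePartition(String spec, int splitCount) {
        this.spec = spec;
        this.splitCount = splitCount;
    }
}

// Hypothetical stand-in for the runtime inference context.
final class InferenceContext {
    // Partition specs that survived dynamic partition pruning; empty if no pruning happened.
    final Optional<Set<String>> remainingPartitionSpecs;
    final int upperBound;

    InferenceContext(Optional<Set<String>> remainingPartitionSpecs, int upperBound) {
        this.remainingPartitionSpecs = remainingPartitionSpecs;
        this.upperBound = upperBound;
    }
}

final class HiveDynamicInferenceSketch {
    private HiveDynamicInferenceSketch() {}

    // partitions: the partitions left after static optimizations such as partition push-down.
    static int inferParallelism(List<HivePartition> partitions, InferenceContext ctx) {
        // Requirement 2: keep only partitions that survived dynamic partition pruning.
        List<HivePartition> effective = ctx.remainingPartitionSpecs
                .map(remaining -> partitions.stream()
                        .filter(p -> remaining.contains(p.spec))
                        .collect(Collectors.toList()))
                .orElse(partitions);
        long splits = effective.stream().mapToLong(p -> p.splitCount).sum();
        // One subtask per split, capped by the upper bound and never below 1.
        return (int) Math.max(1, Math.min(splits, ctx.upperBound));
    }
}
```

With two partitions of 10 and 30 splits and an upper bound of 1024, the sketch infers 40 without pruning and 10 once pruning keeps only the first partition.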


Currently, the logic for Hive parallelism inference is primarily implemented in HiveParallelismInference. Dynamic parallelism inference will also be built on HiveParallelismInference to maximize reuse of the existing code. Additionally, the following logic needs to be added to HiveParallelismInference:

  1. Add logic to evaluate the setting of `table.exec.hive.infer-source-parallelism.mode` and choose the appropriate inference mode accordingly.
  2. Determine the upper bound for parallelism inference under dynamic inference mode, as detailed in [Public Interfaces | Behavior Change].
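The mode evaluation in item 1 can be sketched as a small decision function, assuming the inference switch has already been resolved to a boolean. `InferModeSelector`, its nested `InferMode`, and `Decision` are hypothetical names chosen for illustration; the lowercase values 'static' and 'dynamic' and the DYNAMIC default come from the option description of `table.exec.hive.infer-source-parallelism.mode`.

```java
import java.util.Locale;

// Hypothetical sketch of how the mode option could be evaluated; not Flink code.
final class InferModeSelector {
    enum InferMode { STATIC, DYNAMIC }

    enum Decision { NO_INFERENCE, INFER_AT_PLAN_TIME, INFER_AT_RUNTIME }

    private InferModeSelector() {}

    // Parses the raw option value ('static' or 'dynamic'); absent means the DYNAMIC default.
    static InferMode parseMode(String raw) {
        if (raw == null) {
            return InferMode.DYNAMIC;
        }
        return InferMode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    static Decision select(boolean inferenceEnabled, InferMode mode) {
        // The mode only matters when the inference switch is on.
        if (!inferenceEnabled) {
            return Decision.NO_INFERENCE;
        }
        switch (mode) {
            case STATIC:
                return Decision.INFER_AT_PLAN_TIME;
            case DYNAMIC:
                return Decision.INFER_AT_RUNTIME;
            default:
                throw new IllegalArgumentException("Unknown inference mode: " + mode);
        }
    }
}
```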

Below is the flowchart for Hive source parallelism inference after the introduction of this FLIP.

Compatibility, Deprecation, and Migration Plan

...