...

TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE

// Requires: import static org.apache.flink.configuration.ConfigOptions.key;
public static final ConfigOption<InferMode> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE =
        key("table.exec.hive.infer-source-parallelism.mode")
                .enumType(InferMode.class)
                .defaultValue(InferMode.DYNAMIC)
                .withDescription(
                        "An option for selecting the hive source parallelism "
                                + "inference mode: 'static' denotes static inference, where "
                                + "Flink will decide source parallelism at job creation stage; "
                                + "'dynamic' represents dynamic inference, which will decide "
                                + "parallelism at job execution stage. In terms of functionality, "
                                + "the static mode is a subset of the dynamic mode. "
                                + "Note: it only works when the value of option "
                                + "'table.exec.hive.infer-source-parallelism.enabled' is "
                                + "true.");


InferMode

// Requires: org.apache.flink.configuration.DescribedEnum,
//           org.apache.flink.configuration.description.InlineElement,
//           static org.apache.flink.configuration.description.TextElement.text
public enum InferMode implements DescribedEnum {
    STATIC("static", text("Static parallelism inference mode for hive source.")),
    DYNAMIC("dynamic", text("Dynamic parallelism inference mode for hive source.")),
    NONE("none", text("Disable parallelism inference for hive source."));

    // String form used as the configuration value.
    private final String value;

    private final InlineElement description;

    InferMode(String value, InlineElement description) {
        this.value = value;
        this.description = description;
    }

    @Override
    public String toString() {
        return value;
    }

    @Override
    public InlineElement getDescription() {
        return description;
    }
}

In addition, the current configuration option `table.exec.hive.infer-source-parallelism` acts not only as a switch to enable or disable hive source parallelism inference but also as a prefix for other configuration options (e.g. `table.exec.hive.infer-source-parallelism.max`). This does not conform to the YAML specification, where a key cannot hold both a scalar value and nested keys. Besides, with the introduction of `table.exec.hive.infer-source-parallelism.mode`, parallelism inference can be disabled by specifying InferMode.NONE, which covers the function of `table.exec.hive.infer-source-parallelism`. Therefore, we plan to mark `table.exec.hive.infer-source-parallelism` as deprecated in this FLIP. Until this option is completely phased out, it will still serve as the master switch to enable or disable parallelism inference.
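
The interplay between the deprecated master switch and the new mode option can be sketched as follows. This is a standalone illustration, not the Flink implementation: the class `InferModeResolution`, the nested `Mode` enum, and the method `effectiveMode` are hypothetical names, configuration is modeled as a plain string map, and the sketch assumes that an unset switch counts as true, an unset mode counts as dynamic, and mode strings are matched case-insensitively.

```java
import java.util.Locale;
import java.util.Map;

/** Standalone sketch of mode resolution; all names here are hypothetical. */
final class InferModeResolution {

    /** Stand-in for the three inference modes described above. */
    enum Mode { STATIC, DYNAMIC, NONE }

    static final String LEGACY_SWITCH = "table.exec.hive.infer-source-parallelism";
    static final String MODE_KEY = "table.exec.hive.infer-source-parallelism.mode";

    /**
     * Returns the mode that effectively applies.
     * Assumptions: unset switch = true, unset mode = dynamic.
     */
    static Mode effectiveMode(Map<String, String> conf) {
        boolean switchOn = Boolean.parseBoolean(conf.getOrDefault(LEGACY_SWITCH, "true"));
        if (!switchOn) {
            // The deprecated option still acts as the master switch.
            return Mode.NONE;
        }
        String raw = conf.getOrDefault(MODE_KEY, "dynamic");
        // Throws IllegalArgumentException for an unknown mode string.
        return Mode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(effectiveMode(Map.of()));
        System.out.println(effectiveMode(Map.of(MODE_KEY, "static")));
        System.out.println(effectiveMode(Map.of(LEGACY_SWITCH, "false", MODE_KEY, "dynamic")));
    }
}
```

Under these assumptions, setting the deprecated switch to false wins over any value of the mode option, which is what lets the switch be phased out later without changing the result for users who have already moved to InferMode.NONE.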

Behavior Change

After introducing this FLIP, the behavior changes are as follows:

  1. The default hive source parallelism inference mode will change to dynamic inference.
  2. Upon enabling dynamic inference mode, the logic for applying the upper bound of parallelism inference will change. In static inference mode, hive source takes the value of `table.exec.hive.infer-source-parallelism.max` as the upper bound for inference; if it is not set, the config option's default value of 1000 is used. In dynamic inference mode, if the user explicitly configures `table.exec.hive.infer-source-parallelism.max`, the user-configured value is still used as the upper bound for parallelism inference. Otherwise, `execution.batch.adaptive.auto-parallelism.default-source-parallelism` is used as the upper bound. If neither of these configuration items is set, the upper bound follows the existing adaptive batch scheduler logic, falling back to `execution.batch.adaptive.auto-parallelism.max-parallelism` or `parallelism.default`.
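
The fallback chain for the upper bound can be sketched as below. This is a minimal illustration under stated assumptions rather than the scheduler's code: the class `UpperBoundSketch` and the method `upperBound` are hypothetical, configuration is modeled as a string map where a present key means the user set it explicitly, and the order between `execution.batch.adaptive.auto-parallelism.max-parallelism` and `parallelism.default` in the last step is assumed. When nothing is set in dynamic mode, the sketch returns an empty result and leaves the scheduler's own defaults unmodeled.

```java
import java.util.Map;
import java.util.OptionalInt;

/** Standalone sketch of the upper-bound fallback chain; names are hypothetical. */
final class UpperBoundSketch {

    static final String HIVE_MAX = "table.exec.hive.infer-source-parallelism.max";
    static final String DEFAULT_SOURCE =
            "execution.batch.adaptive.auto-parallelism.default-source-parallelism";
    static final String ADAPTIVE_MAX =
            "execution.batch.adaptive.auto-parallelism.max-parallelism";
    static final String PARALLELISM_DEFAULT = "parallelism.default";

    /** Default of the hive max option, as stated in the text above. */
    static final int HIVE_MAX_DEFAULT = 1000;

    static OptionalInt upperBound(boolean dynamicMode, Map<String, String> conf) {
        if (!dynamicMode) {
            // Static mode: the hive-specific max, or its default when unset.
            String value = conf.get(HIVE_MAX);
            return OptionalInt.of(value == null ? HIVE_MAX_DEFAULT : Integer.parseInt(value));
        }
        // Dynamic mode: the first explicitly configured key wins.
        // The relative order of the last two keys is an assumption of this sketch.
        String[] chain = {HIVE_MAX, DEFAULT_SOURCE, ADAPTIVE_MAX, PARALLELISM_DEFAULT};
        for (String key : chain) {
            String value = conf.get(key);
            if (value != null) {
                return OptionalInt.of(Integer.parseInt(value));
            }
        }
        // Nothing set explicitly: scheduler defaults apply (not modeled here).
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        System.out.println(upperBound(false, Map.of()));
        System.out.println(upperBound(true, Map.of(DEFAULT_SOURCE, "64", ADAPTIVE_MAX, "256")));
    }
}
```

The key difference between the two modes is that the default of `table.exec.hive.infer-source-parallelism.max` only participates in static mode; in dynamic mode the option matters only when the user sets it explicitly.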

...

Below is the flowchart for the hive source parallelism inference after the introduction of this FLIP.

[Image: flowchart of the hive source parallelism inference]

Compatibility, Deprecation, and Migration Plan

A new configuration option `table.exec.hive.infer-source-parallelism.mode` will be introduced to distinguish between the static and dynamic modes. After this FLIP, users can still revert to static inference by setting `table.exec.hive.infer-source-parallelism.mode` to InferMode.STATIC, or they can disable parallelism inference altogether by setting `table.exec.hive.infer-source-parallelism.mode` to InferMode.NONE.

Because the configuration `table.exec.hive.infer-source-parallelism` does not comply with the YAML standard specification, it will be marked as deprecated.

Limitations

This feature only works for batch jobs that use the AdaptiveBatchScheduler.

...