...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

FLIP-379 introduces dynamic source parallelism inference, which, unlike static inference, uses runtime information to determine the source parallelism more accurately. FileSource already supports dynamic parallelism inference. As a follow-up to FLIP-379, this FLIP implements the dynamic parallelism inference interface for HiveSource and switches the default Hive source inference mode from static to dynamic.

Public Interfaces

New Config Option

To ease the transition, this FLIP introduces a new configuration option:

...

In addition, the existing configuration option `table.exec.hive.infer-source-parallelism` serves both as the switch that enables or disables Hive source parallelism inference and as a prefix for other configuration options (e.g. `table.exec.hive.infer-source-parallelism.max`). This does not conform to the YAML specification: once the dotted keys are written as nested mappings, the same key cannot hold both a scalar value and child keys. This FLIP therefore introduces a new option, `table.exec.hive.infer-source-parallelism.enabled`, as a replacement. The old option `table.exec.hive.infer-source-parallelism` remains effective but is marked as deprecated.


Behavior Change

After introducing this FLIP, the behavior changes are as follows:

  1. The default Hive source parallelism inference mode changes to dynamic inference.
  2. When dynamic inference mode is enabled, the logic for applying the upper bound of parallelism inference changes. In static inference mode, the Hive source uses the value of `table.exec.hive.infer-source-parallelism.max` as the upper bound for inference; if it is not set, the option's default value of 1024 is used. In dynamic inference mode, if the user explicitly configures `table.exec.hive.infer-source-parallelism.max`, the user-configured value is still used as the upper bound. Otherwise, `execution.batch.adaptive.auto-parallelism.default-source-parallelism` is used as the upper bound. If neither of these options is set, the upper bound follows the existing adaptive batch scheduler logic, falling back to `execution.batch.adaptive.auto-parallelism.max-parallelism` or `parallelism.default`.
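
The fallback order for the upper bound described above can be sketched in plain Java. This is only an illustration, not the HiveSource implementation: the class `UpperBoundSketch` and its method names are hypothetical, and the scheduler-level fallback (`execution.batch.adaptive.auto-parallelism.max-parallelism` or `parallelism.default`) is passed in as a single pre-resolved value because this document does not spell out how the scheduler chooses between the two.

```java
import java.util.Optional;

/** Hypothetical helper, for illustration only; not part of Flink. */
final class UpperBoundSketch {

    /** Default of table.exec.hive.infer-source-parallelism.max, as stated in this FLIP. */
    static final int STATIC_DEFAULT_MAX = 1024;

    /** Static mode: the configured max, or the option's default when it is unset. */
    static int staticUpperBound(Optional<Integer> inferMax) {
        return inferMax.orElse(STATIC_DEFAULT_MAX);
    }

    /**
     * Dynamic mode:
     * 1. an explicitly configured table.exec.hive.infer-source-parallelism.max wins;
     * 2. otherwise execution.batch.adaptive.auto-parallelism.default-source-parallelism;
     * 3. otherwise the value the adaptive batch scheduler falls back to
     *    (assumed here to be resolved by the caller).
     */
    static int dynamicUpperBound(
            Optional<Integer> inferMax,
            Optional<Integer> defaultSourceParallelism,
            int schedulerFallback) {
        if (inferMax.isPresent()) {
            return inferMax.get();
        }
        return defaultSourceParallelism.orElse(schedulerFallback);
    }
}
```

Under these assumptions, a job that sets only `execution.batch.adaptive.auto-parallelism.default-source-parallelism` to 64 gets an upper bound of 64 in dynamic mode, while the same job in static mode is still capped at 1024.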

Proposed Changes

General Idea

HiveSource needs to implement the DynamicParallelismInference interface to support dynamic parallelism inference. Functionally, it must satisfy the following two requirements:

...

Below is the flowchart for the hive source parallelism inference after the introduction of this FLIP.

Compatibility, Deprecation, and Migration Plan

A new configuration option, `table.exec.hive.infer-source-parallelism.mode`, will be introduced to select between the static and dynamic modes. After this FLIP, users can still revert to static inference by setting `table.exec.hive.infer-source-parallelism.mode` to `static`, or disable parallelism inference altogether by setting `table.exec.hive.infer-source-parallelism.enabled` to `false`.

Because the configuration option `table.exec.hive.infer-source-parallelism` does not comply with the YAML specification, a new option, `table.exec.hive.infer-source-parallelism.enabled`, will be introduced as its replacement. The old option will be marked as deprecated but will continue to take effect.
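
As a rough sketch of how the three options might interact, the following Java snippet resolves an effective inference mode from a flat key/value map. It is not Flink code: the class `HiveInferenceConfigSketch` and the `Mode` enum are hypothetical, and two behaviors are assumptions rather than statements of this FLIP, namely that the new `.enabled` key takes precedence over the deprecated key when both are set, and that inference is enabled when neither key is set.

```java
import java.util.Map;

/** Hypothetical resolver, for illustration only; not part of Flink. */
final class HiveInferenceConfigSketch {

    enum Mode { DISABLED, STATIC, DYNAMIC }

    static final String ENABLED = "table.exec.hive.infer-source-parallelism.enabled";
    static final String DEPRECATED_ENABLED = "table.exec.hive.infer-source-parallelism";
    static final String MODE = "table.exec.hive.infer-source-parallelism.mode";

    static Mode resolve(Map<String, String> conf) {
        // Assumption: the new key wins; the deprecated key is only a fallback.
        // Assumption: inference is enabled when neither key is present.
        String enabled = conf.containsKey(ENABLED)
                ? conf.get(ENABLED)
                : conf.getOrDefault(DEPRECATED_ENABLED, "true");
        if (!Boolean.parseBoolean(enabled)) {
            return Mode.DISABLED;
        }
        // After this FLIP, dynamic inference is the default mode.
        String mode = conf.getOrDefault(MODE, "dynamic");
        return "static".equalsIgnoreCase(mode) ? Mode.STATIC : Mode.DYNAMIC;
    }
}
```

With this sketch, an existing job that only sets the deprecated key to `false` still ends up with inference disabled, which matches the intent that the old option remains effective.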

Limitations

Dynamic parallelism inference only works for batch jobs that use the AdaptiveBatchScheduler.

Test Plan

This will be tested by unit and integration tests.

...