| Discussion thread | |
|---|---|
| Vote thread | https://lists.apache.org/thread/lktnb162l2z3042m76to6xfbsdndy4r7 |
| JIRA | |
| Release | 1.20 |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
FLIP-379 introduces dynamic source parallelism inference, which, compared to static inference, uses runtime information to determine the source parallelism more accurately. FileSource already supports dynamic parallelism inference. As a follow-up to FLIP-379, this FLIP proposes to implement the dynamic parallelism inference interface for HiveSource and to switch its default from static to dynamic parallelism inference.
Public Interfaces
New Config Option
To facilitate a smooth transition, we plan to introduce a new configuration option `table.exec.hive.infer-source-parallelism.mode`, whose default value will be `InferMode.DYNAMIC`:
```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Field declared in the Hive connector's options class.
public static final ConfigOption<InferMode> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE =
        ConfigOptions.key("table.exec.hive.infer-source-parallelism.mode")
                .enumType(InferMode.class)
                .defaultValue(InferMode.DYNAMIC)
                .withDescription(
                        "An option for selecting the hive source parallelism inference mode: "
                                + "'static' denotes static inference, where flink will decide source parallelism at job creation stage; "
                                + "'dynamic' represents dynamic inference, which will determine parallelism at job execution stage; "
                                + "'none' represents disabling inference. "
                                + "In terms of functionality, the static mode is a subset of the dynamic mode. "
                                + "Note: at the current stage, it only works when the value of option "
                                + "'table.exec.hive.infer-source-parallelism' is true, until that option is removed.");
```
```java
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.description.InlineElement;

import static org.apache.flink.configuration.description.TextElement.text;

/** Parallelism inference mode for the hive source. */
public enum InferMode implements DescribedEnum {
    STATIC("static", text("Static parallelism inference mode for hive source.")),
    DYNAMIC("dynamic", text("Dynamic parallelism inference mode for hive source.")),
    NONE("none", text("Disable parallelism inference for hive source."));

    private final String value;
    private final InlineElement description;

    InferMode(String value, InlineElement description) {
        this.value = value;
        this.description = description;
    }

    @Override
    public String toString() {
        return value;
    }

    @Override
    public InlineElement getDescription() {
        return description;
    }
}
```
In addition, the current configuration option `table.exec.hive.infer-source-parallelism` acts not only as a switch to enable or disable hive source parallelism inference but also as a prefix for other configuration options (e.g. `table.exec.hive.infer-source-parallelism.max`). This does not conform to the YAML specification: in a nested YAML file, a key cannot hold both a scalar value and child keys. Besides, with the introduction of `table.exec.hive.infer-source-parallelism.mode`, parallelism inference can be disabled by specifying `InferMode.NONE`, which makes `table.exec.hive.infer-source-parallelism` redundant. Therefore, we plan to mark `table.exec.hive.infer-source-parallelism` as deprecated in this FLIP. Until it is completely phased out, it will still serve as the main switch to enable or disable parallelism inference.
The temporary coexistence logic of the two configuration options is as follows:
Inference Mode Relationship Table

| `table.exec.hive.infer-source-parallelism` (rows) / `table.exec.hive.infer-source-parallelism.mode` (columns) | DYNAMIC | STATIC | NONE |
|---|---|---|---|
| true | dynamic parallelism inference | static parallelism inference | disabled |
| false | disabled | disabled | disabled |
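The coexistence rules in the table above can be expressed as a small decision function. This is a minimal sketch, not the connector's actual code: the class `InferModeResolution`, the `EffectiveInference` result type, and the `resolve` method are hypothetical names, and `InferMode` is a trimmed local copy of the enum defined above so the snippet compiles without Flink on the classpath.

```java
// Hypothetical sketch: how the deprecated switch and the new mode option combine.
final class InferModeResolution {

    /** Trimmed local copy of the FLIP's InferMode (descriptions omitted). */
    enum InferMode { STATIC, DYNAMIC, NONE }

    /** Hypothetical result type: which inference actually runs. */
    enum EffectiveInference { STATIC, DYNAMIC, DISABLED }

    /**
     * @param legacySwitch value of the deprecated 'table.exec.hive.infer-source-parallelism'
     * @param mode         value of 'table.exec.hive.infer-source-parallelism.mode'
     */
    static EffectiveInference resolve(boolean legacySwitch, InferMode mode) {
        // While the deprecated option exists, 'false' disables inference regardless of mode.
        if (!legacySwitch) {
            return EffectiveInference.DISABLED;
        }
        switch (mode) {
            case DYNAMIC:
                return EffectiveInference.DYNAMIC;
            case STATIC:
                return EffectiveInference.STATIC;
            default: // NONE
                return EffectiveInference.DISABLED;
        }
    }

    public static void main(String[] args) {
        // Defaults after this FLIP: switch = true, mode = DYNAMIC.
        System.out.println(resolve(true, InferMode.DYNAMIC)); // DYNAMIC
        System.out.println(resolve(false, InferMode.STATIC)); // DISABLED
    }
}
```

Checking the legacy switch first mirrors the table: its `false` row is uniformly "disabled", so the mode only matters once the switch is `true`.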
Behavior Change
After introducing this FLIP, the behavior changes are as follows:
- The default hive source parallelism inference mode will change to dynamic inference.
- With dynamic inference mode enabled, the way the upper bound of parallelism inference is determined will change. In static inference mode, hive source takes the value of `table.exec.hive.infer-source-parallelism.max` as the upper bound for inference; if it is not set, the option's default value of 1000 is used. In dynamic inference mode, if the user explicitly configures `table.exec.hive.infer-source-parallelism.max`, we will still use the user-configured value as the upper bound. Otherwise, we will use `execution.batch.adaptive.auto-parallelism.default-source-parallelism` as the upper bound. If neither of these options is set, the upper bound will follow the existing adaptive batch scheduler logic, falling back to `execution.batch.adaptive.auto-parallelism.max-parallelism` or `parallelism.default`.
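The upper-bound rules above can be sketched as two small functions. This is an illustration under stated assumptions, not the connector's code: `InferenceUpperBound`, `staticUpperBound`, and `dynamicUpperBound` are hypothetical names, an option the user did not set explicitly is modelled as an empty `OptionalInt`, and the adaptive batch scheduler's own fallback (from `execution.batch.adaptive.auto-parallelism.max-parallelism` or `parallelism.default`) is passed in as an already-computed value.

```java
import java.util.OptionalInt;

// Hypothetical sketch of the upper-bound rules; not the actual HiveParallelismInference code.
final class InferenceUpperBound {

    /** Default of 'table.exec.hive.infer-source-parallelism.max', as stated in the text. */
    static final int HIVE_MAX_DEFAULT = 1000;

    /** Static mode: the user's max if set, otherwise the option's default. */
    static int staticUpperBound(OptionalInt hiveMax) {
        return hiveMax.orElse(HIVE_MAX_DEFAULT);
    }

    /**
     * Dynamic mode.
     *
     * @param hiveMax                  explicitly set 'table.exec.hive.infer-source-parallelism.max', if any
     * @param defaultSourceParallelism explicitly set
     *                                 'execution.batch.adaptive.auto-parallelism.default-source-parallelism', if any
     * @param schedulerFallback        bound the adaptive batch scheduler would otherwise use (assumed precomputed)
     */
    static int dynamicUpperBound(
            OptionalInt hiveMax, OptionalInt defaultSourceParallelism, int schedulerFallback) {
        if (hiveMax.isPresent()) {
            return hiveMax.getAsInt(); // an explicit user setting always wins
        }
        if (defaultSourceParallelism.isPresent()) {
            return defaultSourceParallelism.getAsInt();
        }
        return schedulerFallback; // defer to the existing scheduler logic
    }

    public static void main(String[] args) {
        System.out.println(staticUpperBound(OptionalInt.empty()));                           // 1000
        System.out.println(dynamicUpperBound(OptionalInt.empty(), OptionalInt.of(64), 128)); // 64
        System.out.println(dynamicUpperBound(OptionalInt.empty(), OptionalInt.empty(), 128)); // 128
    }
}
```

The key difference is that in dynamic mode an unset `table.exec.hive.infer-source-parallelism.max` does not fall back to its default of 1000, so the scheduler-level options can take effect.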
Proposed Changes
General Idea
HiveSource needs to implement the DynamicParallelismInference interface to support dynamic parallelism inference capabilities. Functionally, it needs to fulfill the following two points:
- Include the optimizations currently available in static parallelism inference (e.g. partition push-down and limit push-down).
- When dynamic partition pruning is applied, calibrate the parallelism based on the partitions that remain after runtime pruning.
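How dynamic partition pruning can tighten the inferred parallelism is sketched below. This is a simplified illustration, not HiveSource's implementation: the class `DynamicHiveInferenceSketch`, its `Partition` type, and `inferParallelism` are hypothetical; it assumes that parallelism is the number of splits clamped to `[1, upperBound]`, that the input list has already been pruned by static partition push-down, and that the runtime filter is available as a set of retained partition values. Limit push-down is omitted.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch of split-count-based inference with dynamic partition pruning (DPP).
final class DynamicHiveInferenceSketch {

    /** Hypothetical stand-in for a statically pruned Hive partition. */
    static final class Partition {
        final String value;   // partition-key value, e.g. "2024-01-01"
        final int splitCount; // number of input splits this partition would produce

        Partition(String value, int splitCount) {
            this.value = value;
            this.splitCount = splitCount;
        }
    }

    /**
     * @param partitions    partitions left after static partition push-down
     * @param retainedByDpp partition values kept by the runtime filter; empty if DPP is absent
     * @param upperBound    inference upper bound (see the Behavior Change rules)
     */
    static int inferParallelism(
            List<Partition> partitions, Optional<Set<String>> retainedByDpp, int upperBound) {
        int splits = 0;
        for (Partition p : partitions) {
            if (retainedByDpp.isPresent() && !retainedByDpp.get().contains(p.value)) {
                continue; // pruned at runtime, contributes no splits
            }
            splits += p.splitCount;
        }
        // Never exceed the upper bound, and never go below 1.
        return Math.max(1, Math.min(splits, upperBound));
    }

    public static void main(String[] args) {
        List<Partition> parts = List.of(
                new Partition("2024-01-01", 4),
                new Partition("2024-01-02", 6),
                new Partition("2024-01-03", 10));
        System.out.println(inferParallelism(parts, Optional.empty(), 8));                  // 8
        System.out.println(inferParallelism(parts, Optional.of(Set.of("2024-01-02")), 8)); // 6
        System.out.println(inferParallelism(parts, Optional.of(Set.of()), 8));             // 1
    }
}
```

Without DPP the 20 splits are capped at 8; once the runtime filter keeps only one partition, only its 6 splits count, which is the kind of calibration static inference cannot perform at job creation time.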
Currently, the logic for hive parallelism inference is primarily implemented in the HiveParallelismInference class. Dynamic parallelism inference will also be built on HiveParallelismInference to maximize reuse of the existing code. Additionally, the following logic needs to be added to HiveParallelismInference:
- Add logic to evaluate the setting of `table.exec.hive.infer-source-parallelism.mode` and choose the appropriate inference mode accordingly.
- Determine the upper bound for parallelism inference under dynamic inference mode, as detailed in the Behavior Change part of the Public Interfaces section.
Below is the flowchart for the hive source parallelism inference after the introduction of this FLIP.
Compatibility, Deprecation, and Migration Plan
A new configuration option `table.exec.hive.infer-source-parallelism.mode` will be introduced to distinguish between the static and dynamic modes. After this FLIP, users can still revert to static inference by setting `table.exec.hive.infer-source-parallelism.mode` to `static` (`InferMode.STATIC`), or disable parallelism inference altogether by setting `table.exec.hive.infer-source-parallelism.mode` to `none` (`InferMode.NONE`) or setting `table.exec.hive.infer-source-parallelism` to false.
Because `table.exec.hive.infer-source-parallelism` does not comply with the YAML standard specification (it is used both as an option and as a prefix of other options), it will be marked as deprecated.
Limitations
Dynamic parallelism inference only works for batch jobs that use the AdaptiveBatchScheduler.
Test Plan
This will be tested by unit and integration tests.
...