Status

Current state: Under Discussion

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Motivation

Flink can be configured in various ways. Taking a session job as an example:

  • When the session cluster starts, the JobManager (JM) and TaskManager (TM) each load the configuration file config.yaml from the specified directory on their respective machines and merge it with command line parameters (-D) to form their component configuration.

  • When submitting a job, the client loads its local configuration file and merges it with the configuration set in the job code and with command line parameters to obtain the job configuration.

When this job is submitted to the session cluster, the values of cluster-level config options, such as the memory for JM and TM, are determined by the JM/TM configuration, while the values of job-level config options, such as parallelism, are determined by the job configuration. In addition, some config options, such as restart strategies, fall back to the values in the JM/TM configuration when they are not set in the job configuration.

However, users are often unsure where a given config option should be set. For example, we've received several bug reports from users saying that taskmanager.numberOfTaskSlots doesn't take effect; the cause turned out to be that they had mistakenly configured it in the job code.

Therefore, this FLIP introduces the concept of Config Option Scope to help users understand the proper way to configure each config option.

Public Interfaces

Introduce the ConfigOptionScope enum, a getScope() getter on ConfigOption, and a scope(...) builder method in ConfigOptions.

@PublicEvolving
public enum ConfigOptionScope {
    JOB,

    CLUSTER,

    JOB_AND_CLUSTER,

    SQL_GATEWAY,

    SQL_CLIENT,

    HISTORY_SERVER,

    SQL_EXECUTION,

    UNDEFINED
}
@PublicEvolving
public class ConfigOption<T> {
    ...

    private final ConfigOptionScope scope;
  
    public ConfigOptionScope getScope() {
        return scope;
    }
  
    ...
}
@PublicEvolving
public class ConfigOptions {
  ...
     

    /**
     * Builder for {@link ConfigOption} with a defined atomic type.
     *
     * @param <T> atomic type of the option
     */
    public static class TypedConfigOptionBuilder<T> {
        .....

        /**
         * Sets the scope of the configuration option.
         *
         * @param scope the scope to be set for the configuration option
         * @return a new instance of ScopedConfigOptionBuilder with the specified scope
         */
        public ScopedConfigOptionBuilder<T> scope(ConfigOptionScope scope) {
            return new ScopedConfigOptionBuilder<>(scope, key, clazz);
        }
    }

    /**
     * Builder for {@link ConfigOption} with a defined config scope.
     *
     * @param <T> atomic type of the option
     */
    public static class ScopedConfigOptionBuilder<T> {
        private final String key;
        private final Class<T> clazz;
        private final ConfigOptionScope scope;

        ScopedConfigOptionBuilder(ConfigOptionScope scope, String key, Class<T> clazz) {
            this.scope = scope;
            this.key = key;
            this.clazz = clazz;
        }

        /** Defines that the option's type should be a list of previously defined atomic type. */
        public ListConfigOptionBuilder<T> asList() {
            return new ListConfigOptionBuilder<>(scope, key, clazz);
        }

        /**
         * Creates a ConfigOption with the given default value.
         *
         * @param value The default value for the config option
         * @return The config option with the default value.
         */
        public ConfigOption<T> defaultValue(T value) {
            return new ConfigOption<>(
                    key, clazz, ConfigOption.EMPTY_DESCRIPTION, value, false, scope);
        }

        /**
         * Creates a ConfigOption without a default value.
         *
         * @return The config option without a default value.
         */
        public ConfigOption<T> noDefaultValue() {
            return new ConfigOption<>(
                    key, clazz, Description.builder().text("").build(), null, false, scope);
        }
    }

  ....
}
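
To illustrate how an option might be declared with the proposed builder step, here is a minimal, self-contained sketch. The classes below are simplified stand-ins that only mirror the names used in this FLIP; they are not Flink's real classes, and the key example.unlabeled is hypothetical. The sketch assumes that an option declared without scope(...) ends up with the UNDEFINED scope.

```java
import java.util.Objects;

// Simplified stand-ins for illustration only; these are NOT Flink's real classes.
enum ConfigOptionScope { JOB, CLUSTER, JOB_AND_CLUSTER, UNDEFINED }

final class ConfigOption<T> {
    private final String key;
    private final T defaultValue;
    private final ConfigOptionScope scope;

    ConfigOption(String key, T defaultValue, ConfigOptionScope scope) {
        this.key = Objects.requireNonNull(key);
        this.defaultValue = defaultValue; // null means "no default value"
        this.scope = Objects.requireNonNull(scope);
    }

    String key() { return key; }
    T defaultValue() { return defaultValue; }
    ConfigOptionScope getScope() { return scope; }
}

final class ScopedConfigOptionBuilder<T> {
    private final ConfigOptionScope scope;
    private final String key;

    ScopedConfigOptionBuilder(ConfigOptionScope scope, String key) {
        this.scope = scope;
        this.key = key;
    }

    ConfigOption<T> defaultValue(T value) { return new ConfigOption<>(key, value, scope); }
    ConfigOption<T> noDefaultValue() { return new ConfigOption<>(key, null, scope); }
}

final class TypedConfigOptionBuilder<T> {
    private final String key;

    TypedConfigOptionBuilder(String key) { this.key = key; }

    // The new step: attach a scope before choosing the default value.
    ScopedConfigOptionBuilder<T> scope(ConfigOptionScope scope) {
        return new ScopedConfigOptionBuilder<>(scope, key);
    }

    // Assumption: options declared without scope(...) keep the UNDEFINED scope.
    ConfigOption<T> defaultValue(T value) {
        return new ConfigOption<>(key, value, ConfigOptionScope.UNDEFINED);
    }
}

final class ScopedOptionDemo {
    static final ConfigOption<Integer> NUM_TASK_SLOTS =
            new TypedConfigOptionBuilder<Integer>("taskmanager.numberOfTaskSlots")
                    .scope(ConfigOptionScope.CLUSTER)
                    .defaultValue(1);

    // Hypothetical option without a scope label.
    static final ConfigOption<Integer> UNLABELED =
            new TypedConfigOptionBuilder<Integer>("example.unlabeled").defaultValue(0);

    public static void main(String[] args) {
        System.out.println(NUM_TASK_SLOTS.key() + " -> " + NUM_TASK_SLOTS.getScope());
        System.out.println(UNLABELED.key() + " -> " + UNLABELED.getScope());
    }
}
```

Placing scope(...) between the type step and the default-value step keeps existing declarations valid, since they simply skip the new step.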

Add a Scope column to the Config Option display on the official Configuration page.


Proposed Changes

Introduce the following config option scopes:

Config Option Scope

Definition

CLUSTER

Config options used by the JM and TM of a Flink cluster.


Their values should be defined in the configuration file (config.yaml) of the JM and TM, or passed as command line parameters (-D) when starting the JM and TM.

JOB

Job-level config options.


Their values can be defined within the job code. If a value is not specified in the job code, it falls back to a different configuration depending on the submission approach:

  1. Submitting a job to a session cluster via the Flink CLI: defined by the Flink CLI configuration.

  2. Starting a job in Application mode: defined by the JM configuration.

  3. Submitting a job through the JAR RUN REST API:

    1. First, the values fall back to those defined in the request body.

    2. If not specified in the request body, they fall back to the values defined in the JM configuration.

  4. Submitting a job through the SQL Client (embedded mode): defined by the SQL Client configuration.

  5. Submitting a job through the SQL Gateway: defined by the SQL Gateway configuration.
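
The fallback rules above can be read as an ordered list of configuration sources per submission approach. The following sketch models that reading with plain Java collections. All type and constant names (SubmissionPath, ConfigSource, JobScopeFallback) are hypothetical and exist only for this illustration; the sketch does not show how Flink resolves values internally.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical types for illustration only; none of them exist in Flink.
enum SubmissionPath {
    FLINK_CLI_SESSION,
    APPLICATION_MODE,
    JAR_RUN_REST_API,
    SQL_CLIENT_EMBEDDED,
    SQL_GATEWAY
}

enum ConfigSource {
    JOB_CODE,
    FLINK_CLI_CONFIG,
    REQUEST_BODY,
    JM_CONFIG,
    SQL_CLIENT_CONFIG,
    SQL_GATEWAY_CONFIG
}

final class JobScopeFallback {

    /** Sources consulted, in order, for a JOB-scoped option. Job code always comes first. */
    static List<ConfigSource> lookupOrder(SubmissionPath path) {
        switch (path) {
            case FLINK_CLI_SESSION:
                return List.of(ConfigSource.JOB_CODE, ConfigSource.FLINK_CLI_CONFIG);
            case APPLICATION_MODE:
                return List.of(ConfigSource.JOB_CODE, ConfigSource.JM_CONFIG);
            case JAR_RUN_REST_API:
                return List.of(
                        ConfigSource.JOB_CODE, ConfigSource.REQUEST_BODY, ConfigSource.JM_CONFIG);
            case SQL_CLIENT_EMBEDDED:
                return List.of(ConfigSource.JOB_CODE, ConfigSource.SQL_CLIENT_CONFIG);
            case SQL_GATEWAY:
                return List.of(ConfigSource.JOB_CODE, ConfigSource.SQL_GATEWAY_CONFIG);
            default:
                throw new IllegalArgumentException("Unknown submission path: " + path);
        }
    }

    /** Returns the first value found along the lookup order, or empty if no source sets the key. */
    static Optional<String> resolve(
            String key, SubmissionPath path, Map<ConfigSource, Map<String, String>> sources) {
        for (ConfigSource source : lookupOrder(path)) {
            String value = sources.getOrDefault(source, Map.of()).get(key);
            if (value != null) {
                return Optional.of(value);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // The job code does not set the option; the request body and the JM config do.
        Map<ConfigSource, Map<String, String>> sources = Map.of(
                ConfigSource.REQUEST_BODY, Map.of("parallelism.default", "4"),
                ConfigSource.JM_CONFIG, Map.of("parallelism.default", "2"));

        System.out.println(resolve("parallelism.default", SubmissionPath.JAR_RUN_REST_API, sources));
        System.out.println(resolve("parallelism.default", SubmissionPath.APPLICATION_MODE, sources));
        System.out.println(resolve("parallelism.default", SubmissionPath.FLINK_CLI_SESSION, sources));
    }
}
```

With these inputs, the REST submission picks up the request body value before the JM value, while the CLI submission finds no value because neither the job code nor the CLI configuration sets the key.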

JOB_AND_CLUSTER

Config option values can be defined in the job configuration, the cluster configuration, or both.


The value from the job configuration takes precedence; if it is not set there, the value falls back to the cluster configuration.
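
A minimal sketch of this precedence rule, using plain maps as stand-ins for the job and cluster configurations. The helper class JobAndClusterLookup is hypothetical and not part of Flink; the restart strategy key and values are used only as sample data.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper for illustration; not part of Flink.
final class JobAndClusterLookup {

    /** The job configuration wins; the cluster configuration is only the fallback. */
    static Optional<String> effectiveValue(
            String key, Map<String, String> jobConfig, Map<String, String> clusterConfig) {
        String fromJob = jobConfig.get(key);
        if (fromJob != null) {
            return Optional.of(fromJob);
        }
        return Optional.ofNullable(clusterConfig.get(key));
    }

    public static void main(String[] args) {
        Map<String, String> cluster = Map.of("restart-strategy.type", "fixed-delay");
        Map<String, String> job = Map.of("restart-strategy.type", "exponential-delay");

        // Set in both places: the job value is used.
        System.out.println(effectiveValue("restart-strategy.type", job, cluster));
        // Not set in the job configuration: the cluster value is used.
        System.out.println(effectiveValue("restart-strategy.type", Map.of(), cluster));
    }
}
```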

SQL_CLIENT

Config options specific to the SQL Client.


Their values should be defined in the SQL Client's configuration file or passed as command line parameters (-D) when starting the SQL Client.

SQL_GATEWAY

Config options specific to the SQL Gateway.


Their values should be defined in the SQL Gateway's configuration file or passed as command line parameters (-D) when starting the SQL Gateway.

HISTORY_SERVER

Config options specific to the History Server.


Their values should be defined in the History Server's configuration file or passed as command line parameters (-D) when starting the History Server.

SQL_EXECUTION

Config options related to Flink SQL/Table API that impact SQL or Table API execution, such as SQL translation, execution plan generation, and operator behavior.


Config option values can be defined in a SQL/Table program, or in the SQL Gateway or SQL Client configuration file. Values specified in the program take precedence; if they are not present, the values from the SQL Gateway or SQL Client are used.

UNDEFINED

The default scope when a config option is not labeled.

Note that some config options in Flink, related to Plugin, FileSystem, and Security, need to be configured in each Flink component when it is started. All of them are labeled with the CLUSTER scope.

Labeling and displaying Config Options

This FLIP plans to show the Config Option Scope on the configuration page of the official documentation by adding a Scope column to the config option table on that page. The scope of each individual config option will be labeled in follow-up work.
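
As a rough picture of what a table row with the extra column might look like, the sketch below prints a plain-text header and one row. The helper class ScopeColumnSketch, the column order, and the text layout are assumptions made for this illustration; the markup of the real documentation page may differ.

```java
// Hypothetical sketch; Flink's real documentation generator is not shown here.
final class ScopeColumnSketch {

    /** Joins the given cells into one plain-text table row. */
    static String row(String... cells) {
        return "| " + String.join(" | ", cells) + " |";
    }

    public static void main(String[] args) {
        // Assumed column order: the new Scope column sits before the description.
        System.out.println(row("Key", "Default", "Type", "Scope", "Description"));
        System.out.println(
                row("taskmanager.numberOfTaskSlots", "1", "Integer", "CLUSTER", "(description)"));
    }
}
```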

Compatibility, Deprecation, and Migration Plan

This change does not break any compatibility.

Follow-up Tasks

Label each Config Option displayed on the official Configuration page with its corresponding Scope. Note that since SQL-related config options are already on a separate page with explanations on how to configure them, this FLIP will not detail the scope of SQL config options in the official documentation.
