Status
Current state: Under Discussion
Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-59-Enable-execution-configuration-from-Configuration-object-td32359.html
JIRA:
Released:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently the runtime parameters can only be configured via setters in ExecutionConfig / CheckpointConfig / StreamExecutionEnvironment via Java or Scala code. This has some drawbacks:
- Users must have access to the StreamExecutionEnvironment. We face this problem in the unified TableEnvironment mentioned in FLIP-32, where users do not see the underlying StreamExecutionEnvironment.
- It’s impossible to set the parameters from the command line. We face this issue e.g. in the SQL Client.
- It’s impossible to have job-specific default values for execution parameters provided from a file.
- Moreover, some of the config options have no string-serializable representation, which makes it even harder to implement the two previous items.
- These options do not have descriptions in the documentation.
We suggest improving the runtime configuration so that it can be set from a Configuration object and integrates cleanly into a possible future hierarchy of configuration layers.
Assumption:
This FLIP builds on FLIP-54 in a few places. In particular, it assumes richer type support for ConfigOptions and the availability of ConfigurationReader#getOptional.
Proposed Changes
Methods to configure config objects
We suggest adding methods that make it possible to change the state of the three classes from a ConfigurationReader.
- Add configure methods:
- void ExecutionConfig.configure(ReadableConfig configuration, ClassLoader classLoader)
- void CheckpointConfig.configure(ReadableConfig)
- void StreamExecutionEnvironment.configure(ReadableConfig configuration, ClassLoader classLoader)
Those methods would change only the parameters for which a corresponding option is present in the ConfigurationReader; all other parameters would be left intact. StreamExecutionEnvironment.configure would also call the other two methods.
E.g.
public static final ConfigOption<ClosureCleanerLevel> CLOSURE_CLEANER_LEVEL =
    ConfigOptions.key("pipeline.closure-cleaner-level")
        .enumType(ClosureCleanerLevel.class)
        .defaultValue(ClosureCleanerLevel.RECURSIVE);

void configure(ConfigurationReader conf) {
    ...
    // Only overwrite the field if the option is explicitly present.
    conf.getOptional(CLOSURE_CLEANER_LEVEL).ifPresent(level -> {
        this.closureCleanerLevel = level;
    });
    ...
}
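The "only touch what is present" behaviour can be tried out without Flink on the classpath. The sketch below is an illustration under assumptions, not the proposed implementation: SketchConfig, SketchExecutionConfig and CleanerLevel are hypothetical stand-ins for ConfigurationReader, ExecutionConfig and ClosureCleanerLevel, and values are kept as plain strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins for Flink types, so the sketch runs without Flink.
enum CleanerLevel { NONE, TOP_LEVEL, RECURSIVE }

final class SketchConfig {
    private final Map<String, String> values = new HashMap<>();

    SketchConfig set(String key, String value) {
        values.put(key, value);
        return this;
    }

    // Empty when the key was never set; no default value is substituted.
    Optional<String> getOptional(String key) {
        return Optional.ofNullable(values.get(key));
    }
}

final class SketchExecutionConfig {
    CleanerLevel closureCleanerLevel = CleanerLevel.RECURSIVE;
    boolean objectReuse = false;

    // Overwrites only the fields whose keys are present in conf.
    void configure(SketchConfig conf) {
        conf.getOptional("pipeline.closure-cleaner-level")
            .map(CleanerLevel::valueOf)
            .ifPresent(level -> this.closureCleanerLevel = level);
        conf.getOptional("pipeline.object-reuse")
            .map(Boolean::parseBoolean)
            .ifPresent(reuse -> this.objectReuse = reuse);
    }
}

final class ConfigureSketch {
    public static void main(String[] args) {
        SketchExecutionConfig config = new SketchExecutionConfig();
        config.objectReuse = true; // set programmatically before configure()

        // Only the closure cleaner level is present in the configuration.
        config.configure(new SketchConfig().set("pipeline.closure-cleaner-level", "NONE"));

        System.out.println(config.closureCleanerLevel); // NONE
        System.out.println(config.objectReuse);         // true, left intact
    }
}
```

Reading through getOptional rather than a getter that falls back to the default matters here: a default-returning getter would silently reset objectReuse to false.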
New ConfigOptions
- In class org.apache.flink.configuration.PipelineOptions:
key | setting | type | Expected format |
---|---|---|---|
pipeline.auto-generate-uids | ExecutionConfig#enableAutoGeneratedUids | ConfigOption<Boolean> | |
pipeline.auto-type-registration | ExecutionConfig#autoTypeRegistrationEnabled | ConfigOption<Boolean> | |
pipeline.auto-watermark-interval | ExecutionConfig#autoWatermarkInterval | ConfigOption<Duration> | |
pipeline.closure-cleaner-level | ExecutionConfig#closureCleanerLevel | ConfigOption<ExecutionConfig.ClosureCleanerLevel> | |
pipeline.force-avro | ExecutionConfig#forceAvro | ConfigOption<Boolean> | |
pipeline.force-kryo | ExecutionConfig#forceKryo | ConfigOption<Boolean> | |
pipeline.generic-types | ExecutionConfig#disableGenericTypes | ConfigOption<Boolean> | |
pipeline.global-job-parameters | ExecutionConfig#globalJobParameters | ConfigOption<Map<String, String>> | |
pipeline.max-parallelism | ExecutionConfig#maxParallelism | ConfigOption<Integer> | |
pipeline.object-reuse | ExecutionConfig#objectReuse | ConfigOption<Boolean> | |
pipeline.default-kryo-serializers | ExecutionConfig#defaultKryoSerializerClasses | ConfigOption<List<String>> | Semicolon-separated list of pairs of class names and serializer class names. Example: class:org.apache.flink.api.common.ExecutionConfigTest,serializer:org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1;class:org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1,serializer:org.apache.flink.api.common.ExecutionConfigTest$TestSerializer2 |
pipeline.registered-kryo-types | ExecutionConfig#registeredKryoTypes | ConfigOption<List<String>> | Semicolon-separated list of class names. Example: org.apache.flink.api.common.ExecutionConfigTest;org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1 |
pipeline.registered-pojo-types | ExecutionConfig#registeredPojoTypes | ConfigOption<List<String>> | Semicolon-separated list of class names. Example: org.apache.flink.api.common.ExecutionConfigTest;org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1 |
pipeline.operator-chaining | StreamExecutionEnvironment#isChainingEnabled | ConfigOption<Boolean> | |
pipeline.cached-files | StreamExecutionEnvironment#cacheFile | ConfigOption<List<String>> | Semicolon-separated list of triples of cached file name, path, and executable flag (the executable flag is optional). Example: name:file1,path:/tmp1,executable:true;name:file2,path:/tmp2 |
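The list formats in the table above (semicolon-separated entries, each a comma-separated set of key:value fields) could be parsed roughly as follows. This is a minimal sketch: StructuredListParser is a hypothetical helper, not part of Flink, and it assumes that values contain neither commas nor semicolons and that no escaping is needed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not a Flink class.
final class StructuredListParser {

    // Splits "k1:v1,k2:v2;k1:v3" into one field map per semicolon-separated entry.
    static List<Map<String, String>> parse(String raw) {
        List<Map<String, String>> entries = new ArrayList<>();
        for (String entry : raw.split(";")) {
            if (entry.trim().isEmpty()) {
                continue; // tolerate empty entries such as a trailing semicolon
            }
            Map<String, String> fields = new LinkedHashMap<>();
            for (String pair : entry.split(",")) {
                // Split at the first colon only, so values may contain colons.
                int colon = pair.indexOf(':');
                if (colon < 0) {
                    throw new IllegalArgumentException("Expected key:value but got '" + pair + "'");
                }
                fields.put(pair.substring(0, colon).trim(), pair.substring(colon + 1).trim());
            }
            entries.add(fields);
        }
        return entries;
    }

    public static void main(String[] args) {
        String cachedFiles = "name:file1,path:/tmp1,executable:true;name:file2,path:/tmp2";
        for (Map<String, String> file : parse(cachedFiles)) {
            // The executable flag is optional and treated as false when absent.
            boolean executable = Boolean.parseBoolean(file.getOrDefault("executable", "false"));
            System.out.println(file.get("name") + " -> " + file.get("path") + " executable=" + executable);
        }
    }
}
```

The same helper would also handle the class:...,serializer:... pairs of pipeline.default-kryo-serializers, since class names contain no colons, commas, or semicolons.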
- In class org.apache.flink.streaming.api.environment.StreamPipelineOptions:
key | setting | type |
---|---|---|
pipeline.time-characteristic | StreamExecutionEnvironment#timeCharacteristic | ConfigOption<TimeCharacteristic> |
- In class org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions:
key | setting | type |
---|---|---|
execution.checkpointing.mode | CheckpointConfig#checkpointingMode | ConfigOption<CheckpointingMode> |
execution.checkpointing.interval | CheckpointConfig#checkpointInterval | ConfigOption<Duration> |
execution.checkpointing.timeout | CheckpointConfig#checkpointTimeout | ConfigOption<Duration> |
execution.checkpointing.max-concurrent-checkpoints | CheckpointConfig#maxConcurrentCheckpoints | ConfigOption<Integer> |
execution.checkpointing.min-pause | CheckpointConfig#minPauseBetweenCheckpoints | ConfigOption<Duration> |
execution.checkpointing.prefer-checkpoint-for-recovery | CheckpointConfig#preferCheckpointForRecovery | ConfigOption<Boolean> |
execution.checkpointing.tolerable-failed-checkpoints | CheckpointConfig#tolerableCheckpointFailureNumber | ConfigOption<Integer> |
execution.checkpointing.externalized-checkpoint | CheckpointConfig#externalizedCheckpointCleanup | ConfigOption<CheckpointConfig.ExternalizedCheckpointCleanup> |
- In class org.apache.flink.configuration.ExecutionOptions:
key | setting | type |
---|---|---|
execution.checkpointing.snapshot-compression | ExecutionConfig#useSnapshotCompression | ConfigOption<Boolean> |
execution.buffer-timeout | StreamExecutionEnvironment#bufferTimeout | ConfigOption<Duration> |
Options not configurable from Configuration
- ExecutionConfig
- ExecutionConfig#executionMode
- ExecutionConfig#numberOfExecutionRetries
- ExecutionConfig#codeAnalysisMode
- ExecutionConfig#executionRetryDelay
- ExecutionConfig#failTaskOnCheckpointError
- ExecutionConfig#defaultInputDependencyConstraint
- ExecutionConfig#registeredTypesWithKryoSerializers
- ExecutionConfig#registeredTypesWithKryoSerializerClasses
- ExecutionConfig#defaultKryoSerializers
- CheckpointConfig
- CheckpointConfig#failOnCheckpointingErrors
Existing configuration options:
There are at least three places where a user can configure execution behavior on a per-job basis: ExecutionConfig, CheckpointConfig, and StreamExecutionEnvironment. The following tables list all configuration parameters present in those classes, together with the keys we suggest for those options.
ExecutionConfig:
Configuration | Comment | Suggested key | Type of the field | Suggested type of ConfigOption | Exists in 1.9 |
---|---|---|---|---|---|
autoTypeRegistration | | pipeline.auto-type-registration | boolean | ConfigOption<Boolean> | |
autoWatermarkInterval | We could use Duration for ConfigOption | pipeline.auto-watermark-interval | long | ConfigOption<Duration> | |
closureCleaner | | pipeline.closure-cleaner-level | ClosureCleanerLevel | ConfigOption<ClosureCleanerLevel> | |
defaultInputDependencyConstraint | | not supported | | | |
defaultKryoSerializerClasses | We should support only this version as it takes Class<? extends Serializer<?>>. This simplifies the serialization. | pipeline.default-kryo-serializers | Map<Class, Class<Serializer<?>>> | ConfigOption<List<DefaultKryoSerializer>>, where DefaultKryoSerializer is a POJO | |
defaultKryoSerializers | We do not want to support this option as it stores SerializableSerializer. We see no point in configuring the serializers from Java-serialized strings in Base64 encoding. | not supported | Map<Class, SerializableSerializer<?>> | | |
disableGenericTypes | We should invert the logic in the ConfigOption: true means enabled, false means disabled. | pipeline.generic-types | boolean | ConfigOption<Boolean> | |
enableAutoGeneratedUIDs | | pipeline.auto-generated-uids | boolean | ConfigOption<Boolean> | |
executionMode | | not supported | | | |
executionRetryDelay / numberOfExecutionRetries | Deprecated | not supported | long/int | | |
forceAvro | | pipeline.force-avro | boolean | ConfigOption<Boolean> | |
forceKryo | | pipeline.force-kryo | boolean | ConfigOption<Boolean> | |
setGlobalJobParameters | | pipeline.global-job-parameters | Map<String, String> | ConfigOption<Map<String, String>> | |
latencyTrackingInterval | | reuse the existing | long | | metrics.latency.interval (MetricOptions) |
setMaxParallelism | | pipeline.max-parallelism | int | ConfigOption<Integer> | |
objectReuse | | pipeline.object-reuse | boolean | ConfigOption<Boolean> | |
setParallelism | | reuse the existing | int | | parallelism.default (CoreOptions); table.exec.resource.default-parallelism (ExecutionConfigOptions) - drop this Table-specific option |
registeredKryoTypes | | pipeline.registered-kryo-types | List<Class<?>> | ConfigOption<List<Class<?>>> | |
registeredPojoTypes | | pipeline.registered-pojo-types | List<Class<?>> | ConfigOption<List<Class<?>>> | |
registeredTypesWithKryoSerializers | We do not want to support this option as it stores SerializableSerializer. We see no point in configuring the serializers from Java-serialized strings in Base64 encoding. | not supported | Map<Class, Serializer<?>> | | |
registeredTypesWithKryoSerializerClasses | We should support only this version as it takes Class<? extends Serializer<?>>. This simplifies the serialization. This option in the serialization uses ids of serializers. | not supported as part of this FLIP | Map<Class, Class<Serializer<?>>> | | |
setRestartStrategy | On the client side we need to convert the option to RestartStrategyConfiguration. It does not support custom restart strategies. We would also flatten the configuration structure. | reuse the existing | RestartStrategy | | restart-strategy (ConfigConstants) (not a proper ConfigOption yet) |
taskCancellationIntervalMillis | | reuse the existing | long | | task.cancellation.interval (TaskManagerOptions) |
taskCancellationTimeoutMillis | | reuse the existing | long | | task.cancellation.timeout (TaskManagerOptions) |
useSnapshotCompression | | execution.checkpointing.snapshot-compression | boolean | ConfigOption<Boolean> | |
printProgressDuringExecution | Deprecated, has no effect | not supported | | | |
codeAnalysisMode | Deprecated, has no effect | not supported | --- | | |
failTaskOnCheckpointError | Deprecated | not supported | boolean | | |
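The comment on disableGenericTypes asks for the option to carry the positive meaning while the field keeps its negative one. A minimal sketch of that inversion follows; GenericTypesSketch and its method names are hypothetical, and the Optional parameter stands in for the result of reading pipeline.generic-types from the configuration.

```java
import java.util.Optional;

// Hypothetical stand-in for the relevant part of ExecutionConfig.
final class GenericTypesSketch {
    // Mirrors the existing negative flag.
    private boolean disableGenericTypes = false;

    // Applies the positive option value; an absent value leaves the flag untouched.
    void configureGenericTypes(Optional<Boolean> genericTypesEnabled) {
        genericTypesEnabled.ifPresent(enabled -> this.disableGenericTypes = !enabled);
    }

    boolean hasGenericTypesDisabled() {
        return disableGenericTypes;
    }

    public static void main(String[] args) {
        GenericTypesSketch config = new GenericTypesSketch();

        // pipeline.generic-types: false  ->  generic types are disabled
        config.configureGenericTypes(Optional.of(false));
        System.out.println(config.hasGenericTypesDisabled()); // true

        // Option absent: the previous state is kept.
        config.configureGenericTypes(Optional.empty());
        System.out.println(config.hasGenericTypesDisabled()); // true
    }
}
```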
CheckpointConfig:
Configuration | Comment | Suggested key | Type | Suggested type of ConfigOption | Exists in 1.9 |
---|---|---|---|---|---|
checkpointingMode | | execution.checkpointing.mode | Enum<CheckpointingMode> | ConfigOption<CheckpointingMode> | |
checkpointInterval | We could use Duration for ConfigOption | execution.checkpointing.interval | long | ConfigOption<Duration> | |
checkpointTimeout | We could use Duration for ConfigOption | execution.checkpointing.timeout | long | ConfigOption<Duration> | |
externalizedCheckpointCleanup | | execution.checkpointing.externalized-checkpoint-mode | Enum<ExternalizedCheckpointCleanup> | ConfigOption<ExternalizedCheckpointCleanup> | |
failOnCheckpointingErrors | this is deprecated | not supported | boolean | | |
forceCheckpointing | this is deprecated | not supported | boolean | | |
maxConcurrentCheckpoints | | execution.checkpointing.max-concurrent-checkpoints | int | ConfigOption<Integer> | |
minPauseBetweenCheckpoints | We could use Duration for ConfigOption | execution.checkpointing.min-pause | long | ConfigOption<Duration> | |
preferCheckpointForRecovery | | execution.checkpointing.prefer-checkpoint-over-savepoint | boolean | ConfigOption<Boolean> | |
tolerableCheckpointFailureNumber | | execution.checkpointing.tolerable-checkpoint-failures | int | ConfigOption<Integer> | |
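Several rows above suggest a ConfigOption<Duration> for fields that are stored as long milliseconds. One way the two could be bridged is sketched below. CheckpointIntervalSketch is a hypothetical class, the -1 initial value is an assumption of this sketch, and the Optional parameter stands in for the value read from execution.checkpointing.interval.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical stand-in for the relevant part of CheckpointConfig.
final class CheckpointIntervalSketch {
    // Field stays in milliseconds; -1 stands for "not set" in this sketch.
    private long checkpointInterval = -1L;

    // Converts the Duration-typed option to the millisecond field, if present.
    void configureInterval(Optional<Duration> interval) {
        interval.map(Duration::toMillis)
                .ifPresent(millis -> this.checkpointInterval = millis);
    }

    long getCheckpointInterval() {
        return checkpointInterval;
    }

    public static void main(String[] args) {
        CheckpointIntervalSketch config = new CheckpointIntervalSketch();
        config.configureInterval(Optional.of(Duration.ofSeconds(10)));
        System.out.println(config.getCheckpointInterval()); // 10000
    }
}
```

Keeping the field as a long means the existing setters and getters stay unchanged, while the option itself carries an explicit unit.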
StreamExecutionEnvironment:
Configuration | Comment | Suggested key | Type | Suggested type of ConfigOption | Exists in 1.9 |
---|---|---|---|---|---|
timeCharacteristic | | pipeline.time-characteristic | Enum<StreamTimeCharacteristic> | ConfigOption<StreamTimeCharacteristic> | |
defaultStateBackend | | reuse options | StateBackend | | state.backend (CheckpointingOptions) |
isChainingEnabled | | pipeline.operator-chaining | boolean | ConfigOption<Boolean> | |
bufferTimeout | We could use Duration for ConfigOption. We could move to ExecutionConfig. | execution.buffer-timeout | long | ConfigOption<Duration> | |
cachedFile | | pipeline.cached-files | <String, String, Boolean> filePath/fileName/Executable | ConfigOption<List<CachedFile>>, where CachedFile is a POJO | |
Compatibility, Deprecation, and Migration Plan
- The changes are backward compatible. They introduce an additional way of configuring ExecutionConfig, CheckpointConfig, and StreamExecutionEnvironment, and reuse existing options wherever possible.
Test Plan
Covered by unit tests
Rejected Alternatives
---