Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
There are scenarios in which it is desirable to change a Flink job's configuration after the job has been submitted.
For example
- Troubleshooting (e.g. increase checkpoint timeout or failure threshold)
- Performance optimization, for example, tuning state backend parameters; in theory, even the state backend type can be changed (this is out of scope though)
- Enabling new features after testing them in a non-production environment. This decouples upgrading to a newer Flink version from actually enabling the features.
To support these use cases, we propose to extend the Flink job configuration REST endpoint so that it can return the full job configuration and update it.
Proposed Changes
REST API changes
The existing /jobs/<job-id>/config endpoint is extended by:
- adding configuration and version fields to the response to GET
- adding support for the PUT method
The new configuration field only contains the job configuration, i.e. the options that were explicitly specified on job submission (or in subsequent configuration updates via the API). Cluster configuration is not returned by this endpoint.
Example GET request:
GET /jobs/7c2c8448ed5be80cee0ec64c1965543e/config
RESPONSE BODY:
{
"jid": "7c2c8448ed5be80cee0ec64c1965543e",
"name": "testJob",
"execution-config": {
"object-reuse-mode": false,
...
},
"configuration": {
"pipeline.object-reuse": false,
"execution.checkpointing.timeout": "10s",
...
},
"version": 1
}
Example PUT request
Note that this endpoint expects the entire job configuration (ideally, obtained via GET). See Rejected Alternatives / Using PATCH instead of PUT for the motivation.
Only the configuration and version fields are required. ExecutionConfig is derived from the supplied configuration and is not specified in the request.
PUT /jobs/7c2c8448ed5be80cee0ec64c1965543e/config
REQUEST BODY:
{
"configuration": {
"pipeline.object-reuse": false,
"execution.checkpointing.timeout": "1h",
...
},
"version": 1
}
RESPONSE BODY:
{}
Response codes:
| Code | Description | Explanation |
|---|---|---|
| 200 | OK | The request has been accepted. Job re-configuration will happen shortly |
| 400 | Bad Request | In particular, when changing an option that’s not white-listed in the configuration |
| 403 | Forbidden | Requested configuration change is not allowed (at least one option that's being changed is not in the allow-list) |
| 404 | Job not found | Wrong job ID |
| 409 | Conflict | Wrong value for version was supplied |
| 500 | Internal error | Internal error |
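As a rough illustration of how a client might address the extended endpoint, the following sketch builds the GET and PUT requests with the JDK's java.net.http API. The class name JobConfigRequests, the base URL passed by the caller, and the Content-Type header are assumptions made for the example; they are not part of this proposal.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical client-side helper; not part of Flink. */
final class JobConfigRequests {

    private static URI configUri(String baseUrl, String jobId) {
        return URI.create(baseUrl + "/jobs/" + jobId + "/config");
    }

    /** Builds the request that reads the job configuration and its version. */
    static HttpRequest get(String baseUrl, String jobId) {
        return HttpRequest.newBuilder(configUri(baseUrl, jobId)).GET().build();
    }

    /**
     * Builds the request that replaces the job configuration. The JSON body is
     * expected to carry the entire "configuration" object and the "version"
     * previously obtained via GET.
     */
    static HttpRequest put(String baseUrl, String jobId, String jsonBody) {
        return HttpRequest.newBuilder(configUri(baseUrl, jobId))
                .header("Content-Type", "application/json") // assumed header
                .PUT(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }
}
```

A client would typically send the GET request, modify the returned configuration, and send it back with the same version; on a 409 response it would repeat the cycle starting from a fresh GET.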
Versioning
To prevent conflicting concurrent updates (when an update from one API client is unintentionally overwritten by another), the configuration is versioned. The client must specify the current version of the configuration when updating it; if it does not match the current version in Flink, the request is rejected.
The current version can be obtained via GET.
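The version check described above amounts to optimistic concurrency control. A minimal sketch, assuming the version is a counter that starts at 1 and is incremented on every successful update (the proposal does not specify how versions are generated), and using the hypothetical class name VersionedJobConfig:

```java
import java.util.Map;

/** Hypothetical illustration of the version check; not the Flink implementation. */
final class VersionedJobConfig {
    private Map<String, String> configuration;
    private long version = 1; // assumed initial version

    VersionedJobConfig(Map<String, String> initial) {
        this.configuration = Map.copyOf(initial);
    }

    /**
     * Applies the update only if the caller supplied the current version.
     * Returns false (which would map to HTTP 409) and leaves the state
     * unchanged otherwise.
     */
    synchronized boolean tryUpdate(long expectedVersion, Map<String, String> newConfiguration) {
        if (expectedVersion != version) {
            return false;
        }
        configuration = Map.copyOf(newConfiguration);
        version++; // assumed: each accepted update bumps the version
        return true;
    }

    synchronized long version() {
        return version;
    }

    synchronized Map<String, String> configuration() {
        return configuration;
    }
}
```

With this scheme, two clients that both read version 1 cannot both succeed: the second PUT carries a stale version and is rejected, so the client has to re-read the configuration before retrying.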
Restart strategy behavior
The flow does not take into account the configured restart strategy; i.e. the job can be restarted to apply new configuration even if RestartStrategy is/was none.
Overall workflow
- A request to change the configuration is received by the Dispatcher
- The request is validated
- The configuration is stored in HA
- In-memory representations of the configuration in AdaptiveScheduler are updated
- The AdaptiveScheduler.state.onConfigurationChange() method is called; for the Executing state, this transitions to the Restarting state
- Upon start, the new configuration is used by the JM and is submitted to the TMs as part of submitTask RPCs
AdaptiveScheduler changes
AdaptiveScheduler is extended with an updateJobConfiguration() method.
The following interface is implemented by the Executing state:
/** Listener for {@link JobConfiguration} changes (e.g. coming from REST API). */
interface ConfigurationListener {
void onConfigurationChange();
}
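To make the dispatch concrete, here is a toy state machine in which only the Executing state reacts to a configuration change by moving to Restarting. ToyScheduler and its Created state are hypothetical stand-ins for illustration, and the assumption that other states ignore the notification is not stated in this proposal.

```java
/** Mirrors the listener interface above. */
interface ConfigurationListener {
    void onConfigurationChange();
}

/** Hypothetical, heavily simplified stand-in for AdaptiveScheduler. */
final class ToyScheduler {
    interface State {}

    final class Created implements State {}

    final class Restarting implements State {}

    final class Executing implements State, ConfigurationListener {
        @Override
        public void onConfigurationChange() {
            // A running job is restarted so that the new configuration is picked up.
            state = new Restarting();
        }
    }

    private State state = new Created();

    void start() {
        state = new Executing();
    }

    /** Forwards the change to the current state if it listens for it (assumed: otherwise a no-op). */
    void updateJobConfiguration() {
        if (state instanceof ConfigurationListener) {
            ((ConfigurationListener) state).onConfigurationChange();
        }
    }

    String stateName() {
        return state.getClass().getSimpleName();
    }
}
```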
Public Interfaces
REST API
Add GET and PUT support for job configuration (see the REST API changes section above).
Configuration-related classes
New ConfigOption
# Comma-separated list of options that can be changed dynamically (via REST API).
# '*' means any option can be changed.
# Empty means no option can be changed.
# By default, only execution.checkpointing.interval change is allowed:
jobmanager.execution.dynamic-configuration.allow-list: execution.checkpointing.interval
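The allow-list semantics described in the comments could be evaluated roughly as follows. This is a sketch only: the AllowList class is hypothetical, and trimming whitespace around entries is an assumption.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical evaluation of the allow-list option value; not the Flink implementation. */
final class AllowList {
    private final boolean allowAll;
    private final Set<String> keys;

    private AllowList(boolean allowAll, Set<String> keys) {
        this.allowAll = allowAll;
        this.keys = keys;
    }

    /** Parses the comma-separated option value; an empty value permits nothing. */
    static AllowList parse(String raw) {
        Set<String> keys = Arrays.stream(raw.split(","))
                .map(String::trim) // assumed: surrounding whitespace is ignored
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
        return new AllowList(keys.contains("*"), keys);
    }

    /** Returns true if the given option may be changed dynamically. */
    boolean permits(String optionKey) {
        return allowAll || keys.contains(optionKey);
    }
}
```

A request validator would call permits() for every key whose value differs from the current configuration and reject the request if any of them is not permitted.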
org.apache.flink.configuration.Configuration
This class is extended to support semantic comparison to avoid unnecessary job restarts:
@Public
public class Configuration ... {
@Public
public static class DiffEntry {
private final String key;
@Nullable private final String oldValue;
@Nullable private final String newValue;
}
public List<DiffEntry> getDiff(Configuration other) {
...
}
}
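As an illustration of what getDiff could return, the sketch below compares two configurations represented as plain string maps. It uses exact string equality, so it does not model the semantic comparison mentioned above (for example, treating equivalent duration spellings as equal); ConfigDiff and its field layout are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeSet;

/** Hypothetical key-by-key diff over string maps; not the Flink implementation. */
final class ConfigDiff {

    static final class DiffEntry {
        final String key;
        final String oldValue; // null if the key is absent from the old configuration
        final String newValue; // null if the key is absent from the new configuration

        DiffEntry(String key, String oldValue, String newValue) {
            this.key = key;
            this.oldValue = oldValue;
            this.newValue = newValue;
        }
    }

    /** Returns one entry per key whose value was added, removed, or changed, sorted by key. */
    static List<DiffEntry> diff(Map<String, String> oldConf, Map<String, String> newConf) {
        TreeSet<String> keys = new TreeSet<>(oldConf.keySet());
        keys.addAll(newConf.keySet());
        List<DiffEntry> result = new ArrayList<>();
        for (String key : keys) {
            String oldValue = oldConf.get(key);
            String newValue = newConf.get(key);
            if (!Objects.equals(oldValue, newValue)) {
                result.add(new DiffEntry(key, oldValue, newValue));
            }
        }
        return result;
    }
}
```

An empty diff would indicate that the submitted configuration matches the current one, in which case the restart can be skipped.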
Note on org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
Prior to Flink 1.20, the constants for checkpointing options were located in ExecutionCheckpointingOptions in flink-streaming-java.
Porting this feature to prior versions requires duplicating these options in flink-core, similar to FLINK-34456.
Note on org.apache.flink.api.common.restartstrategy.RestartStrategies
Prior to Flink 2.0, the restart strategy could be specified in the form of Java objects.
Porting this feature to prior versions requires adding support for initializing a restart strategy from another one (to preserve its current state, e.g. nextRestartTimestamp).
Compatibility, Deprecation, and Migration Plan
The feature should be compatible with jobs from the previous Flink versions (after a regular migration).
Scope and limitations
- Not all Flink options can be changed this way
  - some options are evaluated during Flink components startup (e.g. pekko configuration)
  - some options are evaluated during job submission (most table.* options)
  - some options, once configured, cannot be changed without adding such support (e.g. state backend)
- Only AdaptiveScheduler is supported
- AdaptiveScheduler will always restart the job if the configuration has changed
Test Plan
The feature will be covered by tests on different levels: REST API, Dispatcher, AdaptiveScheduler, and unit tests.
The feature should be tested manually as part of the release process.
Rejected Alternatives
Stop-with-savepoint
Alternatively, users can already stop a job with a savepoint, change the job configuration, and resubmit the job to Flink.
However, this is inferior to the proposed approach in the following ways:
- Higher downtime: the resources (task slots) need to be re-allocated, and state likely needs to be re-downloaded
- Higher operational complexity: the user has to manage savepoints and deal with failures during creation of savepoints
Changing settings without job restart
The proposal could go further and apply new configuration without restarting the job.
While theoretically possible, it was decided to restart the job at least in the initial version:
- Special handling of every option would be necessary to make this work
- This can be added later as an incremental improvement
- The benefit is much less obvious and can be achieved by other means (e.g. speeding up recovery)
Using PATCH instead of PUT
For consistency (i.e. to avoid conflicting partial updates), this proposal requires an entire job configuration for the update.
In some cases, however, it might be useful to support PATCH, e.g. when overriding a set of parameters manually ad-hoc.
This can be a follow-up addition if deemed necessary.
Credits
The design is inspired by FLIP-291