Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

There are scenarios in which it is desirable to change the configuration of a Flink job after the job has been submitted.

For example:

  • Troubleshooting (e.g. increase checkpoint timeout or failure threshold)
  • Performance optimization, for example, tuning state backend parameters; in theory, even state backend type can be changed (this is out of scope though)
  • Enabling new features after testing them in a non-production environment. This decouples upgrading to a newer Flink version from actually enabling its features.

To support these use cases, we propose to extend the Flink job configuration REST endpoint so that it can return the full job configuration and update it.

Proposed Changes

REST API changes

The existing /jobs/<job-id>/config endpoint is extended by:

  • adding configuration and version fields to the GET response
  • adding support for PUT method

The new configuration field contains only the job configuration, i.e. the options that were explicitly specified on job submission (or in subsequent configuration updates via the API). Cluster configuration is not returned by this endpoint.

Example GET request:

GET /jobs/7c2c8448ed5be80cee0ec64c1965543e/config
RESPONSE BODY:
{
    "jid": "7c2c8448ed5be80cee0ec64c1965543e",
    "name": "testJob",
    "execution-config": {
        "object-reuse-mode": false,
        ...
    },
    "configuration": {
        "pipeline.object-reuse": false,
        "execution.checkpointing.timeout": "10s",
        ...
    },
    "version": 1
}

Example PUT request:

Note that this endpoint expects the entire job configuration (ideally, obtained via GET). See "Using PATCH instead of PUT" under Rejected Alternatives for the motivation.

Only the configuration and version fields are required. ExecutionConfig is derived from the supplied configuration and is not specified in the request.

PUT /jobs/7c2c8448ed5be80cee0ec64c1965543e/config
REQUEST BODY:
{
    "configuration": {
        "pipeline.object-reuse": false,
        "execution.checkpointing.timeout": "1h",
        ...
    },
    "version": 1
}
RESPONSE BODY:
{}

Response codes:

| Code | Description | Explanation |
|------|-------------|-------------|
| 200 | OK | The request has been accepted. Job re-configuration will happen shortly |
| 400 | Bad Request | In particular, when changing an option that’s not white-listed in the configuration |
| 403 | Forbidden | Requested configuration change is not allowed (at least one option that's being changed is not in the allow-list) |
| 404 | Job not found | Wrong job ID |
| 409 | Conflict | Wrong value for version was supplied |
| 500 | Internal error | Internal error |
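
As an illustration of how a client might issue the PUT request and react to the response codes above, here is a minimal sketch using the JDK's java.net.http API. The class name, the helper methods, and the base URL http://localhost:8081 are assumptions made for this example and are not part of the proposal; the JSON body is assembled by plain string concatenation, so the caller must pass an already valid JSON object for the configuration.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical client-side helper; not part of Flink or of this proposal. */
final class JobConfigClientSketch {

    /** Builds the PUT request carrying the full configuration and the version seen via GET. */
    static HttpRequest buildPutRequest(
            String baseUrl, String jobId, String configurationJson, long version) {
        // Assumption: configurationJson is a valid JSON object, e.g. taken from a GET response.
        String body =
                "{\"configuration\": " + configurationJson + ", \"version\": " + version + "}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/jobs/" + jobId + "/config"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    /** A 409 means the supplied version was wrong: re-read via GET, re-apply, and retry. */
    static boolean shouldRefetchAndRetry(int statusCode) {
        return statusCode == 409;
    }

    public static void main(String[] args) {
        HttpRequest request =
                buildPutRequest(
                        "http://localhost:8081", // assumed REST address
                        "7c2c8448ed5be80cee0ec64c1965543e",
                        "{\"execution.checkpointing.timeout\": \"1h\"}",
                        1);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

The request is only built here, not sent; sending it would use java.net.http.HttpClient against a running cluster.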

Versioning

To prevent lost updates (an update from one API client being unintentionally overwritten by another), the configuration is versioned. The client must specify the current version of the configuration when updating it; if it doesn’t match the current version in Flink, the request is rejected.

The current version can be obtained via GET.
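
The version check described above is a form of optimistic concurrency control. The following is a minimal in-memory sketch of the idea; the class and method names are hypothetical, and the assumption that the version starts at 1 and is incremented by one on every successful update is made for illustration only (the proposal does not specify how versions advance).

```java
import java.util.Map;

/** Hypothetical in-memory store illustrating version-checked updates; not Flink code. */
final class VersionedJobConfigSketch {
    private Map<String, String> configuration;
    private long version = 1; // assumption: versions start at 1

    VersionedJobConfigSketch(Map<String, String> initial) {
        this.configuration = Map.copyOf(initial);
    }

    synchronized long getVersion() {
        return version;
    }

    synchronized Map<String, String> getConfiguration() {
        return configuration;
    }

    /**
     * Replaces the configuration only if the caller supplied the current version.
     * Returns false on a stale version; the REST layer would answer 409 in that case.
     */
    synchronized boolean update(Map<String, String> newConfiguration, long expectedVersion) {
        if (expectedVersion != version) {
            return false;
        }
        configuration = Map.copyOf(newConfiguration);
        version++; // assumption: each accepted update bumps the version by one
        return true;
    }

    public static void main(String[] args) {
        VersionedJobConfigSketch store =
                new VersionedJobConfigSketch(Map.of("execution.checkpointing.timeout", "10s"));
        // Two clients both read version 1; only the first update is accepted.
        System.out.println(store.update(Map.of("execution.checkpointing.timeout", "1h"), 1));
        System.out.println(store.update(Map.of("execution.checkpointing.timeout", "2h"), 1));
    }
}
```

The second client has to fetch the configuration again, re-apply its change on top of the new state, and retry with the new version.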

Restart strategy behavior

The flow does not take into account the configured restart strategy; i.e. the job can be restarted to apply new configuration even if RestartStrategy is/was none.

Overall workflow

  1. A request to change the configuration is received by the Dispatcher
  2. The request is validated
  3. The configuration is stored in HA
  4. In-memory representations of configuration in AdaptiveScheduler are updated
  5. AdaptiveScheduler.state.onConfigurationChange() method is called; for Executing state, this transitions to Restarting state
  6. Upon start, the new configuration is used by JM and is submitted to TMs as part of submitTask RPCs

AdaptiveScheduler changes

AdaptiveScheduler is extended with an updateJobConfiguration() method.

The following interface is implemented by the Executing state:

/** Listener for {@link JobConfiguration} changes (e.g. coming from REST API). */
interface ConfigurationListener {
    void onConfigurationChange();
}
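
To make the listener mechanism more concrete, here is a simplified, self-contained sketch of how a state could react to a configuration change. Everything except the ConfigurationListener interface shape is hypothetical: the Context class, the string-valued state name, and the instanceof-based dispatch are stand-ins chosen for this example and do not reflect the actual AdaptiveScheduler classes.

```java
/** Simplified illustration of the listener idea; all nested types are hypothetical stand-ins. */
final class ConfigurationListenerSketch {

    interface ConfigurationListener {
        void onConfigurationChange();
    }

    /** Hypothetical stand-in for the scheduler's state-transition context. */
    static final class Context {
        String currentState = "Executing";

        void goToRestarting() {
            currentState = "Restarting";
        }
    }

    /** Stand-in for the Executing state: a configuration change triggers a restart. */
    static final class Executing implements ConfigurationListener {
        private final Context context;

        Executing(Context context) {
            this.context = context;
        }

        @Override
        public void onConfigurationChange() {
            context.goToRestarting();
        }
    }

    /** Assumption: only states that implement the listener react; other states ignore the change. */
    static void notifyState(Object state) {
        if (state instanceof ConfigurationListener) {
            ((ConfigurationListener) state).onConfigurationChange();
        }
    }

    public static void main(String[] args) {
        Context context = new Context();
        notifyState(new Executing(context));
        System.out.println(context.currentState);
    }
}
```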

Public Interfaces

REST API

Add GET and PUT methods for job configuration; see the section above.

Configuration-related classes

New ConfigOption

# Comma-separated list of options that can be changed dynamically (via REST API).
# '*' means any option can be changed.
# empty means no option can be changed.
# By default, only execution.checkpointing.interval change is allowed:
jobmanager.execution.dynamic-configuration.allow-list: execution.checkpointing.interval
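
The semantics of the allow-list value ('*' for any option, empty for none, otherwise a comma-separated list of keys) could be evaluated roughly as follows. This is a sketch under assumptions: the class name is hypothetical, entries are trimmed of surrounding whitespace, and keys are matched exactly (no prefix or wildcard matching other than a standalone '*').

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical evaluator for the allow-list option value; not Flink code. */
final class DynamicConfigAllowListSketch {
    private final boolean allowAll;
    private final Set<String> allowedKeys;

    /** @param optionValue the raw, non-null value of the allow-list option */
    DynamicConfigAllowListSketch(String optionValue) {
        // Split on commas, trim whitespace, and drop empty entries (so "" yields an empty set).
        this.allowedKeys =
                Arrays.stream(optionValue.split(","))
                        .map(String::trim)
                        .filter(entry -> !entry.isEmpty())
                        .collect(Collectors.toSet());
        this.allowAll = allowedKeys.contains("*");
    }

    /** Returns true if the given option key may be changed dynamically. */
    boolean isAllowed(String key) {
        return allowAll || allowedKeys.contains(key);
    }

    public static void main(String[] args) {
        DynamicConfigAllowListSketch byDefault =
                new DynamicConfigAllowListSketch("execution.checkpointing.interval");
        System.out.println(byDefault.isAllowed("execution.checkpointing.interval"));
        System.out.println(byDefault.isAllowed("execution.checkpointing.timeout"));
    }
}
```

With the default value, a request that changes execution.checkpointing.timeout would therefore be rejected.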

org.apache.flink.configuration.Configuration

This class is extended to support semantic comparison to avoid unnecessary job restarts:

@Public
public class Configuration ... {

    @Public
    public static class DiffEntry {
        private final String key;
        @Nullable private final String oldValue;
        @Nullable private final String newValue;
    }

    public List<DiffEntry> getDiff(Configuration other) {
        ...
    }
}
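
For intuition, a diff of this shape can be computed over plain key/value maps as in the sketch below. It is a simplification under stated assumptions: the class name and the map-based signature are hypothetical, and values are compared as raw strings, whereas the semantic comparison proposed above would presumably treat equivalent values (for example, differently formatted durations) as equal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeSet;

/** Hypothetical map-based diff; compares raw strings only, not semantic values. */
final class ConfigDiffSketch {

    static final class DiffEntry {
        final String key;
        final String oldValue; // null if the key is only present in the new configuration
        final String newValue; // null if the key is only present in the old configuration

        DiffEntry(String key, String oldValue, String newValue) {
            this.key = key;
            this.oldValue = oldValue;
            this.newValue = newValue;
        }
    }

    /** Returns one entry per key whose value differs, ordered by key. */
    static List<DiffEntry> diff(Map<String, String> oldConf, Map<String, String> newConf) {
        TreeSet<String> keys = new TreeSet<>(oldConf.keySet());
        keys.addAll(newConf.keySet());
        List<DiffEntry> result = new ArrayList<>();
        for (String key : keys) {
            String oldValue = oldConf.get(key);
            String newValue = newConf.get(key);
            if (!Objects.equals(oldValue, newValue)) {
                result.add(new DiffEntry(key, oldValue, newValue));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<DiffEntry> changes =
                diff(
                        Map.of("execution.checkpointing.timeout", "10s"),
                        Map.of("execution.checkpointing.timeout", "1h"));
        for (DiffEntry entry : changes) {
            System.out.println(entry.key + ": " + entry.oldValue + " -> " + entry.newValue);
        }
    }
}
```

An empty diff means the update changes nothing, so the job restart can be skipped; a non-empty diff also yields the set of changed keys to validate against the allow-list.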

Note on org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions

Prior to Flink 1.20, the constants for checkpointing options were located in ExecutionCheckpointingOptions in flink-streaming-java.

Porting this feature to prior versions requires duplicating these options in flink-core, similar to FLINK-34456.

Note on org.apache.flink.api.common.restartstrategy.RestartStrategies

Prior to Flink 2.0, a restart strategy could be specified in the form of Java objects.

Porting this feature to prior versions requires adding support for initializing a restart strategy from another one (to preserve current state, e.g. nextRestartTimestamp).

Compatibility, Deprecation, and Migration Plan

The feature should be compatible with jobs from previous Flink versions (after a regular migration).

Scope and limitations

  • Not all Flink options can be changed this way
    • some options are evaluated during Flink components startup (e.g. pekko configuration)
    • some options are evaluated during job submission (most table.* options)
    • some options, once configured, cannot be changed without adding dedicated support (e.g. state backend)
  • Only AdaptiveScheduler is supported
  • AdaptiveScheduler will always restart the job if the configuration has changed

Test Plan

The feature will be covered by tests on different levels: REST API, Dispatcher, AdaptiveScheduler (AS), and unit tests.

The feature should be tested manually as part of the release process.

Rejected Alternatives

Stop-with-savepoint

Alternatively, users can already stop a job with a savepoint, change the job configuration, and resubmit the job to Flink.

However, this is inferior to the proposed approach in the following ways:

  1. Higher downtime: the resources (task slots) need to be re-allocated, and state likely needs to be re-downloaded
  2. Higher operational complexity: the user has to manage savepoints and deal with failures during creation of savepoints

Changing settings without job restart

The proposal could go further and apply new configuration without restarting the job.

While theoretically possible, it was decided to restart the job at least in the initial version:

  1. Special handling of every option would be necessary to make this work
  2. This can be added later as an incremental improvement
  3. The benefit is much less obvious and can be achieved by other means (e.g. speeding up recovery)

Using PATCH instead of PUT

For consistency (i.e. to avoid conflicting partial updates), this proposal requires an entire job configuration for the update.

In some cases, however, it might be useful to support PATCH, e.g. when overriding a set of parameters manually ad-hoc.

This can be a follow-up addition if deemed necessary.

Credits

The design is inspired by FLIP-291.