Motivation


Users want to submit jobs through the Flink REST API instead of the Flink CLI, which is too heavyweight in certain scenarios, for example a lightweight data processing workflow system that works with Flink-related systems.

Currently, the /jars/:jarid/run API (https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-jarid-run) supports only the few Flink config options listed in the doc (parallelism, savepoint path, and allow non-restored state), which is insufficient for practical use.

We propose extending the /jars/:jarid/run API with an additional request body parameter, "flinkConfiguration", which is a map of Flink configuration option-value pairs set by the user. For backward compatibility, the existing body parameters such as "allowNonRestoredState" are retained. When configurations conflict, values set explicitly through the existing body parameters take precedence over values set in "flinkConfiguration".

Doing so helps users in the following ways:

  • Users no longer need to repackage a Flink job whenever a job parameter changes, such as the checkpoint address
  • User code and job parameters are decoupled

Public Interfaces

Add "flinkConfiguration" to the REST API request body (JarRequestBody). It is a map that receives the Flink job parameters submitted by the user through the REST API.

Proposed Changes

Flink parameters can be roughly divided into job parameters and cluster parameters. Job parameters can be adjusted dynamically by users, while cluster parameters are fixed when a Flink session cluster or a Flink application-mode job is deployed. For example, jobmanager.rpc.port and jobmanager.rpc.address are cluster (non-job) parameters.

Flink job parameters currently take effect with the following priority. Submission through the REST API follows the same order as submission through the CLI:

User Code > REST API / Flink CLI > Cluster Config
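
The priority order above can be illustrated with a small sketch. This is not Flink code: the function name effective_config and the sample option values are hypothetical, and the three dictionaries simply stand in for the three configuration sources.

```python
from collections import ChainMap


def effective_config(user_code, submission, cluster):
    """Resolve options with priority: user code > REST API / CLI > cluster config.

    ChainMap looks keys up from left to right, so a key found in an
    earlier mapping shadows the same key in a later one.
    """
    return dict(ChainMap(user_code, submission, cluster))


# Hypothetical sample values, for illustration only.
cluster = {
    "parallelism.default": "4",
    "state.checkpoints.dir": "hdfs://flink/checkpoints",
}
submission = {"parallelism.default": "2"}  # set via REST API or CLI
user_code = {"parallelism.default": "1"}   # set inside the job itself

resolved = effective_config(user_code, submission, cluster)
```

Here the value set in user code is the one that takes effect for parallelism.default, while state.checkpoints.dir falls through to the cluster config because no higher-priority source sets it.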

Suppose we have uploaded a StateMachineExample jar. To run it, we call the REST API /jars/:jarid/run.
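
As a rough sketch, such a call could be prepared as follows. The base URL, the jar id, and the helper name build_run_request are assumptions made for illustration, and "flinkConfiguration" is the field proposed in this document, not part of the current API. The request is only built here, not sent.

```python
import json
import urllib.request


def build_run_request(base_url, jar_id, body):
    """Build (but do not send) a POST request for /jars/:jarid/run."""
    url = f"{base_url}/jars/{jar_id}/run"
    data = json.dumps(body).encode("utf-8")
    return urllib.request.Request(
        url,
        data=data,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


# Hypothetical host and jar id; use the id returned when the jar was uploaded.
request = build_run_request(
    "http://localhost:8081",
    "example-id_StateMachineExample.jar",
    {
        "entryClass": "org.apache.flink.streaming.examples.statemachine.StateMachineExample",
        "parallelism": "1",
        "allowNonRestoredState": True,
        # Proposed field: a map of Flink option-value pairs.
        "flinkConfiguration": {
            "state.checkpoints.dir": "hdfs://flink/jobs/stateMachineExample/checkpoints"
        },
    },
)
# urllib.request.urlopen(request) would submit the job to a running cluster.
```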

Adding the "flinkConfiguration" parameter to the /jars/:jarid/run REST API introduces several cases, which are described and resolved below.

We distinguish between external parameters and internal parameters:

  • external parameter: a field outside the "flinkConfiguration" field (entryClass/parallelism/programArgs/savepointPath/allowNonRestoredState ...)
  • internal parameter: a configuration entry inside the "flinkConfiguration" field

When the user does not pass any internal parameters, the original submission behavior is unaffected and the API behaves exactly as it does today.

{
  "entryClass": "org.apache.flink.streaming.examples.statemachine.StateMachineExample",
  "parallelism": "1",
  "programArgs": null,
  "savepointPath": null,
  "allowNonRestoredState": true
}

When the user passes Flink job parameters in "flinkConfiguration", there are several cases. The run-jar REST API handles each case as follows:

  • The internal parameters contain non-job parameters, e.g. jobmanager.rpc.port and jobmanager.rpc.address, or other invalid parameters (perhaps misspelled by the user).


{
  "entryClass": "org.apache.flink.streaming.examples.statemachine.StateMachineExample",
  "parallelism": "1",
  "programArgs": null,
  "savepointPath": null,
  "allowNonRestoredState": true,
  "flinkConfiguration": {
    "jobmanager.rpc.port": 8081,
    "jobmanager.rpc.address": "localhost",
    "state.savepoints.dir": "hdfs://flink/jobs/stateMachineExample/savepoints",
    "state.checkpoints.dir": "hdfs://flink/jobs/stateMachineExample/checkpoints"
  }
}

These non-job parameters do not apply in Flink Application Mode or Flink Session Mode: they are fixed when the cluster is deployed and cannot take effect when passed again at job submission. In this case, these parameters are ignored.
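
One way to picture this behavior is a filter that separates the entries to apply from the entries to ignore. The sketch below is only an illustration: the proposal does not define the exact set of cluster-level keys, so CLUSTER_LEVEL_KEYS and split_job_options are hypothetical names, and detecting misspelled keys (which would need a registry of valid options) is left out.

```python
# Hypothetical deny-list: the proposal does not enumerate cluster-level keys.
CLUSTER_LEVEL_KEYS = {"jobmanager.rpc.port", "jobmanager.rpc.address"}


def split_job_options(flink_configuration):
    """Split the map into (applied, ignored) according to the deny-list."""
    applied, ignored = {}, {}
    for key, value in flink_configuration.items():
        target = ignored if key in CLUSTER_LEVEL_KEYS else applied
        target[key] = value
    return applied, ignored


applied, ignored = split_job_options(
    {
        "jobmanager.rpc.port": 8081,
        "jobmanager.rpc.address": "localhost",
        "state.savepoints.dir": "hdfs://flink/jobs/stateMachineExample/savepoints",
        "state.checkpoints.dir": "hdfs://flink/jobs/stateMachineExample/checkpoints",
    }
)
```

With the request above, the two jobmanager.rpc.* entries end up in ignored and the two state.* entries in applied.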

  • The internal parameters contain a parameter that is also set as an external parameter


{
  "entryClass": "org.apache.flink.streaming.examples.statemachine.StateMachineExample",
  "parallelism": "1",
  "programArgs": null,
  "savepointPath": "hdfs://flink/jobs/stateMachineExample/savepoints",
  "allowNonRestoredState": true,
  "flinkConfiguration": {
    "allowNonRestoredState": false,
    "state.checkpoints.dir": "hdfs://flink/jobs/stateMachineExample/checkpoints"
  }
}

Here the user configures the same parameter in both the external and the internal parameters. In this case the external parameter wins ("allowNonRestoredState": true).

The internal value is ignored and the external value is used for allowNonRestoredState. The goal is to preserve existing user habits while keeping the API forward compatible.
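
The precedence rule can be sketched as a merge in which the external fields are written last. This mirrors the example request literally, using the same key name inside and outside "flinkConfiguration"; a real implementation would presumably translate each external field to its corresponding Flink option key, which is not shown. EXTERNAL_FIELDS and combine_parameters are hypothetical names.

```python
# External fields that carry configuration (entryClass and programArgs do not).
EXTERNAL_FIELDS = ("parallelism", "savepointPath", "allowNonRestoredState")


def combine_parameters(request_body):
    """Merge internal and external parameters; external values win on conflict."""
    internal = request_body.get("flinkConfiguration") or {}
    combined = dict(internal)
    for field in EXTERNAL_FIELDS:
        value = request_body.get(field)
        if value is not None:          # null/absent external fields override nothing
            combined[field] = value    # written last, so it replaces the internal value
    return combined


combined = combine_parameters(
    {
        "entryClass": "org.apache.flink.streaming.examples.statemachine.StateMachineExample",
        "parallelism": "1",
        "programArgs": None,
        "savepointPath": "hdfs://flink/jobs/stateMachineExample/savepoints",
        "allowNonRestoredState": True,
        "flinkConfiguration": {
            "allowNonRestoredState": False,
            "state.checkpoints.dir": "hdfs://flink/jobs/stateMachineExample/checkpoints",
        },
    }
)
```

Internal entries without an external counterpart, such as state.checkpoints.dir, pass through unchanged.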

  • The user passes external parameters, and the internal parameters also contain options that have no external counterpart, such as the checkpoint address


{
  "entryClass": "org.apache.flink.streaming.examples.statemachine.StateMachineExample",
  "parallelism": "1",
  "programArgs": null,
  "savepointPath": null,
  "allowNonRestoredState": false,
  "flinkConfiguration": {
    "state.checkpoints.dir": "hdfs://flink/jobs/stateMachineExample/checkpoints"
  }
}

Finally, the internal and external parameters are combined into one "flinkConfiguration" for this job.


On the Web UI, we might add input fields where the user can fill in these values when submitting a job. This is a small feature that will be decided in a follow-up discussion.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?

None. The user behavior does not change

  • If we are changing behavior how will we phase out the older behavior?

None

  • If we need special migration tools, describe them here.

None

  • When will we remove the existing behavior?

No removal is planned for now. We can keep the existing parameters for two or three versions and then consider whether to remove the redundant ones and replace them completely with the "flinkConfiguration" field.

Test Plan

  • The original job submission methods and parameters can still be used
  • Flink job parameters are applied to the job according to their priority
  • Non-job parameters passed in are ignored and have no effect

Rejected Alternatives

None