
Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/g3ohzbogww1g8zl7zlmn84fsk29qr568

JIRA: FLINK-36702

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Flink community is preparing to remove support for per-job submission. However, the current SQL API doesn’t provide any utilities to run SQL jobs in application mode.

Outline Design










SQL Gateway is the service that receives users' requests and then launches the JobManager (JM) to run the user's script. When the JM starts, it uses the SqlDriver to compile the script and generate the job.

The SqlDriver is a simplified version of the SQL Gateway that only supports executing a specified script file. Once the script has been executed, the driver cleans up the associated file and exits.
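The driver lifecycle described above is: read the script, execute it, then clean up the file. The following is a minimal sketch of that lifecycle under stated assumptions. The class `DriverLifecycleSketch` is hypothetical, and the `Consumer<String>` stands in for the real statement executor. The sketch also deletes the file when execution fails. That is an assumption, because the FLIP only says the file is cleaned up after execution.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

/** Hypothetical sketch of the driver lifecycle; not the actual SqlDriver implementation. */
final class DriverLifecycleSketch {

    static void runAndCleanUp(Path scriptFile, Consumer<String> executor) throws IOException {
        try {
            // Read the whole script; the real driver hands it to a SQL script runner.
            String script = Files.readString(scriptFile, StandardCharsets.UTF_8);
            executor.accept(script);
        } finally {
            // Assumption: clean up the associated file even if execution fails.
            Files.deleteIfExists(scriptFile);
        }
    }
}
```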

Public Interfaces

Runtime

Currently, an application mode deployment requires `pipeline.jars` to contain a bootstrap jar. SQL users, however, have no such jar to upload when the jar resources they need are pulled dynamically, so this restriction needs to be relaxed.
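One way to picture the relaxed restriction is the following hypothetical sketch. It accepts an empty `pipeline.jars` only when the entry point is the SQL driver. Keying the exemption on the entry class is an assumption of this sketch. The class `ApplicationJarCheck` is also illustrative, and so is the driver's fully qualified name. Neither is confirmed by this FLIP.

```java
import java.util.List;

/** Hypothetical sketch of a relaxed pipeline.jars check for application mode. */
final class ApplicationJarCheck {

    // Assumed fully qualified name of the SQL driver; the real name may differ.
    static final String SQL_DRIVER_CLASS = "org.apache.flink.table.runtime.application.SqlDriver";

    static void validate(List<String> pipelineJars, String entryClass) {
        boolean hasBootstrapJar = pipelineJars != null && !pipelineJars.isEmpty();
        // SQL jobs may pull their jars dynamically, so they are exempt from the bootstrap-jar rule.
        if (!hasBootstrapJar && !SQL_DRIVER_CLASS.equals(entryClass)) {
            throw new IllegalArgumentException(
                    "pipeline.jars must contain the bootstrap jar in application mode");
        }
    }
}
```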

SQL Gateway

/sessions/${session-id}/scripts

Verb: POST

Response code: 200 OK

Executes the user's script in application mode on YARN or Kubernetes.

Request body

{
  "script": "...",      // optional
  "scriptPath": "...",  // optional
  "config": {...}       // optional
}

Response body

{
  "clusterId": "..."
}

We support two ways for users to pass SQL text: inline through `script`, or by reference through `scriptPath`. In production, we expect users to pass the script file path. When the user passes SQL statements directly to the SQL Gateway, their contents are recorded in the Flink configuration (flink-conf) and, on Kubernetes, eventually in the ConfigMap. However, a ConfigMap has a size limit, and in a production environment the user's SQL text may be too large to fit into it.
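To make the size concern concrete, the following sketch estimates whether an inline script still fits into a ConfigMap. Kubernetes limits the data stored in a ConfigMap to 1 MiB. The same ConfigMap also holds the rest of the Flink configuration, so the sketch takes that overhead as an input. The class `InlineScriptBudget` and its parameters are hypothetical.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: does an inline script plus other config fit into one ConfigMap? */
final class InlineScriptBudget {

    // Kubernetes caps the data stored in a single ConfigMap at 1 MiB.
    static final int CONFIG_MAP_LIMIT_BYTES = 1024 * 1024;

    static boolean fitsInConfigMap(String script, int otherConfigBytes) {
        // Measure the encoded size, not the character count.
        long scriptBytes = script.getBytes(StandardCharsets.UTF_8).length;
        return scriptBytes + otherConfigBytes <= CONFIG_MAP_LIMIT_BYTES;
    }
}
```

When this check fails, passing `scriptPath` instead of `script` avoids storing the SQL text in the ConfigMap.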

Note: users must specify either `script` or `scriptPath` in the request body. Otherwise, the SQL Gateway throws an exception to notify the user of the incorrect usage.
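The request check in the note above can be sketched as follows. The class `ScriptRequestValidator` and its error message are hypothetical. Two behaviours are assumptions, because the FLIP does not specify them: a blank string is treated as missing, and a request with both fields is accepted.

```java
/** Hypothetical sketch: at least one of "script" or "scriptPath" must be present. */
final class ScriptRequestValidator {

    static void validate(String script, String scriptPath) {
        // Assumption: blank values count as "not specified".
        boolean hasScript = script != null && !script.isBlank();
        boolean hasPath = scriptPath != null && !scriptPath.isBlank();
        if (!hasScript && !hasPath) {
            throw new IllegalArgumentException(
                    "Please specify either 'script' or 'scriptPath' in the request body.");
        }
    }
}
```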


public interface SqlGatewayService {

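    /**
     * Deploys the script in application mode.
     *
     * @param sessionHandle handle to identify the session.
     * @param scriptPath path to the script file; may be null if {@code script} is given.
     * @param script content of the script; may be null if {@code scriptPath} is given.
     * @param executionConfig configuration used to run the script.
     * @return the ID of the deployed cluster.
     */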
    <ClusterID> ClusterID deployScript(
            SessionHandle sessionHandle,
            @Nullable Path scriptPath,
            @Nullable String script,
            Configuration executionConfig)
            throws SqlGatewayException;

}




Use Cases

Submit job with application mode via SQL Client

Currently, the SQL Client provides the `-f` startup option to submit a script to a session cluster. We reuse the same option to submit the script to an application cluster. Users can submit a script in application mode with the following command:


./sql-client.sh -f <path-to-file> \
  -Dexecution.target=kubernetes-application \
  -Dkubernetes.container.image.ref=<custom-image-name>


With this command, the SQL Client asks the Kubernetes API server to start a JobManager from the specified image. When the JobManager starts, it executes the statements in the script and submits the job.

Note: When `-f` is used to submit a script in application mode, the `-i` option cannot be used to initialize the session.
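The `-f`/`-i` restriction can be sketched as a simple option check. The class `ClientOptionCheck` is hypothetical. The set of application-mode targets lists Flink's Kubernetes and YARN application targets and is an assumption of this sketch, not the SQL Client's actual option handling.

```java
import java.util.Set;

/** Hypothetical sketch: reject -i together with -f when submitting in application mode. */
final class ClientOptionCheck {

    // Assumed application-mode targets; the SQL Client may determine this differently.
    static final Set<String> APPLICATION_TARGETS =
            Set.of("kubernetes-application", "yarn-application");

    static void validate(String executionTarget, String scriptFile, String initFile) {
        // Set.of(...).contains(null) throws, so guard against a missing target first.
        boolean applicationMode =
                executionTarget != null && APPLICATION_TARGETS.contains(executionTarget);
        if (applicationMode && scriptFile != null && initFile != null) {
            throw new IllegalArgumentException(
                    "-i cannot be used together with -f when submitting in application mode.");
        }
    }
}
```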

Additional JAR Resources

Flink lets users supply custom implementations (catalogs, connectors, UDFs) through JAR files. Users can add their JARs to the classpath in the following ways:

  • Use the ADD JAR command 

Users can add an `ADD JAR '<path-to-jar>'` statement to their script.


  • Use the startup option `--jars`

Users can specify JAR paths with the following command:

./sql-client.sh --file <path-to-file> \
  -Dkubernetes.cluster-id=my-first-application-cluster \
  -Dexecution.target=kubernetes-application \
  -Dkubernetes.container.image.ref=<custom-image-name> \
  --jars <path-to-jar>


Both methods support remote paths. A local path is resolved against the JM's local filesystem, so the referenced JARs must already exist there.
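The local-versus-remote rule can be sketched by looking at a path's URI scheme. In the following sketch, paths without a scheme, or with the `file` scheme, are treated as local to the JM. Any other scheme, such as `hdfs` or `oss`, is treated as remote. The class `JarLocation` is hypothetical. Note that `URI.create` rejects paths containing unescaped characters such as spaces.

```java
import java.net.URI;

/** Hypothetical sketch: classify a JAR reference as JM-local or remote by its URI scheme. */
final class JarLocation {

    static boolean isLocalToJobManager(String jarPath) {
        String scheme = URI.create(jarPath).getScheme();
        // No scheme or "file" means the JAR must already exist on the JM's filesystem.
        return scheme == null || "file".equalsIgnoreCase(scheme);
    }
}
```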

Additional Python Resources

Users can register Python files and archives with the following statements:

SET 'python.files' = 'hdfs://..';

SET 'python.archives' = 'hdfs://..';

SQL Gateway

Users can use the following REST API to submit a SQL job.

curl --request POST http://localhost:8083/sessions/<session-handle>/scripts --data \
'{
   "script": "INSERT INTO sink SELECT * FROM src;",
   "config": {
      "kubernetes.cluster-id": "my-first-application-cluster",
      "execution.target": "kubernetes-application",
      "user.artifacts.artifact-list": "oss://flink-1.20/connectors/kafka.jar;oss://flink-1.20/connectors/paimon.jar"
   }
}'
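For clients written in Java, the same call can be built with the JDK's `java.net.http` API. The following is a sketch, not an official client. The class `ScriptSubmitRequest` is hypothetical, and the base URL and session handle are placeholders. The `Content-Type` header is an addition that the curl example does not set. To send the request, use `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical sketch: build the POST /sessions/{handle}/scripts request. */
final class ScriptSubmitRequest {

    static HttpRequest build(String gatewayBaseUrl, String sessionHandle, String jsonBody) {
        return HttpRequest.newBuilder()
                .uri(URI.create(gatewayBaseUrl + "/sessions/" + sessionHandle + "/scripts"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }
}
```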

Implementation Details

On receiving the request, the SQL Gateway first merges its default configuration with the user-specified parameters. The SQL Gateway is often started with a persistent CatalogStore configured, so merging the configurations lets the JM use the same CatalogStore information when it compiles the script.
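The merge step can be sketched with plain maps. The sketch starts from the gateway defaults and overlays the request's `config`, so a default such as `table.catalog-store.kind` reaches the JM unless the user overrides it. The class `ConfigMerge` is hypothetical. Letting user-specified values win on conflicting keys is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of merging gateway defaults with the request's config. */
final class ConfigMerge {

    static Map<String, String> merge(
            Map<String, String> gatewayDefaults, Map<String, String> userConfig) {
        Map<String, String> merged = new HashMap<>(gatewayDefaults);
        // Assumption: user-specified keys override the gateway defaults.
        merged.putAll(userConfig);
        return merged;
    }
}
```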

Because the ApplicationClusterEntryPoint only loads the JARs under $FLINK_HOME/lib at startup, we add a SqlDriver startup class to the flink-table-runtime module. This class is mainly responsible for:

  1. Reading the script content if the user specified a script file;
  2. Creating a SqlScriptRunner through reflection to execute each SQL statement. SqlScriptRunner lives in the flink-table-sql-gateway module, so it supports every SQL statement, including those that only the gateway module supports (e.g., the SET command for switching between streaming and batch mode, which the table module does not support);
  3. Kubernetes and YARN use different options to ship artifacts to the JM. We hard-code these options in the sql-gateway module until the deployment API provides a unified one:
    1. Kubernetes uses the `user.artifacts.artifact-list` option to specify files [1];
    2. YARN uses the `yarn.ship-files` option to specify files [2].
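The hard-coded choice in step 3 can be sketched as a mapping from the execution target to the option that ships artifacts. The sketch joins artifact paths with `;`, matching the `user.artifacts.artifact-list` value in the REST example. The class `ArtifactShipping` is hypothetical, and so is its handling of unsupported targets.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: pick the artifact-shipping option for an application-mode target. */
final class ArtifactShipping {

    static Map<String, String> shipOptions(String executionTarget, List<String> artifacts) {
        String value = String.join(";", artifacts);
        switch (executionTarget) {
            case "kubernetes-application":
                return Map.of("user.artifacts.artifact-list", value);
            case "yarn-application":
                return Map.of("yarn.ship-files", value);
            default:
                // Other targets are outside the scope of this sketch.
                throw new IllegalArgumentException("Unsupported target: " + executionTarget);
        }
    }
}
```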


[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#user-artifacts-artifact-list

[2] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#yarn-ship-files

The PoC code is available at: https://github.com/fsk119/flink/tree/application-mode


Rejected Alternatives

Validation script before deploying

We do not intend to validate the script before deploying it, because:

  1. Validation conflicts with the principle of application mode, which defers compilation to the JM side.
  2. Validation involves downloading resources. This would lengthen the overall submission and add complexity to the gateway, which would then have to access and manage those resources.


Compatibility, Deprecation, and Migration Plan

FLIP-480 is different from FLIP-316:

  1. FLIP-316 targets deploying a single statement to an application cluster, whereas the current design deploys whole scripts to an application cluster;
  2. FLIP-316 deploys a job from a JSON plan, whereas the current design compiles the SQL on the runtime (JM) side.

Therefore, FLIP-480 does not affect FLIP-316.


Future Work

Introduce Artifact Service

Currently, Kubernetes deployments cannot ship local files or JARs from the gateway to the JM. Since Kubernetes is expected to become the dominant deployment scenario, we may introduce a dedicated service that lets the JM fetch JAR files local to the gateway after the JM has started.



