Peter Huang, Rong Rong, Bowen Li, Shuyi Chen
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
This proposal aims to support function DDL, taking into account SQL syntax, language compliance, and registration of external UDF libraries.
Flink DDL was initiated and discussed in a design document [1] by Shuyi Chen, et al. The initial discussion mainly focused on tables, types, and views; FLIP-69 [2] extends it with a more detailed discussion of DDL for catalogs, databases, and functions. Originally, function DDL was in the scope of FLIP-69.
After some discussion with the community, we found several ongoing efforts, such as FLIP-64 [3], FLIP-65 [4], and FLIP-78 [5], that will directly impact the SQL syntax of function DDL. This proposal therefore describes the problem with those existing works in mind, and makes sure the design aligns with the API changes for temporary objects and the type inference for UDFs defined in different languages.
Before diving into the DDL syntax, we want to discuss the major requirements for defining a function within the Flink runtime, as raised by the related FLIPs:
CREATE FUNCTION addfunc AS 'com.example.hiveserver2.udf.add' USING JAR 'hdfs:///path/to/jar'
Language Distinction: Due to bytecode specifics of Scala, there are limitations in extracting type information from Scala functions. At the same time, supporting Python UDFs in the table runtime is another ongoing effort. Thus, the SQL syntax needs to support multiple languages. For example, MySQL's CREATE FUNCTION syntax supports a language clause in this way:
CREATE FUNCTION hello (s CHAR(20)) RETURNS CHAR(50) DETERMINISTIC RETURN CONCAT('Hello, ', s, '!') LANGUAGE SQL
CREATE TEMPORARY FUNCTION addfunc AS 'com.example.hiveserver2.udf.add' USING JAR 'hdfs:///path/to/jar'
CREATE FUNCTION catalog1.addfunc AS 'com.example.hiveserver2.udf.add' LANGUAGE JAVA
We propose the following as the function DDL syntax:
CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF NOT EXISTS] [catalog_name.db_name.]function_name AS identifier [LANGUAGE JAVA|SCALA|PYTHON] [USING JAR|FILE|ARCHIVE 'resource_path' [, USING JAR|FILE|ARCHIVE 'path']*];
DROP [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name;
ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name RENAME TO new_name;
SHOW FUNCTIONS [catalog_name.][db_name]
We currently only support Java, Scala, and Python. Both Java and Scala run on the JVM. Technically, JVM and PYTHON would be enough to distinguish the two runtimes in Flink, but they are conceptually in different domains: the JVM is a runtime while Python is a language. Thus, we distinguish JAVA and SCALA in the DDL syntax.
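The language distinction above can be captured by a simple enum on the catalog side. The following is a minimal sketch; the enum and method names here are illustrative assumptions, not the final Flink API:

```java
// Hypothetical enum distinguishing UDF implementation languages.
// JAVA and SCALA both target the JVM but need different type extraction,
// which is why they stay separate values in the DDL.
public enum FunctionLanguage {
    JAVA,
    SCALA,
    PYTHON;

    // Parse the LANGUAGE clause of the DDL, defaulting to JAVA when absent.
    public static FunctionLanguage fromDdl(String clause) {
        if (clause == null || clause.trim().isEmpty()) {
            return JAVA;
        }
        return valueOf(clause.trim().toUpperCase());
    }
}
```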
We want the function syntax to support all potential use cases. Below we list some obvious use cases that can be achieved.
CREATE TEMPORARY FUNCTION catalog1.db1.func1 AS 'com.xxx.udf.func1UDF' LANGUAGE JAVA
DROP FUNCTION catalog1.db1.geofence
In this case, the assumption is that the UDF classes are already in the classpath. Thus, we just need to get the class object by reflection, determine whether it is a UDF, UDAF, or UDTF, and register it to the TableEnvironment.
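The reflective part of this step can be sketched as follows. This is a simplified standalone example, not Flink's actual registration code: the subsequent check for UDF vs. UDAF vs. UDTF and the TableEnvironment call are Flink-specific and omitted here.

```java
// Sketch of classpath-based resolution: load the class named in the DDL
// and instantiate it reflectively. In a real implementation the instance
// would then be inspected (ScalarFunction, AggregateFunction, or
// TableFunction) and registered with the TableEnvironment.
public class ClasspathFunctionResolver {

    public static Object instantiate(String className) {
        try {
            // The UDF class is assumed to already be on the classpath.
            Class<?> clazz = Class.forName(className);
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot load UDF class: " + className, e);
        }
    }
}
```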
CREATE FUNCTION catalog1.db1.func2 AS 'com.xxx.udf.func2UDF' LANGUAGE JAVA USING 'http://artifactory.uber.internal:4587/artifactory/libs-snapshot-local/com/xxx/xxx/xxx-udf/1.0.1-SNAPSHOT/xxx-udf-1.0.1-20180502.011548-12.jar'
In this case, the user can use a class that is not in the local classpath. In the example above, the function func2UDF is contained in a jar that is released to Artifactory.
Using this type of model, we can split user-level logic from the platform. Each team can write and own its own UDF library, and the Flink platform is just responsible for loading it into the classpath and using it. We will discuss how to achieve this in a later section. Basically, the resource URL will be added as a user library in the execution environment; it will be added into the job graph and shipped to a storage layer, such as HDFS, before job submission.
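The jar-collection step described above can be sketched as a small accumulator that the DDL handler fills and the submission path drains. All names here are hypothetical; the real logic would live inside the execution environment and job-graph generation:

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the jar-shipping bookkeeping: paths collected from
// USING JAR clauses are kept with the job and resolved before submission.
// This is not the actual Flink job-graph API.
public class UserJarCollector {
    private final List<URI> userJars = new ArrayList<>();

    // Called once per USING JAR clause in the function DDL.
    public void addUserJar(String resourcePath) {
        userJars.add(URI.create(resourcePath));
    }

    // Before submission, the collected jars would be attached to the job
    // graph and uploaded to distributed storage (e.g., HDFS).
    public List<URI> jarsToShip() {
        return new ArrayList<>(userJars);
    }
}
```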
CREATE FUNCTION catalog1.db1.func3 AS 'com.xxx.udf.func3UDF' LANGUAGE PYTHON USING 'http://external.resources/flink-udf.py'
The first change needed is to add more methods to the CatalogFunction interface.
public interface CatalogFunction {
    String getClassName();

    Enum getLanguage(); // TODO

    Map<String, String> getProperties();

    CatalogFunction copy();

    Optional<List<String>> getResourcePaths(); // TODO

    Optional<String> getDescription();

    Optional<String> getDetailedDescription();
}
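To illustrate how a CREATE FUNCTION statement would populate such an object, here is a standalone, simplified value class. It mirrors only the fields relevant to this proposal and is not the real interface from flink-table's catalog package:

```java
import java.util.List;
import java.util.Optional;

// Simplified, self-contained sketch of a CatalogFunction-style record as it
// could be produced from a parsed CREATE FUNCTION statement.
public class SimpleCatalogFunction {
    private final String className;      // the AS identifier
    private final String language;       // JAVA, SCALA, or PYTHON
    private final List<String> resourcePaths; // the USING clause entries

    public SimpleCatalogFunction(String className, String language, List<String> resourcePaths) {
        this.className = className;
        this.language = language;
        this.resourcePaths = resourcePaths;
    }

    public String getClassName() { return className; }

    public String getLanguage() { return language; }

    // Empty when the function needs no external resources.
    public Optional<List<String>> getResourcePaths() {
        return resourcePaths.isEmpty() ? Optional.empty() : Optional.of(resourcePaths);
    }
}
```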
The second change is to register user jars: in order to support loading external libraries and creating UDFs from them, we need to add a function in ExecutionEnvironment to register external libraries.
/**
 * Register a jar file to load in the Flink job dynamically. The jar file will be added into the job graph before job
 * submission. During runtime, the jar file is loaded into the user code classloader automatically.
 *
 * @param jarFile The path of the jar file (e.g., "file://a", "hdfs://b", "http://c")
 */
public void registerUserJarFile(String jarFile) {
    Path path = new Path(jarFile);
    this.userJars.add(path);
}
Before job submission, the registered user jars will be added into the StreamGraph, and then into the JobGraph in the JobGraphGenerator.
To ensure isolation of class loading across different sessions, we can add a new interface in {Stream}ExecutionEnvironment, such as:
public void registerUserJarFiles(String classloaderName, String... jarFiles) {
    // ...
}
This interface registers a set of jar files under a specific classloader environment key: classloaderName. Internally, it uses a similar path to registerCachedFile(), which distributes the jar files to the runtime using Flink's BlobServer.
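The per-name classloader cache behind this design can be sketched as follows. This is a minimal standalone illustration, assuming the jar URLs have already been localized by the blob distribution step; it is not the actual RuntimeContext implementation:

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the cache a RuntimeContext could keep for getClassLoaderByName:
// one URLClassLoader per registered library name, created lazily and reused
// across UDF calls so each session's libraries stay isolated.
public class NamedClassLoaderCache {
    private final Map<String, URLClassLoader> cache = new ConcurrentHashMap<>();

    public ClassLoader getClassLoaderByName(String classloaderName, URL[] jarUrls) {
        return cache.computeIfAbsent(
            classloaderName,
            name -> new URLClassLoader(jarUrls, getClass().getClassLoader()));
    }
}
```

Keying by name rather than by jar list means two sessions registering different jars under different names never share a loader, while repeated lookups under the same name stay cheap.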
Also, add a new interface in RuntimeContext to create and cache a custom user-code classloader from the jar file set registered under the given name.
public ClassLoader getClassLoaderByName(String classloaderName) {
    // ...
}
During code generation of the UDF function call, it will load the set of jar files associated with the library into a custom classloader and invoke the function reflectively.
Also, inside the RuntimeContext implementation, we will keep a cache of all loaded custom classloaders so we won't load the same library multiple times.
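The reflective invocation path can be sketched as below. It is a simplified standalone example, assuming the library classloader has already been resolved; the real generated code would cache the method handle rather than look it up per call:

```java
import java.lang.reflect.Method;

// Sketch of invoking a UDF reflectively through a resolved library
// classloader: load the class via that loader, instantiate it, and call
// the named evaluation method with the given arguments.
public class ReflectiveUdfCaller {
    public static Object call(ClassLoader loader, String className,
                              String methodName, Object... args) {
        try {
            Class<?> clazz = Class.forName(className, true, loader);
            Object udf = clazz.getDeclaredConstructor().newInstance();
            Class<?>[] argTypes = new Class<?>[args.length];
            for (int i = 0; i < args.length; i++) {
                argTypes[i] = args[i].getClass();
            }
            Method method = clazz.getMethod(methodName, argTypes);
            return method.invoke(udf, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Failed to invoke " + className + "." + methodName, e);
        }
    }
}
```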
Note: the implementation will differ for different language support; here we only discuss the API needed for Java/Scala libraries.
From an implementation perspective, we want the function syntax to align with multi-language support, but we limit the scope to Java and Scala only. Python UDF support will be discussed and implemented in the scope of FLIP-78. The concrete action items include:
In the Flink 1.10 release, we will focus on the basic function DDL syntax as below:
CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF NOT EXISTS] [catalog_name.db_name.]function_name AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
DROP [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.] function_name;
ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.] function_name AS identifier [LANGUAGE JAVA|SCALA|PYTHON];
SHOW FUNCTIONS [catalog_name.][db_name]
Basically, we will deliver CREATE FUNCTION and DROP FUNCTION for functions that are already included in the classpath. Loading functions from remote resources will be worked on after the Flink 1.10 release. The sub-tasks include:
As FLIP-65 (new type inference for Table API UDFs) is a blocker for adding Scala functions into TableEnvImpl, sub-tasks 1), 2), and 3) will only support language JAVA; 4) is for adding a function into the table environment with remote resources. Once FLIP-65 is done, we can continue the work of supporting language SCALA and the corresponding function registration into TableEnvImpl.
As this is a new feature for Flink DDL, no migration is needed.
The function DDL syntax was designed with the existing requirements in mind. No rejected alternatives yet.
[2] FLIP-69 Flink SQL DDL Enhancement
[3] FLIP-64 Support for Temporary Objects in Table module
[4] FLIP-65 New type inference for Table API UDFs
[5] FLIP-78 Flink Python UDF Environment and Dependency Management