Authors: Wei Zhong, Xingbo Huang
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
FLIP-24 proposed the SQL Client, which provides an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of Java or Scala code. It is a useful tool for prototyping and playing around with Flink SQL.
SQL Client defines UDFs via the environment file and has its own CLI implementation for managing dependencies, but neither supports Python UDFs. We want to introduce support for Python UDFs in SQL Client, covering both the registration and the dependency management of Python UDFs.
Define Python UDF in the SQL Client Environment File
Currently, the format for a Java UDF in the SQL Client environment file is:
- name: myUDF
  from: class
  class: ...  # fully qualified class name of the Java UDF
We expand the "from" property to support defining Python UDFs in the following formats:
- name: func1
  from: python-object
  fully-qualified-name: module_name.object_name
The "name" property defines the function name used in SQL queries. "from: python-object" indicates that the UDF comes from a Python object. The value of the "fully-qualified-name" property is the fully qualified name of the Python UDF object, which consists of its module name and object name. The format is "module_name.object_name".
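To illustrate the "module_name.object_name" convention, resolving such a reference to the actual Python object can be sketched as follows. The helper below is hypothetical (not part of the Flink code base) and only demonstrates the lookup that the fully qualified name makes possible:

```python
import importlib

def load_python_object(fully_qualified_name: str):
    """Resolve a 'module_name.object_name' reference to a Python object.

    Hypothetical helper for illustration only: split off the object name,
    import the module, and look the object up by attribute access.
    """
    module_name, object_name = fully_qualified_name.rsplit('.', 1)
    module = importlib.import_module(module_name)
    return getattr(module, object_name)

# Example: a standard-library function resolved the same way a UDF
# reference such as 'test.func1' would be.
join = load_python_object('os.path.join')
print(join('data', 'data.txt'))
```

Note that rsplit is used so that dotted module paths (e.g. "my_pkg.my_module.func1") are handled correctly.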
How to Process Python UDFs in the Environment File
When processing Python UDFs in the environment file, a Java UserDefinedFunction wrapper is created for each Python UDF. A Python process is launched to provide the information needed to instantiate the Python UDFs, such as the input types, the result type, and whether the function is deterministic. These wrappers are then registered to the TableEnvironment just like normal Java UDFs. This design comes from FLIP-106 and will reuse its code.
Python UDF Dependency Management in SQL Client
Users can add Java dependencies via the command line options "-j" and "-l" when launching SQL Client. We follow this design and reuse the Python dependency management command line options of "flink run". Users can add Python dependencies via the following command line options when launching SQL Client:
-pyarch,--pyArchives <arg>
Add python archive files for the job. The archive files will be extracted to the working directory of the python UDF worker. Currently only zip format is supported. For each archive file, a target directory can be specified: if a target directory name is specified, the archive file will be extracted to a directory with that name; otherwise, the archive file will be extracted to a directory with the same name as the archive file. The files uploaded via this option are accessible via relative paths. '#' is used as the separator between the archive file path and the target directory name, and comma (',') as the separator between multiple archive files. This option can be used to upload a virtual environment or the data files used in Python UDFs. The data files can be accessed in the Python UDF, e.g.: f = open('data/data.txt', 'r').
-pyexec,--pyExecutable <arg>
Specify the path of the python interpreter used to execute the python UDF worker. The python UDF worker depends on Python 3.5+, Apache Beam (version == 2.19.0), Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets the above requirements.
-pyfs,--pyFiles <arg>
Attach custom python files for the job. These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. Standard python resource file suffixes such as .py/.egg/.zip, as well as directories, are all supported. Comma (',') is used as the separator between multiple files.
-pyreq,--pyRequirements <arg>
Specify a requirements.txt file that defines the third-party dependencies. These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. A directory containing the installation packages of these dependencies can optionally be specified, using '#' as the separator.
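The separator conventions above ('#' between an archive and its target directory, comma between entries) can be sketched with a small parser. The parse_archives helper below is hypothetical, written only to illustrate the option syntax:

```python
def parse_archives(option_value: str):
    """Parse a -pyarch value: comma-separated archive entries, each
    optionally followed by '#' and a target directory name.

    Hypothetical helper illustrating the separator rules; not actual
    Flink code. Returns a list of (archive_path, target_dir) pairs.
    """
    entries = []
    for item in option_value.split(','):
        if '#' in item:
            path, target = item.split('#', 1)
        else:
            # No target given: default to the archive file's own name.
            path, target = item, item.rsplit('/', 1)[-1]
        entries.append((path, target))
    return entries

print(parse_archives('py37.zip#venv,data.zip'))
# → [('py37.zip', 'venv'), ('data.zip', 'data.zip')]
```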
Python UDF in test.py:
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def func1(s: str):
    return s.replace('bar', 'foo')
Define Python UDF in env.yaml:
- name: myUDF # Java UDF
  from: class
  class: ...
- name: func1 # Python UDF
  from: python-object
  fully-qualified-name: test.func1
Add python dependencies:
$ sql-client.sh embedded -e env.yaml -pyfs test.py
# full example of python dependency management
$ sql-client.sh embedded -e env.yaml -pyfs test.py,/home/my/func2.py -pyreq /home/my/requirements.txt#/home/my/packages_dir -pyarch py37.zip#venv -pyexec venv/py37/bin/python
Use Python UDF in SQL Client:
Flink SQL > SELECT func1(a) AS a FROM source_table;
Compatibility, Deprecation, and Migration Plan
This FLIP introduces a new feature, so there are no compatibility issues with previous versions.
- Support defining Python UDFs in the SQL Client environment file.
- Support adding Python dependencies via SQL Client command line options.