Authors: Jincheng Sun, Dian Fu, Aljoscha Krettek
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
The Python Table API has been supported since release 1.9.0; see FLIP-38 for an introduction. However, Python user-defined functions are not yet supported. In this FLIP, we want to support Python user-defined functions in the Python Table API.
NOTE: User-defined AggregateFunction, Python DataStream API support and Pandas support will not be covered in this FLIP. They will definitely be supported as well, but considering that they are big topics, separate design docs and discussions will serve them better. Nevertheless, the data structures proposed in this FLIP will be the basis for Pandas support and Python DataStream API support.
Goals
Non-Goals
NOTE: Although these features will not be supported in the first version, it doesn't mean that they cannot be supported with the design proposed in this FLIP. The reasons for not supporting them in the first version are as follows:
We will focus on the most used scenarios and revisit these features in the future.
Executing user-defined functions of languages other than Java can be broken up into the following items:
Moving the execution of user-defined functions to a language-specific execution environment and using an RPC service between the operator and the remote execution environment allows for executing user-defined functions in languages other than Java. The diagram is as follows:
The components involved in the communication between the Flink operator and the Python execution environment are as follows:
There are two kinds of user-defined functions covered in this FLIP: scalar functions and table functions. The interfaces are as follows.
from abc import ABCMeta

class UserDefinedFunction(object):
    """
    Base interface for Python user-defined function.
    """
    __metaclass__ = ABCMeta

    def open(self, function_context):
        pass

    def close(self):
        pass

    def is_deterministic(self):
        return True
from abc import abstractmethod

class FunctionContext(object):

    def get_metric_group(self):
        return MetricGroup()

    @abstractmethod
    def get_job_parameter(self, key, default_value):
        pass
class ScalarFunction(UserDefinedFunction):

    @abstractmethod
    def eval(self, *args):
        """Please define your implementation"""
        pass
class TableFunction(UserDefinedFunction):

    @abstractmethod
    def eval(self, *args):
        """Please define your implementation"""
        pass
User-defined functions may need to report metrics during execution. The following interfaces will be proposed to support it.
class MetricGroup(object):
    def __init__(self, group_name=""):
        self.group_name = group_name

    def counter(self, name):
        return Counter(MetricName(self.group_name, name))

    def gauge(self, name, value):
        gauge = Gauge(MetricName(self.group_name, name))
        gauge.set(value)
        return gauge

    def histogram(self, name):
        return Histogram(MetricName(self.group_name, name))

    def meter(self, name):
        return Meter(MetricName(self.group_name, name))


class MetricName(object):
    def __init__(self, group_name, name):
        self.group_name = group_name
        self.name = name


class Counter(object):
    def __init__(self, metric_name):
        self.metric_name = metric_name
        self.value = 0

    def inc(self, n=1):
        self.value += n

    def dec(self, n=1):
        self.value -= n


class Gauge(object):
    def __init__(self, metric_name):
        self.metric_name = metric_name
        self.value = None

    def set(self, value):
        self.value = value


class Histogram(object):
    def __init__(self, metric_name):
        self.metric_name = metric_name
        self.value = None

    def update(self, value):
        self.value = value


class Meter(object):
    def __init__(self, metric_name):
        self.metric_name = metric_name
        self.value = 0

    def mark_event(self, n=1):
        self.value += n
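For illustration, a user-defined function could report metrics as follows (the function and metric names are made up for the example):

class CountingUpper(ScalarFunction):
    """A scalar function which counts how many rows it has processed."""

    def open(self, function_context):
        # obtain a counter from the metric group provided by the runtime
        self.processed = function_context.get_metric_group().counter("processed")

    def eval(self, s):
        self.processed.inc()
        return s.upper()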
We will add the following decorators to declare the input types, the output types and the accumulator types of user-defined functions:
Decorator arguments:
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
class Multiply(ScalarFunction):
    def eval(self, x, y):
        return x * y

    def is_deterministic(self):
        return False
class Split(TableFunction):
    def eval(self, x):
        for e in x.split("#"):
            yield e


class SumAcc(object):
    def __init__(self):
        self.sum = 0
Besides, decorators can also be used to declare a plain Python function as a UDF or UDTF:
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT(), deterministic=False)
def multiply(x, y):
    return x * y


@udtf(input_types=[DataTypes.STRING()], result_types=DataTypes.STRING())
def split(x):
    for e in x.split("#"):
        yield e
Firstly, we need to allow Python user-defined functions to be registered. The following interface will be added to the Python TableEnvironment:
class TableEnvironment(object):

    ...

    def register_function(self, name, function):
        pass
Under the hood, the corresponding Java instance of ScalarFunction or TableFunction will be created and registered into the TableEnvironment. To distinguish Python user-defined functions from Java user-defined functions, the method getLanguage will be introduced to the Java interface FunctionDefinition:
public interface FunctionDefinition {

    ...

    default FunctionLanguage getLanguage() {
        return FunctionLanguage.JVM;
    }
}

public enum FunctionLanguage {
    JVM,
    PYTHON
}
class SubtractOne(ScalarFunction):
    def eval(self, i):
        return i - 1


@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j


t_env.register_function("add_one", udf(lambda i: i + 1, DataTypes.BIGINT(), DataTypes.BIGINT()))
t_env.register_function("subtract_one", udf(SubtractOne(), DataTypes.BIGINT(), DataTypes.BIGINT()))
t_env.register_function("add", add)
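For illustration, the registered functions can then be used in Table API expressions (the source table and its columns a and b are hypothetical):

t = t_env.scan("source")  # hypothetical table with BIGINT columns a and b
t.select("add_one(a), subtract_one(b), add(a, b)")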
The most straightforward way to execute a Python user-defined function is to wrap the execution logic inside the regular Java user-defined function generated during registration. Suppose there are two Python user-defined functions in a StreamOperator; the workflow is as follows:
Pros:
Cons:
Considering the poor performance of this solution, we propose another solution:
Suppose that there are two Python user-defined functions: UDF1 and UDF2. UDF1 takes columns x and y as inputs and UDF2 takes columns y and z as inputs. The workflow of this solution is as follows:
It should be noted that the inputs are processed in a pipelined manner: the operator does not wait for the execution results of one input before processing the next, as sketched below.
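The following pseudo-code sketches this pipelined data flow. All names (data_service, collect, etc.) are illustrative placeholders for the actual components, not an existing API:

import collections

pending_inputs = collections.deque()  # inputs waiting for their UDF results

def process_element(row):
    # project only the columns needed by the Python UDFs:
    # x and y for UDF1, y and z for UDF2
    pending_inputs.append(row)
    data_service.send((row.x, row.y, row.z))
    # continue with the next input immediately instead of blocking here

def on_results_received(udf1_result, udf2_result):
    # results arrive in the same order as the inputs were sent
    row = pending_inputs.popleft()
    collect((row, udf1_result, udf2_result))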
Pros:
Cons:
Although the implementation cost is higher compared to solution 1), we tend to choose this solution for performance reasons.
As we can see from the previous section, extracting Python user-defined functions and adjusting the physical execution plan accordingly is a prerequisite for Python user-defined function execution.
The table planner has a query optimization and execution framework to translate the Table API programs to physical Flink jobs. We could add rules to extract the Python user-defined functions and adjust the execution plan during query optimization.
Regarding the Python UDFs contained in the Calc node, they will be grouped by their height in the RexProgram of the Calc node. The Python UDFs at the same height will be grouped into the same physical node.
The height can be calculated as follows:
Regarding the case shown in the above diagram, the height of java_udf1 and py_udf1 is 0, the height of java_udf2, py_udf2 and py_udf3 is 1.
Regarding the following job:
table.scan("source") .alias("a, b, c, d") .select("a, py_udf1(b), py_udf2(java_udf1(c)), java_udf2(py_udf3(d))") |
The physical plan will be adjusted as follows:
Regarding the Python UDFs contained in the join condition of a Correlate node, they should be pushed down. There is no need to push down the Java UDFs contained in the join condition of a Correlate node.
Regarding the Python UDFs contained in the join condition of Join and WindowJoin nodes, if a Python UDF only takes inputs from the left table or the right table, it will be pushed down by default. Python UDFs which take inputs from both the left table and the right table will not be supported for now.
OverAggregate, WindowAggregate and GroupAggregate nodes will not contain Python UDFs, as the Python UDFs contained in over aggregates, window aggregates and aggregates will be pushed down by default.
Python UDTFs can only exist in Correlate nodes. Two consecutive Correlate nodes can be merged to optimize the Python UDTF execution performance if all the following conditions hold:
The join type and join condition of the merged Correlate is the same as the second Correlate.
Suppose the following job:
table.scan("source") .alias("a, b") .join_lateral("py_udtf1(a) as (x, y)") .join_lateral("py_udtf2(b, x) as (m, n)") |
The table function of the merged Correlate of the above job is as follows:
class MergedTableFunction(TableFunction):
    def eval(self, a, b):
        for x, y in py_udtf1(a):
            for m, n in py_udtf2(b, x):
                yield x, y, m, n
In the previous sections, we have introduced how to adjust the physical plan to group the Python user-defined functions which can be executed together into the same physical node. In this section, we will discuss how to execute the Python user-defined functions.
As the Python user-defined functions cannot run directly in JVM, PythonUserDefinedFunctionRunner will be introduced for Python user-defined functions execution. Its main functionalities are:
As we can see, to implement PythonUserDefinedFunctionRunner, we need to introduce a few building blocks, such as Python execution environment management, data service responsible for transferring data between JVM and Python VM, state service responsible for accessing state from Python VM, logging service, metric service, etc.
A portability framework was introduced in Apache Beam in recent releases. It provides well-defined, language-neutral data structures and protocols between the Beam SDK and the runner, such as the control service, data service, state service, logging service, etc. These data structures and protocols are good candidates for implementing the PythonUserDefinedFunctionRunner.
In the following, we will introduce how to extend the data structures and protocols provided in Beam portability framework to implement PythonUserDefinedFunctionRunner.
PS: Please refer to Beam portability framework for details about the portability framework in Beam. It introduces the high-level design and implementation details of the Beam portability framework, as well as the improvement requirements from Flink. The following design doc assumes the readers have a basic understanding of how the Beam portability framework works.
The following diagram shows the architecture of the PythonUserDefinedFunctionRunner based on Beam portability framework.
The components in color are the components of the Beam portability framework that should be extended:
The high-level flow could be summarized as two parts:
1) Initialize the Python execution environment:
2) Process the input elements:
PythonUserDefinedFunctionRunner needs to prepare the Python execution environment during the operator initialization phase, such as launching the Python VM, registering the Python user-defined functions to be executed, etc.
Interfaces JobBundleFactory and StageBundleFactory provided in Beam could be used to prepare the user-defined function execution environment. The interfaces are as follows:
/**
 * A factory that has all job-scoped information, and can be combined with stage-scoped information
 * to create a {@link StageBundleFactory}.
 *
 * <p>Releases all job-scoped resources when closed.
 */
public interface JobBundleFactory extends AutoCloseable {
    StageBundleFactory forStage(ExecutableStage executableStage);
}

/**
 * A bundle factory scoped to a particular {@link
 * org.apache.beam.runners.core.construction.graph.ExecutableStage}, which has all of the resources
 * it needs to provide new {@link RemoteBundle RemoteBundles}.
 *
 * <p>Closing a StageBundleFactory signals that the stage has completed and any resources bound to
 * its lifetime can be cleaned up.
 */
public interface StageBundleFactory extends AutoCloseable {
    /** Get a new {@link RemoteBundle bundle} for processing the data in an executable stage. */
    RemoteBundle getBundle(
        OutputReceiverFactory outputReceiverFactory,
        StateRequestHandler stateRequestHandler,
        BundleProgressHandler progressHandler) throws Exception;

    ProcessBundleDescriptors.ExecutableProcessBundleDescriptor getProcessBundleDescriptor();
}
PS: The default implementations DefaultJobBundleFactory and SimpleStageBundleFactory could serve as good references for how to implement JobBundleFactory and StageBundleFactory.
After StageBundleFactory.getBundle() is called, the Python execution environment will be launched and the Python user-defined functions will be registered to it:
ExecutableStage will be used during the execution environment initialization. It defines all the information needed to execute the user-defined functions. We could construct an ExecutableStage according to the Python user-defined functions to be executed.
After the Python VM is initialized, PythonUserDefinedFunctionRunner is ready to process the inputs.
Regarding data transmission, we can see from the interface StageBundleFactory that:
At Java side, PythonUserDefinedFunctionRunner can make use of these data structures to send and receive data:
At the Python side, the Beam portability framework provides a basic framework for Python user-defined function execution (the Python SDK harness). This framework provides a class BeamTransformFactory which transforms the user-defined function DAG to an operation DAG, where each node represents a processing node. DataInputOperation and DataOutputOperation are responsible for fetching inputs from and sending execution results to the remote side, respectively.
We need to construct the user-defined function DAG by attaching an additional source node with URN "beam:source:runner:0.1" and an additional sink node with URN "beam:sink:runner:0.1" to the original DAG. The Python framework provided by Beam can transform the node with URN "beam:source:runner:0.1" to a DataInputOperation and the node with URN "beam:sink:runner:0.1" to a DataOutputOperation.
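As a rough sketch, the additional nodes could be described with Beam's runner API protos as follows (the transform names and PCollection ids are made up for illustration):

from apache_beam.portability.api import beam_runner_api_pb2

# additional source node, translated to a DataInputOperation by the Python SDK harness
source = beam_runner_api_pb2.PTransform(
    unique_name="source",
    spec=beam_runner_api_pb2.FunctionSpec(urn="beam:source:runner:0.1"),
    outputs={"out": "udf_input"})

# additional sink node, translated to a DataOutputOperation by the Python SDK harness
sink = beam_runner_api_pb2.PTransform(
    unique_name="sink",
    spec=beam_runner_api_pb2.FunctionSpec(urn="beam:sink:runner:0.1"),
    inputs={"in": "udf_output"})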
As explained above, user-defined function DAG will be translated to Operation DAG for execution in Python framework. There are already a lot of Operations provided in Beam, but all of them are designed for the functions defined in Beam Python SDK. As the interfaces of user-defined functions in Flink are different from the functions defined in Beam, we need to define how to translate the Flink user-defined functions to Beam Operations.
The Python framework provides a pluggable way for user-defined function translation. Custom translators can be registered in BeamTransformFactory. We need to provide the custom translator and register it into BeamTransformFactory.
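A sketch of such a registration is shown below. The URN and the ScalarFunctionOperation class are assumptions for illustration rather than existing Beam or Flink APIs:

from apache_beam.runners.worker import bundle_processor

# hypothetical URN identifying a Flink Python scalar function transform
FLINK_SCALAR_FUNCTION_URN = "flink:transform:scalar_function:v1"

@bundle_processor.BeamTransformFactory.register_urn(FLINK_SCALAR_FUNCTION_URN, bytes)
def create_scalar_function_operation(factory, transform_id, transform_proto, payload, consumers):
    # deserialize the user-defined functions from the payload and create an
    # operation (a ScalarFunctionOperation, to be implemented) which invokes
    # them for each input element
    return ScalarFunctionOperation(transform_id, payload, consumers)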
Regarding UDTFs, they may produce zero or multiple results per input. As we know, for a Correlate node, the result for each input is a collection of rows joining the original input row with the UDTF results. For one UDTF result, how can the PythonUserDefinedFunctionRunner know which input row it corresponds to? There are two solutions:
1) Buffer all the execution results of one input and send them back to the Java operator as one whole package.
2) Attach an extra header to the execution results which indicates the input row they correspond to, so that results can be streamed back without buffering.
           | Pros                                                                 | Cons
Solution 1 | Implementation is simple                                             | OOM is possible if the number of results for one input is very big
Solution 2 | There is no need to worry about the number of results for one input  | An extra header is needed for the execution results
Solution 1 is preferred for now as no extra header is needed. If the number of table function results for one input is so big that it causes OOM, users should increase the heap memory or optimize the table function.
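A rough sketch of solution 1 on the Python side; table_function and output_operation are illustrative placeholders:

def process_input(input_row):
    # buffer all the results produced by the UDTF for this input row
    results = list(table_function.eval(*input_row))
    # send them back as one whole package so that the Java side knows
    # they all belong to the current input row
    output_operation.send(results)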
Beam divides the input elements into bundles and there are two points that should be noted:
We need to handle checkpoint and watermark in a proper way to solve the above problems.
We can see from the diagram of the architecture that the states accessed in the Python user-defined functions are managed by the state backend of the Flink operator. The following things should be done to make sure the checkpoint works well:
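For example, one key point is that the current bundle must be finished before the operator snapshots its state so that no in-flight element is lost. A rough sketch (the method names are illustrative, not an actual Flink API):

def prepare_snapshot(checkpoint_id):
    # flush all buffered inputs of the current bundle and wait until all
    # the corresponding execution results have been received and emitted
    finish_current_bundle()
    # the operator state now reflects all the processed elements
    snapshot_operator_state(checkpoint_id)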
There are two approaches to handle the watermark:
Approach 1 has better latency and approach 2 has better throughput. The Flink runner in Beam takes approach 2 for better performance, and we can take approach 2 as well. We could support approach 1 as a configurable option in the future if we find it necessary.
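A sketch of approach 2 at the operator level (class and method names are illustrative):

class WatermarkHolder(object):
    """Holds watermarks back until the current bundle has finished (approach 2)."""

    def __init__(self):
        self.pending_watermark = None

    def process_watermark(self, watermark, bundle_in_progress):
        if bundle_in_progress:
            # the results of the current bundle may still affect event time,
            # so hold the watermark back until the bundle is finished
            self.pending_watermark = watermark
        else:
            self.emit_watermark(watermark)

    def on_bundle_finished(self):
        if self.pending_watermark is not None:
            self.emit_watermark(self.pending_watermark)
            self.pending_watermark = None

    def emit_watermark(self, watermark):
        pass  # forward the watermark to the downstream operators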
During user-defined function execution, a lot of logs may be produced for all kinds of purposes. Collecting the logs from the remote execution environment will be beneficial for log management.
A logging service has already been provided in Beam for log aggregation and we can benefit from it with almost nothing to do:
We can see from the interface StageBundleFactory that a BundleProgressHandler needs to be provided during the execution environment initialization.
public interface BundleProgressHandler {

    /** Handles a progress report from the bundle while it is executing. */
    void onProgress(ProcessBundleProgressResponse progress);

    /** Handles the bundle's completion report. */
    void onCompleted(ProcessBundleResponse response);

    /** Returns a handler that ignores metrics. */
    static BundleProgressHandler ignored() {
        return new BundleProgressHandler() {
            @Override
            public void onProgress(ProcessBundleProgressResponse progress) {}

            @Override
            public void onCompleted(ProcessBundleResponse response) {}
        };
    }
}
The BundleProgressHandler defines the callbacks for job execution progress. The data structures ProcessBundleProgressResponse and ProcessBundleResponse contain the metrics collected in the remote environment.
To support user defined metrics, we need to do the following things:
The job parameters will be serialized together with the user-defined functions and transferred to the Python execution environment during initialization.
To support accessing job parameters in user-defined functions, we just need to make the job parameters accessible inside them. This can be done by wrapping the job parameters in the FunctionContext when translating the user-defined function to a Beam Operation.
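A minimal sketch of such a FunctionContext implementation, assuming the job parameters arrive as a dict deserialized during initialization (the class name is illustrative):

class FunctionContextImpl(FunctionContext):

    def __init__(self, job_parameters):
        # job parameters serialized on the Java side and deserialized during
        # the initialization of the Python execution environment
        self.job_parameters = job_parameters

    def get_job_parameter(self, key, default_value):
        return self.job_parameters.get(key, default_value)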
As the operator will launch a separate Python process for Python user-defined function execution, we should make sure that the resources used by the Python process are accounted for by Flink's resource management framework.
We need to add a configuration "python.worker.memory.size" which indicates the maximum memory size of a Python process. If multiple operators share the same Python worker, supposing n operators, the actual maximum memory size of the Python worker will be "n * python.worker.memory.size".
On Unix systems, Python provides a library "resource" which can be used to set a hard limit on the memory size of a Python process. We can use it to set the hard limit for the Python worker.
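A sketch of enforcing the limit inside the Python worker (the concrete value would be parsed from "python.worker.memory.size"):

import resource

def set_memory_limit(max_bytes):
    # set both the soft and the hard limit of the total address space
    # of the current Python process
    resource.setrlimit(resource.RLIMIT_AS, (max_bytes, max_bytes))

# e.g. a value parsed from the "python.worker.memory.size" configuration
set_memory_limit(512 * 1024 * 1024)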
The resources used by the Python process should also be added to the overall resources of the operator. This makes sure that the operator does not occupy more resources than it requested. The memory used by the Python process is considered "native memory".
For Python Table API jobs, if an operator contains a Python user-defined function, it will be given a resource budget which is the original resource plus the resource used by the Python process.