Page tree
Skip to end of metadata
Go to start of metadata

Authors: Jincheng Sun, Dian Fu, Aljoscha Krettek


Current stateReleased

Discussion thread


Released: 1.10.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


The Python Table API has been supported in release 1.9.0. See the FLIP-38 for an introduction. However, currently Python user-defined functions are still not supported. In this FLIP, we want to support Python user-defined functions in Python Table API.

NOTE: User-Defined AggregateFunction, Python DataStream API support and Pandas support will not be covered in this FLIP. Definitely they will also be supported. However, considering that they are big topics, separate design docs and discussions will be better. However, the data structures proposed in this FLIP will be the basis for Pandas support and Python DataStream API support.


  • Support Python user-defined functions in Table API (UDF, UDTF)
  • Support Python UDFs mixed with Java UDFs
  • Use Beam portability framework as the basis for Python user-defined function execution


  • Python UDAFs are not supported
  • Python UDFs used in Join condition which take the columns from both the left table and the right table as inputs are not supported
  • Python UDFs used in MatchRecognize are not supported

NOTE: Although these features will not be supported in the first version, it doesn’t mean that they cannot be supported according to the design proposed in this FLIP. The reasons not to support them in the first version are as follows:

  1. The performance may be poor without a lot of optimizations
  2. These scenarios are rare compared to the other scenarios

We will focus on the most used scenarios and revisit these features in the future.


Executing user-defined functions of languages other than Java can be broken up into the following items:

  1. Launch separate language-specific execution environment for user-defined function execution. For example, Python VM would be launched for Python user-defined functions execution.
  2. Send the input data to the remote execution environment for execution.
  3. Execute the user-defined functions. It may need to access state, logging, report metrics during execution.
  4. Fetch the execution results from the remote execution environment.

Moving the execution of user-defined functions to language-specific execution environment and using an RPC service between the operator and the remote execution environment allows for executing user-defined functions other than Java. The diagram is as follows:

The components involved in the communication between Flink operator and Python execution environment is as follows:

  • Env Service - Responsible for launching and destroying the Python execution environment. 
  • Data Service - Responsible for transferring the input data and the user-defined function execution results between Flink operator and Python execution environment.
  • State Service - It allows the aggregate functions which runs inside the Python execution environment to access the state managed by the Flink operator.
  • Logging Service - It provides logging support for user-defined functions. It allows for transferring log entries produced by user-defined functions to Flink operator and integrates with the Flink’s logging system. 
  • Metrics Service - It provides metrics support for user-defined functions. It allows transferring the metrics produced by user-defined functions to Flink operator and integrates with the Flink’s metrics system.

Public Interfaces

User Defined Function

There are three kinds of user-defined functions: scalar function, table function. The interfaces are as follows.


class UserDefinedFunction(object):


   Base interface for Python user-defined function.


   __metaclass__ = ABCMeta

   def open(self, function_context):


   def close(self):


   def is_deterministic(self):

       return True


class FunctionContext(object):

   def get_metric_group(self):

       return MetricGroup()


   def get_job_parameter(self, key, default_value):



class ScalarFunction(UserDefinedFunction):


   def eval(self, *args):

       """Please define your implementation"""



class TableFunction(UserDefinedFunction):


   def eval(self, *args):

       """Please define your implementation"""



User-defined functions may need to report metrics during execution. The following interfaces will be proposed to support it. 

class MetricGroup(object):

   def __init__(self, group_name):

      self.group_name = group_name

   def counter(self, name):

       return Counter(MetricName(group_name, name))

   def gauge(self, name, value):

       return Guava(MetricName(group_name, name))

   def histogram(self, name):

       return Histogram(MetricName(group_name, name))

   def meter(self, name):

       return Meter(MetricName(group_name, name))

class MetricName(object):

  def __init__(self, group_name, name):

   self.group_name = group_name = name

class Counter(object):

  def __init__(self, metric_name):

      self.metric_name = metric_name

      self.value = 0

   def inc(self, n=1):

       self.value += n

   def dec(self, n=1):

       self.value -= n

class Guava(object):

   def __init__(self, metric_name):

      self.metric_name = metric_name

   def set(self, value):

      self.value = value

class Histogram(object):

   def __init__(self, metric_name):

      self.metric_name = metric_name

   def update(self, value):

      self.value = value

class Meter(object):

   def __init__(self, metric_name):

      self.metric_name = metric_name

      self.value = 0

   def mark_event(n):

      self.value += n


We will add the following decorators to declare the input types, the output types and the accumulator types of user-defined functions: 

  • udf: used for ScalarFunction
  • udtf: used for TableFunction

Decorator arguments:

  • input_types: used in ScalarFunction, TableFunction, indicates the input type of the user defined function.
  • result_type: used in ScalarFunction, indicates the result type of the user defined scalar function.
  • result_types: used in TableFunction, indicates the result types of the user defined table function


@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())

class Multiply(ScalarFunction):

   def eval(self, x, y):

       return x * y

  def is_deterministic(self):

        return False

input_types=[DataTypes.STRING()], result_types=DataTypes.STRING())

class Split(TableFunction):

   def eval(self, x):

       for e in x.split(“#”):

           yield e

class SumAcc(object):

   def __init__(self)

       self.sum = 0

Besides, decorators can also be used to declare a Python function as UDF or UDTF:

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT(), deterministic=False)

def multiply(x, y):

    return x * y

@udtf(input_types=[DataTypes.STRING()], result_types=DataTypes.STRING())

def split(self, x):

    for e in x.split(“#”):

        yield e

Proposed Design

Registration of Python User-Defined Functions

Firstly, we need to allow the Python user-defined functions could be registered. The following interfaces will be added to the Python TableEnvironment:

class TableEnvironment(object):



   def register_function(self, name, function):


Underlying, the corresponding Java instance of ScalarFunction, TableFunction will be created and registered into the TableEnvironment. To distinguish the Python user-defined functions from the Java user-defined functions, the method getLanguage will be introduced to Java interface FunctionDefinition:

public interface FunctionDefinition {


  default FunctionLanguage getLanguage() {

    return FunctionLanguage.JAVA;



public enum FunctionLanguage {





class SubtractOne(ScalarFunction):

      def eval(self, i):

            return i - 1

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())

def add(i, j):

      return i + j


                                        udf(lambda i: i + 1, DataTypes.BIGINT(), DataTypes.BIGINT()))


                                        udf(SubtractOne(), DataTypes.BIGINT(),DataTypes.BIGINT()))

t_env.register_function("add", add)

High-Level Execution Mode

Solution 1

The most straight-forward way to execute Python user-defined function is that the execution logic is wrapped inside the regular Java user-defined function generated during registration. Suppose these are two Python user-defined functions in a StreamOperator, the workflow is as follows:


  1. The changes to the existing Flink Table module is small. All the Python user-defined function execution logic is wrapped inside the generated Java user-defined function.
  2. The scenarios where Python user-defined functions could be used is wide. They could be used in any places where Java user-defined functions could be used. 


  1. The Python user-defined functions need to be executed one by one. As there are RPC communication between JVM and Python VM, the performance would be terrible slow.
  2. One column needs to be transferred to the Python VM multiple times if it’s used by multiple Python user-defined functions.
  3. Multiple communications are needed to execute nesting Python UDFs. For example, regarding UDF2(UDF1(x)), two communications are needed between JVM and Python VM. It has to send the input “x” to Python VM to execute UDF1 firstly and then send the execution results of UDF1 to the Python VM to execute UDF2.

Solution 2

Considering that the terrible performance of this solution, we propose another solution:

  1. Extract all the Python user-defined functions in the same node and execute them in batch.
  2. Execute the Python user-defined functions in a pipeline manner. There is no need to wait for the execution results for one input before processing another input.

Suppose that there are two Python user-defined functions: UDF1 and UDF2, UDF1 takes the column x and y as inputs and UDF2 takes the column y and z as inputs. The workflow of this solution is as follows:

It should be noted that the inputs are processed in a pipeline manner, it will not wait for the execution results before processing another input.


  1. This solution could solve all the problems of solution 1)


  1. More changes are required compared to solution 1). 
    1. Changes are required to extract the Python user-defined functions.
    2. Changes are required to execute the Python user-defined functions. Separate StreamOperators should be defined to execute the Python user-defined functions.
    3. Need to take care of all the places where user-defined functions could be used.

Although the cost is bigger compared to solution 1), we tend to choose this solution for performance reasons.

Extract Python User-Defined Functions

As we can see from the previous section that extracting Python user-defined functions and adjusting the physical execution plan accordingly is a prerequisite for user-defined functions execution.

The table planner has a query optimization and execution framework to translate the Table API programs to physical Flink jobs. We could add rules to extract the Python user-defined functions and adjust the execution plan during query optimization.

User-Defined Scalar Function - UDF


Regarding the Python UDFs contained in the Calc node, they will be grouped by their height in the RexProgram of the Calc node. The Python UDFs at the same height will be grouped into the same physical node.

The height can be calculated as follows:

  1. The height of a UDF is 0 if all its inputs are from the original input row
  2. For a Java UDF, if all its inputs are not Python UDFs, its height is the same as the maximum height of its inputs. Otherwise, its height is the maximum height of its inputs + 1
  3. For a Python UDF, its height is the same as the maximum height of its inputs.

Regarding the case shown in the above diagram, the height of java_udf1 and py_udf1 is 0, the height of java_udf2, py_udf2 and py_udf3 is 1.

Regarding the following job:


        .alias("a, b, c, d")

        .select("a, py_udf1(b), py_udf2(java_udf1(c)), java_udf2(py_udf3(d))")

The physical plan will be adjusted as follows:


Regarding the Python UDFs contained in the join condition of Correlate node, it should be pushed down. There is no need to push down the Java UDFs contained in the join condition of Correlate node.


Regarding the Python UDFs contained in the join condition of Join and WindowJoin node, if the Python UDF only take the inputs from the left table or the right table, it will be pushed down by default. Regarding the Python UDFs take inputs from both the left table and right table as inputs, it will not be supported for now.


It will not contain Python UDFs in OverAggregate, WindowAggregate and GroupAggregate node as the Python UDFs contained in the over aggregate, aggregate and window aggregate will be pushed down by default.

User-Defined Table Function - UDTF

Python UDTF could only exist in Correlate node. Two consecutive Correlate nodes could be merged to optimizing the Python UDTF execution performance if all the following conditions hold:

  1. The join type of the first Correlate node is Inner Join
  2. The join condition of the first Correlate node is None

The join type and join condition of the merged Correlate is the same as the second Correlate. 

Suppose the following job:


     .alias("a, b")

     .join_lateral("py_udtf1(a) as (x, y)")

     .join_lateral("py_udtf2(b, x) as (m, n)")

The table function of the merged Correlate of the above job is as follows:


    def eval(self, a, b):

        for x, y in py_udtf1(a):

            for m, n in py_udtf2(b, x):

                yield x, y, m, n

User-Defined Function Execution

In the previous sections, we have introduced how to adjust the physical plan to group the Python user-defined functions which could be executed in together into the same physical node. In this section, we will discuss how to execute the Python user-defined functions. 


As the Python user-defined functions cannot run directly in JVM, PythonUserDefinedFunctionRunner will be introduced for Python user-defined functions execution. Its main functionalities are:

  • Manage the Python VM
    • Launch the Python VM in a separate process or a docker container
    • Prepare the Python VM by sending the serialized Python user-defined functions to it
    • Teardown the Python VM when job finishes
  • Establish communication between JVM and Python VM and execute the Python user-defined functions
    • Data communication: It will send input data to Python VM and get the execution results from the Python VM
    • State communication: It receives the state requests from Python VM and send back the state responses to Python VM
    • Logging communication: It receives the logging requests from Python VM and performing logging
    • Metrics communication: It receives the metrics information from Python VM and integrate them into the Flink metrics

As we can see, to implement PythonUserDefinedFunctionRunner, we need to introduce a few building blocks, such as Python execution environment management, data service responsible for transferring data between JVM and Python VM, state service responsible for accessing state from Python VM, logging service, metric service, etc. 

A portability framework was introduced in Apache Beam in latest releases. It provides well-defined, language-neutral data structures and protocols between Beam SDK and runner, such as control service, data service, state service, logging service, etc. These data structures and protocols can be good candidates to implement the PythonUserDefinedFunctionRunner. 

In the following, we will introduce how to extend the data structures and protocols provided in Beam portability framework to implement PythonUserDefinedFunctionRunner.

PS: Please refer to Beam portability framework for details about the portability framework in Beam. It introduces the high-level design, the implementation details of the Beam portability framework and also the improvement requirements from Flink. The following design doc assumes the readers have a basic understanding of how Beam portability framework works.


The following diagram shows the architecture of the PythonUserDefinedFunctionRunner based on Beam portability framework.

The components in color are the components of Beam portability framework which should be extended:

  1. OutputReceiverFactoryImpl is the callback to receive execution results
  2. StateRequestHandler, MapStateImpl, ListStateImpl, etc are the callback to receive state requests
  3. MapState, ListState in Python SDK Harness are the state stub to access state
  4. UserDefinedFunctionOperation is responsible for user-defined function execution.

High Level Flow

The high-level flow could be summarized as two parts:

1) Initialize the Python execution environment:

  1. PythonUserDefinedFunctionRunner starts up the gRPC services, such as the data service, state service, logging service, metrics service, etc
  2. PythonUserDefinedFunctionRunner launches the Python execution environment in separate process or docker container
  3. The Python worker registers to PythonUserDefinedFunctionRunner
  4. PythonUserDefinedFunctionRunner send the user-defined functions to be executed to the Python worker
  5. The Python worker transforms the user-defined functions to Beam operations
  6. The Python worker establishes the gRPC connections, such as the data connection, state connection, logging connection, etc

2) Process the input elements:

  1. PythonUserDefinedFunctionRunner send the input elements via gRPC data service to Python worker for execution
  2. The Python user-defined function could access state via gRPC state service during execution
  3. The Python user-defined function could also aggregate the logging and metrics to PythonUserDefinedFunctionRunner via gRPC logging service and metrics service during execution
  4. The execution results could be sent to PythonUserDefinedFunctionRunner via gRPC data service

Prepare Python Execution Environment

PythonUserDefinedFunctionRunner needs to prepare the Python execution environment during the operator initialization phase, such as launching the Python VM, registering the Python user-defined functions to be executed, etc.

Interfaces JobBundleFactory and StageBundleFactory provided in Beam could be used to prepare the user-defined function execution environment. The interfaces are as follows:


* A factory that has all job-scoped information, and can be combined with stage-scoped information

* to create a {@link StageBundleFactory}.


* <p>Releases all job-scoped resources when closed.


public interface JobBundleFactory extends AutoCloseable {

 StageBundleFactory forStage(ExecutableStage executableStage);



* A bundle factory scoped to a particular {@link

*}, which has all of the resources

* it needs to provide new {@link RemoteBundle RemoteBundles}.


* <p>Closing a StageBundleFactory signals that the stage has completed and any resources bound to

* its lifetime can be cleaned up.


public interface StageBundleFactory extends AutoCloseable {

 /** Get a new {@link RemoteBundle bundle} for processing the data in an executable stage. */

 RemoteBundle getBundle(

     OutputReceiverFactory outputReceiverFactory,

     StateRequestHandler stateRequestHandler,

     BundleProgressHandler progressHandler)

     throws Exception;

 ProcessBundleDescriptors.ExecutableProcessBundleDescriptor getProcessBundleDescriptor();


PS: The default implementation DefaultJobBundleFactory and SimpleStageBundleFactory could be good candidates for how to implement JobBundleFactory and StageBundleFactory.

After StageBundleFactory.getBundler() is called, the Python execution environment will be launched and the Python user-defined functions will be registered to the it:

  1. ExecutableStage defines all the information needed for the user-defined function execution, such as the execution environment, the user-defined functions to be executed, the input and output type serializers, etc.
  2. Currently four execution environments has already been supported: Docker, Process, External and Embedded.

ExecutableStage will be used during the execution environment initialization. It defines all the information needed to execute the user-defined functions. We could construct an ExecutableStage according to the Python user-defined functions to be executed.

User-Defined Function Execution

After the Python VM is initialized, PythonUserDefinedFunctionRunner is ready to process the inputs.

Data Transmission

Regarding data tran,  we can see from the interface StageBundleFactory that:

  1. A RemoteBundle will be returned during the execution environment initialization. It could be used to send inputs to the remote environment for execution.
  2. A OutputReceiverFactory is needed to be provided during the execution environment initialization. It’s a callback used to fetch the execution results from the remote environment.

At Java side, PythonUserDefinedFunctionRunner can make use of these data structures to send and receive data:

  1. To send data, PythonUserDefinedFunctionRunner will hold the instance of RemoteBundle which is created during Python execution environment initialization and send data with it when inputs come.
  2. To receive execution results, a Java OutputReceiverFactory implementation will be registered as the callback to receive the execution results.

At Python side, Beam portability framework provides a basic framework for Python user-defined function execution (Python SDK Harness). The Python framework provides a class BeamTransformFactory which transforms user-defined functions DAG to operation DAG. Each node in the operation DAG represents a processing node. DataInputOperation (which is responsible for fetching data from the remote side) and DataOutputOperation (which is responsible for sending data to the remote side) are responsible for receiving inputs and sending execution results separately.

We need to construct the user-defined function DAG by attaching an additional source node with URN “beam:source:runner:0.1” and an additional sink node with URN “beam:sink:runner:0.1” to the original DAG. The Python framework provided by Beam could transform the node with URN “beam:source:runner:0.1” to DataInputOperation and transform the node with URN “beam:sink:runner:0.1” to DataOutputOperation.

User-Defined Function Execution

As explained above, user-defined function DAG will be translated to Operation DAG for execution in Python framework. There are already a lot of Operations provided in Beam, but all of them are designed for the functions defined in Beam Python SDK. As the interfaces of user-defined functions in Flink are different from the functions defined in Beam, we need to define how to translate the Flink user-defined functions to Beam Operations.

The Python framework provides a pluggable way for user-defined function translation. Custom translators can be registered in BeamTransformFactory. We need to provide the custom translator and register it into BeamTransformFactory.


Regarding UDTF, it may produce zero or multiple results. As we know that, for Correlate node, for each input, the result is a collection of rows of the joined results of the original input row and the UDTF results. For one UDTF result, how could the PythonUserDefinedFunctionRunner know which input row it is corresponding to? There are two solutions:

  1. The Python worker packs the collection of results corresponding to one input row into a list and send them together to PythonUserDefinedFunctionRunner. If zero results is generated for one input row, an empty list will be returned.
  2. Adds a header to the original result: (header, result). The header represents whether the results corresponding to the current input is finished. The Python worker send the composed results to PythonUserDefinedFunctionRunner and there is no need to pack the results.



Solution 1

Implementation is simple

OOM is possible if the number of results for one input is very big

Solution 2

There is no need to worry about the number of results for one input

  1. Extra header is needed

Solution 1 is preferred for now as no extra header is needed. If the number of table function results for one input is so big that OOM is caused, users should increase the heap memory or optimizing the table function.

Bundle Processing 

Beam divides input elements into bundles and there are two points should be noted:

  1. The input elements are processed asynchronously in the SDK harness. This means that not all the input elements have been processed when watermark or checkpoint is encountered.
  2. State mutation may be cached in the SDK harness. This means that the state mutation may have not been sent back to the runner and persisted in the state backend when checkpoint is encountered.

We need to handle checkpoint and watermark in a proper way to solve the above problems.


We can see from the diagram of the architecture that the states accessed in the Python user-defined functions are managed by the state backend of the Flink operator. The following things should be done to make sure the checkpoint works well:

  1. The data buffered in the data service should be flushed to make sure all the input data before the checkpoint are processed and the execution results are persisted
  2. The state modification buffered in the state service should be flushed to make sure the state modification is persisted


There are two approaches to handle the watermark:

  1. Finish the current bundle and then emit the received watermark as the current watermark. Finishing the bundle ensures that all the items have been processed by the SDK harness and the results sent to the downstream operator.
  2. Hold on the received watermark and sent it out once the current bundle has been finished.

Approach 1 has better latency and approach 2 has better throughput. The Flink runner in Beam takes approach 2 for better performance and we can just take approach 2 too. We can support approach 1 in the future as an option if we find it necessary and make it configurable.


During user-defined function execution, it may produce a lot of logs for all kinds of purposes. Collecting the logs from the remote execution environment will be benefit for logging management.

Logging service has already been provided in Beam for log aggregation and we could benefit from it with nothing need to do:

  1. At Java side, a logging service GrpcLoggingService is already available in Beam and it has already been integrated into DefaultJobBundleFactory. It’s enabled by default if we use DefaultJobBundleFactory for Python execution environment setup.
  2. At Python side, FnApiLogRecordHandler has been provided in Beam and it has already been integrated into the Python framework. FnApiLogRecordHandler is responsible for forwarding the logs to the remote.


We can see from the interface StageBundleFactory that a BundleProgressHandler needs to be provided during the execution environment initialization.

public interface BundleProgressHandler {

 /** Handles a progress report from the bundle while it is executing. */

 void onProgress(ProcessBundleProgressResponse progress);

 /** Handles the bundle's completion report. */

 void onCompleted(ProcessBundleResponse response);

 /** Returns a handler that ignores metrics. */

 static BundleProgressHandler ignored() {

   return new BundleProgressHandler() {


     public void onProgress(ProcessBundleProgressResponse progress) {}


     public void onCompleted(ProcessBundleResponse response) {}




The BundleProgressHandler defines the callback of job execution progress. The data structure ProcessBundleProgressResponse and ProcessBundleResponse contain the metrics information collected in the remote environment.

To support user defined metrics, we need to do the following things:

  1. Provide a BundleProgressHandler implementation as the callback to forward the metrics collected in the remote environment to Flink metrics system.
  2. Support user-defined metrics in Python user-defined function:
    1. We need to translate metrics accesses in Python user-defined function to the metrics access in the Python framework. The translation also occurs during transforming user-defined function to Beam Operation.

Job Parameter

The job parameters will be serialized together with the user-defined functions and transferred to the Python execution environment during initialization. 

To support access job parameters in user-defined functions, we just need to provide a way to make the job parameters accessible in user-defined functions. This can be done by wrapping the job parameters in FunctionContext during translation the user-defined function to Beam Operation.

Resource Management

As the operator will launch separate Python process for Python user-defined function execution, we should make sure that the resource used by the Python process be accounted by Flink’s Resource Management framework.

Config the resource used by the Python process

We need to add a configuration “python.worker.memory.size” which indicates the maximum memory size for Python process. If multiple operators share the same Python worker, supposing n operators, the actual maximum memory size for the Python worker will be “n * python.worker.memory.size”.

Limit the Python process according to the resource configured

In Unix system, Python has provided a library “resource” which can be used to set a hard limit of the memory size for a Python process. We can use it to set the hard limit for the Python worker. 

Integrate the resource used by Python process with Flink’s resource management framework

The resource used by the Python process should also be added to the overall resource for an Operator. It makes sure that the operator doesn’t occupy resource exceeding the requested resource. The memory used by the Python process is considered as “native memory”.

For Python Table API jobs, if an operator contains Python use-defined function, it will be given a resource which is the origin resource + the resource used by the Python process.

Compatibility, Deprecation, and Migration Plan

  • This FLIP is a new feature and so there is no compatible issue with previous versions.

Implementation Plan

  1. Support the basic functionality of Python ScalarFunction
  2. Support chaining Python ScalarFunctions
  3. Python Execution Environment Management. For example, multiple operators can reuse the same Python SDK Harness.
  4. Python Dependency Management. The Python UDF may depend on third party dependencies, we should provide a proper way to handle it.
  5. Add a series of Java and Python Coders for all kinds of data types supported. The data encoded with Java coder should be able to decode with the corresponding Python coder, vice verse.
  6. Add cython support for udf execution. 
  7. Add validation check for places where Python ScalarFunction cannot be used 
  8. Support to use decorator syntax for Python functions
  9. Support the basic functionality of Python TableFunction
  10. Add rules to push down the Python ScalarFunctions contained in the join condition of Correlate node
  11. Add Python Correlate nodes merge rule
  12. Add user-defined metrics support
  13. Add documentation for Python user-defined functions
  • No labels