Status

Discussion thread https://lists.apache.org/thread/c7d2mt1vh8v11x2sl8slm4sy9j3t2pdg
Vote thread
JIRA

FLINK-25718

Release 1.15

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

PyFlink has provided a runtime framework for Python user-defined functions since Flink 1.10. This framework, known as Process Mode, relies on an inter-process communication architecture built on the Apache Beam Portability framework. Starting a dedicated process to execute Python user-defined functions gives better resource isolation, but it also incurs higher resource and performance overhead.

To overcome the resource and performance problems of Process Mode, we propose a new execution mode that executes Python user-defined functions in the same thread as the Java operator instead of in a separate process.

Public Interfaces

Configuration

    @Experimental
    public static final ConfigOption<String> PYTHON_EXECUTION_MODE =
        ConfigOptions.key("python.execution-mode")
                .stringType()
                .defaultValue("process")
                .withDescription(
                        "Specify the Python runtime execution mode. The possible values are `process`, `multi-thread` and `sub-interpreter`. "
                                + "The `process` mode means that the Python user-defined functions will be executed in a separate Python process. "
                                + "The `multi-thread` mode means that the Python user-defined functions will be executed in the same thread as the Java operator, but performance will be affected by the GIL. "
                                + "The `sub-interpreter` mode means that the Python user-defined functions will be executed in different Python sub-interpreters rather than in different threads of one interpreter, "
                                + "which can largely overcome the effects of the GIL, but it may fail with some CPython extension libraries, such as numpy and tensorflow. "
                                + "Note that if the Python operator does not support the `multi-thread` and `sub-interpreter` modes, `process` mode will still be used.");

We will introduce a new Python configuration option, `python.execution-mode`, which specifies the Python runtime execution mode. The possible values are `process` and `thread`. In `process` mode, Python user-defined functions are executed in a separate Python process; this is the current PyFlink Runtime execution mode. In `thread` mode, Python user-defined functions are executed in the same thread as the Java operator; this is the new execution mode discussed in this FLIP.
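The fallback rule for operators that do not support the new mode can be sketched as follows. This is only an illustration, assuming the two values `process` and `thread` named in the paragraph above; `resolve_execution_mode` and `operator_supports_thread` are hypothetical names and not part of the PyFlink API.

```python
# Hypothetical helper, not part of the PyFlink API: it only illustrates the
# fallback rule described for `python.execution-mode`.
VALID_MODES = ("process", "thread")


def resolve_execution_mode(configured: str, operator_supports_thread: bool) -> str:
    """Return the mode an operator would actually run in."""
    if configured not in VALID_MODES:
        raise ValueError(f"unknown python.execution-mode: {configured!r}")
    if configured == "thread" and not operator_supports_thread:
        # Operators without Thread Mode support keep using Process Mode.
        return "process"
    return configured
```

With this rule, a job configured with `thread` still runs, because unsupported operators simply stay on `process`.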

Proposed Changes

The architecture of Process Mode

FLIP-58 introduced the architecture of Process Mode in the PyFlink Runtime. To make the Thread Mode proposed in this FLIP easier to understand, we first review the existing Process Mode architecture.

As the architecture of Process Mode shows, Python user-defined functions are executed in a Python Worker that runs in a dedicated process. The Java operator process communicates with the Python Worker process through various gRPC services.

PEMJA

Architecture

Before introducing Thread Mode, we first introduce PEMJA, the library at the core of the PyFlink Runtime Thread Mode architecture.

The Java Native Interface (JNI) is a standard programming interface for writing Java native methods and embedding the Java virtual machine into native applications. In addition, CPython provides the Python/C API for embedding Python in C applications.

Combining these two interfaces lets us embed Python in a Java application. We tentatively call this bridge library PEMJA (Python Embedded In Java). Because the library solves the general problem of letting Python and Java call each other, we intend to open source PEMJA as an independent project and have PyFlink depend on it.

As the architecture of PEMJA shows, the JVM and the PVM can call each other in the same process through the PEMJA library.

First, PEMJA starts a daemon thread in the JVM that initializes the Python environment and creates the Python Main Interpreter owned by this process. PEMJA uses a dedicated thread for this initialization to avoid potential deadlocks in the Python interpreter, which can occur when the GIL is acquired concurrently through Python/C API functions such as PyGILState_*. PEMJA does not call these functions directly, but third-party libraries such as numpy may.
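The initialization pattern described above (one dedicated daemon thread performs the setup once, and worker threads wait until it has finished) can be sketched as follows. PEMJA does this on the JVM side through JNI; the sketch uses Python threads only to illustrate the pattern, and `InterpreterBootstrap` and `run_workers` are hypothetical names.

```python
import threading


class InterpreterBootstrap:
    """Hypothetical stand-in for the one-time environment setup described above."""

    def __init__(self):
        self._ready = threading.Event()
        self.init_count = 0
        self.init_thread_name = None

    def _initialize(self):
        # Runs only on the dedicated thread; real initialization would happen here.
        self.init_count += 1
        self.init_thread_name = threading.current_thread().name
        self._ready.set()

    def start(self):
        thread = threading.Thread(
            target=self._initialize, name="init-daemon", daemon=True
        )
        thread.start()

    def await_ready(self, timeout=5.0):
        # Worker threads block here until initialization has completed.
        return self._ready.wait(timeout)


def run_workers(bootstrap, n_workers=4):
    """Start workers that wait for the bootstrap, then start the bootstrap."""
    results = []
    lock = threading.Lock()

    def work(worker_id):
        if bootstrap.await_ready():
            with lock:
                results.append(worker_id)

    workers = [threading.Thread(target=work, args=(i,)) for i in range(n_workers)]
    for worker in workers:
        worker.start()
    bootstrap.start()
    for worker in workers:
        worker.join()
    return sorted(results)
```

Because no worker touches the environment before the event is set, initialization happens exactly once and always on the same thread.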

Then, each Java worker thread can invoke Python functions through a Python Sub Interpreter created from the Python Main Interpreter. Each Python Sub Interpreter has its own dedicated heap space, so each sub interpreter executes Python functions in a dedicated namespace.
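The effect of a dedicated namespace can be illustrated with a loose analogy in plain Python: the same source is loaded into two separate globals dictionaries, and state changed in one is not visible in the other. This is not how sub interpreters are created (that happens through the Python/C API, and real sub interpreters isolate far more than module globals); `new_namespace` and `UDF_SOURCE` are hypothetical names used only for this sketch.

```python
UDF_SOURCE = """
counter = 0

def bump():
    global counter
    counter += 1
    return counter
"""


def new_namespace(source):
    """Execute the source in a fresh globals dictionary and return it."""
    namespace = {}
    exec(source, namespace)
    return namespace


ns_a = new_namespace(UDF_SOURCE)
ns_b = new_namespace(UDF_SOURCE)

# Each namespace keeps its own `counter`, so the calls do not interfere.
ns_a["bump"]()
ns_a["bump"]()
ns_b["bump"]()
```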

Comparison with Other Proposals

| Framework | Principle | Limitations |
|---|---|---|
| Jython | Python compiler implemented in Java | Only supports Python 2 |
| GraalVM | Truffle framework | Compatibility issues with various Python ecosystem libraries; replaces the HotSpot VM with GraalVM |
| JPype | JNI + Python/C API | Does not support Java calling Python; only supports CPython |
| Jep | JNI + Python/C API | Only supports installing from source; difficult to use as middleware; large framework overhead; only supports CPython |
| PEMJA | JNI + Python/C API | Only supports CPython |

The table above compares PEMJA with the other proposals. We analyze them one by one.

Jython: Jython is a Python interpreter implemented in Java. Because it is implemented in Java, interoperability between Python code and Java code is very natural. However, Jython does not support Python 3 and is no longer maintained.

GraalVM: GraalVM uses the Truffle framework to support interoperability between Python and Java. However, there are two problems. The first is that it has compatibility issues with various Python ecosystem libraries, because many Python libraries rely on standard CPython to implement their C extensions. The second is that users need to replace the current HotSpot VM with GraalVM, which carries high migration costs.

JPype: Similar to PEMJA, JPype is also a framework built using JNI and Python/C API, but JPype only supports calling Java from Python.

Jep: Similar to PEMJA, Jep is also a framework built using JNI and Python/C API, and it supports calling Python from Java. However, as the table shows, it has some engineering drawbacks, and its performance is not satisfactory.

PEMJA: Similar to Jep and JPype, PEMJA is built on CPython, so it cannot support other Python interpreters such as PyPy. Because CPython is the official reference implementation and the most widely used Python runtime, most libraries in the Python ecosystem are built on it.

Performance

Here, we compare the performance of the different approaches using a Python UDF that converts words to upper case. There are two reasons why we chose the `String.upper` UDF for the comparison:

  • The String type is the most commonly used data type in the Python UDFs I have collected from PyFlink jobs.
  • The calculation of `String.upper` is simple enough to better compare the overhead between these frameworks.
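A UDF of this kind, together with a simple way to time it for different string sizes, might look like the sketch below. It measures only the cost of the call inside one Python interpreter, not the overhead of any Java bridge, so it does not reproduce the comparison itself; `upper_udf`, `time_upper`, the string lengths, and the repeat count are assumptions chosen for illustration.

```python
import timeit


def upper_udf(word: str) -> str:
    """The UDF under test: convert a word to upper case."""
    return word.upper()


def time_upper(length: int, repeats: int = 1000) -> float:
    """Seconds taken to call upper_udf `repeats` times on a string of `length` characters."""
    payload = "a" * length
    return timeit.timeit(lambda: upper_udf(payload), number=repeats)


if __name__ == "__main__":
    # Print the total time for a few illustrative string sizes.
    for length in (10, 1_000, 100_000):
        print(length, time_upper(length))
```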

From the histogram of the performance comparison, we can observe the following two points:

  • Compared with Jep and Jython, PEMJA delivers an 8 to 20 times performance improvement.
  • When the string is large enough, the performance of PEMJA even exceeds that of the Java implementation.

PEMJA outperforms Java on large strings mainly because the Python version of `String.upper` is implemented in C, which makes it more efficient than the Java implementation.

The architecture of Thread Mode

Architecture

The above figure shows the architecture of PyFlink Runtime Thread Mode. By replacing gRPC with PEMJA, the Java operator and the Python Worker, which originally ran in different processes, can now run in the same process.

Comparison with Process Mode

| Execution Mode | Benefits | Limitations |
|---|---|---|
| Process Mode | Better resource isolation | Usage restrictions; IPC overhead; high implementation complexity |
| Thread Mode | Better performance; no usage restrictions | Only supports CPython; multiple jobs cannot use different Python interpreters in session mode |

Process Mode: In Process Mode, the Java operator sends batches of data to the Python Worker and receives results from it asynchronously, which makes it impossible for a Python UDF to run in the same JobGraph node as other Java UDFs. This limits the use of Python UDFs in some scenarios, such as CEP and joins. In terms of performance, inter-process communication adds a serialization/deserialization step.

Thread Mode: Unlike Process Mode, which sends and receives data asynchronously in batches based on Apache Beam Portability, Thread Mode can be implemented synchronously, so there are no usage restrictions. In terms of performance, because Python and Java run in the same thread, there is no data serialization/deserialization, and no copying or context switching between kernel space and user space. Besides the limitation that PEMJA only supports CPython, Thread Mode has another limitation: multiple jobs cannot use different Python interpreters in session mode. This limitation arises because many Python libraries assume that they are initialized only once per process and therefore rely heavily on static variables.
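The difference between the two data paths can be sketched as follows. This is a simplified model under stated assumptions: no real processes or gRPC channels are involved, `pickle` stands in for the actual wire format, and `run_process_mode`, `run_thread_mode`, and `batch_size` are hypothetical names.

```python
import pickle


def upper_udf(word):
    return word.upper()


def run_process_mode(udf, records, batch_size=2):
    """Model the batched path: data is serialized in both directions."""
    results = []
    for start in range(0, len(records), batch_size):
        batch = records[start:start + batch_size]
        wire_in = pickle.dumps(batch)  # operator -> worker
        outputs = [udf(record) for record in pickle.loads(wire_in)]
        wire_out = pickle.dumps(outputs)  # worker -> operator
        results.extend(pickle.loads(wire_out))
    return results


def run_thread_mode(udf, records):
    """Model the in-process path: one synchronous call per record, no serialization."""
    return [udf(record) for record in records]
```

Both functions return the same results; the sketch only makes visible the extra serialization and batching steps that the in-process path avoids.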

Performance

From the histogram of performance comparison, we can find the following two points:

  • For small strings, the performance of the Python UDF on PEMJA is about 40% of that of the Java UDF, and it is 50% higher than that of the Python UDF on gRPC.
  • When the string is large enough, the performance of the Python UDF on PEMJA even exceeds that of the Java UDF.

Compatibility, Deprecation, and Migration Plan

Jobs can be migrated from `Process Mode` to `Thread Mode` by setting the newly added configuration option `python.execution-mode`.

Test Plan

The existing Process Mode tests will also be run in Thread Mode.