Discussion thread
Vote thread
JIRA

FLINK-14026

Release: 1.10

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

For a Flink Table API & SQL job, if it uses Python user-defined functions, the Java operator will launch a separate Python process for Python user-defined function execution. We should make sure that the resources used by the Python process are properly managed by Flink’s resource management framework.


FLIP-49 has proposed a unified memory management framework, and the resource management of Python UDFs should be based on it. It will be taken into account when designing the resource management of the Python worker.

Public Interfaces

  • The following configurations will be introduced: “python.fn-execution.framework.memory.size” and “python.fn-execution.buffer.memory.size”.

Proposed Changes

Memory

Python worker

We will introduce the following configurations to represent the memory requirement of the Python worker:

  1) python.fn-execution.framework.memory.size (default 64MB)

It represents the framework overhead of the Python worker. This overhead is non-trivial (about 60+ MB for Beam 2.15.0) and so should be taken into account.

  2) python.fn-execution.buffer.memory.size (default 15MB: 1MB input queue + 1MB output queue + 10MB flush buffer + 3MB others)

It represents the size of the input buffer and the output buffer.


The introduced configurations python.fn-execution.framework.memory.size and python.fn-execution.buffer.memory.size will be used to calculate the memory requirement of the Python worker.

As Python allows limiting the maximum memory size of a process (via resource.setrlimit), the memory used by the Python worker will be accounted for as managed off-heap memory (refer to FLIP-49 for more details about managed on-heap/off-heap memory). The memory used by the Python worker will be reserved (via MemoryManager.reserveMemory, which is introduced in FLIP-49) from the MemoryManager during the initialization phase of the operator (i.e., in the open method of the operator).

During the compilation phase, for batch jobs, the required memory of the Python worker will be calculated and set as the managed off-heap memory of the operator. For stream jobs, the resource spec will remain unknown for now (the reason is that currently the resources of all operators in stream jobs are unknown, and it is not supported to configure both known and unknown resources in a single job). The required memory can be calculated by simply summing up python.fn-execution.framework.memory.size and python.fn-execution.buffer.memory.size.
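The memory-requirement calculation above can be sketched as follows. This is an illustrative sketch only: the parse_memory_size helper and the dict-based config lookup are simplified assumptions, not actual Flink internals.

```python
# Sketch: deriving the Python worker's memory requirement from the two
# configuration values. Helper names are hypothetical, defaults match the FLIP.

def parse_memory_size(value):
    """Parse a memory string such as '64m' or '15m' into bytes."""
    units = {"k": 1024, "m": 1024 ** 2, "g": 1024 ** 3}
    value = value.strip().lower()
    if value[-1] in units:
        return int(value[:-1]) * units[value[-1]]
    return int(value)

def python_worker_memory(config):
    """Sum the framework and buffer memory sizes configured for the worker."""
    framework = parse_memory_size(
        config.get("python.fn-execution.framework.memory.size", "64m"))
    buffer = parse_memory_size(
        config.get("python.fn-execution.buffer.memory.size", "15m"))
    return framework + buffer
```

With the default values, the requirement is 64 MB + 15 MB = 79 MB.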

During the operator initialization phase, the operator will check the memory allocated to it. If the managed off-heap memory that can be allocated from the MemoryManager meets the requirement, the operator will reserve it from the MemoryManager. Otherwise, a warning message will be logged. However, the operator will still try to start up the Python worker to make a best effort to execute the job. In this case, the resources used by the Python worker may exceed the resources allocated, and the process may be killed by the resource management framework.
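The reserve-or-warn behavior described above can be sketched in Python for illustration (the real logic lives in the Java operator's open method; the MemoryManager stub and method names here are simplified assumptions):

```python
# Sketch of the best-effort reservation logic during operator initialization.
import logging

class MemoryManager:
    """Simplified stand-in for Flink's MemoryManager (FLIP-49)."""
    def __init__(self, available):
        self.available = available
        self.reserved = 0

    def reserve_memory(self, size):
        if size > self.available - self.reserved:
            raise MemoryError("not enough managed off-heap memory")
        self.reserved += size

def open_python_operator(memory_manager, required):
    """Reserve the worker's memory if possible; warn and continue otherwise."""
    try:
        memory_manager.reserve_memory(required)
        return True  # reservation succeeded
    except MemoryError:
        logging.warning(
            "Could not reserve %d bytes for the Python worker; "
            "starting it anyway on a best-effort basis.", required)
        return False  # worker may later be killed by the resource framework
```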

Java operator

As for the Java operator, the memory requirement is very small, about 5 MB (1 MB input buffer (input buffer of the Beam portability framework) + 1 MB output buffer (result buffer of the Beam portability framework) + 3 MB others (operator input and output buffers)). So the memory used by the Java operator will be accounted for as task on-heap memory, and no configuration will be introduced for it at present unless we find it necessary.

Example

Let’s take the following example to show how the memory requirement is calculated. Suppose there are 3 operators chained together and 2 of them are Python operators.

The configuration is as follows:

table_config.get_configuration().set_string("python.fn-execution.framework.memory.size", "64m")

table_config.get_configuration().set_string("python.fn-execution.buffer.memory.size", "15m")

The memory of the Python operator and the Python worker will be as follows:

  1. The managed off-heap memory of the operator represents the amount of managed off-heap memory reserved by the operator. 
  2. The memory of the Python worker represents the maximum memory that could be used by the Python worker.
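The numbers in this example can be worked through as follows, assuming each of the two Python operators launches its own Python worker (no worker sharing):

```python
# Worked calculation for the example above (configuration defaults).
framework_mb = 64   # python.fn-execution.framework.memory.size
buffer_mb = 15      # python.fn-execution.buffer.memory.size

per_operator_mb = framework_mb + buffer_mb  # 79 MB reserved per Python operator
total_mb = 2 * per_operator_mb              # 158 MB across the two Python operators
```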

Limit Python worker according to the resources constraint

On Unix systems, the Python standard library module “resource” can be used to set a hard limit on the memory size of a Python process. We can use it to set a hard limit for the Python worker. This reduces the cases where YARN kills the operator because the corresponding Python worker takes too much memory. It also results in a better error message if the Python worker is killed for taking too much memory.
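Setting such a hard limit could look like the following sketch (Unix only). The 79 MB figure is the default framework + buffer requirement from the configurations above; the actual limit the worker applies would come from the calculated memory requirement.

```python
# Sketch: capping a Python worker's address space with the standard
# library's resource module (available on Unix systems only).
import resource

def set_memory_limit(limit_bytes):
    """Set a hard limit on the process's virtual address space.

    Once the limit is exceeded, allocations fail with MemoryError inside
    the process instead of the process being killed externally by YARN.
    """
    resource.setrlimit(resource.RLIMIT_AS, (limit_bytes, limit_bytes))

limit = 79 * 1024 * 1024  # e.g. 64 MB framework + 15 MB buffer
# set_memory_limit(limit)  # would cap the current process at ~79 MB
```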

Others

Python worker sharing

The Python UDF execution framework supports sharing the same Python worker among multiple Java operators. This can be used to restrict the number of Python workers in a job. If multiple operators share the same Python worker, the framework overhead (python.fn-execution.framework.memory.size) is counted only once per Python worker, while the input/output buffer overhead (python.fn-execution.buffer.memory.size) depends on how many operators share the same Python worker.

If multiple operators share the same Python worker, the first operator which launches the Python worker is responsible for reserving the framework off-heap memory of the Python worker.
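The sharing rule above implies the following memory requirement for a shared worker; this is a small illustrative sketch using the configuration defaults:

```python
# Sketch: memory requirement of one Python worker shared by n operators.
# Framework overhead is counted once, buffer overhead once per operator.

def shared_worker_memory_mb(num_operators, framework_mb=64, buffer_mb=15):
    return framework_mb + num_operators * buffer_mb

# Two operators sharing one worker need 64 + 2 * 15 = 94 MB,
# versus 2 * (64 + 15) = 158 MB for two separate workers.
```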

Compatibility, Deprecation, and Migration Plan

There are no compatibility issues as this is a new feature.

Implementation Plan

  1. Introduce the configurations “python.fn-execution.framework.memory.size” and “python.fn-execution.buffer.memory.size”, and update the Python operator to reserve the managed off-heap memory from the MemoryManager during initialization.
  2. Update Python worker to set the memory resource limit.