Discussion thread
Vote thread
JIRA: FLINK-16114
Release: 1.11

Motivation

Scalar Python UDFs have been supported since Flink 1.10 (FLIP-58) and operate one row at a time. They work as follows: the Java operator serializes one input row to bytes and sends them to the Python worker; the Python worker deserializes the input row and evaluates the Python UDF with it; the result row is serialized and sent back to the Java operator.

This approach suffers from the following problems:

  1. High serialization/deserialization overhead
  2. It’s difficult to leverage the popular Python libraries used by data scientists, such as Pandas, NumPy, etc., which provide high-performance data structures and functions.

We want to introduce vectorized Python UDFs to address these problems. For a vectorized Python UDF, a batch of rows is transferred between the JVM and the Python VM in columnar format. The batch of rows is converted to a collection of pandas.Series and given to the vectorized Python UDF, which can then leverage popular Python libraries such as Pandas, NumPy, etc. for its implementation.


The performance gains could come from the following aspects (including but not limited to):

  1. The serialization/deserialization at the Python worker could be eliminated
  2. The performance optimizations already implemented in Pandas can be leveraged
  3. The deserialization at the Java operator could also be deferred and even eliminated for the Blink planner (see the ColumnarRow section for more details)

Goals

  1. Support scalar vectorized Python UDFs
  2. Support mixed use of vectorized Python UDFs, non-vectorized Python UDFs and Java UDFs

Public Interfaces

General Python UDFs and vectorized Python UDFs are handled differently, e.g. for a vectorized Python UDF, the inputs should be converted to a collection of pandas.Series before calling it. A way therefore needs to be provided to declare whether a Python UDF is vectorized or not.

Option 1: The definition of a vectorized Python UDF is similar to that of a non-vectorized Python UDF except that the decorator name is “vectorized_udf” instead of “udf”.

@vectorized_udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j

Option 2: The definition of a vectorized Python UDF is similar to that of a non-vectorized Python UDF except that the decorator name is “pandas_udf” instead of “udf”. This is the option PySpark currently takes.

@pandas_udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j

Option 3: The general Python UDF and the vectorized Python UDF share the same decorator name. However, a parameter “udf_type” is provided to let users declare a Python UDF as vectorized.

The candidate values of udf_type are “general” (default value) and “vectorized”.

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT(), udf_type="vectorized")
def add(i, j):
    return i + j

Option 4: This is the same as Option 3 except that the value of udf_type could be “general” (default value) or “pandas”.

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT(), udf_type="pandas")
def add(i, j):
    return i + j


We lean towards Option 4 for now, as there is no need to introduce new decorators and it is convenient to add support for new kinds of Python UDFs. Any feedback on this is welcome.

Type hints are a feature introduced in Python 3.5 (PEP 484).

Instead of introducing another property on the udf decorator, we could determine the Python UDF type according to the type hints of its inputs and output.

For example, a general Python UDF could be defined as follows:

@udf
def add(i: int, j: int) -> int:
    return i + j


For a Pandas UDF, it could be defined as follows (it seems simpler to use the decorator for a Pandas UDF):

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i: pandas.Series, j: pandas.Series) -> pandas.Series:
    return i + j


We tend to support both the decorator way and the type hints way.

We will introduce the config option “python.fn-execution.arrow.batch.size” to control how the input data is split into batches. This is discussed further in the following sections.

Proposed Changes

A scalar vectorized Python UDF takes one or more pandas.Series as input arguments and returns one pandas.Series as output, e.g. the input arguments (i and j) of the following example are both pandas.Series and the return type is also pandas.Series.

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT(), udf_type="pandas")
def add(i, j):
    return i + j


NOTE:

  1. All the input pandas.Series and the returned pandas.Series have the same length, e.g. the input arguments (i and j) and the returned result in the above example have the same length (see the sketch after this list).
  2. Vectorized Python UDFs without arguments, or with only constant arguments, are not supported in the initial version, as a way would need to be figured out to tell the UDF the input length.
  3. A constant argument of a vectorized Python UDF will be given to it as a regular Python object and will not be converted to a pandas.Series.
  4. Taking pandas.DataFrame as the input and output type may be supported in the future, e.g. when supporting map in the Python Table API. However, that is out of the scope of this design doc.
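
The following is a minimal sketch illustrating notes 1 and 3 (the function name clipped_sum and its body are hypothetical; the decorator follows the udf_type="pandas" style of Option 4 and assumes the udf decorator and DataTypes can be imported from pyflink.table.udf and pyflink.table as in FLIP-58):

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
     result_type=DataTypes.BIGINT(),
     udf_type="pandas")
def clipped_sum(i, j):
    # i is a pandas.Series; j is also a pandas.Series unless it is passed
    # as a constant in the query (e.g. clipped_sum(a, 100)), in which case
    # it arrives as a plain Python object (see note 3 above).
    # The returned pandas.Series must have the same length as the inputs.
    return (i + j).clip(lower=0)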

Vectorized Python UDFs share a lot of optimization strategies with general Python UDFs, e.g. executing multiple Python UDFs which have no dependencies between each other in one batch, executing multiple Python UDFs which are chained together in one batch, etc. More details about these optimizations can be found in the PythonCalcSplitRule. Besides, it should be noted that general Python UDFs and vectorized Python UDFs cannot be executed in the same batch as their input data types are quite different. Rules will be introduced to separate them if both kinds of Python UDFs exist, e.g. when both appear in one query as in the hypothetical example below.
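
A minimal sketch of such a mixed query (the UDF bodies and table data are purely illustrative; the Table API calls follow the PyFlink 1.10 style and udf_type="pandas" follows Option 4 above):

from pyflink.table import BatchTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf

# A vectorized (Pandas) UDF and a general Python UDF, both hypothetical.
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
     result_type=DataTypes.BIGINT(), udf_type="pandas")
def pandas_add(i, j):
    return i + j

@udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def plus_one(i):
    return i + 1

settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=settings)
t_env.register_function("pandas_add", pandas_add)
t_env.register_function("plus_one", plus_one)

t = t_env.from_elements([(1, 2), (3, 4)], ['a', 'b'])
# Both kinds of UDFs appear in one projection; the planner rules described
# above would split them into separate Python execution stages.
result = t.select("pandas_add(a, b), plus_one(a)")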



The vectorized Python UDFs are executed as follows:

  1. The Java operator buffers the input rows and serializes them into columnar format (Arrow memory format)
  2. The Arrow memory format input data is sent to the Python worker
  3. The Python worker converts the Arrow memory format input data to a collection of pandas.Series and then calls the vectorized Python UDF
  4. The generated result is of type pandas.Series and will be converted to Arrow memory format
  5. The Arrow memory format result data is sent back to the Java operator
  6. The Java operator deserializes the Arrow memory format result data to Row/BaseRow and sends it to the downstream operator.


We will describe the details in the following sections.

The Java operator will not buffer all the input rows before sending them to the Python worker for execution. Instead, the input rows will be split into batches and then transferred to the vectorized Python UDF for execution. The number of rows in a batch is non-deterministic and so the vectorized Python UDF implementation should not depend on the batch size.

We’d like to introduce the config option “python.fn-execution.arrow.batch.size” for vectorized Python UDF execution. It splits the input rows into multiple batches; the number of input rows in each batch doesn’t exceed the value defined by “python.fn-execution.arrow.batch.size”.

There is already a config option “python.fn-execution.bundle.size”. As the Python UDF is executed asynchronously, we need to make sure that the buffered inputs and the buffered execution results are flushed back to the Java operator when a checkpoint barrier is received. The config option “python.fn-execution.bundle.size” is used to force the execution of the buffered input elements and retrieve the cached execution results every N inputs. It ensures that the number of buffered inputs is limited and that the checkpoint barrier can be processed in a limited time. However, “python.fn-execution.arrow.batch.size” serves another purpose: it configures the number of elements in an Arrow batch and is introduced so that the performance of vectorized Python UDFs can be tuned. The config options “python.fn-execution.arrow.batch.size” and “python.fn-execution.bundle.size” are orthogonal. However, it should be noted that “python.fn-execution.arrow.batch.size” usually should not be set higher than “python.fn-execution.bundle.size”; otherwise the larger value has no effect, since the number of inputs in an Arrow batch will never exceed “python.fn-execution.bundle.size”.
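
For example (a sketch only; it assumes a TableEnvironment named t_env has already been created, that the underlying configuration is accessible via get_config().get_configuration() as in recent PyFlink versions, and that the values shown are placeholders rather than recommended defaults), both options could be set through the table configuration:

# Assumed: t_env is an existing TableEnvironment.
configuration = t_env.get_config().get_configuration()
configuration.set_string("python.fn-execution.bundle.size", "10000")
configuration.set_string("python.fn-execution.arrow.batch.size", "10000")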


Regarding the default value of “python.fn-execution.arrow.batch.size”, we have performed a simple test with the Pandas UDF lambda i: i + 1: the performance reaches its peak when “python.fn-execution.arrow.batch.size” is set to about 10000. This value may change according to what the Pandas UDF does, the data types of the inputs/outputs, etc. However, it does indicate that we should adjust the default value (FLINK-15971) of “python.fn-execution.bundle.size”, which is currently 1000.

Apache Arrow is a cross-language in-memory columnar data format that’s widely used in many notable projects, such as Pandas, Spark, Parquet, etc. It specifies a standardized language-independent columnar memory format for flat and hierarchical data, organized for efficient analytic operations on modern hardware.


We can make use of Apache Arrow as the exchange format between the JVM and the Python VM:

  1. The input rows need to be converted to a collection of pandas.Series before calling the vectorized Python UDF. Apache Arrow provides convenient zero-copy methods to convert between the Arrow data structure pyarrow.Array and the Pandas data structure pandas.Series.
  2. Once the data is in Arrow memory format, there is no need to serialize it anymore and it can be sent directly, e.g. the execution results of the vectorized Python UDF are in Arrow memory format and can be sent to the JVM process directly.

In Flink, the data type of columns is defined as DataType. Arrow has also defined ArrowType for the data type of columns. 

The mapping between DataType in Flink and ArrowType in Arrow is summarized as follows (grouped by Flink DataType; each entry maps a Flink LogicalType to its ArrowType):

AtomicDataType
  BooleanType: ArrowType.Bool
  TinyIntType: ArrowType.Int(1 * 8, true)
  SmallIntType: ArrowType.Int(2 * 8, true)
  IntType: ArrowType.Int(4 * 8, true)
  BigIntType: ArrowType.Int(8 * 8, true)
  FloatType: ArrowType.FloatingPoint(SINGLE)
  DoubleType: ArrowType.FloatingPoint(DOUBLE)
  DateType: ArrowType.Date(DateUnit.DAY)
  TimeType(precision):
    precision 0: ArrowType.Time(TimeUnit.SECOND, 32)
    precision 1 - 3: ArrowType.Time(TimeUnit.MILLISECOND, 32)
    precision 4 - 6: ArrowType.Time(TimeUnit.MICROSECOND, 64)
    precision 7 - 9: ArrowType.Time(TimeUnit.NANOSECOND, 64)
  TimestampType(precision):
    precision 0: ArrowType.Timestamp(TimeUnit.SECOND, null)
    precision 1 - 3: ArrowType.Timestamp(TimeUnit.MILLISECOND, null)
    precision 4 - 6: ArrowType.Timestamp(TimeUnit.MICROSECOND, null)
    precision 7 - 9: ArrowType.Timestamp(TimeUnit.NANOSECOND, null)
  LocalZonedTimestampType(precision):
    precision 0: ArrowType.Timestamp(TimeUnit.SECOND, null)
    precision 1 - 3: ArrowType.Timestamp(TimeUnit.MILLISECOND, null)
    precision 4 - 6: ArrowType.Timestamp(TimeUnit.MICROSECOND, null)
    precision 7 - 9: ArrowType.Timestamp(TimeUnit.NANOSECOND, null)
  ZonedTimestampType(precision):
    precision 0: ArrowType.Timestamp(TimeUnit.SECOND, timezone)
    precision 1 - 3: ArrowType.Timestamp(TimeUnit.MILLISECOND, timezone)
    precision 4 - 6: ArrowType.Timestamp(TimeUnit.MICROSECOND, timezone)
    precision 7 - 9: ArrowType.Timestamp(TimeUnit.NANOSECOND, timezone)
  DecimalType(precision, scale): ArrowType.Decimal(precision, scale)
  CharType / VarCharType: ArrowType.Utf8
  BinaryType / VarBinaryType: ArrowType.Binary
  DayTimeIntervalType: ArrowType.Interval(IntervalUnit.DAY_TIME)
  YearMonthIntervalType: ArrowType.Interval(IntervalUnit.YEAR_MONTH)
  NullType: ArrowType.Null
  RawType: N/A
  TypeInformationRawType: N/A
  SymbolType: N/A

KeyValueDataType
  MapType: ArrowType.Map

CollectionDataType
  ArrayType: ArrowType.List
  MultisetType: ArrowType.List

FieldsDataType
  RowType: ArrowType.Struct
  StructuredType: ArrowType.Struct
  DistinctType(logicalType): ArrowType of the wrapped logicalType

We will introduce ArrowWriter and ArrowFieldWriter to handle the serialization of input elements. 

ArrowFieldWriter is responsible for the serialization of a field. There is an ArrowFieldWriter implementation for each kind of type. For example, BigIntArrowFieldWriter will be introduced for BigIntType. Internally, each ArrowFieldWriter has an Arrow ValueVector which holds the serialized value in Arrow memory format, e.g. an Arrow BigIntVector is held in BigIntArrowFieldWriter.

Example:

public final class BigIntArrowFieldWriter extends ArrowFieldWriter {

   public BigIntArrowFieldWriter(BigIntVector bigIntVector) {
      super(bigIntVector);
   }

   @Override
   public void doWrite(Object value) {
      // valueVector and count are maintained by the base ArrowFieldWriter:
      // valueVector is the Arrow ValueVector passed to the constructor and
      // count is the index of the next element to be written.
      if (value == null) {
         ((BigIntVector) valueVector).setNull(count);
      } else {
         ((BigIntVector) valueVector).setSafe(count, (long) value);
      }
   }
}


ArrowWriter is responsible for the serialization of a row. Internally it has a collection of ArrowFieldWriter. ArrowWriter will delegate the serialization of each column of the row to the corresponding ArrowFieldWriter.

Similarly, ArrowReader and a series of ArrowFieldReaders will be introduced to deserialize the execution results of the vectorized Python UDF, as the execution results are organized in Arrow memory format.

As mentioned above, after the Python worker receives the input data, the input rows need to be converted to a collection of pandas.Series before calling the vectorized Python UDF. PyArrow provides convenient zero-copy methods to convert between the Arrow data structure pyarrow.Array and the Pandas data structure pandas.Series: pyarrow.Array.from_pandas / pyarrow.Array.to_pandas.

A scalar vectorized Python UDF can assume that its input arguments are pandas.Series and it must produce a pandas.Series as the execution result.
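
The following is a minimal sketch of these conversions on the Python side (not the actual runner implementation; the sample record batch and the UDF add are purely illustrative):

import pyarrow as pa

# Illustrative input: in the real runner this record batch would be
# deserialized from the Arrow data sent by the Java operator.
batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3]), pa.array([10, 20, 30])], names=["i", "j"])

# Convert each Arrow column to a pandas.Series (zero-copy where possible).
series_args = [batch.column(idx).to_pandas() for idx in range(batch.num_columns)]

# Call the vectorized Python UDF with the pandas.Series arguments.
def add(i, j):
    return i + j

result_series = add(*series_args)

# Convert the resulting pandas.Series back to an Arrow array so that it can
# be sent back to the Java operator without further serialization.
result_batch = pa.RecordBatch.from_arrays(
    [pa.Array.from_pandas(result_series)], names=["result"])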

The Java operator deserializes the Arrow memory format result data to Row/BaseRow. For the Blink planner, ColumnarRow will be used to hold the execution result. ColumnarRow is just a row view of the columnar format data and deserialization is postponed until the data is actually accessed. This may eliminate unnecessary deserialization overhead in certain scenarios, e.g. when a filter operation at the downstream operator discards rows before their fields are ever accessed.

Implementation Plan

  1. Introduce ArrowWriter and ArrowFieldWriter, which are responsible for serializing the input data to Arrow memory format.
  2. Introduce ArrowReader and ArrowFieldReader, which are responsible for deserializing the execution results which are in Arrow memory format.
  3. Introduce ArrowCoder in Python, which is responsible for converting data between the Arrow data structure and the Pandas data structure.
  4. Introduce ArrowPythonScalarFunctionOperator, which is responsible for scalar vectorized Python UDF execution. It serializes the input data to Arrow memory format, sends it to the Python worker for execution, retrieves the execution results which are in Arrow memory format, deserializes them and sends the results to the downstream operator.
  5. Support mixed use of vectorized Python UDFs and general Python UDFs

Compatibility, Deprecation, and Migration Plan

N/A

Test Plan

N/A

Rejected Alternatives

N/A