Authors:  Dian Fu, Jincheng Sun

Discussion thread
Vote thread
JIRA

FLINK-17146 - Getting issue details... STATUS

Release1.11

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Pandas dataframe is the de-facto standard to work with tabular data in Python community. PyFlink table is Flink’s representation of the tabular data in Python language. It would be nice to provide the ability to convert between the PyFlink table and Pandas dataframe in PyFlink Table API which has the following benefits:

  • It provides users the ability to switch between PyFlink and Pandas seamlessly when processing data in Python language. Users could process data using one execution engine and switch to another seamlessly. For example, it may happen that users have already got a Pandas dataframe at hand and want to perform some expensive transformation of it. Then they could convert it to a PyFlink table and leverage the power of Flink engine. Users could also convert a PyFlink table to Pandas dataframe and perform transformation of it with the rich functionalities provided by the Pandas ecosystem.
  • No intermediate connectors are needed when converting between them.

Public Interfaces

We will introduce interfaces `from_pandas` in `TableEnvironment` and `to_pandas` in `Table`.

from_pandas

This interface is used to convert a Pandas dataframe to a PyFlink table. 

The signature of `from_pandas` is as following:

class TableEnvironment(object):

    def from_pandas(self, pdf: pd.DataFrame, schema: Union[RowType, List[str], Tuple[str], List[DataType], Tuple[DataType]] = None, splits_num: int = None) -> Table:

        pass


The argument `schema` is used to specify the schema of the result table:

  1. If it’s not specified, the schema of the result table will be inferred from the given Pandas dataframe.
  2. If it’s specified and the type is RowType, the specified type will be used as the schema of the result table.
  3. If it’s specified as list or tuple of str, the specified list or tuple will be used as the column names of the result table and the column types are inferred from the given Pandas dataframe.
  4. If it’s specified as list or tuple of DataTypes, the specified list or tuple will be used as the column types of the result table and the column names are inferred from the given Pandas dataframe.


The argument `splits_num` is used to specify the number of splits the given Pandas dataframe will be splitted into. If not specified, the default parallelism will be used. 

to_pandas

This interface is used to convert a PyFlink table to a Pandas dataframe.

The signature of `to_pandas` is as following:

class Table(object):

    def to_pandas(self) -> pd.DataFrame:

        pass

Proposed Changes

from_pandas

The basic workflow of `from_pandas` is as following:



  1. The Pandas dataframe will be splitted into multiple splits and serialized into Arrow format. The ArrowCoder which has already been introduced for vectorized Python UDF could be reused for the serialization of the Pandas dataframe.
  2. The serialized bytes are used to construct Java ArrowTableSource. Inside Arrow source implementation, it will create an ArrowReader to deserialize the serialized bytes of the Pandas dataframe and each instance the source will only handle a subset of the input splits. The ArrowReader which has already been introduced for vectorized Python UDF could be reused for the deserialization.
  3. The ArrowTableSource could then be used to construct a Flink Table object.

ArrowTableSource

Although the length of the Pandas dataframe is limited, ArrowTableSource is designed to support both the streaming mode and batch mode. This enables users to create a PyFlink table which could run in both the streaming mode and the batch mode. Under streaming mode, it will handle the checkpoint properly(e.g. store the split ids which have already been processed into the state) to make sure that the execution results remain correct in case of job failover.

to_pandas

The basic workflow of `to_pandas` is as following:


  1. As we know that, Flink Table is just a logical representation and we have to execute it to retrieve the data which it stands for. So when `to_pandas` is called, a Flink Table API job will be constructed and executed which takes an ArrowTableSink as the sink of the Flink Table. 
  2. Inside the ArrowTableSink, it will serialize the result data into Arrow format data using ArrowWriter which has already been introduced for vectorized Python UDF. 
  3. During execution of the job, the result data of the table will be fetched to the client continuously. (Note: It shares the same design on how to retrieve the execution results as Table.collect which is still under discussion. This is just the implementation details and will not be discussed in this design and we can discuss it in the discussion about Table.collect. In this design we will only focus on Flink/Pandas conversion related things)
  4. The serialized data of the table will be used to construct the Pandas dataframe. The ArrowCoder which has already been introduced for vectorized Python UDF could be reused for the deserialization of the serialized data.

ArrowTableSink

ArrowTableSink is designed to support both the streaming mode and batch mode. This enables users to convert a PyFlink table which runs in both the batch mode and the streaming mode(the size of the source data is limited in this case) to Pandas dataframe. Under streaming mode, it will handle the checkpoint properly(the results will be buffered and only be sent to the client when a checkpoint is finished or job finished) to make sure that the execution results remain correct in case of job failover.

Example

# Suppose there is already a Pandas dataframe
pdf = xxx
...

row_type = DataTypes.ROW(

    [DataTypes.FIELD("f1", DataTypes.INT()),

     DataTypes.FIELD("f2", DataTypes.FLOAT()),

     DataTypes.FIELD("f3", DataTypes.BOOLEAN()),

     DataTypes.FIELD("f4", DataTypes.STRING())])

# convert the Pandas dataframe to PyFlink table
table = t_env.from_pandas(pdf, row_type)

# perform operations on the PyFlink table
filtered_table = table.filter("f1 > 0")

# convert the result PyFlink table to Pandas dataframe
filtered_pdf = filtered_table.to_pandas()

# perform operations using Pandas dataframe API

filtered_pdf.count()


Compatibility, Deprecation, and Migration Plan

N/A

Test Plan

N/A

Rejected Alternatives

N/A