Authors: Shuiqiang Chen, Hequn Cheng, Jincheng Sun

Status

Discussion thread

Vote thread

JIRA: FLINK-18761

Release: 1.12


...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, SQL and the Table API are supported in PyFlink, which provides a lot of convenience for users who are familiar with the Python programming language. However, users may need more complicated processing operations that require access to state, timers, etc. Moreover, AI and ML engineers may not have a strong demand for structured data queries. In this FLIP, we will introduce the Python DataStream API to make up for these shortcomings.

Goals

  1. Support Python DataStream to read/write data from/to external storage (connectors).
  2. Support configuring task and job configuration (get/set resources, parallelism, chaining strategy, etc.).
  3. Support stateless data transformations, including map, flat_map, key_by, etc.

...

  1. Support for stateful data transformations, including ProcessFunction, RichFunction, Window and Join functions, will not be covered in this FLIP, but the overall design is available in this doc.

Proposed Changes

Architecture

In FLIP-38 (Python Table API), the communication between the Python VM and the JVM is implemented with the py4j RPC framework, so the Python API only needs wrapper classes around the Java API and there is no need to design a new set of Python APIs. In FLIP-58 (Flink Python User-Defined Stateless Function for Table), we leverage the Beam portability framework so that the Flink operator starts a Beam runner, which launches a Python process to run the Python user-defined functions. The overall architecture is as below:

...

We can also make use of the Beam framework to execute the user-defined Python processing functions.

Implementation

In summary, the architecture of the Python DataStream API does not differ much from that of the SQL and Table API, but there are some implementation details worth mentioning.

Configuration

Task Configuration

The DataStream API provides methods to get or set specific configuration for the current stream. The Python DataStream API will also provide corresponding methods in the form of wrapper classes which send the get/set requests to the Java API instance via the py4j RPC framework. Take DataStream for instance:

...

In the code block above, set_resource_spec(ResourceSpec) requires users to pass a ResourceSpec instance as the parameter, which means the Python DataStream API also needs to provide the corresponding wrapped Python classes.
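A minimal sketch of such a wrapper, assuming the Python object simply holds a py4j reference to the Java DataStream and forwards configuration calls to it (the class and attribute names below are illustrative, not the final API):

class DataStream(object):
    """
    A thin Python wrapper around the Java DataStream referenced via py4j.
    """

    def __init__(self, j_data_stream):
        # py4j reference to the Java DataStream instance
        self._j_data_stream = j_data_stream

    def set_parallelism(self, parallelism):
        # forward the configuration call to the Java side via py4j
        self._j_data_stream.setParallelism(parallelism)
        return self

    def set_resource_spec(self, resource_spec):
        # resource_spec is itself a Python wrapper, so pass its underlying Java object through
        self._j_data_stream.setResources(resource_spec._j_resource_spec)
        return self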

Dependency management

When developing a PyFlink application, there might be modules consisting of multiple files, or other third-party dependencies. We also need to provide interfaces for users to add these extra files to the StreamExecutionEnvironment as configurations. The interfaces would be as below:

...

  • pyfs: attach custom Python files to the job.
  • pyarch: add Python archive files to the job.
  • pyreq: specify a requirements.txt file which defines the third-party dependencies.
  • pym: specify the Python module with the program entry point.
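A hedged sketch of how these options could be exposed on StreamExecutionEnvironment, mirroring the dependency management interfaces of the Python Table API (the method names below are assumptions, not the final API):

class StreamExecutionEnvironment(object):

    def add_python_file(self, file_path):
        # "pyfs": attach a custom Python file or module to the job
        pass

    def add_python_archive(self, archive_path, target_dir=None):
        # "pyarch": ship a Python archive file (e.g. a packed virtual environment)
        pass

    def set_python_requirements(self, requirements_file_path, requirements_cache_dir=None):
        # "pyreq": install the third-party dependencies listed in a requirements.txt file
        pass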

Stateless Functions

Many transformation operations in the DataStream API, such as map()/flat_map()/key_by(), require users to provide their own implementation. In the Python DataStream API, users can implement their processing logic in a Python function and pass it to the interface. Take the map() function for instance:

...

As mentioned above, the Python DataStream API leverages the Beam portability framework to execute user-defined Python functions in the Flink runtime. When a user passes a Python function as a parameter to a DataStream transformation interface, we serialize the function object and pass it to a dedicated Flink Java operator, which launches a Beam runner and starts a Python process to execute the user-defined Python function. Therefore, the submitted job graph might not exactly match the logic the user specified.
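For illustration, passing the user logic to map() could look like the sketch below, either as a plain Python callable or as a MapFunction subclass (see the Public Interfaces section); the exact signatures are part of this FLIP and may still change:

class MyMapFunction(MapFunction):

    def map(self, value):
        # value is a (str, int) record; return the transformed record
        return value[0], value[1] + 1

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection([('ab', 1), ('abc', 2)],
                         type_info=Types.ROW([Types.STRING(), Types.INT()]))

# pass a plain Python lambda ...
ds.map(lambda value: (value[0], value[1] + 1))

# ... or an instance of the MapFunction interface
ds.map(MyMapFunction())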

Types and SerDe

In the last section about supporting UDFs, we mentioned that the Python DataStream API can reuse the Python Table API framework, but there are some differences between them. SQL and Table API data formats are restricted by the table schema, so data between upstream and downstream operations can be serialized and deserialized according to the type information given in the schema. The DataStream API, however, does not impose a strong data schema, so users might not care about the input and output data types when implementing their Python functions. Nevertheless, we need to serialize, deserialize and ship data between operators correctly. In the PyFlink DataStream API, we tease out the following four scenarios:

...

data_stream.map(MyMapFunction(), data_types=Types.ROW([Types.STRING(), Types.INT()]))
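As a hedged illustration of the difference, omitting the type information would force the operators to fall back to a generic (e.g. pickled) representation of the records, while declaring it allows typed serializers to be used between operators:

# no output type declared: records are shipped between operators in a generic form
data_stream.map(MyMapFunction())

# output type declared: upstream and downstream operators can use typed SerDe
data_stream.map(MyMapFunction(), data_types=Types.ROW([Types.STRING(), Types.INT()]))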

Connectors

Furthermore, it is important to provide a connector API to read/write data from/to Python objects or external storage systems.

Source

Users can get a data stream from two kinds of sources:

...

row_schema = Types.ROW_NAMED(
    ["f1", "f2", "f3", "f4", "f5"],
    [Types.INT(), Types.BOOLEAN(), Types.STRING(), Types.SQL_TIMESTAMP(), Types.LOCAL_DATE_TIME()])

deserialization_schema = JsonRowDeserializationSchema.Builder(row_schema).build()

env.add_source(FlinkKafkaConsumer010("topic", deserialization_schema, properties))
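The other kind of source, reading from local Python objects, could look like the following sketch (reusing from_collection and the Types helpers shown elsewhere in this FLIP):

# create a stream from an in-memory Python collection
env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(
    [(1, 'Hello'), (2, 'World')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))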

Sink

DataStream API supports writing data into a local file or external storage system:

...

row_schema = Types.ROW_NAMED(
    ["f1", "f2", "f3", "f4", "f5"],
    [Types.INT(), Types.BOOLEAN(), Types.STRING(), Types.SQL_TIMESTAMP(), Types.LOCAL_DATE_TIME()])

serialization_schema = JsonRowSerializationSchema.Builder(row_schema).build()

ds.add_sink(FlinkKafkaProducer010('topic', serialization_schema, properties))

Public Interfaces

DataStream

First, we will add a new class named DataStream, with most of the methods corresponding to the DataStream API on the Java/Scala side. Based on DataStream, there are also several DataStream extensions, produced by specific transformations, to be mentioned:

...
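As a rough, non-authoritative sketch, the transformation methods would return extension classes mirroring the Java side (the KeyedStream and DataStreamSink names here are assumptions):

class KeyedStream(DataStream):
    """
    A sketch of the DataStream returned by key_by(), on which keyed
    transformations such as reduce() can be applied.
    """

    def reduce(self, reduce_function) -> DataStream:
        pass


class DataStreamSink(object):
    """
    A sketch of the stream returned by add_sink()/print(), used to
    configure the sink operator (name, parallelism, ...).
    """

    def name(self, name) -> 'DataStreamSink':
        pass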

class StreamExecutionEnvironment(object):

    def from_collection(self, collection) -> DataStream:
        pass

    def read_text_file(self, file_path) -> DataStream:
        pass

    def generate_sequence(self, start, end) -> DataStream:
        pass

    def add_source(self, source_function) -> DataStream:
        pass

    def execute_async(self):
        pass

    def register_cache_file(self, file_path, file_name, executable):
        pass

    def get_cached_files(self):
        pass

StreamTableEnvironment

In the Java Table API, users can get a Table from a DataStream or turn a Table back into a DataStream; this should also be supported in the Python StreamTableEnvironment:

...

class StreamTableEnvironment(TableEnvironment):

    def from_data_stream(self, data_stream, names=None):
        pass

    def to_append_stream(self):
        pass

    def to_retract_stream(self):
        pass
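A short usage sketch of these conversions (the skeletons above omit parameters; the table argument and field names used here are assumptions):

# convert a DataStream into a Table, optionally naming the fields
table = t_env.from_data_stream(ds, names=['id', 'name'])

# convert the Table back into a DataStream of appended rows
result_stream = t_env.to_append_stream(table)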


Function/MapFunction/FlatMapFunction


class Function(abc.ABC):
    """
    The base class for all user-defined functions.
    """
    pass


class MapFunction(Function):

    @abc.abstractmethod
    def map(self, value):
        pass


class FlatMapFunction(Function):

    @abc.abstractmethod
    def flat_map(self, value):
        pass


class FilterFunction(Function):

    @abc.abstractmethod
    def filter(self, value):
        pass


class ReduceFunction(Function):

    @abc.abstractmethod
    def reduce(self, value_1, value_2):
        pass
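As a usage sketch, a concrete implementation of one of these interfaces could look like the following (the key_by()/reduce() combination is used here purely for illustration):

class SumReduceFunction(ReduceFunction):

    def reduce(self, value_1, value_2):
        # merge two records with the same key by summing their second field
        return value_1[0], value_1[1] + value_2[1]

# apply the reduce function on a keyed stream 'ds'
ds.key_by(lambda x: x[0]).reduce(SumReduceFunction())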

Example

env = StreamExecutionEnvironment.get_execution_environment()

# create a DataStream from a list of Python objects
ds = env.from_collection(
    [('ab', 1), ('abc', 2), ('abcd', 3), ('abcde', 4)],
    type_info=Types.ROW([Types.STRING(), Types.INT()]))

# set the parallelism of this operator to 3
ds.set_parallelism(3)

# key the DataStream by the second field of the record, then transform it
# with a map function and finally print the result to the console
ds.key_by(lambda x: x[1]).map(lambda x: (x[0], x[1] + 1)).print()

env.execute()

...

category / functionality:

  • Env (StreamExecutionEnvironment): configuration relevant methods, StreamExecutionEnvironment(final Configuration configuration), executeAsync, registerCachedFile / context.getCachedFile
  • Predefined Source: generateSequence, fromCollection, addSource
  • Predefined Sink: print(), addSink, DataStreamUtils (collect, reinterpretAsKeyedStream), StreamingFileSink
  • Connectors: Kafka, Cassandra, Kinesis, Elasticsearch, Hadoop FileSystem, RabbitMQ, Apache NiFi, Twitter Streaming API, Google PubSub, JDBC
  • DataStream Operators: keyBy, reduce, union, project
  • Partitioning: shuffle, rebalance, rescale, broadcast
  • Chaining & resource groups: startNewChain, disableChaining, slotSharingGroup
  • Functions: MapFunction, FlatMapFunction, FilterFunction, KeySelector, ReduceFunction, CoMapFunction, CoFlatMapFunction, CoProcessFunction, KeyedCoProcessFunction, TwoInputStreamOperator
  • Iterations: iterate
  • Types: Row Type (represented by tuple in Python, RowTypeInfo in Java), Primitive Array Types, Basic Types

Implementation Plan

  1. Add the DataStream class and the task configuration interfaces.
  2. Support creating a DataStream from Python objects, e.g. env.from_collection().
  3. Support map()/flat_map()/print(), etc.
  4. Support built-in connectors (Kafka, Elasticsearch, Kinesis, etc.).
  5. Support key_by(), etc.