Authors: Shuiqiang Chen, Hequn Cheng, Jincheng Sun

Status

Discussion thread
Vote thread
JIRA: FLINK-18761

Release: 1.12


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, PyFlink supports SQL and the Table API, which is convenient for users who are familiar with the Python programming language. However, users may need more complex processing operations that require access to state, timers, etc. In addition, AI and ML engineers may not have a strong demand for structured data queries. This FLIP introduces the Python DataStream API to address these shortcomings.

Goals

  1. Support reading data from and writing data to external storage (connectors) in the Python DataStream API.
  2. Support task and job configuration (get/set resources, parallelism, chaining strategy, etc.).
  3. Support stateless data transformations, including map, flat_map, key_by, etc.

Non-Goals

  1. Support for stateful data transformations, including ProcessFunction, RichFunction, Window and Join functions, is not considered in this FLIP, although the overall design is covered in this document.

Proposed Changes

Architecture

In FLIP-38 Python Table API, the communication between the Python VM process and the JVM process is implemented with the py4j RPC framework. As a result, the Python side only needs wrapper classes for the Java API, and there is no need to design a new set of Python APIs. In FLIP-58 Flink Python User-Defined Stateless Function for Table, we leverage the Beam framework so that the Flink operator starts a Beam runner, which launches a Python process to run the Python user-defined function. The overall architecture is as below:

We could apply the same architecture to support the Python DataStream APIs (such as DataStream, KeyedStream, MapFunction, ProcessFunction, etc.) by adding the corresponding Python wrapper classes for the Java DataStream APIs. A simple interface would look as below:

class DataStream(object):

    def __init__(self, j_data_stream):
        # j_data_stream is the py4j reference to the Java DataStream
        self._j_data_stream = j_data_stream

    def get_parallelism(self):
        return self._j_data_stream.getParallelism()
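The delegation pattern above can be tried out without a JVM. The following is a minimal sketch under one assumption: `FakeJavaDataStream` is a hypothetical stand-in for the py4j proxy of a Java DataStream and is not part of the proposal.

```python
class FakeJavaDataStream(object):
    """Hypothetical stand-in for the py4j proxy of a Java DataStream."""

    def __init__(self, parallelism):
        self._parallelism = parallelism

    def getParallelism(self):
        return self._parallelism


class DataStream(object):
    """Python wrapper that forwards calls to the wrapped Java object."""

    def __init__(self, j_data_stream):
        self._j_data_stream = j_data_stream

    def get_parallelism(self):
        # No logic lives on the Python side; the call is delegated as-is.
        return self._j_data_stream.getParallelism()


ds = DataStream(FakeJavaDataStream(parallelism=4))
current_parallelism = ds.get_parallelism()
```

With a real gateway, the only difference would be that the wrapped object is a py4j reference instead of a local Python object.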

Many interfaces, such as DataStream.map()/flat_map()/key_by(), require the user to implement the processing logic as a function. Taking map() as an example, the user implements the logic in Python and then passes it to map():

def my_map_func(value):
    return value + 1

ds.map(my_map_func)

We could also make use of the Beam framework to execute the user defined python processing function.

Implementation

In summary, the architecture of the Python DataStream API does not differ much from that of SQL and the Table API. However, some implementation details deserve to be mentioned.

Configuration

Task Configuration

The DataStream API provides methods to get or set specific configuration for the current stream. The Python DataStream API would provide the corresponding methods through a wrapper class, which sends the get/set requests to the Java API instance via the py4j RPC framework. Take DataStream for instance:

class DataStream(object):

    def __init__(self, j_data_stream):
        self._j_data_stream = j_data_stream

    def set_parallelism(self, parallelism):
        self._j_data_stream.setParallelism(parallelism)
        return self

    def set_resources(self, resource_spec):
        # unwrap the Python ResourceSpec to obtain the Java object
        self._j_data_stream.getTransformation().setResources(resource_spec._j_resource_spec)
        return self

    def slot_sharing_group(self, slot_sharing_group):
        self._j_data_stream.slotSharingGroup(slot_sharing_group)
        return self


In the code block above, set_resources(resource_spec) requires the user to pass a ResourceSpec instance as a parameter, which means the Python DataStream API also needs to provide the corresponding Python wrapper classes.
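A minimal sketch of how such a wrapper class could be unwrapped inside a setter. The classes prefixed with `Fake` are hypothetical stand-ins for py4j proxies, and the constructor of `ResourceSpec` shown here is an assumption made for illustration; the proposal does not define it.

```python
class FakeJavaTransformation(object):
    """Hypothetical stand-in for the Java Transformation proxy."""

    def __init__(self):
        self.resources = None

    def setResources(self, j_resource_spec):
        self.resources = j_resource_spec


class FakeJavaDataStream(object):
    """Hypothetical stand-in for the Java DataStream proxy."""

    def __init__(self):
        self._transformation = FakeJavaTransformation()

    def getTransformation(self):
        return self._transformation


class ResourceSpec(object):
    """Python wrapper that only holds a reference to the Java ResourceSpec."""

    def __init__(self, j_resource_spec):
        self._j_resource_spec = j_resource_spec


class DataStream(object):

    def __init__(self, j_data_stream):
        self._j_data_stream = j_data_stream

    def set_resources(self, resource_spec):
        # Pass the wrapped Java object, not the Python wrapper, to the Java API.
        self._j_data_stream.getTransformation().setResources(
            resource_spec._j_resource_spec)
        return self


j_stream = FakeJavaDataStream()
j_spec = object()  # placeholder for a reference to a Java ResourceSpec
ds = DataStream(j_stream)
returned = ds.set_resources(ResourceSpec(j_spec))
```

Returning `self` from each setter is what allows configuration calls to be chained.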

Dependency management

When developing a PyFlink application, a module may consist of multiple files, or the job may depend on third-party libraries. We therefore need to provide interfaces in StreamExecutionEnvironment that let users add these extra files as configuration. The interfaces would be as below:

class StreamExecutionEnvironment(object):

    def add_python_file(self, file_path):
        pass

    def set_python_requirements(self, requirements_file_path, requirements_cache_dir=None):
        pass

    def add_python_archive(self, archive_path, target_dir=None):
        pass


When submitting a Python DataStream job through the CLI frontend, users can also specify dependencies with the following options, which are already supported:

  • pyfs: attach custom python files for job.
  • pyarch: add python archive files for job.
  • pyreq: specify a requirements.txt file which defines the third-party dependencies.
  • pym: specify python module with the program entry point.

Stateless Functions

Many transformations in the DataStream API, such as map()/flat_map()/key_by(), require users to provide their own implementation. In the Python DataStream API, users can implement their processing logic in a Python function and pass it to the interface. Take map() for instance:

# implement a MapFunction
class MyMapFunction(MapFunction):

    def map(self, value):
        return value + 1

ds.map(MyMapFunction())

# pass a lambda function
ds.map(lambda x: x + 1)

# pass a function directly
def map_func(value):
    return value + 1

ds.map(map_func)

Users have three approaches to define their functions:

  1. implement a MapFunction provided by python DataStream API;
  2. pass a lambda function to the interface;
  3. directly pass a function.
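One way to treat the three approaches uniformly is to normalize whatever the user passes into a MapFunction before handing it to the operator. The sketch below is illustrative only: `to_map_function` and `_CallableMapFunction` are hypothetical names, and `MapFunction` is reduced to a plain abstract class so that the example is self-contained.

```python
import abc


class MapFunction(abc.ABC):

    @abc.abstractmethod
    def map(self, value):
        pass


class _CallableMapFunction(MapFunction):
    """Hypothetical adapter that turns a plain callable into a MapFunction."""

    def __init__(self, func):
        self._func = func

    def map(self, value):
        return self._func(value)


def to_map_function(func):
    """Accepts a MapFunction, a lambda or a plain function."""
    if isinstance(func, MapFunction):
        return func
    if callable(func):
        return _CallableMapFunction(func)
    raise TypeError("map() expects a MapFunction or a callable.")


class MyMapFunction(MapFunction):

    def map(self, value):
        return value + 1


def map_func(value):
    return value + 1


candidates = (MyMapFunction(), lambda x: x + 1, map_func)
results = [to_map_function(f).map(1) for f in candidates]
```

All three forms end up behind the same `map(value)` method, so the rest of the pipeline does not need to know which form the user chose.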

As mentioned above, the Python DataStream API leverages the Beam portability framework to execute user-defined Python functions in the Flink runtime. When the user passes a Python function to a DataStream transformation interface, we serialize the function object and pass it to a dedicated Flink Java operator, which launches a Beam runner and starts a Python process to execute the user-defined Python function. Therefore, the submitted job graph may not exactly match the logic the user specified.
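The idea of shipping a function as bytes and rebuilding it elsewhere can be sketched with the standard library alone. This sketch uses `marshal` on the function's code object purely as a stand-in; the proposal does not name a serialization mechanism, and a real implementation would need a serializer that also handles closures, defaults and referenced globals, which this one does not.

```python
import builtins
import marshal
import types


def serialize_function(func):
    """Turns the code object of a simple function into bytes."""
    return marshal.dumps(func.__code__)


def deserialize_function(payload):
    """Rebuilds a callable from the bytes, as the Python worker would."""
    code = marshal.loads(payload)
    # The rebuilt function only sees builtins; closures are not supported.
    return types.FunctionType(code, {"__builtins__": builtins})


payload = serialize_function(lambda value: value + 1)
restored = deserialize_function(payload)
result = restored(41)
```

Here `payload` plays the role of the serialized function that the Java operator would hand over to the Python process.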

Types and SerDe

In the previous section on UDF support, we mentioned that the Python DataStream API can reuse the Python Table API framework. However, there are some differences between them. SQL and Table API data formats are constrained by the table schema, so data exchanged between upstream and downstream operations can be serialized and deserialized using the type information given in the schema. DataStream has no such strict schema, so users might not care about the input and output data types when implementing their Python functions. Nevertheless, we need to serialize, deserialize and ship data between operators correctly. In the PyFlink DataStream API, we distinguish four scenarios, depending on whether the upstream and downstream operations are Python or Java operations:

  1. A Python operation upstream may be followed by a Java operation that has specified input and output type information. In this case, the upstream Python operation needs to know its output type information, either from user-specified data types or inferred from the downstream operator, so that the Beam Python harness can encode the result data with the value coders corresponding to the input TypeInformation of the next operator.
  2. When the upstream and downstream operations are both Python operations, data is transmitted between operators as primitive byte arrays, and the result data from the Python harness is serialized/deserialized by the pickle serializer. Therefore, we would introduce a new type information called PickledByteArrayTypeInformation to indicate that the data is in pickled primitive byte array format.
  3. In the third and fourth scenarios, the upstream operation is a Java operation, so the downstream operation can obtain the type information of the upstream operation from its inputs.
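The second scenario can be illustrated with the standard `pickle` module: between two Python operations the JVM only needs to move opaque byte arrays, and only the Python side interprets them. The helper names below are hypothetical.

```python
import pickle


def encode_record(record):
    """What the upstream Python operation would emit: an opaque byte array."""
    return pickle.dumps(record)


def decode_record(payload):
    """What the downstream Python operation would do with the bytes."""
    return pickle.loads(payload)


upstream_output = [("ab", 1), ("abc", 2)]
wire_format = [encode_record(record) for record in upstream_output]
downstream_input = [decode_record(payload) for payload in wire_format]
```

Because the bytes are never decoded on the Java side, no field-level type information is required for this hop.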


Based on the above, we mainly focus on the first and second scenarios. We would introduce a Python class named Types that maps Python types to Java TypeInformation:

class Types(object):
    """
    This class gives access to the type information of the most common
    types for which Flink has built-in serializers and comparators.
    """

    VOID = BasicTypeInfo.VOID_TYPE_INFO
    STRING = BasicTypeInfo.STRING_TYPE_INFO
    BYTE = BasicTypeInfo.BYTE_TYPE_INFO
    BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO
    SHORT = BasicTypeInfo.SHORT_TYPE_INFO
    INT = BasicTypeInfo.INT_TYPE_INFO
    LONG = BasicTypeInfo.LONG_TYPE_INFO
    FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO
    DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO
    CHAR = BasicTypeInfo.CHAR_TYPE_INFO
    BIG_INT = BasicTypeInfo.BIG_INT_TYPE_INFO
    BIG_DEC = BasicTypeInfo.BIG_DEC_TYPE_INFO
    SQL_DATE = SqlTimeTypeInfo.DATE
    SQL_TIME = SqlTimeTypeInfo.TIME
    SQL_TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP
    LOCAL_DATE = LocalTimeTypeInfo.LOCAL_DATE
    LOCAL_TIME = LocalTimeTypeInfo.LOCAL_TIME
    LOCAL_DATE_TIME = LocalTimeTypeInfo.LOCAL_DATE_TIME
    INSTANT = BasicTypeInfo.INSTANT_TYPE_INFO

    @staticmethod
    def ROW(types):
        """
        Returns type information for Row with fields of the given types.
        A row itself must not be null.

        :param types: the types of the row fields, e.g., Types.STRING, Types.INT
        """
        return RowTypeInfo(types)

    @staticmethod
    def ROW_NAMED(names, types):
        """
        Returns type information for Row with fields of the given types and
        with given names. A row must not be null.

        :param names: array of field names.
        :param types: array of field types.
        """
        return RowTypeInfo(types, names)

    @staticmethod
    def PRIMITIVE_ARRAY(element_type):
        """
        Returns type information for arrays of primitive type (such as byte[]).
        The array must not be null.

        :param element_type: element type of the array (e.g. Types.BOOLEAN,
                             Types.INT, Types.DOUBLE)
        """
        if element_type == Types.BOOLEAN:
            return PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO
        elif element_type == Types.BYTE:
            return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
        elif element_type == Types.SHORT:
            return PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO
        elif element_type == Types.INT:
            return PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO
        elif element_type == Types.LONG:
            return PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
        elif element_type == Types.FLOAT:
            return PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO
        elif element_type == Types.DOUBLE:
            return PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO
        elif element_type == Types.CHAR:
            return PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO
        else:
            raise TypeError("Invalid element type for a primitive array.")

Users can then specify the output data types when implementing their Python functions. Take map() for instance:

data_stream.map(MyMapFunction(), data_types=Types.ROW([Types.STRING, Types.INT]))

Connectors

Furthermore, it is important to provide a connector API to read data from and write data to Python objects or external storage systems.

Source

Users can get a data stream from two kinds of sources:

  1. From Python objects, by calling from_collection(), etc.
  2. From an external data source, by calling add_source() to add a connector. Currently, only built-in connectors are supported, such as Kafka, Cassandra, Kinesis, Elasticsearch, HDFS, RabbitMQ, Apache NiFi, Twitter Streaming API, Google PubSub and JDBC. We need to provide the corresponding Python wrapper classes for the Java API. Take the Kafka connector for instance:

Java code:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), properties));


In the Python DataStream API, users can add a Kafka data source as below:

class FlinkKafkaConsumer010(object):

    def __init__(self, topic, deserialization_schema, properties):
        pass

properties = {}
properties['bootstrap.servers'] = 'localhost:9092'
properties['group.id'] = 'test'

ds = env.add_source(FlinkKafkaConsumer010('my_topic', SimpleStringSchema(), properties))

This requires the user to provide a DeserializationSchema, which will be a wrapper class for the Java API. For now, we will only support some simple built-in DeserializationSchemas, such as:

  • SimpleStringSchema: deserializes byte array data into a String object;
  • JsonRowDeserializationSchema: deserializes JSON input data into a Row;
  • AvroRowDeserializationSchema: deserializes Avro input data into a Row;
  • CsvRowDeserializationSchema: deserializes CSV input data into a Row.

Take JsonRowDeserializationSchema for instance. On the Java side, users can get a DeserializationSchema instance with code like the following:

TypeInformation<Row> rowSchema = Types.ROW_NAMED(
    new String[] {"f1", "f2", "f3", "f4", "f5"},
    Types.INT, Types.BOOLEAN, Types.STRING, Types.SQL_TIMESTAMP, Types.LOCAL_DATE_TIME);

JsonRowDeserializationSchema deserializationSchema =
    new JsonRowDeserializationSchema.Builder(rowSchema).build();

In the PyFlink DataStream API, a row schema would be declared in a similar way:

row_schema = Types.ROW_NAMED(
    ["f1", "f2", "f3", "f4", "f5"],
    [Types.INT, Types.BOOLEAN, Types.STRING, Types.SQL_TIMESTAMP, Types.LOCAL_DATE_TIME])

deserialization_schema = JsonRowDeserializationSchema.Builder(row_schema).build()

env.add_source(FlinkKafkaConsumer010("topic", deserialization_schema, properties))
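To make the role of a row deserialization schema concrete, the following pure-Python sketch mimics what such a schema does conceptually: it turns one JSON message into a row whose fields follow the declared field names. It is not the Java implementation; `deserialize_json_row` is a hypothetical helper, a tuple stands in for a Row, and mapping missing fields to `None` is an assumption made for this example.

```python
import json


def deserialize_json_row(message, field_names):
    """Parses one JSON message (bytes) into a tuple ordered by field_names."""
    document = json.loads(message.decode("utf-8"))
    # Assumption for this sketch: a field absent from the message becomes None.
    return tuple(document.get(name) for name in field_names)


message = b'{"f1": 1, "f2": true, "f3": "flink"}'
row = deserialize_json_row(message, ["f1", "f2", "f3"])
```

In the proposed API this work stays on the Java side; the Python wrapper only carries the schema declaration to the JVM.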

Sink

The DataStream API supports writing data to the console or to an external storage system:

  1. Call DataStream.print() to print the data stream to the console.
  2. Call DataStream.add_sink() to add a sink connector. Similar to source connectors, we will provide a wrapper class corresponding to the Java connector and require the user to provide a SerializationSchema. The Kafka producer sink connector would look as below:

row_schema = Types.ROW_NAMED(
    ["f1", "f2", "f3", "f4", "f5"],
    [Types.INT, Types.BOOLEAN, Types.STRING, Types.SQL_TIMESTAMP, Types.LOCAL_DATE_TIME])

serialization_schema = JsonRowSerializationSchema.Builder(row_schema).build()

ds.add_sink(FlinkKafkaProducer010('topic', serialization_schema, properties))
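The sink side is the mirror image of deserialization. As a rough, pure-Python illustration of what a row serialization schema does conceptually (not the Java implementation), the hypothetical helper below pairs the values of a row with the declared field names and encodes the result as JSON bytes.

```python
import json


def serialize_json_row(row, field_names):
    """Encodes a row (here a tuple) as a JSON message keyed by field_names."""
    if len(row) != len(field_names):
        raise ValueError("Row arity does not match the number of field names.")
    return json.dumps(dict(zip(field_names, row))).encode("utf-8")


payload = serialize_json_row((1, True, "flink"), ["f1", "f2", "f3"])
decoded = json.loads(payload)
```

The arity check reflects why the schema needs the row type information up front: without it, values could not be matched to field names.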

Public Interfaces

DataStream

First, we will add a new class named DataStream, most of whose methods correspond to the DataStream API on the Java/Scala side. Based on DataStream, specific transformations produce several DataStream extensions:

       KeyedStream: represents a DataStream on which the operator state is partitioned by key using a provided KeySelector.

       IterativeStream: represents the start of an iteration in a DataStream.

The class hierarchy is as below:

class DataStream(object):
    """
    A DataStream represents a stream of elements of the same type. A
    DataStream can be transformed into another DataStream by applying a
    transformation, for example:
    DataStream.map()
    DataStream.filter()
    """

    def get_parallelism(self):
        """
        Gets the parallelism for this operator.
        """
        pass

    def get_min_resources(self):
        """
        Gets the minimum resources for this operator.
        """
        pass

    def get_preferred_resources(self):
        """
        Gets the preferred resources for this operator.
        """
        pass

    def get_type(self):
        """
        Gets the type of the stream.
        """
        pass

    def get_execution_environment(self):
        """
        Returns the StreamExecutionEnvironment that was used to create
        this DataStream.
        """
        pass

    def get_execution_config(self):
        pass

    def name(self, name):
        pass

    def get_name(self):
        pass

    def uid(self, uid):
        pass

    def set_uid_hash(self, hash):
        pass

    def disable_chaining(self):
        pass

    def start_new_chain(self):
        pass

    def slot_sharing_group(self, slot_sharing_group):
        pass

    def union(self, data_streams):
        """
        Creates a new DataStream by merging DataStream outputs of the
        same type with each other. The DataStreams merged using this
        operator will be transformed simultaneously.
        """
        pass

    def connect(self, data_stream):
        pass

    def key_by(self, key_selector):
        pass

    def broadcast(self):
        pass

    def shuffle(self):
        pass

    def forward(self):
        pass

    def rebalance(self):
        pass

    def rescale(self):
        pass

    # 'global' is a reserved word in Python, hence the trailing underscore
    def global_(self):
        pass

    def iterate(self):
        pass

    def map(self, map_func):
        pass

    def flat_map(self, flat_map_func):
        pass

    def filter(self, filter_func):
        pass

    def add_sink(self, sink_function):
        pass

    def print(self):
        pass

    def partition_custom(self, partitioner, key_selector):
        pass

class KeyedStream(DataStream):
    """
    A KeyedStream represents a DataStream on which operator state is
    partitioned by key using a provided KeySelector. Typical operations
    supported by a DataStream are also possible on a KeyedStream, with
    the exception of partitioning methods such as shuffle, forward and
    key_by.

    Reduce-style operations, such as reduce and sum, work on elements
    that have the same key.
    """

    def reduce(self, reducer):
        """
        Applies a reduce transformation on the data stream grouped by
        the given key position. The ReduceFunction will receive input
        values based on the key value. Only input values with the same
        key will go to the same reducer.
        """
        pass

    ...
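The semantics described in the reduce() docstring can be modelled locally in plain Python. The sketch below assumes the rolling behaviour of the Java KeyedStream.reduce(), where each incoming element is combined with the last reduced value for its key and the new value is emitted; `keyed_rolling_reduce` is a hypothetical helper, not part of the proposed API.

```python
def keyed_rolling_reduce(records, key_selector, reducer):
    """Yields, for every record, the reduced value of its key so far."""
    state = {}  # last reduced value per key
    for record in records:
        key = key_selector(record)
        if key in state:
            state[key] = reducer(state[key], record)
        else:
            # The first record of a key is emitted unchanged.
            state[key] = record
        yield state[key]


records = [("a", 1), ("b", 2), ("a", 3), ("b", 4)]
output = list(keyed_rolling_reduce(
    records,
    key_selector=lambda record: record[0],
    reducer=lambda left, right: (left[0], left[1] + right[1])))
```

Records with different keys never meet in the reducer, which is the property the docstring describes as "only input values with the same key will go to the same reducer".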


It is also important to add job configuration and execution interfaces to StreamExecutionEnvironment (set_parallelism(), execute_async(), etc.), together with all supported methods for getting a DataStream from a collection of Python objects or from an external data source:

class StreamExecutionEnvironment(object):

    def from_collection(self, collection) -> DataStream:
        pass

    def read_text_file(self, file_path) -> DataStream:
        pass

    # 'from' is a reserved word in Python, hence the trailing underscore
    def generate_sequence(self, from_, to) -> DataStream:
        pass

    def add_source(self, source_function) -> DataStream:
        pass

    def execute_async(self):
        pass

    def register_cache_file(self, file_path, file_name, executable):
        pass

    def get_cached_files(self):
        pass

StreamTableEnvironment

In the Java Table API, users can get a Table from a DataStream or convert a Table back into a DataStream. This should also be supported in the Python StreamTableEnvironment:


class StreamTableEnvironment(TableEnvironment):

    def from_data_stream(self, data_stream, names=None):
        pass

    def to_append_stream(self, table, type_info):
        pass

    def to_retract_stream(self, table, type_info):
        pass


Function/MapFunction/FlatMapFunction


class Function(abc.ABC):
    """
    The base class for all user-defined functions.
    """
    pass


class MapFunction(Function):

    @abc.abstractmethod
    def map(self, value):
        pass


class FlatMapFunction(Function):

    @abc.abstractmethod
    def flat_map(self, value):
        pass


class FilterFunction(Function):

    @abc.abstractmethod
    def filter(self, value):
        pass


class ReduceFunction(Function):

    @abc.abstractmethod
    def reduce(self, value1, value2):
        pass
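A small local model of how user implementations of these interfaces could be applied to a sequence of values. It is only a sketch: the base classes are reduced to plain abstract classes to keep the example self-contained, `run_locally` is a hypothetical helper, and having flat_map() return an iterable is an assumption, since the interface above does not say how multiple results are emitted.

```python
import abc


class FlatMapFunction(abc.ABC):

    @abc.abstractmethod
    def flat_map(self, value):
        pass


class FilterFunction(abc.ABC):

    @abc.abstractmethod
    def filter(self, value):
        pass


class SplitWords(FlatMapFunction):

    def flat_map(self, value):
        # Assumption: zero or more results are returned as an iterable.
        return value.split()


class LongerThanTwo(FilterFunction):

    def filter(self, value):
        return len(value) > 2


def run_locally(values, flat_map_func, filter_func):
    """Applies flat_map and then filter to every value, in order."""
    output = []
    for value in values:
        for item in flat_map_func.flat_map(value):
            if filter_func.filter(item):
                output.append(item)
    return output


result = run_locally(["to be or", "not to be"], SplitWords(), LongerThanTwo())
```

In a real job the same objects would be passed to DataStream.flat_map() and DataStream.filter() instead of a local loop.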

Example

env = StreamExecutionEnvironment.get_execution_environment()

# create a DataStream from a list of Python objects
ds = env.from_collection([('ab', 1), ('abc', 2), ('abcd', 3), ('abcde', 4)],
                         type_info=Types.ROW([Types.STRING, Types.INT]))

# set the parallelism of this operator to 3
ds.set_parallelism(3)

# key the DataStream by the second field of each record, then
# transform it with a map function, and finally print the
# transformation result to the console
ds.key_by(lambda x: x[1]).map(lambda x: (x[0], x[1] + 1)).print()

env.execute()


To sum up, we have the following list of functions to be implemented:

| Category | Functionality |
| --- | --- |
| Env | StreamExecutionEnvironment; configuration relevant method; StreamExecutionEnvironment(final Configuration configuration); executeAsync; registerCachedFile/context.getCachedFile |
| Predefined Source | generateSequence; fromCollection; addSource |
| Predefined Sink | print(); addSink; DataStreamUtils(collect, reinterpretAsKeyedStream); StreamingFileSink |
| Connectors | Kafka; Cassandra; Kinesis; Elasticsearch; Hadoop FileSystem; RabbitMQ; Apache NiFi; Twitter Streaming API; Google PubSub; JDBC |
| DataStream Operators | keyBy; reduce; union; project |
| Partitioning | shuffle; rebalance; rescale; broadcast |
| Chaining & resource groups | startNewChain; disableChaining; slotSharingGroup |
| Functions | MapFunction; FlatMapFunction; FilterFunction; KeySelector; ReduceFunction; CoMapFunction; CoFlatMapFunction; CoProcessFunction; KeyedCoProcessFunction; TwoInputStreamOperator |
| Iterations | iterate |
| Types | Row Type (represented by tuple in Python, RowTypeInfo in Java); Primitive Array Types; Basic Types |

Implementation Plan

  1. Add the DataStream class and the task configuration interfaces.
  2. Support creating a DataStream from Python objects, e.g. env.from_collection().
  3. Support map()/flat_map()/print(), etc.
  4. Support built-in connectors (Kafka, Elasticsearch, Kinesis, etc.).
  5. Support key_by(), etc.