Authors: Shuiqiang Chen, Hequn Cheng, Jincheng Sun


Discussion thread
Vote thread

FLINK-18761 - Getting issue details... STATUS


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Currently, SQL and Table API have been supported in pyflink, which provides much more convenience for users who are familiar with python programming language. However, users might request for more complicated processing operations which might need to access state and timer, etc. And for AI and ML engineers, there might not be strong demand for structural data query.  In this FLIP, we will introduce the python DataStream API which can make up for the above shortcomings.


  1. support python DataStream to read/write data from/to external storage(connectors).
  2. support configuring task and job configuration( get/set resources, parallelism, chaining strategy , etc.)
  3. support stateless data transformations, including map, flatmap, keyby,  etc.


  1. The support for stateful data transformations, including ProcessFunction, RichFunction, Window, Join function will not be considered in this FLIP, but we have the overall design in this doc

Proposed Changes


In FLIP 38 Python Table API, the communication between Python VM process and JVM process is implemented by py4j RPC framework, by which there will only be wrapper classes  for all of java API and have no need to design a new set of python APIs. And in FLIP-58 Flink Python User-Defined Stateless Function for Table , we leverage the Beam framework to make the flink operator start a Beam Runner to launch a python process to run the python user defined function. The overall architecture is as below: 

We could apply the same architecture to support python DataStream APIs(such as DataStream, KeyedDataStream, MapFunction, ProcessFunction, etc.) by adding the corresponding wrapper classes for Java DataStream APIs in python. Some simple interface will be as below:

class DataStream(object): 


       def __init__(self, j_data_stream):

             self._j_data_stream = j_data_stream

      def get_parallelism():

             return self._j_data_stream.getParallelism()

And there are a lot of interfaces that require user to implement processing logic as a function like Take map() function for instance, user implements a function logic in python language, then passes it to the map() function:  

def my_map_func(value):

      return value + 1

We could also make use of the Beam framework to execute the user defined python processing function.


In summary, the architure of python DataStream API has no much difference with SQL and Table API. But there are some more implementation detail to be mentioned.


Task Configuration

DataStream API provides APIs to get or set specific configuration for current Stream. Python DataStream API would also privode corresponding API in the form of a wrapped up class which would send the get/set requests to Java API instance via py4j rpc framework. Take DataStream for instance:

class DataStream(object):

       def __init__(self, j_data_stream):

            self._j_data_stream = j_data_stream

      def set_parallelism(parallelism):        


            return self


       def set_resources(self, resource_spec):


             return self 


      def slot_sharing_group(self, slot_sharing_group) 


            return self

In the code block above,  set_resource_spec(ResourceSpec) require user to pass  ResourceSpec instance as parameters, this means the python DataStream API also need to provide the corresponding wrapped python classes.

Dependency management

When developing pyflink application, there might be some module consist of multiple files, or other third party dependencies. We also need to provide interfaces for user to add up these extract files in StreamExecutionEnvironment as configurations. The interfaces would be as bellow:

class StreamExectutionEnvironment(object):

       def add_python_file(self, file_path):



       def set_python_requirements(self, requirments_file_path, requirements_cache_dir=None):



      def add_python_archive(self, archive_path, target_dir=None):


When submitting a python flink data stream job through CLI Frontend, user can also use the following options which have already been supported currently to specify dependencies:

  • pyfs: attach custom python files for job.
  • pyarch: add python archive files for job.
  • pyreq: specify a requirements.txt file which defines the third-party dependencies.
  • pym: specify python module with the program entry point.

Stateless Functions

There are many transform operations in DataStream API require user to provide their implementation, such as  map()/flatmap()/keyBy(), etc. In python DataStream API, users can implement their processing logic in a python function and pass it to the interface. Take map() function for instance:

# implement a MapFunction

class MyMapFunction(MapFunction):

    def map(self, value):

        return value + 1

#a lambda function x: x+1)

#directly pass a func

def map_func(value):
    return value + 1

Users has three approaches to defined their functions:

  1. implement a MapFunction provided by python DataStream API;
  2. pass a lambda function to the interface;
  3. directly pass a function.

As mentioned above, python DataStream API leverages the Beam portability framework to be able to execute user defined python function in flink runtime. When user gives a python function as a parameter to a datastream transformation interface, we will serialize the function object and pass it to a dedicated flink java operator which will launch a Beam runner and start a python process to execute user defined python function. Therefore, the submitted job graph might not be as the user specified logic. 

Types and SerDe

In the last section about supporting UDFs, we mentioned that python DataStream API can reuse the Python Table API framework. But there are some differences between them. SQL and Table API data formats are restricted by table Schema, so data between upstream and downstream operation can be SerDe by the given type information  in Schema. While DataStream does not have a strong data schema restriction that user might not care about the input and output data types when implementing their python functions. However, we need to serialize and deserialize data and ship data between operators correctly. In pyflink DataStream API, we tease out the following four scenarios:

  1. After executing a python function operation in upstream, there might be a java operation which has specified input and output type information. In this case, the upstream python operation needs to be aware of its output type information either by user specified data types or be inferred from the downstream operator so that the beam python harness can encode result data applying value coders corresponding to the InputTypeInformation of the next operator. 
  2. When upstream and downstream both are python operations, data will be transmitted in the form of primitive byte arrays between operators.  And the result data from python harness will be serialized/deserialized by pickled serializer. Therefore, we would introduce a new type information called PickledByteArrayTypeInformation to indicate that the data is in pickled primitive byte array format.
  3. As for the 3rd and 4th scenario, the downstream operation can obtain type information of upstream java operation from inputs. 

According to the illustration above, we mainly focus on the first and second scenarios. We would introduce a python class named Types to give a mapping for python types and java TypeInformations:

class Types(object):


   This class gives access to the type information of the most common 

   types for which Flink has built-in serializers and comparators.


   VOID = BasicTypeInfo.VOID_TYPE_INFO


   BYTE = BasicTypeInfo.BYTE_TYPE_INFO



   INT = BasicTypeInfo.INT_TYPE_INFO

   LONG = BasicTypeInfo.LONG_TYPE_INFO



   CHAR = BasicTypeInfo.CHAR_TYPE_INFO



   SQL_DATE = SqlTimeTypeInfo.DATE

   SQL_TIME = SqlTimeTypeInfo.TIME


   LOCAL_DATE = LocalTimeTypeInfo.LOCAL_DATE

   LOCAL_TIME = LocalTimeTypeInfo.LOCAL_TIME




   def ROW(types):


       Returns type information for Row with fields of the given types. 

       A row itself must not benull.

       :param types the types of the row fields, e.g., Types.String(), Types.INT()


       return RowTypeInfo(types)


   def ROW_NAMED(names, types):


       Returns type information for Row with fields of the given types and with given names. A row

       must not be null.

       :param names array of field names.

       :param types array of field types.


       return RowTypeInfo(types, names)


   def PRIMITIVE_ARRAY(element_type):


       Returns type information for arrays of primitive type (such as byte[]). The array must not

       be null.

       :param element_type element type of the array (e.g. Types.BOOLEAN(), Types.INT(),



       if element_type == Types.BOOLEAN:

           return PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO()

       elif element_type == Types.BYTE:

           return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO()

       elif element_type == Types.SHORT:

           return PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO()

       elif element_type == Types.INT:

           return PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO()

       elif element_type == Types.LONG:

           return PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO()

       elif element_type == Types.FLOAT:

           return PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO()

       elif element_type == Types.DOUBLE:

           return PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO()

       elif element_type == Types.CHAR:

           return PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO()


           raise TypeError("Invalid element type for a primitive array.")

 Then user can specify output data types when implementing their python functions, take map() for instance:, data_types=Types.ROW([Types.String(), Types.INT()]))


Furthermore, it is important to provide a connector API to read/write data from python objects or external storage systems.


User can get a data stream from two kinds of sources:

  1. from python objects by calling from_collection(), etc.
  2. from an external data source by calling add_source() to add a connector. Currently, only built-in connectors are supported, such as Kafka, Cassandra, Kinesis, ElasticSearch, HDFS, RabbitMQ, Apache NiFi, Twitter Streaming API, Google PubSub, JDBC. We need to provide the corresponding wrapped classes in Java API. Take kafka connector for instance: 

Java code:

Properties properties = new Properties();

properties.setProperty("bootstrap.servers", "localhost:9092");

properties.setProperty("", "test");

DataStream<String> stream = env

.addSource(new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), properties));

in Python DataStream, users can add a kafka data source as below:

class FlinkKafkaConsumer010(object):

       def __init__(self, topic, deserialization_schema, properties):


properites = {}

properties[‘bootstrap.servers’] = ‘localhost:9092’

properties[‘’] = ‘test’

ds  = env.add_source(FlinkKafkaConsumer010(‘my_topic’, SimpleStringSchema(), properties))

It requires the user to provide a DeserializationSchema which will be a wrapped class of Java API. Currently, we will only support some built-in simple DeserializationSchemas, such as:

  • SimpleStringSchema: Deserialize the bytes array data to a String object;
  • JsonRowDeserializationSchema: Deserialize the input json format data to a Row;
  • AvroRowDeserializationSchema: Deserialize the input avro format data to a Row;
  • CsvRowDeserializationSchema: Deserialize the input csv format data to a Row;

Take JsonRowDeserializationSchema for instance, at java side, users can get a deserializationSchema instance like the codes below:

TypeInformation<Row> rowSchema = Types.ROW_NAMED(

new String[] {"f1", "f2", "f3", "f4", "f5"},


JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema.Builder(rowSchema).build();

In pyflink DataStream API, we would make it in a similar approach to declare a row schema:

row_schema = Types.ROW_NAMED(

                                              ["f1", "f2", "f3", "f4", "f5"],

                                              [Types.INT, Types.BOOLEAN, Types.STRING, Types.SQL_TIMESTAMP, Types.LOCAL_DATE_TIME])

deserialization_schema = JsonRowDeserializationSchema.Builder(rowSchema).build()

env.add_source(FlinkKafkaConsumer010("topic", deserialization_schema, properties))


DataStream API supports writing data into a local file or external storage system:

  1. Call DataStream.print() to print the data stream to the console.
  2. call DataStream.add_sink() to add a sink connector, similar to source connectors,we will provide a wrapped class correspond to the java connector and require user to provide a DeserializationSchema, the kafka producer sink connector would be like as below:

row_schema = Types.ROW_NAMED(

                                                            ["f1", "f2", "f3", "f4", "f5"],

                                                            [Types.INT, Types.BOOLEAN, Types.STRING, Types.SQL_TIMESTAMP, Types.LOCAL_DATE_TIME])

serialization_schema = JsonRowSerializationSchema.Builder(rowSchema).build()

ds.add_sink(FlinkKafkaProducer010(‘topic’, serialization_schema, properties))

Public Interfaces


First, we will add a new class named DataStream, and most of the methods corresponding to the DataStream API at java/scala side. And base on DataStream, after specific transformations, there are also several DataStream extensions to be mentioned:

       KeyedStream: represents a DataStream on which the operator state is partitioned by key using a provided KeySelector.

       IterativeStream: represents the start of an iteration in a DataStream.

The hierarchy diagram is as below:

class DataStream(object):


      A DataStream represents a stream of elements of the same type. A 

      DataStream can be transformed into another DataStream by applying a

      transformation as for example:



       def get_parallelism(self):


       Gets the parallelism for this operator.



       def get_min_resources(self):


       Gets the minimum resources for this operator.



       def get_preferred_resources(self):

       Gets the preferred resources for this operator.




       def get_type(self):


       Gets the type of the stream.



       def get_execution_environment(self):


       Returns the StreamExecutionEnvironment that was used to create

       this DataStream.



       def get_execution_config(self):



       def name(self, name):



       def get_name(self):


       def uid(self):


       def set_uid_hash(self, hash):


       def disable_chaining(self):


       def start_new_chain(self):

       def slot_sharing_group(self, slot_sharing_group):


       def union(self, data_streams)


       Creates a new DataStream by merging DataStream outputs of the

       same type with each other. The DataStream merged using this 

       operator will be transformed simultaneously.



       def connect(self, data_stream)



      def key_by(self, key_selector):


     def broadcast(self):



     def shuffle(self):



     def forward(self):



     def rebalance(self):



     def rescale(self):



    def global(self):



    def iterate(self):



    def map(self, map_func):



    def flat_map(self, flat_map_func):



    def filter(self, filter_func):



     def add_sink(self, sink_function):



     def print(self)


    def partition_custom(self)



class KeyedStream(DataStream):


    A KeyedStream represents a DataStream on which operator state is

    partitioned by key using a provided KeySelector. Typical operations

    supported by a DataStream are also possinble on a KeyedStream, with

    the exception of partitioning methods such as shuffle, forward and 


    Reduce-style operations, such as reduce, sum work on elements that

    have the same key.



    def reduce(self, reducer):


        Applies a reduce transformation on the grouped data stream

        grouped on by the given key position. The ReduceFunction will

        receive input values based on the key value. Only input values

        with the same key will go to the same reducer






It is also important to add up job configuration and execution interfaces in StreamExecutionEnvironment( set_parallelism(), execute_async(), etc.) and all supported methods to get a DataStream from python object collection or external data source:

class StreamExecutionEnvironment(object):


       def from_collection(self, collection) -> DataStream:


      def read_text_file(self, file_path) → DataStream



       def generate_sequence(self, from, to) -> DataStream;



      def add_source(self, source_function) -> DataStream:



      def execute_async(self):



       def register_cache_file(self, file_path, file_name, executable):



      def get_cached_files(self):



In Java Table API, users can get a Table from a DataStream or return a DataStream from a table, it should also be supported in python stream table environment :

class StreamTableEnvironment(TableEnvironment):


      def from_data_stream(self, data_stream, names=None):



     def to_append_stream(self):



     def to_retract_stream(self):


class Function(abc.ABC):


    The base class for all user-defined functions.


class MapFunction(ScalarFunction):



      def map(self, value):


class FlatMapFunction(ScalarFunction):



     def flat_map(self, value):



class FilterFunction(ScalarFunction):



    def filter(self, value):


class ReduceFunction(ScalarFunction):



     def reduce(self, value_1, value2):



env = StreamExecutionEnvironment.get_execution_environment()

#create a DataStream from a python object list

ds = env.from_collection([('ab', 1), ('abc, 2), ('abcd', 3), ('abcde', 4)],

                                           type_info=Types.ROW([Types.STRING(), Types.INT()])

#set the parallelism of this operator to 3


# keyed the DataStream with the second field of the record, then 

# transform the DataStream with a map function, finally, print the 

# transformation result to console. 

ds.key_by(lambda x: x[1]).map(lambda x: (x[0], x[1] + 1)).print()


To sum up, we have the following function list to be implemented:





configuration relevant method

StreamExecutionEnvironment(final Configuration configuration)



Predefined Source







DataStreamUtils(collect, reinterpretAsKeyedStream)







Hadoop FileSystem


Apache NiFi

Twitter Streaming API 

Google PubSub 


DataStream Operators










Chaining & resource groups


















Row Type (represented by tuple in Python, RowTypeInfo in Java)

Primitive Array Types

Basic Types

Implementation Plan

  1. Add DataStream class, and task configuration interface.
  2. Support creating a DataStream through python objects like  env.from_collection().
  3. Support map()/flat_map()/print(), etc.
  4. Support built-in connectors (Kafka, ElasticSearch, Kinese, etc.)
  5. Support key_by(), etc..