Authors: Shuiqiang Chen, Hequn Cheng, Jincheng Sun
Status
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, PyFlink supports SQL and the Table API, which is convenient for users who are familiar with the Python programming language. However, users may need more complex processing operations that require access to state, timers, and so on. In addition, AI and ML engineers may have little need for structured data queries. This FLIP introduces the Python DataStream API to address these shortcomings.
Goals
- Support reading data from and writing data to external storage (connectors) in the Python DataStream API.
- Support task and job configuration (getting/setting resources, parallelism, chaining strategy, etc.).
- Support stateless data transformations, including map, flatmap, keyby, etc.
...
- Support for stateful data transformations, including ProcessFunction, RichFunction, Window, and Join functions, is out of scope for this FLIP, but the overall design is covered in this doc.
Proposed Changes
Architecture
In FLIP-38 (Python Table API), the Python VM process and the JVM process communicate through the py4j RPC framework. As a result, the Python side only needs wrapper classes around the Java API, and there is no need to design a new set of Python APIs. In FLIP-58 (Flink Python User-Defined Stateless Function for Table), we leverage the Beam framework so that a Flink operator starts a Beam runner, which launches a Python process to run the Python user-defined function. The overall architecture is as follows:
...
We could also make use of the Beam framework to execute the user defined python processing function.
Implementation
In summary, the architecture of the Python DataStream API does not differ much from that of the SQL and Table API. However, some implementation details are worth mentioning.
Configuration
Task Configuration
The DataStream API provides methods to get or set specific configuration for the current stream. The Python DataStream API will provide corresponding methods in the form of a wrapper class that sends the get/set requests to the Java API instance via the py4j RPC framework. Take DataStream for instance:
...
In the code block above, set_resource_spec(ResourceSpec) requires the user to pass a ResourceSpec instance as a parameter, which means the Python DataStream API also needs to provide the corresponding Python wrapper classes.
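The wrapper pattern described above can be sketched as follows. This is only an illustration under stated assumptions: FakeJavaDataStream is a hypothetical stand-in for the Java object that py4j would proxy, and only the parallelism getter/setter is shown. A ResourceSpec wrapper would presumably follow the same shape.

```python
class FakeJavaDataStream:
    """Hypothetical stand-in for the Java DataStream that py4j would proxy."""

    def __init__(self):
        self._parallelism = 1

    def setParallelism(self, parallelism):
        self._parallelism = parallelism
        return self

    def getParallelism(self):
        return self._parallelism


class DataStream:
    """Python-side wrapper: every call is forwarded to the wrapped Java object."""

    def __init__(self, j_data_stream):
        self._j_data_stream = j_data_stream

    def set_parallelism(self, parallelism):
        # Forward the request and return the wrapper so calls can be chained.
        self._j_data_stream.setParallelism(parallelism)
        return self

    def get_parallelism(self):
        return self._j_data_stream.getParallelism()


ds = DataStream(FakeJavaDataStream()).set_parallelism(3)
```

The Python class holds no configuration state of its own; it only translates snake_case calls into calls on the Java-side object.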
Dependency management
When developing a PyFlink application, a module might consist of multiple files, or the job might rely on third-party dependencies. We therefore need to provide interfaces for users to add these extra files to the StreamExecutionEnvironment as configuration. The interfaces would be as below:
...
- pyfs: attach custom python files for job.
- pyarch: add python archive files for job.
- pyreq: specify a requirements.txt file which defines the third-party dependencies.
- pym: specify python module with the program entry point.
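As a rough sketch of how the four options listed above might be collected into a job configuration, the snippet below parses them with argparse. The option names come from the list; the dictionary keys, the comma-separated list format, and the function name parse_dependency_options are assumptions made for illustration only.

```python
import argparse


def parse_dependency_options(argv):
    """Collect the dependency options into a plain dict (hypothetical keys)."""
    parser = argparse.ArgumentParser(prog="pyflink-job", allow_abbrev=False)
    parser.add_argument("-pyfs", default="", help="custom Python files for the job")
    parser.add_argument("-pyarch", default="", help="Python archive files for the job")
    parser.add_argument("-pyreq", default=None, help="path to a requirements.txt file")
    parser.add_argument("-pym", default=None, help="module with the program entry point")
    args = parser.parse_args(argv)

    def split(value):
        # Assumption: multiple paths are separated by commas.
        return [item for item in value.split(",") if item]

    return {
        "files": split(args.pyfs),
        "archives": split(args.pyarch),
        "requirements": args.pyreq,
        "entry_module": args.pym,
    }


config = parse_dependency_options(
    ["-pyfs", "util.py,model.py", "-pyreq", "requirements.txt", "-pym", "my_job"]
)
```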
Stateless Functions
Many transformations in the DataStream API, such as map()/flatmap()/keyBy(), require users to provide their own implementation. In the Python DataStream API, users can implement their processing logic in a Python function and pass it to the interface. Take the map() function for instance:
...
As mentioned above, the Python DataStream API leverages the Beam portability framework to execute user-defined Python functions in the Flink runtime. When a user passes a Python function as a parameter to a DataStream transformation, we serialize the function object and pass it to a dedicated Flink Java operator, which launches a Beam runner and starts a Python process to execute the user-defined Python function. Therefore, the submitted job graph might differ from the logic the user specified.
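The serialize-then-execute flow described above can be illustrated with a minimal, single-process sketch. It uses the standard-library marshal module on the function's code object, which only works for simple functions without closures; a real implementation would more likely rely on a general-purpose function pickler. The names serialize_function, deserialize_function, and run_in_worker are hypothetical.

```python
import builtins
import marshal
import types


def serialize_function(fn):
    """Client side: turn a plain function (no closures) into bytes."""
    return marshal.dumps(fn.__code__)


def deserialize_function(payload, name="udf"):
    """Worker side: rebuild a callable from the serialized code object."""
    code = marshal.loads(payload)
    return types.FunctionType(code, {"__builtins__": builtins}, name)


def run_in_worker(payload, records):
    """Stand-in for the Python process started by the Java operator."""
    udf = deserialize_function(payload)
    return [udf(record) for record in records]


def add_one(record):
    return (record[0], record[1] + 1)


payload = serialize_function(add_one)
result = run_in_worker(payload, [("ab", 1), ("abc", 2)])
```

Here the bytes in payload play the role of the serialized function that is shipped to the operator, and run_in_worker plays the role of the separate Python process.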
Types and SerDe
The previous section on UDF support mentioned that the Python DataStream API can reuse the Python Table API framework. However, there are some differences between them. SQL and Table API data formats are constrained by the table schema, so data passed between upstream and downstream operations can be serialized and deserialized using the type information given in the schema. DataStream has no such strict schema, and users might not care about input and output data types when implementing their Python functions. Nevertheless, we need to serialize and deserialize data and ship it between operators correctly. In the PyFlink DataStream API, we distinguish the following four scenarios:
...
data_stream.map(MyMapFunction(), data_types=Types.ROW([Types.STRING(), Types.INT()]))
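To make the distinction between typed and untyped data concrete, here is a small sketch. It assumes, purely for illustration, that a declared ROW(STRING, INT) type allows a compact typed encoding, while data without declared types falls back to a generic serializer (pickle here). The byte layout and the function names are hypothetical and do not describe Flink's actual wire format.

```python
import pickle
import struct


def encode_row(row):
    """Typed path: (str, int) as length-prefixed UTF-8 followed by a 32-bit int."""
    text, number = row
    data = text.encode("utf-8")
    return struct.pack(">I", len(data)) + data + struct.pack(">i", number)


def decode_row(payload):
    """Inverse of encode_row."""
    (length,) = struct.unpack_from(">I", payload, 0)
    text = payload[4:4 + length].decode("utf-8")
    (number,) = struct.unpack_from(">i", payload, 4 + length)
    return (text, number)


def serialize(value, typed):
    # With declared types use the typed encoding, otherwise fall back to pickle.
    return encode_row(value) if typed else pickle.dumps(value)


def deserialize(payload, typed):
    return decode_row(payload) if typed else pickle.loads(payload)


typed_bytes = serialize(("abc", 2), typed=True)
untyped_bytes = serialize({"any": ["python", "object"]}, typed=False)
```

The typed path lets a non-Python operator read the fields without a Python interpreter, whereas the pickled bytes are opaque to anything but Python.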
Connectors
Furthermore, it is important to provide a connector API to read/write data from python objects or external storage systems.
Source
Users can obtain a data stream from two kinds of sources:
...
# "properties" holds the Kafka consumer properties and is defined elsewhere.
row_schema = Types.ROW_NAMED(
    ["f1", "f2", "f3", "f4", "f5"],
    [Types.INT, Types.BOOLEAN, Types.STRING, Types.SQL_TIMESTAMP, Types.LOCAL_DATE_TIME])
deserialization_schema = JsonRowDeserializationSchema.Builder(row_schema).build()
env.add_source(FlinkKafkaConsumer010("topic", deserialization_schema, properties))
Sink
DataStream API supports writing data into a local file or external storage system:
...
# "properties" holds the Kafka producer properties and is defined elsewhere.
row_schema = Types.ROW_NAMED(
    ["f1", "f2", "f3", "f4", "f5"],
    [Types.INT, Types.BOOLEAN, Types.STRING, Types.SQL_TIMESTAMP, Types.LOCAL_DATE_TIME])
serialization_schema = JsonRowSerializationSchema.Builder(row_schema).build()
ds.add_sink(FlinkKafkaProducer010('topic', serialization_schema, properties))
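Conceptually, a JSON row schema maps between message bytes and rows whose fields follow the declared field order. The sketch below illustrates that idea in plain Python with three fields; it is not the implementation of JsonRowSerializationSchema or JsonRowDeserializationSchema, and the helper names are hypothetical.

```python
import json

FIELD_NAMES = ["f1", "f2", "f3"]


def deserialize_row(message, field_names=FIELD_NAMES):
    """Source side: JSON bytes -> tuple ordered by the schema's field names."""
    record = json.loads(message.decode("utf-8"))
    return tuple(record[name] for name in field_names)


def serialize_row(row, field_names=FIELD_NAMES):
    """Sink side: tuple -> JSON bytes keyed by the schema's field names."""
    return json.dumps(dict(zip(field_names, row))).encode("utf-8")


message = serialize_row((1, True, "hello"))
row = deserialize_row(message)
```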
Public Interfaces
DataStream
First, we will add a new class named DataStream, most of whose methods correspond to the DataStream API on the Java/Scala side. In addition, specific transformations on a DataStream produce several DataStream extensions worth mentioning:
...
class StreamExecutionEnvironment(object):
    def from_collection(self, collection) -> DataStream: pass
    def read_text_file(self, file_path) -> DataStream: pass
    # "from" is a reserved word in Python, so the first parameter is written as from_.
    def generate_sequence(self, from_, to) -> DataStream: pass
    def add_source(self, source_function) -> DataStream: pass
    def execute_async(self): pass
    def register_cache_file(self, file_path, file_name, executable): pass
    def get_cached_files(self): pass
StreamTableEnvironment
In the Java Table API, users can create a Table from a DataStream or convert a Table back into a DataStream. The Python StreamTableEnvironment should support this as well:
...
class StreamTableEnvironment(TableEnvironment):
    def from_data_stream(self, data_stream, names=None): pass
    def to_append_stream(self): pass
    def to_retract_stream(self): pass
Function/MapFunction/FlatMapFunction
class Function(abc.ABC):
    """
    The base class for all user-defined functions.
    """

class MapFunction(ScalarFunction):
    @abc.abstractmethod
    def map(self, value): pass

class FlatMapFunction(ScalarFunction):
    @abc.abstractmethod
    def flat_map(self, value): pass

class FilterFunction(ScalarFunction):
    @abc.abstractmethod
    def filter(self, value): pass

class ReduceFunction(ScalarFunction):
    @abc.abstractmethod
    def reduce(self, value_1, value2): pass
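A user would implement these interfaces by subclassing them and overriding the abstract method. The sketch below shows this with simplified stand-ins that derive directly from abc.ABC rather than from ScalarFunction, so it runs without PyFlink. AddOne, LongWords, and apply_functions are hypothetical names.

```python
import abc


class MapFunction(abc.ABC):
    """Simplified stand-in for the proposed MapFunction interface."""

    @abc.abstractmethod
    def map(self, value):
        pass


class FilterFunction(abc.ABC):
    """Simplified stand-in for the proposed FilterFunction interface."""

    @abc.abstractmethod
    def filter(self, value):
        pass


class AddOne(MapFunction):
    def map(self, value):
        return (value[0], value[1] + 1)


class LongWords(FilterFunction):
    def filter(self, value):
        return len(value[0]) > 2


def apply_functions(records, map_fn, filter_fn):
    """Keep records accepted by filter_fn, then transform them with map_fn."""
    return [map_fn.map(r) for r in records if filter_fn.filter(r)]


records = [("ab", 1), ("abc", 2), ("abcd", 3)]
output = apply_functions(records, AddOne(), LongWords())
```

Because the methods are declared with abc.abstractmethod, instantiating the interface itself raises a TypeError, which surfaces a missing implementation early.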
Example
env = StreamExecutionEnvironment.get_execution_environment()
# Create a DataStream from a Python object list.
ds = env.from_collection([('ab', 1), ('abc', 2), ('abcd', 3), ('abcde', 4)],
                         type_info=Types.ROW([Types.STRING(), Types.INT()]))
# Set the parallelism of this operator to 3.
ds.set_parallelism(3)
# Key the DataStream by the second field of each record, then
# transform it with a map function, and finally print the
# transformation result to the console.
ds.key_by(lambda x: x[1]).map(lambda x: (x[0], x[1] + 1)).print()
env.execute()
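To give a feel for what the key_by and map steps in the example do to the sample records, the following plain-Python sketch routes each record to a parallel subtask by its key and then applies the map function. The routing rule (hash of the key modulo the parallelism) is a deliberate simplification; Flink's actual key assignment is based on key groups. The helper name key_by_and_map is hypothetical.

```python
from collections import defaultdict


def key_by_and_map(records, key_selector, map_fn, parallelism):
    """Group mapped records by the subtask their key is routed to (simplified)."""
    subtasks = defaultdict(list)
    for record in records:
        # Simplified routing: records with the same key always share a subtask.
        subtask = hash(key_selector(record)) % parallelism
        subtasks[subtask].append(map_fn(record))
    return dict(subtasks)


records = [("ab", 1), ("abc", 2), ("abcd", 3), ("abcde", 4)]
result = key_by_and_map(
    records,
    key_selector=lambda x: x[1],
    map_fn=lambda x: (x[0], x[1] + 1),
    parallelism=3,
)
```

In a real job the records are processed in parallel, so the order in which results are printed is not guaranteed.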
...
| Category | Functionality |
|---|---|
| Env | StreamExecutionEnvironment |
| | configuration relevant method |
| | StreamExecutionEnvironment(final Configuration configuration) |
| | executeAsync |
| | registerCachedFile/context.getCachedFile |
| Predefined Source | generateSequence |
| | fromCollection |
| | addSource |
| Predefined Sink | print() |
| | addSink |
| | DataStreamUtils(collect, reinterpretAsKeyedStream) |
| | StreamingFileSink |
| Connectors | Kafka |
| | Cassandra |
| | Kinesis |
| | Elasticsearch |
| | Hadoop FileSystem |
| | RabbitMQ |
| | Apache NiFi |
| | Twitter Streaming API |
| | Google PubSub |
| | JDBC |
| DataStream Operators | keyby |
| | reduce |
| | union |
| | project |
| Partitioning | shuffle |
| | rebalance |
| | rescale |
| | broadcast |
| Chaining & resource groups | startNewChain |
| | disableChaining |
| | slotSharingGroup |
| Functions | MapFunction |
| | FlatMapFunction |
| | FilterFunction |
| | KeySelector |
| | ReduceFunction |
| | CoMapFunction |
| | CoFlatMapFunction |
| | CoProcessFunction |
| | KeyedCoProcessFunction |
| | TwoInputStreamOperator |
| Iterations | iterate |
| Types | Row Type (represented by tuple in Python, RowTypeInfo in Java) |
| | Primitive Array Types |
| | Basic Types |
Implementation Plan
- Add the DataStream class and the task configuration interface.
- Support creating a DataStream through python objects like env.from_collection().
- Support map()/flat_map()/print(), etc.
- Support built-in connectors (Kafka, Elasticsearch, Kinesis, etc.).
- Support key_by(), etc.