Current state: Under Discussion
Provide support for asynchronous operations over streams
Released: Release this feature as soon as possible
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
In most cases, I/O access is time-consuming, which makes the throughput (TPS) of a single operator much lower than that of in-memory computation. This matters particularly for streaming jobs, where low latency is a major concern for users. Starting multiple threads is one way to handle this problem, but the drawbacks are obvious: the programming model becomes more complicated for end users, because they have to implement their own threading model in the operator, and they also have to coordinate that model with checkpointing.
AsyncFunction: Async I/O will be triggered in AsyncFunction.
AsyncWaitOperator: A StreamOperator which will invoke AsyncFunction.
AsyncCollector: For each input streaming record, an AsyncCollector will be created and passed into the user's callback to receive the async I/O result.
AsyncCollectorBuffer: A buffer to keep all AsyncCollectors.
Emitter Thread: A worker thread in AsyncCollectorBuffer that is signalled when some AsyncCollectors have finished their async I/O, and that emits the results to the downstream operators.
A helper class, AsyncDataStream, is added to provide methods for adding an AsyncFunction, which performs the async I/O operation, to a Flink streaming job.
The following diagram illustrates how the streaming records are processed while
- arriving at AsyncWaitOperator
- recovering from task failover
- snapshotting state
- being emitted by Emitter Thread
AsyncFunction works as a user function in AsyncWaitOperator, which looks like the StreamFlatMap operator and has open(), processElement(StreamRecord<IN> record) and processWatermark(Watermark mark).
In a concrete AsyncFunction, the user has to override asyncInvoke(IN input, AsyncCollector<OUT> collector) to supply the code that starts an async operation.
Each input stream record of AsyncWaitOperator is processed by AsyncFunction.asyncInvoke(IN input, AsyncCollector<OUT> cb), and the AsyncCollector is then appended to AsyncCollectorBuffer. AsyncCollector and AsyncCollectorBuffer are covered later.
AsyncCollector is created by AsyncWaitOperator and passed into AsyncFunction, where it should be added to the user's callback. Its role is to receive results or errors from user code and to notify AsyncCollectorBuffer to emit the results.
The method relevant to the user is collect(), which should be called when the async operation is done or when an error is thrown.
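A minimal sketch of how a user function might hand results and errors to the collector. The two interfaces below are simplified stand-ins written for this example, not the real Flink types; the two collect() overloads, the AsyncLookupFunction class and its query() method are assumptions, and CompletableFuture stands in for whatever async client library the user actually has.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Simplified stand-ins for the proposed interfaces (assumed shapes, not the real Flink API).
interface AsyncCollector<OUT> {
    void collect(List<OUT> results);  // to be called when the async operation succeeds
    void collect(Throwable error);    // to be called when the async operation fails
}

interface AsyncFunction<IN, OUT> {
    void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
}

// Hypothetical user function: looks up a value asynchronously for each input key.
class AsyncLookupFunction implements AsyncFunction<String, String> {

    // Hypothetical async client call; a real job would call its database or RPC client here.
    private CompletableFuture<String> query(String key) {
        return CompletableFuture.supplyAsync(() -> {
            if (key.isEmpty()) {
                throw new IllegalArgumentException("empty key");
            }
            return "value-of-" + key;
        });
    }

    @Override
    public void asyncInvoke(String input, AsyncCollector<String> collector) {
        // Start the async operation and return immediately;
        // the callback passes the outcome to the collector later.
        query(input).whenComplete((value, error) -> {
            if (error != null) {
                collector.collect(error);
            } else {
                collector.collect(Collections.singletonList(value));
            }
        });
    }
}
```

The point of the sketch is that asyncInvoke() never blocks: the only link between the user's callback and the operator is the collector.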
How is it used
Before calling AsyncFunction.asyncInvoke(IN input, AsyncCollector<OUT> collector), AsyncWaitOperator tries to get an instance of AsyncCollector from AsyncCollectorBuffer, which is then passed into the user's callback function. If the buffer is full, the operator waits until some of the ongoing callbacks have finished.
Once the async operation is done, AsyncCollector.collect() takes the results or errors, and AsyncCollectorBuffer is notified.
AsyncCollector is implemented by Flink.
AsyncCollectorBuffer keeps all AsyncCollectors and emits results to the next nodes.
When AsyncCollector.collect() is called, a mark is placed in AsyncCollectorBuffer to indicate that the AsyncCollector has finished. A worker thread, named Emitter, is also signalled once an AsyncCollector gets results, and then tries to emit results according to the ordered or unordered setting.
For simplicity, the following text refers to an AsyncCollector in the AsyncCollectorBuffer as a task.
Ordered and Unordered
Based on the user configuration, the order of output elements is either guaranteed or not. If it is not guaranteed, AsyncCollectors that arrived later but finished earlier may be emitted first.
The Emitter Thread waits for finished AsyncCollectors. When it is signalled, it processes the tasks in the buffer as follows:
- Ordered: If the first task in the buffer is finished, the Emitter collects its results and then proceeds to the second task. If the first task is not finished yet, the Emitter waits again.
- Unordered: The Emitter checks all finished tasks in the buffer and collects results from those tasks that precede the oldest Watermark in the buffer.
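The two emit strategies described above (emit from the head of the buffer only, versus emit any finished task that precedes the oldest watermark) can be sketched as follows. BufferEntry and EmitterSketch are hypothetical names for this illustration; a plain list stands in for AsyncCollectorBuffer, and locking is left out to keep the logic visible.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical buffer entry: either a task (an AsyncCollector) or a watermark.
class BufferEntry {
    final String name;
    final boolean isWatermark;
    boolean finished;  // set once collect() has been called; always true for watermarks

    private BufferEntry(String name, boolean isWatermark, boolean finished) {
        this.name = name;
        this.isWatermark = isWatermark;
        this.finished = finished;
    }

    static BufferEntry task(String name, boolean finished) {
        return new BufferEntry(name, false, finished);
    }

    static BufferEntry watermark(String name) {
        return new BufferEntry(name, true, true);
    }
}

class EmitterSketch {

    // Emit strictly from the head; stop at the first unfinished task.
    static List<String> drainOrdered(List<BufferEntry> buffer) {
        List<String> emitted = new ArrayList<>();
        Iterator<BufferEntry> it = buffer.iterator();
        while (it.hasNext()) {
            BufferEntry entry = it.next();
            if (!entry.finished) {
                break;  // the head is still running: wait for the next signal
            }
            emitted.add(entry.name);
            it.remove();
        }
        return emitted;
    }

    // Emit every finished task ahead of the oldest watermark, in any order.
    // A watermark is emitted only once no unfinished task precedes it.
    static List<String> drainUnordered(List<BufferEntry> buffer) {
        List<String> emitted = new ArrayList<>();
        boolean unfinishedSeen = false;
        Iterator<BufferEntry> it = buffer.iterator();
        while (it.hasNext()) {
            BufferEntry entry = it.next();
            if (entry.isWatermark) {
                if (unfinishedSeen) {
                    break;  // the watermark, and everything after it, must wait
                }
                emitted.add(entry.name);
                it.remove();
            } else if (entry.finished) {
                emitted.add(entry.name);
                it.remove();
            } else {
                unfinishedSeen = true;
            }
        }
        return emitted;
    }
}
```

In both variants the watermark acts as a barrier: results may be reordered only among tasks that sit between the same two watermarks.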
The Emitter Thread and the Task Thread access the buffer exclusively by acquiring and releasing the lock.
Emitter Thread:
- Signal the Task Thread when all tasks have finished, to notify it that all data has been processed and that it is OK to close the operator.
- Signal the Task Thread after removing some tasks from the buffer.
- Propagate exceptions to the Task Thread.
Task Thread:
- Access AsyncCollectorBuffer exclusively with respect to the Emitter Thread.
- Get a new AsyncCollector and add it to the buffer, waiting while the buffer is full.
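The coordination between the two threads can be sketched with a plain monitor. This is an illustration under assumptions, not the proposed implementation: BoundedCollectorBuffer and BufferDemo are hypothetical names, a private lock object stands in for whichever lock the operator really uses, and failures are rethrown as IllegalStateException for brevity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

class BoundedCollectorBuffer<T> {
    private final Object lock = new Object();
    private final Deque<T> entries = new ArrayDeque<>();
    private final int capacity;
    private Throwable failure;  // set by the emitter side, observed by the task side

    BoundedCollectorBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Task Thread: add an entry, waiting while the buffer is full.
    void add(T entry) {
        synchronized (lock) {
            awaitWhile(() -> entries.size() >= capacity);
            entries.addLast(entry);
        }
    }

    // Task Thread: wait until every entry has been emitted, e.g. before closing.
    void awaitEmpty() {
        synchronized (lock) {
            awaitWhile(() -> !entries.isEmpty());
        }
    }

    // Emitter Thread: remove the head entry, then signal waiting threads.
    T removeFirst() {
        synchronized (lock) {
            T head = entries.pollFirst();
            lock.notifyAll();
            return head;
        }
    }

    // Emitter Thread: propagate an exception to the Task Thread.
    void fail(Throwable error) {
        synchronized (lock) {
            failure = error;
            lock.notifyAll();
        }
    }

    // Must be called while holding the lock.
    private void awaitWhile(BooleanSupplier condition) {
        try {
            while (failure == null && condition.getAsBoolean()) {
                lock.wait();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        if (failure != null) {
            throw new IllegalStateException("async operation failed", failure);
        }
    }
}

class BufferDemo {
    // The calling thread plays the emitter; a second thread plays the task thread.
    static List<String> run() {
        BoundedCollectorBuffer<String> buffer = new BoundedCollectorBuffer<>(1);
        buffer.add("a");
        Thread taskThread = new Thread(() -> buffer.add("b"));  // waits while the buffer is full
        taskThread.start();
        List<String> emitted = new ArrayList<>();
        emitted.add(buffer.removeFirst());  // frees a slot and signals the task thread
        try {
            taskThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        emitted.add(buffer.removeFirst());
        return emitted;
    }
}
```

Waiting in a loop on the condition, rather than on a single notification, keeps the sketch correct under spurious wakeups and when several signals arrive close together.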
All watermarks are also kept in AsyncCollectorBuffer. A watermark is emitted if and only if all AsyncCollectors that arrived before it have been emitted.
State, Failover and Checkpoint
State and Checkpoint
All input StreamRecords are kept in state. Instead of storing each input stream record in state one by one while processing, AsyncWaitOperator puts all input stream records held in AsyncCollectorBuffer into state when the operator state is snapshotted. Old data in the state is cleared before those records are persisted.
When all barriers have arrived at the operator, the checkpoint can be carried out immediately.
While restoring the operator’s state, the operator scans all elements in the state, gets an AsyncCollector for each, calls AsyncFunction.asyncInvoke() and inserts the AsyncCollectors back into AsyncCollectorBuffer.
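The snapshot and restore behaviour can be sketched as follows. InFlightInputs is a hypothetical class for this illustration: a list stands in for AsyncCollectorBuffer, the returned list stands in for operator state, and a Consumer stands in for AsyncFunction.asyncInvoke().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class InFlightInputs<IN> {
    // Inputs whose results have not been emitted yet, in arrival order.
    private final List<IN> inFlight = new ArrayList<>();
    // Stand-in for AsyncFunction.asyncInvoke().
    private final Consumer<IN> asyncInvoke;

    InFlightInputs(Consumer<IN> asyncInvoke) {
        this.asyncInvoke = asyncInvoke;
    }

    // Normal processing: remember the input, then start the async operation.
    void processElement(IN input) {
        inFlight.add(input);
        asyncInvoke.accept(input);
    }

    // Called once the result for this input has been emitted downstream.
    void onEmitted(IN input) {
        inFlight.remove(input);
    }

    // Snapshot: a fresh copy of all pending inputs replaces whatever was stored before.
    List<IN> snapshotState() {
        return new ArrayList<>(inFlight);
    }

    // Restore: re-run the async operation for every input found in the state.
    void restoreState(List<IN> state) {
        for (IN input : state) {
            processElement(input);
        }
    }
}
```

Because only inputs are stored, any async operation that was in flight at the time of the failure is simply started again after recovery.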
Async Resource Sharing
To share async resources (such as a connection to HBase or Netty connections) among different slots (task workers) in the same TaskManager (i.e. the same JVM), the connection can be made static so that all threads in the same process share the same instance.
Of course, pay attention to thread safety when using such resources.
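One way to make a static resource safe to share is a small holder with synchronized access. The sketch below is an assumption-laden illustration: SharedClient is a hypothetical placeholder for the real connection, and the reference counting is an addition of this example, not something the proposal prescribes.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical client; stands in for an HBase or Netty connection.
class SharedClient {
    static final AtomicInteger CREATED = new AtomicInteger();  // counts instances, for illustration

    SharedClient() {
        CREATED.incrementAndGet();
    }
}

class SharedClientHolder {
    private static SharedClient client;  // one instance per JVM, i.e. per TaskManager
    private static int users;            // number of functions currently using the client

    // Returns the shared instance, creating it on first use.
    static synchronized SharedClient acquire() {
        if (client == null) {
            client = new SharedClient();
        }
        users++;
        return client;
    }

    // Drops the instance once the last user has released it.
    static synchronized void release() {
        if (users > 0 && --users == 0) {
            client = null;  // a real client would also be closed here
        }
    }
}
```

A user function would typically call acquire() in open() and release() in close(), so that every slot in the JVM ends up with the same instance. The holder only protects creation and disposal; the client itself still has to be thread-safe.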
Compatibility, Deprecation, and Migration Plan
There are no compatibility or migration issues for users' existing Flink code.
Add unit test cases and functional test cases.