Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
The Sink interface introduced in FLIP-143 has some limitations when implementing advanced sinks. The goal of this FLIP is to allow additional exceptions in the SinkWriter and to add fields to Sink#InitContext to facilitate more use cases. Most importantly, these additions provide the information necessary to port existing SinkFunctions to the Sink interface and to support asynchronous communication patterns (FLIP-171).
- We want to adjust the SinkWriter interface to allow interruptible writes.
- We also extend Sink#InitContext to provide more information and services to the user.
For asynchronous calls to the external system, the sink needs to propagate backpressure upstream by blocking in write. However, because write cannot currently throw InterruptedException, there is no way to map interruptions of a blocked write correctly, so in the worst case, cancellation requires a hard kill. We propose to allow write to throw InterruptedException. Similarly, ProcessingTimeService#onProcessingTime may throw it.
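As a minimal sketch of why an interruptible write matters, the Java code below bounds the number of in-flight requests with a Semaphore, so that write blocks (and thereby backpressures upstream) once the limit is reached. InterruptibleWriter, BoundedAsyncWriter, and InterruptDemo are hypothetical names that only mirror the proposed signature; they are not Flink classes, and the async client call is simulated by a future that never completes. Because write declares InterruptedException, an interrupt during cancellation ends the blocked call instead of leaving the thread stuck.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical writer interface mirroring the proposed write signature. */
interface InterruptibleWriter<T> {
    void write(T element) throws IOException, InterruptedException;
}

/** Hypothetical async writer that allows at most maxInFlight outstanding requests. */
class BoundedAsyncWriter implements InterruptibleWriter<String> {
    private final Semaphore permits;

    BoundedAsyncWriter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    @Override
    public void write(String element) throws InterruptedException {
        // Blocks while the limit is reached (backpressure); throws
        // InterruptedException if the task thread is interrupted meanwhile.
        permits.acquire();
        // Release the permit once the external system responds.
        sendAsync(element).whenComplete((ok, err) -> permits.release());
    }

    /** Stand-in for an async client call; in this demo it never completes. */
    private CompletableFuture<Void> sendAsync(String element) {
        return new CompletableFuture<>();
    }
}

class InterruptDemo {
    /** Returns true if a write blocked on backpressure was ended by an interrupt. */
    static boolean blockedWriteIsInterruptible() {
        BoundedAsyncWriter writer = new BoundedAsyncWriter(2);
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        Thread task = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    writer.write("record-" + i); // the third call blocks
                }
                result.complete(false);
            } catch (InterruptedException e) {
                result.complete(true); // cancellation reached the blocked write
            }
        });
        task.setDaemon(true);
        try {
            task.start();
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            // Wait until the task thread is parked inside write(), then "cancel" it.
            while (task.getState() != Thread.State.WAITING && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            task.interrupt();
            return result.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            return false;
        }
    }
}
```

A java.util.concurrent.Semaphore is used here only because its acquire() already responds to interrupts; a real sink could block on any interruptible primitive.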
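To initialize the writer, Sink#createWriter receives an InitContext that contains auxiliary information and services. We propose to add the following to the existing InitContext: a user code class loader, the number of parallel subtasks, and a MailboxExecutor.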
A UserCodeClassLoader is necessary if any serialization schema needs to be created from a class name. Providing it to sinks also makes them symmetric to the FLIP-27 source, where it is already available.
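The sketch below illustrates the class-name use case: a schema class that would live in the user's job jar is instantiated reflectively through a supplied class loader. StringSchema, UpperCaseSchema, and SchemaLoader are hypothetical; in a sink, the loader would come from the proposed InitContext accessor, whereas here any ClassLoader can be passed in.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

/** Hypothetical schema interface standing in for a sink's serialization schema. */
interface StringSchema {
    byte[] serialize(String element);
}

/** Example schema that would normally live in the user's job jar. */
class UpperCaseSchema implements StringSchema {
    public UpperCaseSchema() {}

    @Override
    public byte[] serialize(String element) {
        return element.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8);
    }
}

class SchemaLoader {
    /** Instantiates a schema from its class name through the given (user code) class loader. */
    static StringSchema load(String className, ClassLoader userCodeClassLoader) {
        try {
            return Class.forName(className, true, userCodeClassLoader)
                    .asSubclass(StringSchema.class) // fails if the class is not a schema
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Cannot instantiate schema " + className, e);
        }
    }
}
```

Passing the loader explicitly matters because the system class loader generally cannot see classes that are only present in the user's jar.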
getNumberOfParallelSubtasks is used to distribute work in conjunction with the already existing getSubtaskId. Again, sources already provide this information.
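One common way to combine the two values is modulo assignment: each subtask takes the shards whose index modulo the parallelism equals its own subtask id, so every shard is owned by exactly one subtask. The sketch assumes a hypothetical ParallelismInfo interface standing in for the relevant part of InitContext; the notion of "shards" and the ShardAssignment helper are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical subset of the extended InitContext used for work distribution. */
interface ParallelismInfo {
    int getSubtaskId();

    int getNumberOfParallelSubtasks();

    /** Convenience factory for examples and tests. */
    static ParallelismInfo of(int subtaskId, int parallelism) {
        return new ParallelismInfo() {
            @Override
            public int getSubtaskId() {
                return subtaskId;
            }

            @Override
            public int getNumberOfParallelSubtasks() {
                return parallelism;
            }
        };
    }
}

class ShardAssignment {
    /** Returns the shards owned by this subtask: shard s goes to subtask s % parallelism. */
    static List<Integer> shardsFor(ParallelismInfo ctx, int numShards) {
        List<Integer> owned = new ArrayList<>();
        for (int shard = 0; shard < numShards; shard++) {
            if (shard % ctx.getNumberOfParallelSubtasks() == ctx.getSubtaskId()) {
                owned.add(shard);
            }
        }
        return owned;
    }
}
```

For example, with 7 shards and a parallelism of 3, subtask 1 owns shards 1 and 4. Any deterministic assignment works as long as all subtasks agree on it; without the parallelism, a subtask could not compute its share at all.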
Finally, asynchronous sinks currently have no way to process asynchronous responses without resorting to complicated and slow threading models. For that, we want to move MailboxExecutor into flink-core, in the package org.apache.flink.api.common.operators, and expose it in InitContext, similar to ProcessingTimeService. We expect most sinks not to use it directly but through a more user-friendly abstraction in FLIP-171. See the appendix for a usage example.
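To illustrate the threading model a mailbox enables, the sketch below implements a tiny mailbox in plain Java: an I/O thread completes simulated requests and hands each response back as a Runnable, and only the task thread runs those Runnables between records. SimpleMailbox and MailboxSinkDemo are hypothetical and do not reproduce the signatures of Flink's MailboxExecutor; the point is that the sink's non-thread-safe state (acknowledged, inFlight) needs no locks because only the task thread touches it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical minimal mailbox: any thread may enqueue, only the task thread runs mails. */
class SimpleMailbox {
    private final ConcurrentLinkedQueue<Runnable> mails = new ConcurrentLinkedQueue<>();

    /** Thread-safe; typically called from an async client's callback thread. */
    void execute(Runnable mail) {
        mails.add(mail);
    }

    /** Must only be called by the task thread; runs all mails enqueued so far. */
    void drain() {
        Runnable mail;
        while ((mail = mails.poll()) != null) {
            mail.run();
        }
    }
}

/** Hypothetical async sink whose state is only touched by the task thread. */
class MailboxSinkDemo {
    private final SimpleMailbox mailbox = new SimpleMailbox();
    private final ExecutorService ioPool = Executors.newSingleThreadExecutor();
    // Deliberately not thread-safe: only mutated by the task thread.
    private final List<String> acknowledged = new ArrayList<>();
    private int inFlight;

    private void write(String element) {
        inFlight++;
        CompletableFuture
                .supplyAsync(() -> "ack:" + element, ioPool) // simulated external call
                .thenAccept(ack -> mailbox.execute(() -> {   // enqueue the response as mail
                    acknowledged.add(ack);
                    inFlight--;
                }));
    }

    /** Simulated task loop: write each record, then process responses that have arrived. */
    List<String> run(List<String> records) {
        try {
            for (String record : records) {
                write(record);
                mailbox.drain();
            }
            while (inFlight > 0) { // a real mailbox would block instead of spinning
                mailbox.drain();
                Thread.yield();
            }
            return acknowledged;
        } finally {
            ioPool.shutdown();
        }
    }
}
```

In this design, a response callback only performs a queue insert, and all state changes happen in the task thread, which keeps the writer's single-threaded programming model intact.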
Compatibility, Deprecation, and Migration Plan
- Most changes are simple additions with no impact on existing code. Moving the MailboxExecutor would break existing code that relies on the experimental YieldingOperatorFactory; we are not aware of any external users. If we want to maintain backward compatibility, we can add a simple MailboxExecutor at the old location that extends the moved interface and keep the old reference in the operator factory. That interface should be deprecated immediately.
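The backward-compatible shim described above can be sketched in a single Java file by letting two holder classes stand in for the old and new packages (a real change would use the actual packages). NewLocation, OldLocation, and LegacyOperatorFactory are hypothetical names, and the interface is reduced to a single method for illustration. Because the deprecated interface extends the moved one, anything typed against the old interface can still be passed wherever the new interface is expected.

```java
/** Stands in for the new package, org.apache.flink.api.common.operators. */
final class NewLocation {
    /** Hypothetical, simplified version of the moved interface. */
    interface MailboxExecutor {
        void execute(Runnable command);
    }
}

/** Stands in for the interface's old package. */
final class OldLocation {
    /** Kept only for backward compatibility; adds nothing to the moved interface. */
    @Deprecated
    interface MailboxExecutor extends NewLocation.MailboxExecutor {}
}

/** Hypothetical factory that keeps referencing the old (deprecated) type. */
@SuppressWarnings("deprecation")
class LegacyOperatorFactory {
    private OldLocation.MailboxExecutor mailboxExecutor;

    void setMailboxExecutor(OldLocation.MailboxExecutor mailboxExecutor) {
        this.mailboxExecutor = mailboxExecutor;
    }

    /** The old type is a subtype of the new one, so it can be used by new-style APIs as-is. */
    NewLocation.MailboxExecutor getMailboxExecutor() {
        return mailboxExecutor;
    }
}
```

Existing implementations and call sites of the old name keep compiling, while new code such as InitContext only refers to the moved interface.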
These simple API changes will be covered by extending existing unit and integration tests and adding new ones.
No alternatives yet.