
Discussion thread: here (<- link to
Vote thread:


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


The Sink interface introduced in FLIP-143 has some limitations when implementing advanced sinks. The goal of this FLIP is to allow additional exceptions in SinkWriter#write and to add fields to Sink#InitContext to support more use cases. Most importantly, the InitContext should provide the information necessary to port existing SinkFunctions to the Sink interface and to support asynchronous communication patterns (FLIP-171).

Public Interfaces

Proposed Changes

For asynchronous calls to the external system, the sink needs to propagate backpressure upstream by blocking in write. However, write currently cannot throw InterruptedException, so an interruption cannot be mapped correctly and, in the worst case, cancellation requires a hard kill. We propose to allow write to throw InterruptedException. Similarly, the callback registered with ProcessingTimeService (ProcessingTimeCallback#onProcessingTime) may throw InterruptedException.

public interface SinkWriter<InputT, CommT, WriterStateT> extends AutoCloseable {

    /**
     * Adds an element to the writer.
     *
     * @param element The input record
     * @param context The additional information about the input record
     * @throws IOException if the element cannot be added
     * @throws InterruptedException when interrupted, e.g., while blocking to apply backpressure
     */
    void write(InputT element, Context context) throws IOException, InterruptedException;

    ... // other methods
}
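To illustrate why write needs to declare InterruptedException, here is a minimal JDK-only sketch, not Flink code, of a writer that applies backpressure by bounding the number of in-flight requests. BoundedInFlightWriter and its onResponse method are hypothetical. The point is that a blocking call such as Semaphore#acquire reacts to Thread#interrupt, which only helps cancellation if write is allowed to propagate the resulting exception.

```java
import java.util.concurrent.Semaphore;

/**
 * Hypothetical writer (not a Flink class) that bounds the number of in-flight requests.
 * write() blocks while the bound is reached, which propagates backpressure upstream.
 */
class BoundedInFlightWriter {
    private final Semaphore permits;

    BoundedInFlightWriter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    /**
     * Blocks until a request slot is free. Semaphore#acquire throws InterruptedException when
     * the thread is interrupted (e.g., on cancellation), and write simply propagates it.
     */
    void write(String element) throws InterruptedException {
        permits.acquire();
        // a real sink would now send `element` asynchronously and call onResponse() when done
    }

    /** Called once the external system has answered a request; frees its slot. */
    void onResponse() {
        permits.release();
    }
}
```

If write could not throw InterruptedException, the implementation would have to swallow the interruption and keep waiting, which is exactly the situation that can end in a hard kill.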

To initialize the SinkWriter, Sink#createWriter receives an InitContext that contains auxiliary information and services. We propose to add the following methods to the existing InitContext:

/** The interface exposes some runtime info for creating a {@link SinkWriter}. */
interface InitContext {
    /**
     * Gets the {@link UserCodeClassLoader} to load classes that are not in the system's
     * classpath, but are part of the jar file of a user job.
     *
     * @see UserCodeClassLoader
     */
    UserCodeClassLoader getUserCodeClassLoader();

    /** @return number of parallel Sink tasks. */
    int getNumberOfParallelSubtasks();

    /**
     * Returns the mailbox executor that allows executing {@link Runnable}s inside the task
     * thread in between record processing.
     *
     * <p>Note that, for performance reasons, this method should not be used per record, just
     * as records should not be sent to the external system one at a time. Rather, implementers
     * are expected to batch records and only enqueue a single {@link Runnable} per batch to
     * handle the result.
     */
    MailboxExecutor getMailboxExecutor();

    ... // existing methods
}
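The batching advice in the getMailboxExecutor Javadoc could look roughly like the following sketch. It assumes a plain ConcurrentLinkedQueue as a stand-in for the mailbox and CompletableFuture as a stand-in for the external client; BatchingWriter and its members are hypothetical. Each completed batch enqueues exactly one Runnable, regardless of how many records the batch contains.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical writer that hands results back to the task thread once per batch. */
class BatchingWriter {
    private final int batchSize;
    private final List<Integer> buffer = new ArrayList<>();
    // stand-in for the mailbox: filled by completion threads, drained by the task thread
    final Queue<Runnable> mailbox = new ConcurrentLinkedQueue<>();
    // futures of in-flight batches, kept so that callers can wait for completion
    final List<CompletableFuture<Void>> pending = new ArrayList<>();
    // only modified by mails, i.e., in the task thread
    int acknowledged = 0;

    BatchingWriter(int batchSize) {
        this.batchSize = batchSize;
    }

    void write(int element) {
        buffer.add(element);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<Integer> batch = new ArrayList<>(buffer);
        buffer.clear();
        // stand-in for an asynchronous client call that reports how many records were written
        pending.add(
                CompletableFuture.supplyAsync(batch::size)
                        // one mail per batch, not one per record
                        .thenAccept(written -> mailbox.add(() -> acknowledged += written)));
    }

    /** Runs all queued mails; in a real sink this would happen in the task thread. */
    void drainMailbox() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }
}
```

With a batch size of 5, ten writes produce two mails rather than ten.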

The UserCodeClassLoader is necessary if a serialization schema needs to be created from a class name. Providing it to sinks also mirrors the FLIP-27 sources, where it is available in SourceReaderContext.
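As a sketch of the class-name use case, the following plain-Java code loads and instantiates a schema by name through an explicitly passed class loader. RecordSchema, Utf8Schema, and SchemaLoader are hypothetical. In a sink, the loader would presumably come from InitContext#getUserCodeClassLoader, assumed here to be convertible to a java.lang.ClassLoader (for example via UserCodeClassLoader#asClassLoader).

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical schema interface for this sketch; not Flink's SerializationSchema. */
interface RecordSchema {
    byte[] serialize(String element);
}

/** A schema class as it might be shipped in a user job's jar. */
class Utf8Schema implements RecordSchema {
    public Utf8Schema() {}

    @Override
    public byte[] serialize(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }
}

/** Instantiates schemas by class name through an explicitly passed class loader. */
final class SchemaLoader {
    private SchemaLoader() {}

    static RecordSchema load(String className, ClassLoader userCodeClassLoader)
            throws ReflectiveOperationException {
        // resolve the class via the user-code loader: the system class loader
        // would not see classes that only exist in the job jar
        return Class.forName(className, true, userCodeClassLoader)
                .asSubclass(RecordSchema.class)
                .getDeclaredConstructor()
                .newInstance();
    }
}
```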

Similarly, getNumberOfParallelSubtasks is used together with the already existing getSubtaskId to distribute work among subtasks. Again, sources provide the same information through SplitEnumeratorContext#currentParallelism.
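One common way to distribute work with these two values is round-robin assignment by index. The sketch below is plain Java and hypothetical (WorkAssignment and itemsFor are not Flink APIs). subtaskId and parallelism stand for the values returned by getSubtaskId and getNumberOfParallelSubtasks.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that statically assigns work items (e.g., shards) to subtasks. */
final class WorkAssignment {
    private WorkAssignment() {}

    /** Returns every parallelism-th item, starting at index subtaskId. */
    static <T> List<T> itemsFor(List<T> items, int subtaskId, int parallelism) {
        if (parallelism <= 0 || subtaskId < 0 || subtaskId >= parallelism) {
            throw new IllegalArgumentException("subtaskId must be in [0, parallelism)");
        }
        List<T> assigned = new ArrayList<>();
        for (int i = subtaskId; i < items.size(); i += parallelism) {
            assigned.add(items.get(i));
        }
        return assigned;
    }
}
```

With the items a, b, c, d, e and a parallelism of 2, subtask 0 handles [a, c, e] and subtask 1 handles [b, d]. Every item is assigned to exactly one subtask without any coordination. Without getNumberOfParallelSubtasks, a sink could not compute this split from getSubtaskId alone.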

Finally, asynchronous sinks currently have no way to process asynchronous responses without resorting to complicated and slow threading models. To address this, we want to move MailboxExecutor from flink-streaming-java to flink-core, into the package org.apache.flink.api.common.operators, and expose it in InitContext, similar to ProcessingTimeService. We expect most sinks not to use it directly, but rather through the more user-friendly abstraction of FLIP-171. See Appendix 1 for a usage example.
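The core idea behind the MailboxExecutor can be modeled in a few lines of plain Java. The sketch below is a toy under simplifying assumptions, not Flink's implementation. ToyMailbox, yieldOnce, and drain are hypothetical names, and a real mailbox is integrated with the task's processing loop. Responses that arrive on other threads are turned into mails, so state such as an in-flight counter is only ever touched by the task thread and needs no synchronization.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Toy model of a mailbox, not Flink's MailboxExecutor: any thread may submit actions,
 * but they only run when the task thread calls yieldOnce() or drain().
 */
class ToyMailbox {
    private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();

    /** Can be called from any thread, e.g., a client callback thread. */
    void execute(Runnable action) {
        mails.add(action);
    }

    /** Task thread only: waits for the next mail and runs it; interruptible while waiting. */
    void yieldOnce() throws InterruptedException {
        mails.take().run();
    }

    /** Task thread only: runs all mails that are already queued, without blocking. */
    void drain() {
        Runnable mail;
        while ((mail = mails.poll()) != null) {
            mail.run();
        }
    }
}
```

A writer would call yieldOnce() in a loop while too many requests are in flight, which mirrors the capacity check in Appendix 1. The processing loop would call drain() between records.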

Compatibility, Deprecation, and Migration Plan

Test Plan

These simple API changes will be covered by extending the existing unit and integration tests and by adding new ones.

Rejected Alternatives

No alternatives yet.


Appendix 1 - Usage example of the MailboxExecutor 

import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink.SinkWriter;

import java.io.Serializable;
import java.util.function.Consumer;

/** Example asynchronous {@link SinkWriter} that handles responses via the {@link MailboxExecutor}. */
class AsyncSinkWriter implements SinkWriter<Integer, String, String>, Serializable {

    private final MailboxExecutor mailboxExecutor;
    private final int capacity = 10;
    private int activeElements = 0;

    AsyncSinkWriter(MailboxExecutor mailboxExecutor) {
        this.mailboxExecutor = mailboxExecutor;
    }

    @Override
    public void write(Integer element, Context context) throws InterruptedException {
        // in this example, records are sent directly; however, this pattern should only be used with batches
        activeElements++;
        sendAsync(element, r -> processResult(element, r));

        while (activeElements > capacity) {
            // yielding is necessary so that processResultInTaskThread can be executed
            mailboxExecutor.yield();
        }
    }

    /** Processes the result in the external thread. Concurrent calls to write are possible. */
    private void processResult(Integer element, Result result) {
        // hand the result over to the task thread instead of touching fields here
        mailboxExecutor.execute(
                () -> processResultInTaskThread(element, result),
                "Process result of %s",
                element);
    }

    /** Processes the result in the task thread. No concurrent calls to write are possible. */
    private void processResultInTaskThread(Integer element, Result result) {
        if (result.isGood()) {
            // no synchronization needed to access fields
            activeElements--;
        } else {
            // retry the failed element; it still counts as active
            sendAsync(element, r -> processResult(element, r));
        }
    }

    // placeholder for some external call; Result is a placeholder for its response type
    private void sendAsync(Integer element, Consumer<Result> resultListener) {
        // in external thread, send element, call resultListener
    }

    ... // prepareCommit, snapshotState, and close omitted
}