Status

Discussion thread: https://lists.apache.org/thread.html/r541a4dc637fdab2a831ec21964f3c46d7f1921e7dc9f70a57f4ad28d%40%3Cdev.flink.apache.org%3E
Vote thread
JIRA: FLINK-23621

Release: 1.14

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Sink interface introduced in FLIP-143 has limitations that make advanced sinks hard to implement. The goal of this FLIP is to add exceptions to the SinkWriter interface and fields to Sink#InitContext to support more use cases. Most importantly, it exposes the information needed to port existing SinkFunctions to the Sink interface and to support asynchronous communication patterns (FLIP-171).

Public Interfaces

  • We want to adjust the SinkWriter interface to allow interruptible writes.
  • We also extend Sink#InitContext to expose more runtime information and services to sink implementers.

Proposed Changes

For asynchronous calls to the external system, the sink needs to propagate backpressure upstream by blocking in write. However, write currently cannot throw InterruptedException, so there is no way to propagate an interruption correctly; in the worst case, cancellation requires a hard kill. We propose to allow write to throw InterruptedException. Similarly, ProcessingTimeService#onProcessingTime may throw InterruptedException.

SinkWriter
public interface SinkWriter<InputT, CommT, WriterStateT> extends AutoCloseable {

    /**
     * Add an element to the writer.
     *
     * @param element The input record
     * @param context The additional information about the input record
     * @throws IOException if the element cannot be added
     * @throws InterruptedException when interrupted
     */
    void write(InputT element, Context context) throws IOException, InterruptedException;

    // ... other methods
}
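To illustrate why an interruptible write matters, here is a minimal sketch that is not Flink code: a hypothetical BlockingWriterSketch applies backpressure by blocking in write until one of a fixed number of in-flight slots is free, using java.util.concurrent.Semaphore. Because Semaphore#acquire throws InterruptedException, an interrupted task thread leaves write promptly instead of hanging until a hard kill. The class, the onAck callback, and the slot counting are assumptions for illustration only.

```java
import java.util.concurrent.Semaphore;

/** Hypothetical writer (not Flink's API) that blocks in write() to apply backpressure. */
class BlockingWriterSketch {
    private final Semaphore inFlight;

    BlockingWriterSketch(int capacity) {
        this.inFlight = new Semaphore(capacity);
    }

    /** Blocks while all slots are taken; an interrupt aborts the wait with InterruptedException. */
    public void write(String element) throws InterruptedException {
        // acquire() reacts to interrupts, so cancellation does not need a hard kill
        inFlight.acquire();
        // ... hand the element to an asynchronous client here (omitted in this sketch) ...
    }

    /** Hypothetical callback for when the external system acknowledges a record. */
    public void onAck() {
        inFlight.release();
    }

    public int availableSlots() {
        return inFlight.availablePermits();
    }
}
```

If write could not throw InterruptedException, the writer would have to swallow or wrap the interrupt, which is exactly the mapping problem described above.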


To initialize the SinkWriter, Sink#createWriter receives an InitContext that contains auxiliary information and services. We propose to add the following to the existing InitContext:

InitContext
/** The interface exposes some runtime info for creating a {@link SinkWriter}. */
interface InitContext {
    /**
     * Gets the {@link UserCodeClassLoader} to load classes that are not in system's classpath,
     * but are part of the jar file of a user job.
     *
     * @see UserCodeClassLoader
     */
    UserCodeClassLoader getUserCodeClassLoader();

    /** @return number of parallel Sink tasks. */
    int getNumberOfParallelSubtasks();

    /**
     * Returns the mailbox executor that allows executing {@link Runnable}s inside the task
     * thread in between record processing.
     *
     * <p>Note that this method should not be used per-record for performance reasons, in the
     * same way that records should not be sent to the external system individually. Rather,
     * implementers are expected to batch records and only enqueue a single {@link Runnable}
     * per batch to handle the result.
     */
    MailboxExecutor getMailboxExecutor();

    // ... existing methods
}


The UserCodeClassLoader is necessary if a serialization schema needs to be created from a class name. Providing it to sinks is symmetric to the FLIP-27 source, which exposes it in SourceReaderContext.
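As a sketch of the class-name case, assuming the loader is obtained via UserCodeClassLoader#asClassLoader(), a sink could instantiate a schema class with plain Java reflection. The UserClassInstantiator helper below is hypothetical and uses only standard JDK calls; inside createWriter, a call might look like instantiate(schemaClassName, SerializationSchema.class, context.getUserCodeClassLoader().asClassLoader()).

```java
/** Hypothetical helper, not part of Flink: instantiates a user class by name. */
final class UserClassInstantiator {
    private UserClassInstantiator() {}

    static <T> T instantiate(String className, Class<T> expectedType, ClassLoader classLoader)
            throws ReflectiveOperationException {
        // Resolve the class through the given loader so classes from the user jar are visible;
        // initialize=true runs static initializers as a normal first use would
        Class<?> clazz = Class.forName(className, true, classLoader);
        // asSubclass throws ClassCastException if the class is not an expectedType
        return clazz.asSubclass(expectedType).getDeclaredConstructor().newInstance();
    }
}
```

Using the system class loader instead would fail with ClassNotFoundException for classes that only exist in the user jar, which is why the sink needs the user-code loader.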

Similarly, getNumberOfParallelSubtasks is used together with the already existing getSubtaskId to distribute work among the parallel sink subtasks. Again, sources provide the same information via SplitEnumeratorContext#currentParallelism.
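A minimal sketch of such work distribution, assuming a simple round-robin split: the hypothetical WorkAssignment.ownedItems takes the values of getSubtaskId() and getNumberOfParallelSubtasks() and returns the items (for example, output partitions) that this subtask owns. Round-robin is only one possible scheme, chosen here because every item gets exactly one owner without any coordination between subtasks.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: round-robin assignment of work items to parallel sink subtasks. */
final class WorkAssignment {
    private WorkAssignment() {}

    /** Returns the items owned by subtaskId when numItems are split across parallelism subtasks. */
    static List<Integer> ownedItems(int numItems, int subtaskId, int parallelism) {
        if (parallelism <= 0 || subtaskId < 0 || subtaskId >= parallelism) {
            throw new IllegalArgumentException(
                    "invalid subtaskId " + subtaskId + " for parallelism " + parallelism);
        }
        List<Integer> owned = new ArrayList<>();
        // item i belongs to subtask (i % parallelism), so each item has exactly one owner
        for (int item = subtaskId; item < numItems; item += parallelism) {
            owned.add(item);
        }
        return owned;
    }
}
```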

Finally, asynchronous sinks currently have no way to process asynchronous responses without resorting to complicated and slow threading models. We therefore want to move MailboxExecutor from flink-streaming to flink-core, into the package org.apache.flink.api.common.operators, and expose it in InitContext, similar to ProcessingTimeService. We expect most sinks not to use it directly but through the more user-friendly abstraction proposed in FLIP-171. See the appendix for a usage example.
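The following is a deliberately simplified model of the mailbox idea, not Flink's MailboxExecutor implementation: the hypothetical SimpleMailbox lets any thread enqueue Runnables, while only the task thread runs them, either blocking (yieldOnce) or non-blocking (tryYield). Because all mails run in the task thread, state they touch needs no further synchronization.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Simplified, hypothetical mailbox model; not Flink's MailboxExecutor implementation. */
final class SimpleMailbox {
    private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();

    /** Safe to call from any thread, e.g. an async client's callback thread. */
    void execute(Runnable mail) {
        mails.add(mail);
    }

    /** Called by the task thread: blocks until a mail is available, then runs it. */
    void yieldOnce() throws InterruptedException {
        mails.take().run();
    }

    /** Called by the task thread: runs one mail if present and reports whether it did. */
    boolean tryYield() {
        Runnable mail = mails.poll();
        if (mail == null) {
            return false;
        }
        mail.run();
        return true;
    }
}
```

In a sink, the async client's callback would call execute, and write would call yieldOnce while too many requests are in flight, as in the appendix.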

Compatibility, Deprecation, and Migration Plan

  • Most changes are simple additions with no impact on existing code. Moving the MailboxExecutor would break existing code that relies on the experimental YieldingOperatorFactory; we are not aware of any external users. If we want to maintain backward compatibility, we can keep a simple MailboxExecutor interface at the old location that extends the moved interface, and keep the old reference in the operator factory. That interface should be marked @Deprecated immediately.
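A sketch of the proposed compatibility shim, assuming the old interface stays a thin subtype of the moved one: the two nested interfaces below stand in for the same-named MailboxExecutor interfaces in the old and new packages, and the execute signature is simplified for illustration. Code written against the moved interface keeps accepting implementations of the deprecated one.

```java
/** Hypothetical sketch; nested types stand in for interfaces in two different packages. */
final class MailboxShimSketch {
    /** Stands in for the moved interface in org.apache.flink.api.common.operators. */
    interface MovedMailboxExecutor {
        void execute(Runnable command, String description);
    }

    /** Stands in for the thin interface kept at the old flink-streaming location. */
    @Deprecated
    interface LegacyMailboxExecutor extends MovedMailboxExecutor {}

    /** New code depends only on the moved interface. */
    static void submit(MovedMailboxExecutor executor, Runnable command) {
        executor.execute(command, "submitted");
    }
}
```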

Test Plan

These API changes will be covered by extending existing unit and integration tests and adding new ones.

Rejected Alternatives

No alternatives yet.

Appendix

Appendix 1 - Usage example of the MailboxExecutor 

Usage example
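import java.util.function.Consumer;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink.SinkWriter;

/**
 * Example asynchronous {@link SinkWriter} that handles results in the task thread via the
 * {@link MailboxExecutor}. prepareCommit, snapshotState, and close are omitted, hence abstract.
 */
abstract class AsyncSinkWriter implements SinkWriter<Integer, String, String> {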

    private final MailboxExecutor mailboxExecutor;
    private final int capacity = 10;
    private int activeElements = 0;

    AsyncSinkWriter(MailboxExecutor mailboxExecutor) {
        this.mailboxExecutor = mailboxExecutor;
    }

    @Override
    public void write(Integer element, Context context) throws InterruptedException {
        activeElements++;
        // in this example, records are sent directly; however, this pattern is recommended only for batches
        sendAsync(element, r -> processResult(element, r));

        while (activeElements > capacity) {
            // yielding is necessary so that the task thread can run processResultInTaskThread
            mailboxExecutor.yield();
        }
    }

    /** Called in the external thread, possibly concurrently with write; only enqueues a mail. */
    private void processResult(Integer element, Result result) {
        mailboxExecutor.execute(
                () -> processResultInTaskThread(element, result),
                "Process result of %s",
                element);
    }

    /** Processes the result in the task thread, so it never runs concurrently with write. */
    private void processResultInTaskThread(Integer element, Result result) {
        if (result.isGood()) {
            // no synchronization needed to access fields
            activeElements--;
        } else {
            sendAsync(element, r -> processResult(element, r));
        }
    }

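    // placeholder for some external call
    private void sendAsync(Integer element, Consumer<Result> resultListener) {
        // in an external thread: send the element, then call resultListener with the response
    }

    /** Placeholder for the external system's response. */
    interface Result {
        boolean isGood();
    }
}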