Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
The Sink interface introduced in FLIP-143 has some limitations when implementing advanced sinks. The goal of this FLIP is to add additional exceptions and fields to Sink#InitContext to facilitate more use cases. Most importantly, it adds the information necessary to port existing SinkFunctions to the Sink interface and to support asynchronous communication patterns (FLIP-171).
This FLIP extends the SinkWriter interface to allow interruptible writes, and extends Sink#InitContext to provide more information and opportunities to the user.

For asynchronous calls to the external system, the sink needs to propagate backpressure upstream by blocking write. However, there is currently no way to handle interruptions correctly, so in the worst case cancellation requires a hard kill. We propose to allow write to throw InterruptedException. Similarly, ProcessingTimeService#onProcessingTime may throw InterruptedException.
public interface SinkWriter<InputT, CommT, WriterStateT> extends AutoCloseable {

    /**
     * Add an element to the writer.
     *
     * @param element The input record
     * @param context The additional information about the input record
     * @throws IOException if fail to add an element
     * @throws InterruptedException when interrupted
     */
    void write(InputT element, Context context) throws IOException, InterruptedException;

    ... // other methods
}
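To illustrate why an interruptible write matters for backpressure, here is a minimal sketch that does not depend on Flink. InterruptibleWriterSketch, its Semaphore-based limit, and onRequestCompleted are hypothetical names chosen for this illustration; they are not part of the proposed API. The writer blocks when too many requests are in flight, and the blocking call surfaces an interrupt as InterruptedException instead of swallowing it.

```java
import java.io.IOException;
import java.util.concurrent.Semaphore;

/** Hypothetical stand-in writer; it does not implement Flink's SinkWriter. */
final class InterruptibleWriterSketch {
    // One permit per request that may be in flight at the same time (assumed limit).
    private final Semaphore inFlightPermits;

    InterruptibleWriterSketch(int maxInFlight) {
        this.inFlightPermits = new Semaphore(maxInFlight);
    }

    /** Blocks while the in-flight limit is reached; an interrupt aborts the wait. */
    void write(String element) throws IOException, InterruptedException {
        inFlightPermits.acquire(); // throws InterruptedException when interrupted
        // Placeholder: hand the element to the external system asynchronously here.
    }

    /** To be called when the external system acknowledges a request. */
    void onRequestCompleted() {
        inFlightPermits.release();
    }

    /** Returns true if interrupting the writing thread aborted a write call. */
    static boolean blockedWriteIsInterruptible() {
        InterruptibleWriterSketch writer = new InterruptibleWriterSketch(1);
        boolean[] sawInterrupt = {false};
        Thread task = new Thread(() -> {
            try {
                writer.write("first");  // takes the only permit
                writer.write("second"); // would block forever: nothing is acknowledged
            } catch (InterruptedException e) {
                sawInterrupt[0] = true; // the interrupt reached a write call
            } catch (IOException e) {
                throw new IllegalStateException(e);
            }
        });
        task.start();
        task.interrupt();
        try {
            task.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt[0];
    }
}
```

Without the InterruptedException in the signature, a writer like this would have to either ignore the interrupt and keep blocking or wrap it in another exception type, which is the situation the proposal aims to avoid.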
For initializing the SinkWriter, Sink#createWriter gets an InitContext that contains auxiliary information and services. We propose to add the following to the existing InitContext:
/** The interface exposes some runtime info for creating a {@link SinkWriter}. */
interface InitContext {

    /**
     * Gets the {@link UserCodeClassLoader} to load classes that are not in system's classpath,
     * but are part of the jar file of a user job.
     *
     * @see UserCodeClassLoader
     */
    UserCodeClassLoader getUserCodeClassLoader();

    /** @return number of parallel Sink tasks. */
    int getNumberOfParallelSubtasks();

    /**
     * Returns the mailbox executor that allows to execute {@link Runnable}s inside the task
     * thread in between record processing.
     *
     * <p>Note that this method should not be used per-record for performance reasons in the
     * same way as individual records should not be sent individually. Rather, implementers are
     * expected to batch records and only enqueue a single {@link Runnable} per batch to handle
     * the result.
     */
    MailboxExecutor getMailboxExecutor();

    ... // existing methods
}
The UserCodeClassLoader is necessary if any serialization schema needs to be created from a class name. Providing it in sinks is symmetric to the FLIP-27 source, where it is available in SourceReaderContext.
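As a rough illustration of creating a schema from a class name, the sketch below instantiates a class through an explicitly supplied class loader. RecordSerializer, Utf8Serializer, and instantiate are hypothetical names for this example only. In a real sink the loader would presumably come from the InitContext's UserCodeClassLoader (via its asClassLoader() method); here any ClassLoader is accepted so the snippet runs without Flink.

```java
import java.nio.charset.StandardCharsets;

final class SchemaLoaderSketch {

    /** Hypothetical schema interface, used only for this illustration. */
    interface RecordSerializer {
        byte[] serialize(String record);
    }

    /** Example implementation of the kind a user jar might ship. */
    public static final class Utf8Serializer implements RecordSerializer {
        public Utf8Serializer() {}

        @Override
        public byte[] serialize(String record) {
            return record.getBytes(StandardCharsets.UTF_8);
        }
    }

    /**
     * Loads the named class with the given loader and creates an instance through its
     * no-argument constructor. Using the system class loader instead would fail for
     * classes that exist only in the user jar.
     */
    static RecordSerializer instantiate(String className, ClassLoader userCodeClassLoader) {
        try {
            return Class.forName(className, true, userCodeClassLoader)
                    .asSubclass(RecordSerializer.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }
}
```

A call such as `SchemaLoaderSketch.instantiate(SchemaLoaderSketch.Utf8Serializer.class.getName(), SchemaLoaderSketch.class.getClassLoader())` returns a usable serializer in this standalone setting.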
Similarly, getNumberOfParallelSubtasks is used to distribute work in conjunction with the already existing getSubtaskId. Again, sources provide the information with SplitEnumeratorContext#currentParallelism.
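One way the two values might be combined is a round-robin split of external partitions across subtasks. This is only a sketch under that assumption; PartitionAssignmentSketch and partitionsFor are hypothetical, and the FLIP does not prescribe any particular distribution scheme.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionAssignmentSketch {

    /**
     * Returns the partitions owned by one subtask when partitions are assigned round-robin.
     * subtaskId corresponds to getSubtaskId() and numberOfParallelSubtasks to
     * getNumberOfParallelSubtasks(); numberOfPartitions is an assumed property of the
     * external system.
     */
    static List<Integer> partitionsFor(
            int subtaskId, int numberOfParallelSubtasks, int numberOfPartitions) {
        List<Integer> owned = new ArrayList<>();
        for (int p = subtaskId; p < numberOfPartitions; p += numberOfParallelSubtasks) {
            owned.add(p);
        }
        return owned;
    }
}
```

With 8 partitions and a parallelism of 3, subtask 1 would own partitions 1, 4, and 7, and every partition is owned by exactly one subtask.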
Finally, asynchronous sinks currently have no way to process asynchronous responses without resorting to complicated and slow threading models. To address that, we want to move MailboxExecutor from flink-streaming to flink-core, into the package org.apache.flink.api.common.operators, and expose it in InitContext similar to ProcessingTimeService. The expectation is that most sinks do not use it directly but through a more user-friendly abstraction in FLIP-171. See the appendix for a usage example.
Moving MailboxExecutor would break existing code that relies on the experimental YieldingOperatorFactory. We are not aware of external users. If we want to maintain backward compatibility, we can add a simple MailboxExecutor at the old location that extends the moved interface and keep the old reference in the operator factory. That interface should be marked @Deprecated immediately.

These simple API changes will be covered by extending and adding unit and integration tests.
No alternatives yet.
Appendix: MailboxExecutor usage example
/** Base class for our testing {@link SinkWriter Writers}. */
class AsyncSinkWriter implements SinkWriter<Integer, String, String>, Serializable {
    private final MailboxExecutor mailboxExecutor;
    private final int capacity = 10;
    private int activeElements = 0;

    AsyncSinkWriter(MailboxExecutor mailboxExecutor) {
        this.mailboxExecutor = mailboxExecutor;
    }

    @Override
    public void write(Integer element, Context context) throws InterruptedException {
        activeElements++;
        // in this example, records are sent directly; however, it's recommended to use
        // this pattern with batches only
        sendAsync(element, r -> processResult(element, r));
        while (activeElements > capacity) {
            // yielding is necessary, such that processResultInTaskThread can be executed
            mailboxExecutor.yield();
        }
    }

    /** Processes the result in the external thread. Concurrent call to write possible. */
    private void processResult(Integer element, Result result) {
        mailboxExecutor.execute(
                () -> processResultInTaskThread(element, result),
                "Process result of %s",
                element);
    }

    /** Processes the result in the task thread. No concurrent call to write possible. */
    private void processResultInTaskThread(Integer element, Result result) {
        if (result.isGood()) {
            // no synchronization needed to access fields
            activeElements--;
        } else {
            // the request failed: resend the element
            sendAsync(element, r -> processResult(element, r));
        }
    }

    // placeholder for some external call
    private void sendAsync(Integer element, Consumer<Result> resultListener) {
        // in external thread, send element, call resultListener
    }
}
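The example above relies on Flink's runtime to drive the mailbox. To experiment with the same pattern in isolation, the following sketch replaces it with a toy mailbox built on a blocking queue. ToyMailbox, ToyAsyncWriter, yieldToMailbox, and flush are hypothetical stand-ins invented for this illustration and deliberately do not match the real MailboxExecutor interface; the "external system" is simulated with CompletableFuture.runAsync and always succeeds.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

final class ToyMailboxDemo {

    /** Hypothetical stand-in for MailboxExecutor; not Flink's interface. */
    static final class ToyMailbox {
        private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();

        /** Enqueues a mail; safe to call from any thread. */
        void execute(Runnable mail) {
            mails.add(mail);
        }

        /** Blocks the calling (task) thread until a mail arrives, then runs it. */
        void yieldToMailbox() throws InterruptedException {
            mails.take().run();
        }
    }

    /** Toy writer that limits the number of unacknowledged elements. */
    static final class ToyAsyncWriter {
        private final ToyMailbox mailbox;
        private final int capacity;
        // Both counters are only touched by the task thread, so no locking is needed.
        private int activeElements = 0;
        private int completedElements = 0;

        ToyAsyncWriter(ToyMailbox mailbox, int capacity) {
            this.mailbox = mailbox;
            this.capacity = capacity;
        }

        void write(int element) throws InterruptedException {
            activeElements++;
            // Simulated external call: a pool thread acknowledges by enqueueing a mail.
            CompletableFuture.runAsync(() -> mailbox.execute(this::onAcknowledged));
            while (activeElements > capacity) {
                mailbox.yieldToMailbox(); // backpressure: wait for acknowledgements
            }
        }

        /** Waits until every written element has been acknowledged. */
        void flush() throws InterruptedException {
            while (activeElements > 0) {
                mailbox.yieldToMailbox();
            }
        }

        /** Runs in the task thread because it is only invoked as a mail. */
        private void onAcknowledged() {
            activeElements--;
            completedElements++;
        }
    }

    /** Writes the given number of elements and returns how many were acknowledged. */
    static int run(int elements, int capacity) {
        ToyAsyncWriter writer = new ToyAsyncWriter(new ToyMailbox(), capacity);
        try {
            for (int i = 0; i < elements; i++) {
                writer.write(i);
            }
            writer.flush();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return writer.completedElements;
    }
}
```

The design point this is meant to show is that the external thread never touches writer state; it only enqueues a mail, and all state changes happen in the task thread while it yields.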