StatefulSinkWriter

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

During the migration of the Iceberg Sink to the new Flink Sink V2 API we found that the current WithPreCommitTopology interface is not sufficient for our use-case. The WithPreCommitTopology interface looks like this:

public interface WithPreCommitTopology<InputT, CommT>
        extends TwoPhaseCommittingSink<InputT, CommT> {

    DataStream<CommittableMessage<CommT>> addPreCommitTopology(
            DataStream<CommittableMessage<CommT>> committables);
}

The issue is that the addPreCommitTopology method accepts a CommT type, and also returns objects with the same CommT type.

In case of the Iceberg Sink, we would like to use the WithPreCommitTopology to aggregate the writer results and create a single committable from them. So we would like to change both the type, and the number of the messages. Using the current WithPreCommitTopology interface we can work around the issue by using a Tuple, or POJO where some of the fields are used only before the addPreCommitTopology method, and some of the fields are only used after the method, but this seems more like abusing the interface than using it.

This is a more generic issue where the WithPreCommitTopology should provide a way to transform not only the data, but the type of the data channelled through it.

The community discussed this topic from several different angles:

  • During the implementation of FLIP-371 we broke the backward compatibility of the Sink API. See the discussion on FLINK-25857
  • The discussion continued on the mailing list in this thread
  • Also the discussion of this FLIP on the mailing list in this thread

The general consensus was to reject the original approach (minimal changes to the Sink API), and move forward to rewrite the Sink API using mixin interfaces to enhance the extendibility of the API.

Generic approach

When redesigning the API we followed the generic guidelines below:

  • Every new feature should be added with a Supports<FeatureName> interface (similar to the Source API), like
    • SupportsCommiter
    • SupportsWriterState
    • SupportsPreWriteTopology
    • SupportsPreCommitToplogy
    • SupportsPostCommitTopology
  • Keep the number of the inner interfaces/classes minimal, and expose them as a full fledged classes
    • Committer - no change here
    • CommitterInitContext
    • StatefulSinkWriter
    • WriterInitContext
  • No redefining interface methods during interface inheritance - it would prevent future deprecation
  • Minimal inheritance extension - for more flexibility in the future. Kept only
    • StatefulSinkWriter, CommittingSinkWriter - which inherits from SinkWriter
    • CommitterInitContext, WriterInitContext - which inherits from InitContext

Public Interfaces

Feature interfaces

TwoPhaseCommittingSink

The TwoPhaseCommittingSink will be replaced by the SupportsCommitter mixin interface:

SupportsCommitter
public interface SupportsCommitter<CommittableT> {

    /**
     * Creates a {@link Committer} that permanently makes the previously written data visible
     * through {@link Committer#commit(Collection)}.
     *
     * @param context The context information for the committer initialization.
     * @return A committer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    Committer<CommittableT> createCommitter(CommitterInitContext context) throws IOException;

    /** Returns the serializer of the committable type. */
    SimpleVersionedSerializer<CommittableT> getCommittableSerializer();
}

StatefulSink

The StatefulSink will be replaced by the SupportsWriterState mixin interface:

SupportsWriterState
public interface SupportsWriterState<InputT, WriterStateT> {

    /**
     * Create a {@link StatefulSinkWriter} from a recovered state.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
            WriterInitContext context, Collection<WriterStateT> recoveredState) throws IOException;

    /**
     * Any stateful sink needs to provide this state serializer and implement {@link
     * StatefulSinkWriter#snapshotState(long)} properly. The respective state is used in {@link
     * #restoreWriter(WriterInitContext, Collection)} on recovery.
     *
     * @return the serializer of the writer's state type.
     */
    SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer();

    /**
     * A mix-in for {@link SupportsWriterState} that allows users to migrate from a sink with a
     * compatible state to this sink.
     */
    @PublicEvolving
    interface WithCompatibleState {
        /**
         * A list of state names of sinks from which the state can be restored. For example, the new
         * {@code FileSink} can resume from the state of an old {@code StreamingFileSink} as a
         * drop-in replacement when resuming from a checkpoint/savepoint.
         */
        Collection<String> getCompatibleWriterStateNames();
    }
}

WithPreWriteTopology

The WithPreWriteTopology will be replaced by the SupportsPreWriteTopology mixin interface:

SupportsPreWriteTopology
public interface SupportsPreWriteTopology<InputT> {

    /**
     * Adds an arbitrary topology before the writer. The topology may be used to repartition the
     * data.
     *
     * @param inputDataStream the stream of input records.
     * @return the custom topology before {@link SinkWriter}.
     */
    DataStream<InputT> addPreWriteTopology(DataStream<InputT> inputDataStream);
}

WithPreCommitTopology

The WithPreCommitTopology will be replaced by the SupportsPreCommitTopology mixin interface. The addPreCommitTopology method is changed so the input stream and the output stream could contain different types which is needed for the original goal of this FLIP. Also we need a different serializer for the WriteResultT objects if they are rebalanced between the steps:

SupportsPreCommitTopology
public interface SupportsPreCommitTopology<WriterResultT, CommittableT> {

    /**
     * Intercepts and modifies the committables sent on checkpoint or at end of input. Implementers
     * need to ensure to modify all {@link CommittableMessage}s appropriately.
     *
     * @param committables the stream of committables.
     * @return the custom topology before {@link Committer}.
     */
    DataStream<CommittableMessage<CommittableT>> addPreCommitTopology(
            DataStream<CommittableMessage<WriterResultT>> committables);

    /** Returns the serializer of the WriteResult type. */
    SimpleVersionedSerializer<WriterResultT> getWriteResultSerializer();
}

WithPostCommitTopology

The WithPostCommitTopology will be replaced by the SupportsPostCommitTopology mixin interface:

SupportsPostCommitTopology
public interface SupportsPostCommitTopology<CommittableT> {

    /**
     * Adds a custom post-commit topology where all committables can be processed.
     *
     * <p>It is strongly recommended to keep this pipeline stateless such that batch and streaming
     * modes do not require special cases.
     *
     * <p>All operations need to be idempotent: on recovery, any number of committables may be
     * replayed that have already been committed. It's mandatory that these committables have no
     * effect on the external system.
     *
     * @param committables the stream of committables.
     */
    void addPostCommitTopology(DataStream<CommittableMessage<CommittableT>> committables);
}

Extracted interfaces

The following interfaces were inner interfaces in the previous iteration of the Sink API. In the case of PrecommittingSinkWriter which was an inner class of TwoPhaseCommittingSink this prevented the evolution of the TwoPhaseCommittingSink, because of the generic types of the classes were tightly coupled. Also, extracting these interfaces will result in a Sink API which is similar to the Source API structure.

CommitterInitContext

CommitterInitContext was extracted from TwoPhaseCommittingSink:


CommitterInitContext
public interface CommitterInitContext extends InitContext {
    /** @return The metric group this committer belongs to. */
    SinkCommitterMetricGroup metricGroup();
}

CommittingSinkWriter

CommittingSinkWriter was extracted from TwoPhaseCommittingSink, and the original name was PrecommittingSinkWriter. This was changed so the coupling between the Committer, CommitterInitContext, CommittingSinkWriter and SupportsCommitter interfaces are more pronounced. We accepted the inheritance here, as it has a clear purpose:


CommittingSinkWriter
public interface CommittingSinkWriter<InputT, CommittableT> extends SinkWriter<InputT> {
    /**
     * Prepares for a commit.
     *
     * <p>This method will be called after {@link #flush(boolean)} and before {@link
     * StatefulSinkWriter#snapshotState(long)}.
     *
     * @return The data to commit as the second step of the two-phase commit protocol.
     * @throws IOException if fail to prepare for a commit.
     */
    Collection<CommittableT> prepareCommit() throws IOException, InterruptedException;
}

WriterInitContext

WriterInitContext was extracted from StatefulSink:

WriterInitContext
public interface WriterInitContext extends org.apache.flink.api.connector.sink2.InitContext {
    /**
     * Gets the {@link UserCodeClassLoader} to load classes that are not in system's classpath, but
     * are part of the jar file of a user job.
     *
     * @see UserCodeClassLoader
     */
    UserCodeClassLoader getUserCodeClassLoader();

    /**
     * Returns the mailbox executor that allows to execute {@link Runnable}s inside the task thread
     * in between record processing.
     *
     * <p>Note that this method should not be used per-record for performance reasons in the same
     * way as records should not be sent to the external system individually. Rather, implementers
     * are expected to batch records and only enqueue a single {@link Runnable} per batch to handle
     * the result.
     */
    MailboxExecutor getMailboxExecutor();

    /**
     * Returns a {@link ProcessingTimeService} that can be used to get the current time and register
     * timers.
     */
    ProcessingTimeService getProcessingTimeService();

    /** @return The metric group this writer belongs to. */
    SinkWriterMetricGroup metricGroup();

    /** Provides a view on this context as a {@link SerializationSchema.InitializationContext}. */
    SerializationSchema.InitializationContext asSerializationSchemaInitializationContext();

    /** Returns whether object reuse has been enabled or disabled. */
    boolean isObjectReuseEnabled();

    /** Creates a serializer for the type of sink's input. */
    <IN> TypeSerializer<IN> createInputSerializer();

    /**
     * Returns a metadata consumer, the {@link SinkWriter} can publish metadata events of type
     * {@link MetaT} to the consumer.
     *
     * <p>It is recommended to use a separate thread pool to publish the metadata because enqueuing
     * a lot of these messages in the mailbox may lead to a performance decrease. thread, and the
     * {@link Consumer#accept} method is executed very fast.
     */
    @Experimental
    default <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
        return Optional.empty();
    }
}

StatefulSinkWriter

StatefulSinkWriter was extracted from StatefulSink. We accepted the inheritance here, as it has a clear purpose:


StatefulSinkWriter
public interface StatefulSinkWriter<InputT, WriterStateT> extends SinkWriter<InputT> {
    /**
     * @return The writer's state.
     * @throws IOException if fail to snapshot writer's state.
     */
    List<WriterStateT> snapshotState(long checkpointId) throws IOException;
}

Other modified interfaces

Sink

As a result the Sink interface createWriter method parameter type should be changed:

Sink
public interface Sink<InputT> extends Serializable {
    /**
     * Creates a {@link SinkWriter}.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    SinkWriter<InputT> createWriter(WriterInitContext context) throws IOException;
} 

Proposed Changes

Other than the interface changes we should modify the SinkTransformationTranslator.addCommittingTopology method to make sure that new parameter types are followed.

Compatibility, Deprecation, and Migration Plan

Deprecation

We will deprecate the old interfaces, but provide a default implementation based on the new API to allow Sink developers to migrate to the new API conveniently.

PublicEvolving interfaces

Some of the changes are affecting PublicEvolving interfaces:

  • StatefulSink
  • TwoPhaseCommittingSink
  • Sink

These changes need to be deprecated and kept for at least 1 minor release (until 1.20 as per current plans at the point of writing this).

The proposed solution is to make the old interfaces extend the new ones.

StatefulSink

StatefulSink will extend the SupportsWriterState interface and provide a default implementation for the old methods/interfaces:

StatefulSink
@Deprecated
public interface StatefulSink<InputT, WriterStateT>
        extends Sink<InputT>, SupportsWriterState<InputT, WriterStateT> {

    /**
     * Create a {@link org.apache.flink.api.connector.sink2.StatefulSinkWriter} from a recovered
     * state.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    default StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
            Sink.InitContext context, Collection<WriterStateT> recoveredState) throws IOException {
        throw new UnsupportedOperationException(
                "Deprecated, please use restoreWriter(WriterInitContext, Collection<WriterStateT>)");
    }

    /**
     * Create a {@link org.apache.flink.api.connector.sink2.StatefulSinkWriter} from a recovered
     * state.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    default StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
            WriterInitContext context, Collection<WriterStateT> recoveredState) throws IOException {
        return restoreWriter(new InitContextWrapper(context), recoveredState);
    }

    /**
     * A mix-in for {@link StatefulSink} that allows users to migrate from a sink with a compatible
     * state to this sink.
     */
    @PublicEvolving
    interface WithCompatibleState extends SupportsWriterState.WithCompatibleState {}

    /**
     * A {@link SinkWriter} whose state needs to be checkpointed.
     *
     * @param <InputT> The type of the sink writer's input
     * @param <WriterStateT> The type of the writer's state
     */
    @PublicEvolving
    interface StatefulSinkWriter<InputT, WriterStateT>
            extends org.apache.flink.api.connector.sink2.StatefulSinkWriter<InputT, WriterStateT> {}
}

TwoPhaseCommittingSink

TwoPhaseCommittingSink will extend the SupportsCommitter interface and provide a default implementation for the old methods/interfaces:

TwoPhaseCommittingSink
@Deprecated
public interface TwoPhaseCommittingSink<InputT, CommT>
        extends Sink<InputT>, SupportsCommitter<CommT> {

    /**
     * Creates a {@link Committer} that permanently makes the previously written data visible
     * through {@link Committer#commit(Collection)}.
     *
     * @return A committer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     * @deprecated Please use {@link #createCommitter(CommitterInitContext)}
     */
    @Deprecated
    default Committer<CommT> createCommitter() throws IOException {
        throw new UnsupportedOperationException(
                "Deprecated, please use createCommitter(CommitterInitContext)");
    }

    /**
     * Creates a {@link Committer} that permanently makes the previously written data visible
     * through {@link Committer#commit(Collection)}.
     *
     * @param context The context information for the committer initialization.
     * @return A committer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    default Committer<CommT> createCommitter(CommitterInitContext context) throws IOException {
        return createCommitter();
    }

    /** A {@link SinkWriter} that performs the first part of a two-phase commit protocol. */
    @PublicEvolving
    interface PrecommittingSinkWriter<InputT, CommT> extends CommittingSinkWriter<InputT, CommT> {}
}

Sink.createWriter

The Sink.createWriter method interface deprecation will be handled like this:

Sink
public interface Sink<InputT> extends Serializable {

    /**
     * Creates a {@link SinkWriter}.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     * @deprecated Please implement {@link #createWriter(WriterInitContext)}. For backward
     *     compatibility reasons - to keep {@link Sink} a functional interface - Flink did not
     *     provide a default implementation. New {@link Sink} implementations should implement this
     *     method, but it will not be used, and it will be removed in 1.20.0 release. Do not use
     *     {@link Override} annotation when implementing this method, to prevent compilation errors
     *     when migrating to 1.20.x release.
     */
    @Deprecated
    SinkWriter<InputT> createWriter(InitContext context) throws IOException;

    /**
     * Creates a {@link SinkWriter}.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    default SinkWriter<InputT> createWriter(WriterInitContext context) throws IOException {
        return createWriter(new InitContextWrapper(context));
    }

    /** The interface exposes some runtime info for creating a {@link SinkWriter}. */
    @PublicEvolving
    @Deprecated
    interface InitContext extends org.apache.flink.api.connector.sink2.InitContext {
[..]
    }

    /**
     * Class for wrapping a new {@link WriterInitContext} to an old {@link InitContext} until
     * deprecation.
     *
     * @deprecated Internal, do not use it.
     */
    @Deprecated
    class InitContextWrapper implements InitContext {
        private final WriterInitContext wrapped;

        InitContextWrapper(WriterInitContext wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public int getSubtaskId() {
            return wrapped.getSubtaskId();
        }
[..]
    }
}

Experimental interfaces

Some of the changes are affecting Experimental interfaces:

  • WithPreWriteTopology
  • WithPreCommitTopology
  • WithPostCommitTopology

Based on FLIP-321 we could remove the interfaces in a new minor release, but to provide more graceful migration period, we can keep the old interfaces by extending the new ones.

WithPreWriteTopology

WithPreWriteTopology will extend the SupportsPreWriteTopology interface:

WithPreWriteTopology
@Deprecated
public interface WithPreWriteTopology<InputT>
        extends Sink<InputT>, SupportsPreWriteTopology<InputT> {}

WithPreCommitTopology

WithPreCommitTopology will extend the SupportsPreCommitTopology interface, and need to provide a default implementation for the getWriteResultSerializer method:

WithPreCommitTopology
@Deprecated
public interface WithPreCommitTopology<InputT, CommT>
        extends TwoPhaseCommittingSink<InputT, CommT>, SupportsPreCommitTopology<CommT, CommT> {
    /** Defaults to {@link #getCommittableSerializer} for backward compatibility */
    default SimpleVersionedSerializer<CommT> getWriteResultSerializer() {
        return getCommittableSerializer();
    }
}

WithPostCommitTopology

WithPostCommitTopology will extend the SupportsPostCommitTopology interface:

WithPreWriteTopology
@Deprecated
public interface WithPostCommitTopology<InputT, CommT>
        extends TwoPhaseCommittingSink<InputT, CommT>, SupportsPostCommitTopology<CommT> {}

Migration Plan

Migrating to the new interfaces should be straightforward following the next steps:

  • StatefulSink implementations:
    • Sink should implement SupportsStatefulWriter interface - no method name change, just interface name and import change
    • Writer should implement the new StatefulWriter interface - import change only
  • TwoPhaseCommittingSink
    • Sink should implement SupportsCommitter interface - no method name change, just interface name and import change
    • Writer should implement the new CommittingWriter interface - no method name change, just interface name and import change
  • Sink
    • If the Sink is used as a functional interface, then no change is needed
    • It the Sink createWriter method is implemented then interface name and import change
  • WithPreWriteTopology
    • interface name and import change
  • WithPreCommitTopology
    • Implementation for getWriteResultSerializer is needed 
  • WithPostCommitTopology
    • interface name and import change

Test Plan

As a first step, we create our own unit tests, but keep the original tests intact. This will allow us to confirm, that the changes are backward compatible. The new unit tests will confirm that the new features are working.

We will clearly demonstrate via the new Iceberg SinkV2 that the SupportsPreCommitTransformation is working as expected and propagated properly. The KafkaSink could be used to validate that the original usage of the TwoPhaseCommittingSink is also working.

Rejected Alternatives

Adding a new TwoPhaseCommittingSinkWithPreCommitTopology interface

The approach would have kept the changes minimal, stick to the current Sink API design. We introduce the required new combination of interfaces (TwoPhaseCommttingSinkWithPreCommitTopology, WithPostCommitTopologyWithPreCommitTopology), and do not change the API structure.

The advantages of this approach:

  • Minimal change - smaller rewrite on the connector side
  • Type checks happen on compile time

Based on the feedback of the community we decided to accept these downsides considering the following advantages of the mixin approach:

  • Easier to evolve - this will become especially important when this API becomes Public
  • Limits the number of required interfaces - as we do not have to create a specific interface combining the possible features
  • The mixin approach is more similar to the current stable Source API interfaces - so developers will have easier time to onboard Flink
         - Cons:
              - Harder to evolve
              - The number of interfaces increases with the possible combinations
              - Inconsistent API patterns wrt Source API - harder for developers to understand 

Adding the new methods directly to TwoPhaseCommittingSink

We considered to change the existing TwoPhaseCommittingSink interface instead of introducing a new one, but decided against it since it is already marked as PublicEvolving. Another reason is that the users of the current Sink API have not complained about the restrictions in the WithPostCommitTopology so clearly there are several important use-cases where the type transformation is not needed in the Sinks.

WithPreCommitTopology and WithPostCommitTopology to extend the new TwoPhaseCommittingSinkWithPreCommitTransformation interface

In a previous iterator the FLIP suggested that we should change WithPreCommitTopology and WithPostCommitTopology interfaces to extend the new TwoPhaseCommittingSinkWithPreCommitTransformation interface. This approach would have worked in this case, but the result would be awkward for cases where WithPostCommitTopology would be needed, but there is no need for WithPreCommitTopology. Also this solution is not too flexible and hard to apply if more interfaces are needed in the future.

The previously suggested interfaces:

WithPreCommitTopology
public interface WithPreCommitTopology<InputT, WriteResultT, CommittableT>
        extends TwoPhaseCommittingSinkWithPreCommitTransformation<InputT, WriteResultT, CommittableT> {

    /**
     * Intercepts and modifies the committables sent on checkpoint or at end of input. Implementers
     * need to ensure to modify all {@link CommittableMessage}s appropriately.
     *
     * @param committables the stream of committables.
     * @return the custom topology before {@link Committer}.
     */
    DataStream<CommittableMessage<WriteResultT>> addPreCommitTopology(
            DataStream<CommittableMessage<CommittableT>> committables);
}
WithPostCommitTopology
public interface WithPostCommitTopology<InputT, WriteResult, CommT>
        extends TwoPhaseCommittingSinkWithPreCommitTransformation<InputT, WriteResult, CommT> {

    /**
     * Adds a custom post-commit topology where all committables can be processed.
     *
     * <p>It is strongly recommended to keep this pipeline stateless such that batch and streaming
     * modes do not require special cases.
     *
     * <p>All operations need to be idempotent: on recovery, any number of committables may be
     * replayed that have already been committed. It's mandatory that these committables have no
     * effect on the external system.
     *
     * @param committables the stream of committables.
     */
    void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables);
}