StatefulSinkWriter
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
During the migration of the Iceberg Sink to the new Flink Sink V2 API we found that the current WithPreCommitTopology interface is not sufficient for our use-case. The WithPreCommitTopology interface looks like this:
public interface WithPreCommitTopology<InputT, CommT>
        extends TwoPhaseCommittingSink<InputT, CommT> {

    DataStream<CommittableMessage<CommT>> addPreCommitTopology(
            DataStream<CommittableMessage<CommT>> committables);
}
The issue is that the addPreCommitTopology method accepts a CommT type, and also returns objects with the same CommT type.
In the case of the Iceberg Sink, we would like to use the WithPreCommitTopology to aggregate the writer results and create a single committable from them, so we need to change both the type and the number of the messages. With the current WithPreCommitTopology interface we can work around the issue by using a Tuple or a POJO in which some fields are used only before the addPreCommitTopology method and others only after it, but this is closer to abusing the interface than to using it.
This is a more generic issue where the WithPreCommitTopology should provide a way to transform not only the data, but the type of the data channelled through it.
The community discussed this topic from several different angles:
- During the implementation of FLIP-371 we broke the backward compatibility of the Sink API. See the discussion on FLINK-25857
- The discussion continued on the mailing list in this thread
- Also the discussion of this FLIP on the mailing list in this thread
The general consensus was to reject the original approach (minimal changes to the Sink API), and to move forward with rewriting the Sink API using mixin interfaces to improve the extensibility of the API.
Generic approach
When redesigning the API we followed the generic guidelines below:
- Every new feature should be added with a Supports<FeatureName> interface (similar to the Source API), like
- SupportsCommitter
- SupportsWriterState
- SupportsPreWriteTopology
- SupportsPreCommitTopology
- SupportsPostCommitTopology
- Keep the number of inner interfaces/classes minimal, and expose them as full-fledged classes
- Committer - no change here
- CommitterInitContext
- StatefulSinkWriter
- WriterInitContext
- No redefining interface methods during interface inheritance - it would prevent future deprecation
- Minimal inheritance extension - for more flexibility in the future. Kept only
- StatefulSinkWriter, CommittingSinkWriter - which inherit from SinkWriter
- CommitterInitContext, WriterInitContext - which inherit from InitContext
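The Supports<FeatureName> guideline above can be illustrated with a minimal, self-contained sketch. The interfaces below are simplified stand-ins that only borrow the names used in this FLIP; ExampleSink, MixinDetection and describeFeatures are hypothetical, and the instanceof-based detection is an assumption about how a runtime could discover features, not a description of the actual Flink translator:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins: the names mirror this FLIP, the (empty) bodies do not.
interface Sink<InputT> {}

interface SupportsCommitter<CommittableT> {}

interface SupportsWriterState<InputT, WriterStateT> {}

interface SupportsPostCommitTopology<CommittableT> {}

// Hypothetical sink that opts into two features by implementing two mixins.
class ExampleSink
        implements Sink<String>, SupportsCommitter<Long>, SupportsWriterState<String, Integer> {}

class MixinDetection {

    // Lists the optional features of a sink by checking which mixins it implements.
    static List<String> describeFeatures(Sink<?> sink) {
        List<String> features = new ArrayList<>();
        if (sink instanceof SupportsCommitter<?>) {
            features.add("committer");
        }
        if (sink instanceof SupportsWriterState<?, ?>) {
            features.add("writer-state");
        }
        if (sink instanceof SupportsPostCommitTopology<?>) {
            features.add("post-commit-topology");
        }
        return features;
    }

    public static void main(String[] args) {
        System.out.println(describeFeatures(new ExampleSink()));
    }
}
```

With this structure a new feature only needs one new Supports interface, instead of one new interface per combination of features.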
Public Interfaces
Feature interfaces
TwoPhaseCommittingSink
The TwoPhaseCommittingSink will be replaced by the SupportsCommitter mixin interface:
public interface SupportsCommitter<CommittableT> {

    /**
     * Creates a {@link Committer} that permanently makes the previously written data visible
     * through {@link Committer#commit(Collection)}.
     *
     * @param context The context information for the committer initialization.
     * @return A committer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    Committer<CommittableT> createCommitter(CommitterInitContext context) throws IOException;

    /** Returns the serializer of the committable type. */
    SimpleVersionedSerializer<CommittableT> getCommittableSerializer();
}
StatefulSink
The StatefulSink will be replaced by the SupportsWriterState mixin interface:
public interface SupportsWriterState<InputT, WriterStateT> {

    /**
     * Create a {@link StatefulSinkWriter} from a recovered state.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
            WriterInitContext context, Collection<WriterStateT> recoveredState)
            throws IOException;

    /**
     * Any stateful sink needs to provide this state serializer and implement {@link
     * StatefulSinkWriter#snapshotState(long)} properly. The respective state is used in {@link
     * #restoreWriter(WriterInitContext, Collection)} on recovery.
     *
     * @return the serializer of the writer's state type.
     */
    SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer();

    /**
     * A mix-in for {@link SupportsWriterState} that allows users to migrate from a sink with a
     * compatible state to this sink.
     */
    @PublicEvolving
    interface WithCompatibleState {
        /**
         * A list of state names of sinks from which the state can be restored. For example, the new
         * {@code FileSink} can resume from the state of an old {@code StreamingFileSink} as a
         * drop-in replacement when resuming from a checkpoint/savepoint.
         */
        Collection<String> getCompatibleWriterStateNames();
    }
}
WithPreWriteTopology
The WithPreWriteTopology will be replaced by the SupportsPreWriteTopology mixin interface:
public interface SupportsPreWriteTopology<InputT> {

    /**
     * Adds an arbitrary topology before the writer. The topology may be used to repartition the
     * data.
     *
     * @param inputDataStream the stream of input records.
     * @return the custom topology before {@link SinkWriter}.
     */
    DataStream<InputT> addPreWriteTopology(DataStream<InputT> inputDataStream);
}
WithPreCommitTopology
The WithPreCommitTopology will be replaced by the SupportsPreCommitTopology mixin interface. The addPreCommitTopology method is changed so that the input stream and the output stream can contain different types, which is needed for the original goal of this FLIP. We also need a separate serializer for the WriterResultT objects, because they may be rebalanced between the writer and the pre-commit steps:
public interface SupportsPreCommitTopology<WriterResultT, CommittableT> {

    /**
     * Intercepts and modifies the committables sent on checkpoint or at end of input. Implementers
     * need to ensure to modify all {@link CommittableMessage}s appropriately.
     *
     * @param committables the stream of committables.
     * @return the custom topology before {@link Committer}.
     */
    DataStream<CommittableMessage<CommittableT>> addPreCommitTopology(
            DataStream<CommittableMessage<WriterResultT>> committables);

    /** Returns the serializer of the WriteResult type. */
    SimpleVersionedSerializer<WriterResultT> getWriteResultSerializer();
}
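To make the type change concrete, the following sketch shows the kind of aggregation a pre-commit step could perform. It deliberately uses plain Java collections instead of DataStream and CommittableMessage so that it runs without Flink on the classpath; WriteResult, AggregatedCommittable and PreCommitAggregation are hypothetical names chosen for illustration, not types from Flink or Iceberg:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical writer-side result: one data file produced by one writer subtask.
final class WriteResult {
    final String dataFile;

    WriteResult(String dataFile) {
        this.dataFile = dataFile;
    }
}

// Hypothetical committer-side type: all files that belong to one checkpoint.
final class AggregatedCommittable {
    final long checkpointId;
    final List<String> dataFiles;

    AggregatedCommittable(long checkpointId, List<String> dataFiles) {
        this.checkpointId = checkpointId;
        this.dataFiles = Collections.unmodifiableList(dataFiles);
    }
}

class PreCommitAggregation {

    // Stand-in for the logic inside addPreCommitTopology: many WriteResult inputs
    // become a single AggregatedCommittable, so both the type and the count change.
    static AggregatedCommittable aggregate(long checkpointId, List<WriteResult> results) {
        List<String> files = new ArrayList<>();
        for (WriteResult result : results) {
            files.add(result.dataFile);
        }
        return new AggregatedCommittable(checkpointId, files);
    }

    public static void main(String[] args) {
        AggregatedCommittable committable =
                aggregate(
                        7L,
                        Arrays.asList(new WriteResult("a.parquet"), new WriteResult("b.parquet")));
        System.out.println(committable.checkpointId + " -> " + committable.dataFiles);
    }
}
```

In terms of the interface above, WriteResult would play the role of WriterResultT and AggregatedCommittable the role of CommittableT, which is why two separate serializers are needed.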
WithPostCommitTopology
The WithPostCommitTopology will be replaced by the SupportsPostCommitTopology mixin interface:
public interface SupportsPostCommitTopology<CommittableT> {

    /**
     * Adds a custom post-commit topology where all committables can be processed.
     *
     * <p>It is strongly recommended to keep this pipeline stateless such that batch and streaming
     * modes do not require special cases.
     *
     * <p>All operations need to be idempotent: on recovery, any number of committables may be
     * replayed that have already been committed. It's mandatory that these committables have no
     * effect on the external system.
     *
     * @param committables the stream of committables.
     */
    void addPostCommitTopology(DataStream<CommittableMessage<CommittableT>> committables);
}
Extracted interfaces
The following interfaces were inner interfaces in the previous iteration of the Sink API. In the case of PrecommittingSinkWriter, which was an inner interface of TwoPhaseCommittingSink, this prevented the evolution of TwoPhaseCommittingSink because the generic types of the two were tightly coupled. Extracting these interfaces also results in a Sink API whose structure is similar to that of the Source API.
CommitterInitContext
CommitterInitContext was extracted from TwoPhaseCommittingSink:
public interface CommitterInitContext extends InitContext {

    /** @return The metric group this committer belongs to. */
    SinkCommitterMetricGroup metricGroup();
}
CommittingSinkWriter
CommittingSinkWriter was extracted from TwoPhaseCommittingSink, where its original name was PrecommittingSinkWriter. The name was changed so that the coupling between the Committer, CommitterInitContext, CommittingSinkWriter and SupportsCommitter interfaces is more pronounced. We accepted the inheritance here, as it has a clear purpose:
public interface CommittingSinkWriter<InputT, CommittableT> extends SinkWriter<InputT> {

    /**
     * Prepares for a commit.
     *
     * <p>This method will be called after {@link #flush(boolean)} and before {@link
     * StatefulSinkWriter#snapshotState(long)}.
     *
     * @return The data to commit as the second step of the two-phase commit protocol.
     * @throws IOException if fail to prepare for a commit.
     */
    Collection<CommittableT> prepareCommit() throws IOException, InterruptedException;
}
WriterInitContext
WriterInitContext was extracted from StatefulSink:
public interface WriterInitContext extends org.apache.flink.api.connector.sink2.InitContext {

    /**
     * Gets the {@link UserCodeClassLoader} to load classes that are not in system's classpath, but
     * are part of the jar file of a user job.
     *
     * @see UserCodeClassLoader
     */
    UserCodeClassLoader getUserCodeClassLoader();

    /**
     * Returns the mailbox executor that allows to execute {@link Runnable}s inside the task thread
     * in between record processing.
     *
     * <p>Note that this method should not be used per-record for performance reasons in the same
     * way as records should not be sent to the external system individually. Rather, implementers
     * are expected to batch records and only enqueue a single {@link Runnable} per batch to handle
     * the result.
     */
    MailboxExecutor getMailboxExecutor();

    /**
     * Returns a {@link ProcessingTimeService} that can be used to get the current time and register
     * timers.
     */
    ProcessingTimeService getProcessingTimeService();

    /** @return The metric group this writer belongs to. */
    SinkWriterMetricGroup metricGroup();

    /** Provides a view on this context as a {@link SerializationSchema.InitializationContext}. */
    SerializationSchema.InitializationContext asSerializationSchemaInitializationContext();

    /** Returns whether object reuse has been enabled or disabled. */
    boolean isObjectReuseEnabled();

    /** Creates a serializer for the type of sink's input. */
    <IN> TypeSerializer<IN> createInputSerializer();

    /**
     * Returns a metadata consumer, the {@link SinkWriter} can publish metadata events of type
     * {@link MetaT} to the consumer.
     *
     * <p>It is recommended to use a separate thread pool to publish the metadata because enqueuing
     * a lot of these messages in the mailbox may lead to a performance decrease. The {@link
     * Consumer#accept} method is executed very fast.
     */
    @Experimental
    default <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
        return Optional.empty();
    }
}
StatefulSinkWriter
StatefulSinkWriter was extracted from StatefulSink. We accepted the inheritance here, as it has a clear purpose:
public interface StatefulSinkWriter<InputT, WriterStateT> extends SinkWriter<InputT> {

    /**
     * @return The writer's state.
     * @throws IOException if fail to snapshot writer's state.
     */
    List<WriterStateT> snapshotState(long checkpointId) throws IOException;
}
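As an illustration of how snapshotState and the restore path of SupportsWriterState fit together, here is a minimal sketch. The SinkWriter and StatefulSinkWriter interfaces below are reduced stand-ins (the real ones have more methods and declare IOException), and BufferingWriter is a hypothetical writer whose state is simply its list of not-yet-flushed records:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Reduced stand-ins for the interfaces in this FLIP; only what the sketch needs.
interface SinkWriter<InputT> {
    void write(InputT element);
}

interface StatefulSinkWriter<InputT, WriterStateT> extends SinkWriter<InputT> {
    List<WriterStateT> snapshotState(long checkpointId);
}

// Hypothetical writer that buffers records and checkpoints the buffer as its state.
class BufferingWriter implements StatefulSinkWriter<String, String> {

    private final List<String> pending = new ArrayList<>();

    // Plays the role of restoreWriter(context, recoveredState): pass an empty
    // collection for a fresh start, or the snapshotted state on recovery.
    BufferingWriter(Collection<String> recoveredState) {
        pending.addAll(recoveredState);
    }

    @Override
    public void write(String element) {
        pending.add(element);
    }

    @Override
    public List<String> snapshotState(long checkpointId) {
        // Return a copy so that later writes do not mutate the snapshot.
        return new ArrayList<>(pending);
    }

    public static void main(String[] args) {
        BufferingWriter writer = new BufferingWriter(new ArrayList<>());
        writer.write("a");
        writer.write("b");
        List<String> state = writer.snapshotState(1L);

        BufferingWriter restored = new BufferingWriter(state);
        restored.write("c");
        System.out.println(restored.snapshotState(2L));
    }
}
```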
Other modified interfaces
Sink
As a result, the parameter type of the Sink interface's createWriter method changes to WriterInitContext:
public interface Sink<InputT> extends Serializable {

    /**
     * Creates a {@link SinkWriter}.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    SinkWriter<InputT> createWriter(WriterInitContext context) throws IOException;
}
Proposed Changes
Other than the interface changes, we should modify the SinkTransformationTranslator.addCommittingTopology method to make sure that the new parameter types are respected.
Compatibility, Deprecation, and Migration Plan
Deprecation
We will deprecate the old interfaces, but provide a default implementation based on the new API to allow Sink developers to migrate to the new API conveniently.
PublicEvolving interfaces
Some of the changes affect PublicEvolving interfaces:
- StatefulSink
- TwoPhaseCommittingSink
- Sink
These interfaces need to be deprecated and kept for at least one minor release (until 1.20, as per the plans at the time of writing).
The proposed solution is to make the old interfaces extend the new ones.
StatefulSink
StatefulSink will extend the SupportsWriterState interface and provide a default implementation for the old methods/interfaces:
@Deprecated
public interface StatefulSink<InputT, WriterStateT>
        extends Sink<InputT>, SupportsWriterState<InputT, WriterStateT> {

    /**
     * Create a {@link org.apache.flink.api.connector.sink2.StatefulSinkWriter} from a recovered
     * state.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    default StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
            Sink.InitContext context, Collection<WriterStateT> recoveredState)
            throws IOException {
        throw new UnsupportedOperationException(
                "Deprecated, please use restoreWriter(WriterInitContext, Collection<WriterStateT>)");
    }

    /**
     * Create a {@link org.apache.flink.api.connector.sink2.StatefulSinkWriter} from a recovered
     * state.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    default StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
            WriterInitContext context, Collection<WriterStateT> recoveredState)
            throws IOException {
        return restoreWriter(new InitContextWrapper(context), recoveredState);
    }

    /**
     * A mix-in for {@link StatefulSink} that allows users to migrate from a sink with a compatible
     * state to this sink.
     */
    @PublicEvolving
    interface WithCompatibleState extends SupportsWriterState.WithCompatibleState {}

    /**
     * A {@link SinkWriter} whose state needs to be checkpointed.
     *
     * @param <InputT> The type of the sink writer's input
     * @param <WriterStateT> The type of the writer's state
     */
    @PublicEvolving
    interface StatefulSinkWriter<InputT, WriterStateT>
            extends org.apache.flink.api.connector.sink2.StatefulSinkWriter<InputT, WriterStateT> {}
}
TwoPhaseCommittingSink
TwoPhaseCommittingSink will extend the SupportsCommitter interface and provide a default implementation for the old methods/interfaces:
@Deprecated
public interface TwoPhaseCommittingSink<InputT, CommT>
        extends Sink<InputT>, SupportsCommitter<CommT> {

    /**
     * Creates a {@link Committer} that permanently makes the previously written data visible
     * through {@link Committer#commit(Collection)}.
     *
     * @return A committer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     * @deprecated Please use {@link #createCommitter(CommitterInitContext)}
     */
    @Deprecated
    default Committer<CommT> createCommitter() throws IOException {
        throw new UnsupportedOperationException(
                "Deprecated, please use createCommitter(CommitterInitContext)");
    }

    /**
     * Creates a {@link Committer} that permanently makes the previously written data visible
     * through {@link Committer#commit(Collection)}.
     *
     * @param context The context information for the committer initialization.
     * @return A committer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    default Committer<CommT> createCommitter(CommitterInitContext context) throws IOException {
        return createCommitter();
    }

    /** A {@link SinkWriter} that performs the first part of a two-phase commit protocol. */
    @PublicEvolving
    interface PrecommittingSinkWriter<InputT, CommT>
            extends CommittingSinkWriter<InputT, CommT> {}
}
Sink.createWriter
The deprecation of the Sink.createWriter method will be handled like this:
public interface Sink<InputT> extends Serializable {

    /**
     * Creates a {@link SinkWriter}.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     * @deprecated Please implement {@link #createWriter(WriterInitContext)}. For backward
     *     compatibility reasons - to keep {@link Sink} a functional interface - Flink did not
     *     provide a default implementation. New {@link Sink} implementations should implement this
     *     method, but it will not be used, and it will be removed in 1.20.0 release. Do not use
     *     {@link Override} annotation when implementing this method, to prevent compilation errors
     *     when migrating to 1.20.x release.
     */
    @Deprecated
    SinkWriter<InputT> createWriter(InitContext context) throws IOException;

    /**
     * Creates a {@link SinkWriter}.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    default SinkWriter<InputT> createWriter(WriterInitContext context) throws IOException {
        return createWriter(new InitContextWrapper(context));
    }

    /** The interface exposes some runtime info for creating a {@link SinkWriter}. */
    @PublicEvolving
    @Deprecated
    interface InitContext extends org.apache.flink.api.connector.sink2.InitContext {
        [..]
    }

    /**
     * Class for wrapping a new {@link WriterInitContext} to an old {@link InitContext} until
     * deprecation.
     *
     * @deprecated Internal, do not use it.
     */
    @Deprecated
    class InitContextWrapper implements InitContext {
        private final WriterInitContext wrapped;

        InitContextWrapper(WriterInitContext wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public int getSubtaskId() {
            return wrapped.getSubtaskId();
        }

        [..]
    }
}
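The bridging pattern used here (the old method stays abstract, the new method is a default that wraps the new context and delegates) can be tried out in isolation. The sketch below uses hypothetical, heavily reduced stand-ins: LegacyInitContext, NewInitContext, ContextWrapper and BridgedSink are illustration-only names, and a String takes the place of the SinkWriter return type:

```java
// Stand-in for the deprecated Sink.InitContext.
interface LegacyInitContext {
    int getSubtaskId();
}

// Stand-in for the new WriterInitContext.
interface NewInitContext {
    int getSubtaskId();
}

// Adapts the new context to the old one, in the spirit of InitContextWrapper.
final class ContextWrapper implements LegacyInitContext {
    private final NewInitContext wrapped;

    ContextWrapper(NewInitContext wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public int getSubtaskId() {
        return wrapped.getSubtaskId();
    }
}

// Stand-in for Sink: exactly one abstract method, so it remains a functional interface.
interface BridgedSink {

    // Old entry point, still the only abstract method.
    String createWriter(LegacyInitContext context);

    // New entry point, delegates to the old one through the wrapper.
    default String createWriter(NewInitContext context) {
        return createWriter(new ContextWrapper(context));
    }
}

class BridgeDemo {
    public static void main(String[] args) {
        // An existing sink written as a lambda against the old signature keeps compiling.
        BridgedSink legacySink = context -> "writer-" + context.getSubtaskId();

        // The runtime only calls the new signature.
        NewInitContext newContext = () -> 3;
        System.out.println(legacySink.createWriter(newContext));
    }
}
```

Because the new method is a default and the old one stays abstract, sinks written as lambdas are unaffected, which is the reason given in the Javadoc above for not providing a default implementation of the deprecated method.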
Experimental interfaces
Some of the changes affect Experimental interfaces:
- WithPreWriteTopology
- WithPreCommitTopology
- WithPostCommitTopology
Based on FLIP-321 we could remove the interfaces in a new minor release, but to provide a more graceful migration period we can keep the old interfaces by making them extend the new ones.
WithPreWriteTopology
WithPreWriteTopology will extend the SupportsPreWriteTopology interface:
@Deprecated
public interface WithPreWriteTopology<InputT>
        extends Sink<InputT>, SupportsPreWriteTopology<InputT> {}
WithPreCommitTopology
WithPreCommitTopology will extend the SupportsPreCommitTopology interface, and needs to provide a default implementation for the getWriteResultSerializer method:
@Deprecated
public interface WithPreCommitTopology<InputT, CommT>
        extends TwoPhaseCommittingSink<InputT, CommT>, SupportsPreCommitTopology<CommT, CommT> {

    /** Defaults to {@link #getCommittableSerializer} for backward compatibility */
    default SimpleVersionedSerializer<CommT> getWriteResultSerializer() {
        return getCommittableSerializer();
    }
}
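The reason this default is safe can be seen in a reduced sketch: when the write result type and the committable type are the same, one serializer can serve both roles. VersionedSerializer is a hypothetical stand-in for SimpleVersionedSerializer, the two Supports interfaces are cut down to their serializer getters, and LegacyPreCommitSink is an illustration-only name for the deprecated interface:

```java
// Hypothetical stand-in for SimpleVersionedSerializer, reduced to one method.
interface VersionedSerializer<T> {
    int getVersion();
}

// Reduced stand-ins: only the serializer getters are kept.
interface SupportsCommitter<CommittableT> {
    VersionedSerializer<CommittableT> getCommittableSerializer();
}

interface SupportsPreCommitTopology<WriterResultT, CommittableT> {
    VersionedSerializer<WriterResultT> getWriteResultSerializer();
}

// Stand-in for the deprecated interface: both type parameters collapse to CommT,
// so the committable serializer can double as the write result serializer.
interface LegacyPreCommitSink<CommT>
        extends SupportsCommitter<CommT>, SupportsPreCommitTopology<CommT, CommT> {

    @Override
    default VersionedSerializer<CommT> getWriteResultSerializer() {
        return getCommittableSerializer();
    }
}

class SerializerDefaultDemo {
    public static void main(String[] args) {
        VersionedSerializer<String> serializer = () -> 1;
        // An existing sink only provides the committable serializer.
        LegacyPreCommitSink<String> sink = () -> serializer;
        System.out.println(sink.getWriteResultSerializer() == serializer);
    }
}
```

A sink that migrates to SupportsPreCommitTopology with two different types no longer has this shortcut and has to supply its own write result serializer.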
WithPostCommitTopology
WithPostCommitTopology will extend the SupportsPostCommitTopology interface:
@Deprecated
public interface WithPostCommitTopology<InputT, CommT>
        extends TwoPhaseCommittingSink<InputT, CommT>, SupportsPostCommitTopology<CommT> {}
Migration Plan
Migrating to the new interfaces should be straightforward by following these steps:
- StatefulSink implementations:
- Sink should implement the SupportsWriterState interface - no method name change, just interface name and import change
- Writer should implement the new StatefulSinkWriter interface - import change only
- TwoPhaseCommittingSink
- Sink should implement SupportsCommitter interface - no method name change, just interface name and import change
- Writer should implement the new CommittingSinkWriter interface - no method name change, just interface name and import change
- Sink
- If the Sink is used as a functional interface, then no change is needed
- If the Sink createWriter method is implemented, then the context parameter type (WriterInitContext) and its import change
- WithPreWriteTopology
- interface name and import change
- WithPreCommitTopology
- Implementation for getWriteResultSerializer is needed
- WithPostCommitTopology
- interface name and import change
Test Plan
As a first step, we create our own unit tests, but keep the original tests intact. This will allow us to confirm that the changes are backward compatible. The new unit tests will confirm that the new features are working.
We will demonstrate via the new Iceberg SinkV2 that the SupportsPreCommitTopology is working as expected and propagated properly. The KafkaSink could be used to validate that the original usage of the TwoPhaseCommittingSink is also working.
Rejected Alternatives
Adding a new TwoPhaseCommittingSinkWithPreCommitTopology interface
This approach would have kept the changes minimal and stuck to the current Sink API design. We would have introduced the required new combinations of interfaces (TwoPhaseCommittingSinkWithPreCommitTopology, WithPostCommitTopologyWithPreCommitTopology) without changing the API structure.
The advantages of this approach:
- Minimal change - smaller rewrite on the connector side
- Type checks happen at compile time
Based on the feedback of the community, we decided to give up these advantages in favor of the following advantages of the mixin approach:
- Easier to evolve - this will become especially important when this API becomes Public
- Limits the number of required interfaces - as we do not have to create a specific interface combining the possible features
- The mixin approach is more similar to the current stable Source API interfaces - so developers will have an easier time onboarding to Flink
- Cons of the rejected approach:
- Harder to evolve
- The number of interfaces increases with the possible combinations
- Inconsistent API patterns wrt Source API - harder for developers to understand
Adding the new methods directly to TwoPhaseCommittingSink
We considered changing the existing TwoPhaseCommittingSink interface instead of introducing a new one, but decided against it since it is already marked as PublicEvolving. Another reason is that the users of the current Sink API have not complained about the restrictions in the WithPreCommitTopology, so there are clearly several important use-cases where the type transformation is not needed in the Sinks.
WithPreCommitTopology and WithPostCommitTopology to extend the new TwoPhaseCommittingSinkWithPreCommitTransformation interface
In a previous iteration, the FLIP suggested that we should change the WithPreCommitTopology and WithPostCommitTopology interfaces to extend the new TwoPhaseCommittingSinkWithPreCommitTransformation interface. This approach would have worked in this case, but the result would be awkward for cases where WithPostCommitTopology is needed but WithPreCommitTopology is not. This solution is also not very flexible, and it is hard to apply if more interfaces are needed in the future.
The previously suggested interfaces:
public interface WithPreCommitTopology<InputT, WriteResultT, CommittableT>
        extends TwoPhaseCommittingSinkWithPreCommitTransformation<
                InputT, WriteResultT, CommittableT> {

    /**
     * Intercepts and modifies the committables sent on checkpoint or at end of input. Implementers
     * need to ensure to modify all {@link CommittableMessage}s appropriately.
     *
     * @param committables the stream of committables.
     * @return the custom topology before {@link Committer}.
     */
    DataStream<CommittableMessage<CommittableT>> addPreCommitTopology(
            DataStream<CommittableMessage<WriteResultT>> committables);
}
public interface WithPostCommitTopology<InputT, WriteResult, CommT>
        extends TwoPhaseCommittingSinkWithPreCommitTransformation<InputT, WriteResult, CommT> {

    /**
     * Adds a custom post-commit topology where all committables can be processed.
     *
     * <p>It is strongly recommended to keep this pipeline stateless such that batch and streaming
     * modes do not require special cases.
     *
     * <p>All operations need to be idempotent: on recovery, any number of committables may be
     * replayed that have already been committed. It's mandatory that these committables have no
     * effect on the external system.
     *
     * @param committables the stream of committables.
     */
    void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables);
}