Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

During the migration of the Iceberg Sink to the new Flink Sink V2 API we found that there is no way to emit metrics from the committer. This causes a loss of functionality when migrating to the SinkV2 implementation: previously it was possible to emit commit-related metrics (commit duration, number of committed data files, committed data file size, etc.), but after the migration there is no way to achieve the same functionality. We should also provide context information similar to what is already available for the Writer; the exact scope should be discussed further on the mailing list.

Public Interfaces

We should change the TwoPhaseCommittingSink interface by adding a new createCommitter(CommitterInitContext context) method and deprecating the old createCommitter() method, like this:

@PublicEvolving
public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
[..]
    PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) throws IOException;
    
    /**
     * Creates a {@link Committer} without any runtime context.
     *
     * @deprecated Please implement {@link #createCommitter(CommitterInitContext)} instead.
     */
    @Deprecated
    Committer<CommT> createCommitter() throws IOException;

    /**
     * Creates a {@link Committer} that permanently makes the previously written data visible
     * through {@link Committer#commit(Collection)}.
     *
     * @param context The context information for the committer initialization.
     * @return A committer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    default Committer<CommT> createCommitter(CommitterInitContext context) throws IOException {
        return createCommitter();
    }

    /** Returns the serializer of the committable type. */
    SimpleVersionedSerializer<CommT> getCommittableSerializer();
[..]
   /** The interface exposes some runtime info for creating a {@link Committer}. */
    @PublicEvolving
    interface CommitterInitContext {
        /**
         * The first checkpoint id when an application is started and not recovered from a
         * previously taken checkpoint or savepoint.
         */
        long INITIAL_CHECKPOINT_ID = 1;

        /** @return The id of the task where the committer is running. */
        int getSubtaskId();

        /** @return The number of parallel committer tasks. */
        int getNumberOfParallelSubtasks();

        /**
         * Gets the attempt number of this parallel subtask. First attempt is numbered 0.
         *
         * @return Attempt number of the subtask.
         */
        int getAttemptNumber();

        /** @return The metric group this committer belongs to. */
        SinkCommitterMetricGroup metricGroup();

        /**
         * Returns id of the restored checkpoint, if state was restored from the snapshot of a
         * previous execution.
         */
        OptionalLong getRestoredCheckpointId();

        /**
         * The ID of the current job. Note that Job ID can change in particular upon manual restart.
         * The returned ID should NOT be used for any job management tasks.
         */
        JobID getJobId();
    }
}
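
For illustration, an existing sink could adopt the new method roughly like the sketch below (MySink, MyRecord, MyCommittable and MyCommitter are hypothetical placeholders, not part of the proposal):

public class MySink implements TwoPhaseCommittingSink<MyRecord, MyCommittable> {
[..]
    @Override
    public Committer<MyCommittable> createCommitter(CommitterInitContext context) throws IOException {
        // The metric group and the restored checkpoint id are available at creation time.
        return new MyCommitter(context.metricGroup(), context.getRestoredCheckpointId());
    }
[..]
}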

We should also introduce a new SinkCommitterMetricGroup interface to provide basic committer metrics, like this:

public interface SinkCommitterMetricGroup extends OperatorMetricGroup {

    /** The total number of arrived committables. */
    Counter getNumCommittablesTotalCounter();

    /** The total number of committable failures. */
    Counter getNumCommittableErrorsCounter();

    /** The total number of successful committables. */
    Counter getNumCommittableSuccessCounter();

    /** The number of pending committables. */
    Gauge getNumPendingCommittablesGauge();
}
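
As an illustration only, a committer created through the new method could update these metrics during commit; MyCommitter, MyCommittable and doCommit below are hypothetical, and the error handling is intentionally simplified:

public class MyCommitter implements Committer<MyCommittable> {

    private final SinkCommitterMetricGroup metricGroup;
[..]
    @Override
    public void commit(Collection<CommitRequest<MyCommittable>> committables)
            throws IOException, InterruptedException {
        metricGroup.getNumCommittablesTotalCounter().inc(committables.size());
        for (CommitRequest<MyCommittable> request : committables) {
            try {
                doCommit(request.getCommittable());
                metricGroup.getNumCommittableSuccessCounter().inc();
            } catch (IOException e) {
                metricGroup.getNumCommittableErrorsCounter().inc();
                request.retryLater();
            }
        }
    }
[..]
}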

Proposed Changes

Similar to the SinkWriterOperator, the CommitterOperator should receive the Sink as a constructor parameter, and the Committer should be created in CommitterOperator.initializeState. The CommitterInitContext should be based on the StateInitializationContext, roughly as sketched below.
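
A rough sketch of the intended operator change (the createInitContext helper and the field names are illustrative, not final):

class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage<CommT>> {

    private final TwoPhaseCommittingSink<?, CommT> sink;
    private Committer<CommT> committer;

    CommitterOperator(TwoPhaseCommittingSink<?, CommT> sink) {
        this.sink = sink;
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        // Build the CommitterInitContext from the StateInitializationContext and the operator's
        // runtime context, and create the Committer here instead of receiving it as a
        // constructor parameter.
        CommitterInitContext initContext = createInitContext(context.getRestoredCheckpointId());
        this.committer = sink.createCommitter(initContext);
    }
[..]
}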

We also need to implement the metrics themselves. This is straightforward for the Counters, but we need to find a way to aggregate the number of pending Committables, if possible (see the sketch below).
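
Purely as an illustration, the pending Committables could be tracked in the committer operator with a simple counter exposed through a gauge (the metric name and helper methods below are assumptions):

// Illustrative only: track the number of pending committables and expose it as a Gauge.
private final AtomicInteger pendingCommittables = new AtomicInteger();

private void registerPendingCommittablesGauge(MetricGroup metricGroup) {
    Gauge<Integer> pendingGauge = pendingCommittables::get;
    metricGroup.gauge("numPendingCommittables", pendingGauge);
}

// Called when a committable arrives at the operator.
private void onCommittableReceived() {
    pendingCommittables.incrementAndGet();
}

// Called when a committable is successfully committed or permanently failed.
private void onCommittableFinished() {
    pendingCommittables.decrementAndGet();
}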

Compatibility, Deprecation, and Migration Plan

Existing SinkV2 implementations can rely on the default implementation of the new createCommitter(CommitterInitContext context) method, and the migration can easily be done during the deprecation period by moving the old createCommitter() implementation to the new method, as shown below. Otherwise this is an additional feature which should not impact other users.
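
For an existing sink the migration is a mechanical change; in this illustrative before/after snippet MyCommittable and MyCommitter stand for the sink's own types:

// Before: the committer is created without any runtime context.
@Override
public Committer<MyCommittable> createCommitter() throws IOException {
    return new MyCommitter();
}

// After: the same creation logic moves to the new method and may additionally use the context.
@Override
public Committer<MyCommittable> createCommitter(CommitterInitContext context) throws IOException {
    return new MyCommitter(context.metricGroup());
}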

Test Plan

We will demonstrate via the new Iceberg SinkV2 implementation that the metrics work as expected and are propagated properly.

Rejected Alternatives

We could avoid changing the createCommitter() method by adding a new init(CommitterInitContext context) method to the Committer interface instead, but this could cause confusion around API usage, as the Writer and the Committer would achieve the same functionality in different ways. We opted for the solution which results in a more consistent API in the long term.

public interface Committer<CommT> extends AutoCloseable {
    /**
     * Initialize the committer with the {@link CommitterInitContext}.
     *
     * @param context The context information for the committer initialization.
     * @throws IOException for reasons that may yield a complete restart of the job.
     */ 
    void init(CommitterInitContext context) throws IOException, InterruptedException;

    /**
     * Commit the given list of {@link CommT}.
     *
     * @param committables A list of commit requests staged by the sink writer.
     * @throws IOException for reasons that may yield a complete restart of the job.
     */
    void commit(Collection<CommitRequest<CommT>> committables)
            throws IOException, InterruptedException;
[..]
}