Versions Compared


  • This line was added.
  • This line was removed.
  • Formatting was changed.

Page properties

Discussion thread
Vote threadpending

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Table of Contents

Table of Contents


It has been a long-standing issue that Flink’s Kafka Sink implementation for exactly-once delivery guarantees comes with two critical pitfalls (please see the Appendix for a detail recap of why these issues exist):

  1. Data loss can occur when Kafka aborts a successfully checkpointed transaction due to timeout. Users were recommended to mitigate this by setting a substantially large Kafka setting, but ultimately there’s always the chance of data loss if it takes long enough to restore the job. Moreover, Kafka brokers can limit ( broker config) the upper bound of the timeout, and users don't always have access to alter this (e.g. if they are writing to a managed Kafka cluster service).

  2. When a KafkaSink is restored from a checkpoint, checkpointed transactions need to be resumed and committed. Here, to create a producer instance that is capable of resuming the previous on-going transaction, the KafkaSink implementation relies heavily on Java reflection in order to bypass the Kafka transaction protocol, which by definition aborts any previous ongoing transactions when a producer session is recovered. The use of Java reflection makes upgrading Kafka clients in Flink hard and highly error-prone.

These issues exist because Kafka’s transaction protocol was originally designed with the simple “read-process-write“ loop in mind, where each transaction is made by a single consumer-producer process atomically reading from and writing to Kafka. Such transactions are expected to be short-lived only for the duration of this loop. Therefore, any client downtime would result in the transaction being aborted by Kafka after the client is recovered so that the new producer session can start with a clean state. The same goes for any transactions that are in-doubt for too long, e.g. if the producer process is lost forever, Kafka aborts the transaction on timeout to prevent it from permanently blocking downstream consumers.

These protocol behaviors directly prevent Kafka producers from being safely usable as a participant in an externally-coordinated 2PC transaction. As a 2PC participant, a Kafka producer needs to be able to guarantee that once a transaction is pre-committed, it is permanently durable and can stay in-doubt as long as it takes for the external coordinator to issue the second phase that decides the transaction fate. In Flink terms, once a KafkaSink subtask flushes its writes to the ongoing transaction and acknowledges the corresponding Flink checkpoint (the pre-commit voting phase), the transaction should remain in-doubt until all sink subtasks also acknowledge the checkpoint and then JM (the external transaction coordinator) notifies all subtasks to commit their transactions (the commit phase).

Kafka is attempting to address this with KIP-939: Support Participation in 2PC. In short, with KIP-939 it is now possible to declare that a transactional Kafka producer is participating in an externally-coordinated 2PC transaction. Effectively, this means Kafka acknowledges that an external transaction coordinator exists and has have full control in deciding transaction fate; Kafka will no longer proactively abort transactions from the 2PC-participating producer based on timeout, and will also allow resuming previous transactions so that the external coordinator has the final say in whether to commit or abort it. For a more detailed summary of KIP-939, please see KIP-939 Public Interface Summary.

This FLIP proposes to update Flink's KafkaSink connector to integrate with the new public APIs made available by KIP-939 for proper 2PC integration. As the KafkaSink was already implemented on top of Flink's TwoPhaseCommittingSink abstraction, the actual code changes required is minimal. Practically, this would include:

  • Remove FlinkKafkaInternalProducer, which implements the Java reflection accesses on the Kafka client to bypass the Kafka transaction protocol
  • Integrate with the new KIP-939 transaction operation API methods
  • Implement state schema migration paths (via savepointing and recovering the job) due to a required schema change in the KafkaCommittable

To conclude, this improvement brings the following benefits for our users:

  • No more risk of data loss when using the KafkaSink under EXACTLY_ONCE mode

  • A much more robust implementation that does not rely on Java reflection on the Kafka Java client. For Flink devs, this improves code maintainability for the KafkaSink, and overall makes it much easier to upgrade Kafka client version. For our users, this will indirectly resolve many productions issues users have been reporting with their Kafka-writing Flink jobs, such as long startup times and orphaned hanging transactions.

KIP-939 Public Interface Summary

KIP-939 adds proper support for 2PC participation with the following public-facing changes. Here we omit details on the underlying transaction protocol changes, and only describe changes on the Producer Java client API as that is what Flink’s KafkaSink interfaces with.

  1. transaction.two.phase.commit.enable producer client property: this property has been added for the Producer Java client, which should be set to true (default is false) for all producers started by Flink's KafkaSink to declare that the producers are participating in a distributed 2PC transaction, coordinated by Flink. Effectively, this tells Kafka to never abort these producers' transactions by timeout, and enables the following transaction operations below.

  2. New Producer#initTransactions(boolean keepPreparedTxn) method: alongside the existing Producer#initTransactions() method, an additional Producer#initTransactions(boolean) overload variant has been added. Just like the former, the new method should be called on producer initialization to obtain its internal producer ID and epoch from Kafka. The difference is that while the former alwaysaborts any old transactions issued by previous sessions of the producer, the latter variant allows choosing to retain the old transaction so that Flink may choose to either commit or abort it.
  3. New Producer#prepareTransaction()::PreparedTxnState method: calling this prepares / pre-commits the producer’s ongoing transaction. After calling this, the producer changes to a state where no more records can be written to the transaction, and only either Producer#commitTransaction(), Producer#abortTransaction(), or Producer#completeTransaction(PreparedTxnState) can be called to finalize the transaction.

  4. New Producer#completeTransaction(PreparedTxnState state)

For (2), Kafka’s new Producer#prepareTransaction() method fits naturally with Flink’s TwoPhaseCommittingSink.PrecommittingSinkWriter abstraction. For (3), this finally solves the KafkaSink’s long-standing tech debt of having to use Java reflection to bypass the old initTransactions() protocol; the KafkaSink can now use Kafka public APIs to resume previous successfully pre-committed transactions without it getting auto-aborted by Kafka. The following section below will go through the new integration in detail.

Public Interface Changes

This FLIP does not require any changes to the existing public Flink Source / Sink interfaces for connectors. It does affect other user-facing aspects specific to the Kafka Sink connector when used under EXACTLY_ONCE mode, namely: 1) Minimum required Kafka broker version, 2) KafkaCommittable state schema.

Minimum required Kafka Broker Version

The user’s Kafka cluster must be upgraded to a minimal version that supports KIP-939 in order to use the new KafkaSink version.

KafkaCommittable schema change

A checkpointed KafkaCommittable would now become only:

class KafkaCommittable {
    String transactionalId;
PreparedTxnState preparedTxnState; }

as opposed to the previous schema which contains (producerId, producerEpoch, TID). We no longer need to persist producerId and producerEpoch because the Kafka’s new InitPidRequest protocol version allows resuming the previous ongoing transaction instead of always aborting it, so the KafkaSink no longer needs to bypass the protocol by checkpointing these values only to inject them into the Kafka client at restore time.

The new PreparedTxnState object encapsulates metadata returned by the Kafka client upon pre-committing / preparing the transaction.

Any user tooling that reads Flink checkpoints to, for example, inspect TIDs need to be updated to recognize the new schema.

Proposed Changes for Flink’s KafkaSink

Here we’ll go through the interaction changes between Flink’s KafkaSink and Kafka throughout the following lifecycles of the 2PC integration: 1) pre-commit phase (i.e. on checkpoint barrier), 2) commit phase (i.e. on checkpoint complete RPC notification), and 3) restore phase (i.e. on failure and restoring from successful Flink checkpoint).

As you’ll see, none of the main 2PC interaction flow has been altered and we also do not require any changes to the TwoPhaseCommittingSink abstraction. The actual changes are quite minimal as we're mainly just calling 2 new methods on Kafka's producer Java client, but for the sake of completeness, this FLIP will briefly illustrate the full 2PC integration while highlighting the actual changes.

Pre-Commit Phase (on checkpoint)

On pre-commit (i.e. when the checkpoint barrier arrives at a KafkaSink operator subtask), the following operations are performed:

  1. Current producer is flushed and the current transaction is prepared using the new Producer#prepareTransaction() method.

  2. The producer instance holding the prepared transaction, as well as the TID of the prepared transaction, is collected in a buffer ordered by the checkpoint ID associated with the pre-commit. The buffer may contain other prepared producers/TIDs of earlier pre-committed transactions (i.e. of previous checkpoints) that have not been committed yet.

  3. The buffer is snapshotted to obtain all pre-committed TIDs that are awaiting to be committed. This snapshot is written to Flink managed state backend for checkpoint persistence.

  4. Finally, the subtask obtains a new producer instance (with a different TID) for the next checkpoint’s transaction.

The above steps are done atomically w.r.t. the checkpoint barrier and don’t necessarily need to be done in this exact order (except from step 1 and 2).

Note that in step 3, we only need to snapshot the TID of transactions in Flink checkpoints, as opposed to before where the TID, internal producer ID, and epoch (latter two obtained via Java reflection on the producer instance) needs to be snapshotted. This is because on restore, we no longer need to inject the internal producer ID and epoch on resumed producer instances to bypass the initTransactions() method. More on this in the On Restore section.

Failure Points

  • If the flush() or prepareTransaction() call in step 1 fails, then we strictly fail the job as this is a synchronization point failure in 2PC.

  • Failing to write the snapshot of the buffer in step 3 to Flink’s managed state backend is also a synchronization point failure and will fail the job.

  • For step 4, if a new producer instance cannot be obtained for the next checkpoint’s transaction (either the initialization failed, or all possible TIDs have been depleted which is a possibility with FLIP-ZZZ: TID Pooling for KafkaSink), we choose to fail the job by design. Strictly speaking, this is just implementation detail and not a synchronization point failure for 2PC; theoretically, it is possible to delay / retry obtaining the new producer instance.

Commit Phase (on checkpoint complete notification)

The commit phase (i.e. when all KafkaSink subtasks have successfully checkpointed, and a RPC notification is sent from the JM to notify them of the completion) remains the same without any changes required.

When a KafkaSink subtask is notified that checkpoint N was successfully completed, all buffered prepared producers up to checkpoint N (recall that the buffer is ordered by checkpoint ID) will be retrieved and committed. Afterwards, the prepared producers may be removed from the buffer and released (depending on the implementation, this can mean either putting it back into a producer instance pool for reuse by future checkpoints if we reuse TIDs, or just closed).

Failure Points

  • If commitTransaction() call fails for any transaction, the KafkaSink retries the commit later. After attempting up to a maximum number of retries, only then does the Flink job fail.

Restore Phase (on operator initialization)

On restore, the KafkaSink will fully resolve (either commit or abort) all dangling transactions possibly started by previous execution attempts, taking into account that previous executions may have been executed with varying parallelism settings. This restore-time resolution logic ensures that the new execution attempt starts from a clean state w.r.t. transactions status' within Kafka.

All open transactions from previous execution attempts can be categorized as the following:

  1. TIDs within PrecommittedRange: these TIDs have been successfully pre-committed, and therefore are always persisted within completed Flink checkpoints. When the Flink job fails and restores from a checkpoint, all TIDs read from the checkpoint are within PrecommittedRange and should be committed.

  2. TIDs outside of PrecommittedRange: these TIDs were NOT successfully pre-committed, and are NOT written in the restored Flink checkpoint. All TIDs in this range need to be aborted. To obtain the TIDs, the KafkaSink has to either 1) query Kafka to list all TIDs that are possibly within this range, or 2) iterate through all possible TIDs that can be in this range.

Determining the PrecommittedRange depends on the specific algorithm the KafkaSink uses to construct TIDs. For example, as of v3.0.0 of the KafkaSink, TIDs are constructed as {userPrefix}-{subtaskId}-{checkpointId} and therefore always strictly increasing as the job runs. This means that for a given restored checkpoint ID N, the PrecommittedRange of TIDs to commit is simply all TIDs with checkpointId portion up to N, while all other TIDs with {checkpointId} portion being N+1 up to should be aborted. For other TID construction algorithms, e.g. FLIP-ZZZ: TID Pooling for KafkaSink, calculating the PrecommittedRange would be different. For the purpose of this FLIP, the specific TID construction algorithm is orthogonal implementation detail; it is sufficient to assume that there is a deterministic PrecommittedRange of possible TIDs that need to be committed, while all TIDs outside of that range should be aborted.

The main notable change here is in case (1), where for each restored TID the KafkaSink needs to create a new producer instance to resume the transaction and commit it. As opposed to before where we had to create this producer instance and use Java reflection to inject the internal producer ID and epoch in order to bypass the initTransactions() call, we can now simply initialize the producer using initTransactions(true) to retain the previous prepared transaction.

Compatibility, Deprecation, and Migration Plan

Upgrading to new KafkaSink

To upgrade, Flink jobs using older versions of the KafkaSink will need to do the following:

  1. Upgrade their Kafka cluster version to a minimum version that supports KIP-939.

  2. If authentication is enabled on the Kafka cluster, make sure that it is configured so that respective users have the TWO_PHASE_COMMIT ACL permissions set on the TransactionalId resource.

  3. Take a savepoint of their Flink job, and then stop it.

  4. Upgrade their job application code to use the new KafkaSink version. No code changes are required from the user; they simply need to upgrade the flink-connector-kafka dependency and recompile the job jar.

  5. Submit the upgraded job jar, configured to restore from the savepoint taken in step 3.

Note that if step 5 is done while the Kafka cluster upgrade is still in the progress of being rolled out, the job will fail whenever a transaction request reaches a Kafka broker that has not been rolled yet. It is recommended to only upgrade the Flink job once the Kafka cluster upgrade has been fully completed.

It will be strongly recommended to upgrade the KafkaSink version as soon as possible, since it inherently poses a risk of data loss.

Relaxing TWO_PHASE_COMMIT ACL requirement for smoother upgrade path

So far, when authentication is enabled for the Kafka cluster, KIP-939 assumes that the TWO_PHASE_COMMIT ACL is setup in order for authenticated producer clients to set transaction.two.phase.commit.true to true as well as use Producer#initTransactions(true) to resume previous transactions. In other words, to use the new KafkaSink, the TWO_PHASE_COMMIT ACL must be setup as mentioned in step 2 in the section above. KIP-939 gates 2PC participation behind a new ACL because it is arguably a dangerous feature when used incorrectly (e.g. when used without a properly functioning external transaction coordinator, or even the lack of one).

However, it is entirely possible that a Flink user may not have access to the Kafka cluster admin to setup the TWO_PHASE_COMMIT ACL. For example, a Flink user may be using a cloud service provider for their Kafka deployment, with which ACL setup may be out of their control. In this case, users are hard-blocked in upgrading to the new KafkaSink.

If the Flink community thinks that it is important to still allow users to upgrade to the new KafkaSink in the above scenario, as a joint FLIP-KIP across the Flink and Kafka community, it may be possible to have KIP-939 relax the ACL constraint such that the Producer#initTransactions(true) operation only needs WRITE ACL to work and not the extra new TWO_PHASE_COMMIT ACL. In other words, producer clients can still resume previous transactions without having to enable 2PC. Under this mode with the new KafkaSink, since 2PC participation is not enabled, transactions may still be aborted by Kafka based on timeout (and potentially cause data loss), but at least it no longer requires Java reflection to bypass Kafka’s transaction protocol and manually inject the producer ID and epoch. Upgrading to the new KafkaSink is still highly desirable in this case as it will be working against stable Kafka client APIs and much less likely to bump into issues.

Appendix: Known Critical Issues with current KafkaSink

Data Loss Scenario

Data loss under exactly-once mode occurs for Flink's KafkaSink under the following scenario:

  1. Upon receiving the checkpoint barrier, KafkaSink subtasks successfully flushes to their current transaction, and acknowledges the JM of completion.
  2. After the JM receives acknowledgement from all sink subtasks, it commits the current checkpoint as completed (in other words, voting phase of 2PC succeeds and is recorded by the transaction manager).
  3. To trigger following commit phase of 2PC, the JM notifies all sink subtasks of checkpoint completion.
  4. KafkaSink subtasks receive the notification, but by the time it tries to commit the pre-committed transaction, Kafka had already aborted the transaction due to exceeding
  5. Since the decision that the global distribution transaction has succeeded was already committed by Flink, it could only attempt to restart from the completed checkpoint and try to re-commit the transactions, only to repeatedly fail to do so.

Generally speaking, as long as there is enough delay for sink subtasks to receive checkpoint complete notifications to trigger the second phase of 2PC, the scenario described above can occur. This can happen simply due to a misfortunately configured combo of Flink's checkpoint interval / delay and Kafka transaction timeout, or if the Flink job happens to fail right after a checkpoint completes and remains down for an extended amount of time.

Using Java Reflections to Bypass InitProducerId request on Restore

At any point in time, a Flink job may fail for any reason, causing it restart from the last completed Flink checkpoint. Within the Flink checkpoints, the KafkaSink stores TIDs (Kafka's of Kafka transactions that were pre-commited as part of the checkpoint. On restart, these TIDs are restored and their previous ongoing transactions should be (possibly redundantly) committed.

The main issue is that Kafka, by definition of its current transaction protocol, does not allow a recovered producer client instance to resume its previous transaction. By protocol, all newly created or recovered producer clients are expected to issue an InitProducerId request to initialize the producers (e.g. obtain internal producer id and epoch). When brokers receive this initialization request, it would check if the producer instance had a previous on-going transaction, and if so, abort it.

To be able to resume pre-commited transactions, Flink's KafkaSink avoids having to issue InitProducerId  after recovering producer clients by 1) extracting the internal producer id and epoch, and storing them alongside TIDs in Flink checkpoints, 2) on restart, these values are restored and injected in the client, and 3) alter the client state so that it is considered to be mid-transaction. All of this is done via Java reflection as these are not publicly accessible.

The code that implements all of this can be found in FlinkKafkaInternalProducer :