Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Apache Flink's exactly-once Kafka sink implements a two-phase commit pattern on top of Kafka's transactional protocol. To accommodate Flink's checkpointing intervals and potential recovery delays, the Kafka transaction timeout is typically increased well beyond Kafka's defaults. While necessary for correctness during normal operation, this creates a significant operational hazard when lingering ONGOING transactions are left unresolved.

Lingering Kafka transactions can occur whenever Flink does not complete its normal commit-or-abort protocol for an open transaction. Common scenarios include:

  • A job fails and never recovers (exceeds max restart attempts, cluster decommissioned, or job abandoned)

  • A job is canceled

  • A transient failure (e.g., network issue) prevents the committer from completing the Kafka transaction during a savepoint, leaving the savepoint with committed Flink state but an uncommitted Kafka transaction

  • This list is not exhaustive: any scenario where Flink fails to commit or abort a transaction can result in lingering transactions

In all these cases, Kafka transactions remain in the ONGOING state until the broker's transaction coordinator aborts them after the configured timeout. During this window, downstream consumers configured with isolation.level=read_committed are blocked at the Last Stable Offset (LSO) and cannot make progress. That may effectively cause a production outage for all downstream systems. When the transaction eventually times out and is aborted by the broker, any data written within that transaction is lost, even though Flink's checkpoint or savepoint considered it successfully delivered.
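The blocking effect can be illustrated with a toy model of a single partition. This is only a sketch and not Kafka code: the class and method names are hypothetical, and it assumes the usual definition of the LSO as the first offset of the earliest ONGOING transaction, or the high watermark when no transaction is open.

```java
import java.util.List;

/** Toy model of a single partition; not Kafka code. */
final class LastStableOffsetSketch {

    /** First offset written by a transaction and whether it is still ONGOING. */
    static final class Txn {
        final long firstOffset;
        final boolean ongoing;

        Txn(long firstOffset, boolean ongoing) {
            this.firstOffset = firstOffset;
            this.ongoing = ongoing;
        }
    }

    /**
     * LSO = first offset of the earliest ONGOING transaction,
     * or the high watermark when nothing is open.
     */
    static long lastStableOffset(long highWatermark, List<Txn> txns) {
        return txns.stream()
                .filter(t -> t.ongoing)
                .mapToLong(t -> t.firstOffset)
                .min()
                .orElse(highWatermark);
    }

    public static void main(String[] args) {
        long highWatermark = 1_000L;
        // One finished transaction and one lingering transaction that began at offset 120.
        List<Txn> txns = List.of(new Txn(50L, false), new Txn(120L, true));
        // read_committed consumers can only read offsets below the printed value.
        System.out.println("LSO with lingering txn: " + lastStableOffset(highWatermark, txns));
        System.out.println("LSO once resolved:      " + lastStableOffset(highWatermark, List.of()));
    }
}
```

In this model a single lingering transaction near the start of the partition hides every later record from read_committed consumers, including records written by unrelated producers.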

Currently, there is no built-in tooling to resolve this situation. Aborting a lingering transaction requires knowledge of Kafka's producer fencing mechanism, while committing one requires the exact producerId and epoch from Flink's checkpoint state and the use of FlinkKafkaInternalProducer, which relies on reflection to resume a transaction. This coupling to Flink internals means the tool must live within the Flink connector ecosystem rather than being a generic Kafka utility.

This FLIP proposes a dedicated CLI tool, shipped as part of flink-connector-kafka, that allows operators to abort or commit lingering Kafka transactions produced by Flink's exactly-once sink. The initial scope covers two operations: abort (via producer fencing) and commit (via transaction resumption with producerId/epoch). Future enhancements may include list/describe commands for transaction discovery, integration with the State Processor API to automatically extract producerId/epoch from checkpoint metadata, and bulk operations across multiple transactional IDs.

Public Interfaces

This FLIP introduces the following public interfaces:

1. A new CLI tool packaged as a standalone uber-jar (flink-connector-kafka-transaction-tool-<version>-uber-jar.jar) with the following command-line arguments:

  • --action: The operation to perform (abort or commit)
  • --bootstrap-servers: Kafka bootstrap servers
  • --transactional-id: The transactional ID of the lingering transaction
  • --producer-id: (required for commit) The producer ID from Flink checkpoint state
  • --epoch: (required for commit) The producer epoch from Flink checkpoint state
  • --command-config: (optional) Path to a properties file with additional Kafka client configuration (e.g., SSL/SASL settings)

2. A new Maven artifact: org.apache.flink:flink-connector-kafka-transaction-tool

The tool is invoked as:

java -jar flink-connector-kafka-transaction-tool-<version>-uber-jar.jar \
    --action <abort|commit> \
    --bootstrap-servers <servers> \
    --transactional-id <id> \
    [--producer-id <pid>] \
    [--epoch <epoch>] \
    [--command-config <file>]

Here --producer-id and --epoch are required for commit, and --command-config is optional (Kafka client properties, e.g., SSL/SASL).

A --help flag displays usage information.
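The per-action argument requirements can be sketched as a small validation routine. This is a hypothetical illustration in plain Java, not the tool's code: it does not use commons-cli, it omits --help and --command-config handling, and the class and method names are assumptions. It relies on Kafka producer IDs being 64-bit and producer epochs being 16-bit values.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical validation sketch for the options listed above. */
final class ToolArgsSketch {

    /** Parses "--name value" pairs into a map. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> opts = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("--") || i + 1 >= args.length) {
                throw new IllegalArgumentException("expected '--name value' near: " + args[i]);
            }
            opts.put(args[i].substring(2), args[i + 1]);
        }
        return opts;
    }

    /** Enforces that commit needs --producer-id and --epoch while abort does not. */
    static void validate(Map<String, String> opts) {
        String action = required(opts, "action");
        required(opts, "bootstrap-servers");
        required(opts, "transactional-id");
        if (action.equals("commit")) {
            // NumberFormatException is an IllegalArgumentException, so bad numbers are rejected too.
            Long.parseLong(required(opts, "producer-id"));
            Short.parseShort(required(opts, "epoch"));
        } else if (!action.equals("abort")) {
            throw new IllegalArgumentException("--action must be abort or commit, got: " + action);
        }
    }

    private static String required(Map<String, String> opts, String name) {
        String value = opts.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("missing required option --" + name);
        }
        return value;
    }

    public static void main(String[] args) {
        validate(parse(args));
        System.out.println("arguments look valid");
    }
}
```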

No existing public interfaces are modified or removed. All new Java classes are annotated with @Internal.

Usage Examples

Abort a lingering transaction:

java -jar flink-connector-kafka-transaction-tool-5.0-uber-jar.jar \
    --action abort                    \
    --bootstrap-servers broker1:9092  \
    --transactional-id kafka-sink-0-1

Commit a lingering transaction:

java -jar flink-connector-kafka-transaction-tool-5.0-uber-jar.jar \
    --action commit                   \
    --bootstrap-servers broker1:9092  \
    --transactional-id kafka-sink-0-1 \
    --producer-id 15                  \
    --epoch 3

Connect to a secured Kafka cluster:

java -jar flink-connector-kafka-transaction-tool-5.0-uber-jar.jar \
    --action abort                    \
    --bootstrap-servers broker1:9093  \
    --transactional-id kafka-sink-0-1 \
    --command-config client.properties

Proposed Changes

Overview

A new Maven module flink-connector-kafka-transaction-tool will be added to the flink-connector-kafka repository. It produces a self-contained uber-jar that operators can run without a Flink cluster.

Operations

This tool is intended for scenarios where the Flink job is no longer running and will not restart (e.g., maximum restart attempts exceeded or no Flink cluster available). If the job can restart, let Flink's built-in recovery mechanism handle it; manual intervention is not needed.

The tool supports two operations:

Abort

Aborts a lingering transaction by creating a new KafkaProducer with the same transactional.id and calling initTransactions(). This triggers Kafka's producer fencing mechanism: the broker bumps the epoch, fences the previous producer, and aborts its open transaction. This operation only requires the bootstrap servers and the transactional ID.

If the transaction has already been aborted, or has already expired under transaction.timeout.ms, the abort operation succeeds silently.

Commit 

Commits a lingering transaction by resuming it with the exact producerId and epoch from Flink's checkpoint state or logs. This uses FlinkKafkaInternalProducer.resumeTransaction(), which sets the producerId and epoch via reflection, bypassing initTransactions() to avoid fencing and aborting the transaction.

This operation requires the bootstrap servers, transactional ID, producerId, and epoch.

If the transaction has already been aborted (either by broker timeout or by another producer fencing it), the commit fails, and the data from the aborted transaction cannot be recovered. If the transaction has already been committed, the commit operation succeeds silently (idempotent). In both cases, the tool does not alter the existing transaction state or data in Kafka.
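The outcomes described for the two operations can be summarized with a toy in-memory model of the broker-side bookkeeping. This is a sketch under simplifying assumptions, not Kafka or Flink code: all names are hypothetical, there is no timeout handling, and the only rules modeled are that fencing bumps the epoch and aborts open work, and that a commit is accepted only for the exact producerId/epoch pair.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory model of transaction bookkeeping; not Kafka or Flink code. */
final class ToyCoordinator {

    enum State { ONGOING, COMMITTED, ABORTED }

    private static final class Entry {
        final long producerId;
        short epoch;
        State state = State.ONGOING;

        Entry(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    /** Records an open transaction, as an abandoned sink subtask would leave it. */
    void open(String transactionalId, long producerId, short epoch) {
        entries.put(transactionalId, new Entry(producerId, epoch));
    }

    State state(String transactionalId) {
        return lookup(transactionalId).state;
    }

    /** Abort path: re-initializing the transactional.id bumps the epoch and aborts open work. */
    short abortByFencing(String transactionalId) {
        Entry e = lookup(transactionalId);
        if (e.state == State.ONGOING) {
            e.state = State.ABORTED;
        }
        e.epoch++; // the previous producer's epoch is now stale
        return e.epoch;
    }

    /** Commit path: only the exact producerId/epoch pair that owns the transaction may commit. */
    void commit(String transactionalId, long producerId, short epoch) {
        Entry e = lookup(transactionalId);
        // After an abort by fencing the epoch has moved on, so the old pair is rejected here.
        if (e.producerId != producerId || e.epoch != epoch) {
            throw new IllegalStateException("producerId/epoch do not match; producer is fenced");
        }
        e.state = State.COMMITTED; // repeating a commit leaves the state unchanged
    }

    private Entry lookup(String transactionalId) {
        Entry e = entries.get(transactionalId);
        if (e == null) {
            throw new IllegalArgumentException("unknown transactional.id: " + transactionalId);
        }
        return e;
    }

    public static void main(String[] args) {
        ToyCoordinator c = new ToyCoordinator();
        c.open("kafka-sink-0-1", 15L, (short) 3);
        c.commit("kafka-sink-0-1", 15L, (short) 3); // accepted
        c.commit("kafka-sink-0-1", 15L, (short) 3); // repeat is harmless
        System.out.println(c.state("kafka-sink-0-1"));
    }
}
```

In the model, a commit attempted after abortByFencing is rejected because the recorded epoch no longer matches, which mirrors the rule that data from an aborted transaction cannot be recovered.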

How to obtain producerId and epoch

To commit a transaction, the operator needs the producerId and epoch.

These can be obtained from:

  • Flink job logs (logged during checkpoint)
  • Kafka's Admin API: AdminClient.describeTransactions() returns the producerId and producer epoch for an ONGOING transaction. Note: if the Flink job has restarted, AdminClient.describeTransactions() may return the metadata of the new, active transaction rather than the lingering one. Committing with these values would interfere with the running job's state, so use this method only after confirming that the job is stopped.
  • Flink checkpoint metadata (future enhancement via State Processor API integration)

Packaging

The tool is packaged as an uber-jar using maven-shade-plugin. It bundles:

  • Kafka clients
  • Flink's FlinkKafkaInternalProducer (for the commit path) 
  • slf4j-simple for logging
  • commons-cli for argument parsing 

The uber-jar is published to Maven Central as a classified artifact (classifier: uber-jar) alongside the thin jar.

The tool can be executed from the broker pod directly (where authentication is typically not required) or from any machine with network access to the Kafka cluster. For secured clusters, the --command-config option allows passing a properties file with SSL/SASL and other Kafka client configuration.
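One plausible way to combine the --command-config file with the command-line values is sketched below. The class and method names are hypothetical, and letting the command-line values override the file is an assumption of this sketch rather than documented tool behavior. The keys bootstrap.servers and transactional.id are the standard Kafka client configuration names.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

/** Hypothetical sketch of assembling Kafka client properties for the tool. */
final class ClientConfigSketch {

    /** Reads a properties file such as the one passed via --command-config. */
    static Properties load(Path commandConfig) {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(commandConfig)) {
            props.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException("cannot read " + commandConfig, e);
        }
        return props;
    }

    /** Starts from the file's settings, then applies the command-line values on top. */
    static Properties merge(Properties fromFile, String bootstrapServers, String transactionalId) {
        Properties merged = new Properties();
        merged.putAll(fromFile);
        // Assumption: explicit command-line values take precedence over the file.
        merged.setProperty("bootstrap.servers", bootstrapServers);
        merged.setProperty("transactional.id", transactionalId);
        return merged;
    }

    public static void main(String[] args) {
        // Usage: java ClientConfigSketch client.properties broker1:9093 kafka-sink-0-1
        Properties props = merge(load(Path.of(args[0])), args[1], args[2]);
        // Print only the keys so that credentials in the file are not echoed.
        props.stringPropertyNames().stream().sorted().forEach(System.out::println);
    }
}
```

A client.properties file for a secured cluster would typically carry entries such as security.protocol and sasl.mechanism, which pass through the merge untouched.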

Documentation

Website documentation will be provided as part of this FLIP's implementation, including:

  • Instructions for downloading the tool
  • Tool overview and usage examples
  • Operational runbook with step-by-step instructions for identifying and resolving lingering transactions

Future Work

The following enhancements are out of scope for the initial implementation but represent natural extensions:

  • list command: List all transactions matching a transactional ID prefix, showing their state, producerId, and epoch

  • describe command: Show detailed information about a specific transaction

  • Bulk abort: Abort all transactions matching a prefix in a single invocation

  • State Processor API integration: Automatically extract producerId and epoch from Flink checkpoint/savepoint metadata

Compatibility, Deprecation, and Migration Plan

This FLIP introduces a new standalone tool. It does not modify any existing APIs, classes, or behavior. There is no impact on existing users and no migration is required.

Test Plan

The tool is tested at two levels:

  • Unit tests verify input validation and CLI argument parsing (KafkaTransactionManagerTest, KafkaTransactionToolTest).

  • Integration tests using Testcontainers verify end-to-end behavior against a real Kafka broker (KafkaTransactionManagerITCase):

    • Abort a specific transaction while leaving other transactions unaffected.

    • Commit a specific transaction with correct producerId/epoch and verify records become readable by read_committed consumers.

    • Verify idempotent behavior: abort after timeout succeeds, commit after commit succeeds, commit after abort fails.

    • Verify that commit with incorrect producerId fails.

    • Verify transaction handling during a savepoint-based job lifecycle using a MiniCluster with an exactly-once KafkaSink.

Rejected Alternatives

Use Kafka's built-in kafka-transactions.sh tool

Kafka ships a transaction management script, kafka-transactions.sh, but it cannot commit a Flink transaction: doing so requires resuming the transaction with a specific producerId and epoch using FlinkKafkaInternalProducer's reflection-based approach. Aborting is possible via kafka-transactions.sh (by fencing the producer), but providing both operations in a single Flink-aware tool gives operators a consistent interface.

Wait for transaction timeout

Operators can wait for the broker to abort the transaction after the configured timeout. However, this means downstream consumers are blocked for the entire timeout duration (typically increased well beyond Kafka's defaults for Flink's exactly-once sink), and any data in the transaction is lost upon abort. The tool allows operators to resolve the situation immediately and, in the commit case, without data loss.