Status

Current state: Under discussion

Discussion thread: thread-1 thread-2

Voting thread: 

JIRA: KAFKA-20381 (related)

Summary: Extract transaction identity and lifecycle management from `KafkaProducer` into a first-class
`TransactionSession` client abstraction, enabling any Kafka client (producer, share consumer, external coordinator, or any other entity) 
to participate in transactions without producer coupling or reflection hacks.


1. Motivation

1.1 The Problem: Multi-Participant Transactions Need a Shared Identity Abstraction

Kafka's transaction coordinator is a general-purpose distributed transaction manager. Its metadata (TransactionMetadata) stores:

```
transactionalId -- session identity
producerId -- session token
producerEpoch -- fencing token
state -- lifecycle state (ONGOING, PREPARE_COMMIT, ...)
topicPartitions -- participating partitions
txnTimeoutMs -- correctness bound
```

None of these fields are producer-specific. The naming is historical: producerId and producerEpoch could equally be called sessionId and sessionEpoch. The transaction RPCs (InitProducerId, AddPartitionsToTxn, EndTxn, and TxnHeartbeat proposed in KIP-1309) identify the caller only by these fields; none of them carries a "client type" discriminator.

What KIP-939 Solved

KIP-939 enabled Kafka to act as a formal participant in Two-Phase Commit (2PC) transactions by:

  • Adding Recovery APIs: Introduced prepareTransaction() and completeTransaction(), allowing external systems to coordinate Kafka's commit/abort status.

  • Resilient Resumption: Allowed new producers to resume a "prepared" transaction after a crash without automatically aborting it.

  • Removing Timeouts: Provided an enable2Pc mode that sets transaction timeouts to infinity, preventing premature aborts during external coordination.

The Unsolved Problem

While KIP-939 added the right tools, it put them in the wrong place:

  • Wrong Abstraction: Transaction methods (like preparing and completing) are forced into the KafkaProducer class, even though they don't involve producing records.

  • Heavyweight Bloat: To simply commit a transaction, you are forced to instantiate a full KafkaProducer with its entire infrastructure (buffer pools, sender threads, serializers).

  • Inaccessible Logic: The core "brain" of transactions (the state machine and coordinator logic) is buried in internal producer packages, so other clients (such as consumers or custom coordinators) cannot use transactions independently.


1.2 Entities Already Need Transaction Participation

The Kafka ecosystem already has six distinct entity types that interact with the transaction coordinator, each working around the producer-centric API:

| Entity | Role | Current Workaround | Core Pain Point |
|---|---|---|---|
| KafkaProducer | Owner | Native API | Transaction logic is monolithic and coupled to the produce path. |
| Kafka Streams | EOS Processor | Wraps Producer | Lifecycle is forced to match record batching. |
| Connect Source | EOS Ingest | Wraps Producer | Boundary management is coupled to producer lifecycle. |
| Connect Sink | KIP-1302 | N/A | No way to share transaction identity between consumer and producer. |
| Flink / 2PC | External Committer | Reflection | Fragile; manually forces state into TransactionManager internals. |
| Share Consumer | KIP-1289 | N/A | Must "borrow" Producer IDs without a proper client abstraction. |



2. Public Interfaces

2.1 New Class: TransactionSession

Package: org.apache.kafka.clients.transaction

TransactionSession is a lightweight, thread-safe client focused strictly on transaction coordinator interaction.

It decouples lifecycle management from record production.

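```java
public class TransactionSession implements Closeable {

    /** Creates a session from the configs in 2.2; call initialize() before use. */
    public TransactionSession(Map<String, Object> configs);

    /**
     * Re-attaches to an existing transaction identified by
     * (transactionalId, producerId, producerEpoch), e.g. to finish it from another process.
     */
    public static TransactionSession resume(
        String transactionalId,
        long producerId,
        short producerEpoch,
        Map<String, Object> configs
    );

    // --- Lifecycle ---
    public void initialize();                                           // FindCoordinator + InitProducerId
    public void beginTransaction();
    public PreparedTxnState prepareTransaction();                       // KIP-939 prepare phase
    public void completeTransaction(PreparedTxnState preparedTxnState); // KIP-939 recovery
    public void commitTransaction();                                    // EndTxn (commit)
    public void abortTransaction();                                     // EndTxn (abort)

    // --- Identity ---
    public String transactionalId();
    public long producerId();       // valid after initialize() or resume()
    public short producerEpoch();   // fencing token

    // --- Participation ---
    public void addPartitionsToTransaction(Collection<TopicPartition> partitions); // AddPartitionsToTxn

    public void sendOffsetsToTransaction(   // AddOffsetsToTxn + TxnOffsetCommit
        Map<TopicPartition, OffsetAndMetadata> offsets,
        ConsumerGroupMetadata groupMetadata
    );

    public void addShareAcksToTransaction(  // AddShareAcksToTxn (KIP-1289)
        String groupId,
        Collection<ShareAcknowledgment> acknowledgments
    );

    // --- Liveness ---
    public void heartbeat();                // TxnHeartbeat (KIP-1309)

    @Override
    public void close();
}
```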

2.2 Configuration

TransactionSession accepts a subset of existing producer configs plus one new config:

| Config | Source | Description |
|---|---|---|
| transactional.id | Existing producer config | The transactional ID for this session. Required. |
| transaction.timeout.ms | Existing producer config | Correctness timeout for the transaction. |
| transaction.session.timeout.ms | New (KAFKA-20381) | Session heartbeat timeout. Set to -1 to disable. |
| bootstrap.servers | Existing | Broker addresses for coordinator discovery. |
| security.protocol, ssl.*, sasl.* | Existing | Authentication and encryption settings. |
| client.id | Existing | Client identifier for metrics and logging. |
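As a minimal sketch, assuming a caller wants to derive a session config from an existing transactional producer's config, the hypothetical helper below keeps only the keys listed in the table and checks the rules the table states (`transactional.id` is required; `transaction.session.timeout.ms` is -1 or positive). It assumes security settings are `security.protocol` plus the `ssl.*` and `sasl.*` prefixes used by existing Kafka clients, and that `bootstrap.servers` is required as for other clients. `SessionConfigs` is illustrative only and not part of this proposal.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper (not part of this KIP) for building TransactionSession configs. */
final class SessionConfigs {
    // Exact keys taken from the configuration table.
    private static final Set<String> KEYS = Set.of(
        "transactional.id",
        "transaction.timeout.ms",
        "transaction.session.timeout.ms",
        "bootstrap.servers",
        "client.id",
        "security.protocol");

    // Assumption: security settings are matched by the usual Kafka client prefixes.
    private static final List<String> PREFIXES = List.of("ssl.", "sasl.");

    private SessionConfigs() {}

    /** Copies only the session-relevant entries out of a full producer config map. */
    static Map<String, Object> fromProducerConfigs(Map<String, Object> producerConfigs) {
        Map<String, Object> sessionConfigs = new HashMap<>();
        for (Map.Entry<String, Object> entry : producerConfigs.entrySet()) {
            String key = entry.getKey();
            if (KEYS.contains(key) || PREFIXES.stream().anyMatch(key::startsWith)) {
                sessionConfigs.put(key, entry.getValue());
            }
        }
        return sessionConfigs;
    }

    /** Returns the problems found; an empty list means the map passes these checks. */
    static List<String> validate(Map<String, Object> configs) {
        List<String> errors = new ArrayList<>();

        Object txnId = configs.get("transactional.id");
        if (!(txnId instanceof String) || ((String) txnId).isEmpty()) {
            errors.add("transactional.id is required");
        }

        // Assumption: required, as for other Kafka clients.
        if (!configs.containsKey("bootstrap.servers")) {
            errors.add("bootstrap.servers is required");
        }

        Object sessionTimeout = configs.get("transaction.session.timeout.ms");
        if (sessionTimeout != null) {
            try {
                long ms = Long.parseLong(sessionTimeout.toString());
                if (ms != -1 && ms <= 0) {
                    errors.add("transaction.session.timeout.ms must be -1 (disabled) or positive");
                }
            } catch (NumberFormatException e) {
                errors.add("transaction.session.timeout.ms must be a number");
            }
        }
        return errors;
    }
}
```

Producer-only settings such as `acks` are dropped, since the session never produces records.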

This KIP adds no broker-side configs and no wire-protocol RPCs of its own. TransactionSession sends the same RPCs that KafkaProducer sends today (FindCoordinator, InitProducerId, AddPartitionsToTxn, EndTxn, AddOffsetsToTxn, TxnOffsetCommit), plus TxnHeartbeat from KIP-1309 and AddShareAcksToTxn from KIP-1289.

2.3 Integration with KafkaProducer

KafkaProducer gains a new constructor that binds it to an external TransactionSession, and an accessor that returns the session it uses:


```java
public class KafkaProducer<K, V> {
    /**
     * Bind producer to an external session.
     * Lifecycle methods (e.g., beginTransaction) throw IllegalStateException.
     */
    public KafkaProducer(Map<String, Object> configs, TransactionSession session);

    /**
     * Access the session (internal or external). Returns null if non-transactional.
     */
    public TransactionSession transactionSession();
}
```


Backward compatibility: constructing a producer with transactional.id in the config works as it does today.

Internally, the producer creates its own TransactionSession, and its existing transaction methods (initTransactions, beginTransaction, etc.) become convenience wrappers that delegate to that session.

No code changes are required for existing users.
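The delegation described above, together with the IllegalStateException rule from the constructor Javadoc in 2.3, can be sketched with in-memory stand-ins. `FakeSession` and `DelegatingProducer` are hypothetical names used only for illustration; a real KafkaProducer also flushes unsent records before committing, which the sketch omits.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for TransactionSession that records each lifecycle call. */
final class FakeSession {
    private final List<String> calls = new ArrayList<>();

    void initialize()        { calls.add("initialize"); }
    void beginTransaction()  { calls.add("begin"); }
    void commitTransaction() { calls.add("commit"); }
    void abortTransaction()  { calls.add("abort"); }

    List<String> calls() { return List.copyOf(calls); }
}

/** Hypothetical producer showing internal-session delegation vs. external binding. */
final class DelegatingProducer {
    private final FakeSession session;
    private final boolean ownsSession; // true when built from transactional.id

    private DelegatingProducer(FakeSession session, boolean ownsSession) {
        this.session = session;
        this.ownsSession = ownsSession;
    }

    /** Today's path: the producer creates and owns its session. */
    static DelegatingProducer withTransactionalId() {
        return new DelegatingProducer(new FakeSession(), true);
    }

    /** New path: the producer is bound to a session owned by someone else. */
    static DelegatingProducer boundTo(FakeSession external) {
        return new DelegatingProducer(external, false);
    }

    // Existing public methods become thin wrappers over the session.
    void initTransactions()  { lifecycle().initialize(); }
    void beginTransaction()  { lifecycle().beginTransaction(); }
    void commitTransaction() { lifecycle().commitTransaction(); }
    void abortTransaction()  { lifecycle().abortTransaction(); }

    FakeSession transactionSession() { return session; }

    // Lifecycle calls are only legal when the producer owns its session.
    private FakeSession lifecycle() {
        if (!ownsSession) {
            throw new IllegalStateException(
                "Transaction lifecycle is managed by the external TransactionSession");
        }
        return session;
    }
}
```

Keeping the ownership flag in one private accessor means every wrapper enforces the same rule.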

2.4 Integration with KafkaShareConsumer

For KIP-1289 transactional acknowledgments, the share consumer accepts a TransactionSession:

```java
public class KafkaShareConsumer<K, V> {

    /**
     * Acknowledges the given record offsets, per partition, as part of the
     * transaction currently open on the provided session.
     */
    public void acknowledgeTransactionally(
        TransactionSession session,
        Map<TopicPartition, Set<Long>> acknowledgments
    );
}
```


3. Proposed Changes

3.1 Architecture: Before and After

[Figure: Before (current)]

[Figure: After (this KIP)]


3.2 Extracting TransactionSession from TransactionManager

| Component | Responsibility |
|---|---|
| TransactionSession (new) | Identity (ID/Epoch), lifecycle state machine, coordinator RPCs, and heartbeat thread. |
| TransactionManager (slimmed) | Sequence numbers, in-flight partition management, and Sender thread interaction. |

State mapping:

| Current TransactionManager.State | New TransactionSession.State |
|---|---|
| UNINITIALIZED / INITIALIZING | UNINITIALIZED / INITIALIZING |
| READY / IN_TRANSACTION | READY / IN_TRANSACTION |
| PREPARED_TRANSACTION | PREPARED |
| COMMITTING_TRANSACTION | COMMITTING |
| ABORTING_TRANSACTION | ABORTING |
| ABORTABLE_ERROR / FATAL_ERROR | ABORTABLE_ERROR / FATAL_ERROR |
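A minimal sketch of the renamed state machine follows. The state names and the TransactionManager mapping come from the table above, and `resumed()` starts in IN_TRANSACTION as the Session Resume test in 7.1 expects. The allowed-transition set itself is an assumption modeled loosely on today's TransactionManager, not a specification; `SessionState` and `SessionStateMachine` are hypothetical names.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** States from the mapping table (hypothetical enum name). */
enum SessionState {
    UNINITIALIZED, INITIALIZING, READY, IN_TRANSACTION, PREPARED,
    COMMITTING, ABORTING, ABORTABLE_ERROR, FATAL_ERROR
}

/** Minimal, single-threaded sketch of the TransactionSession state machine. */
final class SessionStateMachine {
    // Assumed legal transitions; FATAL_ERROR is terminal.
    private static final Map<SessionState, Set<SessionState>> ALLOWED =
        new EnumMap<>(SessionState.class);

    static {
        ALLOWED.put(SessionState.UNINITIALIZED, EnumSet.of(SessionState.INITIALIZING));
        ALLOWED.put(SessionState.INITIALIZING, EnumSet.of(SessionState.READY, SessionState.FATAL_ERROR));
        ALLOWED.put(SessionState.READY, EnumSet.of(SessionState.IN_TRANSACTION, SessionState.FATAL_ERROR));
        ALLOWED.put(SessionState.IN_TRANSACTION, EnumSet.of(SessionState.PREPARED, SessionState.COMMITTING,
            SessionState.ABORTING, SessionState.ABORTABLE_ERROR, SessionState.FATAL_ERROR));
        ALLOWED.put(SessionState.PREPARED, EnumSet.of(SessionState.COMMITTING, SessionState.ABORTING,
            SessionState.FATAL_ERROR));
        ALLOWED.put(SessionState.COMMITTING, EnumSet.of(SessionState.READY, SessionState.ABORTABLE_ERROR,
            SessionState.FATAL_ERROR));
        ALLOWED.put(SessionState.ABORTING, EnumSet.of(SessionState.READY, SessionState.FATAL_ERROR));
        ALLOWED.put(SessionState.ABORTABLE_ERROR, EnumSet.of(SessionState.ABORTING, SessionState.FATAL_ERROR));
        ALLOWED.put(SessionState.FATAL_ERROR, EnumSet.noneOf(SessionState.class));
    }

    private SessionState current;

    SessionStateMachine() { this(SessionState.UNINITIALIZED); }

    private SessionStateMachine(SessionState initial) { this.current = initial; }

    /** resume() attaches to an existing transaction, so it starts IN_TRANSACTION. */
    static SessionStateMachine resumed() { return new SessionStateMachine(SessionState.IN_TRANSACTION); }

    SessionState current() { return current; }

    void transitionTo(SessionState target) {
        if (!ALLOWED.get(current).contains(target)) {
            throw new IllegalStateException("Invalid transition " + current + " -> " + target);
        }
        current = target;
    }

    /** Maps a TransactionManager.State name to the renamed session state. */
    static SessionState fromTransactionManagerState(String name) {
        switch (name) {
            case "PREPARED_TRANSACTION":   return SessionState.PREPARED;
            case "COMMITTING_TRANSACTION": return SessionState.COMMITTING;
            case "ABORTING_TRANSACTION":   return SessionState.ABORTING;
            default:                       return SessionState.valueOf(name); // unchanged names
        }
    }
}
```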

3.3 No Wire Protocol Changes

This KIP reuses existing RPCs and schemas, plus those defined by KIP-1309 and KIP-1289. TransactionSession becomes the sender for all transaction-related requests:

| RPC | Current Sender | New Sender |
|---|---|---|
| FindCoordinator / InitProducerId | TransactionManager | TransactionSession |
| AddPartitionsToTxn / EndTxn | TransactionManager | TransactionSession |
| AddOffsetsToTxn / TxnOffsetCommit | TransactionManager | TransactionSession |
| TxnHeartbeat (KIP-1309) | N/A | TransactionSession |
| AddShareAcksToTxn (KIP-1289) | N/A | TransactionSession |


3.4 Internal Network Client


TransactionSession uses a lightweight NetworkClient for coordinator communication, similar to the consumer's HeartbeatThread:

  • Dedicated connection: the session maintains its own connection to the coordinator broker.
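One way the session's heartbeat thread could be driven by `transaction.session.timeout.ms` is sketched below: -1 disables heartbeats, and any positive timeout schedules a `Runnable` (which, in a real client, would send TxnHeartbeat) on a single daemon thread. The one-third interval is an assumption borrowed from the consumer guidance that `heartbeat.interval.ms` be no higher than one third of `session.timeout.ms`; this KIP does not define the interval. `SessionHeartbeater` is a hypothetical name.

```java
import java.util.OptionalLong;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical heartbeat driver for a TransactionSession. */
final class SessionHeartbeater implements AutoCloseable {
    private final ScheduledExecutorService scheduler; // null when heartbeats are disabled
    private final ScheduledFuture<?> task;            // null when heartbeats are disabled

    SessionHeartbeater(long sessionTimeoutMs, Runnable sendHeartbeat) {
        OptionalLong interval = intervalMs(sessionTimeoutMs);
        if (interval.isPresent()) {
            scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "txn-session-heartbeat");
                t.setDaemon(true); // never keeps the JVM alive on its own
                return t;
            });
            long ms = interval.getAsLong();
            // Note: if sendHeartbeat throws, scheduleAtFixedRate suppresses later runs.
            task = scheduler.scheduleAtFixedRate(sendHeartbeat, ms, ms, TimeUnit.MILLISECONDS);
        } else {
            scheduler = null;
            task = null;
        }
    }

    /** -1 disables heartbeats; otherwise beat at one third of the timeout (assumption). */
    static OptionalLong intervalMs(long sessionTimeoutMs) {
        if (sessionTimeoutMs == -1) {
            return OptionalLong.empty();
        }
        if (sessionTimeoutMs <= 0) {
            throw new IllegalArgumentException("transaction.session.timeout.ms must be -1 or positive");
        }
        return OptionalLong.of(Math.max(1, sessionTimeoutMs / 3)); // the scheduler needs a period > 0
    }

    boolean enabled() { return task != null; }

    @Override
    public void close() {
        if (scheduler != null) {
            task.cancel(false);
            scheduler.shutdown();
        }
    }
}
```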


4. Use Cases

4.1 Apache Flink: Lightweight Transaction Completion

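Flink's exactly-once KafkaSink separates the write path (writer subtask) from the commit path (committer). The committer must finish transactions opened by writers, possibly after a failover, so today it rebuilds a producer and uses reflection to inject the producerId and epoch into TransactionManager internals.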

```java
// Before (Flink -- reflection, fragile)
FlinkKafkaInternalProducer producer = new FlinkKafkaInternalProducer(configs);
producer.resumeTransaction(producerId, epoch); // reflection on TransactionManager internals
producer.commitTransaction();

// After (this KIP -- public API, stable)
try (TransactionSession session = TransactionSession.resume(
        transactionalId, producerId, epoch, configs)) {
    session.commitTransaction(); // the session is closed even if the commit throws
}
```

4.2 Share Group Consumer: Transactional Acknowledgments (KIP-1289)

```java
// 1. Open shared session
TransactionSession session = new TransactionSession(configs);
session.initialize();
session.beginTransaction();

// 2. Producer writes records using session identity
KafkaProducer<K, V> producer = new KafkaProducer<>(producerConfigs, session);
producer.send(outputRecord);

// 3. Share consumer acknowledges within the SAME transaction
shareConsumer.acknowledgeTransactionally(session, acknowledgments);

// 4. Atomic commit for both entities
session.commitTransaction();
```

4.4 Kafka Connect: Exactly-Once Source Tasks

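```java
class ExactlyOnceWorkerSourceTask {
    private TransactionSession session;              // owns the transaction boundaries
    private KafkaProducer<byte[], byte[]> producer;  // bound via new KafkaProducer<>(configs, session)

    // Boundaries are driven by the session, independent of the producer lifecycle
    void maybeBeginTransaction() { session.beginTransaction(); }
    void commitTransaction()      { session.commitTransaction(); }
}
```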



4.5 Custom 2PC Coordinators

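```java
// Phase 1: Prepare
kafkaSession.beginTransaction();
for (ProducerRecord<K, V> record : records) {
    producer.send(record);                 // producer is bound to kafkaSession
}
database.prepareTransaction(dbTxnId);
PreparedTxnState prepared = kafkaSession.prepareTransaction();
// Durably record dbTxnId, `prepared`, and the session identity before Phase 2.

// Phase 2: Commit (recovery-friendly: may run in another process after a crash)
try (TransactionSession resumed = TransactionSession.resume(txnId, pid, epoch, configs)) {
    resumed.commitTransaction();           // txnId, pid, epoch: identity recorded in Phase 1
}
database.commitTransaction(dbTxnId);
```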
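The ordering above follows ordinary two-phase commit: every participant prepares, the coordinator durably records its decision, and only then are participants told to commit; if any prepare fails, every participant is aborted. The sketch below is a generic in-memory illustration under those assumptions. `Participant` stands in for either the database or a TransactionSession wrapper, and none of these types are Kafka APIs.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical participant: a database, or a wrapper around a TransactionSession. */
interface Participant {
    void prepare();   // throws a RuntimeException on failure
    void commit();
    void abort();
}

/** Generic in-memory two-phase-commit coordinator (illustration only). */
final class TwoPhaseCoordinator {
    enum Decision { COMMIT, ABORT }

    // Stand-in for the coordinator's durable decision log.
    private final List<Decision> decisionLog = new ArrayList<>();

    Decision run(List<Participant> participants) {
        // Phase 1: every participant must prepare successfully.
        for (Participant p : participants) {
            try {
                p.prepare();
            } catch (RuntimeException e) {
                decisionLog.add(Decision.ABORT);
                participants.forEach(Participant::abort); // abort everyone, prepared or not
                return Decision.ABORT;
            }
        }
        // Record the decision before Phase 2 so recovery can replay it after a crash.
        decisionLog.add(Decision.COMMIT);
        // Phase 2: commit everywhere.
        participants.forEach(Participant::commit);
        return Decision.COMMIT;
    }

    List<Decision> decisionLog() { return List.copyOf(decisionLog); }
}

/** Test double that tracks its own state. */
final class InMemoryParticipant implements Participant {
    enum State { ACTIVE, PREPARED, COMMITTED, ABORTED }

    private final boolean failOnPrepare;
    private State state = State.ACTIVE;

    InMemoryParticipant(boolean failOnPrepare) { this.failOnPrepare = failOnPrepare; }

    @Override public void prepare() {
        if (failOnPrepare) throw new IllegalStateException("prepare failed");
        state = State.PREPARED;
    }

    @Override public void commit() {
        if (state != State.PREPARED) throw new IllegalStateException("commit before prepare");
        state = State.COMMITTED;
    }

    @Override public void abort() { state = State.ABORTED; }

    State state() { return state; }
}
```

In the Kafka case, `resume()` plus `commitTransaction()` is what lets Phase 2 be replayed from the decision log by a different process.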

4.6 Kafka Connect Sink with Share Groups: Exactly-Once Kafka-to-Kafka (KIP-1302)


```java
// KIP-1302 exactly-once pattern with TransactionSession
void iterationExactlyOnce() {
    ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(pollTimeout);

    session.beginTransaction();
    try {
        // 1. Producer (bound to `session`) writes output records
        for (SinkRecord record : convertMessages(records)) {
            producer.send(new ProducerRecord<>(outputTopic, record.key(), record.value()));
        }

        // 2. Share consumer acknowledges input within the SAME session
        session.addShareAcksToTransaction(
            shareConsumer.groupMetadata().groupId(),
            ShareAcknowledgements.fromRecords(records, AcknowledgeType.ACCEPT)
        );

        session.commitTransaction();
    } catch (Exception e) {
        // Abort so neither the output records nor the acknowledgments become visible,
        // then surface the failure instead of swallowing it
        session.abortTransaction();
        throw new ConnectException("Exactly-once iteration failed; transaction aborted", e);
    }
}
```




[Figure: Architectural comparison]

5. Compatibility, Deprecation, and Migration Plan

5.1 Full Backward Compatibility

KafkaProducer constructed with transactional.id in the config continues to work exactly as today.

Internally, it creates a TransactionSession and delegates to it, but the public API (initTransactions(), beginTransaction(), commitTransaction(), abortTransaction(), sendOffsetsToTransaction()) is unchanged.

5.2 Deprecation Path

| API | Status | Replacement |
|---|---|---|
| KafkaProducer.initTransactions() | Not deprecated (convenience wrapper) | TransactionSession.initialize() for advanced use |
| KafkaProducer.beginTransaction() | Not deprecated (convenience wrapper) | TransactionSession.beginTransaction() for advanced use |
| KafkaProducer.commitTransaction() | Not deprecated (convenience wrapper) | TransactionSession.commitTransaction() for advanced use |
| KafkaProducer.abortTransaction() | Not deprecated (convenience wrapper) | TransactionSession.abortTransaction() for advanced use |
| KafkaProducer.sendOffsetsToTransaction() | Not deprecated (convenience wrapper) | TransactionSession.sendOffsetsToTransaction() for advanced use |

No deprecations. The KafkaProducer convenience methods remain the recommended API for simple produce-and-commit patterns. TransactionSession is for advanced use cases: 2PC, cross-entity transactions, external coordinators.


6. Security

6.1 Authorization

TransactionSession requires the same ACLs as the current producer transaction API:

| RPC | Resource Type | Operation |
|---|---|---|
| InitProducerId | TRANSACTIONAL_ID | WRITE |
| AddPartitionsToTxn | TRANSACTIONAL_ID; TOPIC | WRITE on both |
| EndTxn | TRANSACTIONAL_ID | WRITE |
| TxnHeartbeat (KIP-1309) | TRANSACTIONAL_ID | WRITE |
| AddShareAcksToTxn (KIP-1289) | TRANSACTIONAL_ID; GROUP | WRITE on TRANSACTIONAL_ID; GROUP ACL as defined by KIP-1289 |

6.2 Session Sharing

When a TransactionSession is shared between a producer and a share consumer (Use Case 4.2), both entities operate under the same transactionalId and require the same ACLs.

The session identity is not multiplied: there is one session, one producerId, and one epoch, regardless of how many clients use it.
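The single-identity rule, and the Epoch Fencing test in 7.1, can be modeled with a small coordinator-side registry. This is a simplified sketch, assuming that re-initializing a transactional.id keeps its producerId and bumps the epoch, and ignoring epoch exhaustion (which the real coordinator handles by allocating a new producerId). `SessionIdentity` and `FencingRegistry` are hypothetical types.

```java
import java.util.HashMap;
import java.util.Map;

/** The identity every client sharing a session carries (hypothetical type). */
record SessionIdentity(String transactionalId, long producerId, short producerEpoch) {}

/** Simplified coordinator-side view: one current identity per transactional.id. */
final class FencingRegistry {
    private final Map<String, SessionIdentity> current = new HashMap<>();
    private long nextProducerId = 1000L; // arbitrary starting value for the sketch

    /** Like InitProducerId: the first call allocates a producerId; later calls bump the epoch. */
    synchronized SessionIdentity initialize(String transactionalId) {
        SessionIdentity previous = current.get(transactionalId);
        SessionIdentity next = (previous == null)
            ? new SessionIdentity(transactionalId, nextProducerId++, (short) 0)
            : new SessionIdentity(transactionalId, previous.producerId(),
                                  (short) (previous.producerEpoch() + 1)); // exhaustion ignored
        current.put(transactionalId, next);
        return next;
    }

    /** A request is accepted only if it carries the current producerId and epoch. */
    synchronized boolean isCurrent(SessionIdentity identity) {
        SessionIdentity latest = current.get(identity.transactionalId());
        return latest != null && latest.equals(identity);
    }
}
```

Because the producer and the share consumer hold the same `SessionIdentity`, a newer session fences both of them at once.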


7. Test Plan

7.1 Unit Tests

| Test | Description |
|---|---|
| Session Lifecycle | Verify transitions: UNINITIALIZED → INITIALIZING → READY → IN_TRANSACTION → COMMITTING → READY. |
| Session Resume | Verify resume() starts in IN_TRANSACTION and can execute commitTransaction(). |
| Heartbeat Logic | Verify heartbeat thread lifecycle based on transaction.session.timeout.ms. |
| Producer Fencing | Ensure KafkaProducer throws IllegalStateException on lifecycle calls when using an external session. |
| Backward Compatibility | Verify standard KafkaProducer transaction methods work via internal session delegation. |
| Epoch Fencing | Verify that a new session with the same transactional.id correctly fences the older session. |
| Identity Accessors | Ensure producerId(), producerEpoch(), and transactionalId() are accurate post-initialization. |

7.2 Integration Tests

  • Producer + external session: end-to-end

  • Resume and commit from a different process

7.3 Compatibility Tests

| Test | Description |
|---|---|
| Legacy Producer | Run existing test suite on KafkaProducer without external sessions to ensure zero regression. |
| Mixed Mode | Run internal and external sessions concurrently on the same cluster to verify no interference. |

8. References

KIP-98: Exactly Once Delivery and Transactional Messaging

KIP-939: Support Participation in 2PC

KIP-1289: Support Transactional Acknowledgments for Share Groups

KIP-1302: Support Share Groups (Queue Semantics) in Kafka Connect Sink Connectors

KIP-1309: Improve transaction liveness checking

KAFKA-20381

9. Rejected Alternatives

--


