Status
Current state: Under discussion
Discussion thread: thread-1, thread-2
Voting thread:
JIRA: Related to KAFKA-20381
Summary: Extract transaction identity and lifecycle management from `KafkaProducer` into a first-class
`TransactionSession` client abstraction, enabling any Kafka client (producer, share consumer, external coordinator, or any other entity)
to participate in transactions without producer coupling or reflection hacks.
1. Motivation
1.1 The Problem: Multi-Participant Transactions Need a Shared Identity Abstraction
Kafka's transaction coordinator is a general-purpose distributed transaction manager. Its metadata (TransactionMetadata) stores:
```
transactionalId -- session identity
producerId -- session token
producerEpoch -- fencing token
state -- lifecycle state (ONGOING, PREPARE_COMMIT, ...)
topicPartitions -- participating partitions
txnTimeoutMs -- correctness bound
```
None of these fields are producer-specific. The naming is historical: producerId and producerEpoch could equally be called sessionId and sessionEpoch. The wire protocol RPCs (InitProducerId, AddPartitionsToTxn, EndTxn, and TxnHeartbeat from KIP-1309) carry only these identity fields. None of them carries a "client type" discriminator.
What KIP-939 Solved
KIP-939 enabled Kafka to act as a formal participant in Two-Phase Commit (2PC) transactions by:
• Adding Recovery APIs: introduced prepareTransaction() and completeTransaction(), allowing external systems to coordinate Kafka's commit/abort status.
• Resilient Resumption: allowed new producers to resume a "prepared" transaction after a crash without automatically aborting it.
• Removing Timeouts: provided an enable2Pc mode that sets transaction timeouts to infinity, preventing premature aborts during external coordination.
The Unsolved Problem
While KIP-939 added the right tools, it put them in the wrong place:
• Wrong Abstraction: transaction methods (like preparing and completing) are forced into the KafkaProducer class, even though they do not involve producing records.
• Heavyweight Bloat: to simply commit a transaction, you are forced to instantiate a full KafkaProducer with its entire infrastructure (buffer pools, sender threads, serializers).
• Inaccessible Logic: the core "brain" of transactions (the state machine and coordinator logic) is buried in internal producer packages, making it impossible for other clients (like consumers or custom coordinators) to use transactions independently.
1.2 Entities Already Need Transaction Participation
The Kafka ecosystem already has six distinct entity types that interact with the transaction coordinator, each working around the producer-centric API:
| Entity | Role | Current Workaround | Core Pain Point |
|---|---|---|---|
| KafkaProducer | Owner | Native API | Transaction logic is monolithic and coupled to the produce path. |
| Kafka Streams | EOS Processor | Wraps Producer | Lifecycle is forced to match record batching. |
| Connect Source | EOS Ingest | Wraps Producer | Boundary management is coupled to producer lifecycle. |
| Connect Sink | KIP-1302 | N/A | No way to share transaction identity between consumer and producer. |
| Flink / 2PC | External Committer | Reflection | Fragile; manually forces state into TransactionManager internals. |
| Share Consumer | KIP-1289 | N/A | Must "borrow" Producer IDs without a proper client abstraction. |
With this KIP, every transactional use case in Kafka follows the same three-step pattern: construct a TransactionSession, bind clients to it, and drive begin/prepare/commit/abort/resume on the session.
The only differences are which clients you bind (KafkaProducer, KafkaConsumer, KafkaShareConsumer) and which lifecycle methods you use (commit for simple cases, prepare + complete for 2PC, resume for crash recovery).
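To make the three-step pattern concrete, here is a self-contained toy model in plain Java (no Kafka dependency) of a session that owns the transaction boundary and fans commit/abort out to bound participants. FanOutSession, Participant and RecordingParticipant are hypothetical names for illustration only; a real TransactionSession would also talk to the transaction coordinator, which this sketch omits.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback that a bound client (producer, consumer, share consumer) would implement.
interface Participant {
    void onCommit();
    void onAbort();
}

// Toy session: owns the transaction boundary and notifies every bound participant.
final class FanOutSession {
    private final List<Participant> participants = new ArrayList<>();
    private boolean inTransaction;

    void bind(Participant p) { participants.add(p); }

    void beginTransaction() {
        if (inTransaction) throw new IllegalStateException("transaction already open");
        inTransaction = true;
    }

    void commitTransaction() { finish(true); }

    void abortTransaction() { finish(false); }

    private void finish(boolean commit) {
        if (!inTransaction) throw new IllegalStateException("no open transaction");
        // A real session would send EndTxn to the coordinator at this point.
        for (Participant p : participants) {
            if (commit) p.onCommit(); else p.onAbort();
        }
        inTransaction = false;
    }
}

// Participant that records which outcomes it observed.
final class RecordingParticipant implements Participant {
    final List<String> events = new ArrayList<>();
    public void onCommit() { events.add("commit"); }
    public void onAbort() { events.add("abort"); }
}
```

The design point this illustrates is that clients never drive the transaction boundary themselves; they only react to the outcome the session decides.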
2. Public Interfaces
2.1 New Class: TransactionSession
Package: org.apache.kafka.clients.transaction
TransactionSession is a lightweight, thread-safe client focused strictly on transaction coordinator interaction.
It decouples lifecycle management from record production.
```
public class TransactionSession implements Closeable {
public TransactionSession(Map<String, Object> configs);
public static TransactionSession resume(
String transactionalId,
long producerId,
short producerEpoch,
Map<String, Object> configs
);
// --- Lifecycle ---
public void initialize();
public void beginTransaction();
public PreparedTxnState prepareTransaction();
public void completeTransaction(PreparedTxnState preparedTxnState);
public void commitTransaction();
public void abortTransaction();
// --- Identity ---
public String transactionalId();
// --- Participation ---
public void addPartitionsToTransaction(Collection<TopicPartition> partitions);
public void sendOffsetsToTransaction(
Map<TopicPartition, OffsetAndMetadata> offsets,
ConsumerGroupMetadata groupMetadata
);
public void addShareAcksToTransaction(
String groupId,
Collection<ShareAcknowledgment> acknowledgments
);
// --- Liveness ---
public void heartbeat();
@Override
public void close();
}
```
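The lifecycle methods above imply a client-side state machine. The sketch below models it with the states listed in the unit-test plan (UNINITIALIZED, INITIALIZING, READY, IN_TRANSACTION, COMMITTING), plus ABORTING, which section 4.2 names, and a PREPARED state for prepareTransaction(), which is an assumption. SessionState and SessionLifecycle are hypothetical names; this is an illustrative model, not Kafka's TransactionManager.

```java
import java.util.EnumSet;
import java.util.Map;

// Client-side lifecycle states (model only; PREPARED is an assumed state for prepareTransaction()).
enum SessionState { UNINITIALIZED, INITIALIZING, READY, IN_TRANSACTION, PREPARED, COMMITTING, ABORTING }

final class SessionLifecycle {
    // Legal successor states for each state.
    private static final Map<SessionState, EnumSet<SessionState>> NEXT = Map.of(
            SessionState.UNINITIALIZED, EnumSet.of(SessionState.INITIALIZING),
            SessionState.INITIALIZING, EnumSet.of(SessionState.READY),
            SessionState.READY, EnumSet.of(SessionState.IN_TRANSACTION),
            SessionState.IN_TRANSACTION,
                    EnumSet.of(SessionState.PREPARED, SessionState.COMMITTING, SessionState.ABORTING),
            SessionState.PREPARED, EnumSet.of(SessionState.COMMITTING, SessionState.ABORTING),
            SessionState.COMMITTING, EnumSet.of(SessionState.READY),
            SessionState.ABORTING, EnumSet.of(SessionState.READY));

    private SessionState state;

    private SessionLifecycle(SessionState initial) { this.state = initial; }

    // Models new TransactionSession(configs).
    static SessionLifecycle create() { return new SessionLifecycle(SessionState.UNINITIALIZED); }

    // Models TransactionSession.resume(...): the test plan expects it to start IN_TRANSACTION.
    static SessionLifecycle resumed() { return new SessionLifecycle(SessionState.IN_TRANSACTION); }

    synchronized SessionState state() { return state; }

    private synchronized void moveTo(SessionState target) {
        if (!NEXT.get(state).contains(target)) {
            throw new IllegalStateException("Invalid transition " + state + " -> " + target);
        }
        state = target;
    }

    // Each lifecycle call maps to one or two transitions; coordinator RPCs are omitted.
    synchronized void initialize() { moveTo(SessionState.INITIALIZING); moveTo(SessionState.READY); }
    synchronized void beginTransaction() { moveTo(SessionState.IN_TRANSACTION); }
    synchronized void prepareTransaction() { moveTo(SessionState.PREPARED); }
    synchronized void commitTransaction() { moveTo(SessionState.COMMITTING); moveTo(SessionState.READY); }
    synchronized void abortTransaction() { moveTo(SessionState.ABORTING); moveTo(SessionState.READY); }
}
```

Centralizing the legal transitions in one table keeps every lifecycle method's precondition check identical, which is also what makes the unit tests in the test plan straightforward to write.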
2.2 Configuration
TransactionSession accepts a subset of existing producer configs plus one new config:
| Config | Source | Description |
|---|---|---|
| transactional.id | Existing producer config | The transactional ID for this session. Required. |
| transaction.timeout.ms | Existing producer config | Correctness timeout for the transaction. |
| transaction.session.timeout.ms | New (this KIP) | Session heartbeat timeout. -1 to disable. |
| bootstrap.servers | Existing | Broker addresses for coordinator discovery. |
| security.protocol, sasl.*, ssl.* | Existing | Authentication/encryption settings. |
| client.id | Existing | Client identifier for metrics and logging. |
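A minimal sketch of assembling and checking a session config map. The keys transactional.id, transaction.timeout.ms, bootstrap.servers and client.id are existing producer configs; transaction.session.timeout.ms is the new config this KIP proposes (it is named in the unit-test plan). The SessionConfigs helper, the example values, and treating an absent session timeout as -1 (disabled) are assumptions for illustration, not part of the proposal.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; not part of the proposed public API.
final class SessionConfigs {
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
    static final String CLIENT_ID = "client.id";
    static final String TRANSACTIONAL_ID = "transactional.id";
    static final String TRANSACTION_TIMEOUT_MS = "transaction.timeout.ms";
    static final String SESSION_TIMEOUT_MS = "transaction.session.timeout.ms"; // new in this KIP

    static Map<String, Object> build(String bootstrap, String transactionalId,
                                     int transactionTimeoutMs, long sessionTimeoutMs) {
        Map<String, Object> cfg = new HashMap<>();
        cfg.put(BOOTSTRAP_SERVERS, bootstrap);
        cfg.put(CLIENT_ID, transactionalId + "-session");
        cfg.put(TRANSACTIONAL_ID, transactionalId);
        cfg.put(TRANSACTION_TIMEOUT_MS, transactionTimeoutMs);
        cfg.put(SESSION_TIMEOUT_MS, sessionTimeoutMs);
        return cfg;
    }

    // Validates the required key and reports whether the session should heartbeat.
    // Assumption: an absent session timeout behaves like -1 (disabled).
    static boolean heartbeatEnabled(Map<String, Object> cfg) {
        Object id = cfg.get(TRANSACTIONAL_ID);
        if (!(id instanceof String) || ((String) id).isEmpty()) {
            throw new IllegalArgumentException(TRANSACTIONAL_ID + " is required");
        }
        long timeoutMs = ((Number) cfg.getOrDefault(SESSION_TIMEOUT_MS, -1L)).longValue();
        if (timeoutMs != -1L && timeoutMs <= 0) {
            throw new IllegalArgumentException(SESSION_TIMEOUT_MS + " must be positive or -1");
        }
        return timeoutMs != -1L;
    }
}
```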
No new broker-side configs. No new wire protocol RPCs. TransactionSession sends the same RPCs that KafkaProducer sends today:
FindCoordinator, InitProducerId, AddPartitionsToTxn, EndTxn, AddOffsetsToTxn, TxnOffsetCommit, TxnHeartbeat (KIP-1309).
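For orientation, the sketch below maps session calls to the RPCs listed above, roughly following the classic producer transaction path (initialization locates the transaction coordinator with FindCoordinator before InitProducerId; sending offsets issues AddOffsetsToTxn before TxnOffsetCommit). It is illustrative only: newer transaction protocol versions (KIP-890) can let brokers add partitions implicitly, prepareTransaction() and completeTransaction() are omitted because their RPC footprint is not specified here, and TxnHeartbeat assumes KIP-1309 as proposed. SessionRpcs is a hypothetical name.

```java
import java.util.List;
import java.util.Map;

// Illustrative mapping from session calls to coordinator-facing RPCs (classic flow, approximated).
final class SessionRpcs {
    static final Map<String, List<String>> BY_CALL = Map.of(
            "initialize", List.of("FindCoordinator", "InitProducerId"),
            "addPartitionsToTransaction", List.of("AddPartitionsToTxn"),
            "sendOffsetsToTransaction", List.of("AddOffsetsToTxn", "TxnOffsetCommit"),
            "commitTransaction", List.of("EndTxn"),
            "abortTransaction", List.of("EndTxn"),
            "heartbeat", List.of("TxnHeartbeat")); // KIP-1309, as proposed

    static List<String> rpcsFor(String call) {
        List<String> rpcs = BY_CALL.get(call);
        if (rpcs == null) throw new IllegalArgumentException("unmapped call: " + call);
        return rpcs;
    }
}
```

The point of the table is the one the paragraph makes: every entry is an RPC the producer already sends, so the broker sees no new request types.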
2.3 Integration with KafkaProducer
KafkaProducer gains a new constructor that accepts an external TransactionSession, plus an accessor for the session in use:
```
public class KafkaProducer<K, V> {
/**
* Bind producer to an external session.
* Lifecycle methods (e.g., beginTransaction) throw IllegalStateException.
*/
public KafkaProducer(Map<String, Object> configs, TransactionSession session);
/**
* Access the session (internal or external). Returns null if non-transactional.
*/
public TransactionSession transactionSession();
}
```
Backward Compatibility: Constructing a producer with transactional.id in the config works as it does today.
Internally, the producer creates its own TransactionSession, and its existing methods (initTransactions, beginTransaction, etc.) become convenience wrappers that delegate to that session.
No code changes are required for existing users.
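The two construction paths can be sketched with toy stand-ins: a producer created from transactional.id owns its session and forwards lifecycle calls to it, while a producer bound to an external session rejects lifecycle calls with IllegalStateException, as the Javadoc above states. ToySession and ToyProducer are hypothetical classes that only model this delegation rule; they are not KafkaProducer internals.

```java
import java.util.Map;
import java.util.Objects;

// Toy stand-in for TransactionSession: tracks only whether a transaction is open.
final class ToySession {
    private final String transactionalId;
    private boolean open;

    ToySession(String transactionalId) { this.transactionalId = Objects.requireNonNull(transactionalId); }

    String transactionalId() { return transactionalId; }
    boolean isOpen() { return open; }

    void beginTransaction() {
        if (open) throw new IllegalStateException("already in a transaction");
        open = true;
    }

    void commitTransaction() {
        if (!open) throw new IllegalStateException("no open transaction");
        open = false;
    }
}

// Toy stand-in for KafkaProducer showing the two construction paths.
final class ToyProducer {
    private final ToySession session;   // null when non-transactional
    private final boolean ownsSession;  // true only on the legacy transactional.id path

    // Legacy path: transactional.id in the config makes the producer create and own a session.
    ToyProducer(Map<String, Object> configs) {
        Object id = configs.get("transactional.id");
        this.session = id == null ? null : new ToySession((String) id);
        this.ownsSession = id != null;
    }

    // Proposed path: the producer is bound to a session owned by someone else.
    ToyProducer(Map<String, Object> configs, ToySession external) {
        this.session = Objects.requireNonNull(external);
        this.ownsSession = false;
    }

    ToySession transactionSession() { return session; }

    // Legacy convenience wrappers delegate to the owned session.
    void beginTransaction() { ownedSession().beginTransaction(); }
    void commitTransaction() { ownedSession().commitTransaction(); }

    private ToySession ownedSession() {
        if (session == null) throw new IllegalStateException("producer is not transactional");
        if (!ownsSession) throw new IllegalStateException("lifecycle is driven by the external session");
        return session;
    }
}
```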
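2.4 Integration with KafkaConsumer and KafkaShareConsumer
KafkaConsumer and KafkaShareConsumer each gain a pair of methods for joining and leaving a session:
```
void bindTransactionSession(TransactionSession session);
void unbindTransactionSession();
```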
3. Proposed Changes
3.1 Architecture: Before and After
Before (current):
After (this KIP):
4. Use Cases
4.1 Apache Flink: Lightweight Transaction Completion
Flink's exactly-once KafkaSink separates the write path (writer subtask) from the commit path (committer).
```java
// Before (Flink -- reflection, fragile)
FlinkKafkaInternalProducer producer = new FlinkKafkaInternalProducer(configs);
producer.resumeTransaction(producerId, epoch); // reflection on TransactionManager internals
producer.commitTransaction();

// After (this KIP -- public API, stable)
try (TransactionSession session = TransactionSession.resume(
        transactionalId, producerId, epoch, configs)) {
    session.commitTransaction();
}
```
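The checkpoint flow, as pseudocode against the proposed API:
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.PreparedTxnState;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.transaction.TransactionSession;

// flinkCheckpoint and CheckpointMeta stand in for Flink's checkpoint storage.
class FlinkKafkaSink<K, V> {
    private final Map<String, Object> cfg;
    private final TransactionSession session;
    private final KafkaProducer<K, V> producer;
    private final List<ProducerRecord<K, V>> bufferedSinceLastBarrier = new ArrayList<>();

    FlinkKafkaSink(Map<String, Object> cfg) {
        // Phase: process() setup, one session per writer subtask
        this.cfg = cfg;
        this.session = new TransactionSession(cfg);
        session.initialize();
        this.producer = new KafkaProducer<>(cfg, session);
    }

    void onCheckpointBarrier(long checkpointId) {
        session.beginTransaction();
        for (ProducerRecord<K, V> record : bufferedSinceLastBarrier) producer.send(record);
        bufferedSinceLastBarrier.clear();
        // Phase 1: snapshotState() in Flink
        PreparedTxnState prepared = session.prepareTransaction();
        // producerId()/producerEpoch() accessors are assumed here; resume() needs both values
        flinkCheckpoint.persist(checkpointId, session.transactionalId(),
                session.producerId(), session.producerEpoch(), prepared);
    }

    void onCheckpointComplete(long checkpointId) {
        // Phase 2: notifyCheckpointComplete() in Flink
        session.commitTransaction();
    }

    // Crash recovery: replaces FlinkKafkaInternalProducer.resumeTransaction(...)
    void recover(CheckpointMeta meta) {
        try (TransactionSession recovered = TransactionSession.resume(
                meta.txnId, meta.pid, meta.epoch, cfg)) {
            recovered.completeTransaction(meta.prepared); // idempotent commit
        }
    }
}
```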
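The recovery step relies on completeTransaction() choosing between commit and abort. As described in KIP-939, it commits when the persisted prepared state matches the transaction the coordinator still holds and aborts otherwise. The model below captures only that decision. The PreparedState fields (producer ID and epoch), the NOTHING_TO_DO outcome for a transaction that was already resolved, and all class names are assumptions for illustration.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical snapshot of a prepared transaction, as persisted in a checkpoint.
final class PreparedState {
    final long producerId;
    final short epoch;

    PreparedState(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof PreparedState)) return false;
        PreparedState other = (PreparedState) o;
        return producerId == other.producerId && epoch == other.epoch;
    }

    @Override public int hashCode() { return Objects.hash(producerId, epoch); }
}

enum Outcome { COMMIT, ABORT, NOTHING_TO_DO }

final class CompleteDecision {
    // checkpointed: state persisted by the external committer before the crash.
    // pending: the prepared transaction the coordinator still holds, if any.
    static Outcome decide(PreparedState checkpointed, Optional<PreparedState> pending) {
        if (pending.isEmpty()) return Outcome.NOTHING_TO_DO; // assumed: already committed or aborted
        return pending.get().equals(checkpointed) ? Outcome.COMMIT : Outcome.ABORT;
    }
}
```

Because the decision depends only on its inputs, retrying recover() after a second crash yields the same outcome, which is what makes the "idempotent commit" comment in the pseudocode hold.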
4.2 Consumer / Share Consumer
CTP (consumer-aware transactions): a rough idea for a future KIP, which could possibly be planned as a subtask of this KIP.
```java
TransactionSession session = new TransactionSession(cfg);
session.initialize();
KafkaProducer<K, V> producer = new KafkaProducer<>(cfg, session);
KafkaConsumer<K, V> consumer = new KafkaConsumer<>(cfg);
consumer.subscribe(List.of("input"));
consumer.bindTransactionSession(session); // NEW
while (running) {
    var records = consumer.poll(Duration.ofSeconds(1));
    if (records.isEmpty()) continue;
    session.beginTransaction();
    try {
        for (var r : records) producer.send(transform(r));
        session.commitTransaction(); // offsets auto-pulled from bound consumer
    } catch (KafkaException e) {
        session.abortTransaction(); // consumer auto-rewinds
    }
}
```
• consumer.bindTransactionSession(session) makes the consumer a participant.
• During IN_TRANSACTION: commitSync/seek/subscribe throw; poll permitted.
• During COMMITTING/ABORTING: poll blocks.
• On abortTransaction(): consumer's in-memory cursor is rolled back to committed() for affected partitions — no application code required.
• On commitTransaction(): session pulls latest consumed offsets from each bound consumer and includes them in TxnOffsetCommit automatically.
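The offset rules above can be modeled in a few lines. This is a toy, single-threaded model with partitions as plain strings; BoundConsumerModel and its methods are hypothetical and mirror only the bullet points (commitSync rejected during IN_TRANSACTION, consumed offsets pulled on commit, positions rewound to the committed offsets on abort). The blocking behavior during COMMITTING/ABORTING is not modeled.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a KafkaConsumer bound to a TransactionSession.
final class BoundConsumerModel {
    private final Map<String, Long> position = new HashMap<>();  // next offset to fetch
    private final Map<String, Long> committed = new HashMap<>(); // last committed offset
    private boolean inTransaction;

    void onBegin() { inTransaction = true; }

    // poll() is permitted during a transaction; it advances the in-memory position.
    void poll(String partition, long recordCount) {
        position.merge(partition, recordCount, Long::sum);
    }

    // A manual commit would bypass the transaction, so it is rejected while one is open.
    void commitSync() {
        if (inTransaction) throw new IllegalStateException("commitSync not allowed during IN_TRANSACTION");
        committed.putAll(position);
    }

    // What the session would pull into TxnOffsetCommit on commitTransaction().
    Map<String, Long> offsetsToCommit() { return Map.copyOf(position); }

    void onCommit() {
        committed.putAll(position);
        inTransaction = false;
    }

    // Abort rewinds every tracked partition to its committed offset (0 if never committed).
    void onAbort() {
        position.replaceAll((tp, pos) -> committed.getOrDefault(tp, 0L));
        inTransaction = false;
    }

    long position(String partition) { return position.getOrDefault(partition, 0L); }
    long committed(String partition) { return committed.getOrDefault(partition, 0L); }
}
```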
Share consumer + producer in the same transaction
```java
TransactionSession session = new TransactionSession(cfg);
session.initialize();
KafkaProducer<K, V> producer = new KafkaProducer<>(cfg, session);
KafkaShareConsumer<K, V> share = new KafkaShareConsumer<>(shareCfg);
share.subscribe(List.of("input-queue"));
share.bindTransactionSession(session);
while (running) {
    var records = share.poll(Duration.ofSeconds(1));
    if (records.isEmpty()) continue;
    session.beginTransaction();
    try {
        for (var r : records) {
            producer.send(transform(r));
            share.acknowledge(r, AcknowledgeType.ACCEPT); // buffered into txn
        }
        session.commitTransaction(); // atomic
    } catch (KafkaException e) {
        session.abortTransaction(); // acks discarded → redeliver
    }
}
```
• share.acknowledge(r, ACCEPT) while bound does not issue ShareAcknowledge; it buffers an entry.
• share.acknowledge(r, RENEW) always issues immediately (liveness, not transactional).
• session.commitTransaction() emits TxnShareAcknowledge (the KIP-1289 wire RPC) stamped with (pid, epoch), followed by EndTxn(commit); the coordinator then writes commit markers to the producer's data partitions AND the share coordinator's state atomically.
• On abort: buffered share acks are discarded; broker's share state is unchanged → locks expire → records redeliver.
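The acknowledgement rules above amount to a small buffer in front of the share consumer. The toy model below keeps ACCEPT acknowledgements until the session commits or aborts and sends RENEW immediately. AckKind is a local mirror of the acknowledgement types rather than Kafka's enum, TxnAckBuffer is a hypothetical class, and buffering RELEASE and REJECT the same way as ACCEPT is an assumption the bullets do not state.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Local mirror of the acknowledgement types; not the Kafka enum itself.
enum AckKind { ACCEPT, RELEASE, REJECT, RENEW }

// Toy model of a share consumer's acknowledgements while bound to a session.
final class TxnAckBuffer {
    private final Map<Long, AckKind> buffered = new LinkedHashMap<>(); // offset -> ack, held for the txn
    private final List<String> sentNow = new ArrayList<>();           // stands in for immediate ShareAcknowledge

    void acknowledge(long offset, AckKind kind) {
        // RENEW concerns lock liveness, not the record's outcome, so it is never deferred.
        if (kind == AckKind.RENEW) {
            sentNow.add(kind + "@" + offset);
        } else {
            buffered.put(offset, kind); // assumption: RELEASE/REJECT are buffered like ACCEPT
        }
    }

    // On commit the buffer is drained into what would become TxnShareAcknowledge.
    Map<Long, AckKind> drainForCommit() {
        Map<Long, AckKind> out = new LinkedHashMap<>(buffered);
        buffered.clear();
        return out;
    }

    // On abort nothing is sent; acquisition locks expire and the records are redelivered.
    void discardOnAbort() { buffered.clear(); }

    int bufferedCount() { return buffered.size(); }
    List<String> sentImmediately() { return List.copyOf(sentNow); }
}
```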
4.4 Kafka Connect: Exactly-Once Source Tasks
Similar API
4.5 Custom 2PC Coordinators
Similar API
4.6 Kafka Connect Sink with Share Groups: Exactly-Once Kafka-to-Kafka (KIP-1302)
Similar API
Architectural comparison:
5. Compatibility, Deprecation, and Migration Plan
5.1 Full Backward Compatibility
KafkaProducer constructed with transactional.id in the config continues to work exactly as today.
Internally, it creates a TransactionSession and delegates to it, but the public API (initTransactions(), beginTransaction(), commitTransaction(), abortTransaction(), sendOffsetsToTransaction()) is unchanged.
5.2 Deprecation Path
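| API | Status | Replacement |
|---|---|---|
| KafkaProducer.initTransactions() | Not deprecated (convenience wrapper) | TransactionSession.initialize() |
| KafkaProducer.beginTransaction() | Not deprecated (convenience wrapper) | TransactionSession.beginTransaction() |
| KafkaProducer.commitTransaction() | Not deprecated (convenience wrapper) | TransactionSession.commitTransaction() |
| KafkaProducer.abortTransaction() | Not deprecated (convenience wrapper) | TransactionSession.abortTransaction() |
| KafkaProducer.sendOffsetsToTransaction() | Not deprecated (convenience wrapper) | TransactionSession.sendOffsetsToTransaction() |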
No deprecations. The KafkaProducer convenience methods remain the recommended API for simple produce-and-commit patterns.
TransactionSession is for advanced use cases: 2PC, cross-entity transactions, external coordinators.
Deprecation can be revisited in a future KIP once this feature is stable.
6. Security
6.1 Authorization
TransactionSession requires the same ACLs as the current producer transaction API:
| RPC | Resource Type | Operation |
|---|---|---|
|
|
|
|
|
|
|
|
|
7. Test Plan
7.1 Unit Tests
| Test | Description |
|---|---|
| Session Lifecycle | Verify transitions: UNINITIALIZED → INITIALIZING → READY → IN_TRANSACTION → COMMITTING → READY. |
| Session Resume | Verify resume() starts in IN_TRANSACTION and can execute commitTransaction(). |
| Heartbeat Logic | Verify heartbeat thread lifecycle based on transaction.session.timeout.ms. |
| Backward Compatibility | Verify standard KafkaProducer transaction methods work via internal session delegation. |
| Epoch Fencing | Verify that a new session with the same transactional.id correctly fences the older session. |
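The epoch-fencing test relies on the coordinator bumping the epoch each time InitProducerId is called for a transactional.id and rejecting requests stamped with an older epoch (the KIP-98 fencing rule). The toy coordinator below shows only that check; ToyCoordinator and FencedException are hypothetical, and producer IDs and epoch overflow are left out.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the producer-fenced error a real coordinator returns.
final class FencedException extends RuntimeException {
    FencedException(String msg) { super(msg); }
}

// Toy coordinator tracking only the latest epoch per transactional.id.
final class ToyCoordinator {
    private final Map<String, Short> latestEpoch = new HashMap<>();

    // InitProducerId: a new session for the same transactional.id bumps the epoch.
    short initProducerId(String transactionalId) {
        short next = (short) (latestEpoch.getOrDefault(transactionalId, (short) -1) + 1);
        latestEpoch.put(transactionalId, next);
        return next;
    }

    // EndTxn (or any transactional RPC): requests carrying an older epoch are fenced.
    void endTxn(String transactionalId, short epoch) {
        Short latest = latestEpoch.get(transactionalId);
        if (latest == null) throw new IllegalStateException("unknown transactional.id");
        if (epoch < latest) {
            throw new FencedException("epoch " + epoch + " fenced by " + latest);
        }
    }
}
```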
7.2 Integration Tests
• Producer + external session, end-to-end
• Resume and commit from a different process
7.3 Compatibility Tests
| Test | Description |
|---|---|
| Legacy Producer | Run existing test suite on KafkaProducer without external sessions to ensure zero regression. |
| Mixed Mode | Run internal and external sessions concurrently on the same cluster to verify no interference. |
9. References
KIP-98: Exactly Once Delivery and Transactional Messaging
KIP-939: Support Participation in 2PC
KIP-1289: Support Transactional Acknowledgments for Share Groups
KIP-1302: Support Share Groups (Queue Semantics) in Kafka Connect Sink Connectors
KIP-1309: Improve transaction liveness checking
KIP-1345: Cross-Cluster Atomic Transactions via Global Transaction Coordinator
KAFKA-20381
10. Rejected Alternatives
--


