Status
Current state: Under discussion
Discussion thread: thread-1, thread-2
Voting thread:
JIRA: KAFKA-20381 (related)
Summary: Extract transaction identity and lifecycle management from `KafkaProducer` into a first-class
`TransactionSession` client abstraction, enabling any Kafka client (producer, share consumer, external coordinator, or any other entity)
to participate in transactions without producer coupling or reflection hacks.
1. Motivation
1.1 The Problem: Multi-Participant Transactions Need a Shared Identity Abstraction
Kafka's transaction coordinator is a general-purpose distributed transaction manager. Its metadata (TransactionMetadata) stores:
```
transactionalId -- session identity
producerId -- session token
producerEpoch -- fencing token
state -- lifecycle state (ONGOING, PREPARE_COMMIT, ...)
topicPartitions -- participating partitions
txnTimeoutMs -- correctness bound
```
None of these fields are producer-specific. The naming is historical -- producerId and producerEpoch could equally be called sessionId and sessionEpoch. The wire protocol RPCs (InitProducerId, AddPartitionsToTxn, EndTxn, TxnHeartbeat) carry only these identity fields. They do not carry a "client type" discriminator.
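Because producerEpoch acts purely as a fencing token, its behavior can be modeled without any producer machinery. The sketch below is a hypothetical, simplified model using the session-neutral names suggested above (`sessionId`, `sessionEpoch`); `SessionIdentity` and `FencingRegistry` are illustrative names, not Kafka classes, and the real coordinator handles more cases (for example, epoch exhaustion).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical session-neutral view of the coordinator's identity fields.
record SessionIdentity(String transactionalId, long sessionId, short sessionEpoch) {}

// Hypothetical model of epoch fencing; not the broker's implementation.
class FencingRegistry {
    private final Map<String, SessionIdentity> current = new HashMap<>();
    private long nextSessionId = 1000L;

    // Models InitProducerId: a new session for a known transactionalId keeps
    // the sessionId and bumps the epoch, which fences every older holder.
    SessionIdentity init(String transactionalId) {
        SessionIdentity prev = current.get(transactionalId);
        SessionIdentity next = (prev == null)
                ? new SessionIdentity(transactionalId, nextSessionId++, (short) 0)
                : new SessionIdentity(transactionalId, prev.sessionId(),
                        (short) (prev.sessionEpoch() + 1));
        current.put(transactionalId, next);
        return next;
    }

    // Models the check applied to requests such as AddPartitionsToTxn or EndTxn.
    // Unknown transactionalIds are rejected as well.
    boolean isFenced(SessionIdentity caller) {
        SessionIdentity latest = current.get(caller.transactionalId());
        return latest == null
                || latest.sessionId() != caller.sessionId()
                || latest.sessionEpoch() > caller.sessionEpoch();
    }
}
```

Nothing in the check depends on who sent the request, which is the point: any client holding the current identity can participate.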
What KIP-939 Solved
KIP-939 enabled Kafka to act as a formal participant in Two-Phase Commit (2PC) transactions by:
- Adding recovery APIs: it introduced prepareTransaction() and completeTransaction(), allowing external systems to coordinate Kafka's commit/abort decision.
- Resilient resumption: it allowed a new producer to resume a "prepared" transaction after a crash without automatically aborting it.
- Removing timeouts: it provided an enable2Pc mode that effectively sets the transaction timeout to infinity, preventing premature aborts during external coordination.
The Unsolved Problem
While KIP-939 added the right tools, it put them in the wrong place:
- Wrong abstraction: transaction methods such as prepareTransaction() and completeTransaction() are forced into the KafkaProducer class, even though they do not produce records.
- Heavyweight bloat: to simply commit a transaction, you must instantiate a full KafkaProducer with its entire infrastructure (buffer pools, sender thread, serializers).
- Inaccessible logic: the core of transactions (the state machine and coordinator logic) is buried in internal producer packages, so other clients such as consumers or custom coordinators cannot use transactions independently.
1.2 Entities Already Need Transaction Participation
The Kafka ecosystem already has six distinct entity types that interact with the transaction coordinator, each working around the producer-centric API:
| Entity | Role | Current Workaround | Core Pain Point |
|---|---|---|---|
| KafkaProducer | Owner | Native API | Transaction logic is monolithic and coupled to the produce path. |
| Kafka Streams | EOS Processor | Wraps Producer | Lifecycle is forced to match record batching. |
| Connect Source | EOS Ingest | Wraps Producer | Boundary management is coupled to producer lifecycle. |
| Connect Sink | KIP-1302 | N/A | No way to share transaction identity between consumer and producer. |
| Flink / 2PC | External Committer | Reflection | Fragile; manually forces state into TransactionManager internals. |
| Share Consumer | KIP-1289 | N/A | Must "borrow" Producer IDs without a proper client abstraction. |
2. Public Interfaces
2.1 New Class: TransactionSession
Package: org.apache.kafka.clients.transaction
TransactionSession is a lightweight, thread-safe client focused strictly on transaction coordinator interaction.
It decouples lifecycle management from record production.
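```java
package org.apache.kafka.clients.transaction;

import java.io.Closeable;
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.PreparedTxnState;
import org.apache.kafka.common.TopicPartition;

// Proposed API: signatures only, bodies omitted.
// ShareAcknowledgment is the acknowledgment type for KIP-1289 (import omitted).
public class TransactionSession implements Closeable {
    public TransactionSession(Map<String, Object> configs);

    // Rebinds to an in-flight transaction identified by (transactionalId, producerId, producerEpoch).
    public static TransactionSession resume(
        String transactionalId,
        long producerId,
        short producerEpoch,
        Map<String, Object> configs
    );

    // --- Lifecycle ---
    public void initialize();
    public void beginTransaction();
    public PreparedTxnState prepareTransaction();
    public void completeTransaction(PreparedTxnState preparedTxnState);
    public void commitTransaction();
    public void abortTransaction();

    // --- Identity ---
    public String transactionalId();
    public long producerId();
    public short producerEpoch();

    // --- Participation ---
    public void addPartitionsToTransaction(Collection<TopicPartition> partitions);
    public void sendOffsetsToTransaction(
        Map<TopicPartition, OffsetAndMetadata> offsets,
        ConsumerGroupMetadata groupMetadata
    );
    public void addShareAcksToTransaction(
        String groupId,
        Collection<ShareAcknowledgment> acknowledgments
    );

    // --- Liveness ---
    public void heartbeat();

    @Override
    public void close();
}
```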
2.2 Configuration
TransactionSession accepts a subset of existing producer configs plus one new config:
| Config | Source | Description |
|---|---|---|
| transactional.id | Existing producer config | The transactional ID for this session. Required. |
| transaction.timeout.ms | Existing producer config | Correctness timeout for the transaction. |
| transaction.session.timeout.ms | New (KAFKA-20381) | Session heartbeat timeout. -1 to disable. |
| bootstrap.servers | Existing | Broker addresses for coordinator discovery. |
| security.protocol, ssl.*, sasl.* | Existing | Authentication/encryption settings. |
| client.id | Existing | Client identifier for metrics and logging. |
This KIP adds no broker-side configs and no new wire protocol RPCs. TransactionSession sends the same RPCs that KafkaProducer sends today (FindCoordinator, InitProducerId, AddPartitionsToTxn, EndTxn, AddOffsetsToTxn, TxnOffsetCommit), plus TxnHeartbeat, which is introduced by KIP-1309.
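A session's configuration could be assembled and checked as in the following sketch. The `SessionConfigs` helper and its rules are hypothetical: in particular, treating an absent transaction.session.timeout.ms as -1 (disabled) and rejecting zero or other negative values are assumptions, since the KIP does not specify a default or validation rules.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the KIP does not define TransactionSession's validation logic.
final class SessionConfigs {
    static final String TRANSACTIONAL_ID = "transactional.id";
    static final String SESSION_TIMEOUT_MS = "transaction.session.timeout.ms";
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

    private SessionConfigs() {}

    // Builds the minimal config map for a session.
    static Map<String, Object> of(String bootstrap, String transactionalId, int sessionTimeoutMs) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(BOOTSTRAP_SERVERS, bootstrap);
        configs.put(TRANSACTIONAL_ID, transactionalId);
        configs.put(SESSION_TIMEOUT_MS, sessionTimeoutMs);
        return configs;
    }

    // Rejects maps a session could not start with.
    static void validate(Map<String, Object> configs) {
        Object id = configs.get(TRANSACTIONAL_ID);
        if (!(id instanceof String s) || s.isBlank()) {
            throw new IllegalArgumentException(TRANSACTIONAL_ID + " is required");
        }
        // Assumption: a missing timeout means heartbeats are disabled (-1).
        Object timeout = configs.getOrDefault(SESSION_TIMEOUT_MS, -1);
        if (!(timeout instanceof Integer t) || (t != -1 && t <= 0)) {
            throw new IllegalArgumentException(SESSION_TIMEOUT_MS + " must be -1 or positive");
        }
    }

    // Assumes validate() has passed; -1 disables the session heartbeat.
    static boolean heartbeatEnabled(Map<String, Object> configs) {
        return ((Integer) configs.getOrDefault(SESSION_TIMEOUT_MS, -1)) != -1;
    }
}
```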
2.3 Integration with KafkaProducer
KafkaProducer gains a new constructor and method to accept an external TransactionSession:
```
public class KafkaProducer<K, V> {
/**
* Bind producer to an external session.
* Lifecycle methods (e.g., beginTransaction) throw IllegalStateException.
*/
public KafkaProducer(Map<String, Object> configs, TransactionSession session);
/**
* Access the session (internal or external). Returns null if non-transactional.
*/
public TransactionSession transactionSession();
}
```
Backward compatibility: constructing a producer with transactional.id in its config works exactly as it does today.
The producer internally creates a TransactionSession, and its existing transactional methods (initTransactions, beginTransaction, etc.) become convenience wrappers that delegate to that session.
Existing users need no code changes.
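The delegation could look like the following sketch. `Session` and `ProducerFacade` are hypothetical stand-ins for TransactionSession and KafkaProducer, reduced to the lifecycle calls; the IllegalStateException for an externally bound producer follows the contract stated in the constructor Javadoc in 2.3.

```java
// Hypothetical stand-in for TransactionSession, reduced to lifecycle calls.
interface Session {
    void initialize();
    void beginTransaction();
    void commitTransaction();
    void abortTransaction();
}

// Hypothetical stand-in for KafkaProducer's transactional surface.
class ProducerFacade {
    private final Session session;      // null when the producer is non-transactional
    private final boolean ownsSession;  // false when bound to an external session

    private ProducerFacade(Session session, boolean ownsSession) {
        this.session = session;
        this.ownsSession = ownsSession;
    }

    // transactional.id in configs: the producer creates and owns its session.
    static ProducerFacade withInternalSession(Session created) {
        return new ProducerFacade(created, true);
    }

    // Equivalent of new KafkaProducer<>(configs, session).
    static ProducerFacade withExternalSession(Session external) {
        return new ProducerFacade(external, false);
    }

    Session transactionSession() { return session; }

    // Convenience wrappers: unchanged public API, delegating to the session.
    void initTransactions() { lifecycle().initialize(); }
    void beginTransaction() { lifecycle().beginTransaction(); }
    void commitTransaction() { lifecycle().commitTransaction(); }
    void abortTransaction() { lifecycle().abortTransaction(); }

    // Lifecycle calls are allowed only when the producer owns its session.
    private Session lifecycle() {
        if (session == null) {
            throw new IllegalStateException("Producer is not transactional");
        }
        if (!ownsSession) {
            throw new IllegalStateException("Lifecycle is managed by the external TransactionSession");
        }
        return session;
    }
}
```

Keeping ownership as an explicit flag means the same producer code path serves both modes, and only the lifecycle guard differs.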
2.4 Integration with KafkaShareConsumer
For KIP-1289 transactional acknowledgments, the share consumer accepts a TransactionSession:
```
public class KafkaShareConsumer<K, V> {
/**
* Acknowledge records within the provided session.
*/
public void acknowledgeTransactionally(
TransactionSession session,
Map<TopicPartition, Set<Long>> acknowledgments
);
}
```
3. Proposed Changes
3.1 Architecture: Before and After
Before (current):
After (this KIP):
3.2 Extracting TransactionSession from TransactionManager
| Component | Responsibility |
|---|---|
| TransactionSession (New) | Identity (ID/Epoch), Lifecycle State Machine, Coordinator RPCs, and Heartbeat thread. |
| TransactionManager (Slimmed) | Sequence numbers, in-flight partition management, and Sender thread interaction. |
State mapping:
| Current TransactionManager.State | New TransactionSession.State |
|---|---|
| UNINITIALIZED / INITIALIZING | UNINITIALIZED / INITIALIZING |
| READY / IN_TRANSACTION | READY / IN_TRANSACTION |
| PREPARED_TRANSACTION | PREPARED |
| COMMITTING_TRANSACTION | COMMITTING |
| ABORTING_TRANSACTION | ABORTING |
| ABORTABLE_ERROR / FATAL_ERROR | ABORTABLE_ERROR / FATAL_ERROR |
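The mapped states could be enforced by a small transition table. The sketch below is hypothetical and deliberately simplified (for example, it omits re-initialization after an epoch bump); it encodes the lifecycle path checked in the unit test plan (UNINITIALIZED → INITIALIZING → READY → IN_TRANSACTION → COMMITTING → READY) plus prepare, abort and error edges.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified lifecycle for TransactionSession using the state names above.
enum SessionState {
    UNINITIALIZED, INITIALIZING, READY, IN_TRANSACTION, PREPARED,
    COMMITTING, ABORTING, ABORTABLE_ERROR, FATAL_ERROR;
}

class SessionStateMachine {
    // Allowed targets per source state; FATAL_ERROR is reachable from anywhere.
    private static final Map<SessionState, Set<SessionState>> NEXT = Map.of(
        SessionState.UNINITIALIZED, EnumSet.of(SessionState.INITIALIZING),
        SessionState.INITIALIZING, EnumSet.of(SessionState.READY),
        SessionState.READY, EnumSet.of(SessionState.IN_TRANSACTION),
        SessionState.IN_TRANSACTION, EnumSet.of(SessionState.PREPARED, SessionState.COMMITTING,
                SessionState.ABORTING, SessionState.ABORTABLE_ERROR),
        SessionState.PREPARED, EnumSet.of(SessionState.COMMITTING, SessionState.ABORTING),
        SessionState.COMMITTING, EnumSet.of(SessionState.READY, SessionState.ABORTABLE_ERROR),
        SessionState.ABORTING, EnumSet.of(SessionState.READY),
        SessionState.ABORTABLE_ERROR, EnumSet.of(SessionState.ABORTING),
        SessionState.FATAL_ERROR, EnumSet.noneOf(SessionState.class));

    private SessionState state = SessionState.UNINITIALIZED;

    SessionState state() { return state; }

    // Moves to target or throws IllegalStateException on an invalid transition.
    void transitionTo(SessionState target) {
        if (target != SessionState.FATAL_ERROR && !NEXT.get(state).contains(target)) {
            throw new IllegalStateException("Invalid transition " + state + " -> " + target);
        }
        state = target;
    }
}
```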
3.3 No Wire Protocol Changes
This KIP adds no RPCs or schema changes: it reuses existing RPCs, plus the ones defined by KIP-1309 and KIP-1289. TransactionSession becomes the sender for all transaction-related requests:
| RPC | Current Sender | New Sender |
|---|---|---|
| FindCoordinator / InitProducerId | TransactionManager | TransactionSession |
| AddPartitionsToTxn / EndTxn | TransactionManager | TransactionSession |
| AddOffsetsToTxn / TxnOffsetCommit | TransactionManager | TransactionSession |
| TxnHeartbeat (KIP-1309) | N/A | TransactionSession |
| AddShareAcksToTxn (KIP-1289) | N/A | TransactionSession |
3.4 Internal Network Client
TransactionSession uses a lightweight NetworkClient for coordinator communication, similar to the consumer's HeartbeatThread.
Dedicated connection: the session maintains its own connection to the coordinator broker.
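A minimal sketch of the session's heartbeat scheduling follows, assuming that transaction.session.timeout.ms drives the interval and that -1 disables heartbeats, as the configuration table states. The one-third ratio borrows the guideline Kafka documents for consumer heartbeat.interval.ms relative to session.timeout.ms; the KIP itself does not specify an interval. `SessionHeartbeater` is a hypothetical name, and `sendHeartbeat` is a placeholder for issuing a TxnHeartbeat request on the dedicated connection.

```java
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical heartbeat loop for a TransactionSession.
class SessionHeartbeater implements AutoCloseable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "txn-session-heartbeat");
        t.setDaemon(true); // never keeps the JVM alive on its own
        return t;
    });

    // Assumption: beat three times per timeout window; -1 disables heartbeats.
    static Optional<Long> intervalMs(long sessionTimeoutMs) {
        if (sessionTimeoutMs == -1) {
            return Optional.empty();
        }
        return Optional.of(Math.max(1L, sessionTimeoutMs / 3));
    }

    // Schedules sendHeartbeat at a fixed rate, or does nothing when disabled.
    Optional<ScheduledFuture<?>> start(long sessionTimeoutMs, Runnable sendHeartbeat) {
        Optional<Long> interval = intervalMs(sessionTimeoutMs);
        if (interval.isEmpty()) {
            return Optional.empty();
        }
        ScheduledFuture<?> task = scheduler.scheduleAtFixedRate(
                sendHeartbeat, 0L, interval.get(), TimeUnit.MILLISECONDS);
        return Optional.<ScheduledFuture<?>>of(task);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Running the loop on its own daemon thread keeps liveness independent of whether any producer is currently sending records.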
4. Use Cases
4.1 Apache Flink: Lightweight Transaction Completion
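Flink's exactly-once KafkaSink separates the write path (writer subtask) from the commit path (committer). Today the committer must construct a full producer and use reflection to force the producerId and epoch into TransactionManager internals before it can commit a transaction it did not start.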
```java
// Before (Flink -- reflection, fragile)
FlinkKafkaInternalProducer producer = new FlinkKafkaInternalProducer(configs);
producer.resumeTransaction(producerId, epoch); // reflection on TransactionManager internals
producer.commitTransaction();
// After (this KIP -- public API, stable)
try (TransactionSession session = TransactionSession.resume(
        transactionalId, producerId, epoch, configs)) {
    session.commitTransaction(); // close() runs even if the commit throws
}
```
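The committer needs only three values, which the writer can place in its checkpoint. As a hypothetical sketch (the `ResumeHandle` type and its encoding are illustrative and are not part of Flink or Kafka), the writer serializes them and the committer decodes them before calling `TransactionSession.resume(handle.transactionalId(), handle.producerId(), handle.producerEpoch(), configs)`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical checkpoint payload: exactly the values TransactionSession.resume() needs.
record ResumeHandle(String transactionalId, long producerId, short producerEpoch) {

    // Encodes as "producerId:epoch:base64(transactionalId)"; Base64 output never
    // contains ':', so any transactionalId round-trips safely.
    String encode() {
        String id = Base64.getEncoder()
                .encodeToString(transactionalId.getBytes(StandardCharsets.UTF_8));
        return producerId + ":" + producerEpoch + ":" + id;
    }

    static ResumeHandle decode(String encoded) {
        String[] parts = encoded.split(":", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Malformed handle: " + encoded);
        }
        String id = new String(Base64.getDecoder().decode(parts[2]), StandardCharsets.UTF_8);
        return new ResumeHandle(id, Long.parseLong(parts[0]), Short.parseShort(parts[1]));
    }
}
```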
4.2 Share Group Consumer: Transactional Acknowledgments (KIP-1289)
```
// 1. Open shared session
TransactionSession session = new TransactionSession(configs);
session.initialize();
session.beginTransaction();
// 2. Producer writes records using session identity
KafkaProducer<K, V> producer = new KafkaProducer<>(producerConfigs, session);
producer.send(outputRecord);
// 3. Share consumer acknowledges within the SAME transaction
shareConsumer.acknowledgeTransactionally(session, acknowledgments);
// 4. Atomic commit for both entities
session.commitTransaction();
```
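This use case relies on one property: work from both participants becomes visible together or not at all. The toy model below is hypothetical, in-memory, and uses no Kafka classes; it shows that property for one transaction shared by a producer and a share consumer. On a real cluster, the transaction coordinator enforces it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical in-memory model of one transaction shared by several participants.
class SharedTxnModel {
    private final List<String> pending = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();
    private boolean open;

    void begin() {
        if (open) throw new IllegalStateException("Transaction already open");
        open = true;
    }

    // Both the producer (records) and the share consumer (acks) add work here.
    void add(String participant, String item) {
        if (!open) throw new IllegalStateException("No open transaction");
        pending.add(participant + ":" + item);
    }

    void commit() { finish(true); }

    void abort() { finish(false); }

    // Pending work from every participant is published or discarded as one unit.
    private void finish(boolean commit) {
        if (!open) throw new IllegalStateException("No open transaction");
        if (commit) committed.addAll(pending);
        pending.clear();
        open = false;
    }

    List<String> committed() { return Collections.unmodifiableList(committed); }
}
```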
4.4 Kafka Connect: Exactly-Once Source Tasks
```java
class ExactlyOnceWorkerSourceTask {
private TransactionSession session;
private KafkaProducer<byte[], byte[]> producer;
void maybeBeginTransaction() { session.beginTransaction(); }
void commitTransaction() { session.commitTransaction(); }
}
```
4.5 Custom 2PC Coordinators
```java
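// Phase 1: Prepare (producer is bound to kafkaSession)
kafkaSession.beginTransaction();
records.forEach(producer::send);
database.prepareTransaction(dbTxnId);
PreparedTxnState prepared = kafkaSession.prepareTransaction();
// Durably record the session identity (and prepared) together with the commit decision
String txnId = kafkaSession.transactionalId();
long pid = kafkaSession.producerId();
short epoch = kafkaSession.producerEpoch();
// Phase 2: Commit (recovery-friendly; may run in another process after a crash)
TransactionSession resumed = TransactionSession.resume(txnId, pid, epoch, configs);
resumed.commitTransaction();
database.commitTransaction(dbTxnId);
resumed.close();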
```
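What makes Phase 2 recovery-friendly is that the coordinator durably records its decision before telling any participant to commit. Below is a hypothetical sketch of such a coordinator in which both the Kafka session and the database are modeled as `Participant` implementations; a plain field stands in for the durable decision log, and recovery without a logged decision aborts (the usual presumed-abort rule). None of these names are Kafka APIs.

```java
import java.util.List;

// Hypothetical 2PC participant; a TransactionSession wrapper and a database would both implement it.
interface Participant {
    boolean prepare(); // phase 1 vote: true means "prepared"
    void commit();
    void abort();
}

// Hypothetical coordinator; a real one must persist the decision durably.
class TwoPhaseCoordinator {
    enum Decision { COMMIT, ABORT }

    private Decision loggedDecision; // stand-in for the durable decision log

    Decision run(List<Participant> participants) {
        boolean allPrepared = true;
        for (Participant p : participants) {
            if (!p.prepare()) {
                allPrepared = false;
                break;
            }
        }
        // The decision is recorded before phase 2 so recovery can replay it.
        loggedDecision = allPrepared ? Decision.COMMIT : Decision.ABORT;
        finish(participants);
        return loggedDecision;
    }

    // Recovery: resume each participant and re-apply the logged decision.
    void recover(List<Participant> participants) {
        if (loggedDecision == null) {
            loggedDecision = Decision.ABORT; // presumed abort: nothing was decided
        }
        finish(participants);
    }

    private void finish(List<Participant> participants) {
        for (Participant p : participants) {
            if (loggedDecision == Decision.COMMIT) p.commit(); else p.abort();
        }
    }
}
```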
4.6 Kafka Connect Sink with Share Groups: Exactly-Once Kafka-to-Kafka (KIP-1302)
```java
// KIP-1302 exactly-once pattern with TransactionSession
// (producer is bound via new KafkaProducer<>(producerConfigs, session))
void iterationExactlyOnce() {
    ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(pollTimeout);
    session.beginTransaction();
    try {
        // 1. Producer writes output records
        for (SinkRecord record : convertMessages(records)) {
            producer.send(new ProducerRecord<>(outputTopic, record.key(), record.value()));
        }
        // 2. Share consumer acknowledges input within the SAME session
        session.addShareAcksToTransaction(
            shareConsumer.groupMetadata().groupId(),
            ShareAcknowledgements.fromRecords(records, AcknowledgeType.ACCEPT)
        );
        session.commitTransaction();
    } catch (Exception e) {
        session.abortTransaction();
        throw e; // surface the failure instead of silently dropping it
    }
}
```
Architectural comparison:
5. Compatibility, Deprecation, and Migration Plan
5.1 Full Backward Compatibility
KafkaProducer constructed with transactional.id in the config continues to work exactly as today.
Internally, it creates a TransactionSession and delegates to it, but the public API (initTransactions(), beginTransaction(), commitTransaction(), abortTransaction(), sendOffsetsToTransaction()) is unchanged.
5.2 Deprecation Path
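| API | Status | Replacement |
|---|---|---|
| KafkaProducer.initTransactions() | Not deprecated (convenience wrapper) | N/A |
| KafkaProducer.beginTransaction() | Not deprecated (convenience wrapper) | N/A |
| KafkaProducer.commitTransaction() | Not deprecated (convenience wrapper) | N/A |
| KafkaProducer.abortTransaction() | Not deprecated (convenience wrapper) | N/A |
| KafkaProducer.sendOffsetsToTransaction() | Not deprecated (convenience wrapper) | N/A |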
No deprecations. The KafkaProducer convenience methods remain the recommended API for simple produce-and-commit patterns. TransactionSession is for advanced use cases: 2PC, cross-entity transactions, external coordinators.
6. Security
6.1 Authorization
TransactionSession requires the same ACLs as the current producer transaction API:
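| RPC | Resource Type | Operation |
|---|---|---|
| FindCoordinator | TransactionalId | Describe |
| InitProducerId | TransactionalId | Write |
| AddPartitionsToTxn | TransactionalId | Write |
| AddPartitionsToTxn | Topic | Write |
| AddOffsetsToTxn | TransactionalId | Write |
| AddOffsetsToTxn | Group | Read |
| TxnOffsetCommit | TransactionalId | Write |
| TxnOffsetCommit | Group | Read |
| TxnOffsetCommit | Topic | Read |
| EndTxn | TransactionalId | Write |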
6.2 Session Sharing
When a TransactionSession is shared between a producer and a share consumer (Use Case 4.2), both entities operate under the same transactionalId and require the same ACLs.
The session identity is not duplicated: there is one session, one producerId, and one epoch, regardless of how many clients use it.
7. Test Plan
7.1 Unit Tests
| Test | Description |
|---|---|
| Session Lifecycle | Verify transitions: UNINITIALIZED → INITIALIZING → READY → IN_TRANSACTION → COMMITTING → READY. |
| Session Resume | Verify resume() starts in IN_TRANSACTION and can execute commitTransaction(). |
| Heartbeat Logic | Verify heartbeat thread lifecycle based on transaction.session.timeout.ms. |
| Producer Fencing | Ensure KafkaProducer throws IllegalStateException on lifecycle calls when using an external session. |
| Backward Compatibility | Verify standard KafkaProducer transaction methods work via internal session delegation. |
| Epoch Fencing | Verify that a new session with the same transactional.id correctly fences the older session. |
| Identity Accessors | Ensure producerId(), producerEpoch(), and transactionalId() are accurate post-initialization. |
7.2 Integration Tests
Producer + external session E2E
Resume and commit from different process
7.3 Compatibility Tests
| Test | Description |
|---|---|
| Legacy Producer | Run existing test suite on KafkaProducer without external sessions to ensure zero regression. |
| Mixed Mode | Run internal and external sessions concurrently on the same cluster to verify no interference. |
9. Reference
KIP-98: Exactly Once Delivery and Transactional Messaging
KIP-939: Support Participation in 2PC
KIP-1289: Support Transactional Acknowledgments for Share Groups
KIP-1302: Support Share Groups (Queue Semantics) in Kafka Connect Sink Connectors
KIP-1309: Improve transaction liveness checking
KAFKA-20381
10. Rejected Alternatives
--


