Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-19883
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
1. Motivation
In the current KIP-932 implementation, Share Group acknowledgments are immediate and irrevocable.
If a processing framework (Flink, Spark, etc.) crashes after acknowledging a record but before committing its own state, that record is lost.
Current State:
ACKNOWLEDGED is a terminal state; there is no way to revert a record to AVAILABLE for redelivery.
The Risk: Permanent data loss during processing failures.
The Goal: Atomic "Read-Process-Write"
Enable Exactly-Once Semantics (EOS) by making acknowledgments part of a Kafka transaction. Either the output is produced AND the source record is acknowledged, or neither happens.
1.1 Background: Share Groups
Current State Machine:
AVAILABLE: Ready for delivery.
ACQUIRED: Locked by a consumer.
ACKNOWLEDGED / ARCHIVED: Terminal states; records are never redelivered.
AVAILABLE (Released): Returned to the pool for redelivery.
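The state machine above can be sketched as a small enum. This is illustrative only; the names and transition rules follow this KIP's description, not the actual broker implementation:

```java
// Illustrative sketch of the share-group record state machine described above.
// Not actual broker code; transition rules follow the KIP text.
import java.util.EnumSet;
import java.util.Set;

enum ShareRecordState {
    AVAILABLE,      // ready for delivery
    ACQUIRED,       // locked by a consumer
    ACKNOWLEDGED,   // terminal: successfully processed
    ARCHIVED;       // terminal: never redelivered

    // Legal transitions per the KIP: AVAILABLE -> ACQUIRED;
    // ACQUIRED -> ACKNOWLEDGED, ARCHIVED, or back to AVAILABLE (release).
    public Set<ShareRecordState> next() {
        switch (this) {
            case AVAILABLE: return EnumSet.of(ACQUIRED);
            case ACQUIRED:  return EnumSet.of(ACKNOWLEDGED, ARCHIVED, AVAILABLE);
            default:        return EnumSet.noneOf(ShareRecordState.class); // terminal
        }
    }

    public boolean canTransitionTo(ShareRecordState target) {
        return next().contains(target);
    }
}
```

The proposal in this KIP effectively delays the ACQUIRED-to-ACKNOWLEDGED transition until the transaction commits, and takes the ACQUIRED-to-AVAILABLE edge on abort.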
Current Acknowledgment Modes:
Implicit: Automatic ack on the subsequent poll().
Explicit: Manual ack via acknowledge(record, type) followed by commitSync().
1.2 Why Existing Consumer-Group Exactly-Once Doesn't Apply
Traditional Consumer Groups rely on replayability, which Share Groups lack:
Consumer Groups: Frameworks like Flink save offsets in their own state. On failure, they use seek(offset) to replay data. In this model, Kafka offset commits are "cosmetic" (non-critical) because Flink is the source of truth.
Share Groups: There is no seek() functionality. The broker manages delivery; once a record is acknowledged, it is removed from the delivery pipeline.
The Conflict: Because records cannot be replayed, the acknowledgment itself must be the transactional boundary. It must stay "pending" until the entire processing transaction is confirmed.
1.3 Existing Pattern: sendOffsetsToTransaction()
| Feature | Traditional Consumer Groups | Share Groups (Proposal) |
|---|---|---|
| Commit Method | sendOffsetsToTransaction() | sendShareAcksToTransaction() |
| Storage | __consumer_offsets | __share_group_state |
| Atomic Fate | Records + Offsets commit together | Records + Acknowledgments commit together |
| On Abort | Consumer re-reads from old offset | Broker reverts records to AVAILABLE |
2. Use Cases
2.1 Consume-Transform-Produce (CTP)
Ensures that output records and source acknowledgments are committed as a single atomic unit within a Kafka-to-Kafka pipeline.
The Flow: beginTransaction() → send(output) → sendShareAcksToTransaction() → commitTransaction().
Result: Output is visible and source records are finalized only if both operations succeed.
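To make the atomic-fate guarantee concrete, here is a stdlib-only toy model of the CTP flow. All class and method names here are illustrative stand-ins, not the proposed public API; the real flow uses KafkaProducer with the sendShareAcksToTransaction() method proposed in this KIP:

```java
// Toy model of consume-transform-produce atomicity: buffered output records and
// share acks become visible together on commit, or are both dropped on abort.
// Illustrative only — not part of the proposed public API.
import java.util.ArrayList;
import java.util.List;

class ToyCtpTransaction {
    private final List<String> pendingOutput = new ArrayList<>();
    private final List<Long> pendingAcks = new ArrayList<>();   // acked source offsets
    final List<String> committedOutput = new ArrayList<>();
    final List<Long> committedAcks = new ArrayList<>();

    void send(String record)       { pendingOutput.add(record); }  // stands in for producer.send(...)
    void sendShareAck(long offset) { pendingAcks.add(offset); }    // stands in for sendShareAcksToTransaction(...)

    // Commit: output and source acknowledgments materialize together.
    void commit() {
        committedOutput.addAll(pendingOutput);
        committedAcks.addAll(pendingAcks);
        clearPending();
    }

    // Abort: neither is visible; source records would revert to AVAILABLE.
    void abort() { clearPending(); }

    private void clearPending() { pendingOutput.clear(); pendingAcks.clear(); }
}
```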
2.2 Source-Only Frameworks (No Kafka Producer)
For applications writing to external systems (Databases, S3) that require transactional acknowledgments coordinated with their own internal checkpoints.
The Flow: Aggregates acks and commits them via a background transaction once the external sink confirms the write.
2.3 End-to-End Exactly-Once (Flink/Spark/OLAP)
Integrates Share Groups into the two-phase commit (2PC) lifecycle of streaming engines.
Pre-commit: Sink records are flushed and share acks are added to the transaction.
Snapshot: Transactional metadata is saved to the framework state.
Commit: On checkpoint completion, the transaction is finalized.
Recovery: If the framework fails, the transaction is aborted; output is rolled back, and the broker automatically reverts share records to AVAILABLE for redelivery.
3. Public Interfaces
A new method is added to KafkaProducer to mirror traditional offset commits:
Method: sendShareAcksToTransaction(Map<TopicPartition, ShareAcknowledgements>, ShareGroupMetadata)
Why a new method? Unlike offsets, share acks target __share_group_state (not __consumer_offsets) and are managed by the ShareCoordinator (not the GroupCoordinator).
See also KIP-1310: General Transaction Session, section 4.5 "Custom 2PC Coordinators":
// Phase 1: Prepare
kafkaSession.beginTransaction();
producer.send(records);
database.prepareTransaction(dbTxnId);
kafkaSession.prepareTransaction();
// Phase 2: Commit (Recovery-friendly)
TransactionSession resumed = TransactionSession.resume(txnId, pid, epoch, configs);
resumed.commitTransaction();
database.commitTransaction(dbTxnId);
3.5 New Metrics
| Metric Name | Type | Description |
|---|---|---|
| share-transaction-active | Gauge | Number of active share-group transactions |
| share-transaction-prepare-time-ms | Histogram | Time to prepare share ack transaction |
| share-transaction-commit-time-ms | Histogram | Time to commit share ack transaction |
| share-transaction-abort-total | Counter | Total aborted share ack transactions |
| share-transaction-timeout-total | Counter | Total timed-out share ack transactions |
4. Proposed Changes
4.1 Reuse of Existing 2PC Protocol
This KIP reuses Kafka's existing two-phase commit protocol. No new coordinator type or consensus protocol is introduced.
Transaction lifecycle (identical to existing):
EMPTY → ONGOING → PREPARE_COMMIT → COMPLETE_COMMIT
→ PREPARE_ABORT → COMPLETE_ABORT
What changes is which partitions are added to the transaction:
| API Call | Partition Added to Transaction | Storage Topic |
|---|---|---|
| producer.send(record) | Data topic partition | User topic |
| sendOffsetsToTransaction() | __consumer_offsets partition for group | __consumer_offsets |
| sendShareAcksToTransaction() (NEW) | __share_group_state partition for group+topic | __share_group_state |
The WriteTxnMarkers request dispatches commit/abort markers to all partitions in the transaction set. If only sendShareAcksToTransaction() was called, markers go only to __share_group_state. If both sendOffsetsToTransaction() and sendShareAcksToTransaction() were called in the same transaction, markers go to both. The transaction coordinator does not distinguish between these — it just tracks partition sets.
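The coordinator's partition-set bookkeeping described above can be sketched as follows. This is a simplification under stated assumptions: the real coordinator tracks TopicPartition objects and persists the set in the transaction log, and the class and method names here are illustrative:

```java
// Simplified view of how the transaction coordinator accumulates partitions and
// dispatches WriteTxnMarkers to all of them, regardless of which API registered
// them. Illustrative only — not actual coordinator code.
import java.util.LinkedHashSet;
import java.util.Set;

class ToyTxnCoordinator {
    private final Set<String> txnPartitions = new LinkedHashSet<>();

    void addPartitionsToTxn(String dataPartition)      { txnPartitions.add(dataPartition); }
    void addOffsetsToTxn(int consumerOffsetsPartition) { txnPartitions.add("__consumer_offsets-" + consumerOffsetsPartition); }
    void addShareAcksToTxn(int shareStatePartition)    { txnPartitions.add("__share_group_state-" + shareStatePartition); } // NEW

    // Markers go to every partition in the set; the coordinator does not care
    // whether a partition holds data, offsets, or share-group state.
    Set<String> writeTxnMarkersTargets() { return Set.copyOf(txnPartitions); }
}
```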
4.2 Wire Protocol Changes
New request: AddShareAcksToTxnRequest (mirrors AddOffsetsToTxnRequest)
{
"apiKey": TBD,
"type": "request",
"name": "AddShareAcksToTxnRequest",
"validVersions": "0",
"fields": [
{ "name": "TransactionalId", "type": "string", "versions": "0+" },
{ "name": "ProducerId", "type": "int64", "versions": "0+" },
{ "name": "ProducerEpoch", "type": "int16", "versions": "0+" },
{ "name": "GroupId", "type": "string", "versions": "0+" },
{ "name": "Topics", "type": "[]AddShareAcksToTxnTopic", "versions": "0+",
"fields": [
{ "name": "Name", "type": "string", "versions": "0+" },
{ "name": "Partitions", "type": "[]int32", "versions": "0+" }
]
}
]
}
Purpose: Tells the TransactionCoordinator to add the __share_group_state partition(s) for the given {groupId, topicPartition} pair(s) to the producer's ongoing transaction. The partition is determined by ShareCoordinator.partitionFor(groupId, topicPartition), where topicPartition refers to the original data topic partition being acknowledged, and the function maps it to the corresponding __share_group_state internal partition that stores the state for that share-group + data-partition combination.
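The partitionFor mapping described above can be sketched like this. The exact hash used by the real ShareCoordinator is an implementation detail; this sketch assumes a plain hashCode modulo the partition count, and the class name is hypothetical:

```java
// Sketch of mapping {groupId, data topic-partition} to a __share_group_state
// partition, as described above. The real ShareCoordinator hashes a key derived
// from the share group and data partition; the hash here is illustrative.
class ToyShareCoordinator {
    private final int numShareStatePartitions;

    ToyShareCoordinator(int numShareStatePartitions) {
        this.numShareStatePartitions = numShareStatePartitions;
    }

    // Deterministic: the same group + data partition always maps to the same
    // internal partition, so its state has a single owner shard.
    int partitionFor(String groupId, String topic, int partition) {
        String key = groupId + ":" + topic + "-" + partition;
        return Math.floorMod(key.hashCode(), numShareStatePartitions);
    }
}
```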
New request: TxnShareAcknowledgeRequest (mirrors TxnOffsetCommitRequest)
{
"apiKey": TBD,
"type": "request",
"name": "TxnShareAcknowledgeRequest",
"validVersions": "0",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+" },
{ "name": "TransactionalId", "type": "string", "versions": "0+" },
{ "name": "ProducerId", "type": "int64", "versions": "0+" },
{ "name": "ProducerEpoch", "type": "int16", "versions": "0+" },
{ "name": "Topics", "type": "[]TxnShareAcknowledgeTopic", "versions": "0+",
"fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+" },
{ "name": "Partitions", "type": "[]TxnShareAcknowledgePartition", "versions": "0+",
"fields": [
{ "name": "Partition", "type": "int32", "versions": "0+" },
{ "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
"fields": [
{ "name": "FirstOffset", "type": "int64", "versions": "0+" },
{ "name": "LastOffset", "type": "int64", "versions": "0+" },
{ "name": "AcknowledgeType", "type": "int8", "versions": "0+" }
]
}
]
}
]
}
]
}
Purpose: Sent to the ShareCoordinator to write acknowledgments as pending (uncommitted) within the transaction. The acks become visible only when the transaction commits.
4.3 ShareCoordinator Changes
The ShareCoordinator must handle transaction completion, mirroring GroupCoordinator.completeTransaction():
// In ShareCoordinatorShard (NEW method):
public CoordinatorResult<Void, CoordinatorRecord> completeTransaction(
long producerId,
short producerEpoch,
TransactionResult result // COMMIT or ABORT
) {
if (result == TransactionResult.COMMIT) {
// Materialize pending transactional acks into share-group state
return applyPendingAcknowledgements(producerId, producerEpoch);
} else {
// Discard pending transactional acks
return discardPendingAcknowledgements(producerId, producerEpoch);
}
}
ShareCoordinatorShard.replayEndTransactionMarker() already exists for replaying transaction markers during log recovery; this KIP extends it to also handle live transaction completion.
4.4 State Storage
Transactional acks are written to __share_group_state using CoordinatorRuntime.scheduleTransactionalWriteOperation(). This follows the same pattern used by GroupCoordinator for transactional offset commits to __consumer_offsets.
Records in __share_group_state are written with the producer's producerId and producerEpoch, making them part of the transaction. They become readable by other consumers only after the transaction commits and the WriteTxnMarkers COMMIT marker is written.
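The pending-acknowledgment semantics described above can be modeled with a small sketch: transactional acks are staged per producerId and only materialize on COMMIT. This is illustrative only; the real coordinator persists staged state through the __share_group_state log rather than in memory, and all names here are hypothetical:

```java
// Sketch of transactional ack staging: acks are buffered per producerId and
// materialize into share-group state only on COMMIT; on ABORT they are dropped
// so the records revert to AVAILABLE. Illustrative only — not broker code.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ToyShareStateShard {
    private final Map<Long, List<Long>> pendingAcks = new HashMap<>(); // producerId -> staged offsets
    final List<Long> acknowledgedOffsets = new ArrayList<>();

    // Called when a TxnShareAcknowledgeRequest arrives mid-transaction.
    void stageTxnAck(long producerId, long offset) {
        pendingAcks.computeIfAbsent(producerId, id -> new ArrayList<>()).add(offset);
    }

    // Called when the WriteTxnMarkers COMMIT/ABORT marker arrives.
    void completeTransaction(long producerId, boolean commit) {
        List<Long> staged = pendingAcks.remove(producerId);
        if (commit && staged != null) {
            acknowledgedOffsets.addAll(staged); // materialize
        }
        // On abort the staged list is simply dropped.
    }
}
```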
5. Flow Diagram
CLIENT --> BROKER communication (over the Kafka wire protocol):
Share Consumer -----> Share Group Coordinator
ShareFetch, ShareAcknowledge, ShareGroupHeartbeat
TransactionManager --> Transaction Coordinator
InitProducerId, AddPartitionsToTxn, AddShareAcksToTxn(NEW), EndTxn
TransactionManager --> Share Group Coordinator
TxnShareAcknowledgeRequest (NEW)
Producer ------------> Data Partition Leaders
ProduceRequest
BROKER --> BROKER communication (internal, not client-visible):
Transaction Coordinator --> Data Partition Leaders
WriteTxnMarkers (COMMIT/ABORT control record)
Transaction Coordinator --> Group Coordinator (via __consumer_offsets leader)
WriteTxnMarkers (materializes transactional offset commits)
Transaction Coordinator --> Share Group Coordinator (via __share_group_state leader) [NEW]
WriteTxnMarkers (materializes transactional share acks)
Abort case:
Transaction Coordinator --> all partition leaders in the transaction set
WriteTxnMarkers (ABORT); pending share acks are discarded and records revert to AVAILABLE
6. Corner Cases
Failure Scenarios
Crash Before/During Txn: If the application crashes before commitTransaction(), the Transaction Coordinator (TC) auto-aborts the transaction.
The "Critical Case" (Crash after sendShareAcks): If a crash occurs after acks are sent but before the final commit, the TC aborts the transaction and ensures pending acks are discarded.
Zombie Producers: If a new producer instance starts, the old one is fenced. Any in-flight output or acks from the old instance are discarded by the TC, preventing "split-brain" duplicates.
Post-Prepare Crash: If the TC crashes after logging PREPARE_COMMIT, the 2PC protocol ensures the transaction is finalized upon recovery; both outputs and acks materialize.
7. Compatibility, Deprecation, and Migration Plan
Zero Breaking Changes: The current implicit and explicit acknowledgment modes remain the default and continue to function as-is.
Opt-in Requirement: Users must explicitly enable the new mode via share.acknowledgement.mode = transactional.
Version Safety: Clients will "fail fast" if the connected broker does not support the new TxnShareAcknowledgeRequest; fallback logic issues warnings if a downgrade to non-transactional acks is necessary.
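Assuming the opt-in configuration named above, enabling the new mode might look like the following sketch. Only share.acknowledgement.mode = transactional comes from this KIP; the remaining keys are standard client configs shown for context:

```properties
# Standard share-consumer settings
bootstrap.servers=localhost:9092
group.id=analytics-share-group

# Opt in to transactional acknowledgments (proposed by this KIP).
# The default remains the existing implicit/explicit behavior.
share.acknowledgement.mode=transactional
```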
8. Test Plan
The verification strategy focuses on state machine integrity and fault tolerance under high-concurrency and failure scenarios.
Unit Tests: Validate state transitions (e.g., ACQUIRED to ACKNOWLEDGED on commit vs. AVAILABLE on abort), idempotency of operations, and transaction timeout/auto-abort logic.
Integration Tests: Focus on end-to-end commit/abort flows, coordinator recovery, and multi-consumer behavior within a single group during network partitions.
System & Performance Tests: Benchmark transactional vs. non-transactional modes and verify exactly-once delivery within Flink/Spark checkpointing cycles.
Chaos Tests: Simulate broker and coordinator crashes during critical phases such as PREPARE_COMMIT to ensure protocol durability.
Rejected Alternatives

1 Comment
Shekhar Rajak:
Ref - producer-side 2PC (Kafka as participant): KAFKA-15370; KIP-939: Support Participation in 2PC #2PCRefresher