Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-19883
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
1 Motivation
Today, the Producer's sendOffsetsToTransaction(offsets, consumerGroupMetadata) enables exactly-once semantics (EOS) in read-process-write topologies that consume from regular consumer groups. KIP-932 introduces share groups, but the equivalent capability is missing for share-group consumers.
This blocks share-group adoption in:
1. MirrorMaker and other Kafka-to-Kafka mirroring/forwarding pipelines.
2. Kafka Streams stateless topologies that want to use share groups for parallelism beyond partition count.
3. Atomic dead-letter queue (DLQ) writes in connectors.
The Goal: Atomic "Read-Process-Write"
Enable Exactly-Once Semantics (EOS) by making acknowledgments part of a Kafka transaction.
Either the output is produced AND the source record is acknowledged, or neither happens. This KIP is scoped to pipelines that write with a Kafka producer AND consume from a share group.
1.1 Background: Share Groups
Current State Machine:
AVAILABLE: Ready for delivery.
ACQUIRED: Locked by a consumer.
- RENEW
ACKNOWLEDGED / ARCHIVED: Terminal states; records are never redelivered.
AVAILABLE (Released): Returned to the pool for redelivery.
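The lifecycle listed above can be sketched as a small transition table. This is only an illustrative model: the enum `RecordState` and its methods are hypothetical names, not broker code, and the allowed transitions are limited to those readable from the list above (RENEW is modelled here, as an assumption, as ACQUIRED staying ACQUIRED).

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the share-group record lifecycle; not the broker implementation.
enum RecordState {
    AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED;

    // States reachable from this state, as read from the list above.
    Set<RecordState> next() {
        switch (this) {
            case AVAILABLE:
                return EnumSet.of(ACQUIRED);
            case ACQUIRED:
                // ACQUIRED -> ACQUIRED models RENEW (assumption); AVAILABLE models a release.
                return EnumSet.of(ACQUIRED, ACKNOWLEDGED, ARCHIVED, AVAILABLE);
            default:
                // ACKNOWLEDGED and ARCHIVED are terminal: no redelivery.
                return EnumSet.noneOf(RecordState.class);
        }
    }

    boolean canTransitionTo(RecordState target) {
        return next().contains(target);
    }

    boolean isTerminal() {
        return next().isEmpty();
    }
}
```

For example, `RecordState.ACKNOWLEDGED.canTransitionTo(RecordState.AVAILABLE)` is false in this model, which matches the statement that terminal records are never redelivered.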
Current Acknowledgment Modes:
Implicit: Automatic ack on the subsequent poll().
Explicit: Manual ack via acknowledge(record, type) followed by commitSync().
2 Public Interfaces
Public API additions:
- Producer.sendShareAcknowledgementsToTransaction(Map<TopicIdPartition, List<AcknowledgementBatch>>, ShareGroupMetadata).
- ShareGroupMetadata class (groupId, memberId, memberEpoch, optional groupInstanceId).
- ShareConsumer.shareGroupMetadata() to obtain the above.
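To make the shape of the proposed additions concrete, here is a minimal sketch using simplified stand-in types. Everything in it is an assumption for illustration: `AckType`, the field layout of `AcknowledgementBatch`, the interface name `ShareAckTransactionalProducer`, and the use of a plain String in place of TopicIdPartition are not part of the Kafka client. Only the method name and the ShareGroupMetadata fields come from the list above.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Simplified stand-ins for illustration only; these are not the Kafka client classes.
enum AckType { ACCEPT, RELEASE, REJECT }

// One contiguous offset range and the acknowledgement applied to it (layout is an assumption).
final class AcknowledgementBatch {
    final long firstOffset;
    final long lastOffset;
    final AckType type;

    AcknowledgementBatch(long firstOffset, long lastOffset, AckType type) {
        if (lastOffset < firstOffset) {
            throw new IllegalArgumentException("lastOffset must be >= firstOffset");
        }
        this.firstOffset = firstOffset;
        this.lastOffset = lastOffset;
        this.type = type;
    }
}

// Fields follow the list above: groupId, memberId, memberEpoch, optional groupInstanceId.
final class ShareGroupMetadata {
    final String groupId;
    final String memberId;
    final int memberEpoch;
    final Optional<String> groupInstanceId;

    ShareGroupMetadata(String groupId, String memberId, int memberEpoch,
                       Optional<String> groupInstanceId) {
        this.groupId = groupId;
        this.memberId = memberId;
        this.memberEpoch = memberEpoch;
        this.groupInstanceId = groupInstanceId;
    }
}

// The proposed producer method; a String such as "orders-0" stands in for TopicIdPartition.
interface ShareAckTransactionalProducer {
    void sendShareAcknowledgementsToTransaction(
            Map<String, List<AcknowledgementBatch>> acknowledgements,
            ShareGroupMetadata groupMetadata);
}
```

The call mirrors sendOffsetsToTransaction(offsets, consumerGroupMetadata): the first argument says what was consumed, the second identifies the group member on whose behalf the acknowledgement is made.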
Wire protocol additions:
- New TxnShareAcknowledge RPC (apiKey 80 or next free).
- Schema mirrors TxnOffsetCommitRequest shape but carries AcknowledgementBatch instead of OffsetCommitInfo.
Broker / coordinator additions:
- TransactionCoordinator routes TxnShareAcknowledge to the share coordinator partitions (similar to how it routes TxnOffsetCommit to consumer-coordinator partitions today).
- ShareCoordinator gets a transactional state model: pending ACKs are staged with (producerId, producerEpoch); on WriteTxnMarkers from the txn coordinator, they're either committed (state advances) or aborted (state rolled back).
- New ShareTransactionalUpdateRecord in __share_group_state (or the ShareUpdateRecord is extended with optional producer ID/epoch fields to mark transactional intent).
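The staging behaviour described for the ShareCoordinator can be sketched in memory as follows. This is a hedged illustration, not the coordinator design: the class `StagedAckStore`, its method names, and the string key format are hypothetical, and persistence to __share_group_state is left out entirely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: illustrative names only, not the ShareCoordinator implementation.
final class StagedAckStore {
    // "producerId:producerEpoch" -> offsets whose ACKs are staged by that transaction.
    private final Map<String, List<Long>> pending = new HashMap<>();
    // Offsets whose ACKs have been applied to the share-partition state.
    private final List<Long> committed = new ArrayList<>();

    private static String key(long producerId, short producerEpoch) {
        return producerId + ":" + producerEpoch;
    }

    // Stage an ACK under the transaction identified by (producerId, producerEpoch).
    void stage(long producerId, short producerEpoch, long offset) {
        pending.computeIfAbsent(key(producerId, producerEpoch), k -> new ArrayList<>())
               .add(offset);
    }

    // Called when the transaction marker arrives: commit applies staged ACKs, abort drops them.
    void onTxnMarker(long producerId, short producerEpoch, boolean commit) {
        List<Long> staged = pending.remove(key(producerId, producerEpoch));
        if (staged != null && commit) {
            committed.addAll(staged);
        }
    }

    List<Long> committedOffsets() {
        return new ArrayList<>(committed);
    }

    int pendingTransactions() {
        return pending.size();
    }
}
```

Keying the staged ACKs by producer ID and epoch means a marker only resolves the ACKs staged by that exact transaction, leaving other in-flight transactions untouched.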
SharePartition state machine additions:
- New transient state TX_PENDING on a batch (orthogonal to RecordState); records in this state are NOT visible to peers, NOT redeliverable, lock NOT eligible for expiry.
- On WriteTxnMarkers commit: TX_PENDING(ACCEPT) → ACKNOWLEDGED; same for RELEASE and REJECT.
- On WriteTxnMarkers abort: TX_PENDING(*) → back to ACQUIRED (lock continues; consumer can retry the work).
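The TX_PENDING transitions above can be sketched for a single batch as follows. The class `TxBatch` and its methods are hypothetical. The commit outcomes for RELEASE (back to AVAILABLE) and REJECT (to ARCHIVED) are an assumption based on how those acknowledgement types behave for share groups today, since the list above only spells out the ACCEPT case.

```java
// Hypothetical sketch of the TX_PENDING overlay on one batch; not the SharePartition code.
enum AckType { ACCEPT, RELEASE, REJECT }

enum BatchState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

final class TxBatch {
    private BatchState state = BatchState.ACQUIRED;
    // Non-null while the batch is TX_PENDING; kept separate from (orthogonal to) the record state.
    private AckType pendingAck;

    BatchState state() { return state; }

    boolean isTxPending() { return pendingAck != null; }

    // While TX_PENDING, the acquisition lock is not eligible for expiry.
    boolean lockMayExpire() { return state == BatchState.ACQUIRED && pendingAck == null; }

    // Stage an acknowledgement inside a transaction.
    void stage(AckType type) {
        if (state != BatchState.ACQUIRED || pendingAck != null) {
            throw new IllegalStateException("batch must be ACQUIRED with no staged ACK");
        }
        pendingAck = type;
    }

    // Commit marker: the staged acknowledgement takes effect.
    void onCommit() {
        if (pendingAck == null) return;
        switch (pendingAck) {
            case ACCEPT:  state = BatchState.ACKNOWLEDGED; break;
            case RELEASE: state = BatchState.AVAILABLE;    break; // assumption, see lead-in
            case REJECT:  state = BatchState.ARCHIVED;     break; // assumption, see lead-in
        }
        pendingAck = null;
    }

    // Abort marker: drop the staged ACK; the batch stays ACQUIRED so the consumer can retry.
    void onAbort() {
        pendingAck = null;
    }
}
```

Because an abort leaves the batch ACQUIRED rather than AVAILABLE, the same consumer keeps the lock and can retry the work in a new transaction.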
Pseudo code
// Kafka-as-destination path (true EOS via TxnShareAcknowledge):
batch boundary (or per-record, depending on tx granularity):
    producer.beginTransaction()
    for each (record, output) in batch:
        producer.send(destinationTopic, output)
    producer.sendShareAcknowledgementsToTransaction(
        ackMap,  // record→ACCEPT for processed records
        shareConsumer.shareGroupMetadata()
    )
    producer.commitTransaction()
    // No separate consumer.acknowledge() call; the ACK is part of the transaction.
-------------------
// Pure-external-sink path (at-least-once + idempotent destination).
// This KIP does not change anything in this flow.
worker thread per record:
    output = process(record)
    externalSink.write(output, idempotencyKey)  // user task does this
    consumer.acknowledgeAsync(record, ACCEPT)
        .thenAccept(_ -> markComplete(record))
3 Compatibility, Deprecation, and Migration Plan
Zero Breaking Changes: Current implicit and explicit modes remain the default and continue to function as-is.
Opt-in Requirement: Users must explicitly enable the new mode.
4 Test Plan
The verification strategy focuses on state machine integrity and fault tolerance under high-concurrency and failure scenarios.
Unit Tests: Validates state transitions (e.g., ACQUIRED to ACKNOWLEDGED on commit vs. AVAILABLE on abort), idempotency of operations, and transaction timeout/auto-abort logic.
Integration Tests: Focuses on end-to-end commit/abort flows, coordinator recovery, and multi-consumer behavior within a single group during network partitions.
System & Performance Tests: Benchmarks transactional vs. non-transactional modes and verifies exactly-once delivery.
Chaos Tests: Simulates broker and coordinator crashes specifically during critical phases like PREPARE_COMMIT to ensure protocol durability.
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
2 Comments
Shekhar Rajak
Ref: producer-side 2PC (Kafka as participant), KAFKA-15370. KIP: KIP-939: Support Participation in 2PC (section "2PC Refresher").
Shekhar Rajak
This will help in https://cwiki.apache.org/confluence/x/JpU8G