
Status

Current state: Under Discussion

Discussion thread: here 

JIRA: KAFKA-19883

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

1 Motivation

In the current KIP-932 implementation, Share Group acknowledgments are immediate and irrevocable.

If a processing framework (Flink, Spark, etc.) crashes after acknowledging a record but before committing its own state, that record is lost.

  • Current State: ACKNOWLEDGED is a terminal state; there is no way to revert to AVAILABLE for redelivery.

  • The Risk: Permanent data loss during processing failures.

The Goal: Atomic "Read-Process-Write"

Enable Exactly-Once Semantics (EOS) by making acknowledgments part of a Kafka transaction. Either the output is produced AND the source record is acknowledged, or neither happens.

1.1 Background: Share Groups

Current State Machine:


  • AVAILABLE: Ready for delivery.

  • ACQUIRED: Locked by a consumer.

  • ACKNOWLEDGED / ARCHIVED: Terminal states; records are never redelivered.

  • AVAILABLE (Released): Returned to the pool for redelivery.
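The transitions above can be modeled as a small, self-contained sketch (class and method names here are illustrative, not the broker's actual implementation). Note that the model also shows the gap this KIP addresses: once a record reaches ACKNOWLEDGED, there is no path back to AVAILABLE.

```java
// Illustrative model of the share-group record state machine described above.
// Names are hypothetical; the real logic lives in the broker's share-partition code.
import java.util.EnumSet;

class ShareRecordStateModel {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    // AVAILABLE -> ACQUIRED (lock); ACQUIRED -> ACKNOWLEDGED / ARCHIVED (terminal)
    // or back to AVAILABLE (release). Terminal states allow no further transitions.
    static boolean canTransition(State from, State to) {
        switch (from) {
            case AVAILABLE: return to == State.ACQUIRED;
            case ACQUIRED:  return EnumSet.of(State.ACKNOWLEDGED, State.ARCHIVED, State.AVAILABLE).contains(to);
            default:        return false; // ACKNOWLEDGED and ARCHIVED are terminal
        }
    }
}
```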


Current Acknowledgment Modes:

  1. Implicit: Automatic ack on the subsequent poll().

  2. Explicit: Manual ack via acknowledge(record, type) followed by commitSync().

1.2 Why Existing Consumer-Group Exactly-Once Doesn't Apply

Traditional Consumer Groups rely on replayability, which Share Groups lack:

  • Consumer Groups: Frameworks like Flink save offsets in their own state. On failure, they use seek(offset) to replay data. In this model, Kafka offset commits are "cosmetic" (non-critical) because Flink is the source of truth.

  • Share Groups: There is no seek() functionality. The broker manages delivery; once a record is acknowledged, it is removed from the delivery pipeline.

  • The Conflict: Because records cannot be replayed, the acknowledgment itself must be the transactional boundary. It must stay "pending" until the entire processing transaction is confirmed.

1.3 Existing Pattern: sendOffsetsToTransaction()

Feature       | Traditional Consumer Groups        | Share Groups (Proposal)
------------- | ---------------------------------- | -----------------------------------------
Commit Method | sendOffsetsToTransaction()         | sendShareAcksToTransaction()
Storage       | __consumer_offsets                 | __share_group_state
Atomic Fate   | Records + offsets commit together  | Records + acknowledgments commit together
On Abort      | Consumer re-reads from old offset  | Broker reverts records to AVAILABLE

2. Use Cases

2.1 Consume-Transform-Produce (CTP)

Ensures that output records and source acknowledgments are committed as a single atomic unit within a Kafka-to-Kafka pipeline.

  • The Flow: beginTransaction()send(output)sendShareAcksToTransaction()commitTransaction().

  • Result: Output is visible and source records are finalized only if both operations succeed.
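As a sketch only, the loop could look like the following. Note that sendShareAcksToTransaction() is the method proposed in this KIP and does not exist yet, and pendingAcknowledgements() and shareGroupMetadata() are hypothetical accessors on the share consumer, named after the placeholder types in Section 3.

```java
// Illustrative consume-transform-produce loop under the proposed API.
// sendShareAcksToTransaction(), pendingAcknowledgements(), and
// shareGroupMetadata() are hypothetical names, not shipped APIs.
producer.initTransactions();
while (running) {
    ConsumerRecords<String, String> records = shareConsumer.poll(Duration.ofMillis(100));
    producer.beginTransaction();
    try {
        for (ConsumerRecord<String, String> record : records) {
            producer.send(new ProducerRecord<>("output-topic", transform(record.value())));
            shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT); // staged, not committed
        }
        // NEW: attach the staged acks to the open transaction
        producer.sendShareAcksToTransaction(
            shareConsumer.pendingAcknowledgements(), shareConsumer.shareGroupMetadata());
        producer.commitTransaction();   // output and acks become visible atomically
    } catch (Exception e) {
        producer.abortTransaction();    // broker reverts records to AVAILABLE
    }
}
```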

2.2 Source-Only Frameworks (No Kafka Producer)

For applications writing to external systems (Databases, S3) that require transactional acknowledgments coordinated with their own internal checkpoints.


2.3 End-to-End Exactly-Once (Flink/Spark/OLAP)

Integrates Share Groups into the two-phase commit (2PC) lifecycle of streaming engines.

  • Pre-commit: Sink records are flushed and share acks are added to the transaction.

  • Snapshot: Transactional metadata is saved to the framework state.

  • Commit: On checkpoint completion, the transaction is finalized.

  • Recovery: If the framework fails, the transaction is aborted; output is rolled back, and the broker automatically reverts share records to AVAILABLE for redelivery.

3 Public Interfaces

A new method is added to KafkaProducer to mirror traditional offset commits:

  • Method: sendShareAcksToTransaction(Map<TopicPartition, ShareAcknowledgements>, ShareGroupMetadata)

  • Why a new method? Unlike offsets, share acks target __share_group_state (not __consumer_offsets) and are managed by the ShareCoordinator (not the GroupCoordinator).
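A sketch of the proposed signature follows; the parameter types are the placeholder names from the bullet above, mirroring the ConsumerGroupMetadata overload of sendOffsetsToTransaction().

```java
// Proposed KafkaProducer addition (sketch; ShareAcknowledgements and
// ShareGroupMetadata are placeholder types from this section):
public void sendShareAcksToTransaction(
    Map<TopicPartition, ShareAcknowledgements> acks,   // per-partition ack batches
    ShareGroupMetadata groupMetadata                   // groupId + epoch, used for fencing
) throws ProducerFencedException;
```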

Related: KIP-1310: General Transaction Session, §4.5 "Custom 2PC Coordinators":

// Phase 1: Prepare
kafkaSession.beginTransaction();
producer.send(records);
database.prepareTransaction(dbTxnId); 
kafkaSession.prepareTransaction();    

// Phase 2: Commit (Recovery-friendly)
TransactionSession resumed = TransactionSession.resume(txnId, pid, epoch, configs);
resumed.commitTransaction();          
database.commitTransaction(dbTxnId);

3.5 New Metrics

Metric Name                       | Type      | Description
--------------------------------- | --------- | -----------------------------------------
share-transaction-active          | Gauge     | Number of active share-group transactions
share-transaction-prepare-time-ms | Histogram | Time to prepare share ack transaction
share-transaction-commit-time-ms  | Histogram | Time to commit share ack transaction
share-transaction-abort-total     | Counter   | Total aborted share ack transactions
share-transaction-timeout-total   | Counter   | Total timed-out share ack transactions

4 Proposed Changes

4.1 Reuse of Existing 2PC Protocol

This KIP reuses Kafka's existing two-phase commit protocol. No new coordinator type or consensus protocol is introduced.

Transaction lifecycle (identical to existing):

EMPTY → ONGOING → PREPARE_COMMIT → COMPLETE_COMMIT
                → PREPARE_ABORT  → COMPLETE_ABORT

What changes is which partitions are added to the transaction:

API Call                           | Partition Added to Transaction                | Storage Topic
---------------------------------- | --------------------------------------------- | -------------------
producer.send(record)              | Data topic partition                          | User topic
sendOffsetsToTransaction()         | __consumer_offsets partition for group        | __consumer_offsets
sendShareAcksToTransaction() (NEW) | __share_group_state partition for group+topic | __share_group_state

The WriteTxnMarkers request dispatches commit/abort markers to all partitions in the transaction set. If only sendShareAcksToTransaction() was called, markers go only to __share_group_state. If both sendOffsetsToTransaction() and sendShareAcksToTransaction() were called in the same transaction, markers go to both. The transaction coordinator does not distinguish between these — it just tracks partition sets.

4.2 Wire Protocol Changes

New request: AddShareAcksToTxnRequest (mirrors AddOffsetsToTxnRequest)

{
  "apiKey": TBD,
  "type": "request",
  "name": "AddShareAcksToTxnRequest",
  "validVersions": "0",
  "fields": [
    { "name": "TransactionalId", "type": "string", "versions": "0+" },
    { "name": "ProducerId", "type": "int64", "versions": "0+" },
    { "name": "ProducerEpoch", "type": "int16", "versions": "0+" },
    { "name": "GroupId", "type": "string", "versions": "0+" },
    { "name": "Topics", "type": "[]AddShareAcksToTxnTopic", "versions": "0+",
      "fields": [
        { "name": "Name", "type": "string", "versions": "0+" },
        { "name": "Partitions", "type": "[]int32", "versions": "0+" }
      ]
    }
  ]
}

Purpose: Tells the TransactionCoordinator to add the __share_group_state partition(s) for the given {groupId, topicPartition} pair(s) to the producer's ongoing transaction. The partition is determined by ShareCoordinator.partitionFor(groupId, topicPartition), where topicPartition refers to the original data topic partition being acknowledged, and the function maps it to the corresponding __share_group_state internal partition that stores the state for that share-group + data-partition combination.
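For intuition, a hash-mod mapping in the spirit of the GroupCoordinator's partitionFor() might look like the following self-contained sketch. This is an assumption for illustration: the real ShareCoordinator keys state by a share-partition key, and its exact hashing may differ.

```java
// Hypothetical mapping from {groupId, topic, data partition} to a
// __share_group_state partition, assuming a hash-mod scheme.
class SharePartitionMapper {
    static int partitionFor(String groupId, String topicId, int dataPartition, int numStatePartitions) {
        // Combine the share-group and data-partition identity into one key,
        // then map it onto the internal topic's partition space.
        String key = groupId + ":" + topicId + ":" + dataPartition;
        return Math.floorMod(key.hashCode(), numStatePartitions);
    }
}
```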

New request: TxnShareAcknowledgeRequest (mirrors TxnOffsetCommitRequest)

{
  "apiKey": TBD,
  "type": "request",
  "name": "TxnShareAcknowledgeRequest",
  "validVersions": "0",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+" },
    { "name": "TransactionalId", "type": "string", "versions": "0+" },
    { "name": "ProducerId", "type": "int64", "versions": "0+" },
    { "name": "ProducerEpoch", "type": "int16", "versions": "0+" },
    { "name": "Topics", "type": "[]TxnShareAcknowledgeTopic", "versions": "0+",
      "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+" },
        { "name": "Partitions", "type": "[]TxnShareAcknowledgePartition", "versions": "0+",
          "fields": [
            { "name": "Partition", "type": "int32", "versions": "0+" },
            { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
              "fields": [
                { "name": "FirstOffset", "type": "int64", "versions": "0+" },
                { "name": "LastOffset", "type": "int64", "versions": "0+" },
                { "name": "AcknowledgeType", "type": "int8", "versions": "0+" }
              ]
            }
          ]
        }
      ]
    }
  ]
}

Purpose: Sent to the ShareCoordinator to write acknowledgments as pending (uncommitted) within the transaction. The acks become visible only when the transaction commits.

4.3 ShareCoordinator Changes

The ShareCoordinator must handle transaction completion, mirroring GroupCoordinator.completeTransaction():


// In ShareCoordinatorShard (NEW method):
public CoordinatorResult<Void, CoordinatorRecord> completeTransaction(
    long producerId,
    short producerEpoch,
    TransactionResult result    // COMMIT or ABORT
) {
    if (result == TransactionResult.COMMIT) {
        // Materialize pending transactional acks into share-group state
        return applyPendingAcknowledgements(producerId, producerEpoch);
    } else {
        // Discard pending transactional acks
        return discardPendingAcknowledgements(producerId, producerEpoch);
    }
}




The hook ShareCoordinatorShard.replayEndTransactionMarker() already exists for replaying transaction markers during log recovery; this KIP extends it to also handle live transaction completion.
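The commit/abort semantics above can be illustrated with a self-contained model: acks staged under a producerId stay pending until the transaction marker arrives, then are either materialized or discarded. Names are hypothetical; the real implementation operates on coordinator records, not in-memory maps.

```java
// Illustrative model of completeTransaction() semantics for pending share acks.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class PendingAckModel {
    private final Map<Long, List<Long>> pending = new HashMap<>(); // producerId -> staged offsets
    private final Set<Long> acknowledged = new TreeSet<>();        // materialized (committed) offsets

    void stageAck(long producerId, long offset) {
        pending.computeIfAbsent(producerId, k -> new ArrayList<>()).add(offset);
    }

    void completeTransaction(long producerId, boolean commit) {
        List<Long> offsets = pending.remove(producerId);
        if (commit && offsets != null) {
            acknowledged.addAll(offsets); // COMMIT: materialize into share-group state
        }
        // ABORT: staged offsets are simply discarded; records stay AVAILABLE
    }

    Set<Long> acknowledged() { return acknowledged; }
}
```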

4.4 State Storage

Transactional acks are written to __share_group_state using CoordinatorRuntime.scheduleTransactionalWriteOperation(). This follows the same pattern used by GroupCoordinator for transactional offset commits to __consumer_offsets.

Records in __share_group_state are written with the producer's producerId and producerEpoch, making them part of the transaction. They become readable by other consumers only after the transaction commits and the WriteTxnMarkers COMMIT marker is written.


5 Flow diagram 


CLIENT --> BROKER communication (over the Kafka wire protocol):

  Share Consumer -----> Share Group Coordinator
    ShareFetch, ShareAcknowledge, ShareGroupHeartbeat

  TransactionManager --> Transaction Coordinator
    InitProducerId, AddPartitionsToTxn, AddShareAcksToTxn(NEW), EndTxn

  TransactionManager --> Share Group Coordinator
    TxnShareAcknowledgeRequest (NEW)

  Producer ------------> Data Partition Leaders
    ProduceRequest


BROKER --> BROKER communication (internal, not client-visible):

  Transaction Coordinator --> Data Partition Leaders
    WriteTxnMarkers (COMMIT/ABORT control record)

  Transaction Coordinator --> Group Coordinator (via __consumer_offsets leader)
    WriteTxnMarkers (materializes transactional offset commits)

  Transaction Coordinator --> Share Group Coordinator (via __share_group_state leader) [NEW]
    WriteTxnMarkers (materializes transactional share acks)



Abort case: 


6. Corner Cases

Failure Scenarios

  • Crash Before/During Txn: If the application crashes before commitTransaction(), the Transaction Coordinator (TC) auto-aborts the transaction on timeout.

  • The "Critical Case" (Crash after sendShareAcks): If a crash occurs after acks are sent but before the final commit, the TC sends an abort marker and ensures the pending acks are discarded.

  • Zombie Producers: If a new producer instance starts, the old one is fenced. Any in-flight output or acks are discarded by the TC, preventing "split-brain" duplicates.

  • Post-Prepare Crash: If the TC crashes after logging PREPARE_COMMIT, the 2PC protocol ensures that the transaction is finalized upon recovery. Both outputs and acks will successfully materialize.


7. Compatibility, Deprecation, and Migration Plan

  • Zero Breaking Changes: Current implicit and explicit modes remain the default and continue to function as-is.

  • Opt-in Requirement: Users must explicitly enable the new mode via share.acknowledgement.mode = transactional.

  • Version Safety: Clients will "fail fast" if the connected broker does not support the new TxnShareAcknowledgeRequest.

    • Fallback logic is included to issue warnings if a downgrade to non-transactional acks is necessary.

8. Test Plan

The verification strategy focuses on state machine integrity and fault tolerance under high-concurrency and failure scenarios.

  • Unit Tests: Validates state transitions (e.g., ACQUIRED to ACKNOWLEDGED on commit vs. AVAILABLE on abort), idempotency of operations, and transaction timeout/auto-abort logic.

  • Integration Tests: Focuses on end-to-end commit/abort flows, coordinator recovery, and multi-consumer behavior within a single group during network partitions.

  • System & Performance Tests: Benchmarks transactional vs. non-transactional modes and verifies exactly-once delivery within Flink/Spark checkpointing cycles.

  • Chaos Tests: Simulates broker and coordinator crashes specifically during critical phases like PREPARE_COMMIT to ensure protocol durability.


Rejected Alternatives


1 Comment

  1. Ref: producer-side 2PC (Kafka as participant), KAFKA-15370; KIP-939: Support Participation in 2PC, §2PC Refresher.