Status

Current state: Under Discussion

Discussion thread: here 

JIRA: KAFKA-19883

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

1 Motivation

Today, the Producer's sendOffsetsToTransaction(offsets, consumerGroupMetadata) enables EOS in read-process-write topologies that consume from regular consumer groups. With KIP-932 introducing share groups, the equivalent capability is missing for share-group consumers.

This blocks share-group adoption in:

 1.  MirrorMaker and other Kafka-to-Kafka mirroring/forwarding pipelines.
 2.  Kafka Streams stateless topologies that want to use share groups for parallelism beyond partition count.
 3.  Atomic dead-letter-queue (DLQ) writes in connectors.


The Goal: Atomic "Read-Process-Write"

Enable Exactly-Once Semantics (EOS) by making acknowledgments part of a Kafka transaction.

Either the output is produced AND the source record is acknowledged, or neither happens. This KIP is scoped to applications that write with a Kafka producer AND consume from a share group.

1.1 Background: Share Groups

Current State Machine:

  • AVAILABLE: Ready for delivery.

  • ACQUIRED: Locked by a consumer.

  • RENEW
  • ACKNOWLEDGED / ARCHIVED: Terminal states; records are never redelivered.

  • AVAILABLE (Released): Returned to the pool for redelivery.

Current Acknowledgment Modes:

  1. Implicit: Automatic ack on the subsequent poll().

  2. Explicit: Manual ack via acknowledge(record, type), followed by commitSync().

2 Public Interfaces

Public API additions:

  • Producer.sendShareAcknowledgementsToTransaction(Map<TopicIdPartition, List<AcknowledgementBatch>>, ShareGroupMetadata).
  • ShareGroupMetadata class (groupId, memberId, memberEpoch, optional groupInstanceId).
  • ShareConsumer.shareGroupMetadata() to obtain the above.
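
To make the proposed value class more concrete, the sketch below models ShareGroupMetadata as an immutable Java record carrying the four fields listed above. The field types, the null checks, the `of` factory, and the enclosing ShareGroupMetadataSketch class are assumptions made for illustration only; the KIP does not yet fix them.

```java
import java.util.Objects;
import java.util.Optional;

class ShareGroupMetadataSketch {

    // Hypothetical shape of the proposed ShareGroupMetadata; field types are assumed.
    record ShareGroupMetadata(String groupId,
                              String memberId,
                              int memberEpoch,
                              Optional<String> groupInstanceId) {

        // Compact constructor: reject null components up front.
        ShareGroupMetadata {
            Objects.requireNonNull(groupId, "groupId");
            Objects.requireNonNull(memberId, "memberId");
            Objects.requireNonNull(groupInstanceId, "groupInstanceId");
        }

        // Hypothetical convenience factory for members without a group instance id.
        static ShareGroupMetadata of(String groupId, String memberId, int memberEpoch) {
            return new ShareGroupMetadata(groupId, memberId, memberEpoch, Optional.empty());
        }
    }

    public static void main(String[] args) {
        ShareGroupMetadata metadata =
            ShareGroupMetadata.of("orders-share-group", "member-1", 3);
        System.out.println(metadata);
    }
}
```

An immutable value type seems a natural fit here because the metadata is a snapshot: if the member epoch changes between shareGroupMetadata() and the transactional acknowledgement, the broker can detect the stale snapshot rather than silently accept it.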

Wire protocol additions:

  • New TxnShareAcknowledge RPC (apiKey 80 or next free).
  • Schema mirrors TxnOffsetCommitRequest shape but carries AcknowledgementBatch instead of OffsetCommitInfo.

Broker / coordinator additions:

  • TransactionCoordinator routes TxnShareAcknowledge to the share coordinator partitions (similar to how it routes TxnOffsetCommit to consumer-coordinator partitions today).
  • ShareCoordinator gets a transactional state model: pending ACKs are staged with (producerId, producerEpoch); on WriteTxnMarkers from the txn coordinator, they're either committed (state advances) or aborted (state rolled back).
  • New ShareTransactionalUpdateRecord in __share_group_state (or the ShareUpdateRecord is extended with optional producer ID/epoch fields to mark transactional intent).
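
The staging behaviour described for the ShareCoordinator can be sketched as a small in-memory model: acknowledgements are parked under a (producerId, producerEpoch) key and only become part of the committed state when a commit marker arrives. Everything here (the class TxnAckStagingSketch, the ProducerKey type, and tracking plain offsets instead of AcknowledgementBatch objects) is a simplifying assumption, not the proposed broker implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

class TxnAckStagingSketch {

    // Identifies the transactional producer; staged acks are grouped under this key.
    static final class ProducerKey {
        final long producerId;
        final short producerEpoch;

        ProducerKey(long producerId, short producerEpoch) {
            this.producerId = producerId;
            this.producerEpoch = producerEpoch;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ProducerKey)) return false;
            ProducerKey other = (ProducerKey) o;
            return producerId == other.producerId && producerEpoch == other.producerEpoch;
        }

        @Override
        public int hashCode() {
            return Objects.hash(producerId, producerEpoch);
        }
    }

    private final Map<ProducerKey, List<Long>> staged = new HashMap<>();
    private final List<Long> committedOffsets = new ArrayList<>();

    // Stage an acknowledgement for one offset; it is not yet part of committed state.
    void stage(long producerId, short producerEpoch, long offset) {
        staged.computeIfAbsent(new ProducerKey(producerId, producerEpoch),
                               k -> new ArrayList<>())
              .add(offset);
    }

    // Models the effect of a transaction marker reaching the share coordinator.
    void onTxnMarker(long producerId, short producerEpoch, boolean commit) {
        List<Long> pending = staged.remove(new ProducerKey(producerId, producerEpoch));
        if (pending != null && commit) {
            committedOffsets.addAll(pending); // commit: state advances
        }
        // abort: the staged acknowledgements are dropped, i.e. rolled back
    }

    List<Long> committedOffsets() {
        return List.copyOf(committedOffsets);
    }

    int stagedCount() {
        return staged.values().stream().mapToInt(List::size).sum();
    }

    public static void main(String[] args) {
        TxnAckStagingSketch coordinator = new TxnAckStagingSketch();
        short epoch = 0;
        coordinator.stage(42L, epoch, 100L);
        coordinator.stage(42L, epoch, 101L);
        System.out.println("before marker: " + coordinator.committedOffsets());
        coordinator.onTxnMarker(42L, epoch, true);
        System.out.println("after commit:  " + coordinator.committedOffsets());
    }
}
```

Keying the staged state by producer identity lets several transactions stage acknowledgements against the same share partition at once, with each marker resolving only its own entries.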

SharePartition state machine additions:

  • New transient state TX_PENDING on a batch (orthogonal to RecordState); records in this state are NOT visible to peers and NOT redeliverable, and their acquisition lock is NOT eligible for expiry.
  • On WriteTxnMarkers commit: TX_PENDING(ACCEPT) → ACKNOWLEDGED; TX_PENDING(RELEASE) and TX_PENDING(REJECT) are likewise applied only at commit.
  • On WriteTxnMarkers abort: TX_PENDING(*) → back to ACQUIRED (the lock continues; the consumer can retry the work).
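
The transitions above can be sketched as a tiny per-record state machine. TX_PENDING is modelled as an optional pending acknowledgement layered on top of RecordState, which is one possible reading of "orthogonal". The target states for RELEASE (AVAILABLE) and REJECT (ARCHIVED) are an assumption carried over from the non-transactional KIP-932 semantics; the class and method names are hypothetical.

```java
class TxPendingStateSketch {

    enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    enum AckType { ACCEPT, RELEASE, REJECT }

    // One in-flight record: its RecordState plus an optional staged transactional ack.
    static final class InFlightRecord {
        private RecordState state = RecordState.ACQUIRED;
        private AckType pendingAck; // non-null means the record is TX_PENDING

        RecordState state() { return state; }

        boolean isTxPending() { return pendingAck != null; }

        // Stage an ack inside a transaction; only ACQUIRED, non-pending records qualify.
        void stage(AckType type) {
            if (state != RecordState.ACQUIRED || pendingAck != null) {
                throw new IllegalStateException("cannot stage ack, state=" + state
                    + ", txPending=" + isTxPending());
            }
            pendingAck = type;
        }

        // Commit marker: apply the staged acknowledgement.
        void onCommit() {
            requirePending();
            switch (pendingAck) {
                case ACCEPT:  state = RecordState.ACKNOWLEDGED; break;
                case RELEASE: state = RecordState.AVAILABLE;    break; // assumed target state
                case REJECT:  state = RecordState.ARCHIVED;     break; // assumed target state
            }
            pendingAck = null;
        }

        // Abort marker: drop the staged ack; the record stays ACQUIRED for a retry.
        void onAbort() {
            requirePending();
            pendingAck = null;
        }

        // The acquisition lock may only expire while no transactional ack is staged.
        boolean lockMayExpire() {
            return state == RecordState.ACQUIRED && pendingAck == null;
        }

        private void requirePending() {
            if (pendingAck == null) {
                throw new IllegalStateException("record is not TX_PENDING");
            }
        }
    }

    public static void main(String[] args) {
        InFlightRecord record = new InFlightRecord();
        record.stage(AckType.ACCEPT);
        record.onAbort();              // first attempt aborted, record still ACQUIRED
        record.stage(AckType.ACCEPT);
        record.onCommit();             // retry committed
        System.out.println(record.state());
    }
}
```

Keeping the pending acknowledgement separate from RecordState means an abort needs no state restoration: clearing the staged value is enough to return the record to its plain ACQUIRED form.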

Pseudo code


// Kafka-as-destination path (true EOS via TxnShareAcknowledge):
batch boundary (or per-record, depending on tx granularity):
    producer.beginTransaction()
    for each (record, output) in batch:
        producer.send(destinationTopic, output)
    producer.sendShareAcknowledgementsToTransaction(
        ackMap,                                  // record→ACCEPT for processed records
        shareConsumer.shareGroupMetadata()
    )
    producer.commitTransaction()
    // No separate consumer.acknowledge() — the ACK is in the transaction.

-------------------

// Pure-external-sink path (at-least-once + idempotent destination); this KIP does not change this flow:
worker thread per record:
    output = process(record)
    externalSink.write(output, idempotencyKey)        // user task does this
    consumer.acknowledgeAsync(record, ACCEPT)
        .thenAccept(_ -> markComplete(record))


3 Compatibility, Deprecation, and Migration Plan

  • Zero Breaking Changes: Current implicit and explicit modes remain the default and continue to function as-is.

  • Opt-in Requirement: Users must explicitly enable the new mode.

4 Test Plan

The verification strategy focuses on state machine integrity and fault tolerance under high-concurrency and failure scenarios.

  • Unit Tests: Validate state transitions (e.g., ACQUIRED to ACKNOWLEDGED on commit vs. back to ACQUIRED on abort), idempotency of operations, and transaction timeout/auto-abort logic.

  • Integration Tests: Cover end-to-end commit/abort flows, coordinator recovery, and multi-consumer behavior within a single group during network partitions.

  • System & Performance Tests: Benchmark transactional vs. non-transactional modes and verify exactly-once delivery.

  • Chaos Tests: Simulate broker and coordinator crashes during critical phases such as PREPARE_COMMIT to ensure protocol durability.


Rejected Alternatives

2 Comments

  1. Ref: producer-side 2PC (Kafka as participant), KAFKA-15370; KIP-939: Support Participation in 2PC (section "2PC Refresher").