
Status

Current state: Voting

Discussion thread: here 

JIRA: KAFKA-19883

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

1 Motivation

Today, the Producer's sendOffsetsToTransaction(offsets, consumerGroupMetadata) enables exactly-once semantics (EOS) in read-process-write topologies that consume from regular consumer groups. With KIP-932 introducing share groups, the equivalent capability is missing for share-group consumers.

This blocks share-group adoption in:

 1.  MirrorMaker and other Kafka-to-Kafka mirroring/forwarding pipelines.
 2.  Kafka Streams stateless topologies that want to use share groups for parallelism beyond partition count.
 3.  Atomic DLQ writes across different connectors.


The Goal: Atomic "Read-Process-Write"

Enable Exactly-Once Semantics (EOS) by making acknowledgments part of a Kafka transaction.

Either the output is produced AND the source record is acknowledged, or neither happens. This KIP is scoped to applications that write with a Kafka producer AND consume from a share group.

1.1 Background: Share Groups

Current State Machine:

  • AVAILABLE: Ready for delivery.

  • ACQUIRED: Locked by a consumer.

  • RENEW
  • ACKNOWLEDGED / ARCHIVED: Terminal states; records are never redelivered.

  • AVAILABLE (Released): Returned to the pool for redelivery.

Current Acknowledgment Modes:

  1. Implicit: Automatic ack on the subsequent poll().

  2. Explicit: Manual ack via acknowledge(record, type) followed by commitSync()

2 Public Interfaces

Public API additions:

  • Producer.sendShareAcknowledgementsToTransaction(Map<TopicIdPartition, List<AcknowledgementBatch>>, ShareGroupMetadata).
  • ShareGroupMetadata class (groupId, memberId, memberEpoch, optional groupInstanceId).
  • ShareConsumer.shareGroupMetadata() to obtain the above.


Producer (clients module)

public interface Producer<K, V> {

    /**
     * Sends a list of share-group acknowledgements to the consumer-coordinator and marks
     * them for atomic commit alongside the records produced in this transaction.
     *
     * The acknowledgements are staged on the broker in a TX_PENDING state until the
     * transaction is committed or aborted. On commit, ACCEPT records transition to
     * ACKNOWLEDGED and REJECT records transition to ARCHIVING/ARCHIVED. On abort,
     * all staged records revert to ACQUIRED and remain owned by the original consumer
     * until either the consumer re-acknowledges them or the acquisition lock expires.
     *
     * @param acknowledgements Per-partition list of acknowledgement batches. Only
     *                         AcknowledgeType.ACCEPT (1) and AcknowledgeType.REJECT (3)
     *                         are valid inside a transaction. RELEASE (2), RENEW (4),
     *                         and GAP (0) are rejected with InvalidRecordStateException.
     * @param groupMetadata    Snapshot of the share consumer's group identity, obtained
     *                         from ShareConsumer.shareGroupMetadata().
     *
     * @throws IllegalStateException if no transaction is in progress, or if the producer
     *                               is not transactional.
     * @throws ProducerFencedException if another producer with the same transactionalId
     *                                 has fenced this one.
     * @throws UnsupportedVersionException if the cluster does not advertise apiKey 93
     *                                     (TxnShareAcknowledge) via ApiVersions.
     * @throws GroupAuthorizationException if the configured principal cannot Write to
     *                                     the share group.
     * @throws InvalidProducerEpochException if the producer's epoch is stale.
     * @throws KafkaException for other non-fatal errors that may be retried by aborting
     *                        the transaction and retrying the entire read-process-write
     *                        loop.
     *
     * Threading: Returns immediately after enqueuing the request for the producer's
     * background Sender thread. The broker has NOT processed it yet.
     * Any error is reported when commitTransaction() or abortTransaction() is later called.
     */
    void sendShareAcknowledgementsToTransaction(
        Map<TopicIdPartition, List<AcknowledgementBatch>> acknowledgements,
        ShareGroupMetadata groupMetadata
    ) throws ProducerFencedException;
}


ShareConsumer (clients module)

public interface ShareConsumer<K, V> {

    /**
     * Returns an immutable snapshot of this consumer's share-group identity for use
     * with Producer.sendShareAcknowledgementsToTransaction.
     *
     * The snapshot captures groupId, memberId, and memberEpoch atomically; if a
     * rebalance changes the memberEpoch between the snapshot and the producer call,
     * the broker will reject the staging request with STALE_MEMBER_EPOCH and the
     * user must abort the transaction and retry the read-process-write loop.
     *
     * @throws UnsupportedVersionException if the cluster does not support KIP-1289.
     * @throws TimeoutException if the snapshot cannot be obtained within the
     *                          configured default.api.timeout.ms.
     *
     * Threading: thread-safe; safe to call concurrently with poll() and acknowledge().
     */
    ShareGroupMetadata shareGroupMetadata();
}


ShareGroupMetadata (new class in clients module, package o.a.k.clients.consumer)

/* Thread-safe, Immutable, Concurrent with poll/acknowledge */
public final class ShareGroupMetadata {
    public ShareGroupMetadata(String groupId, String memberId, int memberEpoch);
    public String groupId();
    public String memberId();
    public int memberEpoch();
    @Override public boolean equals(Object other);
    @Override public int hashCode();
    @Override public String toString();
}


Wire protocol additions:

  • New TxnShareAcknowledge RPC (apiKey 93 or next free).
  • Schema mirrors TxnOffsetCommitRequest shape but carries AcknowledgementBatch instead of OffsetCommitInfo.


New RPC: TxnShareAcknowledgeRequest (apiKey 93)
Listeners: broker.
Acknowledge type values: 0=Gap, 1=Accept, 2=Release, 3=Reject, 4=Renew.
Transactional constraint: only 1 (Accept) and 3 (Reject) are valid inside a transaction.
A batch containing any other value is rejected with INVALID_RECORD_STATE.
Field | Type | Notes
TransactionalId | string (nullable) | The producer's transactional.id.
GroupId | string, entityType=groupId | The share group ID.
ProducerId | int64, entityType=producerId | For fencing.
ProducerEpoch | int16 | For fencing.
MemberId | string, entityType=memberId | The share group member ID.
MemberEpoch | int32 | For share-group fencing.
Topics | []TxnShareAcknowledgeTopic | mapKey=true on TopicId.
Topics.TopicId | uuid, mapKey=true |
Topics.Partitions | []TxnShareAcknowledgePartition | mapKey=true on PartitionIndex.
Partitions.PartitionIndex | int32, mapKey=true |
Partitions.AcknowledgementBatches | []TxnShareAcknowledgeBatch |
Batch.FirstOffset | int64 | Inclusive.
Batch.LastOffset | int64 | Inclusive.
Batch.AcknowledgeTypes | []int8 | Per-offset ack type byte. Size 1 = uniform type for whole range.
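
To make the batch encoding and the transactional constraint concrete, here is a minimal sketch of how one batch could be expanded and checked. The class name TxnAckBatchCheck and the use of IllegalArgumentException are illustrative assumptions, not part of the proposed API; a broker would answer with an error code such as INVALID_RECORD_STATE instead of throwing.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: expands and validates one acknowledgement batch. */
final class TxnAckBatchCheck {
    // Acknowledge type values as listed in this KIP.
    static final byte GAP = 0, ACCEPT = 1, RELEASE = 2, REJECT = 3, RENEW = 4;

    /**
     * Returns one ack type per offset in [firstOffset, lastOffset] (both inclusive).
     * A single-element array is treated as a uniform type for the whole range.
     * Throws IllegalArgumentException where a broker would return an error code
     * (assumed mapping, for illustration only).
     */
    static List<Byte> expand(long firstOffset, long lastOffset, byte[] types) {
        if (lastOffset < firstOffset) {
            throw new IllegalArgumentException("lastOffset < firstOffset");
        }
        long count = lastOffset - firstOffset + 1;
        if (types.length != 1 && types.length != count) {
            throw new IllegalArgumentException("expected 1 or " + count + " ack types");
        }
        List<Byte> expanded = new ArrayList<>();
        for (long i = 0; i < count; i++) {
            byte type = types.length == 1 ? types[0] : types[(int) i];
            // Only ACCEPT and REJECT have transactional semantics.
            if (type != ACCEPT && type != REJECT) {
                throw new IllegalArgumentException(
                    "ack type " + type + " is not valid inside a transaction");
            }
            expanded.add(type);
        }
        return expanded;
    }

    public static void main(String[] args) {
        System.out.println(expand(10, 12, new byte[] {ACCEPT}));          // uniform form
        System.out.println(expand(10, 11, new byte[] {ACCEPT, REJECT}));  // per-offset form
    }
}
```

The uniform form keeps requests small for the common case where a whole range is accepted, while the per-offset form still allows mixing ACCEPT and REJECT within one range.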

New RPC: TxnShareAcknowledgeResponse (apiKey 93, v0)

Top-level supported errors:
- GROUP_AUTHORIZATION_FAILED
- TOPIC_AUTHORIZATION_FAILED
- TRANSACTIONAL_ID_AUTHORIZATION_FAILED
- TRANSACTIONAL_ID_NOT_FOUND
- INVALID_PRODUCER_EPOCH / PRODUCER_FENCED
- INVALID_PRODUCER_ID_MAPPING
- INVALID_TXN_STATE
- UNKNOWN_MEMBER_ID
- STALE_MEMBER_EPOCH
- TRANSACTION_ABORTABLE (KIP-890)
- UNKNOWN_SERVER_ERROR

Per-partition supported errors:
- UNKNOWN_TOPIC_OR_PARTITION / UNKNOWN_TOPIC_ID
- NOT_LEADER_OR_FOLLOWER (with CurrentLeader populated)
- INVALID_RECORD_STATE
- INVALID_REQUEST
- KAFKA_STORAGE_ERROR

Field | Type | Notes
ThrottleTimeMs | int32 |
ErrorCode | int16 | Top-level error.
Responses | []TxnShareAcknowledgeTopicResponse |
Responses.TopicId | uuid, mapKey=true |
Responses.Partitions | []TxnShareAcknowledgePartitionResponse |
Partitions.PartitionIndex | int32 |
Partitions.ErrorCode | int16 | Per-partition error.
Partitions.ErrorMessage | string (nullable) |
Partitions.CurrentLeader | LeaderIdAndEpoch (tagged) | Populated on NOT_LEADER_OR_FOLLOWER.
NodeEndpoints | []NodeEndpoint (tagged) | Top-level: address of any new leader referenced above.


Existing RPCs NOT changed

- WriteTxnMarkers (apiKey 27): NO schema change. The broker hooks the
  existing marker arrival in KafkaApis.handleWriteTxnMarkersRequest and broadcasts
  to all SharePartition instances on that broker. Per-record fencing by
  (producerId, producerEpoch) filters out non-participants. This means the
  TransactionCoordinator does NOT need to track share-partitions as transaction
  participants explicitly — the broadcast is correct because the per-record
  state filter is the source of truth.

- AddPartitionsToTxn (apiKey 24): NO schema change. KIP-1289 does NOT add share
  partitions as transaction participants. The transactional output-write path
  continues to use AddPartitionsToTxn for the output topics; the share-group
  staging path uses TxnShareAcknowledge directly (TV2-only, single round trip).

- ApiVersions (apiKey 18): NO schema change, but the broker MUST advertise
  apiKey 93 only when feature-flag share.version >= 2 (see Enablement section).

- TxnOffsetCommit (apiKey 28): NO schema change. Independent path for regular
  consumer groups; share groups use the new RPC.


Internal Topic Record Schemas

__transaction_state: unchanged

Share-partition participation in transactions is tracked via the existing TransactionMetadata.topicPartitions field (mechanism: a broker-internal AddPartitionsToTxn triggered by the TxnShareAcknowledge handler, mirroring how __consumer_offsets-N is registered via AddOffsetsToTxn for regular consumer groups).

Flow — Regular Consumer Group 
  • Producer writes to output topic
    • Source Component: TransactionManager (Client)
    • Action: Sends AddPartitionsToTxn("output-A-0") to TxnCoord.
    • State Change: TxnCoord records participants = {output-A-0} in __transaction_state.
  • Producer calls sendOffsetsToTransaction("my-group")
    • Source Component: TransactionManager (Client) & TransactionCoordinator (Broker Core)
    • Action: Sends AddOffsetsToTxn("my-group") to TxnCoord.
    • Execution: TxnCoord computes hash("my-group"), which is 7 in this example.
    • State Change: Records participants = {output-A-0, __consumer_offsets-7} in __transaction_state.
  • Producer commits
    • Source Component: TransactionCoordinator (Broker Core)
    • Execution: Reads participants from __transaction_state and resolves brokers via MetadataCache:
      • output-A-0 → Broker 3
      • __consumer_offsets-7 → Broker 4
    • Action: Sends WriteTxnMarkers to Broker 3 AND Broker 4.
  • Each broker writes a control batch to its log
    • Source Component: KafkaApis & GroupCoordinator (Broker Core)
    • Execution:
      • Broker 3: Appends COMMIT batch to output-A-0 log (makes records visible to read_committed consumers).
      • Broker 4: Appends COMMIT batch to __consumer_offsets-7 log (makes staged offsets visible to GroupCoordinator).
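
The hash step in the flow above can be sketched as follows. This assumes the mapping is the non-negative hashCode of the group id modulo the partition count of the internal topic, and it assumes 50 partitions for __consumer_offsets; the class and method names are illustrative only.

```java
/** Hypothetical sketch of mapping a group id to an internal-topic partition. */
final class OffsetsPartitionSketch {

    /** Maps a group id onto one of numPartitions partitions; the result is in [0, numPartitions). */
    static int partitionFor(String groupId, int numPartitions) {
        int hash = groupId.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, so map that value to 0 explicitly.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % numPartitions;
    }

    public static void main(String[] args) {
        // 50 is an assumed partition count for __consumer_offsets.
        int partition = partitionFor("my-group", 50);
        System.out.println("__consumer_offsets-" + partition);
    }
}
```

Because the mapping is deterministic, the TransactionCoordinator and the GroupCoordinator agree on which partition carries a group's staged offsets without exchanging any extra state.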
Flow — Share Group (THIS KIP)
  • producer.send("output-A", record)
    • Source Component: TransactionManager (Client)
    • Action: Sends AddPartitionsToTxnRequest(["output-A-0"]).
    • State Change: Updates local state to TransactionMetadata.topicPartitions = {output-A-0}.
  • producer.sendShareAcknowledgementsToTransaction(acks, meta)
    • Source Component: TransactionManager (Client) → KafkaApis (Broker Core)
    • Action: Sends TxnShareAcknowledgeRequest to the SharePartition leader (Broker 5 in this example).
    • Broker Execution: KafkaApis intercepts the request and performs two operations before staging:
      1. Resolves the state partition: shareCoordinatorPartition = (__share_group_state, partitionFor(groupId)).
      2. Calls txnCoordinator.handleAddPartitionsToTransaction(transactionalId, producerId, producerEpoch, Set.of(shareCoordinatorPartition), callback, TV_2, requestLocal).
        1. The call dynamically registers __share_group_state-N in __transaction_state.
        2. The registration is persisted to __transaction_state via TransactionLogValue: topicPartitions = {output-A-0, __share_group_state-N}.
    • State Change: TransactionMetadata now tracks topicPartitions = {output-A-0, __share_group_state-N}.
    • Staging Phase: Only on success, SharePartitionManager stages the transactional acknowledgements, transitioning the internal InFlightState to TX_PENDING.
  • producer.commitTransaction()
    • Source Component: Client → TransactionCoordinator (Broker Core)
    • Action: Sends EndTxnRequest(COMMIT).
    • Execution: TxnCoord reads the topic partitions from state and resolves the physical leaders via MetadataCache:
      • output-A-0 → Broker 3
      • __share_group_state-N → Broker 5
    • Action: Dispatches WriteTxnMarkers to both brokers.
  • Each broker handles the transaction markers
    • Execution on Broker 3 (Output Side):
      • Appends the COMMIT control batch to the output-A-0 log.
      • KIP-1289 hook runs (no-op since no TX_PENDING here)
    • Execution on Broker 5 (Share Side):
      • Appends the COMMIT control batch to the __share_group_state-N log.
      • Fires the KIP-1289 hook: Triggers sharePartitionManager.applyTxnMarker(COMMIT).
      • Scans the internal partitionCache, finds the matching TX_PENDING flight states, and commits them to ACKNOWLEDGED.
  • Transaction Completion
    • Action: Both brokers return successful ACKs back to TxnCoord.
    • State Change: TxnCoord transitions the state to COMPLETE_COMMIT.
    • Result: The client-side blocking producer.commitTransaction() call returns successfully.

Note: Only transaction version 2 (TV2) is supported, because it provides broker-side auto-registration and proper epoch fencing, and we do not want to maintain a TV1 code path. TV2 has been the default since 4.0.

__share_group_state schema: unchanged

No ShareCoordinatorShard changes, no snapshot replay changes, and no compatibility risk.

Inter-broker protocol: unchanged

The existing handleAddPartitionsToTransaction accepts an arbitrary TopicPartition, so no new code path is needed on the TxnCoord side.

State machine additions:

  • New transient state TX_PENDING 
  • On WriteTxnMarkers commit: TX_PENDING(ACCEPT) → ACKNOWLEDGED; TX_PENDING(REJECT) → ARCHIVED. RELEASE is not valid inside a transaction, so it is never staged.
  • On WriteTxnMarkers abort: TX_PENDING(*) → back to ACQUIRED (lock continues; consumer can retry the work).
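
The transitions above can be written as a small pure function. This is a sketch under stated assumptions: the enum and method names are hypothetical, and REJECT is mapped to ARCHIVED on commit as described in the Producer Javadoc in this KIP.

```java
/** Hypothetical sketch of the TX_PENDING transitions; not the broker implementation. */
final class TxnPendingTransitions {
    enum RecordState { AVAILABLE, ACQUIRED, TX_PENDING, ACKNOWLEDGED, ARCHIVED }
    enum AckType { ACCEPT, REJECT }
    enum Marker { COMMIT, ABORT }

    /** Staging: only an ACQUIRED record may enter TX_PENDING. */
    static RecordState stage(RecordState current) {
        if (current != RecordState.ACQUIRED) {
            throw new IllegalStateException("cannot stage a record in state " + current);
        }
        return RecordState.TX_PENDING;
    }

    /** Resolves a staged record when the transaction marker arrives. */
    static RecordState resolve(RecordState current, AckType staged, Marker marker) {
        if (current != RecordState.TX_PENDING) {
            return current; // nothing staged here, so the marker is a no-op
        }
        if (marker == Marker.ABORT) {
            return RecordState.ACQUIRED; // lock continues; the consumer can retry
        }
        return staged == AckType.ACCEPT ? RecordState.ACKNOWLEDGED : RecordState.ARCHIVED;
    }

    public static void main(String[] args) {
        RecordState pending = stage(RecordState.ACQUIRED);
        System.out.println(resolve(pending, AckType.ACCEPT, Marker.COMMIT));
        System.out.println(resolve(pending, AckType.REJECT, Marker.COMMIT));
        System.out.println(resolve(pending, AckType.ACCEPT, Marker.ABORT));
    }
}
```

Keeping the no-op branch for any state other than TX_PENDING is what makes late or repeated markers harmless.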

Pseudo code


// One-time setup
shareConsumer.subscribe(List.of("source-topic"));
producer.initTransactions();   // exactly once per producer instance

// Consume-transform-produce (CTP) loop
while (running) {
    ConsumerRecords<K, V> records = shareConsumer.poll(Duration.ofSeconds(5));
    if (records.isEmpty()) continue;

    try {
        producer.beginTransaction();

        Map<TopicIdPartition, Acknowledgements> acks = new HashMap<>();

        for (ConsumerRecord<K, V> record : records) {
            V output = process(record);                                          
            producer.send(new ProducerRecord<>("destination-topic", output));    
            acks.computeIfAbsent(
                    new TopicIdPartition(record.topicId(), 
                            new TopicPartition(record.topic(), record.partition())),
                    k -> Acknowledgements.empty()
                ).add(record.offset(), AcknowledgeType.ACCEPT);                  // accumulate
        }

        // Compress acks to AcknowledgementBatch wire form and stage in the txn.
        producer.sendShareAcknowledgementsToTransaction(
            toBatches(acks),                                                      // Map<TopicIdPartition, List<AcknowledgementBatch>>
            shareConsumer.shareGroupMetadata()                                    
        );

        producer.commitTransaction();    // BLOCKS: the true synchronisation point
        // Records are now both produced AND acknowledged ATOMICALLY.
        // Do NOT call shareConsumer.acknowledge(...) on these records. If it is called by
        // mistake, the non-transactional acknowledgement is rejected with
        // InvalidRecordStateException (the record is already TX_PENDING or ACKNOWLEDGED);
        // the transactional path is unaffected.

    } catch (ProducerFencedException | UnsupportedVersionException fatal) {
        throw fatal;                     // unrecoverable — kill the process
    } catch (KafkaException abortable) {
        producer.abortTransaction();     // BLOCKS; staged records revert to ACQUIRED on the broker
        // The records stay owned by this consumer until it re-acknowledges them or the
        // acquisition lock expires; after expiry they are redelivered on a later poll().
    }
}
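
The pseudo code relies on a toBatches helper that it does not define. One possible shape for a single partition is sketched below: it groups consecutive offsets into ranges and uses the size-1 "uniform type" form where it applies. The Batch record is a hypothetical stand-in for AcknowledgementBatch, and the Map<Long, Byte> input is an assumption made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of compressing per-offset acks into batches for one partition. */
final class AckBatching {
    /** Stand-in for the KIP's AcknowledgementBatch; offsets are inclusive. */
    record Batch(long firstOffset, long lastOffset, List<Byte> types) {}

    /** Groups per-offset ack types into batches of consecutive offsets. */
    static List<Batch> toBatches(Map<Long, Byte> acksByOffset) {
        List<Batch> batches = new ArrayList<>();
        long first = -1;
        long last = -1;
        List<Byte> types = new ArrayList<>();
        // TreeMap iterates offsets in ascending order.
        for (Map.Entry<Long, Byte> entry : new TreeMap<>(acksByOffset).entrySet()) {
            long offset = entry.getKey();
            if (!types.isEmpty() && offset != last + 1) {
                // A hole in the offsets closes the current batch.
                batches.add(new Batch(first, last, collapse(types)));
                types = new ArrayList<>();
            }
            if (types.isEmpty()) {
                first = offset;
            }
            last = offset;
            types.add(entry.getValue());
        }
        if (!types.isEmpty()) {
            batches.add(new Batch(first, last, collapse(types)));
        }
        return batches;
    }

    /** Uses the size-1 form when every offset in the batch has the same type. */
    static List<Byte> collapse(List<Byte> types) {
        boolean uniform = types.stream().allMatch(t -> t.equals(types.get(0)));
        return uniform ? List.of(types.get(0)) : List.copyOf(types);
    }

    public static void main(String[] args) {
        // Offsets 5 and 6 are accepted (1); offset 8 is rejected (3).
        System.out.println(toBatches(Map.of(5L, (byte) 1, 6L, (byte) 1, 8L, (byte) 3)));
    }
}
```

A full helper would apply this per TopicIdPartition to produce the Map<TopicIdPartition, List<AcknowledgementBatch>> argument.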


Corner cases 


1. Broker Crash During TX_PENDING (in-memory state)

  • Scenario: broker stages TX_PENDING then crashes before marker arrives.
  • Behavior: TX_PENDING is in-memory only (__share_group_state schema unchanged in v1); records replay as ACQUIRED/AVAILABLE after broker restart and are redelivered.
  • Guarantee: no data loss; at-least-once on broker-crash window (existing share-group baseline).
  • Future: persistent TX_PENDING via extending ShareSnapshotValue.DeliveryState deferred to a follow-up KIP.

2. TxnCoord Registration Fails (COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR)

  • Scenario: broker-internal AddPartitionsToTxn returns retriable error.
  • Behavior: broker does NOT stage; returns error to producer; producer enters abortable state.
  • Recovery: user catches KafkaException → abortTransaction() → retries.

3. Producer Fenced Mid-Stage (PRODUCER_FENCED, INVALID_PRODUCER_EPOCH)

  • Scenario: zombie producer A (epoch 1) sends TxnShareAcknowledge after producer B (epoch 2) has been initialized.
  • Behavior: TxnCoord registration rejects with fencing error → no staging happens → producer A becomes fatal.
  • Late-arriving marker safety: per-InFlightState (stagedProducerId, stagedProducerEpoch) filter rejects mismatched markers; idempotent.

5. Duplicate WriteTxnMarkers Delivery

  • Scenario: TxnCoord retries marker after network failure; broker receives same marker twice.
  • Behavior: second invocation finds state != TX_PENDING (already resolved) → no-op.
  • Guarantee: idempotent by state filter — verified in InFlightStateTxnTest.
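
The state filter that makes duplicate and mismatched markers harmless can be sketched as below. The class, field, and method names are hypothetical, only the ACCEPT outcome is modelled, and this is not the code of SharePartitionManager.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of idempotent marker application with per-record fencing. */
final class MarkerApplySketch {
    enum State { ACQUIRED, TX_PENDING, ACKNOWLEDGED }

    /** Per-record in-flight entry that remembers which producer staged it. */
    static final class InFlight {
        State state = State.ACQUIRED;
        long stagedProducerId = -1;
        short stagedProducerEpoch = -1;
    }

    final Map<Long, InFlight> byOffset = new HashMap<>();

    /** Stages an ACCEPT for one offset on behalf of (producerId, epoch). */
    void stage(long offset, long producerId, short epoch) {
        InFlight record = byOffset.computeIfAbsent(offset, o -> new InFlight());
        record.state = State.TX_PENDING;
        record.stagedProducerId = producerId;
        record.stagedProducerEpoch = epoch;
    }

    /** Applies a COMMIT or ABORT marker; returns how many records it resolved. */
    int applyMarker(long producerId, short epoch, boolean commit) {
        int resolved = 0;
        for (InFlight record : byOffset.values()) {
            boolean matches = record.state == State.TX_PENDING
                && record.stagedProducerId == producerId
                && record.stagedProducerEpoch == epoch;
            if (!matches) {
                continue; // already resolved, never staged, or staged by another producer
            }
            record.state = commit ? State.ACKNOWLEDGED : State.ACQUIRED;
            resolved++;
        }
        return resolved;
    }

    public static void main(String[] args) {
        MarkerApplySketch partition = new MarkerApplySketch();
        partition.stage(100L, 42L, (short) 1);
        System.out.println(partition.applyMarker(42L, (short) 1, true)); // first delivery
        System.out.println(partition.applyMarker(42L, (short) 1, true)); // duplicate delivery
    }
}
```

Because resolution changes the state away from TX_PENDING, a repeated marker finds nothing to match and needs no separate deduplication bookkeeping.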

6. Acquisition Lock Expiry While TX_PENDING

  • Scenario: transaction stages records but stalls; lock duration expires before commit/abort.
  • Behavior: lock timer detects TX_PENDING with expired lock → reverts to AVAILABLE → re-acquirable; subsequent marker for the stale staging is filtered out (epoch mismatch).
  • Configuration: group.share.record.lock.duration.ms (default 30s; per share group).

7. Transaction Timeout While TX_PENDING (transaction.timeout.ms)

  • Scenario: producer crashes after staging; TxnCoord detects abandoned transaction via timeout.
  • Behavior: TxnCoord auto-sends WriteTxnMarkers(ABORT); broker reverts TX_PENDING → ACQUIRED → AVAILABLE on next lock cycle. No data loss; eventual redelivery.

8. Stale Member Epoch (STALE_MEMBER_EPOCH, UNKNOWN_MEMBER_ID)

  • Scenario: rebalance changes memberEpoch between shareGroupMetadata() snapshot and sendShareAcknowledgementsToTransaction() call.
  • Behavior: broker compares request memberEpoch with current SharePartition owner; mismatch → returns STALE_MEMBER_EPOCH; no staging.
  • Recovery: abortable; user retries the entire read-process-write loop with a fresh snapshot.

9. Mixed-Version Cluster During Rolling Upgrade

  • Scenario: some brokers lack apiKey 93 (TxnShareAcknowledge).
  • Behavior: producer probes ApiVersions; if apiKey 93 absent on any broker → UnsupportedVersionException thrown synchronously, no bytes on wire.
  • Recovery: user must abort the transaction (output-side AddPartitionsToTxn still needs cleanup).
  • Gating: share.version=2 finalized feature flag controls broker advertisement.
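
A client-side version gate along these lines might look like the sketch below. It models the advertised API keys as plain sets and uses UnsupportedOperationException as a stand-in for Kafka's UnsupportedVersionException; the class and method names are assumptions for illustration.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the synchronous apiKey check before staging. */
final class ApiGateSketch {
    static final int TXN_SHARE_ACKNOWLEDGE = 93; // apiKey proposed in this KIP

    /** Throws before anything is sent if any broker does not advertise the new API. */
    static void ensureSupported(Map<Integer, Set<Integer>> apiKeysByBroker) {
        for (Map.Entry<Integer, Set<Integer>> broker : apiKeysByBroker.entrySet()) {
            if (!broker.getValue().contains(TXN_SHARE_ACKNOWLEDGE)) {
                // Stand-in for UnsupportedVersionException.
                throw new UnsupportedOperationException(
                    "broker " + broker.getKey() + " does not advertise apiKey " + TXN_SHARE_ACKNOWLEDGE);
            }
        }
    }

    public static void main(String[] args) {
        ensureSupported(Map.of(1, Set.of(0, 1, 93), 2, Set.of(0, 1, 93)));
        System.out.println("all brokers advertise apiKey 93");
    }
}
```

Failing before any bytes are written means the only cleanup the application needs is the usual abortTransaction() for the output side.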

10. Invalid Acknowledge Type Inside Transaction

  • Scenario: producer sends AcknowledgeType.RELEASE (2), RENEW (4), or GAP (0) inside sendShareAcknowledgementsToTransaction.
  • Behavior: broker rejects with INVALID_RECORD_STATE; no staging.
  • Allowed values: only ACCEPT (1) and REJECT (3) — release/renew/gap have no transactional semantics.

11. Partial Multi-Partition Stage Failure

  • Scenario: producer stages on partitions A and B; A succeeds, B fails.
  • Behavior: per-partition 2PC rollback — already-staged records on A are reverted to ACQUIRED before the response returns (commit 8 on the branch).
  • Result: all-or-nothing per call; user sees error → aborts → retries.
  • Guarantee: no partial commit; clean rollback semantics.
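
The all-or-nothing behaviour of a single staging call can be sketched as follows. The StagedPartition interface and FakePartition class are hypothetical and exist only for this illustration; they do not correspond to classes in the implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of all-or-nothing staging across partitions. */
final class AllOrNothingStaging {
    /** Per-partition staging hook. */
    interface StagedPartition {
        boolean stage();  // ACQUIRED -> TX_PENDING; returns false on failure
        void revert();    // TX_PENDING -> ACQUIRED
    }

    /** Stages every partition, or reverts the ones already staged and reports failure. */
    static boolean stageAll(List<? extends StagedPartition> partitions) {
        List<StagedPartition> staged = new ArrayList<>();
        for (StagedPartition partition : partitions) {
            if (!partition.stage()) {
                // Undo in reverse order so no partition is left in TX_PENDING.
                for (int i = staged.size() - 1; i >= 0; i--) {
                    staged.get(i).revert();
                }
                return false;
            }
            staged.add(partition);
        }
        return true;
    }

    /** Test double: staging succeeds or fails according to a fixed flag. */
    static final class FakePartition implements StagedPartition {
        final boolean succeeds;
        boolean pending;

        FakePartition(boolean succeeds) { this.succeeds = succeeds; }

        @Override public boolean stage() { pending = succeeds; return succeeds; }
        @Override public void revert() { pending = false; }
    }

    public static void main(String[] args) {
        FakePartition a = new FakePartition(true);
        FakePartition b = new FakePartition(false);
        System.out.println(stageAll(List.of(a, b)) + " a.pending=" + a.pending);
    }
}
```

Reverting before the response is returned means the caller only ever observes two outcomes for a call: everything staged, or nothing staged.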

12. Leader Change for __share_group_state-N Between Stage and Commit

  • Scenario: share-coordinator partition leadership moves from broker 5 to broker 7 between staging and WriteTxnMarkers dispatch.
  • Behavior: TxnCoord re-resolves leader at commit time via MetadataCache → marker goes to broker 7; broker 7's in-memory partitionCache does NOT contain the original TX_PENDING state.
  • Limitation in this initial version: TX_PENDING lost on share-coordinator leadership change (same root cause as Corner Case 1); lock-timeout reverts records; redelivery.
  • Future: persistent TX_PENDING closes this case as well.

13. Transaction Aborted Without Markers Reaching Broker

  • Scenario: transaction aborts but broker hosting TX_PENDING is partitioned from TxnCoord.
  • Behavior: lock timeout (Corner Case 6) eventually reverts; subsequent late-arriving marker is no-op (state already AVAILABLE or epoch mismatched).
  • Convergence guaranteed.

14. Concurrent Transactions From Same Producer (Different Epochs)

  • Scenario: producer rapidly bumps epoch; old in-flight TxnShareAcknowledge arrives with stale epoch.
  • Behavior: TxnCoord rejects with PRODUCER_FENCED; broker never stages. Per-record fencing on the state machine prevents any stale marker from resolving newer staging.

Note: Persisting the record state in a follow-up KIP would eliminate Corner Case 1.

Metrics

All new metrics follow the existing Kafka metric conventions (Yammer for broker JMX, KafkaMetric for client-side).

Metric names mirror the established kafka.server:type=group-coordinator-metrics,name=... and kafka.server:type=share-coordinator-metrics,name=... patterns.

Backward-compatible: no existing metrics renamed or removed.

Module | Metric | Type | Purpose
Broker: SharePartitionManager | TxnPendingRecordsCount | Gauge | Current count of records in TX_PENDING (primary health signal)
Broker: SharePartitionManager | TxnShareAcknowledgeRequestLatencyMs | Histogram | p99 latency of staging requests
Broker: SharePartitionManager | TxnPendingLockExpiredCount | Counter | Any non-zero = abandoned txns or missing markers (critical alert)
Broker: TransactionCoordinator | TransactionPartitionsCount (existing) | Gauge | Reused; now includes __share_group_state-N entries
Producer | share-ack-txn-send-rate | Meter | EOS-call throughput
Producer | share-ack-txn-send-error-rate | Meter | Stage-failure rate (drives retry loops)
Consumer | share-group-metadata-fetch-rate | Meter | Confirms read-process-write loop is active

Total: 6 new + 1 reused = 7 metrics. Primary alert: TxnPendingRecordsCount > 0 sustained for > 60s.
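
As an illustration of the "sustained for > 60s" rule, the sketch below evaluates a stream of gauge samples. The class name and the sampling model are assumptions; in practice this rule would normally live in the monitoring system rather than in application code.

```java
/** Hypothetical sketch of a "greater than zero, sustained" alert rule. */
final class SustainedAlert {
    private final long thresholdMs;
    private long aboveSinceMs = -1; // -1 means the gauge was zero at the last sample

    SustainedAlert(long thresholdMs) {
        this.thresholdMs = thresholdMs;
    }

    /** Feeds one gauge sample; returns true when the alert should fire. */
    boolean observe(long nowMs, long txnPendingRecordsCount) {
        if (txnPendingRecordsCount <= 0) {
            aboveSinceMs = -1; // the gauge drained, so the streak resets
            return false;
        }
        if (aboveSinceMs < 0) {
            aboveSinceMs = nowMs; // first non-zero sample of a new streak
        }
        return nowMs - aboveSinceMs > thresholdMs;
    }

    public static void main(String[] args) {
        SustainedAlert alert = new SustainedAlert(60_000);
        System.out.println(alert.observe(0, 3));      // streak starts
        System.out.println(alert.observe(60_001, 1)); // non-zero for more than 60s
    }
}
```

Resetting on any zero sample keeps short, healthy bursts of TX_PENDING from triggering the alert.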


Compatibility, Deprecation, and Migration Plan

Enablement and Rollout Plan

Feature Gating

The feature is opt-in, online-upgradable, and zero-disruption for existing workloads. No new feature flag is introduced; enablement uses the existing kafka-features.sh machinery.

Feature flag | Required level | Why
share.version | >= 2 | New level finalises KIP-1289; brokers below this level do not advertise apiKey 93
transaction.version | >= 2 | KIP-1289 is TV2-only (inherits auto-registration and proper epoch fencing)

Rolling Upgrade (Online; No Traffic Disruption)

Phase 1 — Software upgrade (rolling)
  Upgrade brokers one at a time to the binary containing KIP-1289.
  KIP-1289 code is dormant (share.version still 1).
  Standard Kafka rolling-restart

Phase 2 — Soak timing
  Verify cluster health

Phase 3 — Finalise feature
  kafka-features.sh upgrade --feature share.version=2
  - Online metadata propagation (update of __cluster_metadata)
  - No restart, no rebalance, no leader change, no socket disruption
  - Brokers begin advertising apiKey 93

Phase 4 — Client onboarding
  Roll out applications calling sendShareAcknowledgementsToTransaction.
  Add the alerts (esp. TxnPendingRecordsCount) and relevant metrics in dashboard & observability.

Cross-Version Client/Broker Matrix

Client | Broker, share.version=2 | Broker, share.version=1 | Old broker (no binary)
New | Works | UnsupportedVersionException (synchronous, no wire bytes) | Same
Old | Unaffected | Unaffected | Unaffected

Downgrade

1. Drain in-flight TX_PENDING:
   - Halt producers calling sendShareAcknowledgementsToTransaction.
   - Wait for transaction.timeout.ms (default 60s) so that any abandoned transactions clear.
2. kafka-features.sh downgrade --feature share.version=1

The broker rejects the downgrade RPC if any in-memory TX_PENDING exists, preventing data inconsistency.

Software downgrade (binary): must follow the feature downgrade; standard rolling restart.

Test Plan

Rough implementation [The actual implementation will be phasewise with multiple smaller PRs]:
    https://github.com/apache/kafka/pull/22357

The verification strategy focuses on state machine integrity and fault tolerance under high-concurrency and failure scenarios.

  • Unit Tests: Validates state transitions (e.g., TX_PENDING to ACKNOWLEDGED on commit vs. back to ACQUIRED on abort), idempotency of operations, and transaction timeout/auto-abort logic.

  • Integration Tests: Focuses on end-to-end commit/abort flows, coordinator recovery, and multi-consumer behavior within a single group during network partitions.

  • System & Performance Tests: Benchmarks transactional vs. non-transactional modes and verifies exactly-once delivery.

  • Chaos Tests: Simulates broker and coordinator crashes specifically during critical phases like PREPARE_COMMIT to ensure protocol durability.

Follow-up KIP (deferred)

  • Persistent TX_PENDING in __share_group_state for crash-resilient EOS on broker failover during staging (addresses Corner Cases 1 and 12).
  • Per-share-partition metric for TX_PENDING residency time — would add a histogram of "time spent in TX_PENDING" useful for diagnosing slow producers; can be added in the persistence KIP without compatibility concerns.
  • External processing engines or database as 2PC participants for write/sink records.

Rejected Alternatives


2 Comments

  1. Ref: producer-side 2PC (Kafka as participant): KAFKA-15370. KIP: KIP-939: Support Participation in 2PC#2PCRefresher