Status
Current state: Voting
Discussion thread: here
JIRA: KAFKA-19883
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
1 Motivation
Today, the Producer's sendOffsetsToTransaction(offsets, consumerGroupMetadata) allows EOS in read-process-write topologies that consume from regular consumer groups. KIP-932 introduced share groups, but share-group consumers have no equivalent capability.
This blocks share-group adoption in:
1. MirrorMaker and other Kafka-to-Kafka mirroring/forwarding pipelines.
2. Kafka Streams stateless topologies that want to use share groups for parallelism beyond partition count.
3. Atomic dead-letter-queue (DLQ) writes in connectors.
The Goal: Atomic "Read-Process-Write"
Enable Exactly-Once Semantics (EOS) by making acknowledgments part of a Kafka transaction.
Either the output is produced AND the source record is acknowledged, or neither happens. This KIP is scoped to applications that consume from a share group and write their output with a transactional Kafka producer.
1.1 Background: Share Groups
Current State Machine:
- AVAILABLE: Ready for delivery.
- ACQUIRED: Locked by a consumer; the acquisition lock can be extended with a RENEW acknowledgement.
- ACKNOWLEDGED / ARCHIVED: Terminal states; records are never redelivered.
- AVAILABLE (Released): Returned to the pool for redelivery.
Current Acknowledgment Modes:
- Implicit: records are acknowledged automatically on the subsequent poll().
- Explicit: records are acknowledged manually via acknowledge(record, type), followed by commitSync().
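For reference, the existing non-transactional outcomes can be sketched as a small state function. This is a simplified, hypothetical model: RecordState, AckType and NonTxnAckSketch are illustrative names, not Kafka's internal classes; the type ids follow the values listed in the wire-protocol section of this KIP; RENEW is modelled as "lock extended, record stays ACQUIRED"; and GAP is left out because it marks offsets that carry no record.

```java
/** Simplified record states of a share-partition in-flight record (illustrative, not Kafka's internal enum). */
enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

/** Acknowledge types with the wire ids listed in this KIP (0=Gap ... 4=Renew). */
enum AckType {
    GAP(0), ACCEPT(1), RELEASE(2), REJECT(3), RENEW(4);

    final byte id;

    AckType(int id) {
        this.id = (byte) id;
    }

    static AckType fromId(byte id) {
        for (AckType t : values()) {
            if (t.id == id) return t;
        }
        throw new IllegalArgumentException("Unknown acknowledge type id " + id);
    }
}

final class NonTxnAckSketch {
    /** Next state of an ACQUIRED record after a plain (non-transactional) acknowledgement. */
    static RecordState apply(RecordState current, AckType type) {
        if (current != RecordState.ACQUIRED) {
            throw new IllegalStateException("Only ACQUIRED records can be acknowledged, was " + current);
        }
        return switch (type) {
            case ACCEPT -> RecordState.ACKNOWLEDGED; // terminal: never redelivered
            case REJECT -> RecordState.ARCHIVED;     // terminal: never redelivered
            case RELEASE -> RecordState.AVAILABLE;   // back to the pool for redelivery
            case RENEW -> RecordState.ACQUIRED;      // lock extended; the consumer keeps the record
            case GAP -> throw new IllegalArgumentException("GAP marks offsets without a record");
        };
    }
}
```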
2 Public Interfaces
Public API additions:
- Producer.sendShareAcknowledgementsToTransaction(Map<TopicIdPartition, List<AcknowledgementBatch>>, ShareGroupMetadata).
- ShareGroupMetadata class (groupId, memberId, memberEpoch, optional groupInstanceId).
- ShareConsumer.shareGroupMetadata() to obtain the above.
Producer (clients module)
```java
public interface Producer<K, V> {
    /**
     * Sends share-group acknowledgements to the share-partition leader and marks them
     * for atomic commit alongside the records produced in this transaction.
     *
     * The acknowledgements are staged on the broker in a TX_PENDING state until the
     * transaction is committed or aborted. On commit, ACCEPT records transition to
     * ACKNOWLEDGED and REJECT records transition to ARCHIVING/ARCHIVED. On abort,
     * all staged records revert to ACQUIRED and remain owned by the original consumer
     * until either the consumer re-acknowledges them or the acquisition lock expires.
     *
     * @param acknowledgements Per-partition list of acknowledgement batches. Only
     *                         AcknowledgeType.ACCEPT (1) and AcknowledgeType.REJECT (3)
     *                         are valid inside a transaction. RELEASE (2), RENEW (4),
     *                         and GAP (0) are rejected with InvalidRecordStateException.
     * @param groupMetadata    Snapshot of the share consumer's group identity, obtained
     *                         from ShareConsumer.shareGroupMetadata().
     *
     * @throws IllegalStateException         if no transaction is in progress, or if the
     *                                       producer is not transactional.
     * @throws ProducerFencedException       if another producer with the same
     *                                       transactionalId has fenced this one.
     * @throws UnsupportedVersionException   if the cluster does not advertise apiKey 93
     *                                       (TxnShareAcknowledge) via ApiVersions.
     * @throws GroupAuthorizationException   if the configured principal cannot Write to
     *                                       the share group.
     * @throws InvalidProducerEpochException if the producer's epoch is stale.
     * @throws KafkaException                for other non-fatal errors; recover by aborting
     *                                       the transaction and retrying the entire
     *                                       read-process-write loop.
     *
     * Threading: returns immediately after enqueuing the request for the producer's
     * background Sender thread; the broker has NOT processed it yet. Local precondition
     * failures (for example IllegalStateException or UnsupportedVersionException) are
     * thrown synchronously; errors returned by the broker are reported when
     * commitTransaction() or abortTransaction() is later called.
     */
    void sendShareAcknowledgementsToTransaction(
        Map<TopicIdPartition, List<AcknowledgementBatch>> acknowledgements,
        ShareGroupMetadata groupMetadata
    ) throws ProducerFencedException;
}
```
ShareConsumer (clients module)
```java
public interface ShareConsumer<K, V> {
    /**
     * Returns an immutable snapshot of this consumer's share-group identity for use
     * with Producer.sendShareAcknowledgementsToTransaction.
     *
     * The snapshot captures groupId, memberId, and memberEpoch atomically; if a
     * rebalance changes the memberEpoch between the snapshot and the producer call,
     * the broker rejects the staging request with STALE_MEMBER_EPOCH and the
     * user must abort the transaction and retry the read-process-write loop.
     *
     * @throws UnsupportedVersionException if the cluster does not support KIP-1289.
     * @throws TimeoutException            if the snapshot cannot be obtained within the
     *                                     configured default.api.timeout.ms.
     *
     * Threading: thread-safe; safe to call concurrently with poll() and acknowledge().
     */
    ShareGroupMetadata shareGroupMetadata();
}
```
ShareGroupMetadata (new class in clients module, package o.a.k.clients.consumer)
```java
/** Immutable and thread-safe; safe to use concurrently with poll() and acknowledge(). */
public final class ShareGroupMetadata {
    // Signatures only; method bodies omitted.
    public ShareGroupMetadata(String groupId, String memberId, int memberEpoch);
    public String groupId();
    public String memberId();
    public int memberEpoch();
    @Override public boolean equals(Object other);
    @Override public int hashCode();
    @Override public String toString();
}
```
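A minimal sketch of how the "captured atomically" guarantee could be met, assuming a hypothetical ShareMemberStateSketch holder that the consumer updates on each heartbeat response. A Java record (ShareGroupMetadataSketch) stands in for ShareGroupMetadata because it supplies immutability, equals, hashCode and toString; neither name is part of this KIP's API. The point is that groupId, memberId and memberEpoch are read under one lock, so a snapshot never mixes values from two heartbeats.

```java
import java.util.Objects;

/** Stand-in for ShareGroupMetadata: a record is immutable and supplies equals/hashCode/toString. */
record ShareGroupMetadataSketch(String groupId, String memberId, int memberEpoch) {
    ShareGroupMetadataSketch {
        Objects.requireNonNull(groupId, "groupId");
        Objects.requireNonNull(memberId, "memberId");
    }
}

/** Hypothetical holder of the consumer's membership, updated from heartbeat responses. */
final class ShareMemberStateSketch {
    private final String groupId;
    private String memberId = "";
    private int memberEpoch = 0;

    ShareMemberStateSketch(String groupId) {
        this.groupId = Objects.requireNonNull(groupId, "groupId");
    }

    /** Called when a heartbeat response changes the member id or epoch. */
    synchronized void onHeartbeat(String newMemberId, int newMemberEpoch) {
        this.memberId = Objects.requireNonNull(newMemberId, "memberId");
        this.memberEpoch = newMemberEpoch;
    }

    /** Reads all three fields under the same lock, so the snapshot is internally consistent. */
    synchronized ShareGroupMetadataSketch snapshot() {
        return new ShareGroupMetadataSketch(groupId, memberId, memberEpoch);
    }
}
```

A snapshot taken before a rebalance keeps its old epoch, which is exactly what lets the broker detect STALE_MEMBER_EPOCH later.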
Wire protocol additions:
- New TxnShareAcknowledge RPC (apiKey 93 or next free).
- Schema mirrors TxnOffsetCommitRequest shape but carries AcknowledgementBatch instead of OffsetCommitInfo.
New RPC: TxnShareAcknowledgeRequest (apiKey 93)
- Listeners: broker.
- Acknowledge type values: 0=Gap, 1=Accept, 2=Release, 3=Reject, 4=Renew.
- Transactional constraint: only 1 (Accept) and 3 (Reject) are valid inside a transaction. A batch containing any other value is rejected with INVALID_RECORD_STATE.
| Field | Type | Notes |
|---|---|---|
| TransactionalId | string (nullable) | The producer's transactional.id. |
| GroupId | string, entityType=groupId | The share group ID. |
| ProducerId | int64, entityType=producerId | For fencing. |
| ProducerEpoch | int16 | For fencing. |
| MemberId | string, entityType=memberId | The share group member ID. |
| MemberEpoch | int32 | For share-group fencing. |
| Topics | []TxnShareAcknowledgeTopic | mapKey=true on TopicId. |
| Topics.TopicId | uuid, mapKey=true | |
| Topics.Partitions | []TxnShareAcknowledgePartition | mapKey=true on PartitionIndex. |
| Partitions.PartitionIndex | int32, mapKey=true | |
| Partitions.AcknowledgementBatches | []TxnShareAcknowledgeBatch | |
| Batch.FirstOffset | int64 | Inclusive. |
| Batch.LastOffset | int64 | Inclusive. |
| Batch.AcknowledgeTypes | []int8 | Per-offset ack type byte. Size 1 = uniform type for whole range. |
New RPC: TxnShareAcknowledgeResponse (apiKey 93, v0; a response shares the apiKey of its request)
Top-level supported errors:
- GROUP_AUTHORIZATION_FAILED
- TOPIC_AUTHORIZATION_FAILED
- TRANSACTIONAL_ID_AUTHORIZATION_FAILED
- TRANSACTIONAL_ID_NOT_FOUND
- INVALID_PRODUCER_EPOCH / PRODUCER_FENCED
- INVALID_PRODUCER_ID_MAPPING
- INVALID_TXN_STATE
- UNKNOWN_MEMBER_ID
- STALE_MEMBER_EPOCH
- TRANSACTION_ABORTABLE (KIP-890)
- UNKNOWN_SERVER_ERROR
Per-partition supported errors:
- UNKNOWN_TOPIC_OR_PARTITION / UNKNOWN_TOPIC_ID
- NOT_LEADER_OR_FOLLOWER (with CurrentLeader populated)
- INVALID_RECORD_STATE
- INVALID_REQUEST
- KAFKA_STORAGE_ERROR
| Field | Type | Notes |
|---|---|---|
| ThrottleTimeMs | int32 | |
| ErrorCode | int16 | Top-level error. |
| Responses | []TxnShareAcknowledgeTopicResponse | |
| Responses.TopicId | uuid, mapKey=true | |
| Responses.Partitions | []TxnShareAcknowledgePartitionResponse | |
| Partitions.PartitionIndex | int32 | |
| Partitions.ErrorCode | int16 | Per-partition error. |
| Partitions.ErrorMessage | string (nullable) | |
| Partitions.CurrentLeader | LeaderIdAndEpoch (tagged) | Populated on NOT_LEADER_OR_FOLLOWER. |
| NodeEndpoints | []NodeEndpoint (tagged) | Top-level: address of any new leader referenced above. |
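The request constraints above can be checked per partition before anything is staged. The sketch below is illustrative only: TxnAckBatch and TxnShareAckValidatorSketch are hypothetical names, errors are returned as their protocol names, and requiring batches to be sorted and non-overlapping is an extra assumption not stated in the schema.

```java
import java.util.List;

/** Hypothetical in-memory view of one TxnShareAcknowledgeBatch. */
record TxnAckBatch(long firstOffset, long lastOffset, byte[] acknowledgeTypes) { }

final class TxnShareAckValidatorSketch {
    static final byte ACCEPT = 1;
    static final byte REJECT = 3;

    /** Returns the per-partition error name for the response, or "NONE" if the batches may be staged. */
    static String validatePartition(List<TxnAckBatch> batches) {
        long previousLast = -1;
        for (TxnAckBatch b : batches) {
            if (b.firstOffset() < 0 || b.lastOffset() < b.firstOffset()) {
                return "INVALID_REQUEST"; // FirstOffset and LastOffset are both inclusive
            }
            if (b.firstOffset() <= previousLast) {
                return "INVALID_REQUEST"; // assumption: batches are sorted and non-overlapping
            }
            long rangeSize = b.lastOffset() - b.firstOffset() + 1;
            int n = b.acknowledgeTypes().length;
            if (n != 1 && n != rangeSize) {
                return "INVALID_REQUEST"; // one uniform type, or exactly one type per offset
            }
            for (byte t : b.acknowledgeTypes()) {
                if (t != ACCEPT && t != REJECT) {
                    return "INVALID_RECORD_STATE"; // GAP, RELEASE and RENEW are not transactional
                }
            }
            previousLast = b.lastOffset();
        }
        return "NONE";
    }
}
```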
Existing RPCs NOT changed
- WriteTxnMarkers (apiKey 27): NO schema change. The broker hooks the
existing marker arrival in KafkaApis.handleWriteTxnMarkersRequest and broadcasts
to all SharePartition instances on that broker. Per-record fencing by
(producerId, producerEpoch) filters out non-participants. This means the
TransactionCoordinator does NOT need to track share-partitions as transaction
participants explicitly — the broadcast is correct because the per-record
state filter is the source of truth.
- AddPartitionsToTxn (apiKey 24): NO schema change. KIP-1289 does NOT add share
partitions as transaction participants. The transactional output-write path
continues to use AddPartitionsToTxn for the output topics; the share-group
staging path uses TxnShareAcknowledge directly (TV2-only, single round trip).
- ApiVersions (apiKey 18): NO schema change, but the broker MUST advertise
apiKey 93 only when feature-flag share.version >= 2 (see Enablement section).
- TxnOffsetCommit (apiKey 28): NO schema change. Independent path for regular
consumer groups; share groups use the new RPC.
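The broadcast-and-filter idea for WriteTxnMarkers can be illustrated with a simplified model in which each share partition on the broker is a list of in-flight records. BRecord, BState and MarkerBroadcastSketch are hypothetical names; only the COMMIT of ACCEPT acknowledgements is modelled, and fencing is an exact match on (producerId, producerEpoch) as described above.

```java
import java.util.List;

enum BState { ACQUIRED, TX_PENDING, ACKNOWLEDGED }

/** Hypothetical in-flight record with the producer that staged it (-1 when not staged). */
final class BRecord {
    BState state;
    final long producerId;
    final short producerEpoch;

    BRecord(BState state, long producerId, short producerEpoch) {
        this.state = state;
        this.producerId = producerId;
        this.producerEpoch = producerEpoch;
    }
}

final class MarkerBroadcastSketch {
    /** Applies one COMMIT marker to every share partition on this broker; returns how many records it resolved. */
    static int broadcastCommit(List<List<BRecord>> sharePartitions, long producerId, short producerEpoch) {
        int resolved = 0;
        for (List<BRecord> partition : sharePartitions) {
            for (BRecord r : partition) {
                // Per-record fencing: only records staged by exactly this producer id and epoch change.
                if (r.state == BState.TX_PENDING && r.producerId == producerId && r.producerEpoch == producerEpoch) {
                    r.state = BState.ACKNOWLEDGED;
                    resolved++;
                }
            }
        }
        return resolved;
    }
}
```

Records that belong to other producers, or that are not staged at all, are untouched, and a repeated marker resolves nothing the second time.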
Internal Topic Record Schemas
__transaction_state: unchanged
Share-group participation in a transaction is tracked via the existing TransactionMetadata.topicPartitions field, which records the __share_group_state-N partition
(mechanism: broker-internal AddPartitionsToTxn triggered by TxnShareAcknowledge handler, mirroring how __consumer_offsets-N is registered via AddOffsetsToTxn for regular consumer groups)
Flow — Regular Consumer Group
1. Producer writes to the output topic.
   - Source component: TransactionManager (client).
   - Action: sends AddPartitionsToTxn("output-A-0") to the TxnCoord.
   - State change: the TxnCoord records participants = {output-A-0} in __transaction_state.
2. Producer calls sendOffsetsToTransaction("my-group").
   - Source components: TransactionManager (client) and TransactionCoordinator (broker core).
   - Action: sends AddOffsetsToTxn("my-group") to the TxnCoord.
   - Execution: the TxnCoord computes hash("my-group") = 7 (illustrative value).
   - State change: records participants = {output-A-0, __consumer_offsets-7} in __transaction_state.
3. Producer commits.
   - Source component: TransactionCoordinator (broker core).
   - Execution: reads the participants from __transaction_state and resolves their leaders via MetadataCache: output-A-0 → Broker 3; __consumer_offsets-7 → Broker 4.
   - Action: sends WriteTxnMarkers to Broker 3 AND Broker 4.
4. Each broker writes a control batch to its log.
   - Source components: KafkaApis and GroupCoordinator (broker core).
   - Broker 3: appends a COMMIT batch to the output-A-0 log (makes the records visible to read_committed consumers).
   - Broker 4: appends a COMMIT batch to the __consumer_offsets-7 log (makes the staged offsets visible to the GroupCoordinator).
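The __consumer_offsets partition in the sendOffsetsToTransaction step comes from hashing the group id. A sketch of that mapping as the group coordinator computes it: the absolute value of String.hashCode(), modulo the number of __consumer_offsets partitions (offsets.topic.num.partitions, default 50). OffsetsPartitionSketch is a hypothetical name, the value 7 above is only illustrative, and this sketch makes no claim about how __share_group_state-N is chosen in the share flow.

```java
final class OffsetsPartitionSketch {
    /** Like Kafka's Utils.abs: Integer.MIN_VALUE maps to 0 instead of staying negative. */
    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    /** __consumer_offsets partition that owns a group's offsets. */
    static int partitionFor(String groupId, int offsetsTopicPartitionCount) {
        return abs(groupId.hashCode()) % offsetsTopicPartitionCount;
    }
}
```

Because the mapping is deterministic, every broker and the TxnCoord agree on which __consumer_offsets-N partition becomes the transaction participant.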
Flow — Share Group (THIS KIP)
1. producer.send("output-A", record)
   - Source component: TransactionManager (client).
   - Action: sends AddPartitionsToTxnRequest(["output-A-0"]).
   - State change: the TxnCoord updates TransactionMetadata.topicPartitions = {output-A-0}.
2. producer.sendShareAcknowledgementsToTransaction(acks, meta)
   - Source components: TransactionManager (client) → KafkaApis (broker core).
   - Action: sends TxnShareAcknowledgeRequest to the SharePartition leader (say, Broker 5).
   - Broker execution: KafkaApis handles the request in two steps:
     - Resolves the state partition: shareCoordinatorPartition = (__share_group_state, partitionFor(groupId)).
     - Calls txnCoordinator.handleAddPartitionsToTransaction(transactionalId, producerId, producerEpoch, Set.of(shareCoordinatorPartition), callback, TV_2, requestLocal), which dynamically registers __share_group_state-N and persists it to __transaction_state via TransactionLogValue: topicPartitions = {output-A-0, __share_group_state-N}.
   - State change: TransactionMetadata now tracks topicPartitions = {output-A-0, __share_group_state-N}.
   - Staging phase: only if registration succeeds (in the callback), SharePartitionManager stages the transactional acknowledgements, transitioning the internal InFlightState to TX_PENDING.
3. producer.commitTransaction()
   - Source components: client → TransactionCoordinator (broker core).
   - Action: sends EndTxnRequest(COMMIT).
   - Execution: the TxnCoord reads the topic partitions from its state and resolves their leaders via MetadataCache: output-A-0 → Broker 3; __share_group_state-N → Broker 5.
   - Action: dispatches WriteTxnMarkers to both brokers.
   - Result: the blocking commitTransaction() call returns once the TxnCoord has durably written PREPARE_COMMIT; the markers are delivered asynchronously, so staged acknowledgements can remain TX_PENDING briefly after the call returns.
4. Each broker handles the transaction markers.
   - On Broker 3 (output side):
     - Appends the COMMIT control batch to the output-A-0 log.
     - The KIP-1289 hook runs but is a no-op, since no records are TX_PENDING here.
   - On Broker 5 (share side):
     - Appends the COMMIT control batch to the __share_group_state-N log.
     - Fires the KIP-1289 hook: sharePartitionManager.applyTxnMarker(COMMIT).
     - Scans the internal partitionCache, finds the matching TX_PENDING in-flight states, and transitions them to ACKNOWLEDGED (ACCEPT) or ARCHIVED (REJECT).
5. Transaction completion
   - Action: both brokers return successful WriteTxnMarkers responses to the TxnCoord.
   - State change: the TxnCoord transitions the transaction to COMPLETE_COMMIT.
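The "stage only on success" ordering of the staging step can be sketched with a CompletableFuture. StagingOrderSketch is hypothetical, registerWithTxnCoord is an assumed stand-in for the broker-internal AddPartitionsToTxn call that completes with an error name, and the event log exists only to make the ordering observable. Because nothing is staged unless registration succeeds, a failed registration cannot leave TX_PENDING records that no marker would ever resolve.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class StagingOrderSketch {
    /** Ordered log of what the handler did, so the ordering can be checked. */
    final List<String> events = new ArrayList<>();

    /**
     * registerWithTxnCoord is a hypothetical stand-in for the broker-internal AddPartitionsToTxn
     * call; it completes with an error name, or "NONE" on success.
     */
    CompletableFuture<String> handleTxnShareAcknowledge(
            String stateTopicPartition,
            Function<String, CompletableFuture<String>> registerWithTxnCoord) {
        events.add("register " + stateTopicPartition);
        return registerWithTxnCoord.apply(stateTopicPartition).thenApply(error -> {
            if (!error.equals("NONE")) {
                events.add("no staging: " + error);
                return error; // returned to the producer, which must abort the transaction
            }
            events.add("stage TX_PENDING"); // runs only after registration succeeded
            return "NONE";
        });
    }
}
```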
[Note: only TV2 is supported. TV2 provides broker-side auto-registration and proper epoch fencing, and we do not want to maintain a TV1 code path. TV2 is already the default since 4.0.]
__share_group_state schema unchanged
No ShareCoordinatorShard changes, no snapshot replay changes, and no compatibility risk.
Inter-broker protocol unchanged
The existing handleAddPartitionsToTransaction already accepts arbitrary TopicPartitions, so no new code path is needed on the TxnCoord side.
State machine additions:
- New transient state TX_PENDING
- On WriteTxnMarkers commit: TX_PENDING(ACCEPT) → ACKNOWLEDGED; TX_PENDING(REJECT) → ARCHIVED. RELEASE is never staged, because it is rejected inside a transaction.
- On WriteTxnMarkers abort: TX_PENDING(*) → back to ACQUIRED (lock continues; consumer can retry the work).
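The transitions above, together with the per-record producer fencing, can be sketched for a single in-flight record. This is a hypothetical model: TxnRecordState and InFlightRecordSketch are illustrative names, ack type 1 is ACCEPT and 3 is REJECT, and ARCHIVING/ARCHIVED are collapsed into ARCHIVED. A marker that does not match the staging producer, or that arrives after the record is resolved, returns false and changes nothing, which is what makes duplicate markers harmless.

```java
enum TxnRecordState { AVAILABLE, ACQUIRED, TX_PENDING, ACKNOWLEDGED, ARCHIVED }

final class InFlightRecordSketch {
    static final byte ACCEPT = 1;
    static final byte REJECT = 3;

    TxnRecordState state = TxnRecordState.ACQUIRED;
    private byte stagedAckType;
    private long stagedProducerId = -1;
    private short stagedProducerEpoch = -1;

    /** Staging part of TxnShareAcknowledge for this record. */
    void stage(byte ackType, long producerId, short producerEpoch) {
        if (ackType != ACCEPT && ackType != REJECT) {
            throw new IllegalArgumentException("INVALID_RECORD_STATE: ack type " + ackType);
        }
        if (state != TxnRecordState.ACQUIRED) {
            throw new IllegalStateException("INVALID_RECORD_STATE: record is " + state);
        }
        state = TxnRecordState.TX_PENDING;
        stagedAckType = ackType;
        stagedProducerId = producerId;
        stagedProducerEpoch = producerEpoch;
    }

    /** A plain ACCEPT via acknowledge(); refused while the record is staged in a transaction. */
    void acceptNonTransactional() {
        if (state != TxnRecordState.ACQUIRED) {
            throw new IllegalStateException("INVALID_RECORD_STATE: record is " + state);
        }
        state = TxnRecordState.ACKNOWLEDGED;
    }

    /** Applies a WriteTxnMarkers result; returns false if the marker is fenced or the record is already resolved. */
    boolean applyMarker(boolean commit, long producerId, short producerEpoch) {
        if (state != TxnRecordState.TX_PENDING
                || producerId != stagedProducerId
                || producerEpoch != stagedProducerEpoch) {
            return false; // duplicate, late or foreign marker: no-op
        }
        if (commit) {
            state = stagedAckType == ACCEPT ? TxnRecordState.ACKNOWLEDGED : TxnRecordState.ARCHIVED;
        } else {
            state = TxnRecordState.ACQUIRED; // abort: lock continues, the consumer can retry the work
        }
        stagedProducerId = -1;
        stagedProducerEpoch = -1;
        return true;
    }
}
```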
Pseudo code
```java
// One-time setup
shareConsumer.subscribe(List.of("source-topic"));
producer.initTransactions(); // exactly once per producer instance

// Consume-transform-produce loop
while (running) {
    ConsumerRecords<K, V> records = shareConsumer.poll(Duration.ofSeconds(5));
    if (records.isEmpty()) continue;
    try {
        producer.beginTransaction();
        Map<TopicIdPartition, Acknowledgements> acks = new HashMap<>();
        for (ConsumerRecord<K, V> record : records) {
            V output = process(record);
            producer.send(new ProducerRecord<>("destination-topic", output));
            acks.computeIfAbsent(
                new TopicIdPartition(record.topicId(),
                    new TopicPartition(record.topic(), record.partition())),
                k -> Acknowledgements.empty()
            ).add(record.offset(), AcknowledgeType.ACCEPT); // accumulate
        }
        // Compress the acks to AcknowledgementBatch wire form and stage them in the transaction.
        producer.sendShareAcknowledgementsToTransaction(
            toBatches(acks), // helper: Map<TopicIdPartition, List<AcknowledgementBatch>>
            shareConsumer.shareGroupMetadata()
        );
        producer.commitTransaction(); // BLOCKS: the commit decision is durable when this returns
        // The output records and the acknowledgements are committed together.
        // Do NOT call shareConsumer.acknowledge(...) on these records: a non-transactional
        // acknowledge of a TX_PENDING record is rejected with InvalidRecordStateException.
    } catch (ProducerFencedException | UnsupportedVersionException fatal) {
        throw fatal; // unrecoverable: propagate and shut the application down
    } catch (KafkaException abortable) {
        producer.abortTransaction(); // BLOCKS: staged records revert to ACQUIRED on the broker
        // They stay with this consumer until re-acknowledged or until the acquisition
        // lock expires, after which they are redelivered.
    }
}
```
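One possible shape for the toBatches helper used in the loop above, sketched for a single partition under stated assumptions: AckBatch and AckBatchCompressor are hypothetical names, the input maps offset to ack type id, and the output follows the TxnShareAcknowledgeBatch rule that a one-element AcknowledgeTypes array covers the whole inclusive range. Non-contiguous offsets start a new batch, because GAP (0) cannot be used inside a transaction to fill holes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;

/** Hypothetical wire-form batch mirroring TxnShareAcknowledgeBatch (FirstOffset, LastOffset, AcknowledgeTypes). */
record AckBatch(long firstOffset, long lastOffset, byte[] acknowledgeTypes) { }

final class AckBatchCompressor {
    /** Compresses offset -> ack type id (one partition) into contiguous, inclusive batches. */
    static List<AckBatch> toBatches(SortedMap<Long, Byte> acks) {
        List<AckBatch> out = new ArrayList<>();
        List<Byte> run = new ArrayList<>();
        long first = -1;
        long prev = -1;
        for (Map.Entry<Long, Byte> e : acks.entrySet()) {
            long offset = e.getKey();
            if (!run.isEmpty() && offset != prev + 1) {
                out.add(toBatch(first, prev, run)); // hole in the offsets: GAP is not allowed in a txn
                run.clear();
            }
            if (run.isEmpty()) {
                first = offset;
            }
            run.add(e.getValue());
            prev = offset;
        }
        if (!run.isEmpty()) {
            out.add(toBatch(first, prev, run));
        }
        return out;
    }

    private static AckBatch toBatch(long first, long last, List<Byte> types) {
        Byte head = types.get(0);
        boolean uniform = types.stream().allMatch(head::equals);
        byte[] encoded;
        if (uniform) {
            encoded = new byte[] {head}; // size 1 = uniform type for the whole range
        } else {
            encoded = new byte[types.size()]; // one type per offset
            for (int i = 0; i < encoded.length; i++) {
                encoded[i] = types.get(i);
            }
        }
        return new AckBatch(first, last, encoded);
    }
}
```

Applied once per TopicIdPartition, this yields the per-partition lists the producer call expects; a mostly-ACCEPT poll compresses to very few bytes on the wire.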
Corner cases
1. Broker Crash During TX_PENDING (in-memory state)
- Scenario: the broker stages TX_PENDING, then crashes before the marker arrives.
- Behavior: TX_PENDING is in-memory only (the __share_group_state schema is unchanged in v1); after the broker restarts, the records replay as ACQUIRED/AVAILABLE and are redelivered.
- Guarantee: no data loss; at-least-once in the broker-crash window (the existing share-group baseline).
- Future: persistent TX_PENDING, by extending ShareSnapshotValue.DeliveryState, is deferred to a follow-up KIP.
2. TxnCoord Registration Fails (COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR)
- Scenario: the broker-internal AddPartitionsToTxn returns a retriable error.
- Behavior: the broker does NOT stage; it returns the error to the producer, which enters the abortable state.
- Recovery: the user catches KafkaException → abortTransaction() → retries.
3. Producer Fenced Mid-Stage (PRODUCER_FENCED, INVALID_PRODUCER_EPOCH)
- Scenario: zombie producer A (epoch 1) sends TxnShareAcknowledge after producer B (epoch 2) has been initialized.
- Behavior: TxnCoord registration rejects it with a fencing error → no staging happens → producer A becomes fatal.
- Late-arriving marker safety: the per-InFlightState (stagedProducerId, stagedProducerEpoch) filter rejects mismatched markers; idempotent.
5. Duplicate WriteTxnMarkers Delivery
- Scenario: TxnCoord retries marker after network failure; broker receives same marker twice.
- Behavior: second invocation finds state != TX_PENDING (already resolved) → no-op.
- Guarantee: idempotent by state filter; verified in InFlightStateTxnTest.
6. Acquisition Lock Expiry While TX_PENDING
- Scenario: transaction stages records but stalls; lock duration expires before commit/abort.
- Behavior: the lock timer detects a TX_PENDING record with an expired lock → drops the staging (TX_PENDING → ACQUIRED) → the expired lock makes the record re-acquirable; a subsequent marker for the stale staging is filtered out (epoch mismatch).
- Configuration: group.share.record.lock.duration.ms (default 30s; per share group).
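The interaction between the lock timer and a late marker can be sketched as follows. LockExpirySketch is hypothetical, uses the 30 s default lock duration as its only configuration, and collapses TX_PENDING → ACQUIRED → AVAILABLE into a single timer step, since the lock has already expired when the staging is dropped.

```java
final class LockExpirySketch {
    enum S { AVAILABLE, ACQUIRED, TX_PENDING, ACKNOWLEDGED }

    static final long DEFAULT_LOCK_DURATION_MS = 30_000L; // group.share.record.lock.duration.ms default

    S state = S.ACQUIRED;
    private final long lockDeadlineMs;
    private long stagedProducerId = -1;
    private short stagedProducerEpoch = -1;

    LockExpirySketch(long acquiredAtMs) {
        this.lockDeadlineMs = acquiredAtMs + DEFAULT_LOCK_DURATION_MS;
    }

    void stage(long producerId, short producerEpoch) {
        if (state != S.ACQUIRED) throw new IllegalStateException("record is " + state);
        state = S.TX_PENDING;
        stagedProducerId = producerId;
        stagedProducerEpoch = producerEpoch;
    }

    /** Lock timer tick: an expired lock wins over a transaction that never resolved. */
    void onTimer(long nowMs) {
        boolean locked = state == S.ACQUIRED || state == S.TX_PENDING;
        if (locked && nowMs >= lockDeadlineMs) {
            // TX_PENDING -> ACQUIRED (staging dropped) -> AVAILABLE (lock expired), in one step
            state = S.AVAILABLE;
            stagedProducerId = -1;
            stagedProducerEpoch = -1;
        }
    }

    /** COMMIT marker for an ACCEPT staging; a no-op once the timer has reverted the record. */
    boolean applyCommitMarker(long producerId, short producerEpoch) {
        if (state != S.TX_PENDING || producerId != stagedProducerId || producerEpoch != stagedProducerEpoch) {
            return false;
        }
        state = S.ACKNOWLEDGED;
        return true;
    }
}
```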
7. Transaction Timeout While TX_PENDING (transaction.timeout.ms)
- Scenario: producer crashes after staging; TxnCoord detects abandoned transaction via timeout.
- Behavior: TxnCoord auto-sends WriteTxnMarkers(ABORT); the broker reverts TX_PENDING → ACQUIRED, and the record becomes AVAILABLE on the next lock cycle. No data loss; eventual redelivery.
8. Stale Member Epoch (STALE_MEMBER_EPOCH, UNKNOWN_MEMBER_ID)
- Scenario: a rebalance changes the memberEpoch between the shareGroupMetadata() snapshot and the sendShareAcknowledgementsToTransaction() call.
- Behavior: the broker compares the request memberEpoch with the current SharePartition owner; a mismatch → returns STALE_MEMBER_EPOCH; no staging.
- Recovery: abortable; the user retries the entire read-process-write loop with a fresh snapshot.
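A sketch of the epoch check in corner case 8, assuming the broker can see the memberId and memberEpoch of the member that currently owns the acquired records. Owner and MemberEpochCheckSketch are hypothetical names, and results are returned as protocol error names.

```java
final class MemberEpochCheckSketch {
    /** Hypothetical view of the member that currently owns the acquired records. */
    record Owner(String memberId, int memberEpoch) { }

    /** Returns the error name for the TxnShareAcknowledge response, or "NONE" if staging may proceed. */
    static String check(Owner current, String requestMemberId, int requestMemberEpoch) {
        if (current == null || !current.memberId().equals(requestMemberId)) {
            return "UNKNOWN_MEMBER_ID";
        }
        if (current.memberEpoch() != requestMemberEpoch) {
            return "STALE_MEMBER_EPOCH"; // snapshot taken before a rebalance: abort and retry the loop
        }
        return "NONE";
    }
}
```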
9. Mixed-Version Cluster During Rolling Upgrade
- Scenario: some brokers lack apiKey 93 (TxnShareAcknowledge).
- Behavior: the producer probes ApiVersions; if apiKey 93 is absent on any broker → UnsupportedVersionException is thrown synchronously, with no bytes on the wire.
- Recovery: the user must abort the transaction (the output-side AddPartitionsToTxn still needs cleanup).
- Gating: the finalized share.version=2 feature flag controls broker advertisement.
10. Invalid Acknowledge Type Inside Transaction
- Scenario: the producer sends AcknowledgeType.RELEASE (2), RENEW (4), or GAP (0) inside sendShareAcknowledgementsToTransaction.
- Behavior: the broker rejects it with INVALID_RECORD_STATE; no staging.
- Allowed values: only ACCEPT (1) and REJECT (3); release, renew, and gap have no transactional semantics.
11. Partial Multi-Partition Stage Failure
- Scenario: producer stages on partitions A and B; A succeeds, B fails.
- Behavior: per-partition 2PC rollback — already-staged records on A are reverted to ACQUIRED before the response returns (commit 8 on the branch).
- Result: all-or-nothing per call; user sees error → aborts → retries.
- Guarantee: no partial commit; clean rollback semantics.
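The all-or-nothing behaviour can be sketched as stage-then-roll-back. AllOrNothingStagingSketch is hypothetical: partitions are identified by name, canStage simulates the per-partition checks, and rolling back simply flips already-staged partitions back (TX_PENDING → ACQUIRED) before the failure is reported.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class AllOrNothingStagingSketch {
    /** Partition name -> true while its records are TX_PENDING (hypothetical simplification). */
    final Map<String, Boolean> staged = new LinkedHashMap<>();

    /** Stages every partition or none; canStage simulates the per-partition checks. */
    boolean stageAll(List<String> partitions, Predicate<String> canStage) {
        List<String> done = new ArrayList<>();
        for (String p : partitions) {
            if (!canStage.test(p)) {
                for (String q : done) {
                    staged.put(q, false); // roll back: TX_PENDING -> ACQUIRED
                }
                return false; // the caller sees the error, aborts and retries
            }
            staged.put(p, true);
            done.add(p);
        }
        return true;
    }
}
```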
12. Leader Change for __share_group_state-N Between Stage and Commit
- Scenario: share-coordinator partition leadership moves from Broker 5 to Broker 7 between staging and WriteTxnMarkers dispatch.
- Behavior: TxnCoord re-resolves the leader at commit time via MetadataCache → the marker goes to Broker 7, whose in-memory partitionCache does NOT contain the original TX_PENDING state.
- Limitation in this initial version: TX_PENDING is lost on a share-coordinator leadership change (same root cause as Corner Case 1); the lock timeout reverts the records, which are then redelivered.
- Future: persistent TX_PENDING closes this case as well.
13. Transaction Aborted Without Markers Reaching Broker
- Scenario: transaction aborts but broker hosting TX_PENDING is partitioned from TxnCoord.
- Behavior: lock timeout (Corner Case 6) eventually reverts; subsequent late-arriving marker is no-op (state already AVAILABLE or epoch mismatched).
- Convergence guaranteed.
14. Concurrent Transactions From Same Producer (Different Epochs)
- Scenario: the producer rapidly bumps its epoch; an old in-flight TxnShareAcknowledge arrives with a stale epoch.
- Behavior: TxnCoord rejects it with PRODUCER_FENCED; the broker never stages. Per-record fencing in the state machine prevents any stale marker from resolving newer staging.
Note: persisting the record state in a follow-up KIP would eliminate Corner Case 1.
Metrics
All new metrics follow the existing Kafka metric conventions (Yammer for broker JMX, KafkaMetric for client-side).
Metric names mirror the established kafka.server:type=group-coordinator-metrics,name=... and kafka.server:type=share-coordinator-metrics,name=... patterns.
Backward-compatible: no existing metrics renamed or removed.
| Module | Metric | Type | Purpose |
|---|---|---|---|
| Broker — SharePartitionManager | TxnPendingRecordsCount | Gauge | Current count of records in TX_PENDING (primary health signal) |
| Broker — SharePartitionManager | TxnShareAcknowledgeRequestLatencyMs | Histogram | p99 latency of staging requests |
| Broker — SharePartitionManager | TxnPendingLockExpiredCount | Counter | Any non-zero = abandoned txns or missing markers (critical alert) |
| Broker — TransactionCoordinator | TransactionPartitionsCount (existing) | Gauge | Reused; now includes __share_group_state-N entries |
| Producer | share-ack-txn-send-rate | Meter | EOS-call throughput |
| Producer | share-ack-txn-send-error-rate | Meter | Stage-failure rate (drives retry loops) |
| Consumer | share-group-metadata-fetch-rate | Meter | Confirms read-process-write loop is active |
Total: 6 new + 1 reused = 7 metrics. Primary alert: TxnPendingRecordsCount > 0 sustained for > 60s.
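The primary alert rule can be evaluated over periodic samples of the gauge. The sketch assumes a hypothetical SustainedAlertSketch fed with (timestamp, TxnPendingRecordsCount) pairs rather than any particular monitoring system; the timer resets whenever the count drops to zero, so only an uninterrupted run longer than the threshold fires.

```java
final class SustainedAlertSketch {
    private final long sustainMs;
    private boolean inRun = false; // true while the gauge has stayed above zero
    private long runStartMs;

    SustainedAlertSketch(long sustainMs) {
        this.sustainMs = sustainMs;
    }

    /** Feeds one sample of TxnPendingRecordsCount; returns true when the alert should fire. */
    boolean onSample(long nowMs, long txnPendingRecordsCount) {
        if (txnPendingRecordsCount <= 0) {
            inRun = false; // any zero sample resets the timer
            return false;
        }
        if (!inRun) {
            inRun = true;
            runStartMs = nowMs;
        }
        return nowMs - runStartMs > sustainMs; // "sustained for > 60s" when sustainMs = 60_000
    }
}
```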
Compatibility, Deprecation, and Migration Plan
Enablement and Rollout Plan
Feature Gating
KIP-1289 is opt-in, online-upgradable, and zero-disruption for existing workloads. No new feature flag is introduced: it adds a new level to the existing share.version feature and uses the existing kafka-features.sh machinery.
| Feature flag | Required level | Why |
|---|---|---|
| share.version | >= 2 | New level that finalises KIP-1289; brokers below this level do not advertise apiKey 93 |
| transaction.version | >= 2 | KIP-1289 is TV2-only (inherits auto-registration and proper epoch fencing) |
Rolling Upgrade (Online; No Traffic Disruption)
Phase 1 — Software upgrade (rolling)
Upgrade brokers one at a time to the binary containing KIP-1289.
KIP-1289 code is dormant (share.version still 1).
Standard Kafka rolling-restart
Phase 2 — Soak timing
Verify cluster health
Phase 3 — Finalise feature
kafka-features.sh upgrade --feature share.version --version 2
- Online metadata propagation (update of __cluster_metadata)
- No restart, no rebalance, no leader change, no socket disruption
- Brokers begin advertising apiKey 93
Phase 4 — Client onboarding
Roll out applications calling sendShareAcknowledgementsToTransaction.
Add the alerts (especially on TxnPendingRecordsCount) and the relevant metrics to dashboards and observability tooling.
Cross-Version Client/Broker Matrix
| Client | Broker, share.version=2 | Broker, share.version=1 | Old broker (no binary) |
|---|---|---|---|
| New | Works | UnsupportedVersionException (synchronous, no wire bytes) | Same |
| Old | Unaffected | Unaffected | Unaffected |
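The New-client row of the matrix boils down to a fail-fast check before any request is built. The sketch is hypothetical: Kip1289GatingSketch is an illustrative name, java.lang.UnsupportedOperationException stands in for Kafka's UnsupportedVersionException, ApiVersions is modelled as a set of advertised api keys, and 93 is kept as the tentative key number.

```java
import java.util.Set;

final class Kip1289GatingSketch {
    static final short TXN_SHARE_ACKNOWLEDGE = 93; // tentative: "apiKey 93 or next free"

    /** Broker side: the RPC is advertised only once share.version >= 2 is finalized. */
    static boolean brokerAdvertises(int finalizedShareVersion) {
        return finalizedShareVersion >= 2;
    }

    /**
     * Client side: fail fast before any bytes are written. UnsupportedOperationException
     * stands in for Kafka's UnsupportedVersionException in this sketch.
     */
    static void ensureSupported(Set<Short> advertisedApiKeys, int finalizedTransactionVersion) {
        if (!advertisedApiKeys.contains(TXN_SHARE_ACKNOWLEDGE)) {
            throw new UnsupportedOperationException("TxnShareAcknowledge (apiKey 93) is not advertised");
        }
        if (finalizedTransactionVersion < 2) {
            throw new UnsupportedOperationException("KIP-1289 requires transaction.version >= 2");
        }
    }
}
```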
Downgrade
1. Drain in-flight TX_PENDING:
- Halt producers calling sendShareAcknowledgementsToTransaction
- Wait for transaction.timeout.ms (default 60s) for any abandoned txns to clear
2. kafka-features.sh downgrade --feature share.version --version 1
Broker rejects the downgrade RPC if any in-memory TX_PENDING exists, preventing data inconsistency.
Software downgrade (binary): must follow feature downgrade; standard rolling restart.
Test Plan
Rough prototype implementation (the actual implementation will be delivered in phases, as multiple smaller PRs):
https://github.com/apache/kafka/pull/22357
The verification strategy focuses on state machine integrity and fault tolerance under high-concurrency and failure scenarios.
Unit Tests: validate state transitions (e.g., TX_PENDING to ACKNOWLEDGED on commit vs. back to ACQUIRED on abort), idempotency of operations, and transaction timeout/auto-abort logic.
Integration Tests: focus on end-to-end commit/abort flows, coordinator recovery, and multi-consumer behavior within a single group during network partitions.
System & Performance Tests: benchmark transactional vs. non-transactional modes and verify exactly-once delivery.
Chaos Tests: simulate broker and coordinator crashes specifically during critical phases like PREPARE_COMMIT to ensure protocol durability.
Follow-up KIP (deferred)
- Persistent TX_PENDING in __share_group_state for crash-resilient EOS on broker failover during staging (addresses Corner Cases 1 and 12).
- Per-share-partition metric for TX_PENDING residency time — would add a histogram of "time spent in TX_PENDING" useful for diagnosing slow producers; can be added in the persistence KIP without compatibility concerns.
- External processing engines or databases as 2PC participants for the records they write (sink records).
Rejected Alternatives
2 Comments
Shekhar Rajak
Ref: producer-side 2PC (Kafka as participant), KAFKA-15370; KIP: KIP-939: Support Participation in 2PC#2PCRefresher
Shekhar Rajak
This will help in https://cwiki.apache.org/confluence/x/JpU8G