Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-19446

Motivation

Kafka’s transactional model enables exactly-once semantics (EOS) by having the coordinator manage commits and aborts, while partition leaders apply them through control records. However, late or duplicate EndTransaction markers have historically posed a correctness risk, as leaders could not reliably distinguish them from markers belonging to the active transaction. With Transaction Version 2 (TV2) and its epoch-bump contract, we can finally close this long-standing gap and reinforce exactly-once guarantees.

The introduction of KIP-890 brought a protocol change with Transaction Version 2 (TV2), where the transaction coordinator always increments the producer epoch by one (+1) before writing the final transaction marker (commit or abort). This change tightened the contract between the coordinator and partition leaders: a valid TV2 marker must have an epoch strictly greater than the producer’s current epoch at the leader.

To enforce this contract, the marker must be validated at the time it is written to the partition logs. When a client issues an EndTxnRequest, the coordinator determines the outcome (commit or abort) and then sends WriteTxnMarkersRequest messages to all the partition leaders involved in the transaction. Upon receiving these requests, each leader performs validation during the log append step, comparing the marker’s epoch against its local producer state before appending the EndTxn control record. Currently, the check looks like this:

// ProducerAppendInfo.java – appendEndTxnMarker()
private void checkProducerEpoch(short producerEpoch, long offset) {
    if (producerEpoch < updatedEntry.producerEpoch()) {
        String message = "Epoch of producer " + producerId + " at offset " + offset + " in " + topicPartition +
            " is " + producerEpoch + ", which is smaller than the last seen epoch " + updatedEntry.producerEpoch();

        if (origin == AppendOrigin.REPLICATION) {
            log.warn(message);
        } else {
            // Starting from 2.7, we replaced ProducerFenced error with InvalidProducerEpoch in the
            // producer send response callback to differentiate from the former fatal exception,
            // letting client abort the ongoing transaction and retry.
            throw new InvalidProducerEpochException(message);
        }
    }
}

This check accepts markers when producerEpoch >= currentProducerEpoch.

Under legacy transaction versions (TV0 and TV1) this was expected behavior: EndTxn markers were written with the same epoch as the transactional records, so equality matched the intended case. However, it also created a correctness gap. Because the coordinator did not bump the epoch at EndTxn time, leaders could not distinguish between an active marker and a stale one. If a duplicate marker arrived after a new transaction had already begun with the same epoch, the leader would treat it as valid and could mistakenly commit or abort records from the newer transaction. This threatens our EOS guarantees.

In other words, late-arriving markers have always been a potential problem in TV0/TV1 — the system simply lacked the means to distinguish between the late markers and the markers belonging to the active transaction.

With TV2, the semantics change. The coordinator always bumps the producer epoch before sending the final marker, establishing a clear invariant: a valid EndTxn marker arrives with producerEpoch == current + 1 at the leader, and is therefore always strictly greater than the leader's current epoch. Any marker arriving with equality (producerEpoch == current) can now be safely identified as late or duplicate and rejected. This closes a long-standing gap in transaction handling and prevents scenarios where multiple transactions could be conflated under the same epoch.
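The difference between the two flows can be traced with a small simulation. This is only a sketch under stated assumptions: LateMarkerDemo and LeaderState are hypothetical names, not Kafka classes, and the leader is reduced to the single "last seen producer epoch" value that the check compares against.

```java
// Minimal sketch; LateMarkerDemo and LeaderState are hypothetical, not Kafka classes.
final class LateMarkerDemo {

    /** Tracks only the last producer epoch seen by a partition leader. */
    static final class LeaderState {
        private short currentEpoch;

        LeaderState(short initialEpoch) {
            this.currentEpoch = initialEpoch;
        }

        /** Transactional data batch: accepted unless its epoch is older than the current one. */
        void appendRecords(short epoch) {
            if (epoch < currentEpoch) {
                throw new IllegalStateException("stale data batch");
            }
            currentEpoch = epoch;
        }

        /** Returns true if the marker passes the rule for the given transaction version. */
        boolean appendMarker(int transactionVersion, short markerEpoch) {
            boolean accepted = transactionVersion >= 2
                ? markerEpoch > currentEpoch     // TV2 and later: strictly greater
                : markerEpoch >= currentEpoch;   // TV0/TV1: equality allowed
            if (accepted) {
                currentEpoch = markerEpoch;
            }
            return accepted;
        }
    }

    /** Legacy flow: the marker reuses the data epoch, so a duplicate looks valid. */
    static boolean legacyDuplicateAccepted() {
        LeaderState leader = new LeaderState((short) 5);
        leader.appendRecords((short) 5);          // transaction A writes at epoch 5
        leader.appendMarker(1, (short) 5);        // A's marker, same epoch 5
        leader.appendRecords((short) 5);          // transaction B starts, still epoch 5
        return leader.appendMarker(1, (short) 5); // duplicate of A's marker arrives late
    }

    /** TV2 flow: the marker carries the bumped epoch, so a duplicate is not strictly greater. */
    static boolean tv2DuplicateAccepted() {
        LeaderState leader = new LeaderState((short) 5);
        leader.appendRecords((short) 5);          // transaction A writes at epoch 5
        leader.appendMarker(2, (short) 6);        // A's marker at bumped epoch 6
        leader.appendRecords((short) 6);          // transaction B writes at epoch 6
        return leader.appendMarker(2, (short) 6); // duplicate of A's marker arrives late
    }

    public static void main(String[] args) {
        System.out.println("legacy duplicate accepted: " + legacyDuplicateAccepted());
        System.out.println("TV2 duplicate accepted: " + tv2DuplicateAccepted());
    }
}
```

In the legacy flow the duplicate passes the equality check and would end transaction B. In the TV2 flow the same duplicate fails the strict comparison.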

The solution is simple: leaders need to know which transaction version applies to each marker so they can apply the right validation logic. Today they do not, and the same relaxed check is applied universally. The core of this KIP is therefore to make the transaction version explicit in the WriteTxnMarkersRequest, enabling leaders to enforce the correct validation rule:

  • Legacy TV (TV0 / TV1): accept markerEpoch >= currentProducerEpoch.

  • TV2 and later: require markerEpoch > currentProducerEpoch (equality indicates a late or duplicate marker and must be rejected).

This approach preserves the legacy behavior and backward compatibility, while ensuring that TV2 leaders enforce the intended epoch invariant and strengthen exactly-once guarantees.

Public Interfaces

We will bump the WriteTxnMarkersRequest API to version 2 and introduce a new field, TransactionVersion, which will enable the transaction coordinator to pass version information to the partition leaders.

WriteTxnMarkersRequest (apiKey: 27)

{
  "apiKey": 27,
  "type": "request",
  "listeners": ["broker"],
  "name": "WriteTxnMarkersRequest",
  "validVersions": "1-2",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "Markers", "type": "[]WritableTxnMarker", "versions": "0+",
      "about": "The transaction markers to be written.", "fields": [
      { "name": "ProducerId", "type": "int64", "versions": "0+",
        "entityType": "producerId", "about": "The current producer ID." },
      { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
        "about": "The current epoch associated with the producer ID." },
      { "name": "TransactionResult", "type": "bool", "versions": "0+",
        "about": "The result (false = ABORT, true = COMMIT)." },
      { "name": "Topics", "type": "[]WritableTxnMarkerTopic", "versions": "0+",
        "about": "Each topic to write markers for.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+",
          "entityType": "topicName", "about": "The topic name." },
        { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
          "about": "Partition indexes to write markers for." }
      ]},
      { "name": "CoordinatorEpoch", "type": "int32", "versions": "0+",
        "about": "Epoch of the transaction state partition hosting this coordinator." },
      // --------- NEW FIELD (TV) ADDED BELOW --------
      { "name": "TransactionVersion", "type": "int8", "versions": "2+", "ignorable": true,
        "about": "Transaction version: 0/1 = legacy (TV0/TV1), 2 = TV2.", "default": "0" }
    ]}
  ]
}

Proposed Changes

Coordinator Changes

When processing an EndTxnRequest, the coordinator already determines the transaction version (legacy or TV2) and stores it in the transaction’s TransactionMetadata.

With this KIP, the coordinator will propagate the transaction version to partition leaders by including a new TransactionVersion field when building a WriteTxnMarkersRequest. This field is only included if the target broker supports request version 2; otherwise, the coordinator falls back to version 1, which omits the field. This allows leaders to apply the correct epoch validation rule while maintaining compatibility across mixed-version clusters.
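The fallback described above can be sketched as follows. This is an illustration only: MarkerRequestVersionSketch and its methods are hypothetical names, and the leader's maximum supported version is assumed to come from the usual ApiVersions exchange rather than from any specific Kafka call.

```java
// Minimal sketch; class and method names are hypothetical, not the coordinator's actual code.
final class MarkerRequestVersionSketch {
    static final short LEGACY_REQUEST_VERSION = 1;   // no TransactionVersion field
    static final short TV_AWARE_REQUEST_VERSION = 2; // carries TransactionVersion

    /**
     * Picks the WriteTxnMarkersRequest version to send, given the highest
     * version the target leader advertises (assumed to be at least 1).
     */
    static short chooseRequestVersion(short leaderMaxSupportedVersion) {
        return leaderMaxSupportedVersion >= TV_AWARE_REQUEST_VERSION
            ? TV_AWARE_REQUEST_VERSION
            : LEGACY_REQUEST_VERSION;
    }

    /**
     * Strict validation is possible only when the field was actually sent
     * (request version 2+) and the transaction itself is TV2 or later.
     */
    static boolean strictValidationApplies(short requestVersion, int transactionVersion) {
        return requestVersion >= TV_AWARE_REQUEST_VERSION && transactionVersion >= 2;
    }

    public static void main(String[] args) {
        short toOldLeader = chooseRequestVersion((short) 1);
        short toNewLeader = chooseRequestVersion((short) 2);
        System.out.println("old leader: v" + toOldLeader
            + ", strict=" + strictValidationApplies(toOldLeader, 2));
        System.out.println("new leader: v" + toNewLeader
            + ", strict=" + strictValidationApplies(toNewLeader, 2));
    }
}
```

A TV2 transaction whose marker goes to a version 1 leader is still validated with the legacy rule, which is the mixed-cluster trade-off this section describes.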

Broker/Leader Changes

Leaders must apply strict validation when the transaction version is known to be TV2 or later:

// Pseudocode inside appendEndTxnMarker(), or wherever checkProducerEpoch is called:

short current = updatedEntry.producerEpoch();
short marker = markerProducerEpoch;
// The TransactionVersion field is only present in request version 2+;
// older requests are treated as legacy (TV1).
int txnVersion = (request.version() >= 2) ? request.transactionVersion() : 1;

if (txnVersion >= 2) {
    // TV2: coordinator bumps epoch before marker; duplicates carry old epoch.
    // Accept only strictly greater epoch.
    if (marker <= current) {
        throw new InvalidProducerEpochException("Reject late/dup TV2 marker: " +
            "markerEpoch=" + marker + " <= currentEpoch=" + current);
    }
} else {
    // Legacy behavior
    if (marker < current) {
        throw new InvalidProducerEpochException("Marker epoch < current.");
    }
}


Compatibility, Deprecation, and Migration Plan

Clients

Clients require no changes. They continue to issue EndTxnRequest as before. With brokers that support this KIP, leaders apply version-aware validation: the legacy rule for TV0/TV1 and the stricter rule for TV2 and later. This ensures stronger exactly-once guarantees without altering client behavior.

New Brokers

New brokers will support WriteTxnMarkersRequest v2, which includes the TransactionVersion field.

When processing requests, brokers will apply the appropriate validation rule based on the field value:

  • TV0 / TV1 (value = 0 or 1) → markerEpoch >= currentProducerEpoch (legacy validation)

  • TV2 and future TVs (value >= 2) → markerEpoch > currentProducerEpoch (strict validation)

Version negotiation between brokers happens through the standard ApiVersionsRequest handshake. The transaction coordinator sends a WriteTxnMarkersRequest v2 only when both the coordinator and the target partition leader support version 2. This maintains compatibility in mixed-version clusters and ensures that stricter validation is applied only when both sides support the new protocol.

Old Brokers

Old brokers that only support WriteTxnMarkersRequest v1 will continue to use the legacy validation rule (markerEpoch >= currentProducerEpoch) for all requests. This behavior remains correct for legacy transactions but does not close the gap for TV2.

If a coordinator supports version 2 but a partition leader only supports version 1, the coordinator will automatically fall back to sending a version 1 request. This ensures smooth interoperability and maintains backward compatibility.

New Transaction Versions

Future transaction versions will also be subject to strict validation by default. If a newer version ever changes or removes the epoch bump behavior introduced in TV2, the broker logic should be updated accordingly. Applying strict validation to all versions ≥ TV2 keeps the behavior forward compatible and ensures the issue remains permanently resolved.

Test Plan

Integration tests will cover the WriteTxnMarkersRequest scenarios, including the behavior when stale markers arrive under both TV1 and TV2.
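As a rough illustration of the cases such tests would need to cover, the expected accept/reject outcomes can be written as a small table. This is a sketch, not the planned test code: MarkerValidationMatrix and its members are hypothetical, and the rule is restated locally rather than calling any broker class.

```java
// Minimal sketch; MarkerValidationMatrix is hypothetical and does not call broker code.
final class MarkerValidationMatrix {

    /** One row of the expectation table. */
    static final class Case {
        final int transactionVersion;
        final short markerEpoch;
        final short currentEpoch;
        final boolean expectedAccepted;

        Case(int transactionVersion, int markerEpoch, int currentEpoch, boolean expectedAccepted) {
            this.transactionVersion = transactionVersion;
            this.markerEpoch = (short) markerEpoch;
            this.currentEpoch = (short) currentEpoch;
            this.expectedAccepted = expectedAccepted;
        }
    }

    /** Version-aware rule as stated in this KIP. */
    static boolean accepts(int transactionVersion, short markerEpoch, short currentEpoch) {
        return transactionVersion >= 2
            ? markerEpoch > currentEpoch
            : markerEpoch >= currentEpoch;
    }

    static final Case[] CASES = {
        new Case(1, 4, 5, false), // TV1, older epoch: rejected
        new Case(1, 5, 5, true),  // TV1, equal epoch: accepted (legacy behavior)
        new Case(1, 6, 5, true),  // TV1, newer epoch: accepted
        new Case(2, 4, 5, false), // TV2, older epoch: rejected
        new Case(2, 5, 5, false), // TV2, equal epoch: late or duplicate, rejected
        new Case(2, 6, 5, true),  // TV2, bumped epoch: accepted
    };

    /** Counts rows whose actual outcome differs from the expected one. */
    static int countMismatches() {
        int mismatches = 0;
        for (Case c : CASES) {
            if (accepts(c.transactionVersion, c.markerEpoch, c.currentEpoch) != c.expectedAccepted) {
                mismatches++;
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        System.out.println("mismatches: " + countMismatches());
    }
}
```

The only row where the two versions disagree is the equal-epoch case, which is the stale-marker scenario the tests are meant to exercise.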

Rejected Alternatives

  1. Depend on VerificationStateEntry.supportsEpochBump(): This method indicates whether an epoch bump has occurred, and in theory could help leaders distinguish between TV1 and TV2. However, its state is cleared after the first record batch is written. This leaves a protection gap for late or duplicate markers, which may still arrive after the state has been reset. Because it is not reliable as a persistent signal of the transaction version, it cannot help in enforcing stricter validation.

  2. Store transaction version in producer state entry: Enables explicit version tracking but adds storage overhead and requires bumping the record format, increasing complexity for little gain.

  3. Use a tagged field in the WriteTxnMarkersRequest: The tagged field approach provides a simple way to add a new field while maintaining backward compatibility. However, because version negotiation already occurs through the ApiVersionsRequest, backward compatibility is equally preserved with an API version bump. Kafka generally prefers version bumps for API evolution, as they make protocol changes explicit, allow the removal of deprecated versions over time, and provide clear visibility into which request versions clients and brokers are using.


