Status

Current state: Under Discussion

Discussion thread: here

JIRA: https://issues.apache.org/jira/browse/KAFKA-20191

Motivation

KafkaProducer.sendOffsetsToTransaction() is designed to atomically commit consumer offsets as part of a producer transaction, enabling Exactly-Once Semantics (EOS) in consume-transform-produce pipelines. The intended semantics are clear: a consumer member should only commit offsets for partitions that are actually assigned to it.

However, the broker currently performs no such validation. After verifying the member's epoch, the server accepts transactional offset commits for any partition, regardless of whether that partition is assigned to the requesting member. This creates a concrete problem:

Malicious manipulation: A member with a valid epoch can deliberately submit offsets for partitions assigned to other members. Consider the following scenario:

Member A — owns partition 0
Member B — owns partition 1

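Member B calls (the topic name "input-topic" is illustrative):
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

producer.sendOffsetsToTransaction(
  Map.of(new TopicPartition("input-topic", 0), new OffsetAndMetadata(9999L)),  // partition 0 is NOT assigned to B
  consumer.groupMetadata()                                                      // B has a valid member epoch
);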

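The broker currently accepts this request. Once the transaction commits, 9999 becomes the committed offset for partition 0, so when Member A (or any member later assigned partition 0) resumes from the committed offset, it reads from offset 9999, silently skipping all records between its last committed offset and 9999. This violates the isolation guarantee that EOS is supposed to provide.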

Public Interfaces

This KIP does not change any public client-side interfaces, but it bumps the following RPC versions:

  • TxnOffsetCommitRequest
  • TxnOffsetCommitResponse

It also adds a new error code:

  • UNASSIGNED_PARTITION (code 134)

Proposed Changes

This KIP builds on top of KIP-1251, which introduced a per-partition assignment epoch tracked on the broker. For each partition owned by a member, the broker records the epoch at which that partition was assigned. This mechanism directly encodes assignment information in epoch form, enabling a unified validation rule:

assignmentEpoch(partition) ≤ clientEpoch ≤ brokerEpoch
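As a minimal sketch of how the rule might be evaluated for a single partition, the Java below uses hypothetical class and method names and is not the broker's actual code. It assumes that -1 stands for "the requesting member has no assignment epoch for this partition" and rejects it explicitly, because a literal -1 would otherwise satisfy the lower bound trivially. That sentinel handling is inferred from the "Not assigned to anyone" case in the table that follows, not a confirmed detail of KIP-1251.

```java
public final class AssignmentEpochCheck {
    // Hypothetical sentinel meaning "the requesting member has no assignment epoch
    // for this partition" (mirrors the -1 in the "Not assigned to anyone" case).
    static final int NO_ASSIGNMENT = -1;

    private AssignmentEpochCheck() {}

    // Evaluates assignmentEpoch(partition) <= clientEpoch <= brokerEpoch,
    // rejecting partitions that have no assignment epoch at all.
    static boolean isCommitAllowed(int assignmentEpoch, int clientEpoch, int brokerEpoch) {
        if (assignmentEpoch == NO_ASSIGNMENT) {
            return false; // without this guard, -1 would trivially satisfy the lower bound
        }
        return assignmentEpoch <= clientEpoch && clientEpoch <= brokerEpoch;
    }

    public static void main(String[] args) {
        System.out.println(isCommitAllowed(3, 5, 5));  // true: assigned at 3, client and broker at 5
        System.out.println(isCommitAllowed(3, 5, 7));  // true: broker epoch bumped by a heartbeat
        System.out.println(isCommitAllowed(-1, 5, 5)); // false: not assigned to this member
        System.out.println(isCommitAllowed(6, 5, 6));  // false: assigned after the client's epoch
    }
}
```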

This single check naturally handles all cases:

Partition state | currentPartitionEpoch | Result
Not assigned to anyone | -1 | STALE_MEMBER_EPOCH
Assigned to a different member | Reflects the other member's epoch, not this member's | STALE_MEMBER_EPOCH
Assigned to this member, epoch current | ≤ clientEpoch | Accepted
Assigned to this member, heartbeat bumped broker epoch | ≤ clientEpoch ≤ brokerEpoch | Accepted
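The rows of the table can be walked through with a small Java sketch. It assumes, hypothetically, that the broker looks up each requested partition in the requesting member's own assignment (partition to assignment epoch), so a partition owned by nobody or by another member yields no epoch and is rejected. The class, method, and data model are illustrative only; the result strings mirror the Result column.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class PartitionValidationSketch {
    // Hypothetical sentinel for "no assignment epoch for the requesting member".
    static final int NO_ASSIGNMENT = -1;

    private PartitionValidationSketch() {}

    // memberAssignment is a hypothetical model: partition -> epoch at which it was
    // assigned to the requesting member. Returns one result per requested partition.
    static Map<Integer, String> validate(Map<Integer, Integer> memberAssignment,
                                         List<Integer> requestedPartitions,
                                         int clientEpoch, int brokerEpoch) {
        Map<Integer, String> results = new LinkedHashMap<>();
        for (int partition : requestedPartitions) {
            // Partitions owned by nobody or by another member are absent from this map.
            int assignmentEpoch = memberAssignment.getOrDefault(partition, NO_ASSIGNMENT);
            boolean ok = assignmentEpoch != NO_ASSIGNMENT
                    && assignmentEpoch <= clientEpoch
                    && clientEpoch <= brokerEpoch;
            results.put(partition, ok ? "ACCEPTED" : "STALE_MEMBER_EPOCH");
        }
        return results;
    }

    public static void main(String[] args) {
        // Member B from the Motivation scenario owns only partition 1, assigned at epoch 4.
        Map<Integer, Integer> memberB = Map.of(1, 4);
        // Partition 0 belongs to Member A; partition 2 is not assigned to anyone.
        System.out.println(validate(memberB, List.of(0, 1, 2), 5, 5));
        // A heartbeat has bumped the broker epoch to 6 while the client is still at 5.
        System.out.println(validate(memberB, List.of(1), 5, 6));
    }
}
```

The first call prints {0=STALE_MEMBER_EPOCH, 1=ACCEPTED, 2=STALE_MEMBER_EPOCH}, and the second prints {1=ACCEPTED}, showing that a broker epoch bumped by a heartbeat does not by itself fail the commit.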

RPC changes

TxnOffsetCommitRequest and TxnOffsetCommitResponse are both bumped to version 6. Clients using version 6 or above opt into the new validation semantics.
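The version gate can be sketched as follows. This is a hypothetical model, not the broker's request handler: it assumes the existing member-epoch check has already passed, abstracts the per-partition assignment rule as an IntPredicate, and reports rejection as a generic REJECTED outcome rather than a specific error code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntPredicate;

public final class VersionGateSketch {
    enum Outcome { ACCEPTED, REJECTED }

    // First TxnOffsetCommit version that opts into assignment validation (per this KIP).
    static final short FIRST_VALIDATING_VERSION = 6;

    private VersionGateSketch() {}

    // passesAssignmentCheck stands in for the per-partition epoch rule and is only
    // consulted for version 6 and above; older versions keep the pre-existing behavior.
    static Map<Integer, Outcome> evaluate(short requestVersion,
                                          List<Integer> partitions,
                                          IntPredicate passesAssignmentCheck) {
        boolean validate = requestVersion >= FIRST_VALIDATING_VERSION;
        Map<Integer, Outcome> outcomes = new LinkedHashMap<>();
        for (int partition : partitions) {
            boolean ok = !validate || passesAssignmentCheck.test(partition);
            outcomes.put(partition, ok ? Outcome.ACCEPTED : Outcome.REJECTED);
        }
        return outcomes;
    }

    public static void main(String[] args) {
        IntPredicate ownsOnlyPartition1 = p -> p == 1; // Member B from the Motivation scenario
        System.out.println(evaluate((short) 5, List.of(0, 1), ownsOnlyPartition1));
        System.out.println(evaluate((short) 6, List.of(0, 1), ownsOnlyPartition1));
    }
}
```

For Member B, a version 5 request is accepted for both partitions as before ({0=ACCEPTED, 1=ACCEPTED}), while a version 6 request rejects partition 0 ({0=REJECTED, 1=ACCEPTED}).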

{
  "apiKey": 28,
  "type": "request",
  "listeners": ["broker"],
  "name": "TxnOffsetCommitRequest",
  // ... (existing version comments unchanged)
  //
  // Version 6 adds server-side validation that the committed partitions are assigned to the requesting member.
  // Committing offsets for unassigned partitions returns UNASSIGNED_PARTITION per partition.
  "validVersions": "0-6",
  // ... (remaining fields unchanged)
}

{
  "apiKey": 28,
  "type": "response",
  "name": "TxnOffsetCommitResponse",
  // ... (existing version comments unchanged)
  //
  // Version 6 adds support for the new per-partition error code UNASSIGNED_PARTITION, returned when
  // a member attempts to commit offsets for a partition not assigned to it.
  "validVersions": "0-6",
  // ... (remaining fields unchanged)
}

Compatibility, Deprecation, and Migration Plan

This change introduces stricter server-side validation, but existing clients are fully protected by the API version gate.

Existing clients using TxnOffsetCommit version 5 or below are not affected. The broker preserves the pre-existing behavior for all requests on older versions, so no existing application will see any change in behavior after upgrading the broker.

New clients that explicitly send version 6 opt into the new validation semantics. Correctly implemented EOS applications — which should only commit offsets for partitions they currently own — will not be affected. Buggy applications that commit offsets for unassigned partitions will now receive a per-partition STALE_MEMBER_EPOCH error instead of silently succeeding, which makes the bug visible and easier to diagnose.

Test Plan

  • All existing regression tests must pass.
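  • New unit tests and integration tests should cover the invalid cases: committing offsets for partitions that are not assigned to anyone or are assigned to a different member, and confirming that requests on version 5 or below keep the pre-existing behavior.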

Rejected Alternatives

n/a
