Status
Current state: Under Discussion
Discussion thread: here
JIRA: https://issues.apache.org/jira/browse/KAFKA-20191
Motivation
KafkaProducer.sendOffsetsToTransaction() is designed to atomically commit consumer offsets as part of a producer transaction, enabling Exactly-Once Semantics (EOS) in consume-transform-produce pipelines. The intended semantics are clear: a consumer member should only commit offsets for partitions that are actually assigned to it.
However, the broker currently performs no such validation. After verifying the member's epoch, the server accepts transactional offset commits for any partition, regardless of whether that partition is assigned to the requesting member. This creates a concrete problem:
Malicious manipulation: A member with a valid epoch can deliberately submit offsets for partitions assigned to other members. Consider the following scenario:
Member A — owns partition 0
Member B — owns partition 1
Member B calls:
producer.sendOffsetsToTransaction(
    Map.of(new TopicPartition("input-topic", 0), new OffsetAndMetadata(9999L)), // partition 0 is NOT assigned to B; "input-topic" is a placeholder name
    consumer.groupMetadata() // B has a valid epoch
);
The broker currently accepts this request. Member A will subsequently read from offset 9999, silently skipping all records between its last committed offset and 9999. This violates the isolation guarantee that EOS is supposed to provide.
Public Interfaces
This KIP does not change any public client-side interfaces, but it bumps the following RPC versions:
TxnOffsetCommitRequest
TxnOffsetCommitResponse
It also adds a new error code:
- UNASSIGNED_PARTITION (code 134)
Proposed Changes
This KIP builds on top of KIP-1251, which introduced a per-partition assignment epoch tracked on the broker. For each partition owned by a member, the broker records the epoch at which that partition was assigned. This mechanism directly encodes assignment information in epoch form, enabling a unified validation rule:
assignmentEpoch(partition) ≤ clientEpoch ≤ brokerEpoch
This single check naturally handles all cases:
| Partition state | currentPartitionEpoch | Result |
|---|---|---|
| Not assigned to anyone | -1 | STALE_MEMBER_EPOCH |
| Assigned to a different member | Reflects the other member's epoch, not this member's | STALE_MEMBER_EPOCH |
| Assigned to this member, epoch current | ≤ clientEpoch | Accepted |
| Assigned to this member, heartbeat bumped broker epoch | ≤ clientEpoch ≤ brokerEpoch | Accepted |
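The validation rule above can be sketched in Java as follows. This is a minimal, hypothetical model, not the group coordinator's implementation: the class, method, and record names are illustrative. It assumes the assignment epoch is looked up per member, so a partition the member does not own (whether unassigned or owned by someone else) yields -1. Because -1 ≤ clientEpoch would always hold, the sketch rejects -1 explicitly, matching the first two rows of the table. It only decides accept or reject and does not model which error code is returned.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the broker-side check; not the actual
// group coordinator code. All names here are illustrative.
class AssignmentEpochValidator {
    // Assumed sentinel for "this member does not own the partition" (the -1 in the table).
    static final int NO_ASSIGNMENT = -1;

    record PartitionKey(String topic, int partition) {}

    // memberId -> (partition -> epoch at which the partition was assigned to that member)
    private final Map<String, Map<PartitionKey, Integer>> assignmentEpochs = new HashMap<>();
    // memberId -> member epoch currently stored on the broker
    private final Map<String, Integer> brokerEpochs = new HashMap<>();

    void setBrokerEpoch(String memberId, int epoch) {
        brokerEpochs.put(memberId, epoch);
    }

    void recordAssignment(String memberId, PartitionKey tp, int epoch) {
        assignmentEpochs.computeIfAbsent(memberId, id -> new HashMap<>()).put(tp, epoch);
    }

    void revoke(String memberId, PartitionKey tp) {
        Map<PartitionKey, Integer> owned = assignmentEpochs.get(memberId);
        if (owned != null) {
            owned.remove(tp);
        }
    }

    int assignmentEpoch(String memberId, PartitionKey tp) {
        return assignmentEpochs.getOrDefault(memberId, Map.of()).getOrDefault(tp, NO_ASSIGNMENT);
    }

    // Accept only if assignmentEpoch(partition) <= clientEpoch <= brokerEpoch.
    // An unowned partition (-1) is rejected explicitly, since -1 <= clientEpoch would otherwise pass.
    boolean isCommitAllowed(String memberId, PartitionKey tp, int clientEpoch) {
        int assigned = assignmentEpoch(memberId, tp);
        Integer brokerEpoch = brokerEpochs.get(memberId);
        if (assigned == NO_ASSIGNMENT || brokerEpoch == null) {
            return false;
        }
        return assigned <= clientEpoch && clientEpoch <= brokerEpoch;
    }
}
```

In the Motivation scenario, if Member A owns partition 0 and Member B owns partition 1 (both assigned at epoch 5, with B's broker epoch bumped to 6 by a heartbeat), B's commit for partition 0 is rejected, while B's commit for partition 1 at client epoch 5 or 6 is accepted.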
RPC changes
TxnOffsetCommitRequest and TxnOffsetCommitResponse are both bumped to version 6. Clients using version 6 or above opt into the new validation semantics.
{
  "apiKey": 28,
  "type": "request",
  "listeners": ["broker"],
  "name": "TxnOffsetCommitRequest",
  // ... (earlier version notes omitted)
  //
  // Version 6 adds server-side validation that the committed partitions are assigned to the requesting member.
  // Committing offsets for unassigned partitions returns UNASSIGNED_PARTITION per partition.
  "validVersions": "0-6",
  // ... (remaining fields omitted)
}
{
  "apiKey": 28,
  "type": "response",
  "name": "TxnOffsetCommitResponse",
  // ... (earlier version notes omitted)
  // Version 6 adds support for the new per-partition error code UNASSIGNED_PARTITION, returned when
  // a member attempts to commit offsets for a partition not assigned to it.
  "validVersions": "0-6",
  // ... (remaining fields omitted)
}
Compatibility, Deprecation, and Migration Plan
This change introduces stricter server-side validation, but existing clients are fully protected by the API version gate.
Existing clients using TxnOffsetCommit version 5 or below are not affected. The broker preserves the pre-existing behavior for all requests on older versions, so no existing application will see any change in behavior after upgrading the broker.
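The version gate described here can be sketched as a thin wrapper around the new check. This is a hypothetical illustration, not broker code: only the version number 6 comes from this KIP, and the interface and method names are assumptions. The pre-existing member-epoch check, which applies to every version, is deliberately not modeled.

```java
// Hypothetical sketch of how the v6 gate could wrap the new assignment check.
// Only the threshold (version 6) comes from this KIP; all names are illustrative.
class TxnOffsetCommitVersionGate {
    // First TxnOffsetCommit version that opts into assignment validation.
    static final short FIRST_VALIDATING_VERSION = 6;

    // Stand-in for the per-partition assignment check
    // (e.g. the assignmentEpoch rule from Proposed Changes).
    @FunctionalInterface
    interface AssignmentCheck {
        boolean isCommitAllowed(String memberId, int partition, int clientEpoch);
    }

    static boolean validatesAssignment(short apiVersion) {
        return apiVersion >= FIRST_VALIDATING_VERSION;
    }

    // Returns true when the new assignment validation passes or does not apply.
    static boolean passesAssignmentValidation(short apiVersion, String memberId, int partition,
                                              int clientEpoch, AssignmentCheck check) {
        if (!validatesAssignment(apiVersion)) {
            return true; // v0-v5: keep the pre-KIP behavior, no assignment check
        }
        return check.isCommitAllowed(memberId, partition, clientEpoch);
    }
}
```

Keeping the gate as a single version comparison means older clients never reach the new code path, which is what allows the broker to be upgraded without changing their behavior.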
New clients that explicitly send version 6 opt into the new validation semantics. Correctly implemented EOS applications — which should only commit offsets for partitions they currently own — will not be affected. Buggy applications that commit offsets for unassigned partitions will now receive STALE_MEMBER_EPOCH per-partition rather than silently succeeding, which makes the bug visible and easier to diagnose.
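On the client side, one defensive way for an application to stay within the new rules is to drop offsets for partitions it no longer owns before committing. The following is a minimal generic sketch and is not part of this KIP; the helper name `retainOwned` is hypothetical. In a real application, K and V would be `TopicPartition` and `OffsetAndMetadata`, and the owned set could come from `KafkaConsumer.assignment()`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper; kept generic so it does not depend on Kafka client classes.
class OwnedOffsets {
    // Returns a copy of `offsets` restricted to keys in `owned`; the input map is not modified.
    static <K, V> Map<K, V> retainOwned(Map<K, V> offsets, Set<K> owned) {
        Map<K, V> result = new HashMap<>(offsets);
        result.keySet().retainAll(owned);
        return result;
    }
}
```

A typical call would be `producer.sendOffsetsToTransaction(OwnedOffsets.retainOwned(offsets, consumer.assignment()), consumer.groupMetadata())`. Because the assignment can still change between the filter and the commit, the broker-side check remains the authority, and this filter only avoids obvious mistakes.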
Test Plan
- All existing regression tests must pass.
- New unit and integration tests should cover the rejection cases, such as committing offsets for a partition that is not assigned to anyone or is assigned to a different member.
Rejected Alternatives
n/a