Status
Current state: Accepted
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The OffsetCommit API has received several enhancements that were not applied to TxnOffsetCommit:
- OffsetCommit v10 uses topic IDs instead of topic names (KIP-848).
- OffsetCommit v9+ returns GROUP_ID_NOT_FOUND when the group does not exist and STALE_MEMBER_EPOCH when the member epoch is stale under the new consumer group protocol (KIP-848).
Meanwhile, TxnOffsetCommit still relies on topic names only, hardcodes Uuid.ZERO_UUID when persisting offsets in OffsetCommitValue records, and maps both GROUP_ID_NOT_FOUND and STALE_MEMBER_EPOCH to ILLEGAL_GENERATION.
This KIP introduces TxnOffsetCommit v6 to close these gaps.
Public Interfaces
TxnOffsetCommit Request (v6)
Version 6 adds topic IDs (replacing topic names), renames GenerationId to GenerationIdOrMemberEpoch, and enables three new
error codes. Changes from v5 are marked with `// NEW` or `// CHANGED`:
{
  "apiKey": 28,
  "type": "request",
  "listeners": ["broker"],
  "name": "TxnOffsetCommitRequest",
  // ...
  // Version 6 adds support for topic IDs and removes support for
  // topic names. It also returns GROUP_ID_NOT_FOUND when the group
  // does not exist and STALE_MEMBER_EPOCH when the member epoch is
  // stale under the new consumer group protocol (KIP-1319).
  "validVersions": "0-6", // CHANGED
  "flexibleVersions": "3+",
  "fields": [
    { "name": "TransactionalId", "type": "string", "versions": "0+",
      "entityType": "transactionalId",
      "about": "The ID of the transaction." },
    { "name": "GroupId", "type": "string", "versions": "0+",
      "entityType": "groupId",
      "about": "The ID of the group." },
    { "name": "ProducerId", "type": "int64", "versions": "0+",
      "entityType": "producerId",
      "about": "The current producer ID in use by the transactional ID." },
    { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
      "about": "The current epoch associated with the producer ID." },
    { "name": "GenerationIdOrMemberEpoch", "type": "int32", // CHANGED
      "versions": "3+", "default": "-1",
      "about": "The generation of the group if using the classic group protocol or the member epoch if using the consumer protocol." },
    { "name": "MemberId", "type": "string", "versions": "3+",
      "default": "",
      "about": "The member ID assigned by the group coordinator." },
    { "name": "GroupInstanceId", "type": "string",
      "versions": "3+", "nullableVersions": "3+", "default": "null",
      "about": "The unique identifier of the consumer instance provided by end user." },
    { "name": "Topics", "type": "[]TxnOffsetCommitRequestTopic",
      "versions": "0+",
      "about": "Each topic that we want to commit offsets for.",
      "fields": [
        { "name": "Name", "type": "string",
          "versions": "0-5", // CHANGED
          "entityType": "topicName", "ignorable": true,
          "about": "The topic name." },
        { "name": "TopicId", "type": "uuid", // NEW
          "versions": "6+", "ignorable": true,
          "about": "The topic ID." },
        { "name": "Partitions",
          "type": "[]TxnOffsetCommitRequestPartition",
          "versions": "0+",
          "about": "The partitions inside the topic that we want to commit offsets for.",
          "fields": [
            { "name": "PartitionIndex", "type": "int32", "versions": "0+" },
            { "name": "CommittedOffset", "type": "int64", "versions": "0+" },
            { "name": "CommittedLeaderEpoch", "type": "int32",
              "versions": "2+", "default": "-1", "ignorable": true },
            { "name": "CommittedMetadata", "type": "string",
              "versions": "0+", "nullableVersions": "0+" }
          ]
        }
      ]
    }
  ]
}
Key changes:
- Name bounded to 0-5, TopicId added at 6+. This follows the clean break pattern used by OffsetCommit v10, Fetch v13, and Produce v13.
- GenerationId renamed to GenerationIdOrMemberEpoch. Aligns with OffsetCommitRequest. Source-level only; no wire format change.
TxnOffsetCommit Response (v6)
The response mirrors the request: v6 returns topic IDs instead of topic names.
{
  "apiKey": 28,
  "type": "response",
  "name": "TxnOffsetCommitResponse",
  // ...
  // Version 6 adds support for topic IDs and removes support for
  // topic names. It can also return GROUP_ID_NOT_FOUND and
  // STALE_MEMBER_EPOCH (KIP-1319).
  "validVersions": "0-6", // CHANGED
  "flexibleVersions": "3+",
  // Supported errors:
  // ...
  // - GROUP_ID_NOT_FOUND (version 6+) // NEW
  // - STALE_MEMBER_EPOCH (version 6+) // NEW
  // - UNKNOWN_TOPIC_ID (version 6+) // NEW
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+" },
    { "name": "Topics",
      "type": "[]TxnOffsetCommitResponseTopic", "versions": "0+",
      "about": "The responses for each topic.", "fields": [
        { "name": "Name", "type": "string",
          "versions": "0-5", // CHANGED
          "entityType": "topicName", "ignorable": true,
          "about": "The topic name." },
        { "name": "TopicId", "type": "uuid", // NEW
          "versions": "6+", "ignorable": true,
          "about": "The topic ID." },
        { "name": "Partitions",
          "type": "[]TxnOffsetCommitResponsePartition",
          "versions": "0+", "fields": [
            { "name": "PartitionIndex", "type": "int32", "versions": "0+" },
            { "name": "ErrorCode", "type": "int16", "versions": "0+" }
          ]
        }
      ]
    }
  ]
}
New error codes:
| Error Code | When Returned | Prior Behavior (v0-5) |
|---|---|---|
| GROUP_ID_NOT_FOUND | The group does not exist and the request includes group membership information (`generationId >= 0`). | Mapped to ILLEGAL_GENERATION. |
| STALE_MEMBER_EPOCH | The member epoch is stale under the new consumer group protocol. | Mapped to ILLEGAL_GENERATION. |
| UNKNOWN_TOPIC_ID | The topic ID cannot be resolved by the broker. | N/A (topic names used). |
For v0-5, all existing error mappings are preserved.
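The version-dependent mapping in the table above can be sketched as a small pure function. This is an illustration only: the class name, the `ErrorCode` enum, and `toResponseError` are hypothetical stand-ins, not the broker's actual classes.

```java
// Hypothetical sketch of the version-dependent error mapping; not broker code.
final class TxnOffsetCommitErrorMappingSketch {

    // Stand-in for the relevant subset of Kafka error codes (assumption).
    enum ErrorCode { NONE, ILLEGAL_GENERATION, GROUP_ID_NOT_FOUND, STALE_MEMBER_EPOCH }

    // Returns the error that would be written into the response
    // for a request of the given version.
    static ErrorCode toResponseError(ErrorCode error, int requestVersion) {
        if (requestVersion >= 6) {
            // v6+: the new error codes are returned directly.
            return error;
        }
        switch (error) {
            case GROUP_ID_NOT_FOUND:
            case STALE_MEMBER_EPOCH:
                // v0-5: keep the existing mapping so older clients are unaffected.
                return ErrorCode.ILLEGAL_GENERATION;
            default:
                return error;
        }
    }
}
```

Keeping the mapping in one place, keyed only on the request version, leaves the v0-5 behavior untouched while v6 clients see the more specific codes.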
Proposed Changes
Broker-Side
- Topic ID resolution. For v6 requests, the broker resolves topic IDs to names via the metadata cache before authorization and partition validation. This follows the same pattern as OffsetCommit v10. The per-partition error code returned for an invalid topic is determined in this order:
  - If the topic ID cannot be resolved by the broker, return UNKNOWN_TOPIC_ID for all partitions of that topic.
  - If the client is not authorized for the resolved topic name, return TOPIC_AUTHORIZATION_FAILED for all partitions of that topic.
  - If a requested partition does not exist, return UNKNOWN_TOPIC_OR_PARTITION for that partition only.
- Error mapping. For v6+, GROUP_ID_NOT_FOUND and STALE_MEMBER_EPOCH are returned directly instead of being mapped to ILLEGAL_GENERATION. For v0-5, the existing mapping is preserved.
- Persisting topic IDs. The broker populates the existing topicId tagged field in OffsetCommitValue v4 with the real topic ID instead of Uuid.ZERO_UUID. No new record schema is introduced.
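The ordering of the per-partition checks for a v6 request can be sketched as follows. This is a minimal illustration under stated assumptions: `java.util.UUID` stands in for Kafka's topic ID type, the three lookup structures are hypothetical substitutes for the metadata cache and the authorizer, and `validate` is not an actual broker method.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical sketch of the validation order for one topic in a v6 request.
final class TopicIdValidationSketch {

    // Stand-in for the relevant subset of Kafka error codes (assumption).
    enum ErrorCode { NONE, UNKNOWN_TOPIC_ID, TOPIC_AUTHORIZATION_FAILED, UNKNOWN_TOPIC_OR_PARTITION }

    static Map<Integer, ErrorCode> validate(
            UUID topicId,
            List<Integer> partitions,
            Map<UUID, String> topicNamesById,      // stand-in for the metadata cache
            Set<String> authorizedTopics,          // stand-in for the authorizer
            Map<String, Integer> partitionCounts)  // stand-in for partition metadata
    {
        Map<Integer, ErrorCode> result = new LinkedHashMap<>();
        String name = topicNamesById.get(topicId);
        for (int partition : partitions) {
            ErrorCode error;
            if (name == null) {
                // 1. Unresolvable topic ID: applies to every partition of the topic.
                error = ErrorCode.UNKNOWN_TOPIC_ID;
            } else if (!authorizedTopics.contains(name)) {
                // 2. Not authorized for the resolved name: also applies to every partition.
                error = ErrorCode.TOPIC_AUTHORIZATION_FAILED;
            } else if (partition < 0 || partition >= partitionCounts.getOrDefault(name, 0)) {
                // 3. Missing partition: applies to this partition only.
                error = ErrorCode.UNKNOWN_TOPIC_OR_PARTITION;
            } else {
                error = ErrorCode.NONE;
            }
            result.put(partition, error);
        }
        return result;
    }
}
```

Because resolution comes first, an unresolvable topic ID is reported as UNKNOWN_TOPIC_ID without ever reaching the authorization check, which needs a topic name to operate on.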
Producer-Side
- sendOffsetsToTransaction API unchanged. The public API does not change.
- Topic ID resolution. When sendOffsetsToTransaction is called and the broker supports v6, the producer adds the topics from the offset map to its metadata cache and waits for a refresh (bounded by max.block.ms) before building the request with the resolved topic IDs. No explicit cleanup is performed: the producer's metadata cache already evicts topics that have been idle for longer than metadata.max.idle.ms (default 5 minutes), which gives transient semantics for free. Topics that are committed repeatedly stay cached, which avoids a metadata wait on subsequent commits.
- Fallback. If the broker supports v6 but any topic in the offset map has an unavailable topic ID, the producer falls back to v5 for that request (all-or-nothing, same pattern as OffsetCommit's canUseTopicIds). In practice, this should never trigger since Transaction V2 requires KRaft, which always assigns topic IDs.
- Error handling. GROUP_ID_NOT_FOUND and STALE_MEMBER_EPOCH are treated as abortable errors. The application must abort the transaction. UNKNOWN_TOPIC_ID is treated as a retriable error consistent with UNKNOWN_TOPIC_OR_PARTITION. Since UnknownTopicIdException is a RetriableException, it falls into the existing retriable error handling path: the partition stays in pendingTxnOffsetCommits and the handler re-enqueues the request.
- Response mapping. For v6 responses (keyed by topic ID), the handler uses the topic ID to name mapping built during request construction to correlate responses back to TopicPartition.
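The all-or-nothing fallback check and the response correlation described above can be sketched like this. It is a rough illustration, not the producer's implementation: `java.util.UUID` stands in for Kafka's topic ID type, `ZERO_UUID` here mirrors the all-zero ID, and the method names other than `canUseTopicIds` (which the text attributes to OffsetCommit) are hypothetical.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical sketch of producer-side topic ID handling for TxnOffsetCommit v6.
final class ProducerTopicIdSketch {

    // Stand-in for the all-zero topic ID that marks "no ID available" (assumption).
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // All-or-nothing: a single missing or zero ID means the whole request uses names (v5).
    static boolean canUseTopicIds(Collection<String> topics, Map<String, UUID> topicIdsByName) {
        for (String topic : topics) {
            UUID id = topicIdsByName.get(topic);
            if (id == null || ZERO_UUID.equals(id)) {
                return false;
            }
        }
        return true;
    }

    // Built once during request construction; call only after canUseTopicIds returned true.
    static Map<UUID, String> buildIdToName(Collection<String> topics, Map<String, UUID> topicIdsByName) {
        Map<UUID, String> idToName = new HashMap<>();
        for (String topic : topics) {
            idToName.put(topicIdsByName.get(topic), topic);
        }
        return idToName;
    }

    // Correlates a topic ID from a v6 response back to the name used in the offset map.
    static Optional<String> resolveTopicName(UUID responseTopicId, Map<UUID, String> idToName) {
        return Optional.ofNullable(idToName.get(responseTopicId));
    }
}
```

Capturing the ID-to-name map at request construction time means the response handler does not depend on the metadata cache still holding the topic when the response arrives.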
Compatibility, Deprecation, and Migration Plan
- Version negotiation. v0-4 do not require Transaction V2. v5 requires Transaction V2. v6 requires Transaction V2 and uses topic IDs, with a fallback to v5.
- Rolling upgrades. No new record schema is introduced. The existing OffsetCommitValue v4 topicId tagged field (`ignorable: true`) is populated with real topic IDs by new brokers and ZERO_UUID by old brokers.
- Backward compatibility. Clients using v0-5 are unaffected. Brokers that do not support v6 negotiate a lower version.
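One way to read the version negotiation rules is as a cap on the highest usable request version. The sketch below is an assumption-laden illustration: `selectVersion` and its parameters are hypothetical, and real clients negotiate through the broker's advertised API versions rather than a single integer.

```java
// Hypothetical sketch of TxnOffsetCommit version selection; not client code.
final class TxnOffsetCommitVersionSketch {

    static final int LAST_VERSION_BEFORE_TRANSACTION_V2 = 4; // v0-4: no Transaction V2
    static final int FIRST_VERSION_WITH_TOPIC_IDS = 6;       // v6: topic IDs

    static int selectVersion(int brokerMaxVersion,
                             boolean transactionV2Enabled,
                             boolean topicIdsAvailable) {
        int cap;
        if (!transactionV2Enabled) {
            // Without Transaction V2, only v0-4 are usable.
            cap = LAST_VERSION_BEFORE_TRANSACTION_V2;
        } else if (topicIdsAvailable) {
            cap = FIRST_VERSION_WITH_TOPIC_IDS;
        } else {
            // Fallback to v5 when any topic ID is unavailable.
            cap = FIRST_VERSION_WITH_TOPIC_IDS - 1;
        }
        // A broker that does not support the cap negotiates a lower version.
        return Math.min(brokerMaxVersion, cap);
    }
}
```

For example, under these assumptions `selectVersion(6, true, false)` yields 5, matching the all-or-nothing fallback, while `selectVersion(6, false, true)` yields 4.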
Test Plan
The changes are covered by unit tests, integration tests, and system tests.
Rejected Alternatives
- Splitting Into Two Versions (Error Codes + Topic IDs). OffsetCommit split error codes (v9) and topic IDs (v10) because v9 could be used without KRaft. For TxnOffsetCommit, v5+ already requires Transaction V2 (which requires KRaft), so any broker supporting v6 necessarily supports both capabilities. Splitting would add protocol complexity without practical benefit.
- Keeping Topic Names Alongside Topic IDs in v6. The established Kafka convention is a clean break: Name in versions 0-N, TopicId in N+1 onward. Sending both adds redundancy and ambiguity.