Status

Current state: Accepted

Discussion thread: here 

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The OffsetCommit API has received several enhancements that were not applied to TxnOffsetCommit:

  • OffsetCommit v10 uses topic IDs instead of topic names (KIP-848).
  • OffsetCommit v9+ returns GROUP_ID_NOT_FOUND when the group does not exist and STALE_MEMBER_EPOCH when the member epoch is stale under the new consumer group protocol (KIP-848).

Meanwhile, TxnOffsetCommit still relies on topic names only, hardcodes Uuid.ZERO_UUID when persisting offsets in OffsetCommitValue records, and maps both GROUP_ID_NOT_FOUND and STALE_MEMBER_EPOCH to ILLEGAL_GENERATION.

This KIP introduces TxnOffsetCommit v6 to close these gaps.

Public Interfaces

TxnOffsetCommit Request (v6)

Version 6 adds topic IDs (replacing topic names), renames GenerationId to GenerationIdOrMemberEpoch, and enables three new
error codes. Changes from v5 are marked with `// NEW` or `// CHANGED`:

{
  "apiKey": 28,
  "type": "request",
  "listeners": ["broker"],
  "name": "TxnOffsetCommitRequest",
  // ...
  // Version 6 adds support for topic IDs and removes support for
  // topic names. It also returns GROUP_ID_NOT_FOUND when the group
  // does not exist and STALE_MEMBER_EPOCH when the member epoch is
  // stale under the new consumer group protocol (KIP-1319).
  "validVersions": "0-6",                                    // CHANGED
  "flexibleVersions": "3+",
  "fields": [
    { "name": "TransactionalId", "type": "string", "versions": "0+",
      "entityType": "transactionalId",
      "about": "The ID of the transaction." },
    { "name": "GroupId", "type": "string", "versions": "0+",
      "entityType": "groupId",
      "about": "The ID of the group." },
    { "name": "ProducerId", "type": "int64", "versions": "0+",
      "entityType": "producerId",
      "about": "The current producer ID in use by the transactional ID." },
    { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
      "about": "The current epoch associated with the producer ID." },
    { "name": "GenerationIdOrMemberEpoch", "type": "int32",  // CHANGED
      "versions": "3+", "default": "-1",
      "about": "The generation of the group if using the classic group protocol or the member epoch if using the consumer protocol." },
    { "name": "MemberId", "type": "string", "versions": "3+",
      "default": "",
      "about": "The member ID assigned by the group coordinator." },
    { "name": "GroupInstanceId", "type": "string",
      "versions": "3+", "nullableVersions": "3+", "default": "null",
      "about": "The unique identifier of the consumer instance provided by end user." },
    { "name": "Topics", "type": "[]TxnOffsetCommitRequestTopic",
      "versions": "0+",
      "about": "Each topic that we want to commit offsets for.",
      "fields": [
        { "name": "Name", "type": "string",
          "versions": "0-5",                                  // CHANGED
          "entityType": "topicName", "ignorable": true,
          "about": "The topic name." },
        { "name": "TopicId", "type": "uuid",                  // NEW
          "versions": "6+", "ignorable": true,
          "about": "The topic ID." },
        { "name": "Partitions",
          "type": "[]TxnOffsetCommitRequestPartition",
          "versions": "0+",
          "about": "The partitions inside the topic that we want to commit offsets for.",
          "fields": [
            { "name": "PartitionIndex", "type": "int32", "versions": "0+" },
            { "name": "CommittedOffset", "type": "int64", "versions": "0+" },
            { "name": "CommittedLeaderEpoch", "type": "int32",
              "versions": "2+", "default": "-1", "ignorable": true },
            { "name": "CommittedMetadata", "type": "string",
              "versions": "0+", "nullableVersions": "0+" }
          ]
        }
      ]
    }
  ]
}

Key changes:

  • Name bounded to 0-5, TopicId added at 6+. This follows the clean break pattern used by OffsetCommit v10, Fetch v13, and Produce v13.
  • GenerationId renamed to GenerationIdOrMemberEpoch. Aligns with OffsetCommitRequest. Source-level only; no wire format change.

TxnOffsetCommit Response (v6)

The response mirrors the request: v6 returns topic IDs instead of topic names.

{
  "apiKey": 28,
  "type": "response",
  "name": "TxnOffsetCommitResponse",
  // ...
  // Version 6 adds support for topic IDs and removes support for
  // topic names. It can also return GROUP_ID_NOT_FOUND and
  // STALE_MEMBER_EPOCH (KIP-1319).
  "validVersions": "0-6",                                    // CHANGED
  "flexibleVersions": "3+",
  // Supported errors:
  // ...
  // - GROUP_ID_NOT_FOUND (version 6+)                       // NEW
  // - STALE_MEMBER_EPOCH (version 6+)                       // NEW
  // - UNKNOWN_TOPIC_ID (version 6+)                         // NEW
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+" },
    { "name": "Topics",
      "type": "[]TxnOffsetCommitResponseTopic", "versions": "0+",
      "about": "The responses for each topic.", "fields": [
        { "name": "Name", "type": "string",
          "versions": "0-5",                                  // CHANGED
          "entityType": "topicName", "ignorable": true,
          "about": "The topic name." },
        { "name": "TopicId", "type": "uuid",                  // NEW
          "versions": "6+", "ignorable": true,
          "about": "The topic ID." },
        { "name": "Partitions",
          "type": "[]TxnOffsetCommitResponsePartition",
          "versions": "0+", "fields": [
            { "name": "PartitionIndex", "type": "int32", "versions": "0+" },
            { "name": "ErrorCode", "type": "int16", "versions": "0+" }
          ]
        }
      ]
    }
  ]
}

New error codes:

Error Code | When Returned | Prior Behavior (v0-5)
---------- | ------------- | ---------------------
GROUP_ID_NOT_FOUND | The group does not exist and the request includes group membership information (`generationId >= 0`). | Mapped to ILLEGAL_GENERATION.
STALE_MEMBER_EPOCH | The member epoch is stale under the new consumer group protocol. | Mapped to ILLEGAL_GENERATION.
UNKNOWN_TOPIC_ID | The topic ID cannot be resolved by the broker. | N/A (topic names used).

For v0-5, all existing error mappings are preserved.
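
The version-dependent mapping in the table can be sketched as follows. This is a minimal illustration, not the broker's actual code: `SketchError` and `TxnOffsetCommitErrorMapper` are hypothetical names, and the enum only stands in for the handful of Kafka error codes discussed here.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical stand-in for the subset of Kafka error codes discussed in this KIP. */
enum SketchError {
    NONE, ILLEGAL_GENERATION, GROUP_ID_NOT_FOUND, STALE_MEMBER_EPOCH
}

/** Hypothetical helper: picks the error code to put on the wire for a given request version. */
final class TxnOffsetCommitErrorMapper {
    // Errors that are only returned as-is to v6+ clients.
    private static final Set<SketchError> V6_ONLY =
        EnumSet.of(SketchError.GROUP_ID_NOT_FOUND, SketchError.STALE_MEMBER_EPOCH);

    static SketchError toWireError(SketchError internal, short requestVersion) {
        // v0-5: keep the existing behavior and collapse both errors into ILLEGAL_GENERATION.
        if (requestVersion < 6 && V6_ONLY.contains(internal)) {
            return SketchError.ILLEGAL_GENERATION;
        }
        // v6+: return the error directly.
        return internal;
    }
}
```

Under these assumptions, `toWireError(SketchError.STALE_MEMBER_EPOCH, (short) 5)` yields `ILLEGAL_GENERATION`, while the same call with version 6 yields `STALE_MEMBER_EPOCH`.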

Proposed Changes

Broker-Side

  • Topic ID resolution. For v6 requests, the broker resolves topic IDs to names via the metadata cache before authorization and partition validation. This follows the same pattern as OffsetCommit v10. The per-partition error code returned for an invalid topic is determined in this order:
    • If the topic ID cannot be resolved by the broker, return UNKNOWN_TOPIC_ID for all partitions of that topic.
    • If the client is not authorized for the resolved topic name, return TOPIC_AUTHORIZATION_FAILED for all partitions of that topic.
    • If a requested partition does not exist, return UNKNOWN_TOPIC_OR_PARTITION for that partition only.
  • Error mapping. For v6+, GROUP_ID_NOT_FOUND and STALE_MEMBER_EPOCH are returned directly instead of being mapped to ILLEGAL_GENERATION. For v0-5, the existing mapping is preserved.
  • Persisting topic IDs. The broker populates the existing topicId tagged field in OffsetCommitValue v4 with the real topic ID instead of Uuid.ZERO_UUID. No new record schema is introduced.
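
The error precedence for v6 topic validation described above can be sketched as follows. This is an illustration under stated assumptions rather than the broker implementation: `TopicValidationSketch` is a hypothetical class, `java.util.UUID` stands in for Kafka's `Uuid`, and the three maps and sets are simplified stand-ins for the metadata cache, the authorizer result, and the partition metadata.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/** Hypothetical sketch of the per-partition error precedence for TxnOffsetCommit v6. */
final class TopicValidationSketch {
    enum Err { NONE, UNKNOWN_TOPIC_ID, TOPIC_AUTHORIZATION_FAILED, UNKNOWN_TOPIC_OR_PARTITION }

    /**
     * @param idToName         assumed view of the metadata cache (topic ID to topic name)
     * @param authorizedTopics assumed set of topic names the client is authorized for
     * @param partitionCounts  assumed partition count per topic name
     * @return the error code for each requested partition, in request order
     */
    static Map<Integer, Err> validate(UUID topicId,
                                      List<Integer> partitions,
                                      Map<UUID, String> idToName,
                                      Set<String> authorizedTopics,
                                      Map<String, Integer> partitionCounts) {
        Map<Integer, Err> result = new LinkedHashMap<>();
        String name = idToName.get(topicId);
        for (int partition : partitions) {
            Err err;
            if (name == null) {
                // 1. Unresolvable topic ID: applies to every partition of the topic.
                err = Err.UNKNOWN_TOPIC_ID;
            } else if (!authorizedTopics.contains(name)) {
                // 2. Authorization is checked against the resolved name.
                err = Err.TOPIC_AUTHORIZATION_FAILED;
            } else if (partition < 0 || partition >= partitionCounts.getOrDefault(name, 0)) {
                // 3. Only the missing partition is rejected.
                err = Err.UNKNOWN_TOPIC_OR_PARTITION;
            } else {
                err = Err.NONE;
            }
            result.put(partition, err);
        }
        return result;
    }
}
```

Checking the topic ID first means an unresolvable ID never reaches the authorizer, which only understands topic names.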

Producer-Side

  • sendOffsetsToTransaction API unchanged. The public API does not change.
  • Topic ID resolution. When sendOffsetsToTransaction is called and the broker supports v6, the producer adds the topics from the offset map to its metadata cache and waits for a refresh (bounded by max.block.ms) before building the request with the resolved topic IDs. No explicit cleanup is performed: the producer's metadata cache already evicts topics that have been idle for longer than metadata.max.idle.ms (default 5 minutes), which gives transient semantics for free. Topics that are committed repeatedly stay cached, which avoids a metadata wait on subsequent commits.
  • Fallback. If the broker supports v6 but any topic in the offset map has an unavailable topic ID, the producer falls back to v5 for that request (all-or-nothing, same pattern as OffsetCommit's canUseTopicIds). In practice, this should never trigger since Transaction V2 requires KRaft, which always assigns topic IDs.
  • Error handling. GROUP_ID_NOT_FOUND and STALE_MEMBER_EPOCH are treated as abortable errors: the application must abort the transaction. UNKNOWN_TOPIC_ID is treated as a retriable error, consistent with UNKNOWN_TOPIC_OR_PARTITION. Since UnknownTopicIdException is a RetriableException, it falls into the existing retriable error handling path: the partition stays in pendingTxnOffsetCommits and the handler re-enqueues the request.
  • Response mapping. For v6 responses (keyed by topic ID), the handler uses the topic ID to name mapping built during request construction to correlate responses back to TopicPartition.
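
The all-or-nothing fallback and the ID-to-name mapping used for response correlation can be sketched as follows. This is a simplified illustration, not the producer's actual code: `TxnOffsetCommitVersionSketch`, `chooseVersion`, and `idToName` are hypothetical names, `java.util.UUID` stands in for Kafka's `Uuid`, and the Transaction V2 check is omitted.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical sketch of producer-side version selection for TxnOffsetCommit. */
final class TxnOffsetCommitVersionSketch {
    // Stand-in for Uuid.ZERO_UUID, i.e. "no topic ID available".
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    /**
     * Returns 6 only if the broker supports it and every topic has a usable ID;
     * otherwise falls back to v5 (or lower, if the broker's maximum is lower).
     */
    static short chooseVersion(short maxBrokerVersion,
                               Collection<String> topics,
                               Map<String, UUID> topicIds) {
        if (maxBrokerVersion < 6) {
            return maxBrokerVersion;
        }
        for (String topic : topics) {
            UUID id = topicIds.get(topic);
            if (id == null || id.equals(ZERO_UUID)) {
                return 5; // one missing ID downgrades the whole request
            }
        }
        return 6;
    }

    /** Builds the reverse mapping kept alongside a v6 request to decode its response. */
    static Map<UUID, String> idToName(Collection<String> topics, Map<String, UUID> topicIds) {
        Map<UUID, String> reverse = new HashMap<>();
        for (String topic : topics) {
            reverse.put(topicIds.get(topic), topic);
        }
        return reverse;
    }
}
```

Building the reverse map at request construction time means the response handler does not depend on the metadata cache still holding the topic when the response arrives.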

Compatibility, Deprecation, and Migration Plan

  • Version negotiation. v0-4 do not require Transaction V2. v5 requires Transaction V2. v6 requires Transaction V2 and uses topic IDs, with a fallback to v5 when a topic ID is unavailable.
  • Rolling upgrades. No new record schema is introduced. The existing OffsetCommitValue v4 topicId tagged field (`ignorable: true`) is populated with real topic IDs by new brokers and ZERO_UUID by old brokers.
  • Backward compatibility. Clients using v0-5 are unaffected. Brokers that do not support v6 negotiate a lower version.

Test Plan

The changes are covered by unit tests, integration tests, and system tests.

Rejected Alternatives

  • Splitting Into Two Versions (Error Codes + Topic IDs). OffsetCommit split error codes (v9) and topic IDs (v10) because v9 could be used without KRaft. For TxnOffsetCommit, v5+ already requires Transaction V2 (which requires KRaft), so any broker
    supporting v6 necessarily supports both capabilities. Splitting would add protocol complexity without practical benefit.
  • Keeping Topic Names Alongside Topic IDs in v6. The established Kafka convention is a clean break: Name in versions 0-N, TopicId in N+1 onward. Sending both adds redundancy and ambiguity.



