For a share group, the group coordinator persists three kinds of records:

  • ConsumerGroupMetadata - this is written to reserve the group's ID as a share group in the namespace of groups.
  • ShareGroupMemberMetadata - this is written to allow group membership to continue seamlessly across a group coordinator change.
  • ShareGroupPartitionMetadata - this is written whenever the set of topic-partitions being consumed in the share group changes. Its purpose is to keep track of which topic-partitions will have share-group persistent state.

When the group coordinator fails over, the newly elected coordinator loads the state from the __consumer_offsets partition. This means a share group will remain in existence across the fail-over, along with the list of members. However, the assignments are not persisted and will be recalculated by the new group coordinator.

The SimpleShareAssignor

This KIP introduces the SimpleShareAssignor. It attempts to balance the number of consumers assigned to the subscribed partitions, and assumes that all consumers are equally capable of consuming records.


When a batch of records is first read from the log and added to the in-flight records for a share-partition, the broker does not know whether the set of records between the batch’s base offset and the last offset contains any gaps, as might occur for example as a result of log compaction. When the broker does not know which offsets correspond to records, the batch is considered an unmaterialized record batch. Rather than forcing the broker to iterate through all of the records in all cases, which might require decompressing every batch, the broker sends unmaterialized record batches to consumers. It initially assumes that all offsets between the base offset and the last offset correspond to records. When the consumer processes the batch, it may find gaps and it reports these using the ShareFetch or ShareAcknowledge API. This means that the presence of unmaterialized record batches containing gaps might temporarily inflate the number of in-flight records, but this will be resolved by the consumer acknowledgements.
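As an illustration of how a consumer might report gaps in an unmaterialized batch, the following minimal Python sketch builds one acknowledge type per offset in the batch. It assumes the numeric codes given for the AcknowledgeTypes field elsewhere in this KIP (0 for a gap, 1 for accept); the function name `acknowledge_types` and its parameters are hypothetical and are not part of any Kafka client API.

```python
from typing import Iterable, List

# Codes taken from the AcknowledgeTypes field description (0:Gap, 1:Accept).
GAP = 0
ACCEPT = 1


def acknowledge_types(base_offset: int, last_offset: int,
                      seen_offsets: Iterable[int]) -> List[int]:
    """Return one acknowledge type per offset in [base_offset, last_offset].

    Offsets at which the consumer actually found a record are accepted;
    every other offset in the range is reported as a gap.
    """
    seen = set(seen_offsets)
    return [ACCEPT if offset in seen else GAP
            for offset in range(base_offset, last_offset + 1)]
```

For a batch spanning offsets 100 to 104 in which compaction removed offsets 102 and 103, the sketch reports gaps at exactly those two positions, so the broker can stop counting them as in-flight records.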

The share-partition leader clearly has to look within the data returned from the replica manager in order to understand the record batches it fetches. This means that records retrieved using a share group are not able to benefit from the zero-copy optimisation.

By iterating over the record batches but not iterating over the individual records within, the share-partition leader is able to understand the log without having to decompress the records. There is one exception to this and that is to do with reading the transaction end markers as described in the next section.


The component which manages durable share-partition state is called the share-group state persister. Eventually, this could be a pluggable component which implements a new interface. For now, it is not pluggable, and Kafka includes a built-in default implementation. This section describes the behavior of this built-in persister.

When records are acknowledged using the ShareFetch or ShareAcknowledge RPCs, durable state will need to be written by the persister. The share-partition leader serving these requests will wait for the persister to complete writing the durable state before responding.

The share coordinator and the share-group state topic


The share-partition leader and group coordinator use inter-broker RPCs to ask the share coordinator to read, write and delete share-partition state on the share-group state topic. The new RPCs are called InitializeShareGroupState, ReadShareGroupState, WriteShareGroupState, and DeleteShareGroupState. The share-partition leader uses the FindCoordinator RPC to find its share coordinator, with a new key_type (FindCoordinatorRequest.CoordinatorType.SHARE with value 2) and the key of "groupId:topicId:partition".


When a share-partition leader receives its first ShareFetch request for a topic-partition, it needs to initialize its share-partition state. It finds the share coordinator using the FindCoordinator RPC using (key: "groupId:topicId:partition", key_type: SHARE). Then, it sends the ReadShareGroupState RPC to the share coordinator. If the share coordinator has no share-partition state to return, it returns the UNKNOWN_TOPIC_OR_PARTITION error code, indicating that this share-partition is not actually part of the share group. Otherwise, it returns the share-partition state to the share-partition leader, which uses it to initialize and begin fetching records for the consumers. The SPSO returned might be -1, indicating that the initial SPSO needs to be set based on the configuration.
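The initialization logic described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the `ReadStateResult` type, the `coordinator_key` and `initial_spso` functions, and the `configured_offset` parameter are all hypothetical names introduced here, and the sketch does not model the RPCs themselves.

```python
from dataclasses import dataclass
from typing import Optional

UNKNOWN_TOPIC_OR_PARTITION = "UNKNOWN_TOPIC_OR_PARTITION"


@dataclass
class ReadStateResult:
    """Hypothetical stand-in for the outcome of a ReadShareGroupState call."""
    error: Optional[str] = None
    start_offset: int = -1  # -1 means the SPSO has not been set yet


def coordinator_key(group_id: str, topic_id: str, partition: int) -> str:
    """Build the FindCoordinator key in the form groupId:topicId:partition."""
    return f"{group_id}:{topic_id}:{partition}"


def initial_spso(result: ReadStateResult, configured_offset: int) -> Optional[int]:
    """Pick the SPSO to start from, or None if the share-partition is not
    part of the share group.

    configured_offset is an assumed input: the offset that the relevant
    configuration resolves to when no SPSO has been persisted.
    """
    if result.error == UNKNOWN_TOPIC_OR_PARTITION:
        return None
    if result.start_offset == -1:
        return configured_offset
    return result.start_offset
```

The point of the sketch is the three-way outcome: an error means the share-partition is unknown to the group, -1 defers to configuration, and any other value is used directly.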

When a share-partition leader needs to update the durable share-partition state because of an acknowledgement or other state change (such as a lock timeout), it sends the WriteShareGroupState RPC to the share coordinator. The share coordinator keeps track of the accumulated state of the share-partition and chooses how to record it to the share-group state topic. Once it has successfully written to the topic and replication has completed, the RPC response is sent.



State changes

Cumulative state

WriteShareGroupState request

Starting state of topic-partition with latest offset 100

SPSO=100, SPEO=100

SPSO=100, SPEO=100

Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 100,
  "StateBatches": []
}

In the batched case with successful processing, there’s a state change per batch to move the SPSO forwards

Fetch records 100-109

SPEO=110, records 100-109 (acquired, delivery count 1)

SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1)

Acknowledge 100-109


SPSO=110, SPEO=110

Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 110,
  "StateBatches": []
}

With a messier sequence of release and acknowledge, there’s a state change for each operation which can act on multiple records

Fetch records 110-119

Consumer 1 gets 110-112, consumer 2 gets 113-118, consumer 3 gets 119

SPEO=120, records 110-119 (acquired, delivery count 1)

SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1)

Release 110 (consumer 1)

record 110 (available, delivery count 1)

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1)

Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}

Note that the SPEO in the control records is 111 at this point. All records after this are in their first delivery attempt so this is an acceptable situation.

Acknowledge 119 (consumer 3)

record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged

Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 111,
      "LastOffset": 118,
      "State": 0 (Available),
      "DeliveryCount": 0
    },
    {
      "BaseOffset": 119,
      "LastOffset": 119,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}

Fetch records 110, 120 (consumer 1)

SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)

Lock timeout elapsed 111, 112 (consumer 1's records)

records 111-112 (available, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)

Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 111,
      "LastOffset": 112,
      "State": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}

Acknowledge 113-118 (consumer 2)

records 113-118 acknowledged

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1)

Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 113,
      "LastOffset": 118,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}

Fetch records 111, 112 (consumer 3)

records 111-112 (acquired, delivery count 2)

SPSO=110, SPEO=121, records 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)

Acknowledge 110 (consumer 1)


SPSO=111, SPEO=121, records 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)

Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 2 (Acknowledged),
      "DeliveryCount": 2
    }
  ]
}

SPSO=120, SPEO=121, record 120 (acquired, delivery count 1)

Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 120,
  "StateBatches": []
}

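To make the cumulative-state column easier to follow, here is a toy Python model that replays the operations from the example above. It is a sketch under assumptions, not the broker's implementation: the class `SharePartitionModel` and the function `replay_example` are hypothetical, records are tracked one offset at a time rather than in batches, and nothing is persisted.

```python
AVAILABLE, ACQUIRED, ACKNOWLEDGED = "available", "acquired", "acknowledged"


class SharePartitionModel:
    """Toy in-memory model of one share-partition (hypothetical, not Kafka code)."""

    def __init__(self, start_offset: int) -> None:
        self.spso = start_offset  # share-partition start offset
        self.speo = start_offset  # share-partition end offset
        self.records = {}         # offset -> (state, delivery_count)

    def acquire(self, offsets) -> None:
        """Deliver records to a consumer, incrementing the delivery count."""
        for offset in offsets:
            state, count = self.records.get(offset, (AVAILABLE, 0))
            if state != AVAILABLE:
                raise ValueError(f"offset {offset} is {state}")
            self.records[offset] = (ACQUIRED, count + 1)
            self.speo = max(self.speo, offset + 1)

    def release(self, offsets) -> None:
        """Make records available again (explicit release or lock timeout)."""
        for offset in offsets:
            _, count = self.records[offset]
            self.records[offset] = (AVAILABLE, count)

    def acknowledge(self, offsets) -> None:
        """Mark records acknowledged, then move the SPSO past any
        acknowledged records at the start of the in-flight range."""
        for offset in offsets:
            _, count = self.records[offset]
            self.records[offset] = (ACKNOWLEDGED, count)
        while self.records.get(self.spso, (None, 0))[0] == ACKNOWLEDGED:
            del self.records[self.spso]
            self.spso += 1


def replay_example() -> SharePartitionModel:
    """Replay the example up to and including 'Acknowledge 110 (consumer 1)'."""
    sp = SharePartitionModel(100)
    sp.acquire(range(100, 110))      # fetch 100-109
    sp.acknowledge(range(100, 110))  # SPSO moves to 110
    sp.acquire(range(110, 120))      # fetch 110-119
    sp.release([110])                # consumer 1 releases 110
    sp.acknowledge([119])            # 119 acknowledged; SPSO cannot move yet
    sp.acquire([110, 120])           # 110 redelivered, 120 delivered
    sp.release([111, 112])           # lock timeout on 111 and 112
    sp.acknowledge(range(113, 119))  # consumer 2 acknowledges 113-118
    sp.acquire([111, 112])           # redelivered to consumer 3
    sp.acknowledge([110])            # SPSO moves to 111
    return sp
```

After the replay the model reports SPSO=111 and SPEO=121, with records 111-112 acquired at delivery count 2. Acknowledging 111 and 112 in this model (an assumed step, since the example does not show which operation produces its last row) moves the SPSO straight to 120, because 113-119 were already acknowledged.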

the initial ShareGroupHeartbeat



a) The group coordinator serves the ShareGroupHeartbeat RPC. It writes a ConsumerGroupMetadata record for the share group, a ShareGroupMemberMetadata record for the member, and a ShareGroupPartitionMetadata record for the share-partitions which are just about to be initialized, onto the __consumer_offsets topic. The group has now been created but the share-partitions are still being initialized. The group coordinator responds to the ShareGroupHeartbeat RPC, but the list of assignments is empty.

b) For each share-partition being initialized, the group coordinator sends an InitializeShareGroupState request to the share coordinator. The SPSO is not known yet and is initialized to -1. The group epoch is used as the state epoch.

c) The share coordinator serves the InitializeShareGroupState  RPC. It writes a ShareCheckpoint record to the __share_group_state  topic. When the record is replicated, the share coordinator responds to the RPC.

d) Back in the group coordinator, it writes an updated ShareGroupPartitionMetadata record on the __consumer_offsets  topic for the share-partitions which are now initialized. The share-partition is now able to be included in an assignment in the share group.

Enum constant    Description
ACCEPT (1)       The record was consumed successfully.
RELEASE (2)      The record was not consumed successfully. Release it for another delivery attempt.
REJECT (3)       The record was not consumed successfully. Reject it and do not release it for another delivery attempt.
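For readers who want to see the numeric values side by side, the following Python sketch mirrors the acknowledge types as an enumeration. It assumes the numbering 1, 2, 3 for ACCEPT, RELEASE and REJECT, plus the value 0 for a gap as given in the AcknowledgeTypes field description of the ShareFetch request. The helper `is_redeliverable` is a hypothetical name used only for illustration.

```python
from enum import IntEnum


class AcknowledgeType(IntEnum):
    """Wire values for acknowledge types (illustrative mirror, not Kafka code)."""
    GAP = 0      # offset does not correspond to a record
    ACCEPT = 1   # consumed successfully
    RELEASE = 2  # not consumed; release for another delivery attempt
    REJECT = 3   # not consumed; do not deliver again


def is_redeliverable(ack: AcknowledgeType) -> bool:
    """Only a released record becomes eligible for another delivery attempt."""
    return ack is AcknowledgeType.RELEASE
```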


  • auto.offset.reset : this is handled by a dynamic group configuration 
  •  and : share groups do not support auto-commit
  • : this concept is not supported by share groups
  • isolation.level : this is handled by a dynamic group configuration group.share.isolation.level 
  • partition.assignment.strategy : share groups do not support client-side partition assignors
  • interceptor.classes : interceptors are not supported
  • protocol.type : this configuration is used to select the group protocol used for KafkaConsumer

Kafka protocol changes

This KIP introduces the following new APIs:

  • ShareGroupHeartbeat - for consumers to form and maintain share groups
  • ShareGroupDescribe - for describing share groups
  • ShareFetch - for fetching records from share-partition leaders
  • ShareAcknowledge - for acknowledging delivery of records with share-partition leaders
  • AlterShareGroupOffsets - for altering the share-partition start offsets for the share-partitions in a share group
  • DeleteShareGroupOffsets - for deleting the offsets for the share-partitions in a share group
  • DescribeShareGroupOffsets - for describing the offsets for the share-partitions in a share group
  • InitializeShareGroupState - for initializing share-partition state on a share coordinator
  • ReadShareGroupState - for reading share-partition state from a share coordinator
  • WriteShareGroupState - for writing share-partition state to a share coordinator
  • DeleteShareGroupState - for deleting share-partition state from a share coordinator
  • ReadShareGroupOffsetsState - for reading the offsets from the share-partition state from a share coordinator

The KIP also introduces version 5 of FindCoordinatorRequest to add the new key type of FindCoordinatorRequest.CoordinatorType.SHARE with value 2. There is no change to the schema.

Error codes

This KIP adds the following error codes to the Kafka protocol.


Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ShareGroupDescribeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - NOT_COORDINATOR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - INVALID_GROUP_ID (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
      "about": "Each described group.",
      "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The describe error, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The top-level error message, or null if there was no error." },
        { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
          "about": "The group ID string." },
        { "name": "GroupState", "type": "string", "versions": "0+",
          "about": "The group state string, or the empty string." },
        { "name": "GroupEpoch", "type": "int32", "versions": "0+",
          "about": "The group epoch." },
        { "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
          "about": "The assignment epoch." },
        { "name": "AssignorName", "type": "string", "versions": "0+",
          "about": "The selected assignor." },
        { "name": "Members", "type": "[]Member", "versions": "0+",
          "about": "The members.",
          "fields": [
            { "name": "MemberId", "type": "string", "versions": "0+",
              "about": "The member ID." },
            { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "The member instance ID." },
            { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "The member rack ID." },
            { "name": "MemberEpoch", "type": "int32", "versions": "0+",
              "about": "The current member epoch." },
            { "name": "ClientId", "type": "string", "versions": "0+",
              "about": "The client ID." },
            { "name": "ClientHost", "type": "string", "versions": "0+",
              "about": "The client host." },
            { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "entityType": "topicName",
              "about": "The subscribed topic names." },
            { "name": "Assignment", "type": "Assignment", "versions": "0+",
              "about": "The current assignment." }
          ]},
        { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
          "about": "32-bit bitfield to represent authorized operations for this group." }
      ]}
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic ID." },
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
    ]},
    { "name": "Assignment", "versions": "0+", "fields": [
      { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
        "about": "The assigned topic-partitions to the member." },
      { "name": "Error", "type": "int8", "versions": "0+",
        "about": "The assigned error." },
      { "name": "MetadataVersion", "type": "int32", "versions": "0+",
        "about": "The assignor metadata version." },
      { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
        "about": "The assignor metadata bytes." }
    ]}
  ]
}


For the AcknowledgementBatches of each topic-partition, the BaseOffset values must be in ascending order and the ranges must be non-overlapping.

For each AcknowledgementBatch , the array of AcknowledgeType entries can consist of a single value which applies to all entries in the batch, or a separate value for each offset in the batch.
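The two rules above lend themselves to a small validation routine. The Python sketch below is an illustration only: the tuple layout and the names `find_batch_error` and `expand_types` are assumptions made here, and the KIP does not prescribe how a broker performs this check or which error it returns.

```python
from typing import List, Optional, Tuple

# (base_offset, last_offset, acknowledge_types), last_offset inclusive
Batch = Tuple[int, int, List[int]]


def find_batch_error(batches: List[Batch]) -> Optional[str]:
    """Return a description of the first rule violation, or None if valid."""
    previous_last: Optional[int] = None
    for base, last, types in batches:
        size = last - base + 1
        if last < base:
            return f"batch {base}-{last}: LastOffset is before BaseOffset"
        if previous_last is not None and base <= previous_last:
            return f"batch {base}-{last}: not ascending or overlaps the previous batch"
        if len(types) not in (1, size):
            return f"batch {base}-{last}: expected 1 or {size} acknowledge types"
        previous_last = last
    return None


def expand_types(base: int, last: int, types: List[int]) -> List[int]:
    """Expand a single acknowledge type so there is one entry per offset."""
    size = last - base + 1
    return list(types) * size if len(types) == 1 else list(types)
```

Because batches must be ascending and non-overlapping, comparing each BaseOffset with the previous LastOffset is enough to check both conditions in a single pass.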

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareFetchRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "BaseOffset", "type": "int64", "versions": "0+",
            "about": "Base offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
            "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."}
        ]}
      ]}
    ]},
    { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+", "ignorable": false,
      "about": "The partitions to remove from this share session.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partition indexes to forget." }
    ]}
  ]
}

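The ShareSessionEpoch field in the request schema above describes three cases: 0 opens a share session, -1 closes it, and other values increase on consecutive requests. The Python sketch below shows one way a receiver could classify an incoming epoch. It is an assumption-laden illustration: the function `classify_request`, the idea of comparing against the last accepted epoch, and the pairing of each outcome with the SHARE_SESSION_NOT_FOUND and INVALID_SHARE_SESSION_EPOCH error names are guesses for the purpose of the example, and epoch wrap-around is not modeled.

```python
from typing import Optional

OPEN_EPOCH = 0    # opens a share session
CLOSE_EPOCH = -1  # closes the share session


def classify_request(known_epoch: Optional[int], request_epoch: int) -> str:
    """Classify a request's ShareSessionEpoch.

    known_epoch is the epoch last accepted for this member's session, or
    None when no session exists (an assumed bookkeeping scheme).
    """
    if request_epoch == OPEN_EPOCH:
        return "OPEN"
    if known_epoch is None:
        return "SHARE_SESSION_NOT_FOUND"
    if request_epoch == CLOSE_EPOCH:
        return "CLOSE"
    if request_epoch == known_epoch + 1:
        return "CONTINUE"
    return "INVALID_SHARE_SESSION_EPOCH"
```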
Response schema

Code Block
  "apiKey": NN,
  "type": "response",
  "name": "ShareFetchResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - SHARE_SESSION_NOT_FOUND (version 0+)
  // - INVALID_SHARE_SESSION_EPOCH (version 0+) 
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - UNKNOWN_TOPIC_ID (version 0+)
  // - INVALID_RECORD_STATE (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - CORRUPT_MESSAGE (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top level response error code." },
    { "name": "Responses", "type": "[]ShareFetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "AcknowledgeErrorCode", "type": "int16", "versions": "0+",
          "about": "The acknowledge error code, or 0 if there was no acknowledge error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]},
        { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."},
        { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": "0+", "about": "The acquired records.", "fields":  [
          {"name": "BaseOffset", "type":  "int64", "versions": "0+", "about": "The earliest offset in this batch of acquired records."},
          {"name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this batch of acquired records."},
          {"name": "DeliveryCount", "type": "int16", "versions": "0+", "about": "The delivery count of this batch of acquired records."}

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ShareAcknowledgeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - SHARE_SESSION_NOT_FOUND (version 0+)
  // - INVALID_SHARE_SESSION_EPOCH (version 0+)
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - INVALID_RECORD_STATE (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top level response error code." },
    { "name": "Responses", "type": "[]ShareAcknowledgeTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+",
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}

AlterShareGroupOffsets API

The AlterShareGroupOffsets API is used to alter the share-partition start offsets for the share-partitions in a share group. The share-partition leader handles this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "AlterShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]AlterShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to alter offsets for.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]AlterShareGroupOffsetsRequestPartition", "versions": "0+",
        "about": "Each partition to alter offsets for.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset." }
      ]}
    ]}
  ]
}

Response schema

Code Block
  "apiKey": NN,
  "type": "response",
  "name": "AlterShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - NOT_COORDINATOR (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - GROUP_NOT_EMPTY (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }

DeleteShareGroupOffsets API

The DeleteShareGroupOffsets API is used to delete the offsets for the share-partitions in a share group. The share-partition leader handles this API.


ReadShareGroupState API

The ReadShareGroupState API is used by share-partition leaders to read share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]ReadStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." }
      ]}
    ]}
  ]
}

Response schema

Code Block
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - NOT_COORDINATOR (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]ReadStateResult", "versions": "0+",
      "about": "The read results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",

DescribeShareGroupOffsets API

The DescribeShareGroupOffsets API is used to describe the offsets for the share-partitions in a share group. The share-partition leader handles this API.


WriteShareGroupState API

The WriteShareGroupState API is used by share-partition leaders to write share-partition state to a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "WriteShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]WriteStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about":  "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch for this share-partition." },


The InitializeShareGroupState API is used by the group coordinator to initialize the share-partition state. This is an inter-broker RPC authorized as a cluster action.

DeleteShareGroupState API

ReadShareGroupOffsetsState API

This section describes the new record types.

Group metadata

In order to coexist properly with consumer groups, the group metadata records for share groups are persisted by the group coordinator to the compacted __consumer_offsets  topic.

For each share group, a single ConsumerGroupMetadata record is written. When the group is deleted, a tombstone record is written.

For each member, there is a ShareGroupMemberMetadata record which enables group membership to continue seamlessly across a group coordinator change.

There is also a ShareGroupPartitionMetadata record which contains a list of all share-partitions which have been assigned in the share group. It is used to keep track of which share-partitions have persistent state.
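
Because these records live in a compacted topic, a newly elected group coordinator can rebuild its view by replaying the partition and keeping only the latest value for each key, with a tombstone removing the key. The following is a minimal sketch of that replay under simplifying assumptions: records are modelled as (key, value) tuples, a tombstone is a value of None, and the function name replay is hypothetical rather than anything defined by Kafka.

```python
def replay(records):
    """Rebuild state from a compacted log: last write per key wins.

    records is an iterable of (key, value) pairs in log order.
    A value of None models a tombstone and removes the key.
    """
    state = {}
    for key, value in records:
        if value is None:
            state.pop(key, None)
        else:
            state[key] = value
    return state


if __name__ == "__main__":
    # Keys are (record type, group id[, member id]) tuples for illustration only.
    log = [
        (("ConsumerGroupMetadata", "g1"), {"Epoch": 1, "Type": 3}),
        (("ShareGroupMemberMetadata", "g1", "m1"), {"ClientId": "c1"}),
        (("ConsumerGroupMetadata", "g2"), {"Epoch": 1, "Type": 3}),
        (("ConsumerGroupMetadata", "g2"), None),  # group g2 deleted
    ]
    print(replay(log))
```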


Code Block
  "type": "data",
  "name": "ConsumerGroupMetadataKey",
  "validVersions": "3",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "3",
      "about": "The group id." }


A new version of the record value is introduced which contains the Type field. For a share group, the type will be "share". For a consumer group, the type is the default value of 0. The values are taken from the ordinal values of the enumeration org.apache.kafka.coordinator.Group.GroupType.

Code Block
  "type": "data",
  "name": "ConsumerGroupMetadataValue",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    // Version 1 adds Type field
    { "name": "Type", "type": "int8", "versions": "1+", "default": "0",
      "about": "The group type - 0:consumer, 1:classic, 3:share." }
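
To illustrate how the Type field might be interpreted when reading these records, here is a minimal sketch. It assumes the record value has already been decoded into a dictionary, and it takes the numeric values from the field description above (0 for consumer, 1 for classic, 3 for share). The names GROUP_TYPE_NAMES and group_type are hypothetical and are not part of Kafka.

```python
# Mapping taken from the "about" text of the Type field; treated here as an assumption.
GROUP_TYPE_NAMES = {0: "consumer", 1: "classic", 3: "share"}


def group_type(value_record):
    """Return the group type name for a decoded ConsumerGroupMetadataValue.

    Version 0 records have no Type field, so the schema default of 0 applies.
    """
    type_id = value_record.get("Type", 0)
    return GROUP_TYPE_NAMES.get(type_id, "unknown")


if __name__ == "__main__":
    print(group_type({"Epoch": 7}))             # record written at version 0
    print(group_type({"Epoch": 7, "Type": 3}))  # record written at version 1
```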


Code Block
  "type": "data",
  "name": "ShareGroupMemberMetadataKey",
  "validVersions": "10",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "10",
      "about": "The group id." },
    { "name": "MemberId", "type": "string", "versions": "10",
      "about": "The member id." }


Code Block
  "type": "data",
  "name": "ShareGroupMemberMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "RackId", "versions": "0+", "nullableVersions": "0+", "type": "string",
      "about": "The (optional) rack id." },
    { "name": "ClientId", "versions": "0+", "type": "string",
      "about": "The client id." },
    { "name": "ClientHost", "versions": "0+", "type": "string",
      "about": "The client host." },
    { "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string",
      "about": "The list of subscribed topic names." },
    { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The rebalance timeout" }


Code Block
  "type": "data",
  "name": "ShareGroupPartitionMetadataKey",
  "validVersions": "9",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "9",
      "about": "The group id." }


Code Block
  "type": "data",
  "name": "ShareGroupPartitionMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "versions": "0+", "type": "int32" },
    { "name": "InitializedTopics", "versions": "0+", "type": "[]TopicMetadata" },
    { "name": "InitializingTopics", "versions": "0+", "type": "[]TopicIndexMetadata" },
    { "name": "DeletingTopics", "versions": "0+", "type": "[]TopicMetadata" }
  "commonStructs": [
    { "name": "TopicMetadata", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "NumPartitions", "type": "int32", "versions": "0+",
        "about": "The number of partitions." }
    { "name": "TopicIndexMetadata", "versions": "0+", "fields": [
      { "name": "TopicId", "versions": "0+", "type": "uuid" },
      { "name": "StartPartitionIndex", "versions": "0+", "type": "int32" },
      { "name": "EndPartitionIndex", "versions": "0+", "type": "int32" }

The InitializingTopics field is used as the first stage of a two-stage process to initialize the persistent state for a set of share-partitions. When the share coordinator successfully responds to InitializeShareGroupState, the topic-partitions are moved into the InitializedTopics field.

In a similar way, the DeletingTopics  field is used as the first stage of a two-stage process to delete the persistent state for a set of share-partitions.
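
The two-stage handling described above can be sketched as a small in-memory model. This is an illustration under stated assumptions, not the group coordinator's implementation: the class PartitionMetadata and its method names are hypothetical, EndPartitionIndex is assumed to be inclusive, and the calls to the share coordinator are left out.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionMetadata:
    """Hypothetical mirror of the three topic lists in ShareGroupPartitionMetadataValue."""
    initializing: dict = field(default_factory=dict)  # topic id -> (start, end) partition indexes
    initialized: dict = field(default_factory=dict)   # topic id -> number of partitions
    deleting: dict = field(default_factory=dict)      # topic id -> number of partitions

    def begin_initialize(self, topic_id, start, end):
        # Stage 1: record the intent before asking the share coordinator to initialize.
        self.initializing[topic_id] = (start, end)

    def complete_initialize(self, topic_id):
        # Stage 2: the share coordinator responded successfully to InitializeShareGroupState.
        _, end = self.initializing.pop(topic_id)
        # Assumes the end index is inclusive, so the partition count is end + 1.
        self.initialized[topic_id] = max(self.initialized.get(topic_id, 0), end + 1)

    def begin_delete(self, topic_id):
        # Stage 1 of deletion: move the topic out of the initialized set.
        self.deleting[topic_id] = self.initialized.pop(topic_id)

    def complete_delete(self, topic_id):
        # Stage 2 of deletion: the persistent state has been removed.
        del self.deleting[topic_id]


if __name__ == "__main__":
    metadata = PartitionMetadata()
    metadata.begin_initialize("topic-a", 0, 3)
    metadata.complete_initialize("topic-a")
    print(metadata)
```

Recording the first stage before contacting the share coordinator means that a coordinator which fails part-way through can see from the persisted record which topics still need their initialization or deletion completed.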
