Comment: Arrays in state RPCs, persister clarifications, review comments

...

For a share group, the group coordinator persists three kinds of records:

  • ConsumerGroupMetadata - this is written to reserve the group's ID as a share group in the namespace of groups.
  • ShareGroupMemberMetadata - this is written to allow group membership to continue seamlessly across a group coordinator change.
  • ShareGroupPartitionMetadata - this is written whenever the set of topic-partitions being consumed in the share group changes. Its purpose is to keep track of which topic-partitions will have share-group persistent state.

When the group coordinator fails over, the newly elected coordinator loads the state from the __consumer_offsets partition. This means a share group will remain in existence across the fail-over, along with the list of members. However, the assignments are not persisted and will be recalculated by the new group coordinator.

The SimpleShareAssignor

This KIP introduces org.apache.kafka.coordinator.group.assignor.SimpleShareAssignor . It attempts to balance the number of consumers assigned to the subscribed partitions, and assumes that all consumers are equally capable of consuming records.
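
The balancing goal described above can be illustrated with a minimal sketch. This is not the SimpleShareAssignor algorithm, which is not specified in this section. The function name and the round-robin strategy are assumptions chosen only to show what "balancing the number of consumers per partition" can look like when all consumers are treated as equally capable.

```python
from collections import defaultdict


def balanced_share_assignment(members, partitions):
    """Hypothetical helper: spread members over partitions round-robin.

    Unlike a classic consumer group, several members may share a partition,
    so the loop runs over whichever list is longer.
    """
    assignment = defaultdict(list)
    if not members or not partitions:
        return dict(assignment)
    for i in range(max(len(members), len(partitions))):
        member = members[i % len(members)]
        partition = partitions[i % len(partitions)]
        assignment[member].append(partition)
    return dict(assignment)


# Three members sharing two partitions: partition 0 gets two consumers,
# partition 1 gets one.
example = balanced_share_assignment(["m0", "m1", "m2"], [0, 1])
```

With more members than partitions, the per-partition consumer counts differ by at most one. With fewer members than partitions, every partition still gets exactly one consumer.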

...

When a batch of records is first read from the log and added to the in-flight records for a share-partition, the broker does not know whether the set of records between the batch’s base offset and the last offset contains any gaps, as might occur for example as a result of log compaction. When the broker does not know which offsets correspond to records, the batch is considered an unmaterialized record batch. Rather than forcing the broker to iterate through all of the records in all cases, which might require decompressing every batch, the broker sends unmaterialized record batches to consumers. It initially assumes that all offsets between the base offset and the last offset correspond to records. When the consumer processes the batch, it may find gaps and it reports these using the ShareFetch or ShareAcknowledge API. This means that the presence of unmaterialized record batches containing gaps might temporarily inflate the number of in-flight records, but this will be resolved by the consumer acknowledgements.
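
As a rough illustration of the accounting described above, the following sketch treats every offset in an unmaterialized batch as in-flight until the consumer reports gaps. The function name and the use of a plain set are assumptions for illustration, not the broker's data structure.

```python
def in_flight_offsets(base_offset, last_offset, reported_gaps=()):
    """Hypothetical helper: offsets counted as in-flight for one batch.

    Before any gaps are reported, every offset from base_offset to
    last_offset (inclusive) is assumed to correspond to a record.
    """
    assumed = set(range(base_offset, last_offset + 1))
    return assumed - set(reported_gaps)


# Batch 100-109 as first read from the log: 10 offsets assumed in-flight.
before = in_flight_offsets(100, 109)
# The consumer finds that 103 and 104 were compacted away and reports them.
after = in_flight_offsets(100, 109, reported_gaps=[103, 104])
```

The difference between `before` and `after` is the temporary inflation that the consumer's acknowledgement resolves.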

The share-partition leader clearly has to look within the data returned from the replica manager in order to understand the record batches it fetches. This means that records retrieved using a share group are not able to benefit from the zero-copy optimisation.

By iterating over the record batches but not iterating over the individual records within, the share-partition leader is able to understand the log without having to decompress the records. There is one exception to this and that is to do with reading the transaction end markers as described in the next section.

...

The component which manages durable share-partition state is called the share-group state persister. Eventually, this could be a pluggable component which implements a new interface org.apache.kafka.server.group.share.Persister . For now, it is not pluggable, and Kafka includes a built-in default implementation org.apache.kafka.server.group.share.DefaultStatePersister . This section describes the behavior of this built-in persister.

When records are acknowledged using the ShareFetch  or ShareAcknowledge  RPCs, durable state will need to be written by the persister. The share-partition leader serving these requests will wait for the persister to complete writing the durable state before responding.

The share coordinator and the share-group state topic

...

The share-partition leader and group coordinator use inter-broker RPCs to ask the share coordinator to read, write and delete share-partition state on the share-group state topic. The new RPCs are called InitializeShareGroupState, ReadShareGroupState, WriteShareGroupState, and DeleteShareGroupState. The share-partition leader uses the FindCoordinator RPC to find its share coordinator, with a new key_type (FindCoordinatorRequest.CoordinatorType.SHARE with value 2) and the key of "group:topicId:partition".

...

When a share-partition leader receives its first ShareFetch request for a topic-partition, it needs to initialize its share-partition state. It finds the share coordinator using the FindCoordinator RPC with (key: "groupId:topicId:partition", key_type: SHARE). Then, it sends the ReadShareGroupState RPC to the share coordinator. If the share coordinator has no share-partition state to return, it returns the UNKNOWN_TOPIC_OR_PARTITION error code, indicating that this share-partition is not actually part of the share group. Otherwise, it returns the state to the share-partition leader which uses it to initialize and begin fetching records for the consumers. The SPSO returned might be -1, indicating that the initial SPSO needs to be set based on the group.share.auto.offset.reset configuration.
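
The initialization steps above can be sketched as follows. The function names are hypothetical, errors are represented by their names rather than numeric codes, and the mapping of "earliest" and "latest" to the log start and end offsets is an assumption about how group.share.auto.offset.reset is interpreted.

```python
SHARE_KEY_TYPE = 2  # FindCoordinatorRequest.CoordinatorType.SHARE, per the text


def share_coordinator_key(group_id, topic_id, partition):
    """Build the FindCoordinator key in the "groupId:topicId:partition" form."""
    return f"{group_id}:{topic_id}:{partition}"


def initial_spso(read_error, returned_spso, reset_policy,
                 log_start_offset, log_end_offset):
    """Hypothetical helper: decide the starting SPSO from a ReadShareGroupState result.

    Returns None when the share-partition is not part of the share group.
    """
    if read_error == "UNKNOWN_TOPIC_OR_PARTITION":
        return None
    if returned_spso == -1:
        # Assumption: "earliest" means the log start offset, anything else
        # is treated as "latest" and means the log end offset.
        if reset_policy == "earliest":
            return log_start_offset
        return log_end_offset
    return returned_spso


key = share_coordinator_key("G1", "topic_uuid", 0)
```

A leader would look up the coordinator with `key` and `SHARE_KEY_TYPE`, then pass the read result through something like `initial_spso` before fetching.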

When a share-partition leader needs to update the durable share-partition state because of an acknowledgement or other state change (such as a lock timeout), it sends the WriteShareGroupState RPC to the share coordinator. The share coordinator keeps track of the accumulated state of the share-partition and chooses how to record it to the share-state topic. Once it has successfully written to the topic and replication has completed, the RPC response is sent.

...

Operation

State changes

Cumulative state

WriteShareGroupState request

Starting state of topic-partition with latest offset 100

SPSO=100, SPEO=100

SPSO=100, SPEO=100


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 100,
  "StateBatches": []
}


In the batched case with successful processing, there’s a state change per batch to move the SPSO forwards


Fetch records 100-109

SPEO=110, records 100-109 (acquired, delivery count 1)

SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1)


Acknowledge 100-109

SPSO=110

SPSO=110, SPEO=110


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 110,
  "StateBatches": []
}


With a messier sequence of release and acknowledge, there’s a state change for each operation which can act on multiple records


Fetch records 110-119

Consumer 1 gets 110-112, consumer 2 gets 113-118, consumer 3 gets 119

SPEO=120, records 110-119 (acquired, delivery count 1)

SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1)


Release 110 (consumer 1)

record 110 (available, delivery count 1)

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}

Note that the SPEO in the control records is 111 at this point. All records after this are in their first delivery attempt so this is an acceptable situation.


Acknowledge 119 (consumer 3)

record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 111,
      "LastOffset": 118,
      "State": 0 (Available),
      "DeliveryCount": 0
    },
    {
      "BaseOffset": 119,
      "LastOffset": 119,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}


Fetch records 110, 120 (consumer 1)

SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)


Lock timeout elapsed 111, 112 (consumer 1's records)

records 111-112 (available, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 111,
      "LastOffset": 112,
      "State": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}


Acknowledge 113-118 (consumer 2)

records 113-118 acknowledged

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 113,
      "LastOffset": 118,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}


Fetch records 111, 112 (consumer 3)

records 111-112 (acquired, delivery count 2)

SPSO=110, SPEO=121, records 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Acknowledge 110 (consumer 1)

SPSO=111

SPSO=111, SPEO=121, records 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 2 (Acknowledged),
      "DeliveryCount": 2
    }
  ]
}


Acknowledge 111, 112 (consumer 3)

SPSO=120

SPSO=120, SPEO=121, record 120 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 120,
  "StateBatches": []
}
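
One way to picture how the share coordinator could accumulate the WriteShareGroupState requests in this walk-through is the sketch below. It assumes that a StartOffset of -1 means "unchanged" (as the example requests suggest) and that state below a new start offset can be discarded. The function and variable names are hypothetical, and the per-offset dictionary is chosen for clarity rather than efficiency.

```python
AVAILABLE, ACKNOWLEDGED = 0, 2  # state codes as used in the example requests


def apply_write(state, request):
    """Fold one WriteShareGroupState request into the accumulated state."""
    for b in request["StateBatches"]:
        for offset in range(b["BaseOffset"], b["LastOffset"] + 1):
            state["records"][offset] = (b["State"], b["DeliveryCount"])
    if request["StartOffset"] != -1:  # assumption: -1 leaves the SPSO unchanged
        state["start_offset"] = request["StartOffset"]
        # Assumption: anything before the new SPSO no longer needs tracking.
        state["records"] = {o: r for o, r in state["records"].items()
                            if o >= request["StartOffset"]}
    return state


def batch(base, last, record_state, delivery_count):
    return {"BaseOffset": base, "LastOffset": last,
            "State": record_state, "DeliveryCount": delivery_count}


# The requests from the walk-through, in order.
requests = [
    {"StartOffset": 100, "StateBatches": []},
    {"StartOffset": 110, "StateBatches": []},
    {"StartOffset": -1, "StateBatches": [batch(110, 110, AVAILABLE, 1)]},
    {"StartOffset": -1, "StateBatches": [batch(111, 118, AVAILABLE, 0),
                                         batch(119, 119, ACKNOWLEDGED, 1)]},
    {"StartOffset": -1, "StateBatches": [batch(111, 112, AVAILABLE, 1)]},
    {"StartOffset": -1, "StateBatches": [batch(113, 118, ACKNOWLEDGED, 1)]},
    {"StartOffset": -1, "StateBatches": [batch(110, 110, ACKNOWLEDGED, 2)]},
    {"StartOffset": 120, "StateBatches": []},
]

state = {"start_offset": -1, "records": {}}
snapshots = []
for req in requests:
    apply_write(state, req)
    snapshots.append((state["start_offset"], dict(state["records"])))
```

After the final request the accumulated state holds only the start offset of 120, matching the last row of the table: record 120 is merely acquired, and acquired state is not written.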


...

Operation | Involves | Notes
Create share group | Group coordinator | This occurs as a side-effect of the initial ShareGroupHeartbeat request. a) The group coordinator serves the ShareGroupHeartbeat RPC. It writes a ConsumerGroupMetadata record for the share group, a ShareGroupMemberMetadata record for the member, and a ShareGroupPartitionMetadata record for the share-partitions which are just about to be initialized, onto the __consumer_offsets topic. The group has now been created but the share-partitions are still being initialized. The group coordinator responds to the ShareGroupHeartbeat RPC, but the list of assignments is empty. b) For each share-partition being initialized, the group coordinator sends an InitializeShareGroupState request to the share coordinator. The SPSO is not known yet and is initialized to -1. The group epoch is used as the state epoch. c) The share coordinator serves the InitializeShareGroupState RPC. It writes a ShareCheckpoint record to the __share_group_state topic. When the record is replicated, the share coordinator responds to the RPC. d) Back in the group coordinator, it writes an updated ShareGroupPartitionMetadata record on the __consumer_offsets topic for the share-partitions which are now initialized. The share-partition is now able to be included in an assignment in the share group.
Assign a share-partition | Group coordinator and optionally share coordinator | When a topic-partition is assigned to a member of a share group for the first time, the group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic and sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareCheckpoint record to the __share_group_state topic and responds to the group coordinator. The group coordinator writes an updated ShareGroupPartitionMetadata record, and the share-partition is now able to be included in an assignment in the share group.
List share groups | Group coordinator |
Describe share group | Group coordinator |
List share group offsets | Group coordinator and share coordinator | The admin client gets a list of share-partitions from the group coordinator. It then asks the group coordinator to request the SPSO of the share-partitions from the share coordinator using a ReadShareGroupOffsetsState request. Although the share-partition leader also knows this information, the share coordinator provides it here because when a share-partition is not used for a while, the share-partition leader frees up the memory, reloading it from the share coordinator when it is next required.
Alter share group offsets | Group coordinator and share coordinator | Only empty share groups support this operation. The group coordinator sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareCheckpoint record with the new state epoch to the __share_group_state topic.
Delete share group offsets | Group coordinator and share coordinator | This is administrative removal of a topic from a share group. Only empty share groups support this operation. The group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic to record the pending deletion of the offsets. It then sends a DeleteShareGroupState request to the share coordinator which writes tombstones to logically delete the state. Then the group coordinator writes a second ShareGroupPartitionMetadata record to the __consumer_offsets topic to complete the deletion of the offsets.
Delete share group | Group coordinator and share coordinator | Only empty share groups can be deleted. The group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic to record the pending deletion of all share-group state. It then sends a DeleteShareGroupState request to each of the share coordinators for this share group's partitions, which write tombstones to logically delete the state from the __share_group_state topic. Then the group coordinator writes a tombstone ShareGroupPartitionMetadata and finally a tombstone ConsumerGroupMetadata record to the __consumer_offsets topic.

...

Enum constant | Description
ACCEPT (1) | The record was consumed successfully.
RELEASE (2) | The record was not consumed successfully. Release it for another delivery attempt.
REJECT (3) | The record was not consumed successfully. Reject it and do not release it for another delivery attempt.
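
For readers who think in code, the acknowledgement types can be modelled as a small enumeration. This is an illustrative Python sketch rather than the client API. The numeric values follow the AcknowledgeTypes field description in the request schemas (0:Gap, 1:Accept, 2:Release, 3:Reject), and the helper function is hypothetical.

```python
from enum import IntEnum


class AcknowledgeType(IntEnum):
    GAP = 0      # wire-level marker: the offset does not correspond to a record
    ACCEPT = 1   # consumed successfully
    RELEASE = 2  # not consumed successfully; make available for redelivery
    REJECT = 3   # not consumed successfully; do not redeliver


def eligible_for_redelivery(ack_type):
    """Hypothetical helper: only RELEASE puts a record back up for delivery."""
    return ack_type == AcknowledgeType.RELEASE
```

For example, `eligible_for_redelivery(AcknowledgeType(2))` is true, while ACCEPT and REJECT both end the record's delivery attempts.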

...

  • auto.offset.reset : this is handled by a dynamic group configuration group.share.auto.offset.reset 
  • enable.auto.commit  and auto.commit.interval.ms : share groups do not support auto-commit
  • group.instance.id : this concept is not supported by share groups
  • isolation.level : this is handled by a dynamic group configuration group.share.isolation.level 
  • partition.assignment.strategy : share groups do not support client-side partition assignors
  • interceptor.classes : interceptors are not supported
  • protocol.type : this configuration is used to select the group protocol used for KafkaConsumer

Kafka protocol changes

This KIP introduces the following new APIs:

  • ShareGroupHeartbeat - for consumers to form and maintain share groups
  • ShareGroupDescribe - for describing share groups
  • ShareFetch - for fetching records from share-partition leaders
  • ShareAcknowledge - for acknowledging delivery of records with share-partition leaders
  • AlterShareGroupOffsets - for altering the share-partition start offsets for the share-partitions in a share group
  • DeleteShareGroupOffsets - for deleting the offsets for the share-partitions in a share group
  • DescribeShareGroupOffsets - for describing the offsets for the share-partitions in a share group
  • InitializeShareGroupState - for initializing share-partition state on a share coordinator
  • ReadShareGroupState - for reading share-partition state from a share coordinator
  • WriteShareGroupState - for writing share-partition state to a share coordinator
  • DeleteShareGroupState - for deleting share-partition state from a share coordinator
  • ReadShareGroupOffsetsState - for reading the offsets from the share-partition state from a share coordinator

The KIP also introduces version 5 of FindCoordinatorRequest to add the new key type of FindCoordinatorRequest.CoordinatorType.SHARE with value 2. There is no change to the schema.

Error codes

This KIP adds the following error codes to the Kafka protocol.

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ShareGroupDescribeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - INVALID_GROUP_ID (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
      "about": "Each described group.",
      "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The describe error, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The top-level error message, or null if there was no error." },
        { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
          "about": "The group ID string." },
        { "name": "GroupState", "type": "string", "versions": "0+",
          "about": "The group state string, or the empty string." },
        { "name": "GroupEpoch", "type": "int32", "versions": "0+",
          "about": "The group epoch." },
        { "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
          "about": "The assignment epoch." },
        { "name": "AssignorName", "type": "string", "versions": "0+",
          "about": "The selected assignor." },
        { "name": "Members", "type": "[]Member", "versions": "0+",
          "about": "The members.",
          "fields": [
            { "name": "MemberId", "type": "string", "versions": "0+",
              "about": "The member ID." },
            { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "The member instance ID." },
            { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "The member rack ID." },
            { "name": "MemberEpoch", "type": "int32", "versions": "0+",
              "about": "The current member epoch." },
            { "name": "ClientId", "type": "string", "versions": "0+",
              "about": "The client ID." },
            { "name": "ClientHost", "type": "string", "versions": "0+",
              "about": "The client host." },
            { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "entityType": "topicName",
              "about": "The subscribed topic names." },
            { "name": "Assignment", "type": "Assignment", "versions": "0+",
              "about": "The current assignment." }
          ]},
        { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
          "about": "32-bit bitfield to represent authorized operations for this group." }
      ]
    }
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic ID." },
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
    ]},
    { "name": "Assignment", "versions": "0+", "fields": [
      { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
        "about": "The assigned topic-partitions to the member." },
      { "name": "Error", "type": "int8", "versions": "0+",
        "about": "The assigned error." },
      { "name": "MetadataVersion", "type": "int32", "versions": "0+",
        "about": "The assignor metadata version." },
      { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
        "about": "The assignor metadata bytes." }
    ]}
  ]
}

...

For the AcknowledgementBatches of each topic-partition, the BaseOffsets must be in ascending order and the ranges must be non-overlapping.

For each AcknowledgementBatch , the array of AcknowledgeType entries can consist of a single value which applies to all entries in the batch, or a separate value for each offset in the batch.

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareFetchRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "BaseOffset", "type": "int64", "versions": "0+",
            "about": "Base offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
            "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."}
        ]}
      ]}
    ]},
    { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+", "ignorable": false,
      "about": "The partitions to remove from this share session.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions indexes to forget." }
    ]}
  ]
}
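
The constraints on AcknowledgementBatches stated before the schema (ascending BaseOffsets, non-overlapping ranges, and an AcknowledgeTypes array holding either one value or one value per offset) could be checked along the following lines. This is a sketch with hypothetical function names, operating on plain dictionaries shaped like the schema fields. It is not broker code.

```python
def validate_acknowledgement_batches(batches):
    """Raise ValueError if the batches for one topic-partition break the rules."""
    previous_last = None
    for b in batches:
        base, last = b["BaseOffset"], b["LastOffset"]
        types = b["AcknowledgeTypes"]
        if last < base:
            raise ValueError(f"LastOffset {last} is before BaseOffset {base}")
        if previous_last is not None and base <= previous_last:
            raise ValueError(f"batch at {base} is out of order or overlaps")
        if len(types) not in (1, last - base + 1):
            raise ValueError("AcknowledgeTypes needs 1 entry or 1 per offset")
        previous_last = last


def expand_acknowledge_types(b):
    """Map each offset in the batch to its acknowledge type."""
    offsets = range(b["BaseOffset"], b["LastOffset"] + 1)
    types = b["AcknowledgeTypes"]
    if len(types) == 1:
        return {offset: types[0] for offset in offsets}
    return dict(zip(offsets, types))


valid = [
    {"BaseOffset": 100, "LastOffset": 102, "AcknowledgeTypes": [1]},
    {"BaseOffset": 103, "LastOffset": 104, "AcknowledgeTypes": [1, 2]},
]
validate_acknowledgement_batches(valid)
per_offset = expand_acknowledge_types(valid[1])
```

Here the first batch accepts offsets 100-102 with a single value, while the second carries one value per offset: accept 103 and release 104.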

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ShareFetchResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - SHARE_SESSION_NOT_FOUND (version 0+)
  // - INVALID_SHARE_SESSION_EPOCH (version 0+) 
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - UNKNOWN_TOPIC_ID (version 0+)
  // - INVALID_RECORD_STATE (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - CORRUPT_MESSAGE (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top level response error code." },
    { "name": "Responses", "type": "[]ShareFetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "AcknowledgeErrorCode", "type": "int16", "versions": "0+",
          "about": "The acknowledge error code, or 0 if there was no acknowledge error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]},
        { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."},
        { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": "0+", "about": "The acquired records.", "fields":  [
          {"name": "BaseOffset", "type":  "int64", "versions": "0+", "about": "The earliest offset in this batch of acquired records."},
          {"name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this batch of acquired records."},
          {"name": "DeliveryCount", "type": "int16", "versions": "0+", "about": "The delivery count of this batch of acquired records."}
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+",
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}

ShareAcknowledge API

The ShareAcknowledge API is used by share group consumers to acknowledge delivery of records with share-partition leaders.

Request schema

For the AcknowledgementBatches of each topic-partition, the BaseOffsets must be in ascending order and the ranges must be non-overlapping.

For each AcknowledgementBatch , the array of AcknowledgeType entries can consist of a single value which applies to all entries in the batch, or a separate value for each offset in the batch.

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareAcknowledgeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
      "about": "The topics containing records to acknowledge.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
        "about": "The partitions containing records to acknowledge.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "BaseOffset", "type": "int64", "versions": "0+",
            "about": "Base offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
            "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."}
        ]}
      ]}
    ]}
  ]
}
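As a rough illustration of how a client might fill in AcknowledgementBatches, the sketch below collapses per-offset acknowledgements into batches that each carry a single AcknowledgeTypes value. The function name `to_batches` and the dictionary-based input are hypothetical; a real client is free to batch differently, for example by sending one value per offset.

```python
from typing import Dict, List


def to_batches(acks: Dict[int, int]) -> List[dict]:
    """Group offset -> acknowledge type into ascending, non-overlapping batches.

    Consecutive offsets with the same type share one batch, so each batch
    needs only a single AcknowledgeTypes entry.
    """
    batches: List[dict] = []
    for offset in sorted(acks):
        ack_type = acks[offset]
        last = batches[-1] if batches else None
        if (last is not None
                and offset == last["LastOffset"] + 1
                and last["AcknowledgeTypes"] == [ack_type]):
            last["LastOffset"] = offset  # extend the current run
        else:
            batches.append({"BaseOffset": offset,
                            "LastOffset": offset,
                            "AcknowledgeTypes": [ack_type]})
    return batches
```

Because the offsets are processed in sorted order, the resulting batches satisfy the ordering and non-overlap requirements of the request.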

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ShareAcknowledgeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - SHARE_SESSION_NOT_FOUND (version 0+)
  // - INVALID_SHARE_SESSION_EPOCH (version 0+)
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - UNKNOWN_TOPIC_ID (version 0+)
  // - INVALID_RECORD_STATE (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top level response error code." },
    { "name": "Responses", "type": "[]ShareAcknowledgeTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+",
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}
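The CurrentLeader and NodeEndpoints fields are meant to be read together when a partition reports NOT_LEADER_OR_FOLLOWER. A minimal sketch of that lookup follows, assuming the response has already been decoded into plain dictionaries keyed by the schema's field names. The function name `new_leader_endpoints` is hypothetical; the numeric error code 6 is the standard Kafka protocol value for NOT_LEADER_OR_FOLLOWER.

```python
from typing import Dict, Tuple

NOT_LEADER_OR_FOLLOWER = 6  # Kafka protocol error code


def new_leader_endpoints(response: dict) -> Dict[Tuple[str, int], Tuple[str, int]]:
    """Map (TopicId, PartitionIndex) to the (Host, Port) of the reported leader."""
    endpoints = {node["NodeId"]: (node["Host"], node["Port"])
                 for node in response.get("NodeEndpoints", [])}
    result = {}
    for topic in response.get("Responses", []):
        for partition in topic["Partitions"]:
            if partition["ErrorCode"] != NOT_LEADER_OR_FOLLOWER:
                continue
            leader_id = partition["CurrentLeader"]["LeaderId"]
            # A LeaderId of -1 means the leader is unknown, so it has no endpoint.
            if leader_id in endpoints:
                key = (topic["TopicId"], partition["PartitionIndex"])
                result[key] = endpoints[leader_id]
    return result
```

A client could use the returned endpoints to retry the acknowledgement against the new leader without waiting for a metadata refresh, although whether it does so is an implementation choice.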

AlterShareGroupOffsets API

The AlterShareGroupOffsets API is used to alter the share-partition start offsets for the share-partitions in a share group. The share-partition leader handles this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "AlterShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]AlterShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to alter offsets for.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]AlterShareGroupOffsetsRequestPartition", "versions": "0+",
        "about": "Each partition to alter offsets for.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset." }
      ]}
    ]}
  ]
}
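Because TopicName is a map key in this request, each topic should appear once with all of its partitions nested beneath it. The sketch below shows one way to build that shape from a flat mapping. The function name `build_alter_request` and the plain-dictionary representation are hypothetical and are not part of the KIP.

```python
from collections import defaultdict
from typing import Dict, Tuple


def build_alter_request(group_id: str,
                        offsets: Dict[Tuple[str, int], int]) -> dict:
    """Nest (topic name, partition) -> start offset into the request layout."""
    by_topic = defaultdict(list)
    # Sorting keeps topics and partitions in a deterministic order.
    for (topic, partition), start_offset in sorted(offsets.items()):
        by_topic[topic].append({"PartitionIndex": partition,
                                "StartOffset": start_offset})
    return {
        "GroupId": group_id,
        "Topics": [{"TopicName": topic, "Partitions": partitions}
                   for topic, partitions in by_topic.items()],
    }
```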

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "AlterShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - GROUP_NOT_EMPTY (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]AlterShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]AlterShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

DeleteShareGroupOffsets API

The DeleteShareGroupOffsets API is used to delete the offsets for the share-partitions in a share group. The share-partition leader handles this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DeleteShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]DeleteShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to delete offsets for.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DeleteShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - GROUP_NOT_EMPTY (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DeleteShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]DeleteShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

DescribeShareGroupOffsets API

The DescribeShareGroupOffsets API is used to describe the offsets for the share-partitions in a share group. The share-partition leader handles this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DescribeShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]DescribeShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to describe offsets for.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DescribeShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DescribeShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]DescribeShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset."},
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

InitializeShareGroupState API

The InitializeShareGroupState API is used by the group coordinator to initialize the share-partition state. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "InitializeShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]InitializeStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch for this share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, or -1 if the start offset is not being initialized." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "InitializeShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]InitializeStateResult", "versions": "0+",
      "about": "The initialization results.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about": "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." }
      ]}
    ]}
  ]
}

ReadShareGroupState API

The ReadShareGroupState API is used by share-partition leaders to read share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]ReadStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]ReadStateResult", "versions": "0+",
      "about": "The read results.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about": "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch for this share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, which can be -1 if it is not yet initialized." },
        { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields": [
          { "name": "BaseOffset", "type": "int64", "versions": "0+",
            "about": "The base offset of this state batch." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset of this state batch." },
          { "name": "State", "type": "int8", "versions": "0+",
            "about": "The state - 0:Available,2:Acked,4:Archived." },
          { "name": "DeliveryCount", "type": "int16", "versions": "0+",
            "about": "The delivery count." }
        ]}
      ]}
    ]}
  ]
}
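To make the StateBatches layout concrete, the following sketch looks up the persisted state for a single offset in a decoded partition result. It is illustrative only: the function name `state_at` is hypothetical, LastOffset is assumed to be inclusive, and the sketch does not say what an offset outside every batch means.

```python
from typing import List, Optional

# State codes as listed in the schema's "about" text.
STATE_NAMES = {0: "Available", 2: "Acked", 4: "Archived"}


def state_at(offset: int, state_batches: List[dict]) -> Optional[str]:
    """Return the state name covering `offset`, or None if no batch covers it."""
    for batch in state_batches:
        # Assumption: both ends of the batch range are inclusive.
        if batch["BaseOffset"] <= offset <= batch["LastOffset"]:
            return STATE_NAMES.get(batch["State"], f"Unknown({batch['State']})")
    return None
```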

WriteShareGroupState API

The WriteShareGroupState API is used by share-partition leaders to write share-partition state to a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "WriteShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]WriteStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch for this share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, or -1 if the start offset is not being written." },
        { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields": [
          { "name": "BaseOffset", "type": "int64", "versions": "0+",
            "about": "The base offset of this state batch." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset of this state batch." },
          { "name": "State", "type": "int8", "versions": "0+",
            "about": "The state - 0:Available,2:Acked,4:Archived." },
          { "name": "DeliveryCount", "type": "int16", "versions": "0+",
            "about": "The delivery count." }
        ]}
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "WriteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]WriteStateResult", "versions": "0+",
      "about": "The write results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." }
      ]}
    ]}
  ]
}
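
Because the results are reported per topic and per partition, a caller has to inspect each partition's ErrorCode rather than a single top-level code. The sketch below assumes the response has already been decoded into nested Python dictionaries that use the schema's field names. The helper name failed_partitions and the dictionary representation are illustrative assumptions, not part of the KIP.

```python
from typing import List, Tuple


def failed_partitions(response: dict) -> List[Tuple[str, int, int]]:
    """Return (topic id, partition index, error code) for every failed write.

    `response` is a hypothetical decoded form of WriteShareGroupStateResponse:
    {"Results": [{"TopicId": ..., "Partitions": [{"Partition": ..., "ErrorCode": ...}]}]}
    """
    failures = []
    for topic in response.get("Results", []):
        for part in topic.get("Partitions", []):
            # An error code of 0 means there was no error for this partition.
            if part["ErrorCode"] != 0:
                failures.append((topic["TopicId"], part["Partition"], part["ErrorCode"]))
    return failures
```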

DeleteShareGroupState API

The DeleteShareGroupState API is used by the group coordinator to delete share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DeleteShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]DeleteStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DeleteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]DeleteStateResult", "versions": "0+",
      "about": "The delete results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." }
      ]}
    ]}
  ]
}

ReadShareGroupOffsetsState API

The ReadShareGroupOffsetsState API is used by the group coordinator to read the offset information from share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupOffsetsStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]ReadOffsetsStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupOffsetsStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]ReadOffsetsStateResult", "versions": "0+",
      "about": "The read results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch for this share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset." }
      ]}
    ]}
  ]
}

Records

This section describes the new record types.

Group metadata

In order to coexist properly with consumer groups, the group metadata records for share groups are persisted by the group coordinator to the compacted __consumer_offsets  topic.

For each share group, a single ConsumerGroupMetadata record is written. When the group is deleted, a tombstone record is written.

For each member, there is a ShareGroupMemberMetadata record which enables group membership to continue seamlessly across a group coordinator change.

There is also a ShareGroupPartitionMetadata record which contains a list of all share-partitions which have been assigned in the share group. It is used to keep track of which share-partitions have persistent state.

ConsumerGroupMetadataKey

This is included for completeness. There is no change to this record.

Code Block
{
  "type": "data",
  "name": "ConsumerGroupMetadataKey",
  "validVersions": "3",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "3",
      "about": "The group id." }
  ]
}

ConsumerGroupMetadataValue

A new version of the record value is introduced which contains the Type field. For a share group, the type is 3 ("share"). For a consumer group, the type is the default value of 0. The values are the ordinal values of the enumeration org.apache.kafka.coordinator.Group.GroupType.

Code Block
{
  "type": "data",
  "name": "ConsumerGroupMetadataValue",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    // Version 1 adds Type field
    { "name": "Type", "type": "int8", "versions": "1+", "default": "0",
      "about": "The group type - 0:consumer, 1:classic, 3:share." }
  ]
}
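
To make the defaulting behaviour concrete, here is a small Python sketch of how a reader of this record might resolve the group type. It assumes the record value has been decoded into a dictionary keyed by the schema's field names. The function name group_type and that representation are hypothetical. The numeric codes are the ones listed in the schema.

```python
# Type codes as listed in the ConsumerGroupMetadataValue schema.
GROUP_TYPES = {0: "consumer", 1: "classic", 3: "share"}


def group_type(value: dict) -> str:
    """Resolve the group type of a decoded ConsumerGroupMetadataValue.

    A version 0 record has no Type field, so the default of 0 applies
    and the group is treated as a consumer group.
    """
    code = value.get("Type", 0)
    if code not in GROUP_TYPES:
        raise ValueError(f"unknown group type code: {code}")
    return GROUP_TYPES[code]
```

With this default, records written before the Type field existed continue to be read as consumer groups without any migration.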

ShareGroupMemberMetadataKey

Code Block
{
  "type": "data",
  "name": "ShareGroupMemberMetadataKey",
  "validVersions": "10",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "10",
      "about": "The group id." },
    { "name": "MemberId", "type": "string", "versions": "10",
      "about": "The member id." }
  ]
}

ShareGroupMemberMetadataValue

Code Block
{
  "type": "data",
  "name": "ShareGroupMemberMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "RackId", "versions": "0+", "nullableVersions": "0+", "type": "string",
      "about": "The (optional) rack id." },
    { "name": "ClientId", "versions": "0+", "type": "string",
      "about": "The client id." },
    { "name": "ClientHost", "versions": "0+", "type": "string",
      "about": "The client host." },
    { "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string",
      "about": "The list of subscribed topic names." },
    { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The rebalance timeout" }
  ]
}

ShareGroupPartitionMetadataKey

Code Block
{
  "type": "data",
  "name": "ShareGroupPartitionMetadataKey",
  "validVersions": "9",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "9",
      "about": "The group id." }
  ]
}

...

Code Block
{
  "type": "data",
  "name": "ShareGroupPartitionMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "versions": "0+", "type": "int32" },
    { "name": "InitializedTopics", "versions": "0+", "type": "[]TopicMetadata" },
    { "name": "InitializingTopics", "versions": "0+", "type": "[]TopicIndexMetadata" },
    { "name": "DeletingTopics", "versions": "0+", "type": "[]TopicMetadata" }
  ],
  "commonStructs": [
    { "name": "TopicMetadata", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "NumPartitions", "type": "int32", "versions": "0+",
        "about": "The number of partitions." }
    ]},
    { "name": "TopicIndexMetadata", "versions": "0+", "fields": [
      { "name": "TopicId", "versions": "0+", "type": "uuid" },
      { "name": "StartPartitionIndex", "versions": "0+", "type": "int32" },
      { "name": "EndPartitionIndex", "versions": "0+", "type": "int32" }
    ]}
  ]
}

The InitializingTopics field is used as the first stage of a two-stage process to initialize the persistent state for a set of share-partitions. When the share coordinator successfully responds to InitializeShareGroupState, the topic-partitions are moved into the InitializedTopics field.

In a similar way, the DeletingTopics  field is used as the first stage of a two-stage process to delete the persistent state for a set of share-partitions.
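
The two-stage bookkeeping described above can be sketched as a small in-memory model. This is a simplified illustration under stated assumptions: the class PartitionMetadata and its method names are hypothetical, partitions are tracked as plain sets rather than the TopicMetadata and TopicIndexMetadata structures of the record, and the second stage of deletion (dropping the entry once the share coordinator has confirmed the delete) is an assumption inferred by analogy with initialization.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, Set


@dataclass
class PartitionMetadata:
    """Hypothetical in-memory view of ShareGroupPartitionMetadataValue."""
    initializing: Dict[str, Set[int]] = field(default_factory=dict)
    initialized: Dict[str, Set[int]] = field(default_factory=dict)
    deleting: Dict[str, Set[int]] = field(default_factory=dict)

    def start_initialize(self, topic_id: str, partitions: Iterable[int]) -> None:
        # Stage 1: record the intent before calling InitializeShareGroupState.
        self.initializing.setdefault(topic_id, set()).update(partitions)

    def complete_initialize(self, topic_id: str) -> None:
        # Stage 2: on a successful response, move to the initialized set.
        parts = self.initializing.pop(topic_id, set())
        if parts:
            self.initialized.setdefault(topic_id, set()).update(parts)

    def start_delete(self, topic_id: str) -> None:
        # Stage 1 of deletion: mark the topic's partitions as being deleted.
        parts = self.initialized.pop(topic_id, set())
        if parts:
            self.deleting.setdefault(topic_id, set()).update(parts)

    def complete_delete(self, topic_id: str) -> None:
        # Stage 2 of deletion (assumed): forget the topic once state is gone.
        self.deleting.pop(topic_id, None)
```

Recording the intent first means that a coordinator which fails between the two stages can tell from the persisted record which share-partitions still need their initialization or deletion to be retried.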

...